
Azure Synapse Analytics : Optimize for Distributions (Dedicated SQL Pools)

As third-party benchmarking results for Test-H and Test-DS* show (see here), dedicated SQL pools in Azure Synapse Analytics (formerly Azure SQL Data Warehouse) outperform other analytics databases such as BigQuery, Redshift, and Snowflake.
However, to take full advantage of the performance and cost-effectiveness of this scale-out MPP (massively parallel processing) architecture, you should consider several things, such as distribution, partitioning, indexing, and caching.

In this series of posts, I'll walk you through well-known techniques (as tutorials) for table layout and design to optimize performance in Azure Synapse Analytics dedicated SQL pools, using real code and examples.

Synapse Analytics : Designing for Performance – Table of Contents (updated on 10/30/2020)

  1. Optimize for Distributions (this post)
  2. Choose Right Index and Partition
  3. How Statistics and Cache Works

In this post, we look at distribution strategies.

Note : Throughout this series, I will often use the World Wide Importers sample. If you want to follow along, please see here and import the sample data into your own pool.

Replication Strategy (Optimize Broadcast Data Movement)

First, assume the following tables: a fact table (transaction table) and the surrounding dimension tables, i.e., the structure of a so-called "star schema".

When you create a table without specifying a distribution strategy, as follows, Synapse Analytics creates it with the round-robin distribution strategy. That is, rows are spread across the distribution databases (there are 60 in a single pool) in round-robin fashion.

CREATE TABLE [wwi].[dimension_City]
( 
  [City Key] int NOT NULL,
  [WWI City ID] int NOT NULL,
  [City] nvarchar(50) NOT NULL,
  ...
)

A round-robin table is optimal for loading large amounts of raw data, but in most cases it is not optimal for querying.
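For reference, the default above is equivalent to specifying the distribution (and the default clustered columnstore index) explicitly, like this (a sketch; the column list is abbreviated as in the table definition above):

CREATE TABLE [wwi].[dimension_City]
( 
  [City Key] int NOT NULL,
  [WWI City ID] int NOT NULL,
  [City] nvarchar(50) NOT NULL,
  ...
)
WITH
( 
  DISTRIBUTION = ROUND_ROBIN,
  CLUSTERED COLUMNSTORE INDEX
)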

Now let's run the following query against this round-robin table.

SELECT TOP(100) s.[Sale Key], c.[City]
FROM [wwi].[fact_Sale] s LEFT JOIN [wwi].[dimension_City] c ON s.[City Key] = c.[City Key]

To see the execution plan for this query (i.e., how the optimal plan is determined and how the query will be executed), run the following script, simply adding the EXPLAIN keyword at the beginning.

EXPLAIN
SELECT TOP(100) s.[Sale Key], c.[City]
FROM [wwi].[fact_Sale] s LEFT JOIN [wwi].[dimension_City] c ON s.[City Key] = c.[City Key]

output

<?xml version="1.0" encoding="utf-8"?>
<dsql_query number_nodes="1" number_distributions="60" number_distributions_per_node="60">
  <sql>SELECT TOP(100) s.[Sale Key], c.[City]
FROM [wwi].[fact_Sale] s LEFT JOIN [wwi].[dimension_City] c ON s.[City Key] = c.[City Key]</sql>
  <dsql_operations total_cost="2902.7232" total_number_operations="5">
    <dsql_operation operation_type="RND_ID">
      <identifier>TEMP_ID_3</identifier>
    </dsql_operation>
    <dsql_operation operation_type="ON">
      <location permanent="false" distribution="AllComputeNodes" />
      <sql_operations>
        <sql_operation type="statement">CREATE TABLE [qtabledb].[dbo].[TEMP_ID_3] ([City Key] INT NOT NULL, [City] NVARCHAR(50) COLLATE SQL_Latin1_General_CP1_CI_AS NOT NULL ) WITH(DISTRIBUTED_MOVE_FILE='');</sql_operation>
      </sql_operations>
    </dsql_operation>
    <dsql_operation operation_type="BROADCAST_MOVE">
      <operation_cost cost="2902.7232" accumulative_cost="2902.7232" average_rowsize="104" output_rows="116295" GroupNumber="4" />
      <source_statement>SELECT [T1_1].[City Key] AS [City Key], [T1_1].[City] AS [City] FROM [WWI_test01].[wwi].[dimension_City] AS T1_1
OPTION (MAXDOP 1, MIN_GRANT_PERCENT = [MIN_GRANT], DISTRIBUTED_MOVE(N''))</source_statement>
      <destination_table>[TEMP_ID_3]</destination_table>
    </dsql_operation>
    <dsql_operation operation_type="RETURN">
      <location distribution="AllDistributions" />
      <select>SELECT [T1_1].[Sale Key] AS [Sale Key], [T1_1].[City] AS [City] FROM (SELECT TOP (CAST ((100) AS BIGINT)) [T2_2].[Sale Key] AS [Sale Key], [T2_1].[City] AS [City] FROM [qtabledb].[dbo].[TEMP_ID_3] AS T2_1 RIGHT OUTER JOIN
[WWI_test01].[wwi].[fact_Sale] AS T2_2
ON ([T2_2].[City Key] = [T2_1].[City Key])) AS T1_1
OPTION (MAXDOP 1, MIN_GRANT_PERCENT = [MIN_GRANT])</select>
    </dsql_operation>
    <dsql_operation operation_type="ON">
      <location permanent="false" distribution="AllComputeNodes" />
      <sql_operations>
        <sql_operation type="statement">DROP TABLE [qtabledb].[dbo].[TEMP_ID_3]</sql_operation>
      </sql_operations>
    </dsql_operation>
  </dsql_operations>
</dsql_query>

With the BROADCAST_MOVE operation above, all rows of the dimension_City table are copied into a temporary table (called TEMP_ID_3) on every distribution database. (See below.)
Because dimension_City is small, all of its rows are duplicated in every distribution before the join. Here we join only two tables, but if many tables need to be joined, this data movement becomes a large overhead for query execution.
The temporary table is dropped after the query completes, so the same overhead may occur again the next time the same query runs.

Note : To get recommendations for optimizing performance along with the query plan, you can use the EXPLAIN WITH_RECOMMENDATIONS statement instead.
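For example, the same query with recommendations enabled:

EXPLAIN WITH_RECOMMENDATIONS
SELECT TOP(100) s.[Sale Key], c.[City]
FROM [wwi].[fact_Sale] s LEFT JOIN [wwi].[dimension_City] c ON s.[City Key] = c.[City Key]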

Note : Using the sys.dm_pdw_request_steps table (a dynamic management view, DMV) as follows, you can see how each operation was actually executed and how long it took, after you have run the query.

SELECT step_index AS i, operation_type, distribution_type, start_time, end_time, command
FROM sys.dm_pdw_request_steps
WHERE request_id = 'QID303'
i operation_type         distribution_type start_time                  end_time                    command
---------------------------------------------------------------------------------------------------------------------
0 RandomIDOperation      Unspecified       2020-10-07T00:52:27.8230000 2020-10-07T00:52:27.8230000 TEMP_ID_3
1 OnOperation            AllComputeNodes   2020-10-07T00:52:27.8230000 2020-10-07T00:52:27.8700000 CREATE TABLE [qtabledb].[dbo].[TEMP_ID_3] ...
2 BroadcastMoveOperation AllDistributions  2020-10-07T00:52:27.8700000 2020-10-07T00:52:27.9930000 SELECT [T1_1].[City Key] ...
3 ReturnOperation        AllDistributions  2020-10-07T00:52:27.9930000 2020-10-07T00:52:29.8830000 SELECT [T1_1].[Sale Key] AS [Sale Key] ...
4 OnOperation            AllComputeNodes   2020-10-07T00:52:29.8830000 2020-10-07T00:52:29.8830000 DROP TABLE [qtabledb].[dbo].[TEMP_ID_3]
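The request_id used above ('QID303') identifies your query. If you are not sure which ID to use, one simple way (a sketch, assuming your query ran recently) is to look it up in sys.dm_pdw_exec_requests:

SELECT TOP(10) request_id, submit_time, total_elapsed_time, command
FROM sys.dm_pdw_exec_requests
ORDER BY submit_time DESC;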

You can also monitor execution using other DMVs. (I won't go deeper into DMVs in this post.)
For instance, you can see how many rows were moved by the data movement service (DMS) using sys.dm_pdw_dms_workers as follows.

SELECT distribution_id, type, rows_processed FROM sys.dm_pdw_dms_workers
WHERE request_id = 'QID1765' AND step_index = 11;

To ensure that all rows are explicitly copied and persisted in every distribution database, use the REPLICATE distribution strategy for dimension tables instead of ROUND_ROBIN. (See below.)

DROP TABLE [wwi].[dimension_City];
GO
CREATE TABLE [wwi].[dimension_City]
...
WITH
( 
  DISTRIBUTION = REPLICATE,
  CLUSTERED COLUMNSTORE INDEX
)
GO

Now you will see that the query is planned without any data movement.

EXPLAIN
SELECT TOP(100) s.[Sale Key], c.[City]
FROM [wwi].[fact_Sale] s LEFT JOIN [wwi].[dimension_City] c ON s.[City Key] = c.[City Key]

output

<?xml version="1.0" encoding="utf-8"?>
<dsql_query number_nodes="1" number_distributions="60" number_distributions_per_node="60">
  <sql>SELECT TOP(100) s.[Sale Key], c.[City]
FROM [wwi].[fact_Sale] s LEFT JOIN [wwi].[dimension_City] c ON s.[City Key] = c.[City Key]</sql>
  <dsql_operations total_cost="0" total_number_operations="1">
    <dsql_operation operation_type="RETURN">
      <location distribution="AllDistributions" />
      <select>SELECT [T1_1].[Sale Key] AS [Sale Key], [T1_1].[City] AS [City] FROM (SELECT TOP (CAST ((100) AS BIGINT)) [T2_2].[Sale Key] AS [Sale Key], [T2_1].[City] AS [City] FROM [WWI_test01].[wwi].[dimension_City] AS T2_1 RIGHT OUTER JOIN
[WWI_test01].[wwi].[fact_Sale] AS T2_2
ON ([T2_2].[City Key] = [T2_1].[City Key])) AS T1_1
OPTION (MAXDOP 1)</select>
    </dsql_operation>
  </dsql_operations>
</dsql_query>

With the REPLICATE strategy, all rows of the table (dimension_City in our case) are copied to every distribution database when the first query runs. (This happens even when you run the EXPLAIN statement.)
Once the copies have been generated in all distribution databases, they are reused by subsequent queries. (Unlike a temporary table, the replicated copies are not dropped after the query completes.)

For a replicated table, the copies and their corresponding indexes in all distributions are generated only once, on the first query. Because of this, it is a good idea to run a query that forces the copy as soon as the replicated table has been created and all data has been loaded.
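For instance, a lightweight statement like the following (a minimal sketch) should be enough to trigger the rebuild of the replicated copies:

SELECT TOP(1) * FROM [wwi].[dimension_City];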

Hash Distribution (Avoid Shuffle Data Movement)

Now let's look at another example.
This time, we join the fact_Sale table with the fact_Invoice table as follows. As you can see, both are large tables, unlike the dimension table in the star schema example above.

If you use the default round-robin strategy for these tables, the query plan for this join looks as follows.

EXPLAIN
SELECT TOP(100) s.[Unit Price], s.[Quantity], i.[Customer Key]
FROM [wwi].[fact_Sale] s LEFT JOIN [wwi].[fact_Invoice] i ON s.[WWI Invoice ID] = i.[WWI Invoice ID]

output

<?xml version="1.0" encoding="utf-8"?>
<dsql_query number_nodes="1" number_distributions="60" number_distributions_per_node="60">
  <sql>SELECT TOP(100) s.[Unit Price], s.[Quantity], i.[Customer Key]
FROM [wwi].[fact_Sale] s LEFT JOIN [wwi].[fact_Invoice] i ON s.[WWI Invoice ID] = i.[WWI Invoice ID]</sql>
  <dsql_operations total_cost="11499.3712" total_number_operations="9">
    <dsql_operation operation_type="RND_ID">
      <identifier>TEMP_ID_8</identifier>
    </dsql_operation>
    <dsql_operation operation_type="ON">
      <location permanent="false" distribution="AllDistributions" />
      <sql_operations>
        <sql_operation type="statement">CREATE TABLE [qtabledb].[dbo].[TEMP_ID_8] ([WWI Invoice ID] INT NOT NULL, [Customer Key] INT ) WITH(DISTRIBUTED_MOVE_FILE='');</sql_operation>
      </sql_operations>
    </dsql_operation>
    <dsql_operation operation_type="SHUFFLE_MOVE">
      <operation_cost cost="1598.4352" accumulative_cost="1598.4352" average_rowsize="8" output_rows="49951100" GroupNumber="4" />
      <source_statement>SELECT [T1_1].[WWI Invoice ID] AS [WWI Invoice ID], [T1_1].[Customer Key] AS [Customer Key] FROM [WWI_test01].[wwi].[fact_Invoice] AS T1_1
OPTION (MAXDOP 1, MIN_GRANT_PERCENT = [MIN_GRANT], DISTRIBUTED_MOVE(N''))</source_statement>
      <destination_table>[TEMP_ID_8]</destination_table>
      <shuffle_columns>WWI Invoice ID;</shuffle_columns>
    </dsql_operation>
    <dsql_operation operation_type="RND_ID">
      <identifier>TEMP_ID_9</identifier>
    </dsql_operation>
    <dsql_operation operation_type="ON">
      <location permanent="false" distribution="AllDistributions" />
      <sql_operations>
        <sql_operation type="statement">CREATE TABLE [qtabledb].[dbo].[TEMP_ID_9] ([WWI Invoice ID] INT NOT NULL, [Quantity] INT NOT NULL, [Unit Price] DECIMAL(18, 2) NOT NULL ) WITH(DISTRIBUTED_MOVE_FILE='');</sql_operation>
      </sql_operations>
    </dsql_operation>
    <dsql_operation operation_type="SHUFFLE_MOVE">
      <operation_cost cost="9900.936" accumulative_cost="11499.3712" average_rowsize="17" output_rows="145602000" GroupNumber="3" />
      <source_statement>SELECT [T1_1].[WWI Invoice ID] AS [WWI Invoice ID], [T1_1].[Quantity] AS [Quantity], [T1_1].[Unit Price] AS [Unit Price] FROM [WWI_test01].[wwi].[fact_Sale] AS T1_1
OPTION (MAXDOP 1, MIN_GRANT_PERCENT = [MIN_GRANT], DISTRIBUTED_MOVE(N''))</source_statement>
      <destination_table>[TEMP_ID_9]</destination_table>
      <shuffle_columns>WWI Invoice ID;</shuffle_columns>
    </dsql_operation>
    <dsql_operation operation_type="RETURN">
      <location distribution="AllDistributions" />
      <select>SELECT [T1_1].[Unit Price] AS [Unit Price], [T1_1].[Quantity] AS [Quantity], [T1_1].[Customer Key] AS [Customer Key] FROM (SELECT TOP (CAST ((100) AS BIGINT)) [T2_2].[Unit Price] AS [Unit Price], [T2_2].[Quantity] AS [Quantity], [T2_1].[Customer Key] AS [Customer Key] FROM [qtabledb].[dbo].[TEMP_ID_8] AS T2_1 RIGHT OUTER JOIN
[qtabledb].[dbo].[TEMP_ID_9] AS T2_2
ON ([T2_2].[WWI Invoice ID] = [T2_1].[WWI Invoice ID])) AS T1_1
OPTION (MAXDOP 1, MIN_GRANT_PERCENT = [MIN_GRANT])</select>
    </dsql_operation>
    <dsql_operation operation_type="ON">
      <location permanent="false" distribution="AllDistributions" />
      <sql_operations>
        <sql_operation type="statement">DROP TABLE [qtabledb].[dbo].[TEMP_ID_9]</sql_operation>
      </sql_operations>
    </dsql_operation>
    <dsql_operation operation_type="ON">
      <location permanent="false" distribution="AllDistributions" />
      <sql_operations>
        <sql_operation type="statement">DROP TABLE [qtabledb].[dbo].[TEMP_ID_8]</sql_operation>
      </sql_operations>
    </dsql_operation>
  </dsql_operations>
</dsql_query>

In this case, the joined data is very large, so replication (i.e., copying all rows to every distribution) is not appropriate. Instead, the SHUFFLE_MOVE operation is used.
SHUFFLE_MOVE redistributes both fact_Sale and fact_Invoice into temporary tables, hashed on the join column [WWI Invoice ID]. Once these temporary tables are ready, the tables can finally be joined on [WWI Invoice ID]. (See below.)

This is even more costly than the broadcast case, since the amount of data to be moved is so large.

Note : Even when a table is large, the engine may still choose BROADCAST_MOVE if the number of distinct [WWI Invoice ID] values in the fact_Sale table is small. In the example above, we assume that the number of distinct [WWI Invoice ID] values is also large.
I'll show how the plan is determined by statistics in a future post.

As mentioned above, we cannot apply the replication strategy this time.
How, then, can we avoid the overhead of the data shuffle?

In such a case, you can apply the hash distribution strategy to a large table as follows.
Hash distribution distributes (rather than replicates) data according to the hash of a distribution column. For instance, if you use hash distribution for the fact_Sale table and specify [WWI Invoice ID] as the distribution column, the rows of fact_Sale are distributed according to the hash value of [WWI Invoice ID].

DROP TABLE [wwi].[fact_Sale];
GO
CREATE TABLE [wwi].[fact_Sale]
...
WITH
( 
  DISTRIBUTION = HASH([WWI Invoice ID]),
  CLUSTERED COLUMNSTORE INDEX
)

DROP TABLE [wwi].[fact_Invoice];
GO
CREATE TABLE [wwi].[fact_Invoice]
...
WITH
( 
  DISTRIBUTION = HASH([WWI Invoice ID]),
  CLUSTERED COLUMNSTORE INDEX
)

The hash function is consistent across tables: the same key value always maps to the same distribution, even when it comes from a different table.
Thus, if you set [WWI Invoice ID] as the distribution column for both fact_Sale and fact_Invoice, rows from both tables with the same [WWI Invoice ID] value are placed in the same distribution database.
As a result, no shuffle occurs for this join.
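You can confirm the distribution policy and distribution column of a table with the catalog views below. (A sketch; I believe these views are available in dedicated SQL pools, but check the documentation for your environment.)

SELECT t.name AS table_name, tdp.distribution_policy_desc, c.name AS distribution_column
FROM sys.tables t
JOIN sys.pdw_table_distribution_properties tdp ON t.object_id = tdp.object_id
LEFT JOIN sys.pdw_column_distribution_properties cdp ON t.object_id = cdp.object_id AND cdp.distribution_ordinal = 1
LEFT JOIN sys.columns c ON cdp.object_id = c.object_id AND cdp.column_id = c.column_id
WHERE t.name IN ('fact_Sale', 'fact_Invoice');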

Note : Use DBCC PDW_SHOWSPACEUSED to see data skew (the size of each distribution, etc.) in a table, as follows.

DBCC PDW_SHOWSPACEUSED('wwi.fact_Sale');
ROWS    RESERVED_SPACE DATA_SPACE INDEX_SPACE UNUSED_SPACE PDW_NODE_ID DISTRIBUTION_ID
2412841 17992          17936      0           56           1           1
2474028 18424          18368      0           56           1           2
2099684 15744          15688      0           56           1           3
...

When you need more detailed information about skew in the data, use the dynamic management view sys.dm_pdw_nodes_db_partition_stats. (See here for details.)

SELECT * FROM sys.dm_pdw_nodes_db_partition_stats   
WHERE object_id = OBJECT_ID('wwi.fact_Sale');
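For instance, building on the same filter used above, you could summarize the row counts per distribution to quantify the skew (a minimal sketch based on the row_count and distribution_id columns of this DMV):

SELECT distribution_id, SUM(row_count) AS total_rows
FROM sys.dm_pdw_nodes_db_partition_stats
WHERE object_id = OBJECT_ID('wwi.fact_Sale')
GROUP BY distribution_id
ORDER BY total_rows DESC;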

In real workloads, data shuffles occur in more complicated situations.
For instance, consider the following CTAS (CREATE TABLE AS SELECT) statement with a GROUP BY clause.
If fact_Sale and fact_Invoice use the round-robin distribution strategy, rows with the same [WWI Invoice ID] must first be brought together before they can be aggregated, which again causes a lot of overhead, and you may be kept waiting for a very long time.
However, this overhead is mitigated when both the fact_Sale and fact_Invoice tables are hash-distributed on the distribution column [WWI Invoice ID], since the aggregation can then be done locally within each distribution.

CREATE TABLE [wwi].[fact_Invoice]
WITH
( 
  DISTRIBUTION = ROUND_ROBIN,
  CLUSTERED COLUMNSTORE INDEX
)
AS
SELECT
  [WWI Invoice ID],
  SUM([Quantity]) AS [Quantity],
  ...
FROM [wwi].[fact_Sale]
GROUP BY [WWI Invoice ID];

Hash distribution can also be used to optimize frequent COUNT(DISTINCT ...) or OVER (PARTITION BY ...) operations.
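For example, when fact_Sale is hash-distributed on [WWI Invoice ID], queries like the following can be evaluated largely within each distribution (a sketch using the sample columns):

SELECT COUNT(DISTINCT [WWI Invoice ID]) AS invoice_count
FROM [wwi].[fact_Sale];

SELECT [Sale Key], SUM([Quantity]) OVER (PARTITION BY [WWI Invoice ID]) AS invoice_quantity
FROM [wwi].[fact_Sale];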

To get better performance, choose a distribution column that distributes data evenly.

Composite Key

You cannot specify multiple columns as the distribution column for hash distribution. When you need multiple columns as the distribution key, you can create a new column that combines those values into a composite key.

For instance, suppose two large fact tables, flights and weather, will be joined on both airport id and date/time. (See the following schema.)

In this case, it is better to generate a new column (key) as a composite of airport id and date/time, and create both tables with hash distribution using this generated key as the distribution column, as sketched below.
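As a sketch (the table and column names here are hypothetical, since the schema figure is not reproduced), the composite key for the flights table might be built with a CTAS like this:

CREATE TABLE [dbo].[fact_Flight]
WITH
( 
  DISTRIBUTION = HASH([Airport Date Key]),
  CLUSTERED COLUMNSTORE INDEX
)
AS
SELECT
  CONCAT(CAST(f.[Airport ID] AS varchar(10)), '_', CONVERT(varchar(8), f.[Flight Date], 112)) AS [Airport Date Key],
  f.*
FROM [dbo].[flights_staging] f;

Applying the same expression to the weather table ensures that rows with matching airport id and date values hash to the same distribution in both tables.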

Multiple Strategies For the Same Data

If you need multiple (different) distribution strategies for the same data, you can use CTAS (CREATE TABLE AS SELECT) to create another copy of the table with a different distribution column. Or, if the definition includes aggregate functions or a GROUP BY clause, you can use a materialized view.
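For instance, a CTAS like the following (a sketch; the new table name is hypothetical) creates a second copy of fact_Sale distributed on [Customer Key]:

CREATE TABLE [wwi].[fact_Sale_ByCustomer]
WITH
( 
  DISTRIBUTION = HASH([Customer Key]),
  CLUSTERED COLUMNSTORE INDEX
)
AS
SELECT * FROM [wwi].[fact_Sale];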

A materialized view is a view in the database, but it pre-computes, stores, and maintains its data in the SQL pool just like a table. No recomputation is needed each time the materialized view is used, and you don't need to maintain it manually when the source data changes.
Moreover, just like a standard table, a materialized view can have a hash distribution with its own distribution column.

CREATE MATERIALIZED VIEW [wwi].[Sale_For_Invoice_View]
WITH
(
  DISTRIBUTION = HASH ([WWI Invoice ID]),
  FOR_APPEND
)
AS
SELECT [WWI Invoice ID], MAX([Invoice Date Key]) AS [Invoice Date Key], SUM([Quantity]) AS [Quantity]
FROM [wwi].[fact_Sale]
GROUP BY [WWI Invoice ID];
GO

CREATE MATERIALIZED VIEW [wwi].[Sale_For_Customer_View]
WITH
(
  DISTRIBUTION = HASH ([Customer Key]),
  FOR_APPEND
)
AS
SELECT [Customer Key], SUM([Quantity]) AS [Quantity]
FROM [wwi].[fact_Sale]
GROUP BY [Customer Key];
GO

 

In this post, I've shown how to design the vertical distribution of data in Synapse Analytics dedicated SQL pools with several use cases.
Broadcast and shuffle exchanges are also used in Apache Spark pools and their query plans. (You can inspect plans with the explain() function in Apache Spark.) The basic ideas in this post will also help you explore performance bottlenecks in other big data processing systems.

In the next post, I'll show you the horizontal design within a table using indexes and partitions.

Useful Reference :

Cheat sheet for Azure Synapse Analytics
https://docs.microsoft.com/en-us/azure/synapse-analytics/sql-data-warehouse/cheat-sheet

Guidance for designing distributed tables in Synapse SQL pool
https://docs.microsoft.com/en-us/azure/synapse-analytics/sql-data-warehouse/sql-data-warehouse-tables-distribute

Design guidance for using replicated tables in Synapse SQL pool
https://docs.microsoft.com/en-us/azure/synapse-analytics/sql-data-warehouse/design-guidance-for-replicated-tables

 
