
[SPJ] Skewed partitions harm merge performance #11800

Open
aiss93 opened this issue Dec 17, 2024 · 10 comments
Labels
question Further information is requested

Comments

@aiss93

aiss93 commented Dec 17, 2024

Query engine

  • I'm using AWS Glue interactive session with glue version 5.0.
  • Spark 3.5.2
  • iceberg 1.6.1

Question

Hi

I have two S3 data sources, full_time_series and batch_time_series. Both tables are clustered by the measure and datetime columns, and both are partitioned by month(datetime).

I'm using the following configuration to enable SPJ and to tune performance by using a shuffle hash join instead of a sort merge join:

- spark.sql.sources.v2.bucketing.enabled = true
- spark.sql.sources.v2.bucketing.pushPartValues.enabled = true
- spark.sql.sources.v2.bucketing.partiallyClusteredDistribution.enabled = true
- spark.sql.requireAllClusterKeysForCoPartition = false
- spark.sql.iceberg.planning.preserve-data-grouping = true
- spark.sql.shuffledHashJoinFactor = 1
- spark.sql.join.preferSortMergeJoin = false
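For reference, a minimal sketch (plain Python, not a definitive Glue setup) of how these settings could be collected and applied to a SparkSession-style builder; the `apply_configs` helper is hypothetical:

```python
# SPJ-related settings from the list above, as key/value strings the way
# they would be passed to SparkSession.builder.config(...) or spark-submit.
spj_configs = {
    # Enable storage-partitioned joins on v2 data sources (Iceberg)
    "spark.sql.sources.v2.bucketing.enabled": "true",
    # Allow SPJ even when partition values differ between the two sides
    "spark.sql.sources.v2.bucketing.pushPartValues.enabled": "true",
    # Split large partitions / replicate small ones (inner joins only)
    "spark.sql.sources.v2.bucketing.partiallyClusteredDistribution.enabled": "true",
    # Do not require all clustering keys to match for co-partitioning
    "spark.sql.requireAllClusterKeysForCoPartition": "false",
    # Preserve Iceberg's data grouping during planning so Spark sees it
    "spark.sql.iceberg.planning.preserve-data-grouping": "true",
    # Bias the planner toward shuffled hash join over sort merge join
    "spark.sql.shuffledHashJoinFactor": "1",
    "spark.sql.join.preferSortMergeJoin": "false",
}

def apply_configs(builder, configs):
    """Apply each key/value pair to a SparkSession-builder-like object."""
    for key, value in configs.items():
        builder = builder.config(key, value)
    return builder
```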

Unless I'm mistaken, spark.sql.sources.v2.bucketing.partiallyClusteredDistribution.enabled = true is meant to split big partitions into smaller chunks. Even though this configuration is enabled, I'm still getting skewed partitions, as shown in the figure below:

[Screenshot: Spark UI showing skewed partition/task sizes]

Here is the query I'm performing:

merge into full_time_series target
using full_time_series as source
on target.measure = source.measure and target.datetime_utc = source.datetime_utc
when matched then update set *
when not matched then insert *

@aiss93 aiss93 added the question Further information is requested label Dec 17, 2024
@aiss93
Author

aiss93 commented Dec 17, 2024

@szehon-ho I saw the video you made on this topic during the Iceberg Summit. Do you see anything missing in the configuration?
Thank you for your help.

@szehon-ho
Collaborator

Yes, unfortunately that optimization is a bit limited: it splits the big side and replicates the small side, so it is only correct for inner joins. In this case you have both matched and not matched clauses, so it is an outer join and not eligible.

@aiss93
Author

aiss93 commented Dec 18, 2024

Thank you for your fast reply @szehon-ho.
Are there any plans to make it work for full outer joins as well? I'd like to work on this issue if possible.

What do you suggest for this kind of situation?
Perhaps salting could help resolve this issue.

@szehon-ho
Collaborator

szehon-ho commented Dec 18, 2024

Yeah, I'm not sure how to solve the problem. What do you mean by salting?

To explain the current problem: if you have two sides [A] and [B], the algorithm splits A and duplicates B, producing the pairs [A1, B], [A2, B], [A3, B]. If you have A outer join B, it won't work because rows of B will appear multiple times.
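The scheme above can be sketched in plain Python (hypothetical join keys, not Spark/Iceberg internals) to show why it is only safe for inner joins: running the join per pair and unioning the results duplicates the unmatched rows of the replicated side.

```python
# Hypothetical illustration: the big side A is split into chunks, and the
# small side B is replicated to each pair [A1, B], [A2, B], [A3, B].
a_chunks = [[1, 2], [3, 4], [5, 6]]   # join keys in each split of A
b = [6, 7]                            # join keys on the replicated side B

inner, b_unmatched = [], []
for chunk in a_chunks:
    # Inner-join part: only matching keys survive, so replication is harmless.
    inner += [k for k in b if k in chunk]
    # Outer-join part for B: keys with no match in THIS chunk only.
    b_unmatched += [k for k in b if k not in chunk]

print(inner)        # [6] -- correct: key 6 matches exactly once
print(b_unmatched)  # [6, 7, 6, 7, 7] -- wrong: 6 should not appear at all
                    # (it matches in the third pair), and 7 only once
```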

@aiss93
Author

aiss93 commented Dec 18, 2024

Yeah I see the problem now. Thanks.
I'm not an Iceberg/Spark internals expert, but wouldn't it be possible to compute a flag per partition to know whether there was at least one match between one of the table B replicated partitions and the table A split partitions? In case there is no match at all, we could then add a single copy of the B partition.

@szehon-ho
Collaborator

Hm not sure i get it, can you give an example? SPJ is at the planning stage, we have to decide what partition from each side to put together.

@aiss93
Author

aiss93 commented Dec 18, 2024

I don't know if it makes sense regarding Spark/Iceberg internals. If we consider the following example:

Table A (huge partition to split):

| date       | id | value |
|------------|----|-------|
| 10/10/2024 | 1  | a     |
| 10/10/2024 | 2  | b     |
| 10/10/2024 | 3  | c     |
| 10/10/2024 | 4  | d     |
| 10/10/2024 | 5  | e     |
| 10/10/2024 | 6  | f     |

Table B (partition to replicate):

| date       | id | value |
|------------|----|-------|
| 10/10/2024 | 7  | x     |
| 10/10/2024 | 8  | y     |

After the partition split we'll get the following pairs:

| Table A split partition                | Table B replicated partition           |
|----------------------------------------|----------------------------------------|
| (10/10/2024, 1, a), (10/10/2024, 2, b) | (10/10/2024, 7, x), (10/10/2024, 8, y) |
| (10/10/2024, 3, c), (10/10/2024, 4, d) | (10/10/2024, 7, x), (10/10/2024, 8, y) |
| (10/10/2024, 5, e), (10/10/2024, 6, f) | (10/10/2024, 7, x), (10/10/2024, 8, y) |

In case we have a when not matched then insert *, as you explained above, each replicated partition from table B will be inserted, and therefore we'll have duplicates. For the example above, the rows with ids 7 and 8 will each be inserted 3 times.

The idea I was suggesting consists of:

  • Computing an aggregated boolean that tells whether the not matched check is true across all of these replicated partitions.
  • If this flag is true, assigning only one replicated partition to execute the insert statement.

@szehon-ho
Collaborator

Hm, but for the "not matched" check, you need to check each replicated partition against all of the split partitions, so it defeats the point of splitting, I think.

@aiss93
Author

aiss93 commented Dec 28, 2024

Hm, I'm a little bit confused. The check needs to be done only once, since the replicated partitions hold the same data, right?

@szehon-ho
Collaborator

I think there are two 'not matched' cases here:
1) entries in A not matched in B
2) entries in B not matched in A

Case (1) is doable.
For case (2), it is harder because each of the replicated partitions only gets a partial view of the split side. Say:

Table A (split)    Table B (replicate)
(1, a)             (1, a)
(2, b)             (1, a)

The second pair, (2, b) vs (1, a), will wrongly conclude that table B has a non-match with table A and return (1, a). But actually it matches in the first pair.

Hope that makes sense.
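This counterexample can be sketched in plain Python (hypothetical keys mirroring the (1, a) / (2, b) rows above, not Spark internals): a per-pair anti-join sees only one chunk of A at a time and wrongly reports a non-match.

```python
# Hypothetical illustration of case (2): the "entries in B not matched in A"
# check, run per pair after A has been split and B replicated.
a_splits = [[1], [2]]   # A split into two chunks: [(1, a)] and [(2, b)]
b = [1]                 # B replicated to every pair: [(1, a)]

# Each pair only sees a partial view of A, so the anti-join is computed
# against one chunk at a time.
per_pair_unmatched = [[k for k in b if k not in split] for split in a_splits]
print(per_pair_unmatched)  # [[], [1]] -- the second pair wrongly reports key 1

# The correct anti-join needs the full view of A's keys.
all_a_keys = {k for split in a_splits for k in split}
truly_unmatched = [k for k in b if k not in all_a_keys]
print(truly_unmatched)  # [] -- key 1 actually matches in the first pair
```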
