Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix error with multiple nested partition columns on Iceberg #24629

Open
wants to merge 1 commit into
base: master
Choose a base branch
from

Conversation

jinyangli34
Copy link
Contributor

@jinyangli34 jinyangli34 commented Jan 4, 2025

Description

Assign channel number based on the order of field ID.

Fixes #24628

Release notes

( ) This is not user-visible or is docs only, and no release notes are required.
( ) Release notes are required. Please propose a release note for me.
( ) Release notes are required, with the following suggested text:

## Section
* Fix some things. ({issue}`24628`)

Copy link

cla-bot bot commented Jan 4, 2025

Thank you for your pull request and welcome to our community. We could not parse the GitHub identity of the following contributors: jinyang_li.
This is most likely caused by a git client misconfiguration; please make sure to:

  1. check if your git client is configured with an email to sign commits git config --list | grep email
  2. If not, set it up using git config --global user.email [email protected]
  3. Make sure that the git commit email is configured in your GitHub account settings, see https://github.com/settings/emails

@github-actions github-actions bot added the iceberg Iceberg connector label Jan 4, 2025
@jinyangli34 jinyangli34 force-pushed the jinyang-fix_iceberg_channels branch from 5269716 to 98c7606 Compare January 4, 2025 05:16
@cla-bot cla-bot bot added the cla-signed label Jan 4, 2025
@jinyangli34 jinyangli34 changed the title Fix error with multiple nested partition columns on Iceberg [wip] Fix error with multiple nested partition columns on Iceberg Jan 4, 2025
@jinyangli34 jinyangli34 marked this pull request as draft January 4, 2025 09:33
@ebyhr ebyhr requested review from dain and raunaqmorarka January 7, 2025 11:56
@jinyangli34 jinyangli34 force-pushed the jinyang-fix_iceberg_channels branch from 98c7606 to 612c082 Compare January 11, 2025 01:29
Copy link

cla-bot bot commented Jan 11, 2025

Thank you for your pull request and welcome to our community. We could not parse the GitHub identity of the following contributors: jinyang_li.
This is most likely caused by a git client misconfiguration; please make sure to:

  1. check if your git client is configured with an email to sign commits git config --list | grep email
  2. If not, set it up using git config --global user.email [email protected]
  3. Make sure that the git commit email is configured in your GitHub account settings, see https://github.com/settings/emails

@cla-bot cla-bot bot removed the cla-signed label Jan 11, 2025
@jinyangli34 jinyangli34 force-pushed the jinyang-fix_iceberg_channels branch from 612c082 to 88a4559 Compare January 11, 2025 02:29
@cla-bot cla-bot bot added the cla-signed label Jan 11, 2025
@jinyangli34 jinyangli34 force-pushed the jinyang-fix_iceberg_channels branch 2 times, most recently from dea4ec4 to 835d669 Compare January 11, 2025 03:20
@jinyangli34 jinyangli34 changed the title [wip] Fix error with multiple nested partition columns on Iceberg Fix error with multiple nested partition columns on Iceberg Jan 11, 2025
@jinyangli34 jinyangli34 marked this pull request as ready for review January 11, 2025 03:20
@wendigo wendigo requested a review from ebyhr January 11, 2025 18:07
@jinyangli34 jinyangli34 force-pushed the jinyang-fix_iceberg_channels branch from 835d669 to b391dba Compare January 14, 2025 01:18
@jinyangli34 jinyangli34 force-pushed the jinyang-fix_iceberg_channels branch from b391dba to 2986072 Compare January 14, 2025 05:41
@jinyangli34 jinyangli34 force-pushed the jinyang-fix_iceberg_channels branch from 2986072 to f2ccfcb Compare January 15, 2025 07:58
@wendigo wendigo force-pushed the jinyang-fix_iceberg_channels branch from f2ccfcb to 0afc91f Compare January 15, 2025 15:14
@wendigo
Copy link
Contributor

wendigo commented Jan 15, 2025

I've applied some fixes, ptal @ebyhr @raunaqmorarka @chenjian2664

Copy link
Member

@dain dain left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

So the root problem here is my original code assumed the fields from the iceberg spec would be returned in field ID order and that not necessarily true. The fix is to build the data paths, sort by field ID, and then build the final paths? Maybe we should mention the faulty assumption and the fix in the commit message.

I had some comments about adding more comments but the rest looks good to me.

@@ -92,16 +113,14 @@ private static boolean buildDataPaths(Set<Integer> partitionFieldIds, Types.Stru
currentPaths.addLast(fieldOrdinal);
org.apache.iceberg.types.Type type = field.type();
if (type instanceof Types.StructType nestedStruct) {
hasPartitionFields = buildDataPaths(partitionFieldIds, nestedStruct, currentPaths, dataPaths) || hasPartitionFields;
buildDataPaths(partitionFieldIds, nestedStruct, currentPaths, dataPaths);
}
// Map and List types are not supported in partitioning
if (type.isPrimitiveType() && partitionFieldIds.contains(fieldId)) {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

should this be else if? I assume structure is mutually exclusive of these cases. If so, I think it makes more sense to be explicit. If not, we should explain what the consequences of that is.

Comment on lines 70 to 78
for (Types.NestedField field : spec.schema().asStruct().fields()) {
// Partition fields can only be nested in a struct
if (field.type() instanceof Types.StructType nestedStruct) {
if (buildDataPaths(partitionFieldIds, nestedStruct, new ArrayDeque<>(List.of(channel)), fieldInfo)) {
channel++;
}
buildDataPaths(partitionFieldIds, nestedStruct, new ArrayDeque<>(ImmutableList.of(field.fieldId())), fieldInfo);
}
else if (field.type().isPrimitiveType() && partitionFieldIds.contains(field.fieldId())) {
fieldInfo.put(field.fieldId(), ImmutableList.of(channel));
channel++;
fieldInfo.put(field.fieldId(), ImmutableList.of(field.fieldId()));
}
}
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This feels like it is just one step unrolled of the inner build data paths call, but I'm pretty sure it is not. I believe the difference is this out call is using field id, but he inner code is using ordinal position. This is preexisting, but maybe add a comment mentioning this, since the code now looks so similar.

}
}
return fieldInfo;

// assign channel for top level fields based on the order of the field id
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think this step is doing more than the comment lets on. The paths are being rewritten from fieldId.structOrdinalX.structOrdinalY to channel.structOrdinalX.structOrdinalY, where channels are assigned based on the numeric ordering of the field ID. So I would mention this rewrites the path, changing the root path from field ID to sequential channel number (maybe phrase this better with help from copilot).

@@ -64,26 +66,45 @@ private static Map<Integer, List<Integer>> buildDataPaths(PartitionSpec spec)
{
Set<Integer> partitionFieldIds = spec.fields().stream().map(PartitionField::sourceId).collect(toImmutableSet());

int channel = 0;
Map<Integer, List<Integer>> fieldInfo = new HashMap<>();
for (Types.NestedField field : spec.schema().asStruct().fields()) {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

After reading the change again, it seems that the sole purpose of the change is to ensure that channels are allocated in field ID order. If so, can we fix this by sorting the fields before this loop. I'm thinking something like this:

Suggested change
for (Types.NestedField field : spec.schema().asStruct().fields()) {
List<Types.NestedField> fields = spec.schema().asStruct().fields().stream()
.sorted(Comparator.comparingInt(Types.NestedField::fieldId))
.collect(toImmutableList());
for (Types.NestedField field : fields) {

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
cla-signed iceberg Iceberg connector
Development

Successfully merging this pull request may close these issues.

Iceberg query fails with multiple nested partition columns
5 participants