
Core: Prevent dropping column which is referenced by active partition specs #11842

Open · wants to merge 2 commits into main

Conversation

anuragmantri
Contributor

Prevents dropping a column that is still referenced by an active partition spec.

Supersedes: #10352

Co-authored-by: @amogh-jahagirdar

Summary

If we drop a partition column that is still referenced by an older spec, the table breaks. This can easily be reproduced in Spark by adding a SELECT after these tests:

   sql("SELECT * FROM %s LIMIT 1", tableName);

This will throw:

Type cannot be null
java.lang.NullPointerException: Type cannot be null
	at org.apache.iceberg.relocated.com.google.common.base.Preconditions.checkNotNull(Preconditions.java:922)
	at org.apache.iceberg.types.Types$NestedField.<init>(Types.java:615)
	at org.apache.iceberg.types.Types$NestedField.optional(Types.java:508)
	at org.apache.iceberg.PartitionSpec.partitionType(PartitionSpec.java:134)
	at org.apache.iceberg.Partitioning.buildPartitionProjectionType(Partitioning.java:284)
	at org.apache.iceberg.Partitioning.partitionType(Partitioning.java:242)
	at org.apache.iceberg.spark.source.SparkTable.metadataColumns(SparkTable.java:258)
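
For context, a minimal end-to-end repro sketch in the style of the Spark tests. This is an illustration only: the table name, column names, and DDL are assumptions rather than the PR's actual test code, and it presumes Iceberg's Spark SQL extensions and a v2 table.

    // Hypothetical repro sketch; identifiers and DDL are illustrative.
    sql("CREATE TABLE %s (id BIGINT, data STRING) USING iceberg "
        + "PARTITIONED BY (data) TBLPROPERTIES ('format-version'='2')", tableName);
    sql("ALTER TABLE %s DROP PARTITION FIELD data", tableName); // new current spec; the old spec still references `data`
    sql("ALTER TABLE %s DROP COLUMN data", tableName);          // currently allowed, silently corrupts metadata
    sql("SELECT * FROM %s LIMIT 1", tableName);                 // fails with the NPE above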

As discussed by @amogh-jahagirdar and @advancedxy here, the solution has two parts:

Part 1: Prevent dropping the column referenced in older specs (This PR)
Part 2: Provide a maintenance API for users to remove all unused specs (#10755)

Both can be merged independently.

@anuragmantri
Contributor Author

@amogh-jahagirdar, @advancedxy @RussellSpitzer - Could you please take a look? Thanks!

szehon-ho previously approved these changes Dec 20, 2024
@szehon-ho dismissed their stale review December 20, 2024 23:41

Mistake :)

@anuragmantri
Contributor Author

I will fix the other partition evolution tests.

    .forEach(
        (specId, spec) -> {
          // Prevent dropping fields that are referenced in older partition specs in use.
          if (!specId.equals(base.defaultSpecId())) {
Contributor Author


Wondering if we will run into the issue again if we allow dropping from the current spec.

Contributor


Dropping partition source columns from the current spec will fail later anyway, when building the table metadata, but I don't think we should allow it here either.

We should remove the if condition here, as we certainly cannot remove partition source columns in the current spec.

Contributor Author


This makes sense. My concern is that this is a big behavior change. Users cannot drop partition columns at all.

Contributor


This makes sense. My concern is that this is a big behavior change. Users cannot drop partition columns at all.

I don't think users can drop partition source fields/columns (note: not the partition columns themselves) in the current spec. See the following test:

    // Create a test table partitioned by `data`, then try to drop the source column.
    TestTables.TestTable table =
        TestTables.create(tableDir, "test", SCHEMA, BY_DATA_SPEC, V1_FORMAT_VERSION);
    table.updateSchema().deleteColumn("data").commit();

which throws:

Cannot find source column for partition field: 1000: data: identity(2)
org.apache.iceberg.exceptions.ValidationException: Cannot find source column for partition field: 1000: data: identity(2)
	at org.apache.iceberg.exceptions.ValidationException.check(ValidationException.java:49)
	at org.apache.iceberg.PartitionSpec.checkCompatibility(PartitionSpec.java:636)
	at org.apache.iceberg.TableMetadata$Builder.build(TableMetadata.java:1469)
	at org.apache.iceberg.TableMetadata.updateSchema(TableMetadata.java:560)
	at org.apache.iceberg.SchemaUpdate.commit(SchemaUpdate.java:447)
	at org.apache.iceberg.TestPartitioning.testDropPartitionSourceField(TestPartitioning.java:94)
	at java.base/jdk.internal.reflect.DirectMethodHandleAccessor.invoke(DirectMethodHandleAccessor.java:103)
	at java.base/java.lang.reflect.Method.invoke(Method.java:580)
	at org.junit.platform.commons.util.ReflectionUtils.invokeMethod(ReflectionUtils.java:766)
	at org.junit.jupiter.engine.execution.MethodInvocation.proceed(MethodInvocation.java:60)
	at org.junit.jupiter.engine.execution.InvocationInterceptorChain$ValidatingInvocation.proceed(InvocationInterceptorChain.java:131)
	at org.junit.jupiter.engine.extension.TimeoutExtension.intercept(TimeoutExtension.java:156)
	at org.junit.jupiter.engine.extension.TimeoutExtension.interceptTestableMethod(TimeoutExtension.java:147)
	at org.junit.jupiter.engine.extension.TimeoutExtension.interceptTestMethod(TimeoutExtension.java:86)
	at org.junit.jupiter.engine.execution.InterceptingExecutableInvoker$ReflectiveInterceptorCall.lambda$ofVoidMethod$0(InterceptingExecutableInvoker.java:103)
	at org.junit.jupiter.engine.execution.InterceptingExecutableInvoker.lambda$invoke$0(InterceptingExecutableInvoker.java:93)
	at org.junit.jupiter.engine.execution.InvocationInterceptorChain$InterceptedInvocation.proceed(InvocationInterceptorChain.java:106)
	at org.junit.jupiter.engine.execution.InvocationInterceptorChain.proceed(InvocationInterceptorChain.java:64)
	at org.junit.jupiter.engine.execution.InvocationInterceptorChain.chainAndInvoke(InvocationInterceptorChain.java:45)
	at org.junit.jupiter.engine.execution.InvocationInterceptorChain.invoke(InvocationInterceptorChain.java:37)
	at org.junit.jupiter.engine.execution.InterceptingExecutableInvoker.invoke(InterceptingExecutableInvoker.java:92)
	at org.junit.jupiter.engine.execution.InterceptingExecutableInvoker.invoke(InterceptingExecutableInvoker.java:86)
	at org.junit.jupiter.engine.descriptor.TestMethodTestDescriptor.lambda$invokeTestMethod$8(TestMethodTestDescriptor.java:217)
	at org.junit.platform.engine.support.hierarchical.ThrowableCollector.execute(ThrowableCollector.java:73)
	at org.junit.jupiter.engine.descriptor.TestMethodTestDescriptor.invokeTestMethod(TestMethodTestDescriptor.java:213)
	at org.junit.jupiter.engine.descriptor.TestMethodTestDescriptor.execute(TestMethodTestDescriptor.java:138)
	at org.junit.jupiter.engine.descriptor.TestMethodTestDescriptor.execute(TestMethodTestDescriptor.java:68)
	at org.junit.platform.engine.support.hierarchical.NodeTestTask.lambda$executeRecursively$6(NodeTestTask.java:156)
	at org.junit.platform.engine.support.hierarchical.ThrowableCollector.execute(ThrowableCollector.java:73)
	at org.junit.platform.engine.support.hierarchical.NodeTestTask.lambda$executeRecursively$8(NodeTestTask.java:146)
	at org.junit.platform.engine.support.hierarchical.Node.around(Node.java:137)
	at org.junit.platform.engine.support.hierarchical.NodeTestTask.lambda$executeRecursively$9(NodeTestTask.java:144)
	at org.junit.platform.engine.support.hierarchical.ThrowableCollector.execute(ThrowableCollector.java:73)
	at org.junit.platform.engine.support.hierarchical.NodeTestTask.executeRecursively(NodeTestTask.java:143)
	at org.junit.platform.engine.support.hierarchical.NodeTestTask.execute(NodeTestTask.java:100)
	at java.base/java.util.ArrayList.forEach(ArrayList.java:1596)
	at org.junit.platform.engine.support.hierarchical.SameThreadHierarchicalTestExecutorService.invokeAll(SameThreadHierarchicalTestExecutorService.java:41)
	at org.junit.platform.engine.support.hierarchical.NodeTestTask.lambda$executeRecursively$6(NodeTestTask.java:160)
	at org.junit.platform.engine.support.hierarchical.ThrowableCollector.execute(ThrowableCollector.java:73)
	at org.junit.platform.engine.support.hierarchical.NodeTestTask.lambda$executeRecursively$8(NodeTestTask.java:146)
	at org.junit.platform.engine.support.hierarchical.Node.around(Node.java:137)
	at org.junit.platform.engine.support.hierarchical.NodeTestTask.lambda$executeRecursively$9(NodeTestTask.java:144)
	at org.junit.platform.engine.support.hierarchical.ThrowableCollector.execute(ThrowableCollector.java:73)
	at org.junit.platform.engine.support.hierarchical.NodeTestTask.executeRecursively(NodeTestTask.java:143)
	at org.junit.platform.engine.support.hierarchical.NodeTestTask.execute(NodeTestTask.java:100)
	at java.base/java.util.ArrayList.forEach(ArrayList.java:1596)
	at org.junit.platform.engine.support.hierarchical.SameThreadHierarchicalTestExecutorService.invokeAll(SameThreadHierarchicalTestExecutorService.java:41)
	at org.junit.platform.engine.support.hierarchical.NodeTestTask.lambda$executeRecursively$6(NodeTestTask.java:160)
	at org.junit.platform.engine.support.hierarchical.ThrowableCollector.execute(ThrowableCollector.java:73)
	at org.junit.platform.engine.support.hierarchical.NodeTestTask.lambda$executeRecursively$8(NodeTestTask.java:146)
	at org.junit.platform.engine.support.hierarchical.Node.around(Node.java:137)
	at org.junit.platform.engine.support.hierarchical.NodeTestTask.lambda$executeRecursively$9(NodeTestTask.java:144)
	at org.junit.platform.engine.support.hierarchical.ThrowableCollector.execute(ThrowableCollector.java:73)
	at org.junit.platform.engine.support.hierarchical.NodeTestTask.executeRecursively(NodeTestTask.java:143)
	at org.junit.platform.engine.support.hierarchical.NodeTestTask.execute(NodeTestTask.java:100)
	at org.junit.platform.engine.support.hierarchical.SameThreadHierarchicalTestExecutorService.submit(SameThreadHierarchicalTestExecutorService.java:35)
	at org.junit.platform.engine.support.hierarchical.HierarchicalTestExecutor.execute(HierarchicalTestExecutor.java:57)
	at org.junit.platform.engine.support.hierarchical.HierarchicalTestEngine.execute(HierarchicalTestEngine.java:54)
	at org.junit.platform.launcher.core.EngineExecutionOrchestrator.execute(EngineExecutionOrchestrator.java:107)
	at org.junit.platform.launcher.core.EngineExecutionOrchestrator.execute(EngineExecutionOrchestrator.java:88)
	at org.junit.platform.launcher.core.EngineExecutionOrchestrator.lambda$execute$0(EngineExecutionOrchestrator.java:54)
	at org.junit.platform.launcher.core.EngineExecutionOrchestrator.withInterceptedStreams(EngineExecutionOrchestrator.java:67)
	at org.junit.platform.launcher.core.EngineExecutionOrchestrator.execute(EngineExecutionOrchestrator.java:52)
	at org.junit.platform.launcher.core.DefaultLauncher.execute(DefaultLauncher.java:114)
	at org.junit.platform.launcher.core.DefaultLauncher.execute(DefaultLauncher.java:86)
	at org.junit.platform.launcher.core.DefaultLauncherSession$DelegatingLauncher.execute(DefaultLauncherSession.java:86)
	at org.gradle.api.internal.tasks.testing.junitplatform.JUnitPlatformTestClassProcessor$CollectAllTestClassesExecutor.processAllTestClasses(JUnitPlatformTestClassProcessor.java:124)
	at org.gradle.api.internal.tasks.testing.junitplatform.JUnitPlatformTestClassProcessor$CollectAllTestClassesExecutor.access$000(JUnitPlatformTestClassProcessor.java:99)
	at org.gradle.api.internal.tasks.testing.junitplatform.JUnitPlatformTestClassProcessor.stop(JUnitPlatformTestClassProcessor.java:94)
	at org.gradle.api.internal.tasks.testing.SuiteTestClassProcessor.stop(SuiteTestClassProcessor.java:63)
	at java.base/jdk.internal.reflect.DirectMethodHandleAccessor.invoke(DirectMethodHandleAccessor.java:103)
	at java.base/java.lang.reflect.Method.invoke(Method.java:580)
	at org.gradle.internal.dispatch.ReflectionDispatch.dispatch(ReflectionDispatch.java:36)
	at org.gradle.internal.dispatch.ReflectionDispatch.dispatch(ReflectionDispatch.java:24)
	at org.gradle.internal.dispatch.ContextClassLoaderDispatch.dispatch(ContextClassLoaderDispatch.java:33)
	at org.gradle.internal.dispatch.ProxyDispatchAdapter$DispatchingInvocationHandler.invoke(ProxyDispatchAdapter.java:92)
	at jdk.proxy1/jdk.proxy1.$Proxy4.stop(Unknown Source)
	at org.gradle.api.internal.tasks.testing.worker.TestWorker$3.run(TestWorker.java:200)
	at org.gradle.api.internal.tasks.testing.worker.TestWorker.executeAndMaintainThreadName(TestWorker.java:132)
	at org.gradle.api.internal.tasks.testing.worker.TestWorker.execute(TestWorker.java:103)
	at org.gradle.api.internal.tasks.testing.worker.TestWorker.execute(TestWorker.java:63)
	at org.gradle.process.internal.worker.child.ActionExecutionWorker.execute(ActionExecutionWorker.java:56)
	at org.gradle.process.internal.worker.child.SystemApplicationClassLoaderWorker.call(SystemApplicationClassLoaderWorker.java:121)
	at org.gradle.process.internal.worker.child.SystemApplicationClassLoaderWorker.call(SystemApplicationClassLoaderWorker.java:71)
	at worker.org.gradle.process.internal.worker.GradleWorkerMain.run(GradleWorkerMain.java:69)
	at worker.org.gradle.process.internal.worker.GradleWorkerMain.main(GradleWorkerMain.java:74)

So it makes sense to disallow dropping partition source columns that appear in other active partition specs as well. The only problem is the void transform in V1 tables, which still references a source column.
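
For illustration, the check being discussed could look roughly like this. It is a sketch only: the method and parameter names (checkDeletesAgainstSpecs, specsById, deletes) and the error wording are assumptions, not the PR's actual code.

    import java.util.Map;
    import java.util.Set;
    import org.apache.iceberg.PartitionField;
    import org.apache.iceberg.PartitionSpec;
    import org.apache.iceberg.exceptions.ValidationException;

    // Sketch: fail a schema update that deletes a column which is still used
    // as a partition source field in any known spec, current or historical.
    // A real check would need to special-case void transforms in V1 specs.
    private static void checkDeletesAgainstSpecs(
        Map<Integer, PartitionSpec> specsById, Set<Integer> deletes) {
      for (PartitionSpec spec : specsById.values()) {
        for (PartitionField field : spec.fields()) {
          ValidationException.check(
              !deletes.contains(field.sourceId()),
              "Cannot delete field %s: it is used as a partition source in spec %s",
              field.sourceId(), spec.specId());
        }
      }
    }

Passing the specs map directly also lines up with the reviewer nit below about taking Map<Integer, PartitionSpec> specsById instead of the full TableMetadata.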

@@ -508,7 +508,8 @@ private static Schema applyChanges(
       Multimap<Integer, Types.NestedField> adds,
       Multimap<Integer, Move> moves,
       Set<String> identifierFieldNames,
-      boolean caseSensitive) {
+      boolean caseSensitive,
+      TableMetadata base) {
Contributor


Nit: maybe just pass Map<Integer, PartitionSpec> specsById here.

@advancedxy
Contributor

advancedxy commented Dec 21, 2024

For v1 tables, it's possible that the partition specs have evolved and contain a void transform. After this PR, the source field of a void transform cannot be removed either (that's why some of the tests are failing). We have two options:

  1. Upgrade the v1 table to v2, then remove the void transform from the old spec and produce a new one. Call ExpireSnapshots with metadata cleanup once all the data files written under the old spec have been rewritten/cleaned after TTL. After that, the v1 specs are expired and the offending columns can be removed.
  2. Allow removing the partition source field of a void transform in this PR. This requires extra work, and we may need to keep the input type for the void transform, which might lead to a table spec change.

I'd prefer option 1 even though it's a bit inconvenient for users.

@anuragmantri
Contributor Author

Thanks for your review, @advancedxy. My concern is that this is a big behavior change: partition source columns cannot be dropped anymore. I will start a thread on the dev channel to see what others think.

@Fokko
Contributor

Fokko commented Dec 23, 2024

Allow removing the partition source field of a void transform in this PR. This requires extra work, and we may need to keep the input type for the void transform, which might lead to a table spec change.

This is not safe, unfortunately. I was working on a solution for this (see #11604), but that's also unsafe for V1 tables. I'm planning to work on a different solution that I have in mind.

@advancedxy
Contributor

Allow removing the partition source field of a void transform in this PR. This requires extra work, and we may need to keep the input type for the void transform, which might lead to a table spec change.

This is not safe, unfortunately. I was working on a solution for this (see #11604), but that's also unsafe for V1 tables. I'm planning to work on a different solution that I have in mind.

Looking forward to your solution. BTW, what do you think about option 1? It requires some additional work from users, but I think it should be doable.

@Fokko
Contributor

Fokko commented Dec 24, 2024

@advancedxy Sorry for ignoring option 1; I had to think about that one a bit:

Upgrade the v1 table to v2, then remove the void transform from the old spec and produce a new one. Call ExpireSnapshots with metadata cleanup once all the data files written under the old spec have been rewritten/cleaned after TTL. After that, the v1 specs are expired and the offending columns can be removed.

So, we cannot alter existing partition specs. Even after upgrading to V2, the existing metadata is still V1-style. The relevant part of the spec:

[screenshot of the relevant section of the Iceberg table spec]

This means they are still sequential, and we cannot rely on field IDs. Removing a field in the middle is dangerous since it alters the order: in the best case it throws an error when evaluating the data (if the types are incompatible); in the worst case it returns faulty data. I think when a partition field no longer points to a schema field because the column was dropped, it should simply be ignored when the partition spec is evaluated.

@advancedxy
Contributor

So, we cannot alter existing partition specs. Even after upgrading to V2, the existing metadata is still V1-style. The relevant part of the spec:

I think we should be able to evolve the partition spec after upgrading to V2. The relevant part of the spec applies to V1 tables, so any implementation dealing with V1 tables should evolve partition specs that way. Once the table is upgraded to V2, old clients (ones that only support v1 tables) will be rejected. Any relatively new client (one that supports the v2 format) should produce specs with the field IDs included.

@Fokko
Contributor

Fokko commented Dec 24, 2024

Any relatively new client (one that supports the v2 format) should produce specs with the field IDs included.

It should indeed, but you cannot guarantee that, and it is not enforced by the spec.

@advancedxy
Contributor

Any relatively new client (one that supports the v2 format) should produce specs with the field IDs included.

It should indeed, but you cannot guarantee that, and it is not enforced by the spec.

Hmmm, maybe I'm not following here. When the table is upgraded to v2, I think the table metadata should be regenerated with the partition field IDs included? And the spec enforces that field IDs are unique across all the specs:
https://iceberg.apache.org/spec/#partitioning

[screenshot of the Partitioning section of the spec]

@Fokko
Contributor

Fokko commented Dec 24, 2024

Just to double-check: with dropping the offending column, I was assuming that you would mutate an existing spec. But I think after going to V2, we should rewrite it into a new spec (one that drops the column).

@advancedxy
Contributor

Just to double-check: with dropping the offending column, I was assuming that you would mutate an existing spec.

No, I'm not proposing to mutate an existing spec directly. After going to v2, we need to update the table's current spec to remove the void transform; that way, new data files will be produced without it. After all the old data files have been rewritten, the spec with the void transform can be expired and the source column can then be dropped.
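
Roughly, that flow might look like this with the Java API. This is a conceptual sketch only: the column name is illustrative, step 3 stays in prose because the unused-spec removal API is the one proposed in #10755 and does not exist yet, and upgrading through the reserved "format-version" property is assumed to be supported by the catalog in use.

    // 1. Upgrade the table from v1 to v2 (reserved table property).
    table.updateProperties().set("format-version", "2").commit();

    // 2. Evolve to a new current spec without the void transform; in v2 the
    //    field is dropped instead of voided. The field name is illustrative.
    table.updateSpec().removeField("data").commit();

    // 3. Rewrite the data files written under the old spec (e.g. with Spark's
    //    rewrite_data_files procedure), then expire the snapshots that still
    //    reference them, and remove the now-unused specs (#10755).

    // 4. Only then can the source column be dropped safely:
    table.updateSchema().deleteColumn("data").commit();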
