Handle partition schema evolution in partitions metadata #12416

Merged: 2 commits, May 25, 2022
@@ -14,6 +14,7 @@
package io.trino.plugin.iceberg;

import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import io.trino.spi.block.Block;
import io.trino.spi.block.BlockBuilder;
import io.trino.spi.connector.ColumnMetadata;
@@ -37,16 +38,20 @@
import org.apache.iceberg.io.CloseableIterable;
import org.apache.iceberg.types.Type;
import org.apache.iceberg.types.Types;
import org.apache.iceberg.types.Types.NestedField;
import org.apache.iceberg.util.StructLikeWrapper;

import java.io.IOException;
import java.io.UncheckedIOException;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.Set;
import java.util.stream.IntStream;
import java.util.stream.Stream;

import static com.google.common.collect.ImmutableList.toImmutableList;
@@ -59,6 +64,7 @@
import static io.trino.spi.type.TypeUtils.writeNativeValue;
import static java.util.Objects.requireNonNull;
import static java.util.stream.Collectors.toSet;
import static java.util.stream.Collectors.toUnmodifiableSet;

public class PartitionTable
implements SystemTable
@@ -67,9 +73,9 @@ public class PartitionTable
private final Table icebergTable;
private final Optional<Long> snapshotId;
private final Map<Integer, Type.PrimitiveType> idToTypeMapping;
private final List<Types.NestedField> nonPartitionPrimitiveColumns;
private final Optional<RowType> partitionColumnType;
private final List<io.trino.spi.type.Type> partitionColumnTypes;
private final List<NestedField> nonPartitionPrimitiveColumns;
private final Optional<IcebergPartitionColumn> partitionColumnType;
private final List<PartitionField> partitionFields;
private final Optional<RowType> dataColumnType;
private final List<RowType> columnMetricTypes;
private final List<io.trino.spi.type.Type> resultTypes;
@@ -82,21 +88,14 @@ public PartitionTable(SchemaTableName tableName, TypeManager typeManager, Table
this.snapshotId = requireNonNull(snapshotId, "snapshotId is null");
this.idToTypeMapping = primitiveFieldTypes(icebergTable.schema());

List<Types.NestedField> columns = icebergTable.schema().columns();
List<PartitionField> partitionFields = icebergTable.spec().fields();
List<NestedField> columns = icebergTable.schema().columns();
this.partitionFields = getAllPartitionFields(icebergTable);

ImmutableList.Builder<ColumnMetadata> columnMetadataBuilder = ImmutableList.builder();

this.partitionColumnType = getPartitionColumnType(partitionFields, icebergTable.schema());
if (partitionColumnType.isPresent()) {
columnMetadataBuilder.add(new ColumnMetadata("partition", partitionColumnType.get()));
this.partitionColumnTypes = partitionColumnType.get().getFields().stream()
.map(RowType.Field::getType)
.collect(toImmutableList());
}
else {
this.partitionColumnTypes = ImmutableList.of();
}
partitionColumnType.ifPresent(icebergPartitionColumn ->
columnMetadataBuilder.add(new ColumnMetadata("partition", icebergPartitionColumn.rowType)));

Stream.of("record_count", "file_count", "total_size")
.forEach(metric -> columnMetadataBuilder.add(new ColumnMetadata(metric, BIGINT)));
@@ -140,20 +139,53 @@ public ConnectorTableMetadata getTableMetadata()
return connectorTableMetadata;
}

private Optional<RowType> getPartitionColumnType(List<PartitionField> fields, Schema schema)
private static List<PartitionField> getAllPartitionFields(Table icebergTable)
{
Set<Integer> existingColumnsIds = icebergTable.schema()
.columns().stream()
.map(NestedField::fieldId)
.collect(toUnmodifiableSet());

List<PartitionField> visiblePartitionFields = icebergTable.specs()
.values().stream()
.flatMap(partitionSpec -> partitionSpec.fields().stream())
// skip columns that were dropped
.filter(partitionField -> existingColumnsIds.contains(partitionField.sourceId()))
Review comment (Member): This is not possible right now because of apache/iceberg#4563, right?

Reply (@homar, Member Author, May 25, 2022): Yes, but not only that; it also avoids name conflicts with columns that were renamed.

.collect(toImmutableList());

return filterOutDuplicates(visiblePartitionFields);
Review comment (Member): Use Stream#distinct?

Reply (@homar, Member Author): I can't rely on PartitionField's implementation of hashCode and equals, because they take the transform into account, and what matters here are the field IDs. I could provide my own comparator or something, but I think this is cleaner.

}
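
An illustrative aside: Stream#distinct deduplicates with the element's own equals, which for PartitionField also compares the transform, as the author notes above. A minimal, self-contained sketch (hypothetical Field record, not code from this PR) of deduplicating by a derived key instead:

import java.util.HashSet;
import java.util.List;
import java.util.Set;

class DistinctByFieldId
{
    record Field(int fieldId, String transform) {}

    public static void main(String[] args)
    {
        List<Field> fields = List.of(
                new Field(1000, "identity"),
                new Field(1000, "void"),      // same field id, different transform
                new Field(1001, "bucket[4]"));
        Set<Integer> seen = new HashSet<>();
        // Set#add returns false for an already-seen id, so only the first
        // occurrence of each field id survives; Stream#distinct would keep
        // both 1000 entries because Field#equals compares the transform too.
        List<Field> deduplicated = fields.stream()
                .filter(field -> seen.add(field.fieldId()))
                .toList();
        System.out.println(deduplicated);
    }
}

(A stateful filter like this is only safe on a sequential stream, which is one more reason the explicit loop in filterOutDuplicates is the cleaner choice here.)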

private static List<PartitionField> filterOutDuplicates(List<PartitionField> visiblePartitionFields)
{
Set<Integer> alreadyExistingFieldIds = new HashSet<>();
Review comment (Contributor): Nit (optional): extract the duplicate-filtering logic into a separate method, just to improve readability.

List<PartitionField> result = new ArrayList<>();
for (PartitionField partitionField : visiblePartitionFields) {
if (!alreadyExistingFieldIds.contains(partitionField.fieldId())) {
alreadyExistingFieldIds.add(partitionField.fieldId());
result.add(partitionField);
}
}
return result;
}

private Optional<IcebergPartitionColumn> getPartitionColumnType(List<PartitionField> fields, Schema schema)
{
if (fields.isEmpty()) {
return Optional.empty();
}
List<RowType.Field> partitionFields = fields.stream()
.map(field -> RowType.field(
field.name(),
toTrinoType(field.transform().getResultType(schema.findType(field.sourceId())), typeManager)))
.collect(toImmutableList());
if (partitionFields.isEmpty()) {
return Optional.empty();
}
return Optional.of(RowType.from(partitionFields));
List<Integer> fieldIds = fields.stream()
.map(PartitionField::fieldId)
.collect(toImmutableList());
return Optional.of(new IcebergPartitionColumn(RowType.from(partitionFields), fieldIds));
}

private Optional<RowType> getMetricsColumnType(List<Types.NestedField> columns)
private Optional<RowType> getMetricsColumnType(List<NestedField> columns)
{
List<RowType.Field> metricColumns = columns.stream()
.map(column -> RowType.field(
@@ -180,21 +212,22 @@ public RecordCursor cursor(ConnectorTransactionHandle transactionHandle, Connect
.useSnapshot(snapshotId.get())
.includeColumnStats();
// TODO make the cursor lazy
return buildRecordCursor(getStatisticsByPartition(tableScan), icebergTable.spec().fields());
return buildRecordCursor(getStatisticsByPartition(tableScan));
}

private Map<StructLikeWrapper, IcebergStatistics> getStatisticsByPartition(TableScan tableScan)
private Map<StructLikeWrapperWithFieldIdToIndex, IcebergStatistics> getStatisticsByPartition(TableScan tableScan)
{
try (CloseableIterable<FileScanTask> fileScanTasks = tableScan.planFiles()) {
Map<StructLikeWrapper, IcebergStatistics.Builder> partitions = new HashMap<>();
Map<StructLikeWrapperWithFieldIdToIndex, IcebergStatistics.Builder> partitions = new HashMap<>();
for (FileScanTask fileScanTask : fileScanTasks) {
DataFile dataFile = fileScanTask.file();
Types.StructType structType = fileScanTask.spec().partitionType();
StructLike partitionStruct = dataFile.partition();
StructLikeWrapper partitionWrapper = StructLikeWrapper.forType(structType).set(partitionStruct);
StructLikeWrapperWithFieldIdToIndex structLikeWrapperWithFieldIdToIndex = new StructLikeWrapperWithFieldIdToIndex(partitionWrapper, structType);

partitions.computeIfAbsent(
partitionWrapper,
structLikeWrapperWithFieldIdToIndex,
ignored -> new IcebergStatistics.Builder(icebergTable.schema().columns(), typeManager))
.acceptDataFile(dataFile, fileScanTask.spec());
}
@@ -207,31 +240,40 @@ private Map<StructLikeWrapper, IcebergStatistics> getStatisticsByPartition(Table
}
}

private RecordCursor buildRecordCursor(Map<StructLikeWrapper, IcebergStatistics> partitionStatistics, List<PartitionField> partitionFields)
private RecordCursor buildRecordCursor(Map<StructLikeWrapperWithFieldIdToIndex, IcebergStatistics> partitionStatistics)
{
List<Type> partitionTypes = partitionTypes(partitionFields);
List<Type> partitionTypes = partitionTypes();
List<? extends Class<?>> partitionColumnClass = partitionTypes.stream()
.map(type -> type.typeId().javaClass())
.collect(toImmutableList());

ImmutableList.Builder<List<Object>> records = ImmutableList.builder();

for (Map.Entry<StructLikeWrapper, IcebergStatistics> partitionEntry : partitionStatistics.entrySet()) {
StructLikeWrapper partitionStruct = partitionEntry.getKey();
for (Map.Entry<StructLikeWrapperWithFieldIdToIndex, IcebergStatistics> partitionEntry : partitionStatistics.entrySet()) {
StructLikeWrapperWithFieldIdToIndex partitionStruct = partitionEntry.getKey();
IcebergStatistics icebergStatistics = partitionEntry.getValue();
List<Object> row = new ArrayList<>();

// add data for partition columns
partitionColumnType.ifPresent(partitionColumnType -> {
BlockBuilder partitionRowBlockBuilder = partitionColumnType.createBlockBuilder(null, 1);
BlockBuilder partitionRowBlockBuilder = partitionColumnType.rowType.createBlockBuilder(null, 1);
BlockBuilder partitionBlockBuilder = partitionRowBlockBuilder.beginBlockEntry();
List<io.trino.spi.type.Type> partitionColumnTypes = partitionColumnType.rowType.getFields().stream()
.map(RowType.Field::getType)
.collect(toImmutableList());
for (int i = 0; i < partitionColumnTypes.size(); i++) {
io.trino.spi.type.Type trinoType = partitionColumnType.getFields().get(i).getType();
Object value = convertIcebergValueToTrino(partitionTypes.get(i), partitionStruct.get().get(i, partitionColumnClass.get(i)));
io.trino.spi.type.Type trinoType = partitionColumnType.rowType.getFields().get(i).getType();
Object value = null;
Integer fieldId = partitionColumnType.fieldIds.get(i);
if (partitionStruct.fieldIdToIndex.containsKey(fieldId)) {
value = convertIcebergValueToTrino(
partitionTypes.get(i),
partitionStruct.structLikeWrapper.get().get(partitionStruct.fieldIdToIndex.get(fieldId), partitionColumnClass.get(i)));
}
writeNativeValue(trinoType, partitionBlockBuilder, value);
}
partitionRowBlockBuilder.closeEntry();
row.add(partitionColumnType.getObject(partitionRowBlockBuilder, 0));
row.add(partitionColumnType.rowType.getObject(partitionRowBlockBuilder, 0));
});

// add the top level metrics.
@@ -268,7 +310,7 @@ private RecordCursor buildRecordCursor(Map<StructLikeWrapper, IcebergStatistics>
return new InMemoryRecordSet(resultTypes, records.build()).cursor();
}
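
A note on the containsKey branch in the partition loop above: it is the core of the schema-evolution handling. A data file written under an older partition spec has no slot for a field that a later spec added, so the lookup misses and the partition row shows NULL for that field. A standalone sketch of that fallback (hypothetical values, not code from this PR):

import java.util.Map;

class NullBackfill
{
    public static void main(String[] args)
    {
        // An old spec laid out only field 1000, at index 0.
        Map<Integer, Integer> fieldIdToIndex = Map.of(1000, 0);
        Object[] partitionValues = {"2022-05-25"};

        int laterFieldId = 1001; // added by a newer partition spec
        Object value = fieldIdToIndex.containsKey(laterFieldId)
                ? partitionValues[fieldIdToIndex.get(laterFieldId)]
                : null; // no slot in this spec, so the cell stays NULL
        System.out.println(value); // prints "null"
    }
}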

private List<Type> partitionTypes(List<PartitionField> partitionFields)
private List<Type> partitionTypes()
{
ImmutableList.Builder<Type> partitionTypeBuilder = ImmutableList.builder();
for (PartitionField partitionField : partitionFields) {
@@ -292,4 +334,70 @@ private static Block getColumnMetricBlock(RowType columnMetricType, Object min,
rowBlockBuilder.closeEntry();
return columnMetricType.getObject(rowBlockBuilder, 0);
}

private static class StructLikeWrapperWithFieldIdToIndex
{
private final StructLikeWrapper structLikeWrapper;
private final Map<Integer, Integer> fieldIdToIndex;

public StructLikeWrapperWithFieldIdToIndex(StructLikeWrapper structLikeWrapper, Types.StructType structType)
{
this.structLikeWrapper = structLikeWrapper;
ImmutableMap.Builder<Integer, Integer> fieldIdToIndex = ImmutableMap.builder();
List<NestedField> fields = structType.fields();
Review comment (Contributor): this.fieldIdToIndex = fields.stream().collect(Collectors.toMap(NestedField::fieldId, Function.identity()));

Reply (@homar, Member Author): This is incorrect. I need a mapping from fieldId to its index; your code gives fieldId -> NestedField. I could probably do it with some zipWithIndex method, but I think the standard way is more readable.

Reply (Contributor): Sorry, I didn't pay enough attention here. Thanks for the explanation.

IntStream.range(0, fields.size())
.forEach(i -> fieldIdToIndex.put(fields.get(i).fieldId(), i));
this.fieldIdToIndex = fieldIdToIndex.buildOrThrow();
}
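
For the zipWithIndex route that @homar mentions in the thread above, Guava's Streams.mapWithIndex is the closest off-the-shelf tool. A hypothetical standalone sketch (not the PR's code; note the index arrives as a long):

import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Streams;

import java.util.List;
import java.util.Map;

class FieldIdToIndexWithStreams
{
    record NestedField(int fieldId, String name) {}

    public static void main(String[] args)
    {
        List<NestedField> fields = List.of(new NestedField(7, "a"), new NestedField(3, "b"));
        // mapWithIndex hands each element and its position to the function
        Map<Integer, Integer> fieldIdToIndex = Streams.mapWithIndex(
                        fields.stream(),
                        (field, index) -> Map.entry(field.fieldId(), (int) index))
                .collect(ImmutableMap.toImmutableMap(Map.Entry::getKey, Map.Entry::getValue));
        System.out.println(fieldIdToIndex); // {7=0, 3=1}
    }
}

Either way works; the IntStream.range version in the PR expresses the same mapping and matches the author's preference for the standard approach.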

@Override
public boolean equals(Object o)
{
if (this == o) {
return true;
}
if (o == null || getClass() != o.getClass()) {
return false;
}
StructLikeWrapperWithFieldIdToIndex that = (StructLikeWrapperWithFieldIdToIndex) o;
return Objects.equals(structLikeWrapper, that.structLikeWrapper) && Objects.equals(fieldIdToIndex, that.fieldIdToIndex);
}

@Override
public int hashCode()
{
return Objects.hash(structLikeWrapper, fieldIdToIndex);
}
}
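
Why fieldIdToIndex participates in equals and hashCode: two specs can yield partition structs whose wrapped values compare equal even though the values belong to different partition fields, and such partitions must land in separate buckets in getStatisticsByPartition. A toy illustration of the idea (hypothetical record type, not code from this PR):

import java.util.List;
import java.util.Map;

class LayoutAwareKeys
{
    // A key is the partition values plus the field-id layout they were written under.
    record Key(List<Integer> values, Map<Integer, Integer> layout) {}

    public static void main(String[] args)
    {
        Key underOldSpec = new Key(List.of(42), Map.of(1000, 0)); // 42 is field 1000
        Key underNewSpec = new Key(List.of(42), Map.of(1001, 0)); // 42 is field 1001
        // Identical values but different layouts compare unequal, so the
        // statistics map keeps one row per (values, layout) combination.
        System.out.println(underOldSpec.equals(underNewSpec)); // false
    }
}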

private static class IcebergPartitionColumn
{
private final RowType rowType;
private final List<Integer> fieldIds;

public IcebergPartitionColumn(RowType rowType, List<Integer> fieldIds)
{
this.rowType = rowType;
this.fieldIds = fieldIds;
}

@Override
public boolean equals(Object o)
{
if (this == o) {
return true;
}
if (o == null || getClass() != o.getClass()) {
return false;
}
IcebergPartitionColumn that = (IcebergPartitionColumn) o;
return Objects.equals(rowType, that.rowType) && Objects.equals(fieldIds, that.fieldIds);
}

@Override
public int hashCode()
{
return Objects.hash(rowType, fieldIds);
}
}
}