Commit
Handle partition schema evolution in partitions metadata
homar authored and findepi committed May 25, 2022
1 parent 366f81e commit db099cf
Showing 2 changed files with 254 additions and 30 deletions.
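In essence: the $partitions metadata table previously keyed statistics on StructLikeWrapper alone and read partition values out of each file's partition struct by position, assuming the table's current partition spec. After partition schema evolution, files written under older specs carry differently shaped partition structs, so this commit gathers partition fields across all specs, deduplicates them by field id, and resolves each value through a field-id-to-index map, writing NULL when a file's spec lacks a field.

A minimal sketch of that lookup scheme, using plain Java collections instead of the Iceberg API; the class name, field ids, and values below are made up for illustration:

import java.util.List;
import java.util.Map;

public class PartitionLookupSketch
{
    public static void main(String[] args)
    {
        // Unified partition row type across all specs, keyed by hypothetical
        // Iceberg partition field ids: 1000 = id_bucket, 1001 = ts_day.
        List<Integer> unifiedFieldIds = List.of(1000, 1001);

        // A file written under the original spec has only id_bucket in its
        // partition struct; a file written after the spec evolved has both.
        Map<Integer, Integer> oldSpecFieldIdToIndex = Map.of(1000, 0);
        Object[] oldSpecStruct = {42};
        Map<Integer, Integer> newSpecFieldIdToIndex = Map.of(1000, 0, 1001, 1);
        Object[] newSpecStruct = {42, 19234};

        // Positional access (the pre-commit behavior) would read slot 1 from the
        // one-slot old struct; resolving through the field id map yields null,
        // which the $partitions table surfaces as NULL for that partition column.
        printRow(unifiedFieldIds, oldSpecFieldIdToIndex, oldSpecStruct); // 42, null
        printRow(unifiedFieldIds, newSpecFieldIdToIndex, newSpecStruct); // 42, 19234
    }

    private static void printRow(List<Integer> fieldIds, Map<Integer, Integer> fieldIdToIndex, Object[] struct)
    {
        for (Integer fieldId : fieldIds) {
            Integer index = fieldIdToIndex.get(fieldId);
            System.out.printf("field %s -> %s%n", fieldId, index == null ? null : struct[index]);
        }
    }
}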
@@ -14,6 +14,7 @@
package io.trino.plugin.iceberg;

import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import io.trino.spi.block.Block;
import io.trino.spi.block.BlockBuilder;
import io.trino.spi.connector.ColumnMetadata;
@@ -37,16 +38,20 @@
import org.apache.iceberg.io.CloseableIterable;
import org.apache.iceberg.types.Type;
import org.apache.iceberg.types.Types;
import org.apache.iceberg.types.Types.NestedField;
import org.apache.iceberg.util.StructLikeWrapper;

import java.io.IOException;
import java.io.UncheckedIOException;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.Set;
import java.util.stream.IntStream;
import java.util.stream.Stream;

import static com.google.common.collect.ImmutableList.toImmutableList;
@@ -59,6 +64,7 @@
import static io.trino.spi.type.TypeUtils.writeNativeValue;
import static java.util.Objects.requireNonNull;
import static java.util.stream.Collectors.toSet;
import static java.util.stream.Collectors.toUnmodifiableSet;

public class PartitionTable
implements SystemTable
@@ -67,9 +73,9 @@ public class PartitionTable
private final Table icebergTable;
private final Optional<Long> snapshotId;
private final Map<Integer, Type.PrimitiveType> idToTypeMapping;
private final List<Types.NestedField> nonPartitionPrimitiveColumns;
private final Optional<RowType> partitionColumnType;
private final List<io.trino.spi.type.Type> partitionColumnTypes;
private final List<NestedField> nonPartitionPrimitiveColumns;
private final Optional<IcebergPartitionColumn> partitionColumnType;
private final List<PartitionField> partitionFields;
private final Optional<RowType> dataColumnType;
private final List<RowType> columnMetricTypes;
private final List<io.trino.spi.type.Type> resultTypes;
@@ -82,21 +88,14 @@ public PartitionTable(SchemaTableName tableName, TypeManager typeManager, Table
this.snapshotId = requireNonNull(snapshotId, "snapshotId is null");
this.idToTypeMapping = primitiveFieldTypes(icebergTable.schema());

List<Types.NestedField> columns = icebergTable.schema().columns();
List<PartitionField> partitionFields = icebergTable.spec().fields();
List<NestedField> columns = icebergTable.schema().columns();
this.partitionFields = getAllPartitionFields(icebergTable);

ImmutableList.Builder<ColumnMetadata> columnMetadataBuilder = ImmutableList.builder();

this.partitionColumnType = getPartitionColumnType(partitionFields, icebergTable.schema());
if (partitionColumnType.isPresent()) {
columnMetadataBuilder.add(new ColumnMetadata("partition", partitionColumnType.get()));
this.partitionColumnTypes = partitionColumnType.get().getFields().stream()
.map(RowType.Field::getType)
.collect(toImmutableList());
}
else {
this.partitionColumnTypes = ImmutableList.of();
}
partitionColumnType.ifPresent(icebergPartitionColumn ->
columnMetadataBuilder.add(new ColumnMetadata("partition", icebergPartitionColumn.rowType)));

Stream.of("record_count", "file_count", "total_size")
.forEach(metric -> columnMetadataBuilder.add(new ColumnMetadata(metric, BIGINT)));
@@ -140,7 +139,37 @@ public ConnectorTableMetadata getTableMetadata()
return connectorTableMetadata;
}

private Optional<RowType> getPartitionColumnType(List<PartitionField> fields, Schema schema)
private static List<PartitionField> getAllPartitionFields(Table icebergTable)
{
Set<Integer> existingColumnsIds = icebergTable.schema()
.columns().stream()
.map(NestedField::fieldId)
.collect(toUnmodifiableSet());

List<PartitionField> visiblePartitionFields = icebergTable.specs()
.values().stream()
.flatMap(partitionSpec -> partitionSpec.fields().stream())
// skip columns that were dropped
.filter(partitionField -> existingColumnsIds.contains(partitionField.sourceId()))
.collect(toImmutableList());

return filterOutDuplicates(visiblePartitionFields);
}

private static List<PartitionField> filterOutDuplicates(List<PartitionField> visiblePartitionFields)
{
Set<Integer> alreadyExistingFieldIds = new HashSet<>();
List<PartitionField> result = new ArrayList<>();
for (PartitionField partitionField : visiblePartitionFields) {
if (!alreadyExistingFieldIds.contains(partitionField.fieldId())) {
alreadyExistingFieldIds.add(partitionField.fieldId());
result.add(partitionField);
}
}
return result;
}

private Optional<IcebergPartitionColumn> getPartitionColumnType(List<PartitionField> fields, Schema schema)
{
if (fields.isEmpty()) {
return Optional.empty();
@@ -150,10 +179,13 @@ private Optional<RowType> getPartitionColumnType(List<PartitionField> fields, Schema schema)
field.name(),
toTrinoType(field.transform().getResultType(schema.findType(field.sourceId())), typeManager)))
.collect(toImmutableList());
return Optional.of(RowType.from(partitionFields));
List<Integer> fieldIds = fields.stream()
.map(PartitionField::fieldId)
.collect(toImmutableList());
return Optional.of(new IcebergPartitionColumn(RowType.from(partitionFields), fieldIds));
}

private Optional<RowType> getMetricsColumnType(List<Types.NestedField> columns)
private Optional<RowType> getMetricsColumnType(List<NestedField> columns)
{
List<RowType.Field> metricColumns = columns.stream()
.map(column -> RowType.field(
@@ -180,21 +212,22 @@ public RecordCursor cursor(ConnectorTransactionHandle transactionHandle, Connect
.useSnapshot(snapshotId.get())
.includeColumnStats();
// TODO make the cursor lazy
return buildRecordCursor(getStatisticsByPartition(tableScan), icebergTable.spec().fields());
return buildRecordCursor(getStatisticsByPartition(tableScan));
}

private Map<StructLikeWrapper, IcebergStatistics> getStatisticsByPartition(TableScan tableScan)
private Map<StructLikeWrapperWithFieldIdToIndex, IcebergStatistics> getStatisticsByPartition(TableScan tableScan)
{
try (CloseableIterable<FileScanTask> fileScanTasks = tableScan.planFiles()) {
Map<StructLikeWrapper, IcebergStatistics.Builder> partitions = new HashMap<>();
Map<StructLikeWrapperWithFieldIdToIndex, IcebergStatistics.Builder> partitions = new HashMap<>();
for (FileScanTask fileScanTask : fileScanTasks) {
DataFile dataFile = fileScanTask.file();
Types.StructType structType = fileScanTask.spec().partitionType();
StructLike partitionStruct = dataFile.partition();
StructLikeWrapper partitionWrapper = StructLikeWrapper.forType(structType).set(partitionStruct);
StructLikeWrapperWithFieldIdToIndex structLikeWrapperWithFieldIdToIndex = new StructLikeWrapperWithFieldIdToIndex(partitionWrapper, structType);

partitions.computeIfAbsent(
partitionWrapper,
structLikeWrapperWithFieldIdToIndex,
ignored -> new IcebergStatistics.Builder(icebergTable.schema().columns(), typeManager))
.acceptDataFile(dataFile, fileScanTask.spec());
}
@@ -207,31 +240,40 @@ private Map<StructLikeWrapper, IcebergStatistics> getStatisticsByPartition(TableScan tableScan)
}
}

private RecordCursor buildRecordCursor(Map<StructLikeWrapper, IcebergStatistics> partitionStatistics, List<PartitionField> partitionFields)
private RecordCursor buildRecordCursor(Map<StructLikeWrapperWithFieldIdToIndex, IcebergStatistics> partitionStatistics)
{
List<Type> partitionTypes = partitionTypes(partitionFields);
List<Type> partitionTypes = partitionTypes();
List<? extends Class<?>> partitionColumnClass = partitionTypes.stream()
.map(type -> type.typeId().javaClass())
.collect(toImmutableList());

ImmutableList.Builder<List<Object>> records = ImmutableList.builder();

for (Map.Entry<StructLikeWrapper, IcebergStatistics> partitionEntry : partitionStatistics.entrySet()) {
StructLikeWrapper partitionStruct = partitionEntry.getKey();
for (Map.Entry<StructLikeWrapperWithFieldIdToIndex, IcebergStatistics> partitionEntry : partitionStatistics.entrySet()) {
StructLikeWrapperWithFieldIdToIndex partitionStruct = partitionEntry.getKey();
IcebergStatistics icebergStatistics = partitionEntry.getValue();
List<Object> row = new ArrayList<>();

// add data for partition columns
partitionColumnType.ifPresent(partitionColumnType -> {
BlockBuilder partitionRowBlockBuilder = partitionColumnType.createBlockBuilder(null, 1);
BlockBuilder partitionRowBlockBuilder = partitionColumnType.rowType.createBlockBuilder(null, 1);
BlockBuilder partitionBlockBuilder = partitionRowBlockBuilder.beginBlockEntry();
List<io.trino.spi.type.Type> partitionColumnTypes = partitionColumnType.rowType.getFields().stream()
.map(RowType.Field::getType)
.collect(toImmutableList());
for (int i = 0; i < partitionColumnTypes.size(); i++) {
io.trino.spi.type.Type trinoType = partitionColumnType.getFields().get(i).getType();
Object value = convertIcebergValueToTrino(partitionTypes.get(i), partitionStruct.get().get(i, partitionColumnClass.get(i)));
io.trino.spi.type.Type trinoType = partitionColumnType.rowType.getFields().get(i).getType();
Object value = null;
Integer fieldId = partitionColumnType.fieldIds.get(i);
if (partitionStruct.fieldIdToIndex.containsKey(fieldId)) {
value = convertIcebergValueToTrino(
partitionTypes.get(i),
partitionStruct.structLikeWrapper.get().get(partitionStruct.fieldIdToIndex.get(fieldId), partitionColumnClass.get(i)));
}
writeNativeValue(trinoType, partitionBlockBuilder, value);
}
partitionRowBlockBuilder.closeEntry();
row.add(partitionColumnType.getObject(partitionRowBlockBuilder, 0));
row.add(partitionColumnType.rowType.getObject(partitionRowBlockBuilder, 0));
});

// add the top level metrics.
@@ -268,7 +310,7 @@ private RecordCursor buildRecordCursor(Map<StructLikeWrapper, IcebergStatistics> partitionStatistics, List<PartitionField> partitionFields)
return new InMemoryRecordSet(resultTypes, records.build()).cursor();
}

private List<Type> partitionTypes(List<PartitionField> partitionFields)
private List<Type> partitionTypes()
{
ImmutableList.Builder<Type> partitionTypeBuilder = ImmutableList.builder();
for (PartitionField partitionField : partitionFields) {
@@ -292,4 +334,70 @@ private static Block getColumnMetricBlock(RowType columnMetricType, Object min,
rowBlockBuilder.closeEntry();
return columnMetricType.getObject(rowBlockBuilder, 0);
}

private static class StructLikeWrapperWithFieldIdToIndex
{
private final StructLikeWrapper structLikeWrapper;
private final Map<Integer, Integer> fieldIdToIndex;

public StructLikeWrapperWithFieldIdToIndex(StructLikeWrapper structLikeWrapper, Types.StructType structType)
{
this.structLikeWrapper = structLikeWrapper;
ImmutableMap.Builder<Integer, Integer> fieldIdToIndex = ImmutableMap.builder();
List<NestedField> fields = structType.fields();
IntStream.range(0, fields.size())
.forEach(i -> fieldIdToIndex.put(fields.get(i).fieldId(), i));
this.fieldIdToIndex = fieldIdToIndex.buildOrThrow();
}

@Override
public boolean equals(Object o)
{
if (this == o) {
return true;
}
if (o == null || getClass() != o.getClass()) {
return false;
}
StructLikeWrapperWithFieldIdToIndex that = (StructLikeWrapperWithFieldIdToIndex) o;
return Objects.equals(structLikeWrapper, that.structLikeWrapper) && Objects.equals(fieldIdToIndex, that.fieldIdToIndex);
}

@Override
public int hashCode()
{
return Objects.hash(structLikeWrapper, fieldIdToIndex);
}
}

private static class IcebergPartitionColumn
{
private final RowType rowType;
private final List<Integer> fieldIds;

public IcebergPartitionColumn(RowType rowType, List<Integer> fieldIds)
{
this.rowType = rowType;
this.fieldIds = fieldIds;
}

@Override
public boolean equals(Object o)
{
if (this == o) {
return true;
}
if (o == null || getClass() != o.getClass()) {
return false;
}
IcebergPartitionColumn that = (IcebergPartitionColumn) o;
return Objects.equals(rowType, that.rowType) && Objects.equals(fieldIds, that.fieldIds);
}

@Override
public int hashCode()
{
return Objects.hash(rowType, fieldIds);
}
}
}
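A side note on the new map key: getStatisticsByPartition builds one statistics row per distinct key, and every data file produces a fresh StructLikeWrapperWithFieldIdToIndex instance, so the key must define value-based equals and hashCode for files in the same partition to collapse into one entry. Because fieldIdToIndex participates in the comparison, partition structs of different shapes (written under different specs) stay in separate rows. A minimal illustration of the value-equality requirement, using a Java record as a stand-in for the wrapper; names and field ids are again hypothetical:

import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class GroupingKeySketch
{
    // A record supplies value-based equals/hashCode, mirroring what the wrapper
    // implements by hand; keys built from different files but describing the
    // same partition must land on the same map entry.
    record PartitionKey(List<Object> values, Map<Integer, Integer> fieldIdToIndex) {}

    public static void main(String[] args)
    {
        Map<PartitionKey, Long> fileCountPerPartition = new HashMap<>();

        // Two distinct key instances with equal contents, as would arise from
        // two data files belonging to the same partition.
        PartitionKey fromFileA = new PartitionKey(List.of(42), Map.of(1000, 0));
        PartitionKey fromFileB = new PartitionKey(List.of(42), Map.of(1000, 0));

        fileCountPerPartition.merge(fromFileA, 1L, Long::sum);
        fileCountPerPartition.merge(fromFileB, 1L, Long::sum);
        System.out.println(fileCountPerPartition); // one entry, mapped to 2
    }
}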
