
[668] Support bucket partition transform for Iceberg Sources and Delta Targets #670

Merged
@@ -20,6 +20,7 @@

import java.util.Collections;
import java.util.List;
import java.util.Map;

import lombok.Builder;
import lombok.Value;
@@ -32,6 +33,7 @@
@Value
@Builder
public class InternalPartitionField {
public static final String NUM_BUCKETS = "NUM_BUCKETS";
// Source field the partition is based on
InternalField sourceField;
/*
@@ -47,4 +49,6 @@ public class InternalPartitionField {
@Builder.Default List<String> partitionFieldNames = Collections.emptyList();
// An enum describing how the source data was transformed into the partition value
PartitionTransformType transformType;
// Transform options such as number of buckets in the BUCKET transform type
@Builder.Default Map<String, Object> transformOptions = Collections.emptyMap();

Contributor:
Instead of Object, should we add a new class known as InternalValue?

InternalValue { 
  InternalType type; 
  Object value;
}

Contributor (Author):
Right now, I made this similar to the metadata in InternalSchema. Another option I considered is to change PartitionTransformType into a class which can then have its own instance variables.

Contributor:
This approach is okay for now.

}
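
For illustration, the alternative floated in the thread above (making the partition transform a class that carries its own fields instead of keeping a Map<String, Object> of options) might look roughly like the hypothetical sketch below; it is not part of this change.

import lombok.Builder;
import lombok.Value;

// Hypothetical sketch only (not in this PR): carry transform options on the
// transform itself rather than in InternalPartitionField.transformOptions.
@Value
@Builder
public class PartitionTransform {
  PartitionTransformType type;
  // Only meaningful for BUCKET; left null for other transform types.
  Integer numBuckets;
}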

@@ -30,7 +30,8 @@ public enum PartitionTransformType {
MONTH,
DAY,
HOUR,
VALUE;
VALUE,
BUCKET;

public boolean isTimeBased() {
return this == YEAR || this == MONTH || this == DAY || this == HOUR;

@@ -75,6 +75,7 @@ public class DeltaPartitionExtractor {
private static final String DATE_FORMAT_FOR_DAY = "yyyy-MM-dd";
private static final String DATE_FORMAT_FOR_MONTH = "yyyy-MM";
private static final String DATE_FORMAT_FOR_YEAR = "yyyy";
private static final String BUCKET_FUNCTION = "MOD((HASH(%s) & %d), %d)";
// For timestamp partition fields, actual partition column names in delta format will be of type
// generated and will have a name like `delta_partition_col_{transform_type}_{source_field_name}`.
private static final String DELTA_PARTITION_COL_NAME_FORMAT = "xtable_partition_col_%s_%s";
@@ -242,7 +243,7 @@ public Map<String, StructField> convertToDeltaPartitionFormat(
currPartitionColumnName = internalPartitionField.getSourceField().getName();
field = null;
} else {
// Since partition field of timestamp type, create new field in schema.
// Since partition field of timestamp or bucket type, create new field in schema.
field = getGeneratedField(internalPartitionField);
currPartitionColumnName = field.name();
}
@@ -270,6 +271,10 @@ public Map<String, String> partitionValueSerialization(InternalDataFile internal
"");
partitionValuesSerialized.put(
partitionField.getSourceField().getName(), partitionValueSerialized);
} else if (transformType == PartitionTransformType.BUCKET) {
partitionValueSerialized = partitionValue.getRange().getMaxValue().toString();
partitionValuesSerialized.put(
getGeneratedColumnName(partitionField), partitionValueSerialized);
} else {
// use appropriate date formatter for value serialization.
partitionValueSerialized =
@@ -352,7 +357,6 @@ private StructField getGeneratedField(InternalPartitionField internalPartitionFi
String generatedExpression;
DataType dataType;
String currPartitionColumnName = getGeneratedColumnName(internalPartitionField);
Map<String, String> generatedExpressionMetadata = new HashMap<>();
switch (internalPartitionField.getTransformType()) {
case YEAR:
generatedExpression =
@@ -373,10 +377,23 @@ private StructField getGeneratedField(InternalPartitionField internalPartitionFi
String.format(CAST_FUNCTION, internalPartitionField.getSourceField().getPath());
dataType = DataTypes.DateType;
break;
case BUCKET:
generatedExpression =
String.format(
BUCKET_FUNCTION,
internalPartitionField.getSourceField().getPath(),
Integer.MAX_VALUE,
(int)
internalPartitionField
.getTransformOptions()
.get(InternalPartitionField.NUM_BUCKETS));
dataType = DataTypes.IntegerType;
break;
default:
throw new PartitionSpecException("Invalid transform type");
}
generatedExpressionMetadata.put(DELTA_GENERATION_EXPRESSION, generatedExpression);
Map<String, String> generatedExpressionMetadata =
Collections.singletonMap(DELTA_GENERATION_EXPRESSION, generatedExpression);
Metadata partitionFieldMetadata =
new Metadata(ScalaUtils.convertJavaMapToScala(generatedExpressionMetadata));
return new StructField(currPartitionColumnName, dataType, true, partitionFieldMetadata);
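
For reference, the BUCKET_FUNCTION format string introduced above expands as shown in the sketch below for a hypothetical 5-bucket partition on a column named id; the new Delta test in this PR asserts the same shape for partition_column1.

// Illustrative expansion only; the column name "id" and bucket count 5 are made-up values.
String expr = String.format("MOD((HASH(%s) & %d), %d)", "id", Integer.MAX_VALUE, 5);
// expr is "MOD((HASH(id) & 2147483647), 5)"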

@@ -35,10 +35,12 @@
import org.apache.hudi.common.util.VisibleForTesting;
import org.apache.hudi.exception.TableNotFoundException;

import org.apache.xtable.exception.PartitionSpecException;
import org.apache.xtable.exception.UpdateException;
import org.apache.xtable.model.InternalTable;
import org.apache.xtable.model.schema.InternalField;
import org.apache.xtable.model.schema.InternalPartitionField;
import org.apache.xtable.model.schema.PartitionTransformType;
import org.apache.xtable.model.storage.DataLayoutStrategy;

/** A class used to initialize new Hudi tables and load the metadata of existing tables. */
@@ -124,6 +126,12 @@ HoodieTableMetaClient initializeHudiTable(String tableDataPath, InternalTable ta
@VisibleForTesting
static String getKeyGeneratorClass(
List<InternalPartitionField> partitionFields, List<InternalField> recordKeyFields) {
if (partitionFields.stream()
.anyMatch(
internalPartitionField ->
internalPartitionField.getTransformType() == PartitionTransformType.BUCKET)) {
throw new PartitionSpecException("Bucket partition is not yet supported by Hudi targets");
}
boolean multipleRecordKeyFields = recordKeyFields.size() > 1;
boolean multiplePartitionFields = partitionFields.size() > 1;
String keyGeneratorClass;

@@ -18,9 +18,14 @@

package org.apache.xtable.iceberg;

import static org.apache.xtable.iceberg.IcebergPartitionValueConverter.BUCKET;

import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

import lombok.AccessLevel;
import lombok.NoArgsConstructor;
@@ -32,6 +37,7 @@
import org.apache.iceberg.types.Types;

import org.apache.xtable.exception.NotSupportedException;
import org.apache.xtable.exception.PartitionSpecException;
import org.apache.xtable.model.schema.InternalField;
import org.apache.xtable.model.schema.InternalPartitionField;
import org.apache.xtable.model.schema.InternalSchema;
@@ -41,6 +47,7 @@
/** Partition spec builder and extractor for Iceberg. */
@NoArgsConstructor(access = AccessLevel.PRIVATE)
public class IcebergPartitionSpecExtractor {
private static final Pattern NUM_BUCKETS_MATCHER = Pattern.compile("bucket\\[(\\d+)\\]");
private static final IcebergPartitionSpecExtractor INSTANCE = new IcebergPartitionSpecExtractor();

public static IcebergPartitionSpecExtractor getInstance() {
Expand Down Expand Up @@ -70,6 +77,12 @@ public PartitionSpec toIceberg(List<InternalPartitionField> partitionFields, Sch
case VALUE:
partitionSpecBuilder.identity(fieldPath);
break;
case BUCKET:
partitionSpecBuilder.bucket(
fieldPath,
(int)
partitioningField.getTransformOptions().get(InternalPartitionField.NUM_BUCKETS));
break;
default:
throw new IllegalArgumentException(
"Unsupported type: " + partitioningField.getTransformType());
@@ -99,13 +112,27 @@ PartitionTransformType fromIcebergTransform(Transform<?, ?> transform) {
throw new NotSupportedException(transformName);
}

if (transformName.startsWith("bucket")) {
throw new NotSupportedException(transformName);
if (transformName.startsWith(BUCKET)) {
return PartitionTransformType.BUCKET;
}

throw new NotSupportedException(transform.toString());
}

private Map<String, Object> getPartitionTransformOptions(Transform<?, ?> transform) {
if (transform.toString().startsWith(BUCKET)) {
Matcher matcher = NUM_BUCKETS_MATCHER.matcher(transform.toString());
if (matcher.matches()) {
return Collections.singletonMap(
InternalPartitionField.NUM_BUCKETS, Integer.parseInt(matcher.group(1)));
} else {
throw new PartitionSpecException(
"Cannot parse number of buckets from partition transform: " + transform);
}
}
return Collections.emptyMap();
}

/**
* Generates internal representation of the Iceberg partition spec.
*
@@ -121,6 +148,10 @@ public List<InternalPartitionField> fromIceberg(

List<InternalPartitionField> irPartitionFields = new ArrayList<>(iceSpec.fields().size());
for (PartitionField iceField : iceSpec.fields()) {
// skip void transform
if (iceField.transform().isVoid()) {
continue;
}
// fetch the ice field from the schema to properly handle hidden partition fields
int sourceColumnId = iceField.sourceId();
Types.NestedField iceSchemaField = iceSchema.findField(sourceColumnId);
Expand All @@ -131,6 +162,7 @@ public List<InternalPartitionField> fromIceberg(
InternalPartitionField.builder()
.sourceField(irField)
.transformType(fromIcebergTransform(iceField.transform()))
.transformOptions(getPartitionTransformOptions(iceField.transform()))
.build();
irPartitionFields.add(irPartitionField);
}
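
As a quick illustration of the parsing added above: an Iceberg bucket transform renders its toString() as bucket[N], which is what NUM_BUCKETS_MATCHER assumes and captures. A minimal, hypothetical check:

// Stand-alone sketch (not in the PR); "bucket[16]" mimics Transform.toString() for a 16-bucket transform.
Matcher matcher = Pattern.compile("bucket\\[(\\d+)\\]").matcher("bucket[16]");
int numBuckets = matcher.matches() ? Integer.parseInt(matcher.group(1)) : -1; // 16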

@@ -66,6 +66,7 @@ public class IcebergPartitionValueConverter {
private static final String DAY = "day";
private static final String HOUR = "hour";
private static final String IDENTITY = "identity";
static final String BUCKET = "bucket";

public static IcebergPartitionValueConverter getInstance() {
return INSTANCE;
@@ -124,8 +125,16 @@ public List<PartitionValue> toXTable(
transformType = PartitionTransformType.VALUE;
break;
default:
throw new NotSupportedException(
"Partition transform not supported: " + partitionField.transform().toString());
if (partitionField.transform().toString().startsWith(BUCKET)) {
value = structLike.get(fieldPosition, Integer.class);
transformType = PartitionTransformType.BUCKET;
} else if (partitionField.transform().isVoid()) {
// skip void type
continue;
} else {
throw new NotSupportedException(
"Partition transform not supported: " + partitionField.transform().toString());
}
}
Types.NestedField partitionSourceField =
partitionSpec.schema().findField(partitionField.sourceId());

@@ -100,6 +100,7 @@
import org.apache.xtable.hudi.HudiConversionSourceProvider;
import org.apache.xtable.hudi.HudiTestUtil;
import org.apache.xtable.iceberg.IcebergConversionSourceProvider;
import org.apache.xtable.iceberg.TestIcebergDataHelper;
import org.apache.xtable.model.storage.TableFormat;
import org.apache.xtable.model.sync.SyncMode;

@@ -743,6 +744,34 @@ public void testMetadataRetention() throws Exception {
}
}

@Test
void otherIcebergPartitionTypes() {
String tableName = getTableName();
ConversionController conversionController = new ConversionController(jsc.hadoopConfiguration());
List<String> targetTableFormats = Collections.singletonList(DELTA);

ConversionSourceProvider<?> conversionSourceProvider = getConversionSourceProvider(ICEBERG);
try (TestIcebergTable table =
new TestIcebergTable(
tableName,
tempDir,
jsc.hadoopConfiguration(),
"id",
Arrays.asList("level", "string_field"),
TestIcebergDataHelper.SchemaType.COMMON)) {
table.insertRows(100);

ConversionConfig conversionConfig =
getTableSyncConfig(
ICEBERG, SyncMode.FULL, tableName, table, targetTableFormats, null, null);
conversionController.sync(conversionConfig, conversionSourceProvider);
checkDatasetEquivalence(ICEBERG, table, targetTableFormats, 100);
// Query with filter to assert partition does not impact ability to query
checkDatasetEquivalenceWithFilter(
ICEBERG, table, targetTableFormats, "level == 'INFO' AND string_field > 'abc'");
}
}

private Map<String, String> getTimeTravelOption(String tableFormat, Instant time) {
Map<String, String> options = new HashMap<>();
switch (tableFormat) {

@@ -352,7 +352,7 @@ private DataFile writeAndGetDataFile(List<Record> records, StructLike partitionK
Path baseDataPath = Paths.get(icebergTable.location(), "data");
String filePath;
if (icebergDataHelper.getPartitionSpec().isPartitioned()) {
String partitionPath = getPartitionPath(partitionKey.get(0, String.class));
String partitionPath = ((PartitionKey) partitionKey).toPath();
filePath =
baseDataPath.resolve(partitionPath).resolve(UUID.randomUUID() + ".parquet").toString();
} else {
@@ -434,14 +434,4 @@ private List<DataFile> writeAllDataFiles(Map<StructLike, List<Record>> recordsBy
.map(entry -> writeAndGetDataFile(entry.getValue(), entry.getKey()))
.collect(Collectors.toList());
}

private String getPartitionPath(Object partitionValue) {
Preconditions.checkArgument(
icebergDataHelper.getPartitionFieldNames().size() == 1,
"Only single partition field is supported for grouping records by partition");
Preconditions.checkArgument(
icebergDataHelper.getPartitionFieldNames().get(0).equals("level"),
"Only level partition field is supported for grouping records by partition");
return "level=" + partitionValue;
}
}

@@ -18,6 +18,7 @@

package org.apache.xtable.delta;

import static org.apache.xtable.delta.DeltaPartitionExtractor.DELTA_GENERATION_EXPRESSION;
import static org.junit.jupiter.api.Assertions.*;

import java.util.Arrays;
@@ -494,6 +495,42 @@ public void testYearMonthDayHourGeneratedPartitionValueExtraction() {
assertEquals(expectedPartitionValues, partitionValues);
}

@Test
void convertBucketPartition() {
InternalPartitionField internalPartitionField =
InternalPartitionField.builder()
.sourceField(
InternalField.builder()
.name("partition_column1")
.schema(
InternalSchema.builder()
.name("string")
.dataType(InternalType.STRING)
.build())
.build())
.transformType(PartitionTransformType.BUCKET)
.transformOptions(Collections.singletonMap(InternalPartitionField.NUM_BUCKETS, 5))
.build();
Map<String, StructField> actual =
deltaPartitionExtractor.convertToDeltaPartitionFormat(
Collections.singletonList(internalPartitionField));
Metadata expectedPartitionFieldMetadata =
new Metadata(
ScalaUtils.convertJavaMapToScala(
Collections.singletonMap(
DELTA_GENERATION_EXPRESSION,
"MOD((HASH(partition_column1) & 2147483647), 5)")));
Map<String, StructField> expected =
Collections.singletonMap(
"xtable_partition_col_BUCKET_partition_column1",
new StructField(
"xtable_partition_col_BUCKET_partition_column1",
DataTypes.IntegerType,
true,
expectedPartitionFieldMetadata));
assertEquals(expected, actual);
}

private scala.collection.mutable.Map<String, String> convertJavaMapToScalaMap(
Map<String, String> javaMap) {
return JavaConverters.mapAsScalaMapConverter(javaMap).asScala();

@@ -126,7 +126,7 @@ public class TestIcebergDataHelper {
String recordKeyField;
List<String> partitionFieldNames;

public static enum SchemaType {
public enum SchemaType {
BASIC,
COMMON,
COMMON_WITH_ADDITIONAL_COLUMNS,
@@ -202,6 +202,13 @@ public PartitionSpec getPartitionSpec() {
if (partitionFieldNames.isEmpty()) {
return PartitionSpec.unpartitioned();
}
if (partitionFieldNames.equals(Arrays.asList("level", "string_field"))) {
return PartitionSpec.builderFor(tableSchema)
.alwaysNull("bytes_field")
.identity("level")
.bucket("string_field", 10)
.build();
}
if (partitionFieldNames.size() > 1) {
throw new IllegalArgumentException(
"Please modify the code to support multiple partition columns");