diff --git a/xtable-api/src/main/java/org/apache/xtable/model/schema/InternalPartitionField.java b/xtable-api/src/main/java/org/apache/xtable/model/schema/InternalPartitionField.java index 7c1757a55..99d3daa7e 100644 --- a/xtable-api/src/main/java/org/apache/xtable/model/schema/InternalPartitionField.java +++ b/xtable-api/src/main/java/org/apache/xtable/model/schema/InternalPartitionField.java @@ -20,6 +20,7 @@ import java.util.Collections; import java.util.List; +import java.util.Map; import lombok.Builder; import lombok.Value; @@ -32,6 +33,7 @@ @Value @Builder public class InternalPartitionField { + public static final String NUM_BUCKETS = "NUM_BUCKETS"; // Source field the partition is based on InternalField sourceField; /* @@ -47,4 +49,6 @@ public class InternalPartitionField { @Builder.Default List<String> partitionFieldNames = Collections.emptyList(); // An enum describing how the source data was transformed into the partition value PartitionTransformType transformType; + // Transform options such as number of buckets in the BUCKET transform type + @Builder.Default Map<String, Object> transformOptions = Collections.emptyMap(); } diff --git a/xtable-api/src/main/java/org/apache/xtable/model/schema/PartitionTransformType.java b/xtable-api/src/main/java/org/apache/xtable/model/schema/PartitionTransformType.java index d6e082f16..8af9cea82 100644 --- a/xtable-api/src/main/java/org/apache/xtable/model/schema/PartitionTransformType.java +++ b/xtable-api/src/main/java/org/apache/xtable/model/schema/PartitionTransformType.java @@ -30,7 +30,8 @@ public enum PartitionTransformType { MONTH, DAY, HOUR, - VALUE; + VALUE, + BUCKET; public boolean isTimeBased() { return this == YEAR || this == MONTH || this == DAY || this == HOUR; diff --git a/xtable-core/src/main/java/org/apache/xtable/delta/DeltaPartitionExtractor.java b/xtable-core/src/main/java/org/apache/xtable/delta/DeltaPartitionExtractor.java index 3492857f4..69dcdd6ea 100644 --- 
a/xtable-core/src/main/java/org/apache/xtable/delta/DeltaPartitionExtractor.java +++ b/xtable-core/src/main/java/org/apache/xtable/delta/DeltaPartitionExtractor.java @@ -75,6 +75,7 @@ public class DeltaPartitionExtractor { private static final String DATE_FORMAT_FOR_DAY = "yyyy-MM-dd"; private static final String DATE_FORMAT_FOR_MONTH = "yyyy-MM"; private static final String DATE_FORMAT_FOR_YEAR = "yyyy"; + private static final String BUCKET_FUNCTION = "MOD((HASH(%s) & %d), %d)"; // For timestamp partition fields, actual partition column names in delta format will be of type // generated & and with a name like `delta_partition_col_{transform_type}_{source_field_name}`. private static final String DELTA_PARTITION_COL_NAME_FORMAT = "xtable_partition_col_%s_%s"; @@ -242,7 +243,7 @@ public Map convertToDeltaPartitionFormat( currPartitionColumnName = internalPartitionField.getSourceField().getName(); field = null; } else { - // Since partition field of timestamp type, create new field in schema. + // Since partition field of timestamp or bucket type, create new field in schema. field = getGeneratedField(internalPartitionField); currPartitionColumnName = field.name(); } @@ -270,6 +271,10 @@ public Map partitionValueSerialization(InternalDataFile internal ""); partitionValuesSerialized.put( partitionField.getSourceField().getName(), partitionValueSerialized); + } else if (transformType == PartitionTransformType.BUCKET) { + partitionValueSerialized = partitionValue.getRange().getMaxValue().toString(); + partitionValuesSerialized.put( + getGeneratedColumnName(partitionField), partitionValueSerialized); } else { // use appropriate date formatter for value serialization. 
partitionValueSerialized = @@ -352,7 +357,6 @@ private StructField getGeneratedField(InternalPartitionField internalPartitionFi String generatedExpression; DataType dataType; String currPartitionColumnName = getGeneratedColumnName(internalPartitionField); - Map<String, String> generatedExpressionMetadata = new HashMap<>(); switch (internalPartitionField.getTransformType()) { case YEAR: generatedExpression = @@ -373,10 +377,23 @@ private StructField getGeneratedField(InternalPartitionField internalPartitionFi String.format(CAST_FUNCTION, internalPartitionField.getSourceField().getPath()); dataType = DataTypes.DateType; break; + case BUCKET: + generatedExpression = + String.format( + BUCKET_FUNCTION, + internalPartitionField.getSourceField().getPath(), + Integer.MAX_VALUE, + (int) + internalPartitionField + .getTransformOptions() + .get(InternalPartitionField.NUM_BUCKETS)); + dataType = DataTypes.IntegerType; + break; default: throw new PartitionSpecException("Invalid transform type"); } - generatedExpressionMetadata.put(DELTA_GENERATION_EXPRESSION, generatedExpression); + Map<String, String> generatedExpressionMetadata = + Collections.singletonMap(DELTA_GENERATION_EXPRESSION, generatedExpression); Metadata partitionFieldMetadata = new Metadata(ScalaUtils.convertJavaMapToScala(generatedExpressionMetadata)); return new StructField(currPartitionColumnName, dataType, true, partitionFieldMetadata); diff --git a/xtable-core/src/main/java/org/apache/xtable/hudi/HudiTableManager.java b/xtable-core/src/main/java/org/apache/xtable/hudi/HudiTableManager.java index 7c48e88d4..c6ac35fb3 100644 --- a/xtable-core/src/main/java/org/apache/xtable/hudi/HudiTableManager.java +++ b/xtable-core/src/main/java/org/apache/xtable/hudi/HudiTableManager.java @@ -35,10 +35,12 @@ import org.apache.hudi.common.util.VisibleForTesting; import org.apache.hudi.exception.TableNotFoundException; +import org.apache.xtable.exception.PartitionSpecException; import org.apache.xtable.exception.UpdateException; import 
org.apache.xtable.model.InternalTable; import org.apache.xtable.model.schema.InternalField; import org.apache.xtable.model.schema.InternalPartitionField; +import org.apache.xtable.model.schema.PartitionTransformType; import org.apache.xtable.model.storage.DataLayoutStrategy; /** A class used to initialize new Hudi tables and load the metadata of existing tables. */ @@ -124,6 +126,12 @@ HoodieTableMetaClient initializeHudiTable(String tableDataPath, InternalTable ta @VisibleForTesting static String getKeyGeneratorClass( List<InternalPartitionField> partitionFields, List<String> recordKeyFields) { + if (partitionFields.stream() + .anyMatch( + internalPartitionField -> + internalPartitionField.getTransformType() == PartitionTransformType.BUCKET)) { + throw new PartitionSpecException("Bucket partition is not yet supported by Hudi targets"); + } boolean multipleRecordKeyFields = recordKeyFields.size() > 1; boolean multiplePartitionFields = partitionFields.size() > 1; String keyGeneratorClass; diff --git a/xtable-core/src/main/java/org/apache/xtable/iceberg/IcebergPartitionSpecExtractor.java b/xtable-core/src/main/java/org/apache/xtable/iceberg/IcebergPartitionSpecExtractor.java index b51f81638..8e202f32c 100644 --- a/xtable-core/src/main/java/org/apache/xtable/iceberg/IcebergPartitionSpecExtractor.java +++ b/xtable-core/src/main/java/org/apache/xtable/iceberg/IcebergPartitionSpecExtractor.java @@ -18,9 +18,14 @@ package org.apache.xtable.iceberg; +import static org.apache.xtable.iceberg.IcebergPartitionValueConverter.BUCKET; + import java.util.ArrayList; import java.util.Collections; import java.util.List; +import java.util.Map; +import java.util.regex.Matcher; +import java.util.regex.Pattern; import lombok.AccessLevel; import lombok.NoArgsConstructor; @@ -32,6 +37,7 @@ import org.apache.iceberg.types.Types; import org.apache.xtable.exception.NotSupportedException; +import org.apache.xtable.exception.PartitionSpecException; import org.apache.xtable.model.schema.InternalField; import 
org.apache.xtable.model.schema.InternalPartitionField; import org.apache.xtable.model.schema.InternalSchema; @@ -41,6 +47,7 @@ /** Partition spec builder and extractor for Iceberg. */ @NoArgsConstructor(access = AccessLevel.PRIVATE) public class IcebergPartitionSpecExtractor { + private static final Pattern NUM_BUCKETS_MATCHER = Pattern.compile("bucket\\[(\\d+)\\]"); private static final IcebergPartitionSpecExtractor INSTANCE = new IcebergPartitionSpecExtractor(); public static IcebergPartitionSpecExtractor getInstance() { @@ -70,6 +77,12 @@ public PartitionSpec toIceberg(List<InternalPartitionField> partitionFields, Sch case VALUE: partitionSpecBuilder.identity(fieldPath); break; + case BUCKET: + partitionSpecBuilder.bucket( + fieldPath, + (int) + partitioningField.getTransformOptions().get(InternalPartitionField.NUM_BUCKETS)); + break; default: throw new IllegalArgumentException( "Unsupported type: " + partitioningField.getTransformType()); @@ -99,13 +112,27 @@ PartitionTransformType fromIcebergTransform(Transform<?, ?> transform) { throw new NotSupportedException(transformName); } - if (transformName.startsWith("bucket")) { - throw new NotSupportedException(transformName); + if (transformName.startsWith(BUCKET)) { + return PartitionTransformType.BUCKET; } throw new NotSupportedException(transform.toString()); } + private Map<String, Object> getPartitionTransformOptions(Transform<?, ?> transform) { + if (transform.toString().startsWith(BUCKET)) { + Matcher matcher = NUM_BUCKETS_MATCHER.matcher(transform.toString()); + if (matcher.matches()) { + return Collections.singletonMap( + InternalPartitionField.NUM_BUCKETS, Integer.parseInt(matcher.group(1))); + } else { + throw new PartitionSpecException( + "Cannot parse number of buckets from partition transform: " + transform); + } + } + return Collections.emptyMap(); + } + /** * Generates internal representation of the Iceberg partition spec. 
* @@ -121,6 +148,10 @@ public List<InternalPartitionField> fromIceberg( List<InternalPartitionField> irPartitionFields = new ArrayList<>(iceSpec.fields().size()); for (PartitionField iceField : iceSpec.fields()) { + // skip void transform + if (iceField.transform().isVoid()) { + continue; + } // fetch the ice field from the schema to properly handle hidden partition fields int sourceColumnId = iceField.sourceId(); Types.NestedField iceSchemaField = iceSchema.findField(sourceColumnId); @@ -131,6 +162,7 @@ public List<InternalPartitionField> fromIceberg( InternalPartitionField.builder() .sourceField(irField) .transformType(fromIcebergTransform(iceField.transform())) + .transformOptions(getPartitionTransformOptions(iceField.transform())) .build(); irPartitionFields.add(irPartitionField); } diff --git a/xtable-core/src/main/java/org/apache/xtable/iceberg/IcebergPartitionValueConverter.java b/xtable-core/src/main/java/org/apache/xtable/iceberg/IcebergPartitionValueConverter.java index a6abd2a91..83979a66d 100644 --- a/xtable-core/src/main/java/org/apache/xtable/iceberg/IcebergPartitionValueConverter.java +++ b/xtable-core/src/main/java/org/apache/xtable/iceberg/IcebergPartitionValueConverter.java @@ -66,6 +66,7 @@ public class IcebergPartitionValueConverter { private static final String DAY = "day"; private static final String HOUR = "hour"; private static final String IDENTITY = "identity"; + static final String BUCKET = "bucket"; public static IcebergPartitionValueConverter getInstance() { return INSTANCE; } @@ -124,8 +125,16 @@ public List<PartitionValue> toXTable( transformType = PartitionTransformType.VALUE; break; default: - throw new NotSupportedException( - "Partition transform not supported: " + partitionField.transform().toString()); + if (partitionField.transform().toString().startsWith(BUCKET)) { + value = structLike.get(fieldPosition, Integer.class); + transformType = PartitionTransformType.BUCKET; + } else if (partitionField.transform().isVoid()) { + // skip void type + continue; + } else { + throw new NotSupportedException( + "Partition 
transform not supported: " + partitionField.transform().toString()); + } } Types.NestedField partitionSourceField = partitionSpec.schema().findField(partitionField.sourceId()); diff --git a/xtable-core/src/test/java/org/apache/xtable/ITConversionController.java b/xtable-core/src/test/java/org/apache/xtable/ITConversionController.java index 3d539766a..bb7da9eaf 100644 --- a/xtable-core/src/test/java/org/apache/xtable/ITConversionController.java +++ b/xtable-core/src/test/java/org/apache/xtable/ITConversionController.java @@ -100,6 +100,7 @@ import org.apache.xtable.hudi.HudiConversionSourceProvider; import org.apache.xtable.hudi.HudiTestUtil; import org.apache.xtable.iceberg.IcebergConversionSourceProvider; +import org.apache.xtable.iceberg.TestIcebergDataHelper; import org.apache.xtable.model.storage.TableFormat; import org.apache.xtable.model.sync.SyncMode; @@ -743,6 +744,34 @@ public void testMetadataRetention() throws Exception { } } + @Test + void otherIcebergPartitionTypes() { + String tableName = getTableName(); + ConversionController conversionController = new ConversionController(jsc.hadoopConfiguration()); + List<String> targetTableFormats = Collections.singletonList(DELTA); + + ConversionSourceProvider<?> conversionSourceProvider = getConversionSourceProvider(ICEBERG); + try (TestIcebergTable table = + new TestIcebergTable( + tableName, + tempDir, + jsc.hadoopConfiguration(), + "id", + Arrays.asList("level", "string_field"), + TestIcebergDataHelper.SchemaType.COMMON)) { + table.insertRows(100); + + ConversionConfig conversionConfig = + getTableSyncConfig( + ICEBERG, SyncMode.FULL, tableName, table, targetTableFormats, null, null); + conversionController.sync(conversionConfig, conversionSourceProvider); + checkDatasetEquivalence(ICEBERG, table, targetTableFormats, 100); + // Query with filter to assert partition does not impact ability to query + checkDatasetEquivalenceWithFilter( + ICEBERG, table, targetTableFormats, "level == 'INFO' AND string_field > 'abc'"); + } 
+ } + private Map<String, String> getTimeTravelOption(String tableFormat, Instant time) { Map<String, String> options = new HashMap<>(); switch (tableFormat) { diff --git a/xtable-core/src/test/java/org/apache/xtable/TestIcebergTable.java b/xtable-core/src/test/java/org/apache/xtable/TestIcebergTable.java index 0c8336fef..5a394a96f 100644 --- a/xtable-core/src/test/java/org/apache/xtable/TestIcebergTable.java +++ b/xtable-core/src/test/java/org/apache/xtable/TestIcebergTable.java @@ -352,7 +352,7 @@ private DataFile writeAndGetDataFile(List<Record> records, StructLike partitionK Path baseDataPath = Paths.get(icebergTable.location(), "data"); String filePath; if (icebergDataHelper.getPartitionSpec().isPartitioned()) { - String partitionPath = getPartitionPath(partitionKey.get(0, String.class)); + String partitionPath = ((PartitionKey) partitionKey).toPath(); filePath = baseDataPath.resolve(partitionPath).resolve(UUID.randomUUID() + ".parquet").toString(); } else { @@ -434,14 +434,4 @@ private List<DataFile> writeAllDataFiles(Map<StructLike, List<Record>> recordsBy .map(entry -> writeAndGetDataFile(entry.getValue(), entry.getKey())) .collect(Collectors.toList()); } - - private String getPartitionPath(Object partitionValue) { - Preconditions.checkArgument( - icebergDataHelper.getPartitionFieldNames().size() == 1, - "Only single partition field is supported for grouping records by partition"); - Preconditions.checkArgument( - icebergDataHelper.getPartitionFieldNames().get(0).equals("level"), - "Only level partition field is supported for grouping records by partition"); - return "level=" + partitionValue; - } } diff --git a/xtable-core/src/test/java/org/apache/xtable/delta/TestDeltaPartitionExtractor.java b/xtable-core/src/test/java/org/apache/xtable/delta/TestDeltaPartitionExtractor.java index ac5daa992..3575dc117 100644 --- a/xtable-core/src/test/java/org/apache/xtable/delta/TestDeltaPartitionExtractor.java +++ b/xtable-core/src/test/java/org/apache/xtable/delta/TestDeltaPartitionExtractor.java @@ -18,6 +18,7 @@ package 
org.apache.xtable.delta; +import static org.apache.xtable.delta.DeltaPartitionExtractor.DELTA_GENERATION_EXPRESSION; import static org.junit.jupiter.api.Assertions.*; import java.util.Arrays; @@ -494,6 +495,42 @@ public void testYearMonthDayHourGeneratedPartitionValueExtraction() { assertEquals(expectedPartitionValues, partitionValues); } + @Test + void convertBucketPartition() { + InternalPartitionField internalPartitionField = + InternalPartitionField.builder() + .sourceField( + InternalField.builder() + .name("partition_column1") + .schema( + InternalSchema.builder() + .name("string") + .dataType(InternalType.STRING) + .build()) + .build()) + .transformType(PartitionTransformType.BUCKET) + .transformOptions(Collections.singletonMap(InternalPartitionField.NUM_BUCKETS, 5)) + .build(); + Map<String, StructField> actual = + deltaPartitionExtractor.convertToDeltaPartitionFormat( + Collections.singletonList(internalPartitionField)); + Metadata expectedPartitionFieldMetadata = + new Metadata( + ScalaUtils.convertJavaMapToScala( + Collections.singletonMap( + DELTA_GENERATION_EXPRESSION, + "MOD((HASH(partition_column1) & 2147483647), 5)"))); + Map<String, StructField> expected = + Collections.singletonMap( + "xtable_partition_col_BUCKET_partition_column1", + new StructField( + "xtable_partition_col_BUCKET_partition_column1", + DataTypes.IntegerType, + true, + expectedPartitionFieldMetadata)); + assertEquals(expected, actual); + } + private scala.collection.mutable.Map<String, String> convertJavaMapToScalaMap( Map<String, String> javaMap) { return JavaConverters.mapAsScalaMapConverter(javaMap).asScala(); diff --git a/xtable-core/src/test/java/org/apache/xtable/iceberg/TestIcebergDataHelper.java b/xtable-core/src/test/java/org/apache/xtable/iceberg/TestIcebergDataHelper.java index d90ba169f..a8518c1f9 100644 --- a/xtable-core/src/test/java/org/apache/xtable/iceberg/TestIcebergDataHelper.java +++ b/xtable-core/src/test/java/org/apache/xtable/iceberg/TestIcebergDataHelper.java @@ -126,7 +126,7 @@ public class TestIcebergDataHelper { String 
recordKeyField; List partitionFieldNames; - public static enum SchemaType { + public enum SchemaType { BASIC, COMMON, COMMON_WITH_ADDITIONAL_COLUMNS, @@ -202,6 +202,13 @@ public PartitionSpec getPartitionSpec() { if (partitionFieldNames.isEmpty()) { return PartitionSpec.unpartitioned(); } + if (partitionFieldNames.equals(Arrays.asList("level", "string_field"))) { + return PartitionSpec.builderFor(tableSchema) + .alwaysNull("bytes_field") + .identity("level") + .bucket("string_field", 10) + .build(); + } if (partitionFieldNames.size() > 1) { throw new IllegalArgumentException( "Please modify the code to support multiple partition columns"); diff --git a/xtable-core/src/test/java/org/apache/xtable/iceberg/TestIcebergPartitionSpecExtractor.java b/xtable-core/src/test/java/org/apache/xtable/iceberg/TestIcebergPartitionSpecExtractor.java index 31fbb6f54..5935f4cbb 100644 --- a/xtable-core/src/test/java/org/apache/xtable/iceberg/TestIcebergPartitionSpecExtractor.java +++ b/xtable-core/src/test/java/org/apache/xtable/iceberg/TestIcebergPartitionSpecExtractor.java @@ -18,6 +18,8 @@ package org.apache.xtable.iceberg; +import static org.junit.jupiter.api.Assertions.assertEquals; + import java.util.Arrays; import java.util.Collections; import java.util.List; @@ -54,7 +56,7 @@ public void testUnpartitioned() { PartitionSpec actual = IcebergPartitionSpecExtractor.getInstance().toIceberg(null, icebergSchema); PartitionSpec expected = PartitionSpec.unpartitioned(); - Assertions.assertEquals(expected, actual); + assertEquals(expected, actual); } @Test @@ -84,7 +86,7 @@ public void testMultiplePartitions() { .hour("timestamp_hour") .identity("string_field") .build(); - Assertions.assertEquals(expected, actual); + assertEquals(expected, actual); } @Test @@ -103,7 +105,7 @@ public void testYearPartitioning() { PartitionSpec actual = IcebergPartitionSpecExtractor.getInstance().toIceberg(partitionFieldList, TEST_SCHEMA); PartitionSpec expected = 
PartitionSpec.builderFor(TEST_SCHEMA).year("timestamp_year").build(); - Assertions.assertEquals(expected, actual); + assertEquals(expected, actual); } @Test @@ -122,7 +124,7 @@ public void testMonthPartitioning() { PartitionSpec actual = IcebergPartitionSpecExtractor.getInstance().toIceberg(partitionFieldList, TEST_SCHEMA); PartitionSpec expected = PartitionSpec.builderFor(TEST_SCHEMA).month("timestamp_month").build(); - Assertions.assertEquals(expected, actual); + assertEquals(expected, actual); } @Test @@ -141,7 +143,7 @@ public void testDayPartitioning() { PartitionSpec actual = IcebergPartitionSpecExtractor.getInstance().toIceberg(partitionFieldList, TEST_SCHEMA); PartitionSpec expected = PartitionSpec.builderFor(TEST_SCHEMA).day("timestamp_day").build(); - Assertions.assertEquals(expected, actual); + assertEquals(expected, actual); } @Test @@ -160,7 +162,7 @@ public void testHourPartitioning() { PartitionSpec actual = IcebergPartitionSpecExtractor.getInstance().toIceberg(partitionFieldList, TEST_SCHEMA); PartitionSpec expected = PartitionSpec.builderFor(TEST_SCHEMA).hour("timestamp_hour").build(); - Assertions.assertEquals(expected, actual); + assertEquals(expected, actual); } @Test @@ -187,7 +189,27 @@ public void testNestedPartitionField() { IcebergPartitionSpecExtractor.getInstance().toIceberg(partitionFieldList, icebergSchema); PartitionSpec expected = PartitionSpec.builderFor(icebergSchema).identity("data.nested").build(); - Assertions.assertEquals(expected, actual); + assertEquals(expected, actual); + } + + @Test + void testBucketPartition() { + List partitionFieldList = + Collections.singletonList( + InternalPartitionField.builder() + .sourceField( + InternalField.builder() + .name("string_field") + .schema(InternalSchema.builder().dataType(InternalType.STRING).build()) + .build()) + .transformType(PartitionTransformType.BUCKET) + .transformOptions(Collections.singletonMap(InternalPartitionField.NUM_BUCKETS, 3)) + .build()); + PartitionSpec actual = + 
IcebergPartitionSpecExtractor.getInstance().toIceberg(partitionFieldList, TEST_SCHEMA); + PartitionSpec expected = + PartitionSpec.builderFor(TEST_SCHEMA).bucket("string_field", 3).build(); + assertEquals(expected, actual); } @Test @@ -195,7 +217,7 @@ public void testFromIcebergUnPartitioned() { IcebergPartitionSpecExtractor extractor = IcebergPartitionSpecExtractor.getInstance(); List fields = extractor.fromIceberg(PartitionSpec.unpartitioned(), null, null); - Assertions.assertEquals(0, fields.size()); + assertEquals(0, fields.size()); } @Test @@ -227,13 +249,52 @@ public void testFromIcebergSingleColumn() { List irPartitionSpec = extractor.fromIceberg(icePartitionSpec, iceSchema, irSchema); - Assertions.assertEquals(1, irPartitionSpec.size()); + assertEquals(1, irPartitionSpec.size()); InternalField sourceField = irPartitionSpec.get(0).getSourceField(); - Assertions.assertEquals("key_string", sourceField.getName()); - Assertions.assertEquals(1, sourceField.getFieldId()); - Assertions.assertEquals(InternalType.STRING, sourceField.getSchema().getDataType()); - Assertions.assertEquals( - PartitionTransformType.VALUE, irPartitionSpec.get(0).getTransformType()); + assertEquals("key_string", sourceField.getName()); + assertEquals(1, sourceField.getFieldId()); + assertEquals(InternalType.STRING, sourceField.getSchema().getDataType()); + assertEquals(PartitionTransformType.VALUE, irPartitionSpec.get(0).getTransformType()); + } + + @Test + void testFromIcebergBucket() { + IcebergPartitionSpecExtractor extractor = IcebergPartitionSpecExtractor.getInstance(); + + Schema iceSchema = + new Schema( + Types.NestedField.required(0, "data_int", Types.IntegerType.get()), + Types.NestedField.required(1, "key_string", Types.StringType.get())); + PartitionSpec icePartitionSpec = + PartitionSpec.builderFor(iceSchema).bucket("data_int", 2).build(); + + InternalSchema irSchema = + InternalSchema.builder() + .name("test_schema") + .fields( + Arrays.asList( + InternalField.builder() + 
.name("data_int") + .schema(InternalSchema.builder().dataType(InternalType.INT).build()) + .build(), + InternalField.builder() + .name("key_string") + .fieldId(1) + .schema(InternalSchema.builder().dataType(InternalType.STRING).build()) + .build())) + .build(); + + List irPartitionSpec = + extractor.fromIceberg(icePartitionSpec, iceSchema, irSchema); + + InternalPartitionField expected = + InternalPartitionField.builder() + .sourceField(irSchema.getFields().get(0)) + .transformType(PartitionTransformType.BUCKET) + .transformOptions(Collections.singletonMap(InternalPartitionField.NUM_BUCKETS, 2)) + .build(); + + assertEquals(Collections.singletonList(expected), irPartitionSpec); } @Test @@ -266,37 +327,34 @@ public void testFromIcebergMultiColumn() { List irPartitionSpec = extractor.fromIceberg(icePartitionSpec, iceSchema, irSchema); - Assertions.assertEquals(2, irPartitionSpec.size()); + assertEquals(2, irPartitionSpec.size()); InternalField sourceField = irPartitionSpec.get(0).getSourceField(); - Assertions.assertEquals("key_string", sourceField.getName()); - Assertions.assertEquals(11, sourceField.getFieldId()); - Assertions.assertEquals(InternalType.STRING, sourceField.getSchema().getDataType()); - Assertions.assertEquals( - PartitionTransformType.VALUE, irPartitionSpec.get(0).getTransformType()); + assertEquals("key_string", sourceField.getName()); + assertEquals(11, sourceField.getFieldId()); + assertEquals(InternalType.STRING, sourceField.getSchema().getDataType()); + assertEquals(PartitionTransformType.VALUE, irPartitionSpec.get(0).getTransformType()); sourceField = irPartitionSpec.get(1).getSourceField(); - Assertions.assertEquals("key_year", sourceField.getName()); - Assertions.assertEquals(10, sourceField.getFieldId()); - Assertions.assertEquals(InternalType.DATE, sourceField.getSchema().getDataType()); - Assertions.assertEquals(PartitionTransformType.YEAR, irPartitionSpec.get(1).getTransformType()); + assertEquals("key_year", sourceField.getName()); + 
assertEquals(10, sourceField.getFieldId()); + assertEquals(InternalType.DATE, sourceField.getSchema().getDataType()); + assertEquals(PartitionTransformType.YEAR, irPartitionSpec.get(1).getTransformType()); } @Test public void fromIcebergTransformType() { IcebergPartitionSpecExtractor extractor = IcebergPartitionSpecExtractor.getInstance(); - Assertions.assertEquals( - PartitionTransformType.YEAR, extractor.fromIcebergTransform(Transforms.year())); - Assertions.assertEquals( - PartitionTransformType.MONTH, extractor.fromIcebergTransform(Transforms.month())); - Assertions.assertEquals( - PartitionTransformType.DAY, extractor.fromIcebergTransform(Transforms.day())); - Assertions.assertEquals( - PartitionTransformType.HOUR, extractor.fromIcebergTransform(Transforms.hour())); - Assertions.assertEquals( + assertEquals(PartitionTransformType.YEAR, extractor.fromIcebergTransform(Transforms.year())); + assertEquals(PartitionTransformType.MONTH, extractor.fromIcebergTransform(Transforms.month())); + assertEquals(PartitionTransformType.DAY, extractor.fromIcebergTransform(Transforms.day())); + assertEquals(PartitionTransformType.HOUR, extractor.fromIcebergTransform(Transforms.hour())); + assertEquals( PartitionTransformType.VALUE, extractor.fromIcebergTransform(Transforms.identity())); + assertEquals( + PartitionTransformType.BUCKET, extractor.fromIcebergTransform(Transforms.bucket(2))); Assertions.assertThrows( - NotSupportedException.class, () -> extractor.fromIcebergTransform(Transforms.bucket(10))); + NotSupportedException.class, () -> extractor.fromIcebergTransform(Transforms.truncate(10))); } } diff --git a/xtable-core/src/test/java/org/apache/xtable/iceberg/TestIcebergPartitionValueConverter.java b/xtable-core/src/test/java/org/apache/xtable/iceberg/TestIcebergPartitionValueConverter.java index 7ff331ec6..bdbb444cc 100644 --- a/xtable-core/src/test/java/org/apache/xtable/iceberg/TestIcebergPartitionValueConverter.java +++ 
b/xtable-core/src/test/java/org/apache/xtable/iceberg/TestIcebergPartitionValueConverter.java @@ -109,6 +109,31 @@ public void testToXTableYearPartitioned() { assertEquals(expectedPartitionValues, partitionValues); } + @Test + void testToXTableBucketPartitioned() { + Schema schemaWithPartition = + new Schema( + Types.NestedField.optional(1, "id", Types.IntegerType.get()), + Types.NestedField.optional(2, "name", Types.StringType.get()), + Types.NestedField.optional(3, "birthDate", Types.TimestampType.withZone()), + Types.NestedField.optional(4, "name_bucket", Types.IntegerType.get())); + StructLike structLike = Row.of(schemaWithPartition, 1, "abc", 1614556800000L, 5); + List expectedPartitionValues = + Collections.singletonList( + PartitionValue.builder() + .partitionField(getPartitionField("name", PartitionTransformType.BUCKET)) + .range(Range.scalar(5)) + .build()); + PartitionSpec partitionSpec = PartitionSpec.builderFor(SCHEMA).bucket("name", 8).build(); + List partitionValues = + partitionValueConverter.toXTable( + buildInternalTable(true, "name", PartitionTransformType.BUCKET), + structLike, + partitionSpec); + assertEquals(1, partitionValues.size()); + assertEquals(expectedPartitionValues, partitionValues); + } + private InternalTable buildInternalTable(boolean isPartitioned) { return buildInternalTable(isPartitioned, null, null); }