diff --git a/xtable-core/src/main/java/org/apache/xtable/delta/DeltaSchemaExtractor.java b/xtable-core/src/main/java/org/apache/xtable/delta/DeltaSchemaExtractor.java index e312761f7..a10ee1208 100644 --- a/xtable-core/src/main/java/org/apache/xtable/delta/DeltaSchemaExtractor.java +++ b/xtable-core/src/main/java/org/apache/xtable/delta/DeltaSchemaExtractor.java @@ -59,6 +59,7 @@ @NoArgsConstructor(access = AccessLevel.PRIVATE) public class DeltaSchemaExtractor { private static final String DELTA_COLUMN_MAPPING_ID = "delta.columnMapping.id"; + private static final String COMMENT = "comment"; private static final DeltaSchemaExtractor INSTANCE = new DeltaSchemaExtractor(); public static DeltaSchemaExtractor getInstance() { @@ -74,7 +75,7 @@ public StructType fromInternalSchema(InternalSchema internalSchema) { field.getName(), convertFieldType(field), field.getSchema().isNullable(), - getMetaData(field.getSchema().getDataType()))) + getMetaData(field.getSchema()))) .toArray(StructField[]::new); return new StructType(fields); } @@ -144,12 +145,16 @@ private DataType convertFieldType(InternalField field) { } } - private Metadata getMetaData(InternalType type) { + private Metadata getMetaData(InternalSchema schema) { + InternalType type = schema.getDataType(); + MetadataBuilder metadataBuilder = new MetadataBuilder(); if (type == InternalType.UUID) { - return new MetadataBuilder().putString(InternalSchema.XTABLE_LOGICAL_TYPE, "uuid").build(); - } else { - return Metadata.empty(); + metadataBuilder.putString(InternalSchema.XTABLE_LOGICAL_TYPE, "uuid"); } + if (schema.getComment() != null) { + metadataBuilder.putString(COMMENT, schema.getComment()); + } + return metadataBuilder.build(); } public InternalSchema toInternalSchema(StructType structType) { diff --git a/xtable-core/src/main/java/org/apache/xtable/iceberg/IcebergSchemaSync.java b/xtable-core/src/main/java/org/apache/xtable/iceberg/IcebergSchemaSync.java index 05e89469d..800938cb4 100644 --- a/xtable-core/src/main/java/org/apache/xtable/iceberg/IcebergSchemaSync.java +++ b/xtable-core/src/main/java/org/apache/xtable/iceberg/IcebergSchemaSync.java @@ -21,6 +21,7 @@ import java.util.HashMap; import java.util.HashSet; import java.util.Map; +import java.util.Objects; import java.util.Set; import java.util.function.Supplier; @@ -132,6 +133,12 @@ private static Map> updateColumn( latestColumn.fieldId(), () -> updateSchema.requireColumn(latestColumn.name())); } } + // update the comment of the column + if (!Objects.equals(currentColumn.doc(), latestColumn.doc())) { + updates.put( + latestColumn.fieldId(), + () -> updateSchema.updateColumnDoc(latestColumn.name(), latestColumn.doc())); + } if (latestColumn.type().isStructType()) { updates.putAll( addUpdates( diff --git a/xtable-core/src/test/java/org/apache/xtable/delta/TestDeltaSchemaExtractor.java b/xtable-core/src/test/java/org/apache/xtable/delta/TestDeltaSchemaExtractor.java index 4b0eacd06..361245feb 100644 --- a/xtable-core/src/test/java/org/apache/xtable/delta/TestDeltaSchemaExtractor.java +++ b/xtable-core/src/test/java/org/apache/xtable/delta/TestDeltaSchemaExtractor.java @@ -56,6 +56,7 @@ public void testPrimitiveTypes() { .name("boolean") .dataType(InternalType.BOOLEAN) .isNullable(false) + .comment("requiredBooleanComment") .build()) .build(), InternalField.builder() @@ -226,7 +227,7 @@ public void testPrimitiveTypes() { StructType structRepresentation = new StructType() - .add("requiredBoolean", DataTypes.BooleanType, false) + .add("requiredBoolean", DataTypes.BooleanType, false, "requiredBooleanComment") .add("optionalBoolean", DataTypes.BooleanType, true) .add("requiredInt", DataTypes.IntegerType, false) .add("optionalInt", DataTypes.IntegerType, true) @@ -268,6 +269,7 @@ public void testFixedBytes() { .name("fixed") .dataType(InternalType.FIXED) .isNullable(false) + .comment("comment") .build()) .build(), InternalField.builder() @@ -296,6 +298,7 @@ public void testFixedBytes() { .name("binary") .dataType(InternalType.BYTES) .isNullable(false) + .comment("comment") .build()) .build(), InternalField.builder() @@ -311,7 +314,7 @@ public void testFixedBytes() { .build(); StructType structRepresentation = new StructType() - .add("requiredFixed", DataTypes.BinaryType, false) + .add("requiredFixed", DataTypes.BinaryType, false, "comment") .add("optionalFixed", DataTypes.BinaryType, true); Assertions.assertEquals( @@ -681,6 +684,7 @@ public void testNestedRecords() { .name("struct") .dataType(InternalType.RECORD) .isNullable(true) + .comment("comment") .fields( Arrays.asList( InternalField.builder() @@ -691,6 +695,7 @@ public void testNestedRecords() { .name("integer") .dataType(InternalType.INT) .isNullable(true) + .comment("nestedOptionalIntComment") .build()) .defaultValue( InternalField.Constants.NULL_DEFAULT_VALUE) @@ -740,13 +745,18 @@ public void testNestedRecords() { .add( "nestedOne", new StructType() - .add("nestedOptionalInt", DataTypes.IntegerType, true) + .add( + "nestedOptionalInt", + DataTypes.IntegerType, + true, + "nestedOptionalIntComment") .add("nestedRequiredDouble", DataTypes.DoubleType, false) .add( "nestedTwo", new StructType().add("doublyNestedString", DataTypes.StringType, true), false), - true); + true, + "comment"); Assertions.assertEquals( structRepresentation, DeltaSchemaExtractor.getInstance().fromInternalSchema(internalSchema)); diff --git a/xtable-core/src/test/java/org/apache/xtable/iceberg/TestIcebergSchemaSync.java b/xtable-core/src/test/java/org/apache/xtable/iceberg/TestIcebergSchemaSync.java index 347eb3096..98254591e 100644 --- a/xtable-core/src/test/java/org/apache/xtable/iceberg/TestIcebergSchemaSync.java +++ b/xtable-core/src/test/java/org/apache/xtable/iceberg/TestIcebergSchemaSync.java @@ -43,14 +43,15 @@ public class TestIcebergSchemaSync { private static final Schema SCHEMA = new Schema( - Types.NestedField.required(1, "timestamp_field", Types.TimestampType.withoutZone()), + Types.NestedField.required( + 1, "timestamp_field", Types.TimestampType.withoutZone(), "doc"), Types.NestedField.optional(2, "date_field", Types.DateType.get()), Types.NestedField.required(3, "group_id", Types.IntegerType.get()), Types.NestedField.required( 4, "record", Types.StructType.of( - Types.NestedField.required(5, "string_field", Types.StringType.get()), + Types.NestedField.required(5, "string_field", Types.StringType.get(), "doc"), Types.NestedField.required(6, "int_field", Types.IntegerType.get()))), Types.NestedField.required( 7, @@ -228,6 +229,112 @@ public void testMakeExistingColumnRequired() { verify(mockUpdateSchema).commit(); } + @Test + void testAddFieldComment() { + UpdateSchema mockUpdateSchema = Mockito.mock(UpdateSchema.class); + when(mockTransaction.updateSchema()).thenReturn(mockUpdateSchema); + Types.NestedField updated = + Types.NestedField.optional(2, "date_field", Types.DateType.get(), "doc"); + Schema latest = addCommentToDefault(updated, 2); + + schemaSync.sync(SCHEMA, latest, mockTransaction); + + verify(mockUpdateSchema).updateColumnDoc("date_field", "doc"); + verify(mockUpdateSchema).commit(); + } + + @Test + void testDropFieldComment() { + UpdateSchema mockUpdateSchema = Mockito.mock(UpdateSchema.class); + when(mockTransaction.updateSchema()).thenReturn(mockUpdateSchema); + Types.NestedField updated = + Types.NestedField.optional(1, "timestamp_field", Types.DateType.get()); + Schema latest = addCommentToDefault(updated, 1); + + schemaSync.sync(SCHEMA, latest, mockTransaction); + + verify(mockUpdateSchema).updateColumnDoc("timestamp_field", null); + verify(mockUpdateSchema).commit(); + } + + @Test + void tesUpdatedFieldComment() { + UpdateSchema mockUpdateSchema = Mockito.mock(UpdateSchema.class); + when(mockTransaction.updateSchema()).thenReturn(mockUpdateSchema); + Types.NestedField updated = + Types.NestedField.optional(1, "timestamp_field", Types.DateType.get(), "new comment"); + Schema latest = addCommentToDefault(updated, 1); + + schemaSync.sync(SCHEMA, latest, mockTransaction); + + verify(mockUpdateSchema).updateColumnDoc("timestamp_field", "new comment"); + verify(mockUpdateSchema).commit(); + } + + @Test + void testAddNestedFieldComment() { + UpdateSchema mockUpdateSchema = Mockito.mock(UpdateSchema.class); + when(mockTransaction.updateSchema()).thenReturn(mockUpdateSchema); + Types.NestedField updated = + Types.NestedField.required( + 4, + "record", + Types.StructType.of( + Types.NestedField.required(5, "string_field", Types.StringType.get(), "doc"), + Types.NestedField.required(6, "int_field", Types.IntegerType.get(), "doc"))); + Schema latest = addCommentToDefault(updated, 4); + + schemaSync.sync(SCHEMA, latest, mockTransaction); + + verify(mockUpdateSchema).updateColumnDoc("int_field", "doc"); + verify(mockUpdateSchema).commit(); + } + + @Test + void testAddListFieldComment() { + UpdateSchema mockUpdateSchema = Mockito.mock(UpdateSchema.class); + when(mockTransaction.updateSchema()).thenReturn(mockUpdateSchema); + Types.NestedField updated = + Types.NestedField.required( + 10, + "array_field", + Types.ListType.ofRequired( + 11, + Types.StructType.of( + Types.NestedField.required(15, "element_string", Types.StringType.get(), "doc"), + Types.NestedField.optional(16, "element_int", Types.IntegerType.get())))); + Schema latest = addCommentToDefault(updated, 10); + + schemaSync.sync(SCHEMA, latest, mockTransaction); + + verify(mockUpdateSchema).updateColumnDoc("element_string", "doc"); + verify(mockUpdateSchema).commit(); + } + + @Test + void testAddMapFieldComment() { + UpdateSchema mockUpdateSchema = Mockito.mock(UpdateSchema.class); + when(mockTransaction.updateSchema()).thenReturn(mockUpdateSchema); + Types.NestedField updated = + Types.NestedField.required( + 7, + "map_field", + Types.MapType.ofRequired( + 8, + 9, + Types.StructType.of( + Types.NestedField.required(12, "key_string", Types.StringType.get())), + Types.StructType.of( + Types.NestedField.required(13, "value_string", Types.StringType.get(), "doc"), + Types.NestedField.optional(14, "value_int", Types.IntegerType.get())))); + Schema latest = addCommentToDefault(updated, 7); + + schemaSync.sync(SCHEMA, latest, mockTransaction); + + verify(mockUpdateSchema).updateColumnDoc("value_string", "doc"); + verify(mockUpdateSchema).commit(); + } + private Schema addColumnToDefault(Schema schema, Types.NestedField field, Integer parentId) { List fields = new ArrayList<>(); for (Types.NestedField existingField : schema.columns()) { @@ -251,6 +358,18 @@ private Schema addColumnToDefault(Schema schema, Types.NestedField field, Intege return new Schema(fields); } + private Schema addCommentToDefault(Types.NestedField updated, int fieldId) { + List fields = new ArrayList<>(); + for (Types.NestedField existingField : SCHEMA.columns()) { + if (existingField.fieldId() == fieldId) { + fields.add(updated); + } else { + fields.add(existingField); + } + } + return new Schema(fields); + } + private Schema updateFieldRequired(int fieldId) { List fields = new ArrayList<>(); for (Types.NestedField existingField : SCHEMA.columns()) { @@ -260,7 +379,8 @@ private Schema updateFieldRequired(int fieldId) { existingField.fieldId(), !existingField.isOptional(), existingField.name(), - existingField.type())); + existingField.type(), + existingField.doc())); } else { fields.add(existingField); }