From a99736e6303a585b4c357339993bed71480c7231 Mon Sep 17 00:00:00 2001
From: Jalpan Randeri
Date: Wed, 12 Feb 2025 19:14:46 -0800
Subject: [PATCH] [Bugfix] Populate table name from the identifier in Iceberg
 conversion

What is the problem?
While converting an Iceberg table to another format such as Hudi, the
Iceberg source table does not populate the table name. This is due to the
Iceberg table being treated as a Hadoop table, whose name defaults to its
location. The table is therefore identified by its location, which leads to
a confusing conversion.

Solution:
This commit adjusts the conversion logic: when the Iceberg table manager
provides a HadoopTable, it populates the table name from the provided input
TableIdentifier. This ensures that the source table name is carried over to
the transformation.

Testing:
- Added a unit test to cover this scenario.

Co-authored-by: Tim Brown
---
 .../iceberg/IcebergConversionSource.java      | 10 +-
 .../iceberg/ITIcebergConversionSource.java    | 95 +++++++++++++++++++
 2 files changed, 104 insertions(+), 1 deletion(-)

diff --git a/xtable-core/src/main/java/org/apache/xtable/iceberg/IcebergConversionSource.java b/xtable-core/src/main/java/org/apache/xtable/iceberg/IcebergConversionSource.java
index 0d400e28b..2a4b50ae2 100644
--- a/xtable-core/src/main/java/org/apache/xtable/iceberg/IcebergConversionSource.java
+++ b/xtable-core/src/main/java/org/apache/xtable/iceberg/IcebergConversionSource.java
@@ -120,10 +120,18 @@ public InternalTable getTable(Snapshot snapshot) {
         irPartitionFields.size() > 0
             ? DataLayoutStrategy.HIVE_STYLE_PARTITION
             : DataLayoutStrategy.FLAT;
+    // When the table name is not explicitly specified, Iceberg assumes the table is HDFS-based,
+    // treating the table name as the location in HDFS. This assumption can lead to mismatches
+    // during metadata conversion. To mitigate this issue, we rely on the table name provided in
+    // the source configuration of the conversion so the target matches the user's expectations.
+    // See https://github.com/apache/incubator-xtable/issues/494
     return InternalTable.builder()
         .tableFormat(TableFormat.ICEBERG)
         .basePath(iceTable.location())
-        .name(iceTable.name())
+        .name(
+            iceTable.name().contains(iceTable.location())
+                ? sourceTableConfig.getName()
+                : iceTable.name())
         .partitioningFields(irPartitionFields)
         .latestCommitTime(Instant.ofEpochMilli(snapshot.timestampMillis()))
         .readSchema(irSchema)
diff --git a/xtable-core/src/test/java/org/apache/xtable/iceberg/ITIcebergConversionSource.java b/xtable-core/src/test/java/org/apache/xtable/iceberg/ITIcebergConversionSource.java
index 210b6f9d6..a921c209f 100644
--- a/xtable-core/src/test/java/org/apache/xtable/iceberg/ITIcebergConversionSource.java
+++ b/xtable-core/src/test/java/org/apache/xtable/iceberg/ITIcebergConversionSource.java
@@ -25,6 +25,8 @@
 import static org.junit.jupiter.api.Assertions.assertEquals;
 import static org.junit.jupiter.api.Assertions.assertFalse;
 import static org.junit.jupiter.api.Assertions.assertTrue;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
 
 import java.nio.file.Path;
 import java.time.Instant;
@@ -32,12 +34,16 @@
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collections;
+import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 
 import lombok.SneakyThrows;
 
 import org.apache.hadoop.conf.Configuration;
+import org.apache.iceberg.Table;
+import org.apache.iceberg.catalog.Catalog;
+import org.apache.iceberg.catalog.TableIdentifier;
 import org.junit.jupiter.api.BeforeEach;
 import org.junit.jupiter.api.Test;
 import org.junit.jupiter.api.io.TempDir;
@@ -61,6 +67,7 @@
 import org.apache.xtable.model.schema.PartitionTransformType;
 import org.apache.xtable.model.storage.DataLayoutStrategy;
 import org.apache.xtable.model.storage.TableFormat;
+import org.mockito.Mockito;
 
 public class ITIcebergConversionSource {
   private static final Configuration hadoopConf = new Configuration();
@@ -126,7 +133,95 @@ void getCurrentTableTest() {
           .build();
       validateTable(
           internalTable,
+          testIcebergTable.getTableName(),
+          TableFormat.ICEBERG,
+          internalSchema,
+          DataLayoutStrategy.FLAT,
           testIcebergTable.getBasePath(),
+          Collections.emptyList());
+    }
+  }
+
+  @Test
+  void getCurrentTableWithCatalogConfigTest() {
+    String tableName = getTableName();
+    try (TestIcebergTable testIcebergTable =
+        new TestIcebergTable(
+            tableName,
+            tempDir,
+            hadoopConf,
+            "field1",
+            Collections.singletonList(null),
+            TestIcebergDataHelper.SchemaType.BASIC)) {
+      testIcebergTable.insertRows(50);
+
+      // create resources
+      Map<String, String> OPTIONS = Collections.singletonMap("key", "value");
+      Catalog mockCatalog = mock(Catalog.class);
+      String catalogName = "catalog1";
+      StubCatalog.registerMock(catalogName, mockCatalog);
+      IcebergCatalogConfig catalogConfig =
+          IcebergCatalogConfig.builder()
+              .catalogImpl(StubCatalog.class.getName())
+              .catalogName(catalogName)
+              .catalogOptions(OPTIONS)
+              .build();
+      Map<Integer, Schema> map = new HashMap<>();
+      map.put(0, testIcebergTable.getSchema());
+      Table mockTable = mock(Table.class, Mockito.RETURNS_DEEP_STUBS);
+      when(mockTable.name()).thenReturn(tableName);
+      when(mockTable.location()).thenReturn(testIcebergTable.getBasePath());
+      when(mockTable.schemas()).thenReturn(map);
+
+      when(mockCatalog.loadTable(Mockito.any(TableIdentifier.class)))
+          .thenReturn(mockTable);
+
+      // begin test
+      SourceTable tableConfig =
+          SourceTable.builder()
+              .name(testIcebergTable.getTableName())
+              .basePath(testIcebergTable.getBasePath())
+              .formatName(TableFormat.ICEBERG)
+              .catalogConfig(catalogConfig)
+              .build();
+      IcebergConversionSource conversionSource =
+          sourceProvider.getConversionSourceInstance(tableConfig);
+      InternalTable internalTable = conversionSource.getCurrentTable();
+
+      // expectations
+      InternalSchema internalSchema =
+          InternalSchema.builder()
+              .name("record")
+              .dataType(InternalType.RECORD)
+              .fields(
+                  Arrays.asList(
+                      InternalField.builder()
+                          .name("field1")
+                          .fieldId(1)
+                          .schema(
+                              InternalSchema.builder()
+                                  .name("string")
+                                  .dataType(InternalType.STRING)
+                                  .isNullable(true)
+                                  .build())
+                          .defaultValue(InternalField.Constants.NULL_DEFAULT_VALUE)
+                          .build(),
+                      InternalField.builder()
+                          .name("field2")
+                          .fieldId(2)
+                          .schema(
+                              InternalSchema.builder()
+                                  .name("string")
+                                  .dataType(InternalType.STRING)
+                                  .isNullable(true)
+                                  .build())
+                          .defaultValue(InternalField.Constants.NULL_DEFAULT_VALUE)
+                          .build()))
+              .build();
+
+      validateTable(
+          internalTable,
+          testIcebergTable.getTableName(),
           TableFormat.ICEBERG,
           internalSchema,
           DataLayoutStrategy.FLAT,
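
Reviewer note (outside the patch): a minimal sketch of the naming guard the
first hunk introduces, for evaluating the heuristic in isolation. The
standalone helper and its configuredName parameter are hypothetical stand-ins
for the real code, where sourceTableConfig.getName() supplies the fallback;
only the contains() check mirrors the change itself. The premise, per the
commit message, is that a HadoopTables-backed Iceberg table reports a name
that embeds its location, while a catalog-managed table reports a logical name.

  import org.apache.iceberg.Table;

  final class TableNameFallbackSketch {
    // A HadoopTable's reported name embeds its filesystem location, so it is
    // not a meaningful identifier; prefer the name the user supplied in the
    // source table configuration. A catalog-provided name (e.g. db.table)
    // does not contain the location, so it is kept as-is.
    static String resolveTableName(Table iceTable, String configuredName) {
      return iceTable.name().contains(iceTable.location())
          ? configuredName
          : iceTable.name();
    }
  }

Note that the new test exercises the catalog path rather than the HadoopTables
path on purpose: by stubbing Catalog.loadTable(TableIdentifier) to return a
mock Table whose name() does not embed location(), it pins down that a
catalog-supplied name is preserved end-to-end instead of being replaced by the
base path.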