Commit a99736e
[Bugfix] Populate table name from the identifier in Iceberg conversion

What is the problem? When converting an Iceberg table to another format such as Hudi, the Iceberg source table does not populate the table name. This is a consequence of Iceberg's behavior when a table is treated as a Hadoop table: the table is identified by its location, which leads to a confusing conversion.

Solution: This commit adjusts the conversion logic so that when the Iceberg table manager provides a HadoopTable, the table name is populated from the TableIdentifier supplied in the input. This ensures the source table name is carried over into the transformation.

Testing:
- Added a unit test to cover this scenario.

Co-authored-by: Tim Brown <[email protected]>
1 parent 14967dd commit a99736e
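For context, the failure mode described in the commit message can be reproduced directly with Iceberg's Hadoop-table API. A minimal sketch, not part of this commit (the warehouse path is made up, and the exact name format depends on the Iceberg version):

    import org.apache.hadoop.conf.Configuration;
    import org.apache.iceberg.Table;
    import org.apache.iceberg.hadoop.HadoopTables;

    public class HadoopTableNameDemo {
      public static void main(String[] args) {
        // Loading by path (no catalog) yields a Hadoop table whose name is
        // derived from its filesystem location.
        Table table = new HadoopTables(new Configuration()).load("/tmp/warehouse/my_table");
        System.out.println(table.name());     // a name containing the location, not "my_table"
        System.out.println(table.location()); // "/tmp/warehouse/my_table"
      }
    }

Because name() embeds the location, any consumer that treats it as a logical table name ends up carrying a path instead, which is exactly the mismatch the fix below guards against.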

File tree

2 files changed: 104 additions, 1 deletion

xtable-core/src/main/java/org/apache/xtable/iceberg/IcebergConversionSource.java

Lines changed: 9 additions & 1 deletion
@@ -120,10 +120,18 @@ public InternalTable getTable(Snapshot snapshot) {
         irPartitionFields.size() > 0
             ? DataLayoutStrategy.HIVE_STYLE_PARTITION
             : DataLayoutStrategy.FLAT;
+    // When the table name is not explicitly specified, Iceberg assumes the table is HDFS-based,
+    // treating the table name as the location in HDFS. This assumption can lead to mismatches
+    // during metadata conversion. To mitigate this issue, we rely on the table name provided in
+    // the source configuration of the conversion so the target matches the user's expectations.
+    // See https://github.com/apache/incubator-xtable/issues/494
     return InternalTable.builder()
         .tableFormat(TableFormat.ICEBERG)
         .basePath(iceTable.location())
-        .name(iceTable.name())
+        .name(
+            iceTable.name().contains(iceTable.location())
+                ? sourceTableConfig.getName()
+                : iceTable.name())
         .partitioningFields(irPartitionFields)
         .latestCommitTime(Instant.ofEpochMilli(snapshot.timestampMillis()))
         .readSchema(irSchema)
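The guard above works because a Hadoop table's name embeds its location, so name().contains(location()) distinguishes Hadoop tables from catalog-managed ones. Pulled out as a standalone helper, an illustrative sketch (not code from this commit), the rule is:

    // Illustrative helper mirroring the ternary in the diff above.
    static String resolveTableName(String icebergName, String icebergLocation, String configuredName) {
      // Hadoop tables report a name containing their location; fall back to the
      // name from the source table configuration in that case. Catalog-managed
      // tables keep the catalog-qualified name Iceberg already provides.
      return icebergName.contains(icebergLocation) ? configuredName : icebergName;
    }

Only the Hadoop-table path changes behavior; tables loaded through a catalog are unaffected.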

xtable-core/src/test/java/org/apache/xtable/iceberg/ITIcebergConversionSource.java

Lines changed: 95 additions & 0 deletions
@@ -25,19 +25,25 @@
 import static org.junit.jupiter.api.Assertions.assertEquals;
 import static org.junit.jupiter.api.Assertions.assertFalse;
 import static org.junit.jupiter.api.Assertions.assertTrue;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;

 import java.nio.file.Path;
 import java.time.Instant;
 import java.time.temporal.ChronoUnit;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collections;
+import java.util.HashMap;
 import java.util.List;
 import java.util.Map;

 import lombok.SneakyThrows;

 import org.apache.hadoop.conf.Configuration;
+import org.apache.iceberg.Table;
+import org.apache.iceberg.catalog.Catalog;
+import org.apache.iceberg.catalog.TableIdentifier;
 import org.junit.jupiter.api.BeforeEach;
 import org.junit.jupiter.api.Test;
 import org.junit.jupiter.api.io.TempDir;
@@ -61,6 +67,7 @@
 import org.apache.xtable.model.schema.PartitionTransformType;
 import org.apache.xtable.model.storage.DataLayoutStrategy;
 import org.apache.xtable.model.storage.TableFormat;
+import org.mockito.Mockito;

 public class ITIcebergConversionSource {
   private static final Configuration hadoopConf = new Configuration();
@@ -126,7 +133,95 @@ void getCurrentTableTest() {
           .build();
       validateTable(
           internalTable,
+          testIcebergTable.getTableName(),
+          TableFormat.ICEBERG,
+          internalSchema,
+          DataLayoutStrategy.FLAT,
           testIcebergTable.getBasePath(),
+          Collections.emptyList());
+    }
+  }
+
+  @Test
+  void getCurrentTableWithCatalogConfigTest() {
+    String tableName = getTableName();
+    try (TestIcebergTable testIcebergTable =
+        new TestIcebergTable(
+            tableName,
+            tempDir,
+            hadoopConf,
+            "field1",
+            Collections.singletonList(null),
+            TestIcebergDataHelper.SchemaType.BASIC)) {
+      testIcebergTable.insertRows(50);
+
+      // create resources
+      Map<String, String> OPTIONS = Collections.singletonMap("key", "value");
+      Catalog mockCatalog = mock(Catalog.class);
+      String catalogName = "catalog1";
+      StubCatalog.registerMock(catalogName, mockCatalog);
+      IcebergCatalogConfig catalogConfig =
+          IcebergCatalogConfig.builder()
+              .catalogImpl(StubCatalog.class.getName())
+              .catalogName(catalogName)
+              .catalogOptions(OPTIONS)
+              .build();
+      Map<Integer, org.apache.iceberg.Schema> map = new HashMap<>();
+      map.put(0, testIcebergTable.getSchema());
+      Table mockTable = mock(Table.class, Mockito.RETURNS_DEEP_STUBS);
+      when(mockTable.name()).thenReturn(tableName);
+      when(mockTable.location()).thenReturn(testIcebergTable.getBasePath());
+      when(mockTable.schemas()).thenReturn(map);
+
+      when(mockCatalog.loadTable(Mockito.any(TableIdentifier.class)))
+          .thenReturn(mockTable);
+
+      // begin test
+      SourceTable tableConfig =
+          SourceTable.builder()
+              .name(testIcebergTable.getTableName())
+              .basePath(testIcebergTable.getBasePath())
+              .formatName(TableFormat.ICEBERG)
+              .catalogConfig(catalogConfig)
+              .build();
+      IcebergConversionSource conversionSource =
+          sourceProvider.getConversionSourceInstance(tableConfig);
+      InternalTable internalTable = conversionSource.getCurrentTable();
+
+      // expectations
+      InternalSchema internalSchema =
+          InternalSchema.builder()
+              .name("record")
+              .dataType(InternalType.RECORD)
+              .fields(
+                  Arrays.asList(
+                      InternalField.builder()
+                          .name("field1")
+                          .fieldId(1)
+                          .schema(
+                              InternalSchema.builder()
+                                  .name("string")
+                                  .dataType(InternalType.STRING)
+                                  .isNullable(true)
+                                  .build())
+                          .defaultValue(InternalField.Constants.NULL_DEFAULT_VALUE)
+                          .build(),
+                      InternalField.builder()
+                          .name("field2")
+                          .fieldId(2)
+                          .schema(
+                              InternalSchema.builder()
+                                  .name("string")
+                                  .dataType(InternalType.STRING)
+                                  .isNullable(true)
+                                  .build())
+                          .defaultValue(InternalField.Constants.NULL_DEFAULT_VALUE)
+                          .build()))
+              .build();
+
+      validateTable(
+          internalTable,
+          testIcebergTable.getTableName(),
           TableFormat.ICEBERG,
           internalSchema,
           DataLayoutStrategy.FLAT,
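The test drives table loading through a StubCatalog whose implementation is not part of this diff: catalogImpl points Iceberg at the stub class, and StubCatalog.registerMock hands it the Mockito mock to return from loadTable. For orientation, a hypothetical sketch of how such a fixture could work (class shape and registry are assumptions, not taken from the project's test sources):

    import java.util.HashMap;
    import java.util.List;
    import java.util.Map;

    import org.apache.iceberg.Table;
    import org.apache.iceberg.catalog.Catalog;
    import org.apache.iceberg.catalog.Namespace;
    import org.apache.iceberg.catalog.TableIdentifier;

    // Hypothetical stand-in for the StubCatalog test fixture used above.
    public class StubCatalog implements Catalog {
      // Iceberg instantiates the class named by catalogImpl reflectively and then
      // calls initialize(catalogName, options), so a static registry is how the
      // test can hand a pre-built mock to that reflectively created instance.
      private static final Map<String, Catalog> REGISTERED_MOCKS = new HashMap<>();

      private Catalog delegate;

      public static void registerMock(String catalogName, Catalog mock) {
        REGISTERED_MOCKS.put(catalogName, mock);
      }

      @Override
      public void initialize(String name, Map<String, String> properties) {
        this.delegate = REGISTERED_MOCKS.get(name);
      }

      @Override
      public Table loadTable(TableIdentifier identifier) {
        return delegate.loadTable(identifier);
      }

      // The remaining Catalog methods are not exercised by the test.
      @Override
      public List<TableIdentifier> listTables(Namespace namespace) {
        throw new UnsupportedOperationException();
      }

      @Override
      public boolean dropTable(TableIdentifier identifier, boolean purge) {
        throw new UnsupportedOperationException();
      }

      @Override
      public void renameTable(TableIdentifier from, TableIdentifier to) {
        throw new UnsupportedOperationException();
      }
    }

With the mock in place, mockTable.name() returns the logical table name while mockTable.location() returns the base path, so the assertion exercises the catalog-managed branch of the name-resolution rule in the main change.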
