Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -95,6 +95,7 @@
import org.apache.iceberg.Schema;
import org.apache.iceberg.SchemaParser;
import org.apache.iceberg.Table;
import org.apache.iceberg.avro.AvroSchemaUtil;
import org.apache.iceberg.io.LocationProvider;
import org.apache.iceberg.types.Conversions;
import org.apache.iceberg.types.Types;
Expand Down Expand Up @@ -364,8 +365,10 @@ private static ConnectorPageSourceWithRowPositions createParquetPageSource(
if (column.getColumnType() == IcebergColumnHandle.ColumnType.SYNTHESIZED &&
!column.isUpdateRowIdColumn() && !column.isMergeTargetTableRowIdColumn()) {
Subfield pushedDownSubfield = getPushedDownSubfield(column);
List<String> nestedColumnPath = nestedColumnPath(pushedDownSubfield);
Optional<ColumnIO> columnIO = findNestedColumnIO(lookupColumnByName(messageColumnIO, pushedDownSubfield.getRootName()), nestedColumnPath);
List<String> nestedColumnPath = nestedColumnPath(pushedDownSubfield).stream()
.map(AvroSchemaUtil::makeCompatibleName)
.collect(Collectors.toList());
Optional<ColumnIO> columnIO = findNestedColumnIO(lookupColumnByName(messageColumnIO, AvroSchemaUtil.makeCompatibleName(pushedDownSubfield.getRootName())), nestedColumnPath);
if (columnIO.isPresent()) {
internalFields.add(constructField(prestoType, columnIO.get()));
}
Expand All @@ -379,7 +382,7 @@ private static ConnectorPageSourceWithRowPositions createParquetPageSource(
internalFields.add(Optional.empty());
}
else {
internalFields.add(constructField(column.getType(), messageColumnIO.getChild(parquetField.get().getName())));
internalFields.add(constructField(column.getType(), lookupColumnByName(messageColumnIO, AvroSchemaUtil.makeCompatibleName(parquetField.get().getName()))));
}
}
isRowPositionList.add(column.isRowPositionColumn());
Expand Down Expand Up @@ -421,7 +424,10 @@ public static Optional<org.apache.parquet.schema.Type> getColumnType(
{
if (isPushedDownSubfield(column)) {
Subfield pushedDownSubfield = getPushedDownSubfield(column);
return getSubfieldType(messageType, pushedDownSubfield.getRootName(), nestedColumnPath(pushedDownSubfield));
List<String> encodedPath = nestedColumnPath(pushedDownSubfield).stream()
.map(AvroSchemaUtil::makeCompatibleName)
.collect(Collectors.toList());
return getSubfieldType(messageType, AvroSchemaUtil.makeCompatibleName(pushedDownSubfield.getRootName()), encodedPath);
}

if (parquetIdToField.isEmpty()) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -472,7 +472,64 @@ private static HiveType icebergTypeToHiveType(org.apache.iceberg.types.Type iceb
return HiveType.HIVE_LONG;
}

return HiveType.toHiveType(HiveSchemaUtil.convert(icebergType));
return HiveType.valueOf(sanitizeTypeString(icebergType));
}

/**
 * Renders an Iceberg type as a Hive type string whose struct field names are
 * sanitized. Hive's TypeInfoParser cannot parse special characters such as
 * hyphens inside field names, so those are rewritten to underscores before
 * the string is handed to the Hive Metastore.
 */
private static String sanitizeTypeString(org.apache.iceberg.types.Type icebergType)
{
    if (icebergType.isStructType()) {
        // Build "struct<name1:type1,name2:type2,...>" with sanitized names.
        StringBuilder builder = new StringBuilder("struct<");
        boolean first = true;
        for (org.apache.iceberg.types.Types.NestedField field : icebergType.asStructType().fields()) {
            if (!first) {
                builder.append(',');
            }
            builder.append(sanitizeFieldName(field.name()))
                    .append(':')
                    .append(sanitizeTypeString(field.type()));
            first = false;
        }
        return builder.append('>').toString();
    }

    if (icebergType.isListType()) {
        return "array<" + sanitizeTypeString(icebergType.asListType().elementType()) + ">";
    }

    if (icebergType.isMapType()) {
        org.apache.iceberg.types.Types.MapType mapType = icebergType.asMapType();
        return "map<" + sanitizeTypeString(mapType.keyType()) + "," +
                sanitizeTypeString(mapType.valueType()) + ">";
    }

    // Primitive types (and anything else) carry no field names to sanitize;
    // delegate straight to Hive's own conversion.
    return HiveSchemaUtil.convert(icebergType).getTypeName();
}

/**
 * Sanitizes a field name so the resulting Hive type string is parseable by
 * the Hive Metastore: every character outside {@code [a-zA-Z0-9_]} becomes
 * an underscore (Hive's TypeInfoParser rejects characters such as '-').
 *
 * <p>This rewrite applies ONLY to the type string stored in HMS. The Iceberg
 * schema kept in the Iceberg metadata JSON preserves the original field
 * names, and the Parquet files use the makeCompatibleName encoding
 * (e.g. {@code aws_x2Dregion}). This method is intentionally different from
 * makeCompatibleName — HMS merely needs a valid, parseable type string, not
 * the exact encoded name.
 *
 * <p>Plain underscore substitution can collide (both "aws-region" and
 * "aws_region" map to "aws_region"), which is acceptable: the HMS type
 * string is never used for query execution in the Iceberg connector.
 * Execution reads the Iceberg metadata JSON (original names) and Parquet
 * access goes through the makeCompatibleName hex encoding.
 */
private static String sanitizeFieldName(String fieldName)
{
    // Rebuild the name code point by code point, mapping anything outside
    // [a-zA-Z0-9_] to '_'; equivalent to replaceAll("[^a-zA-Z0-9_]", "_").
    StringBuilder sanitized = new StringBuilder(fieldName.length());
    fieldName.codePoints().forEach(codePoint -> {
        boolean allowed = (codePoint >= 'a' && codePoint <= 'z')
                || (codePoint >= 'A' && codePoint <= 'Z')
                || (codePoint >= '0' && codePoint <= '9')
                || codePoint == '_';
        sanitized.append(allowed ? (char) codePoint : '_');
    });
    return sanitized.toString();
}

public static FileFormat getFileFormat(Table table)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,7 @@
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.regex.Pattern;

import static com.facebook.presto.common.type.BigintType.BIGINT;
import static com.facebook.presto.common.type.BooleanType.BOOLEAN;
Expand Down Expand Up @@ -92,6 +93,7 @@ public final class TypeConverter
{
public static final String ORC_ICEBERG_ID_KEY = "iceberg.id";
public static final String ORC_ICEBERG_REQUIRED_KEY = "iceberg.required";
private static final Pattern UNQUOTED_IDENTIFIER = Pattern.compile("[a-zA-Z_][a-zA-Z0-9_]*");

private TypeConverter() {}

Expand Down Expand Up @@ -139,13 +141,18 @@ public static Type toPrestoType(org.apache.iceberg.types.Type type, TypeManager
case STRUCT:
List<Types.NestedField> fields = ((Types.StructType) type).fields();
return RowType.from(fields.stream()
.map(field -> new RowType.Field(Optional.of(field.name()), toPrestoType(field.type(), typeManager)))
.map(field -> new RowType.Field(Optional.of(field.name()), toPrestoType(field.type(), typeManager), needsDelimiting(field.name())))
.collect(toImmutableList()));
default:
throw new UnsupportedOperationException(format("Cannot convert from Iceberg type '%s' (%s) to Presto type", type, type.typeId()));
}
}

// Returns true when the field name must be quoted in a type signature:
// anything that is not a plain ASCII identifier (a letter or underscore
// followed by letters, digits, or underscores) needs delimiting.
private static boolean needsDelimiting(String name)
{
    boolean plainIdentifier = UNQUOTED_IDENTIFIER.matcher(name).matches();
    return !plainIdentifier;
}

public static org.apache.iceberg.types.Type toIcebergType(
Type type,
String columnName,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -82,7 +82,7 @@ private void visitRowType(RowType type, String name, List<String> parent)
parent = ImmutableList.<String>builder().addAll(parent).add(name).build();
for (RowType.Field field : type.getFields()) {
checkArgument(field.getName().isPresent(), "field in struct type doesn't have name");
visitType(field.getType(), field.getName().get(), parent);
visitType(field.getType(), makeCompatibleName(field.getName().get()), parent);
}
}
}
Loading
Loading