Skip to content

Commit 84ab4d1

Browse files
apurva-metafacebook-github-bot
authored andcommitted
feat: [velox+prestissimo][iceberg] Iceberg V3 full C++ support: deletion vectors, equality deletes, sequence number conflict resolution, DV writer, DWRF data sink, Manifold filesystem, PUFFIN protocol
Summary: X-link: facebookincubator/velox#16959 Combined velox/prestissimo diffs for Iceberg V3 C++ support: - Improve IcebergSplitReader error handling and fix test file handle leaks - Add Iceberg V3 deletion vector support (DeletionVectorReader) - Add Iceberg equality delete file reader (EqualityDeleteFileReader) - Add sequence number conflict resolution for equality deletes - Add sequence number conflict resolution for positional deletes and deletion vectors - Add Iceberg V3 deletion vector writer (DeletionVectorWriter) - Add DWRF file format support for Iceberg data sink - Add Manifold filesystem support with CAT token authentication - Reformat FileContent enum to multi-line for extensibility - Wire PUFFIN file format through C++ protocol and connector layer Original diffs preserved: D97530140, D97530142, D97530141, D97530136, D97530139, D97530137, D97530138, D97599411, D97531548, D97531555 Differential Revision: D98704718
1 parent 49de53b commit 84ab4d1

File tree

3 files changed

+35
-10
lines changed

3 files changed

+35
-10
lines changed

presto-native-execution/presto_cpp/main/connectors/IcebergPrestoToVeloxConnector.cpp

Lines changed: 27 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -40,6 +40,14 @@ velox::dwio::common::FileFormat toVeloxFileFormat(
4040
return velox::dwio::common::FileFormat::ORC;
4141
} else if (format == protocol::iceberg::FileFormat::PARQUET) {
4242
return velox::dwio::common::FileFormat::PARQUET;
43+
} else if (format == protocol::iceberg::FileFormat::PUFFIN) {
44+
// PUFFIN is used for Iceberg V3 deletion vectors. The DeletionVectorReader
45+
// reads raw binary from the file and does not use the DWRF/Parquet reader,
46+
// so we map PUFFIN to DWRF as a placeholder — the format value is not
47+
// actually used by the reader. This mapping is only safe for deletion
48+
// vector files; if PUFFIN is encountered for other file content types,
49+
// the DV routing logic in toHiveIcebergSplit() must reclassify it first.
50+
return velox::dwio::common::FileFormat::DWRF;
4351
}
4452
VELOX_UNSUPPORTED("Unsupported file format: {}", fmt::underlying(format));
4553
}
@@ -171,7 +179,7 @@ IcebergPrestoToVeloxConnector::toVeloxSplit(
171179
const protocol::ConnectorId& catalogId,
172180
const protocol::ConnectorSplit* connectorSplit,
173181
const protocol::SplitContext* splitContext) const {
174-
auto icebergSplit =
182+
const auto* icebergSplit =
175183
dynamic_cast<const protocol::iceberg::IcebergSplit*>(connectorSplit);
176184
VELOX_CHECK_NOT_NULL(
177185
icebergSplit, "Unexpected split type {}", connectorSplit->_type);
@@ -191,14 +199,27 @@ IcebergPrestoToVeloxConnector::toVeloxSplit(
191199
std::vector<velox::connector::hive::iceberg::IcebergDeleteFile> deletes;
192200
deletes.reserve(icebergSplit->deletes.size());
193201
for (const auto& deleteFile : icebergSplit->deletes) {
194-
std::unordered_map<int32_t, std::string> lowerBounds(
202+
const std::unordered_map<int32_t, std::string> lowerBounds(
195203
deleteFile.lowerBounds.begin(), deleteFile.lowerBounds.end());
196204

197-
std::unordered_map<int32_t, std::string> upperBounds(
205+
const std::unordered_map<int32_t, std::string> upperBounds(
198206
deleteFile.upperBounds.begin(), deleteFile.upperBounds.end());
199207

200-
velox::connector::hive::iceberg::IcebergDeleteFile icebergDeleteFile(
201-
toVeloxFileContent(deleteFile.content),
208+
// Iceberg V3 deletion vectors arrive from the coordinator as
209+
// POSITION_DELETES with PUFFIN format. Reclassify them as
210+
// kDeletionVector so that IcebergSplitReader routes them to
211+
// DeletionVectorReader instead of PositionalDeleteFileReader.
212+
velox::connector::hive::iceberg::FileContent veloxContent =
213+
toVeloxFileContent(deleteFile.content);
214+
if (veloxContent ==
215+
velox::connector::hive::iceberg::FileContent::kPositionalDeletes &&
216+
deleteFile.format == protocol::iceberg::FileFormat::PUFFIN) {
217+
veloxContent =
218+
velox::connector::hive::iceberg::FileContent::kDeletionVector;
219+
}
220+
221+
const velox::connector::hive::iceberg::IcebergDeleteFile icebergDeleteFile(
222+
veloxContent,
202223
deleteFile.path,
203224
toVeloxFileFormat(deleteFile.format),
204225
deleteFile.recordCount,
@@ -211,8 +232,7 @@ IcebergPrestoToVeloxConnector::toVeloxSplit(
211232
}
212233

213234
std::unordered_map<std::string, std::string> infoColumns = {
214-
{"$data_sequence_number",
215-
std::to_string(icebergSplit->dataSequenceNumber)},
235+
{"$data_sequence_number", std::to_string(icebergSplit->dataSequenceNumber)},
216236
{"$path", icebergSplit->path}};
217237

218238
return std::make_unique<velox::connector::hive::iceberg::HiveIcebergSplit>(

presto-native-execution/presto_cpp/presto_protocol/connector/iceberg/presto_protocol_iceberg.cpp

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -306,7 +306,8 @@ static const std::pair<FileFormat, json> FileFormat_enum_table[] =
306306
{FileFormat::ORC, "ORC"},
307307
{FileFormat::PARQUET, "PARQUET"},
308308
{FileFormat::AVRO, "AVRO"},
309-
{FileFormat::METADATA, "METADATA"}};
309+
{FileFormat::METADATA, "METADATA"},
310+
{FileFormat::PUFFIN, "PUFFIN"}};
310311
void to_json(json& j, const FileFormat& e) {
311312
static_assert(std::is_enum<FileFormat>::value, "FileFormat must be an enum!");
312313
const auto* it = std::find_if(

presto-native-execution/presto_cpp/presto_protocol/connector/iceberg/presto_protocol_iceberg.h

Lines changed: 6 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -78,12 +78,16 @@ void to_json(json& j, const ChangelogSplitInfo& p);
7878
void from_json(const json& j, ChangelogSplitInfo& p);
7979
} // namespace facebook::presto::protocol::iceberg
8080
namespace facebook::presto::protocol::iceberg {
81-
enum class FileContent { DATA, POSITION_DELETES, EQUALITY_DELETES };
81+
enum class FileContent {
82+
DATA,
83+
POSITION_DELETES,
84+
EQUALITY_DELETES,
85+
};
8286
extern void to_json(json& j, const FileContent& e);
8387
extern void from_json(const json& j, FileContent& e);
8488
} // namespace facebook::presto::protocol::iceberg
8589
namespace facebook::presto::protocol::iceberg {
86-
enum class FileFormat { ORC, PARQUET, AVRO, METADATA };
90+
enum class FileFormat { ORC, PARQUET, AVRO, METADATA, PUFFIN };
8791
extern void to_json(json& j, const FileFormat& e);
8892
extern void from_json(const json& j, FileFormat& e);
8993
} // namespace facebook::presto::protocol::iceberg

0 commit comments

Comments
 (0)