Skip to content

Commit 7de3176

Browse files
apurva-metameta-codesync[bot]
authored and committed
Add Iceberg equality delete file reader
Summary: Implements Iceberg equality delete support for the Velox Iceberg connector. Equality delete files contain rows with values for one or more columns (identified by equalityFieldIds). A base data row is deleted if its values match ALL specified columns of ANY row in the delete file. The implementation: - Adds EqualityDeleteFileReader that eagerly reads the entire delete file and builds an in-memory hash multimap of delete key tuples during construction. - Wires EqualityDeleteFileReader into IcebergSplitReader::prepareSplit() to resolve equalityFieldIds to column names/types from the table schema, and into IcebergSplitReader::next() to apply post-read equality delete filtering with row compaction. - Handles lazy vectors from file readers via loadedVector() before accessing values for hashing and comparison. - Supports BOOLEAN, TINYINT, SMALLINT, INTEGER, BIGINT, REAL, DOUBLE, VARCHAR, VARBINARY, and TIMESTAMP column types. Differential Revision: D97530141
1 parent d82325b commit 7de3176

File tree

7 files changed

+983
-1
lines changed

7 files changed

+983
-1
lines changed

velox/connectors/hive/iceberg/CMakeLists.txt

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,7 @@ set(
2222
IcebergDataSink.cpp
2323
IcebergDataSource.cpp
2424
IcebergPartitionName.cpp
25+
EqualityDeleteFileReader.cpp
2526
IcebergSplit.cpp
2627
IcebergSplitReader.cpp
2728
PartitionSpec.cpp
Lines changed: 353 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,353 @@
1+
/*
2+
* Copyright (c) Facebook, Inc. and its affiliates.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
#include "velox/connectors/hive/iceberg/EqualityDeleteFileReader.h"
18+
19+
#include "velox/common/base/BitUtil.h"
20+
#include "velox/connectors/hive/BufferedInputBuilder.h"
21+
#include "velox/connectors/hive/HiveConnectorUtil.h"
22+
#include "velox/connectors/hive/TableHandle.h"
23+
#include "velox/connectors/hive/iceberg/IcebergMetadataColumns.h"
24+
#include "velox/dwio/common/ReaderFactory.h"
25+
26+
namespace facebook::velox::connector::hive::iceberg {
27+
28+
namespace {
29+
30+
/// Hashes a single value from a vector at the given index.
31+
/// Handles lazy vectors via loadedVector(). Returns 0 for null values.
32+
uint64_t hashValue(const VectorPtr& vectorPtr, vector_size_t index) {
33+
const auto* vector = vectorPtr->loadedVector();
34+
if (vector->isNullAt(index)) {
35+
return 0;
36+
}
37+
38+
auto type = vector->type();
39+
switch (type->kind()) { // NOLINT(clang-diagnostic-switch-enum)
40+
case TypeKind::BOOLEAN:
41+
return std::hash<bool>{}(
42+
vector->as<SimpleVector<bool>>()->valueAt(index));
43+
case TypeKind::TINYINT:
44+
return std::hash<int8_t>{}(
45+
vector->as<SimpleVector<int8_t>>()->valueAt(index));
46+
case TypeKind::SMALLINT:
47+
return std::hash<int16_t>{}(
48+
vector->as<SimpleVector<int16_t>>()->valueAt(index));
49+
case TypeKind::INTEGER:
50+
return std::hash<int32_t>{}(
51+
vector->as<SimpleVector<int32_t>>()->valueAt(index));
52+
case TypeKind::BIGINT:
53+
return std::hash<int64_t>{}(
54+
vector->as<SimpleVector<int64_t>>()->valueAt(index));
55+
case TypeKind::REAL:
56+
return std::hash<float>{}(
57+
vector->as<SimpleVector<float>>()->valueAt(index));
58+
case TypeKind::DOUBLE:
59+
return std::hash<double>{}(
60+
vector->as<SimpleVector<double>>()->valueAt(index));
61+
case TypeKind::VARCHAR:
62+
case TypeKind::VARBINARY: {
63+
auto sv = vector->as<SimpleVector<StringView>>()->valueAt(index);
64+
return folly::hasher<std::string_view>{}(
65+
std::string_view(sv.data(), sv.size()));
66+
}
67+
case TypeKind::TIMESTAMP: {
68+
auto ts = vector->as<SimpleVector<Timestamp>>()->valueAt(index);
69+
return std::hash<int64_t>{}(ts.toNanos());
70+
}
71+
default:
72+
VELOX_NYI(
73+
"Equality delete hash not implemented for type: {}",
74+
type->toString());
75+
}
76+
}
77+
78+
/// Compares two values from vectors at given indices.
79+
/// Handles lazy vectors via loadedVector().
80+
bool compareValues(
81+
const VectorPtr& leftPtr,
82+
vector_size_t leftIdx,
83+
const VectorPtr& rightPtr,
84+
vector_size_t rightIdx) {
85+
const auto* left = leftPtr->loadedVector();
86+
const auto* right = rightPtr->loadedVector();
87+
bool leftNull = left->isNullAt(leftIdx);
88+
bool rightNull = right->isNullAt(rightIdx);
89+
if (leftNull && rightNull) {
90+
return true;
91+
}
92+
if (leftNull || rightNull) {
93+
return false;
94+
}
95+
96+
auto type = left->type();
97+
switch (type->kind()) { // NOLINT(clang-diagnostic-switch-enum)
98+
case TypeKind::BOOLEAN:
99+
return left->as<SimpleVector<bool>>()->valueAt(leftIdx) ==
100+
right->as<SimpleVector<bool>>()->valueAt(rightIdx);
101+
case TypeKind::TINYINT:
102+
return left->as<SimpleVector<int8_t>>()->valueAt(leftIdx) ==
103+
right->as<SimpleVector<int8_t>>()->valueAt(rightIdx);
104+
case TypeKind::SMALLINT:
105+
return left->as<SimpleVector<int16_t>>()->valueAt(leftIdx) ==
106+
right->as<SimpleVector<int16_t>>()->valueAt(rightIdx);
107+
case TypeKind::INTEGER:
108+
return left->as<SimpleVector<int32_t>>()->valueAt(leftIdx) ==
109+
right->as<SimpleVector<int32_t>>()->valueAt(rightIdx);
110+
case TypeKind::BIGINT:
111+
return left->as<SimpleVector<int64_t>>()->valueAt(leftIdx) ==
112+
right->as<SimpleVector<int64_t>>()->valueAt(rightIdx);
113+
case TypeKind::REAL:
114+
return left->as<SimpleVector<float>>()->valueAt(leftIdx) ==
115+
right->as<SimpleVector<float>>()->valueAt(rightIdx);
116+
case TypeKind::DOUBLE:
117+
return left->as<SimpleVector<double>>()->valueAt(leftIdx) ==
118+
right->as<SimpleVector<double>>()->valueAt(rightIdx);
119+
case TypeKind::VARCHAR:
120+
case TypeKind::VARBINARY: {
121+
auto lv = left->as<SimpleVector<StringView>>()->valueAt(leftIdx);
122+
auto rv = right->as<SimpleVector<StringView>>()->valueAt(rightIdx);
123+
return std::string_view(lv.data(), lv.size()) ==
124+
std::string_view(rv.data(), rv.size());
125+
}
126+
case TypeKind::TIMESTAMP:
127+
return left->as<SimpleVector<Timestamp>>()->valueAt(leftIdx) ==
128+
right->as<SimpleVector<Timestamp>>()->valueAt(rightIdx);
129+
default:
130+
VELOX_NYI(
131+
"Equality delete comparison not implemented for type: {}",
132+
type->toString());
133+
}
134+
}
135+
136+
} // namespace
137+
138+
/// Opens the given equality delete file, reads it to the end and indexes
/// every delete row by the hash of its equality columns.
///
/// Validates that 'deleteFile' is a non-empty equality delete file and that
/// the column name and type lists are non-empty and of equal length. If
/// testFilters() rejects the file, its length is added to
/// 'runtimeStats.skippedSplitBytes' and the reader is left with no delete
/// keys.
EqualityDeleteFileReader::EqualityDeleteFileReader(
    const IcebergDeleteFile& deleteFile,
    const std::vector<std::string>& equalityColumnNames,
    const std::vector<TypePtr>& equalityColumnTypes,
    const std::string& /*baseFilePath*/,
    FileHandleFactory* fileHandleFactory,
    const ConnectorQueryCtx* connectorQueryCtx,
    folly::Executor* executor,
    const std::shared_ptr<const HiveConfig>& hiveConfig,
    const std::shared_ptr<io::IoStatistics>& ioStatistics,
    const std::shared_ptr<IoStats>& ioStats,
    dwio::common::RuntimeStatistics& runtimeStats,
    const std::string& connectorId)
    : equalityColumnNames_(equalityColumnNames),
      equalityColumnTypes_(equalityColumnTypes),
      pool_(connectorQueryCtx->memoryPool()) {
  VELOX_CHECK(
      deleteFile.content == FileContent::kEqualityDeletes,
      "Expected equality delete file but got content type: {}",
      static_cast<int>(deleteFile.content));
  VELOX_CHECK_GT(deleteFile.recordCount, 0, "Empty equality delete file.");
  VELOX_CHECK(
      !equalityColumnNames_.empty(),
      "Equality delete file must specify at least one column.");
  VELOX_CHECK_EQ(
      equalityColumnNames_.size(),
      equalityColumnTypes_.size(),
      "Equality column names and types must have the same size.");

  // The schema used to read the delete file holds the equality columns only.
  std::vector<std::string> schemaNames = equalityColumnNames_;
  std::vector<TypePtr> schemaTypes = equalityColumnTypes_;
  auto fileSchema = ROW(std::move(schemaNames), std::move(schemaTypes));

  // Project exactly those columns, in declaration order.
  auto spec = std::make_shared<common::ScanSpec>("<root>");
  int channel = 0;
  for (const auto& name : equalityColumnNames_) {
    spec->addField(name, channel++);
  }

  // Describe the whole delete file as a single split.
  auto split = std::make_shared<HiveConnectorSplit>(
      connectorId,
      deleteFile.filePath,
      deleteFile.fileFormat,
      0,
      deleteFile.fileSizeInBytes);

  dwio::common::ReaderOptions readerOptions(pool_);
  configureReaderOptions(
      hiveConfig,
      connectorQueryCtx,
      fileSchema,
      split,
      /*tableParameters=*/{},
      readerOptions);

  const FileHandleKey handleKey{
      .filename = deleteFile.filePath,
      .tokenProvider = connectorQueryCtx->fsTokenProvider()};
  auto cachedHandle = fileHandleFactory->generate(handleKey);
  auto bufferedInput = BufferedInputBuilder::getInstance()->create(
      *cachedHandle,
      readerOptions,
      connectorQueryCtx,
      ioStatistics,
      ioStats,
      executor);

  auto fileReader = dwio::common::getReaderFactory(readerOptions.fileFormat())
                        ->createReader(std::move(bufferedInput), readerOptions);

  const bool passesFilters = testFilters(
      spec.get(),
      fileReader.get(),
      split->filePath,
      split->partitionKeys,
      {},
      hiveConfig->readTimestampPartitionValueAsLocalTime(
          connectorQueryCtx->sessionProperties()));
  if (!passesFilters) {
    // Nothing to index; account for the bytes that were skipped.
    runtimeStats.skippedSplitBytes += static_cast<int64_t>(split->length);
    return;
  }

  dwio::common::RowReaderOptions rowReaderOptions;
  configureRowReaderOptions(
      {},
      spec,
      nullptr,
      fileSchema,
      split,
      nullptr,
      nullptr,
      nullptr,
      rowReaderOptions);

  auto rowReader = fileReader->createRowReader(rowReaderOptions);

  // Drain the delete file, keeping every non-empty batch alive in
  // deleteRows_ and indexing each of its rows in the hash multimap.
  const auto batchSize =
      std::max(static_cast<uint64_t>(1'000), deleteFile.recordCount);
  VectorPtr batch = BaseVector::create(fileSchema, 0, pool_);

  while (rowReader->next(batchSize, batch) != 0) {
    const auto rowCount = batch->size();
    if (rowCount == 0) {
      continue;
    }

    // Called for its effect on the batch; the returned pointer is unused.
    batch->loadedVector();
    auto rowOutput = std::dynamic_pointer_cast<RowVector>(batch);
    VELOX_CHECK_NOT_NULL(rowOutput);

    size_t batchOrdinal = deleteRows_.size();
    deleteRows_.push_back(rowOutput);

    // Column positions are resolved once, from the first non-empty batch.
    if (deleteColumnIndices_.empty()) {
      const auto& batchType = rowOutput->type()->as<TypeKind::ROW>();
      for (const auto& name : equalityColumnNames_) {
        deleteColumnIndices_.push_back(
            static_cast<column_index_t>(batchType.getChildIdx(name)));
      }
    }

    // Register (batch, row) under the hash of the row's key columns.
    for (vector_size_t row = 0; row < rowCount; ++row) {
      deleteKeyHashes_.emplace(
          hashRow(rowOutput, row), DeleteKeyEntry{batchOrdinal, row});
    }

    // The stored batch must stay intact, so read the next one into a fresh
    // vector.
    batch = BaseVector::create(fileSchema, 0, pool_);
  }
}
277+
278+
void EqualityDeleteFileReader::applyDeletes(
279+
const RowVectorPtr& output,
280+
BufferPtr deleteBitmap) {
281+
if (deleteKeyHashes_.empty() || output->size() == 0) {
282+
return;
283+
}
284+
285+
auto* bitmap = deleteBitmap->asMutable<uint8_t>();
286+
287+
// For each row in the output, compute its hash and probe the delete set.
288+
for (vector_size_t i = 0; i < output->size(); ++i) {
289+
// Skip rows already deleted by positional/DV deletes.
290+
if (bits::isBitSet(bitmap, i)) {
291+
continue;
292+
}
293+
294+
uint64_t hash = hashRow(output, i);
295+
auto range = deleteKeyHashes_.equal_range(hash);
296+
297+
for (auto it = range.first; it != range.second; ++it) {
298+
auto& entry = it->second;
299+
if (equalRows(output, i, deleteRows_[entry.batchIndex], entry.rowIndex)) {
300+
bits::setBit(bitmap, i);
301+
break;
302+
}
303+
}
304+
}
305+
}
306+
307+
uint64_t EqualityDeleteFileReader::hashRow(
308+
const RowVectorPtr& row,
309+
vector_size_t index) const {
310+
uint64_t hash = 0;
311+
312+
// For the delete file rows, use deleteColumnIndices_.
313+
// For the base data rows, look up columns by name.
314+
const auto& rowType = row->type()->asRow();
315+
316+
for (size_t c = 0; c < equalityColumnNames_.size(); ++c) {
317+
auto colIdx = rowType.getChildIdxIfExists(equalityColumnNames_[c]);
318+
VELOX_CHECK(
319+
colIdx.has_value(),
320+
"Column not found in row: {}",
321+
equalityColumnNames_[c]);
322+
auto colHash = hashValue(row->childAt(*colIdx), index);
323+
// Combine hashes using a simple mix.
324+
hash ^= colHash + 0x9e3779b97f4a7c15ULL + (hash << 6) + (hash >> 2);
325+
}
326+
return hash;
327+
}
328+
329+
bool EqualityDeleteFileReader::equalRows(
330+
const RowVectorPtr& left,
331+
vector_size_t leftIndex,
332+
const RowVectorPtr& right,
333+
vector_size_t rightIndex) const {
334+
const auto& leftType = left->type()->asRow();
335+
const auto& rightType = right->type()->asRow();
336+
337+
for (size_t c = 0; c < equalityColumnNames_.size(); ++c) {
338+
auto leftColIdx = leftType.getChildIdxIfExists(equalityColumnNames_[c]);
339+
auto rightColIdx = rightType.getChildIdxIfExists(equalityColumnNames_[c]);
340+
VELOX_CHECK(leftColIdx.has_value() && rightColIdx.has_value());
341+
342+
if (!compareValues(
343+
left->childAt(*leftColIdx),
344+
leftIndex,
345+
right->childAt(*rightColIdx),
346+
rightIndex)) {
347+
return false;
348+
}
349+
}
350+
return true;
351+
}
352+
353+
} // namespace facebook::velox::connector::hive::iceberg

0 commit comments

Comments
 (0)