Commit 4275eb1

shrinidhijoshi authored and meta-codesync[bot] committed
feat(native-pos): Add new materialized Exchange path in presto-on-spark native (1/n) (#27573)
Summary:
Pull Request resolved: #27573

#### Summary
The current ShuffleWrite path processes rows one at a time on a single thread to keep the shuffle writer (Cosco internally at Meta) thread safe. However, the shuffle writer is already thread safe when it is accessed in a partition-exclusive manner: multiple threads can write to it as long as they write data for disjoint sets of partitions.

We can exploit this by creating a per-partition buffer in front of the shuffle write operator and writing into it from multiple threads, with one lock per partition. Whenever a write fills a partition buffer, the same operation also flushes it. Because the buffer now holds many rows, the whole batch is written as one row from the shuffle writer's point of view, which reduces the per-row overhead inside the shuffle writer (checksumming, memcpy/buffering).

To accomplish this, instead of updating existing operators, we add new operators:
- `ExchangeWrite+ExchangeOutputBuffer`
- `ExchangeRead`

Together these form a new shuffle write/read path that replaces `PartitionAndSerialize + LocalPartition + ShuffleWrite` with a single operator.

### Current architecture
{F1988112256}

### New architecture
{F1988112251}

#### New classes (all in presto_cpp/main/operators):
- `ExchangeOutputBuffer`: shared per-partition queue buffer that owns the ShuffleWriter; provides ContinueFuture backpressure and error propagation on close/abort
- `ExchangeWriteNode` + `ExchangeWrite`: buffers all input data into a single contiguous buffer, then issues per-partition `enqueue` calls into the ExchangeOutputBuffer
- `ExchangeReadNode` + `ExchangeRead`: reads kFormatBatched pages only
- Translators for both operators

### Configs
- `exchange.materialization.enabled=true`
- `exchange.materialization.partitioning-row-batch-buffer-size=16mb`
- `exchange.materialization.per-partition-buffer-size=130kb`

### Planner wiring
The `exchange.materialization.enabled=true` flag gates creation of ExchangeWriteNode (write) and ExchangeReadNode (read) from PartitionedOutputNode/RemoteSourceNode. Writer close uses allPeersFinished, so no setNumDrivers is needed.

Differential Revision: D100365767
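The per-partition buffering idea in the summary can be illustrated with a small standalone sketch. This is not the `ExchangeOutputBuffer` from this commit: `PartitionBuffers`, `FlushFn`, and the threshold parameter are hypothetical names, and the flush callback merely stands in for the shuffle writer. It only shows the locking discipline: one mutex per partition, and a flush performed inside the same `enqueue` call that fills the buffer.

```cpp
#include <cstddef>
#include <functional>
#include <memory>
#include <mutex>
#include <string>
#include <string_view>
#include <utility>
#include <vector>

// Hypothetical stand-in for the shuffle writer: receives one batched blob
// for one partition. The real writer interface is not shown in this diff.
using FlushFn = std::function<void(int partition, std::string batch)>;

class PartitionBuffers {
 private:
  // One lock and one byte buffer per partition.
  struct Slot {
    std::mutex mutex;
    std::string data;
  };

 public:
  PartitionBuffers(int numPartitions, std::size_t flushThreshold, FlushFn flush)
      : flushThreshold_(flushThreshold), flush_(std::move(flush)) {
    for (int i = 0; i < numPartitions; ++i) {
      slots_.push_back(std::make_unique<Slot>());
    }
  }

  // Safe to call from many threads. Threads writing to different partitions
  // never contend; threads writing to the same partition are serialized, so
  // the writer only ever sees one caller per partition at a time.
  void enqueue(int partition, std::string_view rows) {
    Slot& slot = *slots_.at(partition);
    std::lock_guard<std::mutex> guard(slot.mutex);
    slot.data.append(rows);
    if (slot.data.size() >= flushThreshold_) {
      flushLocked(partition, slot);
    }
  }

  // Flushes whatever is left in every partition.
  void close() {
    for (std::size_t p = 0; p < slots_.size(); ++p) {
      Slot& slot = *slots_[p];
      std::lock_guard<std::mutex> guard(slot.mutex);
      if (!slot.data.empty()) {
        flushLocked(static_cast<int>(p), slot);
      }
    }
  }

 private:
  // Caller must hold slot.mutex. Hands the whole batch over as one unit.
  void flushLocked(int partition, Slot& slot) {
    flush_(partition, std::move(slot.data));
    slot.data.clear();
  }

  std::size_t flushThreshold_;
  FlushFn flush_;
  std::vector<std::unique_ptr<Slot>> slots_;
};
```

Flushing while the partition lock is held keeps writes to a given partition exclusive, which is the property the summary relies on. The cost is that a slow flush blocks other producers for that partition; the commit message mentions ContinueFuture backpressure for the real operator, which this sketch does not model.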
1 parent 90910c7 commit 4275eb1

14 files changed: +1996 -5 lines changed

presto-native-execution/presto_cpp/main/PrestoServer.cpp

Lines changed: 6 additions & 0 deletions
@@ -39,6 +39,8 @@
 #include "presto_cpp/main/http/filters/StatsFilter.h"
 #include "presto_cpp/main/operators/BroadcastExchangeSource.h"
 #include "presto_cpp/main/operators/BroadcastWrite.h"
+#include "presto_cpp/main/operators/ExchangeRead.h"
+#include "presto_cpp/main/operators/ExchangeWrite.h"
 #include "presto_cpp/main/operators/LocalShuffle.h"
 #include "presto_cpp/main/operators/PartitionAndSerialize.h"
 #include "presto_cpp/main/operators/ShuffleExchangeSource.h"
@@ -1475,6 +1477,10 @@ void PrestoServer::registerCustomOperators() {
       std::make_unique<operators::ShuffleWriteTranslator>());
   velox::exec::Operator::registerOperator(
       std::make_unique<operators::ShuffleReadTranslator>());
+  velox::exec::Operator::registerOperator(
+      std::make_unique<operators::ExchangeWriteTranslator>());
+  velox::exec::Operator::registerOperator(
+      std::make_unique<operators::ExchangeReadTranslator>());
 
   // Todo - Split Presto & Presto-on-Spark server into different classes
   // which will allow server specific operator registration.

presto-native-execution/presto_cpp/main/common/Configs.cpp

Lines changed: 23 additions & 0 deletions
@@ -233,6 +233,11 @@ SystemConfig::SystemConfig() {
           BOOL_PROP(kEnableVeloxExprSetLogging, false),
           NUM_PROP(kLocalShuffleMaxPartitionBytes, 268435456),
           STR_PROP(kShuffleName, ""),
+          BOOL_PROP(kExchangeMaterializationEnabled, false),
+          NUM_PROP(
+              kExchangeMaterializationPartitioningRowBatchBufferSize,
+              16L << 20),
+          NUM_PROP(kExchangeMaterializationPerPartitionBufferSize, 130L * 1024),
           STR_PROP(kRemoteFunctionServerCatalogName, ""),
           STR_PROP(kRemoteFunctionServerSerde, "presto_page"),
           BOOL_PROP(kHttpEnableAccessLog, false),
@@ -762,6 +767,24 @@ std::string SystemConfig::shuffleName() const {
   return optionalProperty(kShuffleName).value();
 }
 
+bool SystemConfig::exchangeMaterializationEnabled() const {
+  return optionalProperty<bool>(kExchangeMaterializationEnabled)
+      .value_or(false);
+}
+
+int64_t SystemConfig::exchangeMaterializationPartitioningRowBatchBufferSize()
+    const {
+  return optionalProperty<int64_t>(
+             kExchangeMaterializationPartitioningRowBatchBufferSize)
+      .value_or(16L << 20);
+}
+
+int64_t SystemConfig::exchangeMaterializationPerPartitionBufferSize() const {
+  return optionalProperty<int64_t>(
+             kExchangeMaterializationPerPartitionBufferSize)
+      .value_or(130L * 1024);
+}
+
 bool SystemConfig::enableSerializedPageChecksum() const {
   return optionalProperty<bool>(kEnableSerializedPageChecksum).value();
 }
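The accessors in this hunk follow a lookup-with-fallback pattern: read an optional property, then fall back to a hard-coded default via `value_or`. The sketch below reproduces that pattern in isolation. `SketchConfig` and `optionalInt` are hypothetical and are not the real `ConfigBase` API; only the property names and default expressions are taken from the diff. It also assumes values are plain integer byte counts, since the diff does not show whether suffixes such as `16mb` are parsed.

```cpp
#include <cstdint>
#include <optional>
#include <string>
#include <unordered_map>
#include <utility>

// Hypothetical minimal config store, used only to illustrate value_or
// fallbacks. It is not the ConfigBase class from presto_cpp.
class SketchConfig {
 public:
  explicit SketchConfig(std::unordered_map<std::string, std::string> values)
      : values_(std::move(values)) {}

  // Returns the value as an integer if the key is set, std::nullopt otherwise.
  // Assumption: values are plain byte counts such as "65536".
  std::optional<int64_t> optionalInt(const std::string& key) const {
    auto it = values_.find(key);
    if (it == values_.end()) {
      return std::nullopt;
    }
    return static_cast<int64_t>(std::stoll(it->second));
  }

  int64_t partitioningRowBatchBufferSize() const {
    return optionalInt(
               "exchange.materialization.partitioning-row-batch-buffer-size")
        .value_or(16L << 20);
  }

  int64_t perPartitionBufferSize() const {
    return optionalInt("exchange.materialization.per-partition-buffer-size")
        .value_or(130L * 1024);
  }

 private:
  std::unordered_map<std::string, std::string> values_;
};

// The default expressions from the diff, spelled out in bytes.
static_assert((16L << 20) == 16777216, "16 MB default is 16 * 2^20 bytes");
static_assert(130L * 1024 == 133120, "130 KB default is 130 * 2^10 bytes");
```

With no overrides, the two accessors return 16777216 and 133120 bytes, matching the 16MB and 130KB defaults documented in Configs.h.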

presto-native-execution/presto_cpp/main/common/Configs.h

Lines changed: 27 additions & 0 deletions
@@ -662,6 +662,27 @@ class SystemConfig : public ConfigBase {
   static constexpr std::string_view kLocalShuffleMaxPartitionBytes{
       "shuffle.local.max-partition-bytes"};
   static constexpr std::string_view kShuffleName{"shuffle.name"};
+
+  /// Enable materialized exchange I/O (ExchangeWrite/ExchangeRead operators).
+  /// When false, falls back to PnS + LocalPartition + ShuffleWrite.
+  /// Default: false.
+  static constexpr std::string_view kExchangeMaterializationEnabled{
+      "exchange.materialization.enabled"};
+
+  /// ExchangeWrite flat buffer flush threshold in bytes. Controls how much
+  /// serialized CompactRow data accumulates per driver before flushing to
+  /// the ExchangeOutputBuffer. Default: 16MB.
+  static constexpr std::string_view
+      kExchangeMaterializationPartitioningRowBatchBufferSize{
+          "exchange.materialization.partitioning-row-batch-buffer-size"};
+
+  /// ExchangeOutputBuffer per-partition drain threshold in bytes. When a
+  /// partition accumulates this much data, it is drained to the writer.
+  /// Default: 130KB.
+  static constexpr std::string_view
+      kExchangeMaterializationPerPartitionBufferSize{
+          "exchange.materialization.per-partition-buffer-size"};
+
   static constexpr std::string_view kHttpEnableAccessLog{
       "http-server.enable-access-log"};
   static constexpr std::string_view kHttpEnableStatsFilter{
@@ -1115,6 +1136,12 @@ class SystemConfig : public ConfigBase {
 
   std::string shuffleName() const;
 
+  bool exchangeMaterializationEnabled() const;
+
+  int64_t exchangeMaterializationPartitioningRowBatchBufferSize() const;
+
+  int64_t exchangeMaterializationPerPartitionBufferSize() const;
+
   bool enableSerializedPageChecksum() const;
 
   bool enableVeloxTaskLogging() const;

presto-native-execution/presto_cpp/main/operators/CMakeLists.txt

Lines changed: 8 additions & 5 deletions
@@ -13,15 +13,18 @@
 # limitations under the License.
 add_library(
   presto_operators
+  BinarySortableSerializer.cpp
+  BroadcastExchangeSource.cpp
+  BroadcastFile.cpp
+  BroadcastWrite.cpp
+  ExchangeOutputBuffer.cpp
+  ExchangeRead.cpp
+  ExchangeWrite.cpp
+  LocalShuffle.cpp
   PartitionAndSerialize.cpp
   ShuffleExchangeSource.cpp
   ShuffleRead.cpp
   ShuffleWrite.cpp
-  LocalShuffle.cpp
-  BroadcastWrite.cpp
-  BroadcastFile.cpp
-  BroadcastExchangeSource.cpp
-  BinarySortableSerializer.cpp
 )
 
 target_link_libraries(
