Commit 8207959
feat(native-pos): Add new materialized Exchange path in presto-on-spark native (1/n) (#27573)
Summary:
Pull Request resolved: #27573
#### Summary
Current ShuffleWrite path does single threaded row-by-row processing to maintain thread safety for the shuffle writer (cosco internally at Meta)
But the shuffleWriter is actually thread safe if it is accessed partition exclusive manner. i.e. multi threads can write to it but only
as long as those threads are writing data for disjoint set of partitions.
We can exploit this by just creating a per partition buffer before the shuffle write operator and write multi threaded into this buffer with locks per partition. Everytime we write into a partition buffer, if it is full, we also flush it in the same operation. Also, now that we have batched many rows in our buffer, we write the whole batch as 1 row from shuffle writer POV, which reduces the overhead of per row operations inside the shuffle writer (checksumming, memcpy/buffering)
To accomplish this, instead of updating existing operators, we add new operators as below
- `ExchangeWrite+ExchangeOutputBuffer`
- `ExchangeRead` operators — a new shuffle write/read path
these replaces `PartitionAndSerialize + LocalPartition + ShuffleWrite` with a single operator
### Current architecture
{F1988112256}
### New architecture
{F1988112251}
#### New classes (all in presto_cpp/main/operators):
- `ExchangeOutputBuffer`: shared per-partition queue buffer owning ShuffleWriter,
ContinueFuture backpressure, error propagation on close/abort
- `ExchangeWriteNode` + `ExchangeWrite`: Buffers all input data into single contiguous buffer then writes out per partition `enqueue` calls into the ExchangeOutputBuffer
- `ExchangeReadNode` + `ExchangeRead`: reads kFormatBatched pages only
- Translators for both operators
### Configs
- `exchange.materialization.enabled=true`
- `exchange.materialization.partitioning-row-batch-buffer-size=16mb`
- `exchange.materialization.per-partition-buffer-size=130kb`
### Planner wiring
`exchange.materialization.enabled=true` flag gates ExchangeWriteNode (write) and
ExchangeReadNode (read) creation from PartitionedOutputNode/RemoteSourceNode.
Uses allPeersFinished for writer close — no setNumDrivers needed.
Differential Revision: D1003657671 parent 90910c7 commit 8207959
File tree
14 files changed
+1996
-5
lines changed- presto-native-execution/presto_cpp/main
- common
- operators
- tests
- types
- tests
14 files changed
+1996
-5
lines changed| Original file line number | Diff line number | Diff line change | |
|---|---|---|---|
| |||
39 | 39 | | |
40 | 40 | | |
41 | 41 | | |
| 42 | + | |
| 43 | + | |
42 | 44 | | |
43 | 45 | | |
44 | 46 | | |
| |||
1475 | 1477 | | |
1476 | 1478 | | |
1477 | 1479 | | |
| 1480 | + | |
| 1481 | + | |
| 1482 | + | |
| 1483 | + | |
1478 | 1484 | | |
1479 | 1485 | | |
1480 | 1486 | | |
| |||
Lines changed: 23 additions & 0 deletions
| Original file line number | Diff line number | Diff line change | |
|---|---|---|---|
| |||
233 | 233 | | |
234 | 234 | | |
235 | 235 | | |
| 236 | + | |
| 237 | + | |
| 238 | + | |
| 239 | + | |
| 240 | + | |
236 | 241 | | |
237 | 242 | | |
238 | 243 | | |
| |||
762 | 767 | | |
763 | 768 | | |
764 | 769 | | |
| 770 | + | |
| 771 | + | |
| 772 | + | |
| 773 | + | |
| 774 | + | |
| 775 | + | |
| 776 | + | |
| 777 | + | |
| 778 | + | |
| 779 | + | |
| 780 | + | |
| 781 | + | |
| 782 | + | |
| 783 | + | |
| 784 | + | |
| 785 | + | |
| 786 | + | |
| 787 | + | |
765 | 788 | | |
766 | 789 | | |
767 | 790 | | |
| |||
| Original file line number | Diff line number | Diff line change | |
|---|---|---|---|
| |||
662 | 662 | | |
663 | 663 | | |
664 | 664 | | |
| 665 | + | |
| 666 | + | |
| 667 | + | |
| 668 | + | |
| 669 | + | |
| 670 | + | |
| 671 | + | |
| 672 | + | |
| 673 | + | |
| 674 | + | |
| 675 | + | |
| 676 | + | |
| 677 | + | |
| 678 | + | |
| 679 | + | |
| 680 | + | |
| 681 | + | |
| 682 | + | |
| 683 | + | |
| 684 | + | |
| 685 | + | |
665 | 686 | | |
666 | 687 | | |
667 | 688 | | |
| |||
1115 | 1136 | | |
1116 | 1137 | | |
1117 | 1138 | | |
| 1139 | + | |
| 1140 | + | |
| 1141 | + | |
| 1142 | + | |
| 1143 | + | |
| 1144 | + | |
1118 | 1145 | | |
1119 | 1146 | | |
1120 | 1147 | | |
| |||
Lines changed: 8 additions & 5 deletions
| Original file line number | Diff line number | Diff line change | |
|---|---|---|---|
| |||
13 | 13 | | |
14 | 14 | | |
15 | 15 | | |
| 16 | + | |
| 17 | + | |
| 18 | + | |
| 19 | + | |
| 20 | + | |
| 21 | + | |
| 22 | + | |
| 23 | + | |
16 | 24 | | |
17 | 25 | | |
18 | 26 | | |
19 | 27 | | |
20 | | - | |
21 | | - | |
22 | | - | |
23 | | - | |
24 | | - | |
25 | 28 | | |
26 | 29 | | |
27 | 30 | | |
| |||
0 commit comments