Skip to content

Commit 3a10c75

Browse files
committed
[Feature] support group execution
Signed-off-by: stdpain <[email protected]>
1 parent 2236631 commit 3a10c75

File tree

63 files changed

+2539
-413
lines changed

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

63 files changed

+2539
-413
lines changed

be/src/connector/connector.cpp

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -80,7 +80,8 @@ Status DataSource::parse_runtime_filters(RuntimeState* state) {
8080
if (_runtime_filters == nullptr || _runtime_filters->size() == 0) return Status::OK();
8181
for (const auto& item : _runtime_filters->descriptors()) {
8282
RuntimeFilterProbeDescriptor* probe = item.second;
83-
const JoinRuntimeFilter* filter = probe->runtime_filter();
83+
DCHECK(runtime_bloom_filter_eval_context.driver_sequence == -1);
84+
const JoinRuntimeFilter* filter = probe->runtime_filter(runtime_bloom_filter_eval_context.driver_sequence);
8485
if (filter == nullptr) continue;
8586
SlotId slot_id;
8687
if (!probe->is_probe_slot_ref(&slot_id)) continue;

be/src/exec/CMakeLists.txt

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -270,6 +270,9 @@ set(EXEC_FILES
270270
pipeline/pipeline.cpp
271271
pipeline/spill_process_operator.cpp
272272
pipeline/spill_process_channel.cpp
273+
pipeline/group_execution/execution_group.cpp
274+
pipeline/group_execution/execution_group_builder.cpp
275+
pipeline/group_execution/group_operator.cpp
273276
workgroup/work_group.cpp
274277
workgroup/scan_executor.cpp
275278
workgroup/scan_task_queue.cpp

be/src/exec/aggregate/aggregate_streaming_node.cpp

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -213,8 +213,9 @@ pipeline::OpFactories AggregateStreamingNode::decompose_to_pipeline(pipeline::Pi
213213
size_t degree_of_parallelism = context->source_operator(ops_with_sink)->degree_of_parallelism();
214214

215215
auto should_cache = context->should_interpolate_cache_operator(id(), ops_with_sink[0]);
216-
if (!should_cache && _tnode.agg_node.__isset.interpolate_passthrough && _tnode.agg_node.interpolate_passthrough &&
217-
context->could_local_shuffle(ops_with_sink)) {
216+
bool could_local_shuffle = !should_cache && !context->enable_group_execution();
217+
if (could_local_shuffle && _tnode.agg_node.__isset.interpolate_passthrough &&
218+
_tnode.agg_node.interpolate_passthrough && context->could_local_shuffle(ops_with_sink)) {
218219
ops_with_sink = context->maybe_interpolate_local_passthrough_exchange(runtime_state(), id(), ops_with_sink,
219220
degree_of_parallelism, true);
220221
}

be/src/exec/connector_scan_node.cpp

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -109,6 +109,9 @@ int ConnectorScanNode::_estimate_max_concurrent_chunks() const {
109109
}
110110

111111
pipeline::OpFactories ConnectorScanNode::decompose_to_pipeline(pipeline::PipelineBuilderContext* context) {
112+
auto exec_group = context->find_exec_group_by_plan_node_id(_id);
113+
context->set_current_execution_group(exec_group);
114+
112115
size_t dop = context->dop_of_source_operator(id());
113116
std::shared_ptr<pipeline::ConnectorScanOperatorFactory> scan_op = nullptr;
114117
bool stream_data_source = _data_source_provider->stream_data_source();

be/src/exec/cross_join_node.cpp

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -609,8 +609,10 @@ std::vector<std::shared_ptr<pipeline::OperatorFactory>> CrossJoinNode::_decompos
609609
std::move(_conjunct_ctxs), std::move(cross_join_context), _join_op);
610610
// Initialize OperatorFactory's fields involving runtime filters.
611611
this->init_runtime_filter_for_operator(left_factory.get(), context, rc_rf_probe_collector);
612-
left_ops = context->maybe_interpolate_local_adpative_passthrough_exchange(runtime_state(), id(), left_ops,
613-
context->degree_of_parallelism());
612+
if (context->is_colocate_group()) {
613+
left_ops = context->maybe_interpolate_local_adpative_passthrough_exchange(runtime_state(), id(), left_ops,
614+
context->degree_of_parallelism());
615+
}
614616
left_ops.emplace_back(std::move(left_factory));
615617

616618
if (limit() != -1) {

be/src/exec/exchange_node.cpp

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -247,7 +247,8 @@ void ExchangeNode::debug_string(int indentation_level, std::stringstream* out) c
247247

248248
pipeline::OpFactories ExchangeNode::decompose_to_pipeline(pipeline::PipelineBuilderContext* context) {
249249
using namespace pipeline;
250-
250+
auto exec_group = context->find_exec_group_by_plan_node_id(_id);
251+
context->set_current_execution_group(exec_group);
251252
OpFactories operators;
252253
if (!_is_merging) {
253254
auto* query_ctx = context->runtime_state()->query_ctx();

be/src/exec/exec_node.cpp

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -220,7 +220,7 @@ Status ExecNode::prepare(RuntimeState* state) {
220220
_mem_tracker.reset(new MemTracker(_runtime_profile.get(), std::make_tuple(true, false, false), "", -1,
221221
_runtime_profile->name(), nullptr));
222222
RETURN_IF_ERROR(Expr::prepare(_conjunct_ctxs, state));
223-
RETURN_IF_ERROR(_runtime_filter_collector.prepare(state, row_desc(), _runtime_profile.get()));
223+
RETURN_IF_ERROR(_runtime_filter_collector.prepare(state, _runtime_profile.get()));
224224

225225
// TODO(zc):
226226
// AddExprCtxsToFree(_conjunct_ctxs);

be/src/exec/hash_join_node.cpp

Lines changed: 48 additions & 29 deletions
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,8 @@
2525
#include "exec/hash_joiner.h"
2626
#include "exec/pipeline/chunk_accumulate_operator.h"
2727
#include "exec/pipeline/exchange/exchange_source_operator.h"
28+
#include "exec/pipeline/group_execution/execution_group_builder.h"
29+
#include "exec/pipeline/group_execution/execution_group_fwd.h"
2830
#include "exec/pipeline/hashjoin/hash_join_build_operator.h"
2931
#include "exec/pipeline/hashjoin/hash_join_probe_operator.h"
3032
#include "exec/pipeline/hashjoin/hash_joiner_factory.h"
@@ -38,6 +40,8 @@
3840
#include "exprs/expr.h"
3941
#include "exprs/in_const_predicate.hpp"
4042
#include "exprs/runtime_filter_bank.h"
43+
#include "gen_cpp/PlanNodes_types.h"
44+
#include "gen_cpp/RuntimeFilter_types.h"
4145
#include "gutil/strings/substitute.h"
4246
#include "runtime/current_thread.h"
4347
#include "runtime/runtime_filter_worker.h"
@@ -425,30 +429,27 @@ void HashJoinNode::close(RuntimeState* state) {
425429
template <class HashJoinerFactory, class HashJoinBuilderFactory, class HashJoinProbeFactory>
426430
pipeline::OpFactories HashJoinNode::_decompose_to_pipeline(pipeline::PipelineBuilderContext* context) {
427431
using namespace pipeline;
428-
429432
auto rhs_operators = child(1)->decompose_to_pipeline(context);
433+
// "col NOT IN (NULL, val1, val2)" always returns false, so hash join should
434+
// return empty result in this case. Hash join cannot be divided into multiple
435+
// partitions in this case. Otherwise, NULL value in right table will only occur
436+
// in some partition hash table, and other partition hash table can output chunk.
437+
// TODO: support nullaware left anti join with shuffle join
438+
DCHECK(_join_type != TJoinOp::NULL_AWARE_LEFT_ANTI_JOIN || _distribution_mode == TJoinDistributionMode::BROADCAST);
430439
if (_distribution_mode == TJoinDistributionMode::BROADCAST) {
431440
// Broadcast join need only create one hash table, because all the HashJoinProbeOperators
432441
// use the same hash table with their own different probe states.
433442
rhs_operators = context->maybe_interpolate_local_passthrough_exchange(runtime_state(), id(), rhs_operators);
434443
} else {
435-
// "col NOT IN (NULL, val1, val2)" always returns false, so hash join should
436-
// return empty result in this case. Hash join cannot be divided into multiple
437-
// partitions in this case. Otherwise, NULL value in right table will only occur
438-
// in some partition hash table, and other partition hash table can output chunk.
439-
if (_join_type == TJoinOp::NULL_AWARE_LEFT_ANTI_JOIN) {
440-
rhs_operators = context->maybe_interpolate_local_passthrough_exchange(runtime_state(), id(), rhs_operators);
441-
} else {
442-
// Both HashJoin{Build, Probe}Operator are parallelized
443-
// There are two ways of shuffle
444-
// 1. If previous op is ExchangeSourceOperator and its partition type is HASH_PARTITIONED or BUCKET_SHUFFLE_HASH_PARTITIONED
445-
// then pipeline level shuffle will be performed at sender side (ExchangeSinkOperator), so
446-
// there is no need to perform local shuffle again at receiver side
447-
// 2. Otherwise, add LocalExchangeOperator
448-
// to shuffle multi-stream into #degree_of_parallelism# streams each of that pipes into HashJoin{Build, Probe}Operator.
449-
rhs_operators = context->maybe_interpolate_local_shuffle_exchange(runtime_state(), id(), rhs_operators,
450-
_build_equivalence_partition_expr_ctxs);
451-
}
444+
// Both HashJoin{Build, Probe}Operator are parallelized
445+
// There are two ways of shuffle
446+
// 1. If previous op is ExchangeSourceOperator and its partition type is HASH_PARTITIONED or BUCKET_SHUFFLE_HASH_PARTITIONED
447+
// then pipeline level shuffle will be performed at sender side (ExchangeSinkOperator), so
448+
// there is no need to perform local shuffle again at receiver side
449+
// 2. Otherwise, add LocalExchangeOperator
450+
// to shuffle multi-stream into #degree_of_parallelism# streams each of that pipes into HashJoin{Build, Probe}Operator.
451+
rhs_operators = context->maybe_interpolate_local_shuffle_exchange(runtime_state(), id(), rhs_operators,
452+
_build_equivalence_partition_expr_ctxs);
452453
}
453454

454455
size_t num_right_partitions = context->source_operator(rhs_operators)->degree_of_parallelism();
@@ -468,12 +469,8 @@ pipeline::OpFactories HashJoinNode::_decompose_to_pipeline(pipeline::PipelineBui
468469
_build_runtime_filters, _output_slots, _output_slots, _distribution_mode, false);
469470
auto hash_joiner_factory = std::make_shared<starrocks::pipeline::HashJoinerFactory>(param);
470471

471-
// add placeholder into RuntimeFilterHub, HashJoinBuildOperator will generate runtime filters and fill it,
472-
// Operators consuming the runtime filters will inspect this placeholder.
473-
context->fragment_context()->runtime_filter_hub()->add_holder(_id);
474-
475472
// Create a shared RefCountedRuntimeFilterCollector
476-
auto&& rc_rf_probe_collector = std::make_shared<RcRfProbeCollector>(2, std::move(this->runtime_filter_collector()));
473+
auto rc_rf_probe_collector = std::make_shared<RcRfProbeCollector>(2, std::move(this->runtime_filter_collector()));
477474
// In default query engine, we only build one hash table for join right child.
478475
// But for pipeline query engine, we will build `num_right_partitions` hash tables, so we need to enlarge the limit
479476

@@ -508,12 +505,20 @@ pipeline::OpFactories HashJoinNode::_decompose_to_pipeline(pipeline::PipelineBui
508505
DeferOp pop_dependent_pipeline([context]() { context->pop_dependent_pipeline(); });
509506

510507
auto lhs_operators = child(0)->decompose_to_pipeline(context);
511-
if (_distribution_mode == TJoinDistributionMode::BROADCAST) {
512-
lhs_operators = context->maybe_interpolate_local_passthrough_exchange(runtime_state(), id(), lhs_operators,
513-
context->degree_of_parallelism());
508+
auto join_colocate_group = context->find_exec_group_by_plan_node_id(_id);
509+
if (join_colocate_group->type() == ExecutionGroupType::COLOCATE) {
510+
DCHECK(context->current_execution_group()->is_colocate_exec_group());
511+
DCHECK_EQ(context->current_execution_group(), join_colocate_group);
512+
context->set_current_execution_group(join_colocate_group);
514513
} else {
515-
if (_join_type == TJoinOp::NULL_AWARE_LEFT_ANTI_JOIN) {
516-
lhs_operators = context->maybe_interpolate_local_passthrough_exchange(runtime_state(), id(), lhs_operators);
514+
// left child is colocate group, but current join is not colocate group
515+
if (context->current_execution_group()->is_colocate_exec_group()) {
516+
lhs_operators = context->interpolate_grouped_exchange(_id, lhs_operators);
517+
}
518+
519+
if (_distribution_mode == TJoinDistributionMode::BROADCAST) {
520+
lhs_operators = context->maybe_interpolate_local_passthrough_exchange(runtime_state(), id(), lhs_operators,
521+
context->degree_of_parallelism());
517522
} else {
518523
auto* rhs_source_op = context->source_operator(rhs_operators);
519524
auto* lhs_source_op = context->source_operator(lhs_operators);
@@ -522,13 +527,27 @@ pipeline::OpFactories HashJoinNode::_decompose_to_pipeline(pipeline::PipelineBui
522527
_probe_equivalence_partition_expr_ctxs);
523528
}
524529
}
530+
525531
lhs_operators.emplace_back(std::move(probe_op));
532+
// add placeholder into RuntimeFilterHub, HashJoinBuildOperator will generate runtime filters and fill it,
533+
// Operators consuming the runtime filters will inspect this placeholder.
534+
if (context->is_colocate_group() && _distribution_mode == TJoinDistributionMode::COLOCATE) {
535+
for (auto runtime_filter_build_desc : _build_runtime_filters) {
536+
// local colocate won't generate global runtime filter
537+
DCHECK(!runtime_filter_build_desc->has_remote_targets());
538+
runtime_filter_build_desc->set_num_colocate_partition(num_right_partitions);
539+
}
540+
context->fragment_context()->runtime_filter_hub()->add_holder(_id, num_right_partitions);
541+
} else {
542+
context->fragment_context()->runtime_filter_hub()->add_holder(_id);
543+
}
526544

527545
if (limit() != -1) {
528546
lhs_operators.emplace_back(std::make_shared<LimitOperatorFactory>(context->next_operator_id(), id(), limit()));
529547
}
530548

531-
if (_hash_join_node.__isset.interpolate_passthrough && _hash_join_node.interpolate_passthrough) {
549+
if (_hash_join_node.__isset.interpolate_passthrough && _hash_join_node.interpolate_passthrough &&
550+
!context->is_colocate_group()) {
532551
lhs_operators = context->maybe_interpolate_local_passthrough_exchange(runtime_state(), id(), lhs_operators,
533552
context->degree_of_parallelism(), true);
534553
}

be/src/exec/hash_joiner.cpp

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -571,6 +571,7 @@ Status HashJoiner::_create_runtime_bloom_filters(RuntimeState* state, int64_t li
571571
bool eq_null = _is_null_safes[expr_order];
572572
MutableJoinRuntimeFilterPtr filter = nullptr;
573573
auto multi_partitioned = rf_desc->layout().pipeline_level_multi_partitioned();
574+
multi_partitioned |= rf_desc->num_colocate_partition() > 0;
574575
if (multi_partitioned) {
575576
LogicalType build_type = rf_desc->build_expr_type();
576577
filter = std::shared_ptr<JoinRuntimeFilter>(

be/src/exec/olap_scan_node.cpp

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -872,6 +872,9 @@ pipeline::OpFactories OlapScanNode::decompose_to_pipeline(pipeline::PipelineBuil
872872
scan_prepare_op->set_degree_of_parallelism(shared_morsel_queue ? 1 : dop);
873873
this->init_runtime_filter_for_operator(scan_prepare_op.get(), context, rc_rf_probe_collector);
874874

875+
auto exec_group = context->find_exec_group_by_plan_node_id(_id);
876+
context->set_current_execution_group(exec_group);
877+
875878
auto scan_prepare_pipeline = pipeline::OpFactories{
876879
std::move(scan_prepare_op),
877880
std::make_shared<pipeline::NoopSinkOperatorFactory>(context->next_operator_id(), id()),

be/src/exec/olap_scan_prepare.cpp

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -387,7 +387,7 @@ void OlapScanConjunctsManager::normalize_join_runtime_filter(const SlotDescripto
387387
// bloom runtime filter
388388
for (const auto& it : runtime_filters->descriptors()) {
389389
const RuntimeFilterProbeDescriptor* desc = it.second;
390-
const JoinRuntimeFilter* rf = desc->runtime_filter();
390+
const JoinRuntimeFilter* rf = desc->runtime_filter(driver_sequence);
391391
using RangeType = ColumnValueRange<RangeValueType>;
392392
using ValueType = typename RunTimeTypeTraits<SlotType>::CppType;
393393
SlotId slot_id;
@@ -397,7 +397,7 @@ void OlapScanConjunctsManager::normalize_join_runtime_filter(const SlotDescripto
397397

398398
// runtime filter existed and does not have null.
399399
if (rf == nullptr) {
400-
rt_ranger_params.add_unarrived_rf(desc, &slot);
400+
rt_ranger_params.add_unarrived_rf(desc, &slot, driver_sequence);
401401
continue;
402402
}
403403

be/src/exec/olap_scan_prepare.h

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -36,6 +36,7 @@ class OlapScanConjunctsManager {
3636
const std::vector<std::string>* key_column_names;
3737
const RuntimeFilterProbeCollector* runtime_filters;
3838
RuntimeState* runtime_state;
39+
int32_t driver_sequence = -1;
3940

4041
private:
4142
// fields generated by parsing conjunct ctxs.

be/src/exec/pipeline/exchange/sink_buffer.cpp

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -34,7 +34,7 @@ SinkBuffer::SinkBuffer(FragmentContext* fragment_ctx, const std::vector<TPlanFra
3434
_is_dest_merge(is_dest_merge),
3535
_rpc_http_min_size(fragment_ctx->runtime_state()->get_rpc_http_min_size()),
3636
_sent_audit_stats_frequency_upper_limit(
37-
std::max((int64_t)64, BitUtil::RoundUpToPowerOfTwo(fragment_ctx->num_drivers() * 4))) {
37+
std::max((int64_t)64, BitUtil::RoundUpToPowerOfTwo(fragment_ctx->total_dop() * 4))) {
3838
for (const auto& dest : destinations) {
3939
const auto& instance_id = dest.fragment_instance_id;
4040
// instance_id.lo == -1 indicates that the destination is pseudo for bucket shuffle join.

0 commit comments

Comments
 (0)