[Feature] support Group execution #42352

Merged: 1 commit, Apr 1, 2024
3 changes: 2 additions & 1 deletion be/src/connector/connector.cpp
@@ -83,7 +83,8 @@ Status DataSource::parse_runtime_filters(RuntimeState* state) {
if (_runtime_filters == nullptr || _runtime_filters->size() == 0) return Status::OK();
for (const auto& item : _runtime_filters->descriptors()) {
RuntimeFilterProbeDescriptor* probe = item.second;
const JoinRuntimeFilter* filter = probe->runtime_filter();
DCHECK(runtime_bloom_filter_eval_context.driver_sequence == -1);
const JoinRuntimeFilter* filter = probe->runtime_filter(runtime_bloom_filter_eval_context.driver_sequence);
if (filter == nullptr) continue;
SlotId slot_id;
if (!probe->is_probe_slot_ref(&slot_id)) continue;
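
The lookup change above is the heart of the driver-aware runtime-filter path: with group execution, a probe descriptor may carry one filter per pipeline driver instead of a single shared one, and callers outside a colocate group pass -1 to get the shared filter (hence the DCHECK). A minimal standalone sketch, with hypothetical names rather than the actual StarRocks classes:

```cpp
#include <cassert>
#include <utility>
#include <vector>

struct BloomFilter {};  // stand-in for JoinRuntimeFilter

class ProbeDescriptorSketch {
public:
    void set_shared_filter(const BloomFilter* f) { _shared = f; }
    void set_partitioned_filters(std::vector<const BloomFilter*> fs) { _per_driver = std::move(fs); }

    // driver_sequence == -1: caller runs outside a colocate group, use the shared filter.
    // driver_sequence >= 0: caller is a specific pipeline driver, use that driver's local filter.
    const BloomFilter* runtime_filter(int driver_sequence) const {
        if (driver_sequence < 0) return _shared;
        assert(driver_sequence < static_cast<int>(_per_driver.size()));
        return _per_driver[driver_sequence];
    }

private:
    const BloomFilter* _shared = nullptr;
    std::vector<const BloomFilter*> _per_driver;
};
```
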
3 changes: 3 additions & 0 deletions be/src/exec/CMakeLists.txt
@@ -270,6 +270,9 @@ set(EXEC_FILES
pipeline/pipeline.cpp
pipeline/spill_process_operator.cpp
pipeline/spill_process_channel.cpp
pipeline/group_execution/execution_group.cpp
pipeline/group_execution/execution_group_builder.cpp
pipeline/group_execution/group_operator.cpp
workgroup/work_group.cpp
workgroup/scan_executor.cpp
workgroup/scan_task_queue.cpp
5 changes: 3 additions & 2 deletions be/src/exec/aggregate/aggregate_streaming_node.cpp
@@ -213,8 +213,9 @@ pipeline::OpFactories AggregateStreamingNode::decompose_to_pipeline(pipeline::Pi
size_t degree_of_parallelism = context->source_operator(ops_with_sink)->degree_of_parallelism();

auto should_cache = context->should_interpolate_cache_operator(id(), ops_with_sink[0]);
if (!should_cache && _tnode.agg_node.__isset.interpolate_passthrough && _tnode.agg_node.interpolate_passthrough &&
context->could_local_shuffle(ops_with_sink)) {
bool could_local_shuffle = !should_cache && !context->enable_group_execution();
if (could_local_shuffle && _tnode.agg_node.__isset.interpolate_passthrough &&
_tnode.agg_node.interpolate_passthrough && context->could_local_shuffle(ops_with_sink)) {
ops_with_sink = context->maybe_interpolate_local_passthrough_exchange(runtime_state(), id(), ops_with_sink,
degree_of_parallelism, true);
}
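
Stated as a single predicate (a hypothetical standalone helper; the semantics are read off the rewritten condition above), the streaming aggregate now only interpolates a local passthrough exchange when neither result caching nor group execution is in play:

```cpp
// Hypothetical helper mirroring the gate above: group execution keeps data
// inside its colocate group, so no local passthrough exchange is inserted.
bool should_interpolate_passthrough(bool should_cache, bool enable_group_execution,
                                    bool passthrough_is_set, bool passthrough_requested,
                                    bool could_local_shuffle) {
    bool allowed = !should_cache && !enable_group_execution;
    return allowed && passthrough_is_set && passthrough_requested && could_local_shuffle;
}
```
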
3 changes: 3 additions & 0 deletions be/src/exec/connector_scan_node.cpp
@@ -109,6 +109,9 @@ int ConnectorScanNode::_estimate_max_concurrent_chunks() const {
}

pipeline::OpFactories ConnectorScanNode::decompose_to_pipeline(pipeline::PipelineBuilderContext* context) {
auto exec_group = context->find_exec_group_by_plan_node_id(_id);
context->set_current_execution_group(exec_group);

size_t dop = context->dop_of_source_operator(id());
std::shared_ptr<pipeline::ConnectorScanOperatorFactory> scan_op = nullptr;
bool stream_data_source = _data_source_provider->stream_data_source();
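
The two lines added at the top of decompose_to_pipeline follow the pattern shared by the source nodes in this PR: look up the execution group the planner assigned to this plan node, then make it the current group so every pipeline built from the node lands in that group. A standalone sketch of that registry, with hypothetical types and assumed behavior:

```cpp
#include <map>
#include <memory>
#include <utility>

struct ExecutionGroupSketch {
    int group_id = 0;
    bool colocate = false;  // colocate groups keep per-bucket drivers together
};

class BuilderContextSketch {
public:
    void assign(int plan_node_id, std::shared_ptr<ExecutionGroupSketch> group) {
        _groups[plan_node_id] = std::move(group);
    }
    std::shared_ptr<ExecutionGroupSketch> find_exec_group_by_plan_node_id(int plan_node_id) const {
        auto it = _groups.find(plan_node_id);
        return it == _groups.end() ? _normal : it->second;
    }
    void set_current_execution_group(std::shared_ptr<ExecutionGroupSketch> group) {
        _current = std::move(group);
    }

private:
    std::shared_ptr<ExecutionGroupSketch> _normal = std::make_shared<ExecutionGroupSketch>();
    std::shared_ptr<ExecutionGroupSketch> _current = _normal;
    std::map<int, std::shared_ptr<ExecutionGroupSketch>> _groups;
};
```
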
6 changes: 4 additions & 2 deletions be/src/exec/cross_join_node.cpp
@@ -609,8 +609,10 @@ std::vector<std::shared_ptr<pipeline::OperatorFactory>> CrossJoinNode::_decompos
std::move(_conjunct_ctxs), std::move(cross_join_context), _join_op);
// Initialize OperatorFactory's fields involving runtime filters.
this->init_runtime_filter_for_operator(left_factory.get(), context, rc_rf_probe_collector);
left_ops = context->maybe_interpolate_local_adpative_passthrough_exchange(runtime_state(), id(), left_ops,
context->degree_of_parallelism());
if (context->is_colocate_group()) {
left_ops = context->maybe_interpolate_local_adpative_passthrough_exchange(runtime_state(), id(), left_ops,
context->degree_of_parallelism());
}
left_ops.emplace_back(std::move(left_factory));

if (limit() != -1) {
3 changes: 2 additions & 1 deletion be/src/exec/exchange_node.cpp
@@ -247,7 +247,8 @@ void ExchangeNode::debug_string(int indentation_level, std::stringstream* out) c

pipeline::OpFactories ExchangeNode::decompose_to_pipeline(pipeline::PipelineBuilderContext* context) {
using namespace pipeline;

auto exec_group = context->find_exec_group_by_plan_node_id(_id);
context->set_current_execution_group(exec_group);
OpFactories operators;
if (!_is_merging) {
auto* query_ctx = context->runtime_state()->query_ctx();
2 changes: 1 addition & 1 deletion be/src/exec/exec_node.cpp
@@ -220,7 +220,7 @@ Status ExecNode::prepare(RuntimeState* state) {
_mem_tracker.reset(new MemTracker(_runtime_profile.get(), std::make_tuple(true, false, false), "", -1,
_runtime_profile->name(), nullptr));
RETURN_IF_ERROR(Expr::prepare(_conjunct_ctxs, state));
RETURN_IF_ERROR(_runtime_filter_collector.prepare(state, row_desc(), _runtime_profile.get()));
RETURN_IF_ERROR(_runtime_filter_collector.prepare(state, _runtime_profile.get()));

// TODO(zc):
// AddExprCtxsToFree(_conjunct_ctxs);
77 changes: 48 additions & 29 deletions be/src/exec/hash_join_node.cpp
@@ -25,6 +25,8 @@
#include "exec/hash_joiner.h"
#include "exec/pipeline/chunk_accumulate_operator.h"
#include "exec/pipeline/exchange/exchange_source_operator.h"
#include "exec/pipeline/group_execution/execution_group_builder.h"
#include "exec/pipeline/group_execution/execution_group_fwd.h"
#include "exec/pipeline/hashjoin/hash_join_build_operator.h"
#include "exec/pipeline/hashjoin/hash_join_probe_operator.h"
#include "exec/pipeline/hashjoin/hash_joiner_factory.h"
@@ -38,6 +40,8 @@
#include "exprs/expr.h"
#include "exprs/in_const_predicate.hpp"
#include "exprs/runtime_filter_bank.h"
#include "gen_cpp/PlanNodes_types.h"
#include "gen_cpp/RuntimeFilter_types.h"
#include "gutil/strings/substitute.h"
#include "runtime/current_thread.h"
#include "runtime/runtime_filter_worker.h"
@@ -425,30 +429,27 @@ void HashJoinNode::close(RuntimeState* state) {
template <class HashJoinerFactory, class HashJoinBuilderFactory, class HashJoinProbeFactory>
pipeline::OpFactories HashJoinNode::_decompose_to_pipeline(pipeline::PipelineBuilderContext* context) {
using namespace pipeline;

auto rhs_operators = child(1)->decompose_to_pipeline(context);
// "col NOT IN (NULL, val1, val2)" always returns false, so hash join should
// return empty result in this case. Hash join cannot be divided into multiple
// partitions in this case. Otherwise, NULL value in right table will only occur
// in some partition hash table, and other partition hash table can output chunk.
// TODO: support nullaware left anti join with shuffle join
DCHECK(_join_type != TJoinOp::NULL_AWARE_LEFT_ANTI_JOIN || _distribution_mode == TJoinDistributionMode::BROADCAST);
if (_distribution_mode == TJoinDistributionMode::BROADCAST) {
// Broadcast join need only create one hash table, because all the HashJoinProbeOperators
// use the same hash table with their own different probe states.
rhs_operators = context->maybe_interpolate_local_passthrough_exchange(runtime_state(), id(), rhs_operators);
} else {
// "col NOT IN (NULL, val1, val2)" always returns false, so hash join should
// return empty result in this case. Hash join cannot be divided into multiple
// partitions in this case. Otherwise, NULL value in right table will only occur
// in some partition hash table, and other partition hash table can output chunk.
if (_join_type == TJoinOp::NULL_AWARE_LEFT_ANTI_JOIN) {
rhs_operators = context->maybe_interpolate_local_passthrough_exchange(runtime_state(), id(), rhs_operators);
} else {
// Both HashJoin{Build, Probe}Operator are parallelized
// There are two ways of shuffle
// 1. If previous op is ExchangeSourceOperator and its partition type is HASH_PARTITIONED or BUCKET_SHUFFLE_HASH_PARTITIONED
// then pipeline level shuffle will be performed at sender side (ExchangeSinkOperator), so
// there is no need to perform local shuffle again at receiver side
// 2. Otherwise, add LocalExchangeOperator
// to shuffle multi-stream into #degree_of_parallelism# streams each of that pipes into HashJoin{Build, Probe}Operator.
rhs_operators = context->maybe_interpolate_local_shuffle_exchange(runtime_state(), id(), rhs_operators,
_build_equivalence_partition_expr_ctxs);
}
// Both HashJoin{Build, Probe}Operator are parallelized
// There are two ways of shuffle
// 1. If previous op is ExchangeSourceOperator and its partition type is HASH_PARTITIONED or BUCKET_SHUFFLE_HASH_PARTITIONED
// then pipeline level shuffle will be performed at sender side (ExchangeSinkOperator), so
// there is no need to perform local shuffle again at receiver side
// 2. Otherwise, add LocalExchangeOperator
// to shuffle multi-stream into #degree_of_parallelism# streams each of that pipes into HashJoin{Build, Probe}Operator.
rhs_operators = context->maybe_interpolate_local_shuffle_exchange(runtime_state(), id(), rhs_operators,
_build_equivalence_partition_expr_ctxs);
}

size_t num_right_partitions = context->source_operator(rhs_operators)->degree_of_parallelism();
@@ -468,12 +469,8 @@ pipeline::OpFactories HashJoinNode::_decompose_to_pipeline(pipeline::PipelineBui
_build_runtime_filters, _output_slots, _output_slots, _distribution_mode, false);
auto hash_joiner_factory = std::make_shared<starrocks::pipeline::HashJoinerFactory>(param);

// add placeholder into RuntimeFilterHub, HashJoinBuildOperator will generate runtime filters and fill it,
// Operators consuming the runtime filters will inspect this placeholder.
context->fragment_context()->runtime_filter_hub()->add_holder(_id);

// Create a shared RefCountedRuntimeFilterCollector
auto&& rc_rf_probe_collector = std::make_shared<RcRfProbeCollector>(2, std::move(this->runtime_filter_collector()));
auto rc_rf_probe_collector = std::make_shared<RcRfProbeCollector>(2, std::move(this->runtime_filter_collector()));
// In default query engine, we only build one hash table for join right child.
// But for pipeline query engine, we will build `num_right_partitions` hash tables, so we need to enlarge the limit

@@ -508,12 +505,20 @@ pipeline::OpFactories HashJoinNode::_decompose_to_pipeline(pipeline::PipelineBui
DeferOp pop_dependent_pipeline([context]() { context->pop_dependent_pipeline(); });

auto lhs_operators = child(0)->decompose_to_pipeline(context);
if (_distribution_mode == TJoinDistributionMode::BROADCAST) {
lhs_operators = context->maybe_interpolate_local_passthrough_exchange(runtime_state(), id(), lhs_operators,
context->degree_of_parallelism());
auto join_colocate_group = context->find_exec_group_by_plan_node_id(_id);
if (join_colocate_group->type() == ExecutionGroupType::COLOCATE) {
DCHECK(context->current_execution_group()->is_colocate_exec_group());
DCHECK_EQ(context->current_execution_group(), join_colocate_group);
context->set_current_execution_group(join_colocate_group);
} else {
if (_join_type == TJoinOp::NULL_AWARE_LEFT_ANTI_JOIN) {
lhs_operators = context->maybe_interpolate_local_passthrough_exchange(runtime_state(), id(), lhs_operators);
// left child is colocate group, but current join is not colocate group
if (context->current_execution_group()->is_colocate_exec_group()) {
lhs_operators = context->interpolate_grouped_exchange(_id, lhs_operators);
}

if (_distribution_mode == TJoinDistributionMode::BROADCAST) {
lhs_operators = context->maybe_interpolate_local_passthrough_exchange(runtime_state(), id(), lhs_operators,
context->degree_of_parallelism());
} else {
auto* rhs_source_op = context->source_operator(rhs_operators);
auto* lhs_source_op = context->source_operator(lhs_operators);
Expand All @@ -522,13 +527,27 @@ pipeline::OpFactories HashJoinNode::_decompose_to_pipeline(pipeline::PipelineBui
_probe_equivalence_partition_expr_ctxs);
}
}

lhs_operators.emplace_back(std::move(probe_op));
// add placeholder into RuntimeFilterHub, HashJoinBuildOperator will generate runtime filters and fill it,
// Operators consuming the runtime filters will inspect this placeholder.
if (context->is_colocate_group() && _distribution_mode == TJoinDistributionMode::COLOCATE) {
for (auto runtime_filter_build_desc : _build_runtime_filters) {
// local colocate won't generate global runtime filter
DCHECK(!runtime_filter_build_desc->has_remote_targets());
runtime_filter_build_desc->set_num_colocate_partition(num_right_partitions);
}
context->fragment_context()->runtime_filter_hub()->add_holder(_id, num_right_partitions);
} else {
context->fragment_context()->runtime_filter_hub()->add_holder(_id);
}

if (limit() != -1) {
lhs_operators.emplace_back(std::make_shared<LimitOperatorFactory>(context->next_operator_id(), id(), limit()));
}

if (_hash_join_node.__isset.interpolate_passthrough && _hash_join_node.interpolate_passthrough) {
if (_hash_join_node.__isset.interpolate_passthrough && _hash_join_node.interpolate_passthrough &&
!context->is_colocate_group()) {
lhs_operators = context->maybe_interpolate_local_passthrough_exchange(runtime_state(), id(), lhs_operators,
context->degree_of_parallelism(), true);
}
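
Condensed into one decision function (hypothetical names, semantics assumed from the probe-side hunk above): when the join itself sits in a colocate execution group, the probe stream is already bucket-aligned with the build side and no local exchange is added; otherwise the probe side first leaves any colocate group of its child through a grouped exchange, and then gets either a local passthrough (broadcast join) or a local hash shuffle (partitioned join).

```cpp
// Hypothetical standalone sketch; the real code interpolates operator
// factories, this only names the choice being made.
enum class DistributionMode { BROADCAST, PARTITIONED, COLOCATE };
enum class ProbeExchange {
    NONE,                      // colocate group: build/probe drivers already aligned per bucket
    PASSTHROUGH,               // broadcast join outside a colocate group
    LOCAL_SHUFFLE,             // partitioned join outside a colocate group
    GROUPED_THEN_PASSTHROUGH,  // child was colocate, join is not: exit the group first
    GROUPED_THEN_SHUFFLE
};

ProbeExchange pick_probe_exchange(bool join_in_colocate_group, bool child_in_colocate_group,
                                  DistributionMode mode) {
    if (join_in_colocate_group) return ProbeExchange::NONE;
    bool grouped = child_in_colocate_group;
    if (mode == DistributionMode::BROADCAST) {
        return grouped ? ProbeExchange::GROUPED_THEN_PASSTHROUGH : ProbeExchange::PASSTHROUGH;
    }
    return grouped ? ProbeExchange::GROUPED_THEN_SHUFFLE : ProbeExchange::LOCAL_SHUFFLE;
}
```

The runtime-filter bookkeeping follows the same split: a local colocate join registers its holder with num_right_partitions, since each bucket builds its own filter and never produces a global (remote) filter, while every other join shape keeps the single-holder registration.
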
1 change: 1 addition & 0 deletions be/src/exec/hash_joiner.cpp
@@ -571,6 +571,7 @@ Status HashJoiner::_create_runtime_bloom_filters(RuntimeState* state, int64_t li
bool eq_null = _is_null_safes[expr_order];
MutableJoinRuntimeFilterPtr filter = nullptr;
auto multi_partitioned = rf_desc->layout().pipeline_level_multi_partitioned();
multi_partitioned |= rf_desc->num_colocate_partition() > 0;
if (multi_partitioned) {
LogicalType build_type = rf_desc->build_expr_type();
filter = std::shared_ptr<JoinRuntimeFilter>(
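
The `|=` above forces the multi-partitioned filter layout whenever colocate partitions exist. The idea, shown with a hypothetical stand-in class (a hash set in place of a real bloom filter, to keep the sketch short): each bucket-local build fills only its own partition, and a probe row is tested only against the partition matching its driver sequence, so no cross-bucket merge is needed.

```cpp
#include <cstddef>
#include <functional>
#include <string>
#include <unordered_set>
#include <vector>

class PartitionedFilterSketch {
public:
    explicit PartitionedFilterSketch(size_t num_partitions) : _partitions(num_partitions) {}

    // Build side: each bucket-local driver inserts into its own partition.
    void insert(size_t partition, const std::string& key) {
        _partitions[partition].insert(std::hash<std::string>{}(key));
    }

    // Probe side: consult only the partition of the probing driver.
    bool might_contain(size_t partition, const std::string& key) const {
        return _partitions[partition].count(std::hash<std::string>{}(key)) > 0;
    }

private:
    // A real implementation would use bloom filters; a hash set keeps the sketch short.
    std::vector<std::unordered_set<size_t>> _partitions;
};
```
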
3 changes: 3 additions & 0 deletions be/src/exec/olap_scan_node.cpp
@@ -872,6 +872,9 @@ pipeline::OpFactories OlapScanNode::decompose_to_pipeline(pipeline::PipelineBuil
scan_prepare_op->set_degree_of_parallelism(shared_morsel_queue ? 1 : dop);
this->init_runtime_filter_for_operator(scan_prepare_op.get(), context, rc_rf_probe_collector);

auto exec_group = context->find_exec_group_by_plan_node_id(_id);
context->set_current_execution_group(exec_group);

auto scan_prepare_pipeline = pipeline::OpFactories{
std::move(scan_prepare_op),
std::make_shared<pipeline::NoopSinkOperatorFactory>(context->next_operator_id(), id()),
4 changes: 2 additions & 2 deletions be/src/exec/olap_scan_prepare.cpp
@@ -387,7 +387,7 @@ void OlapScanConjunctsManager::normalize_join_runtime_filter(const SlotDescripto
// bloom runtime filter
for (const auto& it : runtime_filters->descriptors()) {
const RuntimeFilterProbeDescriptor* desc = it.second;
const JoinRuntimeFilter* rf = desc->runtime_filter();
const JoinRuntimeFilter* rf = desc->runtime_filter(driver_sequence);
using RangeType = ColumnValueRange<RangeValueType>;
using ValueType = typename RunTimeTypeTraits<SlotType>::CppType;
SlotId slot_id;
Expand All @@ -397,7 +397,7 @@ void OlapScanConjunctsManager::normalize_join_runtime_filter(const SlotDescripto

// runtime filter existed and does not have null.
if (rf == nullptr) {
rt_ranger_params.add_unarrived_rf(desc, &slot);
rt_ranger_params.add_unarrived_rf(desc, &slot, driver_sequence);
continue;
}

1 change: 1 addition & 0 deletions be/src/exec/olap_scan_prepare.h
@@ -36,6 +36,7 @@ class OlapScanConjunctsManager {
const std::vector<std::string>* key_column_names;
const RuntimeFilterProbeCollector* runtime_filters;
RuntimeState* runtime_state;
int32_t driver_sequence = -1;

private:
// fields generated by parsing conjunct ctxs.
2 changes: 1 addition & 1 deletion be/src/exec/pipeline/exchange/sink_buffer.cpp
@@ -34,7 +34,7 @@ SinkBuffer::SinkBuffer(FragmentContext* fragment_ctx, const std::vector<TPlanFra
_is_dest_merge(is_dest_merge),
_rpc_http_min_size(fragment_ctx->runtime_state()->get_rpc_http_min_size()),
_sent_audit_stats_frequency_upper_limit(
std::max((int64_t)64, BitUtil::RoundUpToPowerOfTwo(fragment_ctx->num_drivers() * 4))) {
std::max((int64_t)64, BitUtil::RoundUpToPowerOfTwo(fragment_ctx->total_dop() * 4))) {
for (const auto& dest : destinations) {
const auto& instance_id = dest.fragment_instance_id;
// instance_id.lo == -1 indicates that the destination is pseudo for bucket shuffle join.
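
For reference, the replaced expression only changes the input to the cap, not its shape; a sketch with a plain round-up helper standing in for BitUtil::RoundUpToPowerOfTwo (assumed behavior):

```cpp
#include <algorithm>
#include <cstdint>

// Round v up to the next power of two (returns 1 for v <= 1).
int64_t round_up_to_power_of_two(int64_t v) {
    int64_t p = 1;
    while (p < v) p <<= 1;
    return p;
}

// The cap is at least 64 and always a power of two; the change above feeds it
// the fragment's total DOP instead of its driver count.
int64_t sent_audit_stats_frequency_upper_limit(int64_t total_dop) {
    return std::max<int64_t>(64, round_up_to_power_of_two(total_dop * 4));
}
```
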