1 change: 1 addition & 0 deletions packages/cubejs-schema-compiler/src/adapter/BaseQuery.js
@@ -4328,6 +4328,7 @@ export class BaseQuery {
') AS {{ from_alias }}{% elif from_prepared %}\n' +
'FROM {{ from_prepared }}' +
'{% endif %}' +
'{% for join in joins %}\n{{ join }}{% endfor %}' +
'{% if filter %}\nWHERE {{ filter }}{% endif %}' +
'{% if group_by %}\nGROUP BY {{ group_by }}{% endif %}' +
'{% if having %}\nHAVING {{ having }}{% endif %}' +
@@ -147,6 +147,7 @@ export class PrestodbQuery extends BaseQuery {
'FROM (\n {{ from }}\n) AS {{ from_alias }} {% elif from_prepared %}\n' +
'FROM {{ from_prepared }}' +
'{% endif %}' +
'{% for join in joins %}\n{{ join }}{% endfor %}' +
'{% if filter %}\nWHERE {{ filter }}{% endif %}' +
'{% if group_by %} GROUP BY {{ group_by }}{% endif %}' +
'{% if having %}\nHAVING {{ having }}{% endif %}' +
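Both templates now render a joins list between the FROM clause and the WHERE clause; each entry is expected to arrive as a fully rendered join clause (built in wrapper.rs below). As a rough illustration, the following sketch renders a stripped-down version of the select template with the minijinja crate (an assumption standing in for the template engine cubesql actually uses); the projection, subquery, and join strings are made up:

use minijinja::{context, Environment};

fn main() {
    // Stripped-down select template: FROM, then the new joins loop, then WHERE.
    let mut env = Environment::new();
    env.add_template(
        "select",
        "SELECT {{ projection }}\nFROM {{ from_prepared }}{% for join in joins %}\n{{ join }}{% endfor %}{% if filter %}\nWHERE {{ filter }}{% endif %}",
    )
    .unwrap();

    let sql = env
        .get_template("select")
        .unwrap()
        .render(context! {
            projection => "t1.customer_gender, t2.user_count",
            from_prepared => "(SELECT ...) AS t1",
            joins => vec!["INNER JOIN (SELECT ...) AS t2 ON t1.customer_gender = t2.customer_gender"],
            filter => "",
        })
        .unwrap();

    // Prints:
    // SELECT t1.customer_gender, t2.user_count
    // FROM (SELECT ...) AS t1
    // INNER JOIN (SELECT ...) AS t2 ON t1.customer_gender = t2.customer_gender
    println!("{sql}");
}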
201 changes: 182 additions & 19 deletions rust/cubesql/cubesql/src/compile/engine/df/wrapper.rs
@@ -44,6 +44,7 @@ use std::{
convert::TryInto,
fmt,
future::Future,
mem::take,
pin::Pin,
result,
sync::{Arc, LazyLock},
@@ -345,6 +346,14 @@ fn expr_name(e: &Expr, schema: &DFSchema) -> Result<String> {
}
}

/// Holds information on whether a column name has been referenced once or multiple times
/// with different values; this is useful to resolve conflicts when remapping columns
/// with non-matching qualifiers.
enum ColumnReferenceState {
Single(Column),
Multiple,
}

/// Holds column remapping for generated SQL
/// Can be used to remap expression in logical plans on top,
/// and to generate mapping between schema and Cube load query in wrapper
@@ -381,7 +390,47 @@ impl ColumnRemapping {
}

pub fn extend(&mut self, other: ColumnRemapping) {
self.column_remapping.extend(other.column_remapping);
// Rebuild the map, detecting conflicting column names and merging on the fly.
// We need this to drop unqualified mappings that become ambiguous during the merge.
let mut conflicting_columns = HashMap::new();
let old_column_remapping = take(&mut self.column_remapping);

for (from, to) in old_column_remapping
.into_iter()
.chain(other.column_remapping)
{
let key = from.name.to_string();
match conflicting_columns.get(&key) {
None => {
// No conflicting columns, keep current value and mark as single reference
conflicting_columns.insert(key, ColumnReferenceState::Single(to.clone()));
self.column_remapping.insert(from, to);
}
Some(ColumnReferenceState::Single(single_to)) => {
if single_to == &to {
// Same target column, no conflict
self.column_remapping.insert(from, to);
continue;
}

// Target is different, so we have a conflict now
conflicting_columns.insert(key, ColumnReferenceState::Multiple);
// Remove unqualified mapping as it's now conflicting
self.column_remapping
.remove(&Column::from_name(from.name.clone()));
// Keep current mapping if it's qualified
if from.relation.is_some() {
self.column_remapping.insert(from, to);
}
}
Some(ColumnReferenceState::Multiple) => {
// Already marked as multiple, insert current value only if it's qualified
if from.relation.is_some() {
self.column_remapping.insert(from, to);
}
}
}
}
}
}
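
To make the merge rule concrete, here is a self-contained sketch with simplified stand-in types (the real Column comes from DataFusion; every name and mapping below is made up). Merging two remappings that send the same unqualified name to different targets drops the now-ambiguous unqualified entry, while both qualified entries survive:

use std::collections::HashMap;

// Simplified stand-in for DataFusion's Column.
#[derive(Clone, PartialEq, Eq, Hash, Debug)]
struct Column {
    relation: Option<String>,
    name: String,
}

fn col(relation: Option<&str>, name: &str) -> Column {
    Column {
        relation: relation.map(str::to_string),
        name: name.to_string(),
    }
}

enum RefState {
    Single(Column),
    Multiple,
}

// Mirrors ColumnRemapping::extend: keep an unqualified entry only while all
// mappings for that name agree on the target; otherwise keep qualified ones.
fn merge(maps: Vec<HashMap<Column, Column>>) -> HashMap<Column, Column> {
    let mut seen: HashMap<String, RefState> = HashMap::new();
    let mut merged: HashMap<Column, Column> = HashMap::new();
    for (from, to) in maps.into_iter().flatten() {
        match seen.get(&from.name) {
            None => {
                seen.insert(from.name.clone(), RefState::Single(to.clone()));
                merged.insert(from, to);
            }
            Some(RefState::Single(prev)) if prev == &to => {
                merged.insert(from, to);
            }
            Some(RefState::Single(_)) => {
                seen.insert(from.name.clone(), RefState::Multiple);
                merged.remove(&col(None, &from.name));
                if from.relation.is_some() {
                    merged.insert(from, to);
                }
            }
            Some(RefState::Multiple) => {
                if from.relation.is_some() {
                    merged.insert(from, to);
                }
            }
        }
    }
    merged
}

fn main() {
    let left = HashMap::from([
        (col(None, "x"), col(None, "x_1")),
        (col(Some("a"), "x"), col(None, "x_1")),
    ]);
    let right = HashMap::from([
        (col(None, "x"), col(None, "x_2")),
        (col(Some("b"), "x"), col(None, "x_2")),
    ]);
    let merged = merge(vec![left, right]);
    // The ambiguous unqualified x is gone; both qualified entries survive.
    assert!(!merged.contains_key(&col(None, "x")));
    assert_eq!(merged[&col(Some("a"), "x")], col(None, "x_1"));
    assert_eq!(merged[&col(Some("b"), "x")], col(None, "x_2"));
}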

@@ -392,6 +441,7 @@ struct Remapper {
can_rename_columns: bool,
remapping: HashMap<Column, Column>,
used_targets: HashSet<String>,
used_columns: HashMap<String, ColumnReferenceState>,
}

impl Remapper {
@@ -407,6 +457,7 @@ impl Remapper {

remapping: HashMap::new(),
used_targets: HashSet::new(),
used_columns: HashMap::new(),
}
}

@@ -462,24 +513,45 @@ impl Remapper {
};

self.used_targets.insert(new_alias.clone());
self.remapping.insert(
Column::from_name(&original_column.name),
target_column.clone(),
);
if let Some(from_alias) = &self.from_alias {
self.remapping.insert(
Column {
name: original_column.name.clone(),
relation: Some(from_alias.clone()),
},
target_column.clone(),
);
if let Some(original_relation) = &original_column.relation {
if original_relation != from_alias {
self.remapping
.insert(original_column.clone(), target_column);
let unqualified_column = Column::from_name(&original_column.name);
let reference_state = self.used_columns.get(&original_column.name);
match reference_state {
None => {
// No columns with this name yet, mark as single reference
self.remapping
.insert(unqualified_column, target_column.clone());
self.used_columns.insert(
original_column.name.clone(),
ColumnReferenceState::Single(target_column.clone()),
);
}
Some(ColumnReferenceState::Single(ref_column)) => {
// Already have a single reference, check if the target is the same
// and remove unqualified variant if not
if ref_column != &target_column {
self.remapping.remove(&unqualified_column);
self.used_columns
.insert(original_column.name.clone(), ColumnReferenceState::Multiple);
}
}
Some(ColumnReferenceState::Multiple) => {
// Already marked as multiple, nothing to do
}
}

if let Some(from_alias) = &self.from_alias {
if original_column.relation.is_some() {
self.remapping
.insert(original_column.clone(), target_column);
} else {
self.remapping.insert(
Column {
name: original_column.name.clone(),
relation: Some(from_alias.clone()),
},
target_column,
);
}
}
}

@@ -506,7 +578,11 @@
expr: &Expr,
) -> result::Result<String, CubeError> {
let original_alias = expr_name(original_expr, schema)?;
let original_alias_key = Column::from_name(&original_alias);
let original_alias_key = if let Expr::Column(column) = &original_expr {
column.clone()
} else {
Column::from_name(&original_alias)
};
if let Some(alias_column) = self.remapping.get(&original_alias_key) {
return Ok(alias_column.name.clone());
}
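
The alias lookup now keys the remapping by the column itself whenever the expression is a plain column, rather than always falling back to an unqualified name; that way a qualified reference still resolves after its unqualified entry has been dropped as ambiguous. A minimal sketch of the key choice, with hypothetical stand-in types for DataFusion's Expr and Column:

// Hypothetical, simplified stand-ins for DataFusion's Expr and Column.
#[derive(Clone, Debug, PartialEq)]
struct Column {
    relation: Option<String>,
    name: String,
}

enum Expr {
    Column(Column),
    // Any other expression kind, identified only by its display name.
    Other(String),
}

// A plain column keeps its qualifier, so it can still hit the qualified
// remapping entry; other expressions fall back to an unqualified key.
fn alias_key(expr: &Expr) -> Column {
    match expr {
        Expr::Column(column) => column.clone(),
        Expr::Other(display_name) => Column {
            relation: None,
            name: display_name.clone(),
        },
    }
}

fn main() {
    let qualified = Expr::Column(Column {
        relation: Some("t1".to_string()),
        name: "customer_gender".to_string(),
    });
    assert_eq!(alias_key(&qualified).relation.as_deref(), Some("t1"));

    let other = Expr::Other("CASE WHEN ...".to_string());
    assert_eq!(alias_key(&other).relation, None);
}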
@@ -587,6 +663,7 @@ macro_rules! generate_sql_for_timestamp {
};
}

#[derive(Debug)]
struct GeneratedColumns {
projection: Vec<(AliasedColumn, HashSet<String>)>,
group_by: Vec<(AliasedColumn, HashSet<String>)>,
@@ -879,6 +956,7 @@ impl CubeScanWrapperNode {
.get_sql_templates()
.select(
new_sql.sql.to_string(),
vec![],
columns,
vec![],
vec![],
@@ -3528,7 +3606,7 @@ impl WrappedSelectNode {
let SqlGenerationResult {
data_source,
from_alias,
column_remapping,
mut column_remapping,
mut sql,
request,
} = CubeScanWrapperNode::generate_sql_for_node_rec(
@@ -3542,6 +3620,90 @@
)
.await?;

let mut joins_sql = Vec::with_capacity(self.joins.len());
for (join_plan, join_condition, join_type) in &self.joins {
let SqlGenerationResult {
data_source,
from_alias,
column_remapping: join_column_remapping,
sql: join_sql,
request: _,
} = CubeScanWrapperNode::generate_sql_for_node_rec(
meta,
Arc::clone(&transport),
Arc::clone(&load_request_meta),
Arc::clone(join_plan),
true,
values.clone(),
parent_data_source,
)
.await?;

let Some(from_alias) = from_alias else {
return Err(CubeError::internal(
"Can't generate SQL for wrapped select: join subquery has no alias".to_string(),
));
};

let Some(data_source) = data_source else {
return Err(CubeError::internal(format!(
"Can't generate SQL for wrapped select: no data source for {:?}",
node
)));
};

let generator = Arc::clone(
meta.data_source_to_sql_generator
.get(&data_source)
.ok_or_else(|| {
CubeError::internal(format!(
"Can't generate SQL for wrapped select: no sql generator for {:?}",
node
))
})?,
);

let join_type_sql = match join_type {
JoinType::Left => generator.get_sql_templates().left_join()?,
JoinType::Inner => generator.get_sql_templates().inner_join()?,
_ => {
return Err(CubeError::internal(format!(
"Unsupported join type for join subquery: {join_type:?}"
)))
}
};

let (join_condition_sql, join_sql) = Self::generate_sql_for_expr(
join_sql,
generator.clone(),
join_condition.clone(),
None,
&HashMap::new(),
)
.await?;

let (join_sql_str, new_values) = join_sql.unpack();
sql.extend_values(new_values);
if let Some(join_column_remapping) = join_column_remapping {
if let Some(column_remapping) = column_remapping.as_mut() {
column_remapping.extend(join_column_remapping);
} else {
column_remapping = Some(join_column_remapping);
}
};

let aliased_join_sql = generator
.get_sql_templates()
.query_aliased(&join_sql_str, &from_alias)?;

let final_join_sql = generator.get_sql_templates().join(
&join_type_sql,
&aliased_join_sql,
&join_condition_sql,
)?;
joins_sql.push(final_join_sql);
}
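
Each iteration above recursively generates SQL for the joined subquery, picks the join keyword template by join type, renders the join condition against the subquery's SQL context, aliases the parenthesized subquery, and composes the final clause. A rough sketch of that assembly, with the three template calls replaced by plain functions (the spellings are illustrative assumptions, not actual template output):

// Plain-function stand-ins for the join-type, query_aliased and join
// templates; real spellings come from the data source's SQL templates.
fn join_type_sql(inner: bool) -> &'static str {
    if inner { "INNER JOIN" } else { "LEFT JOIN" }
}

fn query_aliased(subquery_sql: &str, alias: &str) -> String {
    format!("({subquery_sql}) AS {alias}")
}

fn join_clause(join_type: &str, aliased_subquery: &str, condition: &str) -> String {
    format!("{join_type} {aliased_subquery} ON {condition}")
}

fn main() {
    let aliased = query_aliased("SELECT customer_gender, user_count FROM ...", "t2");
    let clause = join_clause(
        join_type_sql(true),
        &aliased,
        "t1.customer_gender = t2.customer_gender",
    );
    // INNER JOIN (SELECT customer_gender, user_count FROM ...) AS t2 ON t1.customer_gender = t2.customer_gender
    println!("{clause}");
    // Each rendered clause lands in joins_sql, which is later passed to the
    // select template's joins list (see the BaseQuery template change above).
}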

let subqueries_sql = self
.prepare_subqueries_sql(
meta,
@@ -3593,6 +3755,7 @@ impl WrappedSelectNode {
.get_sql_templates()
.select(
sql.sql.to_string(),
joins_sql,
projection.into_iter().map(|(m, _)| m).collect(),
group_by.into_iter().map(|(m, _)| m).collect(),
group_descs,
54 changes: 54 additions & 0 deletions rust/cubesql/cubesql/src/compile/mod.rs
@@ -18180,4 +18180,58 @@ LIMIT {{ limit }}{% endif %}"#.to_string(),

Ok(())
}

#[tokio::test]
async fn test_grouped_join_grouped_sql_push_down() {
if !Rewriter::sql_push_down_enabled() {
return;
}
init_testing_logger();

let query_plan = convert_select_to_query_plan(
r#"
WITH t1 AS (
SELECT DISTINCT customer_gender
FROM KibanaSampleDataEcommerce
WHERE has_subscription = TRUE
),
t2 AS (
SELECT
customer_gender,
COUNT(DISTINCT id) AS user_count,
COUNT(
DISTINCT CASE
WHEN has_subscription = TRUE THEN id
END
) AS subscribed_user_count
FROM KibanaSampleDataEcommerce
WHERE customer_gender IS NOT NULL
GROUP BY 1
)
SELECT
t1.customer_gender,
t2.user_count,
t2.subscribed_user_count
FROM
t1 AS t1,
t2 AS t2
WHERE t1.customer_gender = t2.customer_gender
ORDER BY t2.user_count DESC
LIMIT 5000
"#
.to_string(),
DatabaseProtocol::PostgreSQL,
)
.await;

let logical_plan = query_plan.as_logical_plan();
let sql = logical_plan.find_cube_scan_wrapped_sql().wrapped_sql.sql;
assert!(sql.contains("INNER JOIN ("));

let physical_plan = query_plan.as_physical_plan().await.unwrap();
println!(
"Physical plan: {}",
displayable(physical_plan.as_ref()).indent()
);
}
}