-
Notifications
You must be signed in to change notification settings - Fork 220
/
Copy pathlogical_aggregate.rs
141 lines (127 loc) · 4.05 KB
/
logical_aggregate.rs
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
// Copyright 2022 RisingLight Project Authors. Licensed under Apache-2.0.
use std::fmt;
use serde::Serialize;
use super::*;
use crate::binder::{BoundAggCall, BoundExpr};
use crate::optimizer::logical_plan_rewriter::ExprRewriter;
/// The logical plan of hash aggregate operation.
#[derive(Debug, Clone, Serialize)]
pub struct LogicalAggregate {
agg_calls: Vec<BoundAggCall>,
/// Group keys in hash aggregation (optional)
group_keys: Vec<BoundExpr>,
child: PlanRef,
}
impl LogicalAggregate {
pub fn new(agg_calls: Vec<BoundAggCall>, group_keys: Vec<BoundExpr>, child: PlanRef) -> Self {
LogicalAggregate {
agg_calls,
group_keys,
child,
}
}
/// Get a reference to the logical aggregate's agg calls.
pub fn agg_calls(&self) -> &[BoundAggCall] {
self.agg_calls.as_ref()
}
/// Get a reference to the logical aggregate's group keys.
pub fn group_keys(&self) -> &[BoundExpr] {
self.group_keys.as_ref()
}
pub fn clone_with_rewrite_expr(
&self,
new_child: PlanRef,
rewriter: &impl ExprRewriter,
) -> Self {
let mut new_agg_calls = self.agg_calls().to_vec();
let mut new_keys = self.group_keys().to_vec();
for agg in &mut new_agg_calls {
for arg in &mut agg.args {
rewriter.rewrite_expr(arg);
}
}
for keys in &mut new_keys {
rewriter.rewrite_expr(keys);
}
LogicalAggregate::new(new_agg_calls, new_keys, new_child)
}
}
impl PlanTreeNodeUnary for LogicalAggregate {
fn child(&self) -> PlanRef {
self.child.clone()
}
#[must_use]
fn clone_with_child(&self, child: PlanRef) -> Self {
Self::new(self.agg_calls().to_vec(), self.group_keys().to_vec(), child)
}
}
impl_plan_tree_node_for_unary!(LogicalAggregate);
impl PlanNode for LogicalAggregate {
fn schema(&self) -> Vec<ColumnDesc> {
let child_schema = self.child.schema();
self.group_keys
.iter()
.map(|expr| {
ColumnDesc::new(
expr.return_type().unwrap(),
expr.format_name(&child_schema),
false,
)
})
.chain(self.agg_calls.iter().map(|agg_call| {
agg_call
.return_type
.clone()
.to_column(format!("{}", agg_call.kind))
}))
.collect()
}
fn estimated_cardinality(&self) -> usize {
self.child().estimated_cardinality()
}
}
impl fmt::Display for LogicalAggregate {
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
writeln!(f, "LogicalAggregate: {} agg calls", self.agg_calls.len(),)
}
}
#[cfg(test)]
mod tests {
use super::*;
use crate::binder::AggKind;
use crate::types::{DataTypeExt, DataTypeKind};
#[test]
fn test_aggregate_out_names() {
let plan = LogicalAggregate::new(
vec![
BoundAggCall {
kind: AggKind::Sum,
args: vec![],
return_type: DataTypeKind::Double.not_null(),
},
BoundAggCall {
kind: AggKind::Avg,
args: vec![],
return_type: DataTypeKind::Double.not_null(),
},
BoundAggCall {
kind: AggKind::Count,
args: vec![],
return_type: DataTypeKind::Double.not_null(),
},
BoundAggCall {
kind: AggKind::RowCount,
args: vec![],
return_type: DataTypeKind::Double.not_null(),
},
],
vec![],
Arc::new(Dummy {}),
);
let column_names = plan.out_names();
assert_eq!(column_names[0], "sum");
assert_eq!(column_names[1], "avg");
assert_eq!(column_names[2], "count");
assert_eq!(column_names[3], "count");
}
}