Skip to content

Commit 605e72e

Browse files
feat(executor_v2): migrate executors to run TPCH (#727)
1 parent 2bedd78 commit 605e72e

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

44 files changed

+1739
-439
lines changed

src/array/data_chunk.rs

+15-1
Original file line numberDiff line numberDiff line change
@@ -5,7 +5,7 @@ use std::ops::RangeBounds;
55
use std::sync::Arc;
66

77
use super::*;
8-
use crate::types::DataValue;
8+
use crate::types::{DataValue, Row};
99

1010
/// A collection of arrays.
1111
///
@@ -108,6 +108,16 @@ impl DataChunk {
108108
}
109109
arrays.into_iter().collect()
110110
}
111+
112+
/// Concatenate two chunks in rows.
113+
pub fn row_concat(self, other: Self) -> Self {
114+
assert_eq!(self.cardinality(), other.cardinality());
115+
self.arrays
116+
.iter()
117+
.chain(other.arrays.iter())
118+
.cloned()
119+
.collect()
120+
}
111121
}
112122

113123
/// Print the data chunk as a pretty table.
@@ -251,4 +261,8 @@ impl RowRef<'_> {
251261
pub fn values(&self) -> impl Iterator<Item = DataValue> + '_ {
252262
self.chunk.arrays().iter().map(|a| a.get(self.row_idx))
253263
}
264+
265+
pub fn to_owned(&self) -> Row {
266+
self.values().collect()
267+
}
254268
}

src/array/internal_ext.rs

+1-1
Original file line numberDiff line numberDiff line change
@@ -36,7 +36,7 @@ pub trait ArrayFromDataExt: Array {
3636

3737
/// Implement dispatch functions for `ArrayImplValidExt` and `ArrayImplEstimateExt`
3838
macro_rules! impl_array_impl_internal_ext {
39-
([], $( { $Abc:ident, $abc:ident, $AbcArray:ty, $AbcArrayBuilder:ty, $Value:ident, $Type:pat } ),*) => {
39+
([], $( { $Abc:ident, $Type:ty, $abc:ident, $AbcArray:ty, $AbcArrayBuilder:ty, $Value:ident, $Pattern:pat } ),*) => {
4040
impl ArrayImplValidExt for ArrayImpl {
4141
fn get_valid_bitmap(&self) -> &BitVec {
4242
match self {

src/array/iterator.rs

+19-8
Original file line numberDiff line numberDiff line change
@@ -9,16 +9,16 @@ use super::Array;
99
#[derive(Clone)]
1010
pub struct ArrayIter<'a, A: Array> {
1111
data: &'a A,
12-
pos: usize,
13-
_phantom: PhantomData<&'a usize>,
12+
begin: usize,
13+
end: usize,
1414
}
1515

1616
impl<'a, A: Array> ArrayIter<'a, A> {
1717
pub fn new(data: &'a A) -> Self {
1818
Self {
1919
data,
20-
pos: 0,
21-
_phantom: PhantomData,
20+
begin: 0,
21+
end: data.len(),
2222
}
2323
}
2424
}
@@ -27,21 +27,32 @@ impl<'a, A: Array> Iterator for ArrayIter<'a, A> {
2727
type Item = Option<&'a A::Item>;
2828

2929
fn next(&mut self) -> Option<Self::Item> {
30-
if self.pos >= self.data.len() {
30+
if self.begin >= self.end {
3131
None
3232
} else {
33-
let item = self.data.get(self.pos);
34-
self.pos += 1;
33+
let item = self.data.get(self.begin);
34+
self.begin += 1;
3535
Some(item)
3636
}
3737
}
3838

3939
fn size_hint(&self) -> (usize, Option<usize>) {
40-
let exact = self.data.len() - self.pos;
40+
let exact = self.end - self.begin;
4141
(exact, Some(exact))
4242
}
4343
}
4444

45+
impl<'a, A: Array> DoubleEndedIterator for ArrayIter<'a, A> {
46+
fn next_back(&mut self) -> Option<Self::Item> {
47+
if self.begin >= self.end {
48+
None
49+
} else {
50+
self.end -= 1;
51+
Some(self.data.get(self.end))
52+
}
53+
}
54+
}
55+
4556
pub struct NonNullArrayIter<'a, A: Array> {
4657
data: &'a A,
4758
pos: usize,

src/array/mod.rs

+13-13
Original file line numberDiff line numberDiff line change
@@ -225,15 +225,15 @@ macro_rules! for_all_variants {
225225
($macro:tt $(, $x:tt)*) => {
226226
$macro! {
227227
[$($x),*],
228-
{ Int32, int32, I32Array, I32ArrayBuilder, Int32, Int32 },
229-
{ Int64, int64, I64Array, I64ArrayBuilder, Int64, Int64 },
230-
{ Float64, float64, F64Array, F64ArrayBuilder, Float64, Float64 },
231-
{ Utf8, utf8, Utf8Array, Utf8ArrayBuilder, String, String },
232-
{ Blob, blob, BlobArray, BlobArrayBuilder, Blob, Blob },
233-
{ Bool, bool, BoolArray, BoolArrayBuilder, Bool, Bool },
234-
{ Decimal, decimal, DecimalArray, DecimalArrayBuilder, Decimal, Decimal(_, _) },
235-
{ Date, date, DateArray, DateArrayBuilder, Date, Date },
236-
{ Interval, interval, IntervalArray, IntervalArrayBuilder, Interval, Interval }
228+
{ Bool, bool, bool, BoolArray, BoolArrayBuilder, Bool, Bool },
229+
{ Int32, i32, int32, I32Array, I32ArrayBuilder, Int32, Int32 },
230+
{ Int64, i64, int64, I64Array, I64ArrayBuilder, Int64, Int64 },
231+
{ Float64, F64, float64, F64Array, F64ArrayBuilder, Float64, Float64 },
232+
{ Decimal, Decimal, decimal, DecimalArray, DecimalArrayBuilder, Decimal, Decimal(_, _) },
233+
{ Date, Date, date, DateArray, DateArrayBuilder, Date, Date },
234+
{ Interval, Interval, interval, IntervalArray, IntervalArrayBuilder, Interval, Interval },
235+
{ Utf8, str, utf8, Utf8Array, Utf8ArrayBuilder, String, String },
236+
{ Blob, BlobRef, blob, BlobArray, BlobArrayBuilder, Blob, Blob }
237237
}
238238
};
239239
}
@@ -244,7 +244,7 @@ pub struct TypeMismatch;
244244

245245
/// Implement `From` and `TryFrom` between conversions of concrete array types and enum sum type.
246246
macro_rules! impl_from {
247-
([], $( { $Abc:ident, $abc:ident, $AbcArray:ty, $AbcArrayBuilder:ty, $Value:ident, $Type:pat } ),*) => {
247+
([], $( { $Abc:ident, $Type:ty, $abc:ident, $AbcArray:ty, $AbcArrayBuilder:ty, $Value:ident, $Pattern:pat } ),*) => {
248248
$(
249249
/// Implement `AbcArray -> ArrayImpl`
250250
impl From<$AbcArray> for ArrayImpl {
@@ -315,7 +315,7 @@ for_all_variants! { impl_from }
315315

316316
/// Implement dispatch functions for `ArrayBuilderImpl`.
317317
macro_rules! impl_array_builder {
318-
([], $( { $Abc:ident, $abc:ident, $AbcArray:ty, $AbcArrayBuilder:ty, $Value:ident, $Type:pat } ),*) => {
318+
([], $( { $Abc:ident, $Type:ty, $abc:ident, $AbcArray:ty, $AbcArrayBuilder:ty, $Value:ident, $Pattern:pat } ),*) => {
319319
impl ArrayBuilderImpl {
320320
/// Reserve at least `capacity` values.
321321
pub fn reserve(&mut self, capacity: usize) {
@@ -342,7 +342,7 @@ macro_rules! impl_array_builder {
342342
Null => Self::Int32(I32ArrayBuilder::with_capacity(capacity)),
343343
Struct(_) => todo!("array of Struct type"),
344344
$(
345-
$Type => Self::$Abc(<$AbcArrayBuilder>::with_capacity(capacity)),
345+
$Pattern => Self::$Abc(<$AbcArrayBuilder>::with_capacity(capacity)),
346346
)*
347347
}
348348
}
@@ -444,7 +444,7 @@ impl ArrayBuilderImpl {
444444

445445
/// Implement dispatch functions for `ArrayImpl`.
446446
macro_rules! impl_array {
447-
([], $( { $Abc:ident, $abc:ident, $AbcArray:ty, $AbcArrayBuilder:ty, $Value:ident, $Type:pat } ),*) => {
447+
([], $( { $Abc:ident, $Type:ty, $abc:ident, $AbcArray:ty, $AbcArrayBuilder:ty, $Value:ident, $Pattern:pat } ),*) => {
448448
impl ArrayImpl {
449449
$(
450450
paste! {

src/array/shuffle_ext.rs

+1-1
Original file line numberDiff line numberDiff line change
@@ -122,7 +122,7 @@ pub trait ArrayImplSortExt {
122122

123123
/// Implement dispatch functions for `ArrayImplBuilderPickExt` and `ArrayImplSortExt`
124124
macro_rules! impl_array_impl_shuffle_ext {
125-
([], $( { $Abc:ident, $abc:ident, $AbcArray:ty, $AbcArrayBuilder:ty, $Value:ident, $Type:pat } ),*) => {
125+
([], $( { $Abc:ident, $Type:ty, $abc:ident, $AbcArray:ty, $AbcArrayBuilder:ty, $Value:ident, $Pattern:pat } ),*) => {
126126
impl ArrayImplBuilderPickExt for ArrayBuilderImpl {
127127
fn pick_from(&mut self, array: &ArrayImpl, logical_rows: &[usize]) {
128128
match (self, array) {

src/binder_v2/copy.rs

+10-8
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
use std::result::Result as RawResult;
1+
use std::path::PathBuf;
22
use std::str::FromStr;
33

44
use serde::{Deserialize, Serialize};
@@ -7,7 +7,7 @@ use super::*;
77

88
#[derive(Debug, PartialEq, PartialOrd, Ord, Hash, Eq, Clone, Serialize, Deserialize)]
99
pub struct ExtSource {
10-
pub path: String,
10+
pub path: PathBuf,
1111
pub format: FileFormat,
1212
}
1313

@@ -40,8 +40,7 @@ impl std::fmt::Display for FileFormat {
4040

4141
impl FromStr for ExtSource {
4242
type Err = ();
43-
44-
fn from_str(_s: &str) -> RawResult<Self, Self::Err> {
43+
fn from_str(_s: &str) -> std::result::Result<Self, Self::Err> {
4544
Err(())
4645
}
4746
}
@@ -55,11 +54,11 @@ impl Binder {
5554
target: CopyTarget,
5655
options: &[CopyOption],
5756
) -> Result {
58-
let cols = self.bind_table_columns(table_name, columns)?;
57+
let cols = self.bind_table_columns(&table_name, columns)?;
5958

6059
let ext_source = self.egraph.add(Node::ExtSource(ExtSource {
6160
path: match target {
62-
CopyTarget::File { filename } => filename,
61+
CopyTarget::File { filename } => filename.into(),
6362
t => todo!("unsupported copy target: {:?}", t),
6463
},
6564
format: FileFormat::from_options(options),
@@ -71,8 +70,11 @@ impl Binder {
7170
self.egraph.add(Node::CopyTo([ext_source, scan]))
7271
} else {
7372
// COPY <dest_table> FROM <source_file>
74-
let copy = self.egraph.add(Node::CopyFrom(ext_source));
75-
self.egraph.add(Node::Insert([cols, copy]))
73+
let table = self.bind_table_id(&table_name)?;
74+
let types = self.check_type(cols)?.kind();
75+
let types = self.egraph.add(Node::Type(types));
76+
let copy = self.egraph.add(Node::CopyFrom([ext_source, types]));
77+
self.egraph.add(Node::Insert([table, cols, copy]))
7678
};
7779

7880
Ok(copy)

src/binder_v2/create_table.rs

+6-14
Original file line numberDiff line numberDiff line change
@@ -5,28 +5,24 @@ use std::str::FromStr;
55
use serde::{Deserialize, Serialize};
66

77
use super::*;
8-
use crate::catalog::{ColumnCatalog, ColumnDesc};
8+
use crate::catalog::ColumnCatalog;
99
use crate::types::{ColumnId, DatabaseId, SchemaId};
1010

1111
#[derive(Debug, PartialEq, Eq, PartialOrd, Ord, Hash, Clone, Serialize, Deserialize)]
1212
pub struct CreateTable {
1313
pub database_id: DatabaseId,
1414
pub schema_id: SchemaId,
1515
pub table_name: String,
16-
pub columns_desc: Vec<ColumnDesc>,
16+
pub columns: Vec<ColumnCatalog>,
1717
pub ordered_pk_ids: Vec<ColumnId>,
1818
}
1919

2020
impl std::fmt::Display for CreateTable {
2121
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
2222
write!(
2323
f,
24-
"databaseId: {}, schemaId: {}, tableName: {}, columnDesc: {:?}, orderedIds: {:?}",
25-
self.database_id,
26-
self.schema_id,
27-
self.table_name,
28-
self.columns_desc,
29-
self.ordered_pk_ids
24+
"databaseId: {}, schemaId: {}, tableName: {}, columns: {:?}, orderedIds: {:?}",
25+
self.database_id, self.schema_id, self.table_name, self.columns, self.ordered_pk_ids
3026
)
3127
}
3228
}
@@ -46,7 +42,7 @@ impl Binder {
4642
columns: &[ColumnDef],
4743
constraints: &[TableConstraint],
4844
) -> Result {
49-
let name = lower_case_name(name);
45+
let name = lower_case_name(&name);
5046
let (database_name, schema_name, table_name) = split_name(&name)?;
5147
let db = self
5248
.catalog
@@ -109,11 +105,7 @@ impl Binder {
109105
database_id: db.id(),
110106
schema_id: schema.id(),
111107
table_name: table_name.into(),
112-
columns_desc: columns
113-
.iter()
114-
.map(|col| col.desc())
115-
.cloned()
116-
.collect::<Vec<ColumnDesc>>(),
108+
columns,
117109
ordered_pk_ids,
118110
}));
119111
Ok(create)

src/binder_v2/delete.rs

+4-1
Original file line numberDiff line numberDiff line change
@@ -6,7 +6,10 @@ impl Binder {
66
table_name: TableFactor,
77
selection: Option<Expr>,
88
) -> Result {
9-
let table_id = self.bind_table_id(table_name.clone())?;
9+
let TableFactor::Table { name, .. } = &table_name else {
10+
todo!("unsupported delete target: {:?}", table_name);
11+
};
12+
let table_id = self.bind_table_id(name)?;
1013
let scan = self.bind_table(table_name)?;
1114
let cond = self.bind_condition(selection)?;
1215
let filter = self.egraph.add(Node::Filter([cond, scan]));

src/binder_v2/drop.rs

+1-1
Original file line numberDiff line numberDiff line change
@@ -53,7 +53,7 @@ impl Binder {
5353
) -> Result {
5454
match object_type {
5555
ObjectType::Table => {
56-
let name = lower_case_name(names[0].clone());
56+
let name = lower_case_name(&names[0]);
5757
let (database_name, schema_name, table_name) = split_name(&name)?;
5858
let table_ref_id = self
5959
.catalog

src/binder_v2/expr.rs

+5-3
Original file line numberDiff line numberDiff line change
@@ -76,8 +76,6 @@ impl Binder {
7676
return Err(BindError::AmbiguousColumn(column_name.into()));
7777
}
7878
let id = self.egraph.add(Node::Column(column_ref_id));
79-
self.egraph[id].data.type_ =
80-
Ok(self.catalog.get_column(&column_ref_id).unwrap().datatype());
8179
return Ok(id);
8280
}
8381
if let Some(id) = self.current_ctx().aliases.get(column_name) {
@@ -192,7 +190,11 @@ impl Binder {
192190
"max" => Node::Max(args[0]),
193191
"min" => Node::Min(args[0]),
194192
"sum" => Node::Sum(args[0]),
195-
"avg" => Node::Avg(args[0]),
193+
"avg" => {
194+
let sum = self.egraph.add(Node::Sum(args[0]));
195+
let count = self.egraph.add(Node::Count(args[0]));
196+
Node::Div([sum, count])
197+
}
196198
"first" => Node::First(args[0]),
197199
"last" => Node::Last(args[0]),
198200
name => todo!("Unsupported function: {}", name),

src/binder_v2/insert.rs

+3-2
Original file line numberDiff line numberDiff line change
@@ -10,9 +10,10 @@ impl Binder {
1010
columns: Vec<Ident>,
1111
source: Box<Query>,
1212
) -> Result {
13-
let cols = self.bind_table_columns(table_name, &columns)?;
13+
let table = self.bind_table_id(&table_name)?;
14+
let cols = self.bind_table_columns(&table_name, &columns)?;
1415
let source = self.bind_query(*source)?;
15-
let id = self.egraph.add(Node::Insert([cols, source]));
16+
let id = self.egraph.add(Node::Insert([table, cols, source]));
1617
Ok(id)
1718
}
1819
}

src/binder_v2/mod.rs

+6-8
Original file line numberDiff line numberDiff line change
@@ -12,7 +12,7 @@ use crate::parser::*;
1212
use crate::planner::{Expr as Node, RecExpr, TypeError, TypeSchemaAnalysis};
1313
use crate::types::{DataTypeKind, DataValue};
1414

15-
mod copy;
15+
pub mod copy;
1616
mod create_table;
1717
mod delete;
1818
mod drop;
@@ -21,7 +21,6 @@ mod insert;
2121
mod select;
2222
mod table;
2323

24-
pub use self::copy::*;
2524
pub use self::create_table::*;
2625
pub use self::delete::*;
2726
pub use self::drop::*;
@@ -92,8 +91,8 @@ impl Binder {
9291
/// Create a new binder.
9392
pub fn new(catalog: Arc<RootCatalog>) -> Self {
9493
Binder {
95-
egraph: egg::EGraph::default(),
96-
catalog,
94+
catalog: catalog.clone(),
95+
egraph: egg::EGraph::new(TypeSchemaAnalysis { catalog }),
9796
contexts: vec![Context::default()],
9897
}
9998
}
@@ -173,9 +172,8 @@ impl Binder {
173172
Ok(())
174173
}
175174

176-
fn check_type(&self, id: Id) -> Result<()> {
177-
self.egraph[id].data.type_.clone()?;
178-
Ok(())
175+
fn check_type(&self, id: Id) -> Result<crate::types::DataType> {
176+
Ok(self.egraph[id].data.type_.clone()?)
179177
}
180178

181179
fn bind_explain(&mut self, query: Statement) -> Result {
@@ -196,7 +194,7 @@ fn split_name(name: &ObjectName) -> Result<(&str, &str, &str)> {
196194
}
197195

198196
/// Convert an object name into lower case
199-
fn lower_case_name(name: ObjectName) -> ObjectName {
197+
fn lower_case_name(name: &ObjectName) -> ObjectName {
200198
ObjectName(
201199
name.0
202200
.iter()

0 commit comments

Comments
 (0)