diff --git a/Cargo.lock b/Cargo.lock index d77084e92..bd1448492 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -428,6 +428,20 @@ dependencies = [ "itertools", ] +[[package]] +name = "crossbeam" +version = "0.8.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "4ae5588f6b3c3cb05239e90bd110f257254aecd01e4635400391aeae07497845" +dependencies = [ + "cfg-if 1.0.0", + "crossbeam-channel", + "crossbeam-deque", + "crossbeam-epoch 0.9.7", + "crossbeam-queue", + "crossbeam-utils 0.8.7", +] + [[package]] name = "crossbeam-channel" version = "0.5.2" @@ -477,6 +491,16 @@ dependencies = [ "scopeguard", ] +[[package]] +name = "crossbeam-queue" +version = "0.3.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "4dd435b205a4842da59efd07628f921c096bc1cc0a156835b4fa0bcb9a19bcce" +dependencies = [ + "cfg-if 1.0.0", + "crossbeam-utils 0.8.7", +] + [[package]] name = "crossbeam-utils" version = "0.7.2" @@ -520,6 +544,16 @@ dependencies = [ "memchr", ] +[[package]] +name = "ctor" +version = "0.1.21" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "ccc0a48a9b826acdf4028595adc9db92caea352f7af011a3034acd172a52a0aa" +dependencies = [ + "quote", + "syn", +] + [[package]] name = "dirs" version = "4.0.0" @@ -711,6 +745,7 @@ checksum = "f73fe65f54d1e12b726f517d3e2135ca3125a437b6d998caf1962961f7172d9e" dependencies = [ "futures-channel", "futures-core", + "futures-executor", "futures-io", "futures-sink", "futures-task", @@ -753,6 +788,17 @@ version = "0.3.21" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "0c09fd04b7e4073ac7156a9539b57a484a8ea920f79c7c675d05d289ab6110d3" +[[package]] +name = "futures-executor" +version = "0.3.21" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "9420b90cfa29e327d0429f19be13e7ddb68fa1cccb09d65e5706b8c7a749b8a6" +dependencies = [ + "futures-core", + "futures-task", + "futures-util", +] + [[package]] name = "futures-io" version = "0.3.21" @@ -803,10 +849,13 @@ version = "0.3.21" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "d8b7abd5d659d9b90c8cba917f6ec750a74e2dc23902ef9cd4cc8c8b22e6036a" dependencies = [ + "futures-channel", "futures-core", + "futures-io", "futures-macro", "futures-sink", "futures-task", + "memchr", "pin-project-lite", "pin-utils", "slab", @@ -820,7 +869,7 @@ checksum = "418d37c8b1d42553c93648be529cb70f920d3baf8ef469b74b9638df426e0b4c" dependencies = [ "cfg-if 1.0.0", "libc", - "wasi", + "wasi 0.10.2+wasi-snapshot-preview1", ] [[package]] @@ -1052,6 +1101,45 @@ dependencies = [ "autocfg", ] +[[package]] +name = "minitrace" +version = "0.4.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "8a07fdf302cc0591c97eb45939550ddaddd9962e400c20b319aa16c244cb1f16" +dependencies = [ + "crossbeam", + "futures", + "minitrace-macro", + "minstant", + "once_cell", + "parking_lot 0.11.2", + "pin-project", + "retain_mut", +] + +[[package]] +name = "minitrace-macro" +version = "0.4.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "4132dfe6097f4a90c0bbb34be0687c38d14303dd2e74f8442ae80e9bc5a34c47" +dependencies = [ + "proc-macro-error", + "proc-macro2", + "quote", + "syn", +] + +[[package]] +name = "minstant" +version = "0.1.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "2cb320648b7883b43ce5dfbc5c6f4a84038194c3f67b4fcb7d05c994e6006557" +dependencies = [ + "ctor", + "libc", + "wasi 0.7.0", +] + [[package]] name = "mio" version = "0.8.0" @@ -1443,7 +1531,7 @@ dependencies = [ "mach", "once_cell", "raw-cpuid", - "wasi", + "wasi 0.10.2+wasi-snapshot-preview1", "web-sys", "winapi", ] @@ -1561,6 +1649,12 @@ dependencies = [ "winapi", ] +[[package]] +name = "retain_mut" +version = "0.1.7" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "8c31b5c4033f8fdde8700e4657be2c497e7288f01515be52168c631e2e4d4086" + [[package]] name = "risinglight" version = "0.1.0" @@ -1595,6 +1689,7 @@ dependencies = [ "iter-chunks", "itertools", "manifest-dir-macros", + "minitrace", "moka", "num-traits", "parking_lot 0.12.0", @@ -2239,6 +2334,12 @@ dependencies = [ "winapi-util", ] +[[package]] +name = "wasi" +version = "0.7.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "b89c3ce4ce14bdc6fb6beaf9ec7928ca331de5df7e5ea278375642a2f478570d" + [[package]] name = "wasi" version = "0.10.2+wasi-snapshot-preview1" diff --git a/Cargo.toml b/Cargo.toml index 38dc0fd37..6c21803c5 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -8,6 +8,7 @@ version = "0.1.0" default = ["jemalloc"] simd = [] jemalloc = ["tikv-jemallocator"] +enable_tracing = [] [dependencies] anyhow = "1" @@ -56,6 +57,7 @@ tokio = { version = "1", features = ["full"] } tokio-util = "0.7" tracing = "0.1" tracing-subscriber = { version = "0.3", features = ["env-filter", "parking_lot"] } +minitrace = "0.4.0" [dev-dependencies] criterion = { version = "0.3", features = ["async_tokio"] } diff --git a/src/db.rs b/src/db.rs index 7fc6ea179..2de35c8ab 100644 --- a/src/db.rs +++ b/src/db.rs @@ -3,6 +3,7 @@ use std::sync::Arc; use futures::TryStreamExt; +use minitrace::prelude::*; use risinglight_proto::rowset::block_statistics::BlockStatisticsType; use tracing::debug; @@ -193,12 +194,21 @@ impl Database { let optimized_plan = optimizer.optimize(logical_plan); debug!("{:#?}", optimized_plan); - let executor_builder = ExecutorBuilder::new(context.clone(), self.storage.clone()); - let executor = executor_builder.clone().build(optimized_plan); - let output: Vec = executor.try_collect().await.map_err(|e| { - debug!("error: {}", e); - e - })?; + let mut executor_builder = ExecutorBuilder::new(context.clone(), self.storage.clone()); + let executor = executor_builder.build(optimized_plan); + + let (root, _collector) = Span::root("root"); + let output: Vec = if cfg!(feature = "enable_tracing") { + executor.try_collect().in_span(root).await.map_err(|e| { + debug!("error: {}", e); + e + })? + } else { + executor.try_collect().await.map_err(|e| { + debug!("error: {}", e); + e + })? + }; for chunk in &output { debug!("output:\n{}", chunk); } @@ -207,6 +217,10 @@ impl Database { chunk.set_header(column_names); } outputs.push(chunk); + #[cfg(feature = "enable_tracing")] + let records: Vec = _collector.collect().await; + #[cfg(feature = "enable_tracing")] + println!("{records:#?}"); } Ok(outputs) } diff --git a/src/executor/mod.rs b/src/executor/mod.rs index 236dff7fe..658c45c37 100644 --- a/src/executor/mod.rs +++ b/src/executor/mod.rs @@ -18,6 +18,7 @@ use std::sync::Arc; use futures::stream::{BoxStream, StreamExt}; use futures_async_stream::{for_await, try_stream}; use itertools::Itertools; +use minitrace::prelude::*; use tokio_util::sync::CancellationToken; pub use self::aggregation::*; @@ -136,6 +137,17 @@ impl ExecutorBuilder { pub fn build(&mut self, plan: PlanRef) -> BoxedExecutor { self.visit(plan).unwrap() } + + #[try_stream(boxed, ok = DataChunk, error = ExecutorError)] + pub async fn trace_execute(mut executor_stream: BoxedExecutor, identifier: &'static str) { + while let Some(item) = executor_stream + .next() + .in_span(Span::enter_with_local_parent(identifier)) + .await + { + yield item? + } + } } /// Helper function to select the given future along with cancellation token. @@ -210,69 +222,82 @@ impl ExecutorExt for BoxedExecutor { impl PlanVisitor for ExecutorBuilder { fn visit_dummy(&mut self, _plan: &Dummy) -> Option { - Some(DummyScanExecutor.execute()) + Some(ExecutorBuilder::trace_execute( + DummyScanExecutor.execute(), + "DummyScanExecutor", + )) } fn visit_internal(&mut self, plan: &Internal) -> Option { - Some( + Some(ExecutorBuilder::trace_execute( InternalTableExecutor { table_name: plan.table_name(), } .execute(), - ) + "InternalTableExecutor", + )) } fn visit_physical_create_table(&mut self, plan: &PhysicalCreateTable) -> Option { - Some(match &self.storage { - StorageImpl::InMemoryStorage(storage) => CreateTableExecutor { - plan: plan.clone(), - storage: storage.clone(), - } - .execute(), - StorageImpl::SecondaryStorage(storage) => CreateTableExecutor { - plan: plan.clone(), - storage: storage.clone(), - } - .execute(), - }) + Some(ExecutorBuilder::trace_execute( + match &self.storage { + StorageImpl::InMemoryStorage(storage) => CreateTableExecutor { + plan: plan.clone(), + storage: storage.clone(), + } + .execute(), + StorageImpl::SecondaryStorage(storage) => CreateTableExecutor { + plan: plan.clone(), + storage: storage.clone(), + } + .execute(), + }, + "CreateTableExecutor", + )) } fn visit_physical_drop(&mut self, plan: &PhysicalDrop) -> Option { - Some(match &self.storage { - StorageImpl::InMemoryStorage(storage) => DropExecutor { - plan: plan.clone(), - storage: storage.clone(), - } - .execute(), - StorageImpl::SecondaryStorage(storage) => DropExecutor { - plan: plan.clone(), - storage: storage.clone(), - } - .execute(), - }) + Some(ExecutorBuilder::trace_execute( + match &self.storage { + StorageImpl::InMemoryStorage(storage) => DropExecutor { + plan: plan.clone(), + storage: storage.clone(), + } + .execute(), + StorageImpl::SecondaryStorage(storage) => DropExecutor { + plan: plan.clone(), + storage: storage.clone(), + } + .execute(), + }, + "DropExecutor", + )) } fn visit_physical_insert(&mut self, plan: &PhysicalInsert) -> Option { - Some(match &self.storage { - StorageImpl::InMemoryStorage(storage) => InsertExecutor { - context: self.context.clone(), - table_ref_id: plan.logical().table_ref_id(), - column_ids: plan.logical().column_ids().to_vec(), - storage: storage.clone(), - child: self.visit(plan.child()).unwrap(), - } - .execute() - .cancellable(self.context.token().child_token()), - StorageImpl::SecondaryStorage(storage) => InsertExecutor { - context: self.context.clone(), - table_ref_id: plan.logical().table_ref_id(), - column_ids: plan.logical().column_ids().to_vec(), - storage: storage.clone(), - child: self.visit(plan.child()).unwrap(), - } - .execute() - .cancellable(self.context.token().child_token()), - }) + Some(ExecutorBuilder::trace_execute( + match &self.storage { + StorageImpl::InMemoryStorage(storage) => InsertExecutor { + context: self.context.clone(), + table_ref_id: plan.logical().table_ref_id(), + column_ids: plan.logical().column_ids().to_vec(), + storage: storage.clone(), + child: self.visit(plan.child()).unwrap(), + } + .execute() + .cancellable(self.context.token().child_token()), + StorageImpl::SecondaryStorage(storage) => InsertExecutor { + context: self.context.clone(), + table_ref_id: plan.logical().table_ref_id(), + column_ids: plan.logical().column_ids().to_vec(), + storage: storage.clone(), + child: self.visit(plan.child()).unwrap(), + } + .execute() + .cancellable(self.context.token().child_token()), + }, + "InsertExecutor", + )) } fn visit_physical_nested_loop_join( @@ -281,7 +306,7 @@ impl PlanVisitor for ExecutorBuilder { ) -> Option { let left_child = self.visit(plan.left()).unwrap(); let right_child = self.visit(plan.right()).unwrap(); - Some( + Some(ExecutorBuilder::trace_execute( NestedLoopJoinExecutor { left_child, right_child, @@ -291,73 +316,81 @@ impl PlanVisitor for ExecutorBuilder { right_types: plan.right().out_types(), } .execute(), - ) + "NestedLoopJoinExecutor", + )) } fn visit_physical_table_scan(&mut self, plan: &PhysicalTableScan) -> Option { - Some(match &self.storage { - StorageImpl::InMemoryStorage(storage) => TableScanExecutor { - context: self.context.clone(), - plan: plan.clone(), - expr: None, - storage: storage.clone(), - } - .execute() - .cancellable(self.context.token().child_token()), - StorageImpl::SecondaryStorage(storage) => TableScanExecutor { - context: self.context.clone(), - plan: plan.clone(), - expr: plan.logical().expr().cloned(), - storage: storage.clone(), - } - .execute() - .cancellable(self.context.token().child_token()), - }) + Some(ExecutorBuilder::trace_execute( + match &self.storage { + StorageImpl::InMemoryStorage(storage) => TableScanExecutor { + context: self.context.clone(), + plan: plan.clone(), + expr: None, + storage: storage.clone(), + } + .execute() + .cancellable(self.context.token().child_token()), + StorageImpl::SecondaryStorage(storage) => TableScanExecutor { + context: self.context.clone(), + plan: plan.clone(), + expr: plan.logical().expr().cloned(), + storage: storage.clone(), + } + .execute() + .cancellable(self.context.token().child_token()), + }, + "TableScanExecutor", + )) } fn visit_physical_projection(&mut self, plan: &PhysicalProjection) -> Option { - Some( + Some(ExecutorBuilder::trace_execute( ProjectionExecutor { project_expressions: plan.logical().project_expressions().to_vec(), child: self.visit(plan.child()).unwrap(), } .execute(), - ) + "ProjectionExecutor", + )) } fn visit_physical_filter(&mut self, plan: &PhysicalFilter) -> Option { - Some( + Some(ExecutorBuilder::trace_execute( FilterExecutor { expr: plan.logical().expr().clone(), child: self.visit(plan.child()).unwrap(), } .execute(), - ) + "FilterExecutor", + )) } fn visit_physical_order(&mut self, plan: &PhysicalOrder) -> Option { - Some( + Some(ExecutorBuilder::trace_execute( OrderExecutor { comparators: plan.logical().comparators().to_vec(), child: self.visit(plan.child()).unwrap(), } .execute(), - ) + "OrderExecutor", + )) } fn visit_physical_limit(&mut self, plan: &PhysicalLimit) -> Option { - Some( + Some(ExecutorBuilder::trace_execute( LimitExecutor { child: self.visit(plan.child()).unwrap(), offset: plan.logical().offset(), limit: plan.logical().limit(), } .execute(), - ) + "LimitExecutor", + )) } fn visit_physical_top_n(&mut self, plan: &PhysicalTopN) -> Option { - Some( + Some(ExecutorBuilder::trace_execute( TopNExecutor { child: self.visit(plan.child()).unwrap(), offset: plan.logical().offset(), @@ -365,22 +398,27 @@ impl PlanVisitor for ExecutorBuilder { comparators: plan.logical().comparators().to_owned(), } .execute(), - ) + "TopNExecutor", + )) } fn visit_physical_explain(&mut self, plan: &PhysicalExplain) -> Option { - Some(ExplainExecutor { plan: plan.clone() }.execute()) + Some(ExecutorBuilder::trace_execute( + ExplainExecutor { plan: plan.clone() }.execute(), + "ExplainExecutor", + )) } fn visit_physical_hash_agg(&mut self, plan: &PhysicalHashAgg) -> Option { - Some( + Some(ExecutorBuilder::trace_execute( HashAggExecutor { agg_calls: plan.logical().agg_calls().to_vec(), group_keys: plan.logical().group_keys().to_vec(), child: self.visit(plan.child()).unwrap(), } .execute(), - ) + "HashAggExecutor", + )) } fn visit_physical_hash_join(&mut self, plan: &PhysicalHashJoin) -> Option { @@ -395,7 +433,7 @@ impl PlanVisitor for ExecutorBuilder { .iter() .map(|(left, right)| (left.index, right.index - left_col_num)) .unzip(); - Some( + Some(ExecutorBuilder::trace_execute( HashJoinExecutor { left_child, right_child, @@ -407,67 +445,74 @@ impl PlanVisitor for ExecutorBuilder { right_types: plan.right().out_types(), } .execute(), - ) + "HashJoinExecutor", + )) } fn visit_physical_simple_agg(&mut self, plan: &PhysicalSimpleAgg) -> Option { - Some( + Some(ExecutorBuilder::trace_execute( SimpleAggExecutor { agg_calls: plan.agg_calls().to_vec(), child: self.visit(plan.child()).unwrap(), } .execute(), - ) + "SimpleAggExecutor", + )) } fn visit_physical_delete(&mut self, plan: &PhysicalDelete) -> Option { let child = self.visit(plan.child()).unwrap(); - Some(match &self.storage { - StorageImpl::InMemoryStorage(storage) => DeleteExecutor { - context: self.context.clone(), - child, - table_ref_id: plan.logical().table_ref_id(), - storage: storage.clone(), - } - .execute() - .cancellable(self.context.token().child_token()), - StorageImpl::SecondaryStorage(storage) => DeleteExecutor { - context: self.context.clone(), - child, - table_ref_id: plan.logical().table_ref_id(), - storage: storage.clone(), - } - .execute() - .cancellable(self.context.token().child_token()), - }) + Some(ExecutorBuilder::trace_execute( + match &self.storage { + StorageImpl::InMemoryStorage(storage) => DeleteExecutor { + context: self.context.clone(), + child, + table_ref_id: plan.logical().table_ref_id(), + storage: storage.clone(), + } + .execute() + .cancellable(self.context.token().child_token()), + StorageImpl::SecondaryStorage(storage) => DeleteExecutor { + context: self.context.clone(), + child, + table_ref_id: plan.logical().table_ref_id(), + storage: storage.clone(), + } + .execute() + .cancellable(self.context.token().child_token()), + }, + "DeleteExecutor", + )) } fn visit_physical_values(&mut self, plan: &PhysicalValues) -> Option { - Some( + Some(ExecutorBuilder::trace_execute( ValuesExecutor { column_types: plan.logical().column_types().to_vec(), values: plan.logical().values().to_vec(), } .execute(), - ) + "ValuesExecutor", + )) } fn visit_physical_copy_from_file( &mut self, plan: &PhysicalCopyFromFile, ) -> Option { - Some( + Some(ExecutorBuilder::trace_execute( CopyFromFileExecutor { context: self.context.clone(), plan: plan.clone(), } .execute() .cancellable(self.context.token().child_token()), - ) + "CopyFromFileExecutor", + )) } fn visit_physical_copy_to_file(&mut self, plan: &PhysicalCopyToFile) -> Option { - Some( + Some(ExecutorBuilder::trace_execute( CopyToFileExecutor { context: self.context.clone(), child: self.visit(plan.child()).unwrap(), @@ -476,6 +521,7 @@ impl PlanVisitor for ExecutorBuilder { } .execute() .cancellable(self.context.token().child_token()), - ) + "CopyToFileExecutor", + )) } }