Skip to content

Commit

Permalink
feature(collator): int queue single state
Browse files Browse the repository at this point in the history
  • Loading branch information
drmick committed Mar 4, 2025
1 parent 859ade1 commit d670431
Show file tree
Hide file tree
Showing 20 changed files with 707 additions and 1,105 deletions.
9 changes: 3 additions & 6 deletions cli/src/node/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,8 +11,7 @@ use futures_util::future::BoxFuture;
use tycho_block_util::block::BlockIdRelation;
use tycho_collator::collator::CollatorStdImplFactory;
use tycho_collator::internal_queue::queue::{QueueConfig, QueueFactory, QueueFactoryStdImpl};
use tycho_collator::internal_queue::state::commited_state::CommittedStateImplFactory;
use tycho_collator::internal_queue::state::uncommitted_state::UncommittedStateImplFactory;
use tycho_collator::internal_queue::state::storage::QueueStateImplFactory;
use tycho_collator::manager::CollationManager;
use tycho_collator::mempool::MempoolAdapterStdImpl;
use tycho_collator::queue_adapter::MessageQueueAdapterStdImpl;
Expand Down Expand Up @@ -317,12 +316,10 @@ impl Node {
// Create collator
tracing::info!("starting collator");

let session_state_factory = UncommittedStateImplFactory::new(self.storage.clone());
let persistent_state_factory = CommittedStateImplFactory::new(self.storage.clone());
let queue_state_factory = QueueStateImplFactory::new(self.storage.clone());

let queue_factory = QueueFactoryStdImpl {
uncommitted_state_factory: session_state_factory,
committed_state_factory: persistent_state_factory,
state: queue_state_factory,
config: self.internal_queue_config,
};
let queue = queue_factory.create();
Expand Down
8 changes: 1 addition & 7 deletions collator/src/collator/do_collate/finalize.rs
Original file line number Diff line number Diff line change
Expand Up @@ -420,15 +420,9 @@ impl Phase<FinalizeState> {
(None, Some(self.state.mc_data.make_block_ref()))
};

let version = if self.state.collation_data.block_id_short.seqno % 3 == 0 {
0
} else {
1
};

// build block info
let mut new_block_info = BlockInfo {
version,
version: 0,
key_block: matches!(&mc_state_extra, Some(extra) if extra.after_key_block),
shard: self.state.collation_data.block_id_short.shard,
seqno: self.state.collation_data.block_id_short.seqno,
Expand Down
2 changes: 1 addition & 1 deletion collator/src/collator/messages_reader/internals_reader.rs
Original file line number Diff line number Diff line change
Expand Up @@ -451,7 +451,7 @@ impl<V: InternalMessageValue> InternalsPartitionReader<V> {
"check range limit: cannot get diff with stats from queue for block {}",
diff_block_id,
);
return CollatorError::Cancelled(CollationCancelReason::DiffNotFoundInQueue(
CollatorError::Cancelled(CollationCancelReason::DiffNotFoundInQueue(
diff_block_id,
))
})?;
Expand Down
9 changes: 3 additions & 6 deletions collator/src/collator/tests/messages_reader_tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,8 +24,7 @@ use super::{
use crate::collator::messages_buffer::MessageGroup;
use crate::collator::types::{AnchorsCache, ParsedMessage};
use crate::internal_queue::queue::{QueueFactory, QueueFactoryStdImpl};
use crate::internal_queue::state::commited_state::CommittedStateImplFactory;
use crate::internal_queue::state::uncommitted_state::UncommittedStateImplFactory;
use crate::internal_queue::state::storage::QueueStateImplFactory;
use crate::internal_queue::types::{DiffStatistics, EnqueuedMessage, InternalMessageValue};
use crate::mempool::{ExternalMessage, MempoolAnchor, MempoolAnchorId};
use crate::queue_adapter::{MessageQueueAdapter, MessageQueueAdapterStdImpl};
Expand Down Expand Up @@ -1484,11 +1483,9 @@ impl std::fmt::Debug for TestInternalMessageType {
async fn create_test_queue_adapter<V: InternalMessageValue>(
) -> Result<(Arc<dyn MessageQueueAdapter<V>>, tempfile::TempDir)> {
let (storage, tmp_dir) = Storage::new_temp().await?;
let uncommitted_state_factory = UncommittedStateImplFactory::new(storage.clone());
let committed_state_factory = CommittedStateImplFactory::new(storage.clone());
let committed_state_factory = QueueStateImplFactory::new(storage.clone());
let queue_factory = QueueFactoryStdImpl {
uncommitted_state_factory,
committed_state_factory,
state: committed_state_factory,
config: Default::default(),
};
let queue = queue_factory.create();
Expand Down
6 changes: 3 additions & 3 deletions collator/src/internal_queue/gc.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ use tycho_block_util::queue::{QueueKey, QueuePartitionIdx};
use tycho_util::metrics::HistogramGuard;
use tycho_util::FastHashMap;

use crate::internal_queue::state::commited_state::CommittedState;
use crate::internal_queue::state::storage::QueueState;
use crate::internal_queue::types::{InternalMessageValue, QueueShardRange};
use crate::tracing_targets;

Expand All @@ -19,7 +19,7 @@ pub struct GcManager {

impl GcManager {
pub fn start<V: InternalMessageValue>(
committed_state: Arc<dyn CommittedState<V>>,
committed_state: Arc<dyn QueueState<V>>,
execution_interval: Duration,
) -> Self {
let delete_until = Arc::new(Mutex::new(GcRange::new()));
Expand Down Expand Up @@ -77,7 +77,7 @@ impl Drop for GcManager {

fn gc_task<V: InternalMessageValue>(
gc_state: Arc<Mutex<GcRange>>,
committed_state: Arc<dyn CommittedState<V>>,
committed_state: Arc<dyn QueueState<V>>,
delete_until: GcRange,
) {
let _histogram = HistogramGuard::begin("tycho_internal_queue_gc_execute_task_time");
Expand Down
Loading

0 comments on commit d670431

Please sign in to comment.