Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

RRD manifests bootstrap #9053

Merged
merged 2 commits into from
Feb 19, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions crates/store/re_format_arrow/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -247,6 +247,9 @@ pub fn format_record_batch_opts(
}

/// Nicely format this record batch, either with the given fixed width, or with the terminal width (`None`).
///
/// If `transposed` is `true`, the dataframe will be printed transposed (rows and columns swapped).
/// This is very useful for wide (i.e. lots of columns), short (i.e. not many rows) datasets.
pub fn format_record_batch_with_width(
batch: &arrow::array::RecordBatch,
width: Option<usize>,
Expand Down
9 changes: 9 additions & 0 deletions crates/store/re_log_types/src/time_point/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -124,6 +124,15 @@ pub enum TimeType {
Sequence,
}

impl std::fmt::Display for TimeType {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        // Lowercase, human-readable names for each index kind.
        let name = match self {
            Self::Time => "time",
            Self::Sequence => "sequence",
        };
        f.write_str(name)
    }
}

impl TimeType {
#[inline]
fn hash(&self) -> u64 {
Expand Down
193 changes: 189 additions & 4 deletions crates/store/re_protos/proto/rerun/v0/remote_store.proto
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

In an ideal world, all of this new unstable manifest stuff should never make it into the main repo's proto definitions.

Original file line number Diff line number Diff line change
Expand Up @@ -5,14 +5,20 @@ package rerun.remote_store.v0;
import "rerun/v0/common.proto";

service StorageNode {
  // Data APIs

  rpc Query(QueryRequest) returns (stream DataframePart) {}

  rpc FetchRecording(FetchRecordingRequest) returns (stream rerun.common.v0.RerunChunk) {}

  // Index APIs

  rpc CreateIndex(CreateIndexRequest) returns (CreateIndexResponse) {}

  rpc ReIndex(ReIndexRequest) returns (ReIndexResponse) {}

  rpc GetChunkIds(GetChunkIdsRequest) returns (stream GetChunkIdsResponse) {}

  rpc GetChunks(GetChunksRequest) returns (stream rerun.common.v0.RerunChunk) {}

  // The response to `SearchIndex` is a RecordBatch with 3 columns:
  // NOTE(review): the per-column documentation lines were collapsed in the diff view —
  // restore them from the source file; only the last one is visible here:
  // - 'data' column with the data that is returned for the matched timepoints
  rpc SearchIndex(SearchIndexRequest) returns (stream DataframePart) {}

  // Chunk manifest APIs

  rpc CreateManifests(CreateManifestsRequest) returns (CreateManifestsResponse) {}

  rpc ListManifests(ListManifestsRequest) returns (stream DataframePart) {}

  rpc QueryManifest(QueryManifestRequest) returns (stream DataframePart) {}

  // TODO(zehiko, cmc): DeleteManifest

  // Metadata APIs

  rpc QueryCatalog(QueryCatalogRequest) returns (stream DataframePart) {}

  rpc UpdateCatalog(UpdateCatalogRequest) returns (UpdateCatalogResponse) {}

  rpc GetRecordingSchema(GetRecordingSchemaRequest) returns (GetRecordingSchemaResponse) {}

  // Registration APIs

  // TODO(zehiko) support registering more than one recording at a time
  rpc RegisterRecording(RegisterRecordingRequest) returns (DataframePart) {}

  rpc UnregisterRecording(UnregisterRecordingRequest) returns (UnregisterRecordingResponse) {}

  // Note: the diff scrape contained this rpc twice (pre- and post-reformat); only one
  // definition is legal in proto, so the single-line form is kept.
  rpc UnregisterAllRecordings(UnregisterAllRecordingsRequest) returns (UnregisterAllRecordingsResponse) {}
}

// ---------------- Common response message ------------------
Expand Down Expand Up @@ -192,6 +213,170 @@ message CatalogEntry {
string name = 1;
}


// ---------------- CreateManifests ------------------

// TODO(zehiko, cmc): At some point, this will need to be fully async (i.e. caller gets assigned
// a unique request ID and polls it for completion), but:
// A) Let's wait until we have a real need for this.
// B) That's true of everything in the platform, so this needs to be properly generalized.

// Triggers creation of chunk manifests for an entire catalog entry.
//
// Currently synchronous; see the TODO above about making this async.
message CreateManifestsRequest {
  // Which catalog entry do we want to create manifests for?
  CatalogEntry entry = 1;
}

// Intentionally empty: a dedicated response message (rather than e.g. `Empty`) so that
// fields can be added later without breaking the RPC signature.
message CreateManifestsResponse {}

// ---------------- ListManifests ------------------

message ListManifestsRequest {
  // Which catalog entry do we want to list the manifests of?
  CatalogEntry entry = 1;

  // Generic parameters that will influence the behavior of the Lance scanner.
  //
  // Field number 500 deliberately matches the `scan_parameters` slot in `QueryManifestRequest`.
  //
  // TODO(zehiko, cmc): actually support those.
  ScanParameters scan_parameters = 500;
}

// An encoded record batch describing the available manifests.
//
// NOTE(review): the `ListManifests` rpc is declared to return `stream DataframePart`,
// not this message — confirm whether this type is actually used anywhere.
message ListManifestsResponse {
  // Which encoding was used to serialize `payload`.
  rerun.common.v0.EncoderVersion encoder_version = 1;

  // The record batch of the response, encoded according to `encoder_version`.
  bytes payload = 2;
}

// ---------------- QueryManifest ------------------

// TODO(zehiko, cmc): Being able to specify only a collection ID rather than a resource ID could be
// super useful for cross-recording queries (resource_id becomes a column of the result).

// A manifest query will find all the relevant chunk IDs (and optionally a bunch of related metadata)
// for a given Rerun query (latest-at, range, etc).
//
// The result might contain duplicated chunk IDs; it is the responsibility of the caller to
// deduplicate them as needed.
message QueryManifestRequest {
  // What resource are we querying the manifest for?
  rerun.common.v0.RecordingId resource_id = 100;

  // What columns of the manifest are we interested in?
  ColumnProjection columns = 200;

  // If true, `columns` will contain the entire schema.
  bool columns_always_include_everything = 210;

  // If true, `columns` always includes `chunk_id`.
  bool columns_always_include_chunk_ids = 220;

  // If true, `columns` always includes `byte_offset` and `byte_size`.
  bool columns_always_include_byte_offsets = 230;

  // If true, `columns` always includes all static component-level indexes.
  bool columns_always_include_static_indexes = 240;

  // If true, `columns` always includes all temporal chunk-level indexes.
  bool columns_always_include_global_indexes = 250;

  // If true, `columns` always includes all component-level indexes.
  bool columns_always_include_component_indexes = 260;

  // If specified, will perform a latest-at query with the given parameters.
  //
  // Incompatible with `range`: at most one of the two may be set.
  //
  // NOTE: `latest_at` and `range` are deliberately *not* modeled as a `oneof`: mutual
  // exclusivity is documented instead, trading compile-time enforcement for better
  // ergonomics and fewer forward-compatibility hazards (per PR review discussion).
  QueryManifestLatestAtRelevantChunks latest_at = 300;

  // If specified, will perform a range query with the given parameters.
  //
  // Incompatible with `latest_at`: at most one of the two may be set.
  QueryManifestRangeRelevantChunks range = 400;

  // Generic parameters that will influence the behavior of the Lance scanner.
  ScanParameters scan_parameters = 500;
}

// Parameters for a latest-at manifest query (see `QueryManifestRequest.latest_at`).
message QueryManifestLatestAtRelevantChunks {
  // Which index column should we perform the query on? E.g. `log_time`.
  rerun.common.v0.IndexColumnSelector index = 1;

  // What index value are we looking for?
  int64 at = 2;

  // Which components are we interested in?
  //
  // If left unspecified, all existing components are considered of interest.
  //
  // This will perform a basic fuzzy match on the available columns' descriptors.
  // The fuzzy logic is a simple case-sensitive `contains()` query.
  // For example, given a `log_tick__SeriesLine:StrokeWidth#width` index, all of the following
  // would match: `SeriesLine:StrokeWidth#width`, `StrokeWidth`, `Stroke`, `Width`, `width`,
  // `SeriesLine`, etc.
  repeated string fuzzy_descriptors = 3;
}

// Parameters for a range manifest query (see `QueryManifestRequest.range`).
message QueryManifestRangeRelevantChunks {
  // Which index column should we perform the query on? E.g. `log_time`.
  rerun.common.v0.IndexColumnSelector index = 1;

  // What index range are we looking for?
  rerun.common.v0.TimeRange index_range = 2;

  // Which components are we interested in?
  //
  // If left unspecified, all existing components are considered of interest.
  //
  // This will perform a basic fuzzy match on the available columns' descriptors.
  // The fuzzy logic is a simple case-sensitive `contains()` query.
  // For example, given a `log_tick__SeriesLine:StrokeWidth#width` index, all of the following
  // would match: `SeriesLine:StrokeWidth#width`, `StrokeWidth`, `Stroke`, `Width`, `width`,
  // `SeriesLine`, etc.
  repeated string fuzzy_descriptors = 3;
}

// Generic parameters that will influence the behavior of the Lance scanner.
//
// TODO(zehiko, cmc): This should be available for every endpoint that queries data in
// one way or another.
message ScanParameters {
  // An arbitrary filter expression that will be passed to the Lance scanner as-is.
  //
  // ```text
  // scanner.filter(filter)
  // ```
  //
  // NOTE(review): forwarded verbatim — if this endpoint is ever exposed to untrusted
  // callers, the expression must be validated server-side. Confirm the trust model.
  string filter = 100;

  // An arbitrary offset that will be passed to the Lance scanner as-is.
  //
  // ```text
  // scanner.limit(_, limit_offset)
  // ```
  int64 limit_offset = 200;

  // An arbitrary limit that will be passed to the Lance scanner as-is.
  //
  // ```text
  // scanner.limit(limit_len, _)
  // ```
  int64 limit_len = 201;

  // An arbitrary order clause that will be passed to the Lance scanner as-is.
  //
  // ```text
  // scanner.order_by(…)
  // ```
  ScanParametersOrderClause order_by = 300;

  // If set, the output of `scanner.explain_plan` will be dumped to the server's log.
  bool explain = 400;
}

// An order-by clause forwarded to the Lance scanner (see `ScanParameters.order_by`).
message ScanParametersOrderClause {
  // If true, sort in ascending order; otherwise descending.
  bool ascending = 10;

  // If true, rows whose sort key is null are emitted first.
  bool nulls_first = 20;

  // Name of the column to sort by.
  string column_name = 30;
}

// ---------------- GetRecordingSchema ------------------

message GetRecordingSchemaRequest {
Expand Down
Loading
Loading