
DM-49386: Write initial tech note #1

Merged
merged 6 commits into main from tickets/DM-49386 on Mar 20, 2025
Conversation

dhirving
Contributor

@dhirving dhirving commented Mar 8, 2025

No description provided.

@dhirving dhirving marked this pull request as ready for review March 10, 2025 04:43
username for Prompt Processing would be added. A retention period would be
configured on the topic to retain messages for a certain number of days to allow
events to be processed if the Butler database or the writer service is down.
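
(For concreteness, a minimal sketch of what that retention configuration could look like with confluent-kafka's admin client; the broker address, topic name, partition count, replication factor, and retention value below are placeholders, not the actual deployment settings.)

```python
from confluent_kafka.admin import AdminClient, NewTopic

admin = AdminClient({"bootstrap.servers": "kafka:9092"})  # placeholder broker

topic = NewTopic(
    "prompt-processing-butler-writes",  # hypothetical topic name
    num_partitions=8,
    replication_factor=3,
    # Retain messages for 7 days so events can still be processed (or
    # replayed) if the Butler database or the writer service is down.
    config={"retention.ms": str(7 * 24 * 60 * 60 * 1000)},
)
admin.create_topics([topic])
```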


Add something here about Kafka partitions. Since, as I understand it, Kafka message ordering is only guaranteed within a partition, partitioning by something in the data id (e.g. detector) may be necessary to simultaneously enable scale-out parallelism while maintaining necessary ordering of messages. (Can't ingest datasets before their dimensions exist, nor do we want to update the collection chain before the datasets are available.)

Contributor Author

I was actually thinking that dimension records + datasets + the collection chain update would all be together in one message, which would let us use round-robin partitioning if we wanted to.

But yeah, probably worth mentioning something here.
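
(Sketching the two ideas above under stated assumptions: each pod publishes one self-contained message bundling the dimension records, serialized `FileDataset`s, and the collection-chain update, and the message is keyed by something stable in the data ID so Kafka's per-partition ordering covers related messages. The broker address, topic name, and payload field names are placeholders.)

```python
import json

from confluent_kafka import Producer

producer = Producer({"bootstrap.servers": "kafka:9092"})  # placeholder broker


def publish_butler_writes(event: dict) -> None:
    """Publish one self-contained unit of central-Butler writes.

    ``event`` bundles dimension records, serialized FileDatasets, and the
    collection-chain update, so no cross-message ordering is required and
    round-robin partitioning would also work; keying by detector is shown
    in case the payload is ever split into multiple messages.
    """
    key = f"{event['instrument']}:{event['detector']}"
    producer.produce(
        "prompt-processing-butler-writes",  # hypothetical topic name
        key=key,
        value=json.dumps(event),
    )
    producer.flush()  # block until the broker acknowledges the message
```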


The retention period on the Prompt Processing Kafka topic also allows the topic to be
used as a transaction log to replay event processing in the event of database corruption.


Unembargo may also need to parallelize to provide sufficient bandwidth for moving data out of the Embargo Rack. This service would work similarly in that case.

We might also see whether it simplifies the implementation of batch "final jobs", which currently go to a separate, limited-parallelism queue to achieve many of the same effects that this service would achieve.

Contributor Author

Yeah, when I saw the Slack traffic about the final job rework I was wondering how analogous it was to this situation.

the dataset records into the database.

In addition, `Datastore.export()` copies the files to a specified location.
Prompt Processing pods could write the files to a staging area separate from

As I understand it, object stores only provide "Copy object" and "Remove object" operations, which may or may not be optimized to avoid an actual data copy, so this could possibly double bandwidth requirements. On (Weka) POSIX filesystems, there may still be a copy if the staging area is on a different filesystem from the final destination.
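
(For reference, the "Copy object" / "Remove object" pattern as exposed by `boto3`; the bucket and key names are placeholders. Whether the store performs the copy server-side without moving bytes through the client is implementation-dependent, which is where the possible doubling of bandwidth comes from.)

```python
import boto3

s3 = boto3.client("s3")

# "Copy object": ask the object store to copy staging -> final destination.
s3.copy_object(
    Bucket="butler-datastore",                 # hypothetical final bucket
    Key="LSSTCam/raw/whatever/dataset.fits",   # hypothetical final key
    CopySource={"Bucket": "pp-staging", "Key": "dataset.fits"},
)
# "Remove object": clean up the staging copy afterwards.
s3.delete_object(Bucket="pp-staging", Key="dataset.fits")
```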

index.md Outdated
#### Inserting dataset records
Currently, the insertion of dataset records in Prompt Processing uses the
method `Butler.transfer_from()`. `transfer_from` is a compound operation that
both copies the artifact files and does updates the database.

Hello, just reading up on this idea, but I caught a typo: you might mean "does updates to the database" or "updates the database"?
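
(For context, roughly the compound call being discussed; the keyword arguments shown reflect my reading of the `Butler.transfer_from()` API and should be checked against the daf_butler version in use.)

```python
from lsst.daf.butler import Butler, DatasetRef


def transfer_outputs(local_butler: Butler, central_butler: Butler,
                     refs: list[DatasetRef]) -> None:
    # One compound call: copies the artifact files into the central
    # datastore *and* inserts the corresponding database records.
    central_butler.transfer_from(
        local_butler,
        refs,
        transfer="copy",
        transfer_dimensions=True,
    )
```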

Comment on lines +30 to +32
there are other potential issues lurking. For example, collection chain
updates take a lock that could cause one stalled pod to block completion of
all other work.
Member
@kfindeisen kfindeisen Mar 12, 2025

This issue will be fixed as part of DM-45988 -- the init Job is a much more natural place for this operation.

(And FWIW, I haven't seen any blocking with the current hack -- instead, some chain updates fail.)

Member

Though actually... maybe this proposal also solves DM-45988? The new service could merge requests for shared datasets (including init-outputs), though I'm not sure about the datastore.

Member
@kfindeisen kfindeisen Mar 13, 2025

Discussed at JTM. @dhirving said that the DM-45988 approach is safer from a provenance standpoint (there would be one true dataset ID, the one assigned by the init job).

index.md Outdated
Comment on lines 140 to 143
However, with writes eliminated, we no longer need to connect to the primary
database. We can set up a dedicated read replica for Prompt Processing,
insulating time-critical processing from other users/services that may
consume resources unpredictably on the primary database.
Member
@kfindeisen kfindeisen Mar 12, 2025

Worth mentioning that this won't blind us to real-time updates to the repo, as discussed on DM-49293? It's probably obvious to people more familiar with the Butler architecture than me, but being explicit never hurt.
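
(A minimal sketch of the read-replica idea, assuming a repository config whose registry connection string points at the replica; the config URI is a placeholder.)

```python
from lsst.daf.butler import Butler

# Hypothetical repo config identical to the primary's except that the
# registry database connection points at the read replica.
butler = Butler("s3://some-bucket/butler-replica/butler.yaml", writeable=False)
# Queries and gets behave as usual; any attempted write fails immediately.
```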

Member
@timj timj left a comment

This approach looks okay. I think it was mentioned elsewhere that a similar approach could be considered for gathering outputs from batch production as quanta complete, but care would be needed to make sure that datasets that are to be zipped at workflow completion (like the metadata/log/config files) are not sent to Kafka.

At the end of execution, each Prompt Processing pod does the following writes
to the central Butler database:

1. Inserts dimension records (metadata) associated with the processing that occurred
Member

The central butler hasn't yet received the exposure/visit definitions and is waiting on the prompt processing to generate those (even though each of the 189 pods will have the same dimension metadata, so only one write is needed?)

Member
@kfindeisen kfindeisen Mar 17, 2025

Exposure definitions should be generated during raw ingest. But yes, we're the only source for the visits and visit-detectors.

Member

I thought transfers from the summit do generate visit definitions in the embargo rack, but maybe that's happening too slowly for prompt?

Member

In any case, we only transfer dimension records if they're absent...
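
(A sketch of that "insert only if absent" behaviour for the visit-level records; `skip_existing` is my recollection of the `Registry.insertDimensionData()` signature, and the record contents are placeholders.)

```python
from lsst.daf.butler import Butler


def record_visit(central_butler: Butler, visit_record: dict) -> None:
    # Every pod computes the same visit-level dimension records, so insert
    # them only when absent; duplicate writes from other pods become no-ops.
    central_butler.registry.insertDimensionData(
        "visit",
        visit_record,
        skip_existing=True,
    )
```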

Instead of calling `transfer_from()`, we can use `Datastore.export()`.
`Datastore.export()` generates a `FileDataset` object containing all the
information necessary to insert a dataset record into the database.
`FileDataset` does not currently have a convenient JSON serialization, but one
Member

It is currently possible to put the records in the DatasetRef if we really wanted to, but I'm fine with a SerializedFileDataset model -- the refs are part of this; are you going to use list[SerializedDatasetRef] or the SerializedDatasetRefContainer?

Contributor Author

I was thinking of matching the structure we write into the export YAML, since FileDataset is closely associated with that existing export, the format is straightforward/minimal, and we have to keep it around for a long time anyway.
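
(A sketch of what a `SerializedFileDataset` model could look like, mirroring the `path`/`formatter`/`refs` attributes of `FileDataset`; the field types are illustrative and deliberately avoid choosing between `list[SerializedDatasetRef]` and `SerializedDatasetRefContainer`.)

```python
import pydantic


class SerializedFileDataset(pydantic.BaseModel):
    """JSON-friendly mirror of FileDataset (illustrative, not a final schema)."""

    # Artifact location, relative to the datastore root or absolute.
    path: str
    # Fully qualified formatter class name, if not the dataset type's default.
    formatter: str | None = None
    # The dataset refs stored in this artifact, left as plain mappings here
    # rather than committing to a particular serialized-ref container.
    refs: list[dict]
```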

to their final location. This has the downside of potentially doubling the
bandwidth requirements for the file copy, depending on the S3 implementation.

Alternatively, the pods could write the files directly to their final
Member

Consistency problems because the file copy has to complete before you send the Kafka message?

Contributor Author

I'm mostly concerned about a mismatch of the datastore configuration between the two sides, or something like accidental duplication of data IDs that results in some kind of collision between two writes, or us ending up writing files that are ultimately rejected or never seen by the registry.
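
(A sketch of the ordering constraint discussed above: the artifact copy must complete, and the producer be flushed, before the event is considered published. The bucket, topic, and helper names are placeholders.)

```python
import json

import boto3
from confluent_kafka import Producer

s3 = boto3.client("s3")
producer = Producer({"bootstrap.servers": "kafka:9092"})  # placeholder broker


def publish_outputs(artifacts: dict[str, str], event: dict) -> None:
    # 1. Copy every artifact to its final location first...
    for local_path, key in artifacts.items():
        s3.upload_file(local_path, "butler-datastore", key)  # placeholder bucket
    # 2. ...then announce the writes. flush() blocks until the broker has
    # acknowledged, so a crash before this point leaves no registry entries,
    # only orphaned files that can be cleaned up later.
    producer.produce("prompt-processing-butler-writes", value=json.dumps(event))
    producer.flush()
```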

@dhirving dhirving merged commit f5ff5fc into main Mar 20, 2025
2 checks passed
@dhirving dhirving deleted the tickets/DM-49386 branch March 20, 2025 22:19