diff --git a/go/cli/mcap/cmd/add.go b/go/cli/mcap/cmd/add.go
new file mode 100644
index 0000000000..20b48f6042
--- /dev/null
+++ b/go/cli/mcap/cmd/add.go
@@ -0,0 +1,16 @@
+package cmd
+
+import "github.com/spf13/cobra"
+
+// addCmd is the parent of the subcommands that add records to an mcap file;
+// it does nothing when run on its own.
+var addCmd = &cobra.Command{
+	Use:   "add",
+	Short: "add records to an existing mcap file",
+	Run: func(cmd *cobra.Command, args []string) {
+	},
+}
+
+func init() {
+	rootCmd.AddCommand(addCmd)
+}
diff --git a/go/cli/mcap/cmd/attachment.go b/go/cli/mcap/cmd/attachment.go
new file mode 100644
index 0000000000..ef24a27f65
--- /dev/null
+++ b/go/cli/mcap/cmd/attachment.go
@@ -0,0 +1,201 @@
+package cmd
+
+import (
+	"context"
+	"fmt"
+	"io"
+	"os"
+	"time"
+
+	"github.com/foxglove/mcap/go/cli/mcap/utils"
+	"github.com/foxglove/mcap/go/mcap"
+	"github.com/spf13/cobra"
+)
+
+// Flags for the "add attachment" subcommand.
+var (
+	addAttachmentLogTime      uint64
+	addAttachmentCreationTime uint64
+	addAttachmentFilename     string
+	addAttachmentMediaType    string
+)
+
+// Flags for the "get attachment" subcommand.
+var (
+	getAttachmentName   string
+	getAttachmentOffset uint64
+	getAttachmentOutput string
+)
+
+// getAttachment copies the data of the attachment described by idx from rs to
+// w. It seeks from idx.Offset past the record fields that precede the data
+// (see the per-field sizes below) and then copies exactly idx.DataSize bytes.
+func getAttachment(w io.Writer, rs io.ReadSeeker, idx *mcap.AttachmentIndex) error {
+	_, err := rs.Seek(int64(
+		idx.Offset+
+			1+ // opcode
+			8+ // record length
+			8+ // log time
+			8+ // creation time
+			4+ // name length
+			uint64(len(idx.Name))+
+			4+ // content type length
+			uint64(len(idx.MediaType))+
+			8), // data length
+		io.SeekStart)
+	if err != nil {
+		return fmt.Errorf("failed to seek to offset %d: %w", idx.Offset, err)
+	}
+	_, err = io.CopyN(w, rs, int64(idx.DataSize))
+	if err != nil {
+		return fmt.Errorf("failed to copy attachment to output: %w", err)
+	}
+	return nil
+}
+
+// getAttachmentCmd extracts one attachment, selected by name (and by offset
+// when several attachments share that name), to a file or to stdout.
+var getAttachmentCmd = &cobra.Command{
+	Use:   "attachment",
+	Short: "Get an attachment by name or offset",
+	Run: func(cmd *cobra.Command, args []string) {
+		ctx := context.Background()
+		if len(args) != 1 {
+			die("Unexpected number of args")
+		}
+		filename := args[0]
+
+		var output io.Writer
+		var err error
+		if getAttachmentOutput == "" {
+			if !utils.StdoutRedirected() {
+				die(PleaseRedirect)
+			}
+			output = os.Stdout
+		} else {
+			output, err = os.Create(getAttachmentOutput)
+			if err != nil {
+				die("failed to create output file: %s", err)
+			}
+		}
+
+		err = utils.WithReader(ctx, filename, func(_ bool, rs io.ReadSeeker) error {
+			reader, err := mcap.NewReader(rs)
+			if err != nil {
+				return fmt.Errorf("failed to construct reader: %w", err)
+			}
+			info, err := reader.Info()
+			if err != nil {
+				return fmt.Errorf("failed to get mcap info: %w", err)
+			}
+			attachments := make(map[string][]*mcap.AttachmentIndex)
+			for _, attachmentIdx := range info.AttachmentIndexes {
+				attachments[attachmentIdx.Name] = append(
+					attachments[attachmentIdx.Name],
+					attachmentIdx,
+				)
+			}
+
+			switch {
+			case len(attachments[getAttachmentName]) == 0:
+				die("attachment %s not found", getAttachmentName)
+			case len(attachments[getAttachmentName]) == 1:
+				return getAttachment(output, rs, attachments[getAttachmentName][0])
+			case len(attachments[getAttachmentName]) > 1:
+				if getAttachmentOffset == 0 {
+					return fmt.Errorf(
+						"multiple attachments named %s exist. Specify an offset.",
+						getAttachmentName,
+					)
+				}
+				for _, idx := range attachments[getAttachmentName] {
+					if idx.Offset == getAttachmentOffset {
+						return getAttachment(output, rs, idx)
+					}
+				}
+				return fmt.Errorf(
+					"failed to find attachment %s at offset %d",
+					getAttachmentName,
+					getAttachmentOffset,
+				)
+			}
+			return nil
+		})
+		if err != nil {
+			die("failed to extract attachment: %s", err)
+		}
+	},
+}
+
+// addAttachmentCmd rewrites a local mcap file into "<file>.new" with one more
+// attachment appended, then renames the result over the original.
+var addAttachmentCmd = &cobra.Command{
+	Use:   "attachment",
+	Short: "Add an attachment to an mcap file",
+	Run: func(cmd *cobra.Command, args []string) {
+		ctx := context.Background()
+		if len(args) != 1 {
+			die("Unexpected number of args")
+		}
+		filename := args[0]
+		// Read the attachment first: if it cannot be read there is no reason
+		// to create the temp file.
+		attachment, err := os.ReadFile(addAttachmentFilename)
+		if err != nil {
+			die("failed to read attachment: %s", err)
+		}
+		tempName := filename + ".new"
+		tmpfile, err := os.Create(tempName)
+		if err != nil {
+			die("failed to create temp file: %s", err)
+		}
+		err = utils.WithReader(ctx, filename, func(remote bool, rs io.ReadSeeker) error {
+			if remote {
+				die("not supported on remote mcap files")
+			}
+			fi, err := os.Stat(addAttachmentFilename)
+			if err != nil {
+				die("failed to stat file %s", addAttachmentFilename)
+			}
+			createTime := uint64(fi.ModTime().UTC().UnixNano())
+			if addAttachmentCreationTime > 0 {
+				createTime = addAttachmentCreationTime
+			}
+			logTime := uint64(time.Now().UTC().UnixNano())
+			if addAttachmentLogTime > 0 {
+				logTime = addAttachmentLogTime
+			}
+			return utils.RewriteMCAP(tmpfile, rs, func(w *mcap.Writer) error {
+				return w.WriteAttachment(&mcap.Attachment{
+					LogTime:    logTime,
+					CreateTime: createTime,
+					Name:       addAttachmentFilename,
+					MediaType:  addAttachmentMediaType,
+					Data:       attachment,
+				})
+			})
+		})
+		if err != nil {
+			die("failed to add attachment: %s", err)
+		}
+		err = os.Rename(tempName, filename)
+		if err != nil {
+			die("failed to rename temporary output: %s", err)
+		}
+	},
+}
+
+func init() {
+	addCmd.AddCommand(addAttachmentCmd)
+	addAttachmentCmd.PersistentFlags().StringVarP(&addAttachmentFilename, "file", "f", "", "filename of attachment to add")
+	addAttachmentCmd.PersistentFlags().StringVarP(&addAttachmentMediaType, "content-type", "", "application/octet-stream", "content type of attachment")
+	addAttachmentCmd.PersistentFlags().Uint64VarP(&addAttachmentLogTime, "log-time", "", 0, "attachment log time in nanoseconds (defaults to current timestamp)")
+	addAttachmentCmd.PersistentFlags().Uint64VarP(&addAttachmentCreationTime, "creation-time", "", 0, "attachment creation time in nanoseconds (defaults to mtime)")
+	addAttachmentCmd.MarkPersistentFlagRequired("file")
+
+	getCmd.AddCommand(getAttachmentCmd)
+	getAttachmentCmd.PersistentFlags().StringVarP(&getAttachmentName, "name", "n", "", "name of attachment to extract")
+	getAttachmentCmd.PersistentFlags().Uint64VarP(&getAttachmentOffset, "offset", "", 0, "offset of attachment to extract")
+	getAttachmentCmd.PersistentFlags().StringVarP(&getAttachmentOutput, "output", "o", "", "location to write attachment to")
+	getAttachmentCmd.MarkPersistentFlagRequired("name")
+}
diff --git a/go/cli/mcap/cmd/attachments.go b/go/cli/mcap/cmd/attachments.go
index 5719073980..56b683a950 100644
--- a/go/cli/mcap/cmd/attachments.go
+++ b/go/cli/mcap/cmd/attachments.go
@@ -14,17 +14,19 @@ import (
 func printAttachments(w io.Writer, attachmentIndexes []*mcap.AttachmentIndex) {
 	rows := make([][]string, 0, len(attachmentIndexes))
 	rows = append(rows, []string{
-		"log time",
 		"name",
 		"media type",
+		"log time",
+		"creation time",
 		"content length",
 		"offset",
 	})
 	for _, idx := range attachmentIndexes {
 		row := []string{
-			fmt.Sprintf("%d", idx.LogTime),
 			idx.Name,
 			idx.MediaType,
+			fmt.Sprintf("%d", idx.LogTime),
+			fmt.Sprintf("%d", idx.CreateTime),
 			fmt.Sprintf("%d", idx.DataSize),
 			fmt.Sprintf("%d", idx.Offset),
 		}
@@ -51,7 +53,7 @@ var attachmentsCmd = &cobra.Command{
 			if err != nil {
 				return fmt.Errorf("failed to get info: %w", err)
 			}
-			printChunks(os.Stdout, info.ChunkIndexes)
+			printAttachments(os.Stdout, info.AttachmentIndexes)
 			return nil
 		})
 		if err != nil {
diff --git a/go/cli/mcap/cmd/filter.go b/go/cli/mcap/cmd/filter.go
index d8f92c598c..bddfa1b596 100644
--- a/go/cli/mcap/cmd/filter.go
+++ b/go/cli/mcap/cmd/filter.go
@@ -122,7 +122,7 @@ usage:
 		var writer io.Writer
 		if filterOutput == "" {
 			if !utils.StdoutRedirected() {
-				die("Binary output can screw up your terminal. Supply -o or redirect to a file or pipe")
+				die(PleaseRedirect)
 			}
 			writer = os.Stdout
 		} else {
diff --git a/go/cli/mcap/cmd/get.go b/go/cli/mcap/cmd/get.go
new file mode 100644
index 0000000000..6396d18406
--- /dev/null
+++ b/go/cli/mcap/cmd/get.go
@@ -0,0 +1,16 @@
+package cmd
+
+import "github.com/spf13/cobra"
+
+// getCmd is the parent of the subcommands that fetch a single record from an
+// mcap file; it does nothing when run on its own.
+var getCmd = &cobra.Command{
+	Use:   "get",
+	Short: "get a record from an mcap file",
+	Run: func(cmd *cobra.Command, args []string) {
+	},
+}
+
+func init() {
+	rootCmd.AddCommand(getCmd)
+}
diff --git a/go/cli/mcap/cmd/merge.go b/go/cli/mcap/cmd/merge.go
index 7320ea2db2..6431f94dd4 100644
--- a/go/cli/mcap/cmd/merge.go
+++ b/go/cli/mcap/cmd/merge.go
@@ -240,7 +240,7 @@ var mergeCmd = &cobra.Command{
 	Short: "Merge a selection of mcap files by record timestamp",
 	Run: func(cmd *cobra.Command, args []string) {
 		if mergeOutputFile == "" && !utils.StdoutRedirected() {
-			die("Binary output can screw up your terminal. Supply -o or redirect to a file or pipe.")
+			die(PleaseRedirect)
 		}
 		var readers []io.Reader
 		for _, arg := range args {
diff --git a/go/cli/mcap/cmd/metadata.go b/go/cli/mcap/cmd/metadata.go
index 9141015618..368e5da97b 100644
--- a/go/cli/mcap/cmd/metadata.go
+++ b/go/cli/mcap/cmd/metadata.go
@@ -1,19 +1,28 @@
 package cmd
 
 import (
-	"bytes"
 	"context"
 	"encoding/json"
 	"fmt"
 	"io"
 	"math"
 	"os"
+	"strings"
 
 	"github.com/foxglove/mcap/go/cli/mcap/utils"
 	"github.com/foxglove/mcap/go/mcap"
 	"github.com/spf13/cobra"
 )
 
+var (
+	addMetadataKeyValues []string
+	addMetadataName      string
+)
+
+var (
+	getMetadataName string
+)
+
 func printMetadata(w io.Writer, r io.ReadSeeker, info *mcap.Info) error {
 	rows := make([][]string, 0, len(info.MetadataIndexes))
 	rows = append(rows, []string{
@@ -45,17 +54,15 @@ func printMetadata(w io.Writer, r io.ReadSeeker, info *mcap.Info) error {
 		if err != nil {
 			return fmt.Errorf("failed to marshal metadata to JSON: %w", err)
 		}
-
-		indented := &bytes.Buffer{}
-		err = json.Indent(indented, jsonSerialized, "", " ")
+		prettyJSON, err := utils.PrettyJSON(jsonSerialized)
 		if err != nil {
-			return fmt.Errorf("failed to indent JSON: %w", err)
+			return fmt.Errorf("failed to pretty JSON: %w", err)
 		}
 		rows = append(rows, []string{
 			idx.Name,
 			fmt.Sprintf("%d", idx.Offset),
 			fmt.Sprintf("%d", idx.Length),
-			indented.String(),
+			prettyJSON,
 		})
 	}
 	utils.FormatTable(w, rows)
@@ -88,6 +95,134 @@ var listMetadataCmd = &cobra.Command{
 	},
 }
 
+// addMetadataCmd rewrites a local mcap file into "<file>.new" with one more
+// metadata record appended, then renames the result over the original.
+var addMetadataCmd = &cobra.Command{
+	Use:   "metadata",
+	Short: "Add metadata to an mcap file",
+	Run: func(cmd *cobra.Command, args []string) {
+		ctx := context.Background()
+		if len(args) != 1 {
+			die("Unexpected number of args")
+		}
+		filename := args[0]
+		metadata := make(map[string]string)
+		for _, kv := range addMetadataKeyValues {
+			// Split on the first '=' only, so values may be empty or may
+			// themselves contain '='.
+			key, value, ok := strings.Cut(kv, "=")
+			if !ok || key == "" {
+				die("failed to parse key/value %s", kv)
+			}
+			metadata[key] = value
+		}
+		tempName := filename + ".new"
+		tmpfile, err := os.Create(tempName)
+		if err != nil {
+			die("failed to create temp file: %s", err)
+		}
+		err = utils.WithReader(ctx, filename, func(remote bool, rs io.ReadSeeker) error {
+			if remote {
+				die("not supported on remote mcap files")
+			}
+			return utils.RewriteMCAP(tmpfile, rs, func(w *mcap.Writer) error {
+				return w.WriteMetadata(&mcap.Metadata{
+					Name:     addMetadataName,
+					Metadata: metadata,
+				})
+			})
+		})
+		if err != nil {
+			die("failed to add metadata: %s", err)
+		}
+		err = os.Rename(tempName, filename)
+		if err != nil {
+			die("failed to rename temporary output: %s", err)
+		}
+	},
+}
+
+// getMetadataCmd prints, as JSON, the key/value pairs of every metadata record
+// with the requested name. When several records share the name their maps are
+// merged, with records later in the index overriding earlier ones.
+var getMetadataCmd = &cobra.Command{
+	Use:   "metadata",
+	Short: "get metadata by name",
+	Run: func(cmd *cobra.Command, args []string) {
+		ctx := context.Background()
+		if len(args) != 1 {
+			die("Unexpected number of args")
+		}
+		filename := args[0]
+		err := utils.WithReader(ctx, filename, func(_ bool, rs io.ReadSeeker) error {
+			reader, err := mcap.NewReader(rs)
+			if err != nil {
+				return fmt.Errorf("failed to build reader: %w", err)
+			}
+			info, err := reader.Info()
+			if err != nil {
+				return fmt.Errorf("failed to collect mcap info: %w", err)
+			}
+
+			output := make(map[string]string)
+
+			metadataIndexes := make(map[string][]*mcap.MetadataIndex)
+			for _, idx := range info.MetadataIndexes {
+				metadataIndexes[idx.Name] = append(metadataIndexes[idx.Name], idx)
+			}
+			indexes, ok := metadataIndexes[getMetadataName]
+			if !ok {
+				return fmt.Errorf("metadata %s does not exist", getMetadataName)
+			}
+
+			for _, idx := range indexes {
+				_, err = rs.Seek(int64(idx.Offset+1+8), io.SeekStart)
+				if err != nil {
+					return fmt.Errorf("failed to seek to metadata record at %d: %w", idx.Offset, err)
+				}
+				data := make([]byte, idx.Length)
+				_, err = io.ReadFull(rs, data)
+				if err != nil {
+					return fmt.Errorf("failed to read metadata record: %w", err)
+				}
+				record, err := mcap.ParseMetadata(data)
+				if err != nil {
+					return fmt.Errorf("failed to parse metadata: %w", err)
+				}
+				for k, v := range record.Metadata {
+					output[k] = v
+				}
+			}
+
+			jsonBytes, err := json.Marshal(output)
+			if err != nil {
+				return fmt.Errorf("failed to marshal output to JSON: %w", err)
+			}
+			prettyJSON, err := utils.PrettyJSON(jsonBytes)
+			if err != nil {
+				return fmt.Errorf("failed to pretty JSON: %w", err)
+			}
+			_, err = os.Stdout.Write([]byte(prettyJSON + "\n"))
+			if err != nil {
+				return fmt.Errorf("failed to write metadata to output: %w", err)
+			}
+			return nil
+		})
+		if err != nil {
+			die("failed to fetch metadata: %s", err)
+		}
+	},
+}
+
 func init() {
 	listCmd.AddCommand(listMetadataCmd)
+
+	addCmd.AddCommand(addMetadataCmd)
+	addMetadataCmd.PersistentFlags().StringVarP(&addMetadataName, "name", "n", "", "name of metadata record to add")
+	addMetadataCmd.PersistentFlags().StringSliceVarP(&addMetadataKeyValues, "key", "k", []string{}, "key=value pair")
+	addMetadataCmd.MarkPersistentFlagRequired("name")
+
+	getCmd.AddCommand(getMetadataCmd)
+	getMetadataCmd.PersistentFlags().StringVarP(&getMetadataName, "name", "n", "", "name of metadata record to get")
+	getMetadataCmd.MarkPersistentFlagRequired("name")
 }
diff --git a/go/cli/mcap/cmd/root.go b/go/cli/mcap/cmd/root.go
index d8bcc83526..11e53635d4 100644
--- a/go/cli/mcap/cmd/root.go
+++ b/go/cli/mcap/cmd/root.go
@@ -15,6 +15,9 @@ var rootCmd = &cobra.Command{
 	Short: "\U0001F52A Officially the top-rated CLI tool for slicing and dicing MCAP files.",
 }
 
+// PleaseRedirect is the message shown when binary output would otherwise be written to a terminal.
+const PleaseRedirect = "Binary output can screw up your terminal. Supply -o or redirect to a file or pipe"
+
 func Execute() {
 	cobra.CheckErr(rootCmd.Execute())
 }
diff --git a/go/cli/mcap/utils/utils.go b/go/cli/mcap/utils/utils.go
index 2bdaab14e5..9a98b09f10 100644
--- a/go/cli/mcap/utils/utils.go
+++ b/go/cli/mcap/utils/utils.go
@@ -1,13 +1,17 @@
 package utils
 
 import (
+	"bytes"
 	"context"
+	"encoding/json"
+	"errors"
 	"fmt"
 	"io"
 	"os"
 	"regexp"
 
 	"cloud.google.com/go/storage"
+	"github.com/foxglove/mcap/go/mcap"
 	"github.com/olekukonko/tablewriter"
 )
 
@@ -111,6 +115,152 @@ func FormatTable(w io.Writer, rows [][]string) {
 	tw.Render()
 }
 
+// inferWriterOptions derives writer options for a rewritten copy of the file
+// described by info, based on its chunk indexes (if it has any).
+func inferWriterOptions(info *mcap.Info) *mcap.WriterOptions {
+	// assume if there are no chunk indexes, the file is not chunked. This
+	// assumption may be invalid if the file is chunked but not indexed.
+	if len(info.ChunkIndexes) == 0 {
+		return &mcap.WriterOptions{
+			Chunked: false,
+		}
+	}
+	// if there are chunk indexes, create a chunked output with attributes
+	// approximating those of the first chunk.
+	idx := info.ChunkIndexes[0]
+	return &mcap.WriterOptions{
+		IncludeCRC:  true,
+		Chunked:     true,
+		ChunkSize:   int64(idx.ChunkLength),
+		Compression: idx.Compression,
+	}
+}
+
+// RewriteMCAP rewrites the mcap file wrapped by the provided ReadSeeker and
+// performs the operations described by the supplied fns at the end of writing.
+// It is used for adding metadata and attachments to an existing file. In the
+// future this can be optimized to rewrite only the summary section, which
+// should make it run much faster but will require some tricky modifications of
+// indexes pointing into the summary section.
+//
+// Schemas and channels are written once per ID; messages, metadata and
+// attachments are copied through; all other tokens are skipped. An error from
+// closing the writer is returned if no earlier error occurred.
+func RewriteMCAP(w io.Writer, r io.ReadSeeker, fns ...func(writer *mcap.Writer) error) (err error) {
+	reader, err := mcap.NewReader(r)
+	if err != nil {
+		return fmt.Errorf("failed to open mcap reader: %w", err)
+	}
+	info, err := reader.Info()
+	if err != nil {
+		return fmt.Errorf("failed to get mcap info: %w", err)
+	}
+	opts := inferWriterOptions(info)
+	writer, err := mcap.NewWriter(w, opts)
+	if err != nil {
+		return fmt.Errorf("failed to construct mcap writer: %w", err)
+	}
+	// Close writes through the named result so that a failure to finalize the
+	// output is reported instead of being silently dropped.
+	defer func() {
+		if cerr := writer.Close(); cerr != nil && err == nil {
+			err = fmt.Errorf("failed to close mcap writer: %w", cerr)
+		}
+	}()
+	if err := writer.WriteHeader(info.Header); err != nil {
+		return fmt.Errorf("failed to rewrite header: %w", err)
+	}
+	_, err = r.Seek(0, io.SeekStart)
+	if err != nil {
+		return fmt.Errorf("failed to seek to reader start: %w", err)
+	}
+	lexer, err := mcap.NewLexer(r, &mcap.LexerOptions{
+		SkipMagic:   false,
+		ValidateCRC: false,
+		EmitChunks:  false,
+	})
+	if err != nil {
+		return fmt.Errorf("failed to construct lexer: %w", err)
+	}
+	buf := make([]byte, 1024)
+	schemas := make(map[uint16]bool)
+	channels := make(map[uint16]bool)
+	for {
+		tokenType, token, err := lexer.Next(buf)
+		if err != nil {
+			if errors.Is(err, io.EOF) {
+				break
+			}
+			return fmt.Errorf("failed to pull next record: %w", err)
+		}
+		if len(token) > len(buf) {
+			buf = token
+		}
+		switch tokenType {
+		case mcap.TokenChannel:
+			record, err := mcap.ParseChannel(token)
+			if err != nil {
+				return fmt.Errorf("failed to parse channel: %w", err)
+			}
+			if !channels[record.ID] {
+				err := writer.WriteChannel(record)
+				if err != nil {
+					return fmt.Errorf("failed to write channel: %w", err)
+				}
+				channels[record.ID] = true
+			}
+		case mcap.TokenSchema:
+			record, err := mcap.ParseSchema(token)
+			if err != nil {
+				return fmt.Errorf("failed to parse schema: %w", err)
+			}
+			if !schemas[record.ID] {
+				err := writer.WriteSchema(record)
+				if err != nil {
+					return fmt.Errorf("failed to write schema: %w", err)
+				}
+				schemas[record.ID] = true
+			}
+		case mcap.TokenMessage:
+			record, err := mcap.ParseMessage(token)
+			if err != nil {
+				return fmt.Errorf("failed to parse message: %w", err)
+			}
+			err = writer.WriteMessage(record)
+			if err != nil {
+				return fmt.Errorf("failed to write message: %w", err)
+			}
+		case mcap.TokenMetadata:
+			record, err := mcap.ParseMetadata(token)
+			if err != nil {
+				return fmt.Errorf("failed to parse metadata: %w", err)
+			}
+			err = writer.WriteMetadata(record)
+			if err != nil {
+				return fmt.Errorf("failed to write metadata: %w", err)
+			}
+		case mcap.TokenAttachment:
+			record, err := mcap.ParseAttachment(token)
+			if err != nil {
+				return fmt.Errorf("failed to parse attachment: %w", err)
+			}
+			err = writer.WriteAttachment(record)
+			if err != nil {
+				return fmt.Errorf("failed to write attachment: %w", err)
+			}
+		default:
+			continue
+		}
+	}
+	for _, f := range fns {
+		err = f(writer)
+		if err != nil {
+			return fmt.Errorf("failed to apply writer function: %w", err)
+		}
+	}
+	return nil
+}
+
 func Keys[T any](m map[string]T) []string {
 	keys := []string{}
 	for k := range m {
@@ -118,3 +268,14 @@ func Keys[T any](m map[string]T) []string {
 	}
 	return keys
 }
+
+// PrettyJSON returns the JSON document in data re-indented for display. It
+// returns an error if data is not valid JSON.
+func PrettyJSON(data []byte) (string, error) {
+	indented := &bytes.Buffer{}
+	err := json.Indent(indented, data, "", " ")
+	if err != nil {
+		return "", err
+	}
+	return indented.String(), nil
+}
diff --git a/go/cspell.json b/go/cspell.json
index 8935afa08d..6826bdd32a 100644
--- a/go/cspell.json
+++ b/go/cspell.json
@@ -1,3 +1,3 @@
 {
-  "words": ["sqlx", "Queryx", "Rowx"]
+  "words": ["sqlx", "Queryx", "Rowx", "MCAP"]
 }