@@ -11,14 +11,16 @@ import (
11
11
12
12
"github.com/ipfs/go-ipfs/core/commands/cmdenv"
13
13
"github.com/ipfs/go-ipfs/core/coredag"
14
- mdag "github.com/ipfs/go-merkledag "
14
+ iface "github.com/ipfs/interface- go-ipfs-core "
15
15
16
16
cid "github.com/ipfs/go-cid"
17
17
cidenc "github.com/ipfs/go-cidutil/cidenc"
18
18
cmds "github.com/ipfs/go-ipfs-cmds"
19
19
files "github.com/ipfs/go-ipfs-files"
20
20
ipld "github.com/ipfs/go-ipld-format"
21
+ mdag "github.com/ipfs/go-merkledag"
21
22
ipfspath "github.com/ipfs/go-path"
23
+ "github.com/ipfs/interface-go-ipfs-core/options"
22
24
path "github.com/ipfs/interface-go-ipfs-core/path"
23
25
mh "github.com/multiformats/go-multihash"
24
26
@@ -32,6 +34,8 @@ import (
32
34
33
35
const (
34
36
progressOptionName = "progress"
37
+ silentOptionName = "silent"
38
+ pinRootsOptionName = "pin-roots"
35
39
)
36
40
37
41
var DagCmd = & cmds.Command {
@@ -48,6 +52,7 @@ to deprecate and replace the existing 'ipfs object' command moving forward.
48
52
"put" : DagPutCmd ,
49
53
"get" : DagGetCmd ,
50
54
"resolve" : DagResolveCmd ,
55
+ "import" : DagImportCmd ,
51
56
"export" : DagExportCmd ,
52
57
},
53
58
}
@@ -63,6 +68,16 @@ type ResolveOutput struct {
63
68
RemPath string
64
69
}
65
70
71
+ // CarImportOutput is the output type of the 'dag import' commands
72
+ type CarImportOutput struct {
73
+ Root RootMeta
74
+ }
75
+ type RootMeta struct {
76
+ Cid cid.Cid
77
+ PresentInImport bool
78
+ PinErrorMsg string
79
+ }
80
+
66
81
var DagPutCmd = & cmds.Command {
67
82
Helptext : cmds.HelpText {
68
83
Tagline : "Add a dag node to ipfs." ,
@@ -258,6 +273,267 @@ var DagResolveCmd = &cmds.Command{
258
273
Type : ResolveOutput {},
259
274
}
260
275
276
+ type importResult struct {
277
+ roots map [cid.Cid ]bool
278
+ err error
279
+ }
280
+
281
+ var DagImportCmd = & cmds.Command {
282
+ Helptext : cmds.HelpText {
283
+ Tagline : "Import the contents of .car files" ,
284
+ ShortDescription : `
285
+ 'ipfs dag import' imports all blocks present in supplied .car
286
+ ( Content Address aRchive ) files, recursively pinning any roots
287
+ specified in the CAR file headers, unless --pin-roots is set to false.
288
+
289
+ Note:
290
+ This command will import all blocks in the CAR file, not just those
291
+ reachable from the specified roots. However, these other blocks will
292
+ not be pinned and may be garbage collected later.
293
+
294
+ The pinning of the roots happens after all car files are processed,
295
+ permitting import of DAGs spanning multiple files.
296
+
297
+ Pinning takes place in offline-mode exclusively, one root at a time.
298
+ If the combination of blocks from the imported CAR files and what is
299
+ currently present in the blockstore does not represent a complete DAG,
300
+ pinning of that individual root will fail.
301
+
302
+ Maximum supported CAR version: 1
303
+ ` ,
304
+ },
305
+ Arguments : []cmds.Argument {
306
+ cmds .FileArg ("path" , true , true , "The path of a .car file." ).EnableStdin (),
307
+ },
308
+ Options : []cmds.Option {
309
+ cmds .BoolOption (silentOptionName , "No output." ),
310
+ cmds .BoolOption (pinRootsOptionName , "Pin optional roots listed in the .car headers after importing." ).WithDefault (true ),
311
+ },
312
+ Type : CarImportOutput {},
313
+ Run : func (req * cmds.Request , res cmds.ResponseEmitter , env cmds.Environment ) error {
314
+
315
+ node , err := cmdenv .GetNode (env )
316
+ if err != nil {
317
+ return err
318
+ }
319
+
320
+ api , err := cmdenv .GetApi (env , req )
321
+ if err != nil {
322
+ return err
323
+ }
324
+
325
+ // on import ensure we do not reach out to the network for any reason
326
+ // if a pin based on what is imported + what is in the blockstore
327
+ // isn't possible: tough luck
328
+ api , err = api .WithOptions (options .Api .Offline (true ))
329
+ if err != nil {
330
+ return err
331
+ }
332
+
333
+ // grab a pinlock ( which doubles as a GC lock ) so that regardless of the
334
+ // size of the streamed-in cars nothing will disappear on us before we had
335
+ // a chance to roots that may show up at the very end
336
+ // This is especially important for use cases like dagger:
337
+ // ipfs dag import $( ... | ipfs-dagger --stdout=carfifos )
338
+ //
339
+ unlocker := node .Blockstore .PinLock ()
340
+ defer unlocker .Unlock ()
341
+
342
+ doPinRoots , _ := req .Options [pinRootsOptionName ].(bool )
343
+
344
+ retCh := make (chan importResult , 1 )
345
+ go importWorker (req , res , api , retCh )
346
+
347
+ done := <- retCh
348
+ if done .err != nil {
349
+ return done .err
350
+ }
351
+
352
+ // It is not guaranteed that a root in a header is actually present in the same ( or any )
353
+ // .car file. This is the case in version 1, and ideally in further versions too
354
+ // Accumulate any root CID seen in a header, and supplement its actual node if/when encountered
355
+ // We will attempt a pin *only* at the end in case all car files were well formed
356
+ //
357
+ // The boolean value indicates whether we have encountered the root within the car file's
358
+ roots := done .roots
359
+
360
+ // opportunistic pinning: try whatever sticks
361
+ if doPinRoots {
362
+
363
+ var failedPins int
364
+ for c , seen := range roots {
365
+
366
+ // We need to re-retrieve a block, convert it to ipld, and feed it
367
+ // to the Pinning interface, sigh...
368
+ //
369
+ // If we didn't have the problem of inability to take multiple pinlocks,
370
+ // we could use the Api directly like so (though internally it does the same):
371
+ //
372
+ // // not ideal, but the pinning api takes only paths :(
373
+ // rp := path.NewResolvedPath(
374
+ // ipfspath.FromCid(c),
375
+ // c,
376
+ // c,
377
+ // "",
378
+ // )
379
+ //
380
+ // if err := api.Pin().Add(req.Context, rp, options.Pin.Recursive(true)); err != nil {
381
+
382
+ ret := RootMeta {Cid : c , PresentInImport : seen }
383
+
384
+ if block , err := node .Blockstore .Get (c ); err != nil {
385
+ ret .PinErrorMsg = err .Error ()
386
+ } else if nd , err := ipld .Decode (block ); err != nil {
387
+ ret .PinErrorMsg = err .Error ()
388
+ } else if err := node .Pinning .Pin (req .Context , nd , true ); err != nil {
389
+ ret .PinErrorMsg = err .Error ()
390
+ } else if err := node .Pinning .Flush (req .Context ); err != nil {
391
+ ret .PinErrorMsg = err .Error ()
392
+ }
393
+
394
+ if ret .PinErrorMsg != "" {
395
+ failedPins ++
396
+ }
397
+
398
+ if err := res .Emit (& CarImportOutput {Root : ret }); err != nil {
399
+ return err
400
+ }
401
+ }
402
+
403
+ if failedPins > 0 {
404
+ return fmt .Errorf (
405
+ "unable to pin all roots: %d out of %d failed" ,
406
+ failedPins ,
407
+ len (roots ),
408
+ )
409
+ }
410
+ }
411
+
412
+ return nil
413
+ },
414
+ Encoders : cmds.EncoderMap {
415
+ cmds .Text : cmds .MakeTypedEncoder (func (req * cmds.Request , w io.Writer , event * CarImportOutput ) error {
416
+
417
+ silent , _ := req .Options [silentOptionName ].(bool )
418
+ if silent {
419
+ return nil
420
+ }
421
+
422
+ enc , err := cmdenv .GetLowLevelCidEncoder (req )
423
+ if err != nil {
424
+ return err
425
+ }
426
+
427
+ if event .Root .PinErrorMsg != "" {
428
+ event .Root .PinErrorMsg = fmt .Sprintf ("FAILED: %s" , event .Root .PinErrorMsg )
429
+ } else {
430
+ event .Root .PinErrorMsg = "success"
431
+ }
432
+
433
+ if ! event .Root .PresentInImport {
434
+ event .Root .PinErrorMsg += " (root specified in .car header without available data)"
435
+ }
436
+
437
+ _ , err = fmt .Fprintf (
438
+ w ,
439
+ "Pinned root\t %s\t %s\n " ,
440
+ enc .Encode (event .Root .Cid ),
441
+ event .Root .PinErrorMsg ,
442
+ )
443
+ return err
444
+ }),
445
+ },
446
+ }
447
+
448
+ func importWorker (req * cmds.Request , re cmds.ResponseEmitter , api iface.CoreAPI , ret chan importResult ) {
449
+
450
+ // this is *not* a transaction
451
+ // it is simply a way to relieve pressure on the blockstore
452
+ // similar to pinner.Pin/pinner.Flush
453
+ batch := ipld .NewBatch (req .Context , api .Dag ())
454
+
455
+ roots := make (map [cid.Cid ]bool )
456
+
457
+ it := req .Files .Entries ()
458
+ for it .Next () {
459
+
460
+ file := files .FileFromEntry (it )
461
+ if file == nil {
462
+ ret <- importResult {err : errors .New ("expected a file handle" )}
463
+ return
464
+ }
465
+
466
+ // wrap a defer-closer-scope
467
+ //
468
+ // every single file in it() is already open before we start
469
+ // just close here sooner rather than later for neatness
470
+ // and to surface potential erorrs writing on closed fifos
471
+ // this won't/can't help with not running out of handles
472
+ err := func () error {
473
+ defer file .Close ()
474
+
475
+ car , err := gocar .NewCarReader (file )
476
+ if err != nil {
477
+ return err
478
+ }
479
+
480
+ // Be explicit here, until the spec is finished
481
+ if car .Header .Version != 1 {
482
+ return errors .New ("only car files version 1 supported at present" )
483
+ }
484
+
485
+ for _ , c := range car .Header .Roots {
486
+ if _ , exists := roots [c ]; ! exists {
487
+ roots [c ] = false
488
+ }
489
+ }
490
+
491
+ for {
492
+ block , err := car .Next ()
493
+ if err != nil && err != io .EOF {
494
+ return err
495
+ } else if block == nil {
496
+ break
497
+ }
498
+
499
+ // the double-decode is suboptimal, but we need it for batching
500
+ nd , err := ipld .Decode (block )
501
+ if err != nil {
502
+ return err
503
+ }
504
+
505
+ if err := batch .Add (req .Context , nd ); err != nil {
506
+ return err
507
+ }
508
+
509
+ // encountered something known to be a root, for the first time
510
+ if seen , exists := roots [nd .Cid ()]; exists && ! seen {
511
+ roots [nd .Cid ()] = true
512
+ }
513
+ }
514
+
515
+ return nil
516
+ }()
517
+
518
+ if err != nil {
519
+ ret <- importResult {err : err }
520
+ return
521
+ }
522
+ }
523
+
524
+ if err := it .Err (); err != nil {
525
+ ret <- importResult {err : err }
526
+ return
527
+ }
528
+
529
+ if err := batch .Commit (); err != nil {
530
+ ret <- importResult {err : err }
531
+ return
532
+ }
533
+
534
+ ret <- importResult {roots : roots }
535
+ }
536
+
261
537
var DagExportCmd = & cmds.Command {
262
538
Helptext : cmds.HelpText {
263
539
Tagline : "Streams the selected DAG as a .car stream on stdout." ,
0 commit comments