@@ -18,9 +18,7 @@ import scalaz.{DList => _, _}, Scalaz._
18
18
19
19
object ingestBulk extends IvoryApp {
20
20
21
- val tombstone = List (" ☠" )
22
-
23
- case class CliArguments (repo : String , dictionary : Option [String ], input : String , tmp : String , timezone : DateTimeZone , optimal : Long , codec : Option [CompressionCodec ])
21
+ case class CliArguments (repo : String , dictionary : Option [String ], input : String , timezone : DateTimeZone , optimal : Long , codec : Option [CompressionCodec ])
24
22
25
23
val parser = new scopt.OptionParser [CliArguments ](" ingest-bulk" ) {
26
24
head("""
@@ -34,7 +32,6 @@ object ingestBulk extends IvoryApp {
34
32
opt[Unit ]('n' , " no-compression" ) action { (_, c) => c.copy(codec = None ) } text " Don't use compression."
35
33
36
34
opt[String ]('r' , " repo" ) action { (x, c) => c.copy(repo = x) } required() text " Path to an ivory repository."
37
- opt[String ]('t' , " tmp" ) action { (x, c) => c.copy(tmp = x) } required() text " Path to store tmp data."
38
35
opt[String ]('i' , " input" ) action { (x, c) => c.copy(input = x) } required() text " Path to data to import."
39
36
opt[Long ]('o' , " optimal-input-chunk" ) action { (x, c) => c.copy(optimal = x) } text " Optimal size (in bytes) of input chunk.."
40
37
opt[String ]('d' , " dictionary" ) action { (x, c) => c.copy(dictionary = Some (x)) } text " Name of dictionary to use."
@@ -47,21 +44,21 @@ object ingestBulk extends IvoryApp {
47
44
type Parts = String
48
45
49
46
def cmd = IvoryCmd [CliArguments ](parser,
50
- CliArguments (" " , None , " " , " " , DateTimeZone .getDefault, 1024 * 1024 * 256 /* 256MB */ , Some (new SnappyCodec )),
47
+ CliArguments (" " , None , " " , DateTimeZone .getDefault, 1024 * 1024 * 256 /* 256MB */ , Some (new SnappyCodec )),
51
48
ScoobiCmd (configuration => c => {
52
- val res = onHdfs(new Path (c.repo), c.dictionary, new Path (c.input), tombstone, new Path (c.tmp), c.timezone, c.optimal, c.codec)
49
+ val res = onHdfs(new Path (c.repo), c.dictionary, new Path (c.input), c.timezone, c.optimal, c.codec)
53
50
res.run(configuration).map {
54
51
case f => List (s " Successfully imported ' ${c.input}' as ${f} into ' ${c.repo}' " )
55
52
}
56
53
}))
57
54
58
- def onHdfs (repo : Path , dictionary : Option [String ], input : Path , tombstone : List [ String ], tmp : Path , timezone : DateTimeZone , optimal : Long , codec : Option [CompressionCodec ]): ScoobiAction [Factset ] =
59
- fatrepo.ImportWorkflow .onHdfs(repo, dictionary.map(defaultDictionaryImport(_)), importFeed(input, optimal, codec), tombstone, tmp, timezone)
55
+ def onHdfs (repo : Path , dictionary : Option [String ], input : Path , timezone : DateTimeZone , optimal : Long , codec : Option [CompressionCodec ]): ScoobiAction [Factset ] =
56
+ fatrepo.ImportWorkflow .onHdfs(repo, dictionary.map(defaultDictionaryImport(_)), importFeed(input, optimal, codec), timezone)
60
57
61
- def defaultDictionaryImport (dictionary : String )(repo : HdfsRepository , name : String , tombstone : List [ String ], tmpPath : Path ): Hdfs [Unit ] =
58
+ def defaultDictionaryImport (dictionary : String )(repo : HdfsRepository , name : String ): Hdfs [Unit ] =
62
59
DictionaryImporter .onHdfs(repo.root.toHdfs, repo.dictionaryByName(dictionary).toHdfs, name)
63
60
64
- def importFeed (input : Path , optimal : Long , codec : Option [CompressionCodec ])(repo : HdfsRepository , factset : Factset , dname : String , tmpPath : Path , errorPath : Path , timezone : DateTimeZone ): ScoobiAction [Unit ] = for {
61
+ def importFeed (input : Path , optimal : Long , codec : Option [CompressionCodec ])(repo : HdfsRepository , factset : Factset , dname : String , errorPath : Path , timezone : DateTimeZone ): ScoobiAction [Unit ] = for {
65
62
dict <- ScoobiAction .fromHdfs(IvoryStorage .dictionaryFromIvory(repo, dname))
66
63
list <- listing(input)
67
64
conf <- ScoobiAction .scoobiConfiguration
0 commit comments