Skip to content

Commit e4f863d

Browse files
committed
Remove unused tmp path from ingest* and ImportWorkflow
1 parent 54e11be commit e4f863d

File tree

3 files changed

+21
-24
lines changed

3 files changed

+21
-24
lines changed

ivory-cli/src/main/scala/com/ambiata/ivory/cli/ingest.scala

+7-8
Original file line numberDiff line numberDiff line change
@@ -19,7 +19,7 @@ import scalaz.{DList => _, _}, Scalaz._
1919

2020
object ingest extends IvoryApp {
2121

22-
case class CliArguments(repo: String, dictionary: Option[String], input: String, namespace: String, tmp: String, timezone: DateTimeZone, runOnSingleMachine: Boolean)
22+
case class CliArguments(repo: String, dictionary: Option[String], input: String, namespace: String, timezone: DateTimeZone, runOnSingleMachine: Boolean)
2323

2424
val parser = new scopt.OptionParser[CliArguments]("ingest") {
2525
head("""
@@ -31,7 +31,6 @@ object ingest extends IvoryApp {
3131

3232
help("help") text "shows this usage text"
3333
opt[String]('r', "repo") action { (x, c) => c.copy(repo = x) } required() text "Path to an ivory repository."
34-
opt[String]('t', "tmp") action { (x, c) => c.copy(tmp = x) } required() text "Path to store tmp data."
3534
opt[String]('i', "input") action { (x, c) => c.copy(input = x) } required() text "Path to data to import."
3635
opt[String]('d', "dictionary") action { (x, c) => c.copy(dictionary = Some(x)) } text "Name of dictionary to use."
3736
opt[String]('n', "namespace") action { (x, c) => c.copy(namespace = x) } required() text "Namespace'."
@@ -41,20 +40,20 @@ object ingest extends IvoryApp {
4140

4241
}
4342

44-
def cmd = IvoryCmd[CliArguments](parser, CliArguments("", None, "", "", "", DateTimeZone.getDefault, false), HadoopCmd { configuration => c =>
45-
val res = onHdfs(new Path(c.repo), c.dictionary, c.namespace, new Path(c.input), new Path(c.tmp), c.timezone, c.runOnSingleMachine)
43+
def cmd = IvoryCmd[CliArguments](parser, CliArguments("", None, "", "", DateTimeZone.getDefault, false), HadoopCmd { configuration => c =>
44+
val res = onHdfs(new Path(c.repo), c.dictionary, c.namespace, new Path(c.input), c.timezone, c.runOnSingleMachine)
4645
res.run(configuration.modeIs(com.nicta.scoobi.core.Mode.Cluster)).map {
4746
case f => List(s"Successfully imported '${c.input}' as ${f} into '${c.repo}'")
4847
}
4948
})
5049

51-
def onHdfs(repo: Path, dictionary: Option[String], namespace: String, input: Path, tmp: Path, timezone: DateTimeZone, runOnSingleMachine: Boolean): ScoobiAction[Factset] =
52-
fatrepo.ImportWorkflow.onHdfs(repo, dictionary.map(defaultDictionaryImport(_)), importFeed(input, namespace, runOnSingleMachine), tmp, timezone)
50+
def onHdfs(repo: Path, dictionary: Option[String], namespace: String, input: Path, timezone: DateTimeZone, runOnSingleMachine: Boolean): ScoobiAction[Factset] =
51+
fatrepo.ImportWorkflow.onHdfs(repo, dictionary.map(defaultDictionaryImport(_)), importFeed(input, namespace, runOnSingleMachine), timezone)
5352

54-
def defaultDictionaryImport(dictionary: String)(repo: HdfsRepository, name: String, tmpPath: Path): Hdfs[Unit] =
53+
def defaultDictionaryImport(dictionary: String)(repo: HdfsRepository, name: String): Hdfs[Unit] =
5554
DictionaryImporter.onHdfs(repo.root.toHdfs, repo.dictionaryByName(dictionary).toHdfs, name)
5655

57-
def importFeed(input: Path, namespace: String, runOnSingleMachine: Boolean)(repo: HdfsRepository, factset: Factset, dname: String, tmpPath: Path, errorPath: Path, timezone: DateTimeZone): ScoobiAction[Unit] = for {
56+
def importFeed(input: Path, namespace: String, runOnSingleMachine: Boolean)(repo: HdfsRepository, factset: Factset, dname: String, errorPath: Path, timezone: DateTimeZone): ScoobiAction[Unit] = for {
5857
dict <- ScoobiAction.fromHdfs(IvoryStorage.dictionaryFromIvory(repo, dname))
5958
conf <- ScoobiAction.scoobiConfiguration
6059
_ <- if (!runOnSingleMachine)

ivory-cli/src/main/scala/com/ambiata/ivory/cli/ingestBulk.scala

+7-8
Original file line numberDiff line numberDiff line change
@@ -18,7 +18,7 @@ import scalaz.{DList => _, _}, Scalaz._
1818

1919
object ingestBulk extends IvoryApp {
2020

21-
case class CliArguments(repo: String, dictionary: Option[String], input: String, tmp: String, timezone: DateTimeZone, optimal: Long, codec: Option[CompressionCodec])
21+
case class CliArguments(repo: String, dictionary: Option[String], input: String, timezone: DateTimeZone, optimal: Long, codec: Option[CompressionCodec])
2222

2323
val parser = new scopt.OptionParser[CliArguments]("ingest-bulk") {
2424
head("""
@@ -32,7 +32,6 @@ object ingestBulk extends IvoryApp {
3232
opt[Unit]('n', "no-compression") action { (_, c) => c.copy(codec = None) } text "Don't use compression."
3333

3434
opt[String]('r', "repo") action { (x, c) => c.copy(repo = x) } required() text "Path to an ivory repository."
35-
opt[String]('t', "tmp") action { (x, c) => c.copy(tmp = x) } required() text "Path to store tmp data."
3635
opt[String]('i', "input") action { (x, c) => c.copy(input = x) } required() text "Path to data to import."
3736
opt[Long]('o', "optimal-input-chunk") action { (x, c) => c.copy(optimal = x) } text "Optimal size (in bytes) of input chunk.."
3837
opt[String]('d', "dictionary") action { (x, c) => c.copy(dictionary = Some(x)) } text "Name of dictionary to use."
@@ -45,21 +44,21 @@ object ingestBulk extends IvoryApp {
4544
type Parts = String
4645

4746
def cmd = IvoryCmd[CliArguments](parser,
48-
CliArguments("", None, "", "", DateTimeZone.getDefault, 1024 * 1024 * 256 /* 256MB */, Some(new SnappyCodec)),
47+
CliArguments("", None, "", DateTimeZone.getDefault, 1024 * 1024 * 256 /* 256MB */, Some(new SnappyCodec)),
4948
ScoobiCmd(configuration => c => {
50-
val res = onHdfs(new Path(c.repo), c.dictionary, new Path(c.input), new Path(c.tmp), c.timezone, c.optimal, c.codec)
49+
val res = onHdfs(new Path(c.repo), c.dictionary, new Path(c.input), c.timezone, c.optimal, c.codec)
5150
res.run(configuration).map {
5251
case f => List(s"Successfully imported '${c.input}' as ${f} into '${c.repo}'")
5352
}
5453
}))
5554

56-
def onHdfs(repo: Path, dictionary: Option[String], input: Path, tmp: Path, timezone: DateTimeZone, optimal: Long, codec: Option[CompressionCodec]): ScoobiAction[Factset] =
57-
fatrepo.ImportWorkflow.onHdfs(repo, dictionary.map(defaultDictionaryImport(_)), importFeed(input, optimal, codec), tmp, timezone)
55+
def onHdfs(repo: Path, dictionary: Option[String], input: Path, timezone: DateTimeZone, optimal: Long, codec: Option[CompressionCodec]): ScoobiAction[Factset] =
56+
fatrepo.ImportWorkflow.onHdfs(repo, dictionary.map(defaultDictionaryImport(_)), importFeed(input, optimal, codec), timezone)
5857

59-
def defaultDictionaryImport(dictionary: String)(repo: HdfsRepository, name: String, tmpPath: Path): Hdfs[Unit] =
58+
def defaultDictionaryImport(dictionary: String)(repo: HdfsRepository, name: String): Hdfs[Unit] =
6059
DictionaryImporter.onHdfs(repo.root.toHdfs, repo.dictionaryByName(dictionary).toHdfs, name)
6160

62-
def importFeed(input: Path, optimal: Long, codec: Option[CompressionCodec])(repo: HdfsRepository, factset: Factset, dname: String, tmpPath: Path, errorPath: Path, timezone: DateTimeZone): ScoobiAction[Unit] = for {
61+
def importFeed(input: Path, optimal: Long, codec: Option[CompressionCodec])(repo: HdfsRepository, factset: Factset, dname: String, errorPath: Path, timezone: DateTimeZone): ScoobiAction[Unit] = for {
6362
dict <- ScoobiAction.fromHdfs(IvoryStorage.dictionaryFromIvory(repo, dname))
6463
list <- listing(input)
6564
conf <- ScoobiAction.scoobiConfiguration

ivory-storage/src/main/scala/com/ambiata/ivory/storage/legacy/fatrepo/ImportWorkflow.scala

+7-8
Original file line numberDiff line numberDiff line change
@@ -36,13 +36,12 @@ object ImportWorkflow {
3636
type DictionaryName = String
3737
type DictionaryPath = Path
3838
type ErrorPath = Path
39-
type TmpPath = Path
40-
type ImportDictFunc = (HdfsRepository, DictionaryName, TmpPath) => Hdfs[Unit]
41-
type ImportFactsFunc = (HdfsRepository, Factset, DictionaryName, TmpPath, ErrorPath, DateTimeZone) => ScoobiAction[Unit]
39+
type ImportDictFunc = (HdfsRepository, DictionaryName) => Hdfs[Unit]
40+
type ImportFactsFunc = (HdfsRepository, Factset, DictionaryName, ErrorPath, DateTimeZone) => ScoobiAction[Unit]
4241

4342
private implicit val logger = LogFactory.getLog("ivory.repository.fatrepo.Import")
4443

45-
def onHdfs(repoPath: Path, importDict: Option[ImportDictFunc], importFacts: ImportFactsFunc, tmpPath: Path, timezone: DateTimeZone): ScoobiAction[Factset] = {
44+
def onHdfs(repoPath: Path, importDict: Option[ImportDictFunc], importFacts: ImportFactsFunc, timezone: DateTimeZone): ScoobiAction[Factset] = {
4645
val start = System.currentTimeMillis
4746
for {
4847
sc <- ScoobiAction.scoobiConfiguration
@@ -53,7 +52,7 @@ object ImportWorkflow {
5352
println(s"created repository in ${x - start}ms")
5453
x
5554
}
56-
dname <- ScoobiAction.fromHdfs(importDictionary(repo, new Path(tmpPath, "dictionaries"), importDict)
55+
dname <- ScoobiAction.fromHdfs(importDictionary(repo, importDict)
5756
)
5857
t2 = {
5958
val x = System.currentTimeMillis
@@ -66,7 +65,7 @@ object ImportWorkflow {
6665
println(s"created fact set in ${x - t2}ms")
6766
x
6867
}
69-
_ <- importFacts(repo, factset, dname, new Path(tmpPath, "facts"), new Path(repo.errors.path, factset.name), timezone)
68+
_ <- importFacts(repo, factset, dname, new Path(repo.errors.path, factset.name), timezone)
7069
t4 = {
7170
val x = System.currentTimeMillis
7271
println(s"imported fact set in ${x - t3}ms")
@@ -95,7 +94,7 @@ object ImportWorkflow {
9594
}
9695
} yield ()
9796

98-
def importDictionary(repo: HdfsRepository, tmpPath: Path, importer: Option[ImportDictFunc]): Hdfs[String] = importer match {
97+
def importDictionary(repo: HdfsRepository, importer: Option[ImportDictFunc]): Hdfs[String] = importer match {
9998
case None =>
10099
Hdfs.globPaths(repo.dictionaries.toHdfs, "*").map(dicts =>
101100
dicts
@@ -110,7 +109,7 @@ object ImportWorkflow {
110109
for {
111110
e <- Hdfs.exists(repo.dictionaryByName(name).toHdfs)
112111
_ <- if(!e) copyLatestDictionary(repo, name) else Hdfs.ok(())
113-
_ <- importDict(repo, name, tmpPath)
112+
_ <- importDict(repo, name)
114113
_ = logger.info(s"Successfully imported dictionary '${name}'")
115114
} yield name
116115
}

0 commit comments

Comments (0)