Skip to content

Commit 8c07d44

Browse files
author
Russell Aronson
committed
Fixed bugs in mr snapshot; changed factDiff to work on snapshot sequence files
1 parent 13888cd commit 8c07d44

File tree

4 files changed

+57
-44
lines changed

4 files changed

+57
-44
lines changed

ivory-cli/src/main/scala/com/ambiata/ivory/cli/factDiff.scala

+7-8
Original file line numberDiff line numberDiff line change
@@ -6,24 +6,23 @@ import com.ambiata.ivory.validate._
66

77
object factDiff extends IvoryApp {
88

9-
case class CliArguments(input1: String, input2: String, output: String, errors: String)
9+
case class CliArguments(input1: String, input2: String, output: String)
1010

1111
val parser = new scopt.OptionParser[CliArguments]("fact-diff") {
1212
head("""
13-
|Compute diff between two files containing facts
13+
|Compute diff between two sets of sequence files containing facts
1414
|""".stripMargin)
1515

1616
help("help") text "shows this usage text"
17-
opt[String]("input1") action { (x, c) => c.copy(input1 = x) } required() text s"Hdfs location to the first facts file."
18-
opt[String]("input2") action { (x, c) => c.copy(input2 = x) } required() text s"Hdfs location to the first facts file."
17+
opt[String]("input1") action { (x, c) => c.copy(input1 = x) } required() text s"Hdfs glob path to the first facts."
18+
opt[String]("input2") action { (x, c) => c.copy(input2 = x) } required() text s"Hdfs glob path to the second facts."
1919
opt[String]('o', "output") action { (x, c) => c.copy(output = x) } required() text s"Hdfs location to store the difference."
20-
opt[String]('e', "errors") action { (x, c) => c.copy(errors = x) } required() text s"Hdfs location to store any errors."
2120
}
2221

23-
val cmd = IvoryCmd[CliArguments](parser, CliArguments("", "", "", ""), ScoobiCmd { configuration => c =>
24-
val res = FactDiff.scoobiJob(c.input1, c.input2, c.output, c.errors)
22+
val cmd = IvoryCmd[CliArguments](parser, CliArguments("", "", ""), ScoobiCmd { configuration => c =>
23+
val res = FactDiff.flatFacts(c.input1, c.input2, c.output)
2524
res.run(configuration).map {
26-
case _ => List(s"Any differences can be found in '${c.output}', errors in '${c.errors}'")
25+
case _ => List(s"Any differences can be found in '${c.output}'")
2726
}
2827
})
2928
}

ivory-extract/src/main/scala/com/ambiata/ivory/extract/SnapshotMr.scala

+17-5
Original file line numberDiff line numberDiff line change
@@ -297,20 +297,32 @@ class SnapshotReducer extends Reducer[Text, BytesWritable, NullWritable, BytesWr
297297
val vout = new BytesWritable
298298

299299
override def reduce(key: Text, iter: JIterable[BytesWritable], context: Reducer[Text, BytesWritable, NullWritable, BytesWritable]#Context): Unit = {
300+
301+
/****************** !!!!!! WARNING !!!!!! ******************
302+
*
303+
* This is some nasty nasty mutation that can corrupt data
304+
* without knowing, so double/triple check with others when
305+
* changing.
306+
*
307+
***********************************************************/
300308
val iterator = iter.iterator
301309
var latestContainer: PrioritizedFactBytes = null
302-
var latestDate: Int = 0
310+
var latestDate = 0l
311+
var isTombstone = true
303312
while (iterator.hasNext) {
304313
val next = iterator.next
305314
deserializer.deserialize(container, next.getBytes)
306315
deserializer.deserialize(fact, container.getFactbytes)
307-
val nextDate = fact.getYyyyMMdd
316+
val nextDate = fact.datetime.long
308317
if(latestContainer == null || nextDate > latestDate || (nextDate == latestDate && container.getPriority < latestContainer.getPriority)) {
309-
latestContainer = container
318+
latestContainer = container.deepCopy
310319
latestDate = nextDate
320+
isTombstone = fact.isTombstone
311321
}
312322
}
313-
vout.set(latestContainer.getFactbytes, 0, latestContainer.getFactbytes.length)
314-
context.write(NullWritable.get, vout)
323+
if(!isTombstone) {
324+
vout.set(latestContainer.getFactbytes, 0, latestContainer.getFactbytes.length)
325+
context.write(NullWritable.get, vout)
326+
}
315327
}
316328
}

ivory-validate/src/main/scala/com/ambiata/ivory/validate/FactDiff.scala

+31-24
Original file line numberDiff line numberDiff line change
@@ -12,13 +12,36 @@ import FactFormats._
1212

1313
object FactDiff {
1414

15-
def scoobiJob(input1: String, input2: String, outputPath: String, errorPath: String): ScoobiAction[Unit] = {
15+
def partitionFacts(input1: String, input2: String, outputPath: String): ScoobiAction[Unit] = for {
16+
res <- ScoobiAction.scoobiJob({ implicit sc: ScoobiConfiguration =>
17+
val dlist1 = PartitionFactThriftStorageV1.PartitionedFactThriftLoader(List(input1)).loadScoobi.map({
18+
case -\/(e) => sys.error(s"Can not parse fact - ${e}")
19+
case \/-(f) => f
20+
})
21+
val dlist2 = PartitionFactThriftStorageV1.PartitionedFactThriftLoader(List(input2)).loadScoobi.map({
22+
case -\/(e) => sys.error(s"Can not parse fact - ${e}")
23+
case \/-(f) => f
24+
})
25+
(dlist1, dlist2)
26+
})
27+
(dlist1, dlist2) = res
28+
_ <- scoobiJob(dlist1, dlist2, outputPath)
29+
} yield ()
30+
31+
def flatFacts(input1: String, input2: String, outputPath: String): ScoobiAction[Unit] = for {
32+
res <- ScoobiAction.scoobiJob({ implicit sc: ScoobiConfiguration =>
33+
val dlist1 = valueFromSequenceFile[Fact](input1)
34+
val dlist2 = valueFromSequenceFile[Fact](input2)
35+
(dlist1, dlist2)
36+
})
37+
(dlist1, dlist2) = res
38+
_ <- scoobiJob(dlist1, dlist2, outputPath)
39+
} yield ()
40+
41+
def scoobiJob(first_facts: DList[Fact], second_facts: DList[Fact], outputPath: String): ScoobiAction[Unit] = {
1642
ScoobiAction.scoobiJob({ implicit sc: ScoobiConfiguration =>
17-
val (first_errs, first_facts) = byflag(PartitionFactThriftStorageV1.PartitionedFactThriftLoader(List(input1)).loadScoobi, true)
18-
val (second_errs, second_facts) = byflag(PartitionFactThriftStorageV1.PartitionedFactThriftLoader(List(input2)).loadScoobi, false)
1943

20-
val errors = first_errs ++ second_errs
21-
val facts = first_facts ++ second_facts
44+
val facts = first_facts.map((true, _)) ++ second_facts.map((false, _))
2245

2346
val grp = facts.groupBy({ case (flag, fact) => (fact.entity, fact.featureId.toString, fact.date.int, fact.time.seconds, fact.value.stringValue) })
2447

@@ -31,28 +54,12 @@ object FactDiff {
3154
})
3255

3356
val out: DList[String] = diff.map({
34-
case (true, fact) :: Nil => s"Fact '${fact}' does not exist in ${input2}"
35-
case (false, fact) :: Nil => s"Fact '${fact}' does not exist in ${input1}"
57+
case (true, fact) :: Nil => s"Fact '${fact}' does not exist in input2"
58+
case (false, fact) :: Nil => s"Fact '${fact}' does not exist in input1"
3659
case g => s"Found duplicates - '${g}'"
3760
})
3861

39-
val error_out: DList[String] = errors.map({
40-
case (true, e) => s"${e.message} - ${input1}"
41-
case (false, e) => s"${e.message} - ${input2}"
42-
})
43-
44-
persist(error_out.toTextFile(errorPath, overwrite = true), out.toTextFile(outputPath, overwrite = true))
62+
persist(out.toTextFile(outputPath, overwrite = true))
4563
})
4664
}
47-
48-
def byflag(dlist: DList[ParseError \/ Fact], flag: Boolean): (DList[(Boolean, ParseError)], DList[(Boolean, Fact)]) = {
49-
val errs = dlist.collect {
50-
case -\/(e) => (flag, e)
51-
}
52-
53-
val facts = dlist.collect {
54-
case \/-(f) => (flag, f)
55-
}
56-
(errs, facts)
57-
}
5865
}

ivory-validate/src/test/scala/com/ambiata/ivory/validate/FactDiffSpec.scala

+2-7
Original file line numberDiff line numberDiff line change
@@ -36,17 +36,14 @@ class FactDiffSpec extends HadoopSpecification with SimpleJobs with FileMatchers
3636
val input1 = directory + "/1"
3737
val input2 = directory + "/2"
3838
val output = directory + "/out"
39-
val errors = directory + "/errors"
4039

4140
persist(PartitionFactThriftStorageV1.PartitionedFactThriftStorer(input1, None).storeScoobi(facts1),
4241
PartitionFactThriftStorageV1.PartitionedFactThriftStorer(input2, None).storeScoobi(facts2))
4342

44-
FactDiff.scoobiJob(input1, input2, output, errors).run(sc) must beOk
43+
FactDiff.partitionFacts(input1, input2, output).run(sc) must beOk
4544

4645
val out = fromTextFile(output).run.toList
4746
out must have size(6)
48-
49-
Hdfs.readWith(new Path(errors), is => Streams.read(is)).run(sc) must beOkValue("")
5047
}
5148

5249
"FactDiff finds no difference" >> { implicit sc: ScoobiConfiguration =>
@@ -59,15 +56,13 @@ class FactDiffSpec extends HadoopSpecification with SimpleJobs with FileMatchers
5956
val input1 = directory + "/1"
6057
val input2 = directory + "/2"
6158
val output = directory + "/out"
62-
val errors = directory + "/errors"
6359

6460
persist(PartitionFactThriftStorageV1.PartitionedFactThriftStorer(input1, None).storeScoobi(facts1),
6561
PartitionFactThriftStorageV1.PartitionedFactThriftStorer(input2, None).storeScoobi(facts1))
6662

67-
FactDiff.scoobiJob(input1, input2, output, errors).run(sc) must beOk
63+
FactDiff.partitionFacts(input1, input2, output).run(sc) must beOk
6864

6965
Hdfs.readWith(new Path(output), is => Streams.read(is)).run(sc) must beOkValue("")
70-
Hdfs.readWith(new Path(errors), is => Streams.read(is)).run(sc) must beOkValue("")
7166
}
7267

7368
}

0 commit comments

Comments
 (0)