Skip to content

Commit 6844d31

Browse files
committed
Use parallelDo in chord to pull instance descriptors into memory once during setup
1 parent 0a3b581 commit 6844d31

File tree

1 file changed

+50
-29
lines changed
  • ivory-extract/src/main/scala/com/ambiata/ivory/extract

1 file changed

+50
-29
lines changed

ivory-extract/src/main/scala/com/ambiata/ivory/extract/Chord.scala

+50-29
Original file line numberDiff line numberDiff line change
@@ -71,47 +71,62 @@ case class HdfsChord(repoPath: Path, store: String, dictName: String, entities:
7171

7272
HdfsSnapshot.readFacts(repo, store, latestDate, incremental).map { input =>
7373

74-
val mappings: DObject[Mappings] = lazyObject(Chord.deserialiseChords(chordPath).run(sc).run.unsafePerformIO() match {
75-
case Ok(m) => m
76-
case Error(e) => sys.error("Can not deserialise chord map - " + Result.asString(e))
77-
})
78-
7974
// filter out the facts which are not in the entityMap or
8075
// whose dates are greater than the required dates for this entity
81-
val facts: DList[(Priority, Fact)] = (mappings join input.map({
76+
val facts: DList[(Priority, Fact)] = input.map({
8277
case -\/(e) => sys.error("A critical error has occured, where we could not determine priority and namespace from partitioning: " + e)
8378
case \/-(v) => v
84-
})).collect({
85-
case (map, (p, _, f)) if(DateMap.keep(map, f.entity, f.date.year, f.date.month, f.date.day)) => (p, f)
79+
}).parallelDo(new DoFn[(Priority, Factset, Fact), (Priority, Fact)] {
80+
var mappings: Mappings = null
81+
override def setup() {
82+
mappings = Chord.getMappings(chordPath)
83+
}
84+
override def process(input: (Priority, Factset, Fact), emitter: Emitter[(Priority, Fact)]) {
85+
input match { case (p, _, f) =>
86+
if(DateMap.keep(mappings, f.entity, f.date.year, f.date.month, f.date.day)) emitter.emit((p, f))
87+
}
88+
}
89+
override def cleanup(emitter: Emitter[(Priority, Fact)]) { }
8690
})
8791

8892
/**
8993
* 1. group by entity and feature id
9094
* 2. for a given entity and feature id, get the latest facts, with the lowest priority
9195
*/
92-
val grp = facts.groupBy { case (p, f) => (f.entity, f.featureId.toString) }
9396
val latest: DList[(Priority, Fact)] =
94-
(mappings join grp).mapFlatten { case (map, ((entityId, featureId), fs)) =>
95-
// the required dates
96-
val dates = map.get(entityId)
97-
98-
// we traverse all facts and for each required date
99-
// we keep the "best" fact which date is just before that date
100-
fs.foldLeft(dates.map((_, Priority.Min, None)): Array[(Int, Priority, Option[Fact])]) { case (ds, (priority, fact)) =>
101-
val factDate = fact.date.int
102-
ds.map {
103-
case previous @ (date, p, None) =>
104-
// we found a first suitable fact for that date
105-
if (factDate <= date) (date, priority, Some(fact))
106-
else previous
107-
108-
case previous @ (date, p, Some(f)) =>
109-
// we found a fact with a better time, or better priority if there is a tie
110-
if (factDate <= date && (fact, priority) < ((f, p))) (date, priority, Some(fact))
111-
else previous
97+
facts
98+
.groupBy { case (p, f) => (f.entity, f.featureId.toString) }
99+
.parallelDo(new DoFn[((String, String), Iterable[(Priority, Fact)]), (Priority, Fact)] {
100+
var mappings: Mappings = null
101+
override def setup() {
102+
mappings = Chord.getMappings(chordPath)
103+
}
104+
override def process(input: ((String, String), Iterable[(Priority, Fact)]), emitter: Emitter[(Priority, Fact)]) {
105+
input match { case ((entityId, featureId), fs) =>
106+
// the required dates
107+
val dates = mappings.get(entityId)
108+
109+
// we traverse all facts and for each required date
110+
// we keep the "best" fact which date is just before that date
111+
fs.foldLeft(dates.map((_, Priority.Min, None)): Array[(Int, Priority, Option[Fact])]) { case (ds, (priority, fact)) =>
112+
val factDate = fact.date.int
113+
ds.map {
114+
case previous @ (date, p, None) =>
115+
// we found a first suitable fact for that date
116+
if (factDate <= date) (date, priority, Some(fact))
117+
else previous
118+
119+
case previous @ (date, p, Some(f)) =>
120+
// we found a fact with a better time, or better priority if there is a tie
121+
if (factDate <= date && (fact, priority) < ((f, p))) (date, priority, Some(fact))
122+
else previous
123+
}
124+
}.collect({ case (d, p, Some(f)) => (p, f.withEntity(f.entity + ":" + Date.unsafeFromInt(d).hyphenated)) })
125+
.foreach({ case (p, f) => if(!f.isTombstone) emitter.emit((p, f)) })
112126
}
113-
}.collect { case (d, p, Some(f)) => (p, f.withEntity(f.entity + ":" + Date.unsafeFromInt(d).hyphenated)) }.toIterable
114-
}.collect { case (p, f) if !f.isTombstone => (p, f) }
127+
}
128+
override def cleanup(emitter: Emitter[(Priority, Fact)]) { }
129+
})
115130

116131
val validated: DList[Fact] = latest.map({ case (p, f) =>
117132
Validate.validateFact(f, dict).disjunction.leftMap(e => e + " - Factset " + factsetMap.get(p).getOrElse("Unknown, priority " + p))
@@ -168,4 +183,10 @@ object Chord {
168183
def readChords(path: Path): Hdfs[HashMap[String, Array[Int]]] = for {
169184
chords <- Hdfs.readWith(path, is => Streams.read(is))
170185
} yield DateMap.chords(chords)
186+
187+
def getMappings(chordPath: Path)(implicit sc: ScoobiConfiguration): HashMap[String, Array[Int]] =
188+
deserialiseChords(chordPath).run(sc).run.unsafePerformIO() match {
189+
case Ok(m) => m
190+
case Error(e) => sys.error("Can not deserialise chord map - " + Result.asString(e))
191+
}
171192
}

0 commit comments

Comments (0)