
Commit 3a0fedd

HdfsStore and spec.

1 parent 1b81c1d · commit 3a0fedd

5 files changed, +359 -33 lines changed

ivory-alien-hdfs/src/main/scala/com/ambiata/ivory/alien/hdfs/Hdfs.scala

+9 -1
@@ -8,7 +8,6 @@ import java.io._
 import com.ambiata.mundane.control._
 
 case class Hdfs[+A](action: ActionT[IO, Unit, Configuration, A]) {
-
   def run(conf: Configuration): ResultTIO[A] =
     action.executeT(conf)
 
@@ -65,6 +64,9 @@ object Hdfs extends ActionTSupport[IO, Unit, Configuration] {
   def exists(p: Path): Hdfs[Boolean] =
     filesystem.map(fs => fs.exists(p))
 
+  def isDirectory(p: Path): Hdfs[Boolean] =
+    filesystem.map(fs => fs.isDirectory(p))
+
   def mustexist(p: Path): Hdfs[Unit] =
     exists(p).flatMap(e => if(e) Hdfs.ok(()) else Hdfs.fail(s"$p doesn't exist!"))
 
@@ -127,6 +129,12 @@ object Hdfs extends ActionTSupport[IO, Unit, Configuration] {
   def mkdir(p: Path): Hdfs[Boolean] =
     filesystem.map(fs => fs.mkdirs(p))
 
+  def delete(p: Path): Hdfs[Unit] =
+    filesystem.map(fs => fs.delete(p, false))
+
+  def deleteAll(p: Path): Hdfs[Unit] =
+    filesystem.map(fs => fs.delete(p, true))
+
   implicit def HdfsMonad: Monad[Hdfs] = new Monad[Hdfs] {
     def point[A](v: => A) = ok(v)
     def bind[A, B](m: Hdfs[A])(f: A => Hdfs[B]) = m.flatMap(f)
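
As a quick illustration (not part of the commit), the new combinators compose like any other Hdfs action. The path below is made up, and note that FileSystem#delete's Boolean result is silently discarded by the Hdfs[Unit] return types:

import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.Path

// Remove a scratch path, recursively only when it is a directory.
val cleanup: Hdfs[Unit] = for {
  dir <- Hdfs.isDirectory(new Path("/tmp/scratch"))
  _   <- if (dir) Hdfs.deleteAll(new Path("/tmp/scratch")) // fs.delete(p, true)
         else     Hdfs.delete(new Path("/tmp/scratch"))    // fs.delete(p, false)
} yield ()

cleanup.run(new Configuration) // ResultTIO[Unit]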
ivory-alien-hdfs/src/main/scala/com/ambiata/ivory/alien/hdfs/HdfsStore.scala

+160 -0
@@ -0,0 +1,160 @@
package com.ambiata.ivory.alien.hdfs

import com.ambiata.mundane.control._
import com.ambiata.mundane.io._
import com.ambiata.mundane.data._
import com.ambiata.mundane.store._
import java.util.UUID
import java.io.{InputStream, OutputStream}
import java.io.{PipedInputStream, PipedOutputStream}
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{FileSystem, Path}
import scala.io.Codec
import scalaz.{Store => _, _}, Scalaz._, scalaz.stream._, scalaz.concurrent._, effect.IO, effect.Effect._, \&/._
import scodec.bits.ByteVector

case class HdfsStore(conf: Configuration, base: FilePath) extends Store[ResultTIO] with ReadOnlyStore[ResultTIO] {
  def readOnly: ReadOnlyStore[ResultTIO] =
    this

  def basePath: Path =
    new Path(base.path)

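  // normalize strips the fully-qualified base prefix so listings return
  // keys relative to `base` (e.g. hdfs://nn/tmp/store/a/b becomes a/b).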
  def normalize(key: Path): String = {
    val fs = FileSystem.get(conf)
    fs.makeQualified(key).toString.replace(fs.makeQualified(basePath).toString + "/", "")
  }

  def list(prefix: FilePath): ResultT[IO, List[FilePath]] =
    hdfs { Hdfs.globFilesRecursively(new Path((base </> prefix).path)).map(_.map(normalize).sorted).map(_.map(_.toFilePath)) }

  def filter(prefix: FilePath, predicate: FilePath => Boolean): ResultT[IO, List[FilePath]] =
    list(prefix).map(_.filter(predicate))

  def find(prefix: FilePath, predicate: FilePath => Boolean): ResultT[IO, Option[FilePath]] =
    list(prefix).map(_.find(predicate))

  def exists(path: FilePath): ResultT[IO, Boolean] =
    hdfs { Hdfs.exists(new Path((base </> path).path)) }

  def delete(path: FilePath): ResultT[IO, Unit] =
    hdfs { Hdfs.delete(new Path((base </> path).path)) }

  def deleteAll(prefix: FilePath): ResultT[IO, Unit] =
    hdfs { Hdfs.deleteAll(new Path((base </> prefix).path)) }

  def move(in: FilePath, out: FilePath): ResultT[IO, Unit] =
    copy(in, out) >> delete(in)

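  // copy is destination-aware: when `out` already exists as a directory the
  // source is copied beneath it under its own basename, otherwise straight
  // to `out`.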
  def copy(in: FilePath, out: FilePath): ResultT[IO, Unit] = hdfs { for {
    dir <- Hdfs.isDirectory(new Path((base </> out).path))
    _   <- if (dir) Hdfs.cp(new Path((base </> in).path), new Path((base </> out </> in.basename).path), false)
           else     Hdfs.cp(new Path((base </> in).path), new Path((base </> out).path), false)
  } yield () }

  def mirror(in: FilePath, out: FilePath): ResultT[IO, Unit] = for {
    paths <- list(in)
    _     <- paths.traverseU({ source =>
               val destination = out </> source.path.replace(in.path + "/", "")
               copy(source, destination)
             })
  } yield ()

  def moveTo(store: Store[ResultTIO], src: FilePath, dest: FilePath): ResultT[IO, Unit] =
    copyTo(store, src, dest) >> delete(src)

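  // The *To variants target another (possibly non-HDFS) Store by piping an
  // input stream from this store into the destination store's output
  // stream, so neither backend needs to know about the other.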
  def copyTo(store: Store[ResultTIO], src: FilePath, dest: FilePath): ResultT[IO, Unit] =
    unsafe.withInputStream(src) { in =>
      store.unsafe.withOutputStream(dest) { out =>
        Streams.pipe(in, out) }}

  def mirrorTo(store: Store[ResultTIO], in: FilePath, out: FilePath): ResultT[IO, Unit] = for {
    paths <- list(in)
    _     <- paths.traverseU({ source =>
               val destination = out </> source.path.replace(in.path + "/", "")
               copyTo(store, source, destination)
             })
  } yield ()

  def checksum(path: FilePath, algorithm: ChecksumAlgorithm): ResultT[IO, Checksum] =
    withInputStreamValue[Checksum](path)(in => Checksum.stream(in, algorithm))

  val bytes: StoreBytes[ResultTIO] = new StoreBytes[ResultTIO] {
    def read(path: FilePath): ResultT[IO, ByteVector] =
      withInputStreamValue[Array[Byte]](path)(Streams.readBytes(_, 4 * 1024 * 1024)).map(ByteVector.view)

    def write(path: FilePath, data: ByteVector): ResultT[IO, Unit] =
      unsafe.withOutputStream(path)(Streams.writeBytes(_, data.toArray))

    def source(path: FilePath): Process[Task, ByteVector] =
      scalaz.stream.io.chunkR(FileSystem.get(conf).open(new Path((base </> path).path))).evalMap(_(1024 * 1024))

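    // Note: the two piped streams below are created but never connected to
    // each other; the sink only writes each chunk to the PipedOutputStream.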
    def sink(path: FilePath): Sink[Task, ByteVector] =
      io.resource(Task.delay(new PipedOutputStream))(out => Task.delay(out.close))(
        out => io.resource(Task.delay(new PipedInputStream))(in => Task.delay(in.close))(
          in => Task.now((bytes: ByteVector) => Task.delay(out.write(bytes.toArray)))).toTask)
  }

  val strings: StoreStrings[ResultTIO] = new StoreStrings[ResultTIO] {
    def read(path: FilePath, codec: Codec): ResultT[IO, String] =
      bytes.read(path).map(b => new String(b.toArray, codec.name))

    def write(path: FilePath, data: String, codec: Codec): ResultT[IO, Unit] =
      bytes.write(path, ByteVector.view(data.getBytes(codec.name)))
  }

  val utf8: StoreUtf8[ResultTIO] = new StoreUtf8[ResultTIO] {
    def read(path: FilePath): ResultT[IO, String] =
      strings.read(path, Codec.UTF8)

    def write(path: FilePath, data: String): ResultT[IO, Unit] =
      strings.write(path, data, Codec.UTF8)

    def source(path: FilePath): Process[Task, String] =
      bytes.source(path) |> scalaz.stream.text.utf8Decode

    def sink(path: FilePath): Sink[Task, String] =
      bytes.sink(path).map(_.contramap(s => ByteVector.view(s.getBytes("UTF-8"))))
  }

  val lines: StoreLines[ResultTIO] = new StoreLines[ResultTIO] {
    def read(path: FilePath, codec: Codec): ResultT[IO, List[String]] =
      strings.read(path, codec).map(_.lines.toList)

    def write(path: FilePath, data: List[String], codec: Codec): ResultT[IO, Unit] =
      strings.write(path, Lists.prepareForFile(data), codec)

    def source(path: FilePath, codec: Codec): Process[Task, String] =
      scalaz.stream.io.linesR(FileSystem.get(conf).open(new Path((base </> path).path)))(codec)

    def sink(path: FilePath, codec: Codec): Sink[Task, String] =
      bytes.sink(path).map(_.contramap(s => ByteVector.view(s"$s\n".getBytes(codec.name))))
  }

  val linesUtf8: StoreLinesUtf8[ResultTIO] = new StoreLinesUtf8[ResultTIO] {
    def read(path: FilePath): ResultT[IO, List[String]] =
      lines.read(path, Codec.UTF8)

    def write(path: FilePath, data: List[String]): ResultT[IO, Unit] =
      lines.write(path, data, Codec.UTF8)

    def source(path: FilePath): Process[Task, String] =
      lines.source(path, Codec.UTF8)

    def sink(path: FilePath): Sink[Task, String] =
      lines.sink(path, Codec.UTF8)
  }

  def withInputStreamValue[A](path: FilePath)(f: InputStream => ResultT[IO, A]): ResultT[IO, A] =
    hdfs { Hdfs.readWith(new Path((base </> path).path), f) }

  val unsafe: StoreUnsafe[ResultTIO] = new StoreUnsafe[ResultTIO] {
    def withInputStream(path: FilePath)(f: InputStream => ResultT[IO, Unit]): ResultT[IO, Unit] =
      withInputStreamValue[Unit](path)(f)

    def withOutputStream(path: FilePath)(f: OutputStream => ResultT[IO, Unit]): ResultT[IO, Unit] =
      hdfs { Hdfs.writeWith(new Path((base </> path).path), f) }
  }

  def hdfs[A](thunk: => Hdfs[A]): ResultT[IO, A] =
    thunk.run(conf)
}
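
A small round-trip sketch (again not part of the commit; the base directory and file name are illustrative). Every operation resolves paths against base, and hdfs { ... } runs the wrapped Hdfs action with the store's Configuration:

import com.ambiata.mundane.io._
import org.apache.hadoop.conf.Configuration
import scalaz._, Scalaz._

val store = HdfsStore(new Configuration, FilePath.root </> "tmp" </> "hdfs-store-example")

val roundTrip = for {
  _ <- store.utf8.write(FilePath.root </> "greeting.txt", "hello") // lands under base
  s <- store.utf8.read(FilePath.root </> "greeting.txt")
  _ <- store.deleteAll(FilePath.root)                              // recursive cleanup of base
} yield s

roundTrip.run.unsafePerformIO // Result[String], Ok("hello") on success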
ivory-alien-hdfs/src/test/scala/com/ambiata/ivory/alien/hdfs/HdfsStoreSpec.scala

+188 -0
@@ -0,0 +1,188 @@
package com.ambiata.ivory.alien.hdfs

import scala.io.Codec
import scalaz.{Store => _, _}, Scalaz._, \&/._, effect.IO
import scodec.bits.ByteVector
import org.specs2._, org.specs2.matcher._
import org.scalacheck.Arbitrary, Arbitrary._
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.Path
import com.ambiata.saws.core._
import com.ambiata.mundane.control._
import com.ambiata.mundane.io._
import com.ambiata.mundane.testing._, ResultTIOMatcher._
import java.io.{File, FileOutputStream, ByteArrayInputStream}
import java.util.UUID

// FIX Work out how this test can be pulled out and shared with posix/s3/hdfs.
class HdfsStoreSpec extends Specification with ScalaCheck { def is = args.execute(threadsNb = 10) ^ s2"""

Hdfs Store Usage
================

  list path                     $list
  filter listed paths           $filter
  find path in root (thirdish)  $find
  find path in root (first)     $findfirst
  find path in root (last)      $findlast

  exists                        $exists
  not exists                    $notExists

  delete                        $delete
  deleteAll                     $deleteAll

  move                          $move
  move and read                 $moveRead
  copy                          $copy
  copy and read                 $copyRead
  mirror                        $mirror

  moveTo                        $moveTo
  copyTo                        $copyTo
  mirrorTo                      $mirrorTo

  checksum                      $checksum

  read / write bytes            $bytes

  read / write strings          $strings

  read / write utf8 strings     $utf8Strings

  read / write lines            $lines

  read / write utf8 lines       $utf8Lines

"""

  implicit val params =
    Parameters(workers = 20, minTestsOk = 40, maxSize = 10)

  val conf = new Configuration

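  // Every generated store gets its own /tmp/HdfsStoreSpec.<uuid>.<n>
  // directory, so properties running concurrently (threadsNb = 10,
  // workers = 20) never share state.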
  implicit def HdfsStoreArbitary: Arbitrary[HdfsStore] =
    Arbitrary(arbitrary[Int].map(math.abs).map(n =>
      HdfsStore(conf, FilePath.root </> "tmp" </> s"HdfsStoreSpec.${UUID.randomUUID}.${n}")))

  def list =
    prop((store: HdfsStore, paths: Paths) => clean(store, paths) { filepaths =>
      store.list(FilePath.root) must beOkValue(filepaths) })

  def filter =
    prop((store: HdfsStore, paths: Paths) => clean(store, paths) { filepaths =>
      val first = filepaths.head
      val last = filepaths.last
      val expected = if (first == last) List(first) else List(first, last)
      store.filter(FilePath.root, x => x == first || x == last) must beOkLike(paths => paths must contain(allOf(expected:_*))) })

  def find =
    prop((store: HdfsStore, paths: Paths) => paths.entries.length >= 3 ==> { clean(store, paths) { filepaths =>
      val third = filepaths.drop(2).head
      store.find(FilePath.root, _ == third) must beOkValue(Some(third)) } })

  def findfirst =
    prop((store: HdfsStore, paths: Paths) => clean(store, paths) { filepaths =>
      store.find(FilePath.root, x => x == filepaths.head) must beOkValue(Some(filepaths.head)) })

  def findlast =
    prop((store: HdfsStore, paths: Paths) => clean(store, paths) { filepaths =>
      store.find(FilePath.root, x => x == filepaths.last) must beOkValue(Some(filepaths.last)) })

  def exists =
    prop((store: HdfsStore, paths: Paths) => clean(store, paths) { filepaths =>
      filepaths.traverseU(store.exists) must beOkLike(_.forall(identity)) })

  def notExists =
    prop((store: HdfsStore, paths: Paths) => store.exists(FilePath.root </> "i really don't exist") must beOkValue(false))

  def delete =
    prop((store: HdfsStore, paths: Paths) => clean(store, paths) { filepaths =>
      val first = filepaths.head
      (store.delete(first) >> filepaths.traverseU(store.exists)) must beOkLike(x => !x.head && x.tail.forall(identity)) })

  def deleteAll =
    prop((store: HdfsStore, paths: Paths) => clean(store, paths) { filepaths =>
      (store.deleteAll(FilePath.root) >> filepaths.traverseU(store.exists)) must beOkLike(x => !x.tail.exists(identity)) })

  def move =
    prop((store: HdfsStore, m: Entry, n: Entry) => clean(store, Paths(m :: Nil)) { _ =>
      (store.move(m.full.toFilePath, n.full.toFilePath) >>
       store.exists(m.full.toFilePath).zip(store.exists(n.full.toFilePath))) must beOkValue(false -> true) })

  def moveRead =
    prop((store: HdfsStore, m: Entry, n: Entry) => clean(store, Paths(m :: Nil)) { _ =>
      (store.move(m.full.toFilePath, n.full.toFilePath) >>
       store.utf8.read(n.full.toFilePath)) must beOkValue(m.value.toString) })

  def copy =
    prop((store: HdfsStore, m: Entry, n: Entry) => clean(store, Paths(m :: Nil)) { _ =>
      (store.copy(m.full.toFilePath, n.full.toFilePath) >>
       store.exists(m.full.toFilePath).zip(store.exists(n.full.toFilePath))) must beOkValue(true -> true) })

  def copyRead =
    prop((store: HdfsStore, m: Entry, n: Entry) => clean(store, Paths(m :: Nil)) { _ =>
      (store.copy(m.full.toFilePath, n.full.toFilePath) >>
       store.utf8.read(m.full.toFilePath).zip(store.utf8.read(n.full.toFilePath))) must beOkLike({ case (in, out) => in must_== out }) })

  def mirror =
    prop((store: HdfsStore, paths: Paths) => clean(store, paths) { filepaths =>
      store.mirror(FilePath.root, FilePath.root </> "mirror") >> store.list(FilePath.root </> "mirror") must beOkValue(filepaths.map("mirror" </> _)) })

  def moveTo =
    prop((store: HdfsStore, alternate: HdfsStore, m: Entry, n: Entry) => clean(store, alternate, Paths(m :: Nil)) { _ =>
      (store.moveTo(alternate, m.full.toFilePath, n.full.toFilePath) >>
       store.exists(m.full.toFilePath).zip(alternate.exists(n.full.toFilePath))) must beOkValue(false -> true) })

  def copyTo =
    prop((store: HdfsStore, alternate: HdfsStore, m: Entry, n: Entry) => clean(store, alternate, Paths(m :: Nil)) { _ =>
      (store.copyTo(alternate, m.full.toFilePath, n.full.toFilePath) >>
       store.exists(m.full.toFilePath).zip(alternate.exists(n.full.toFilePath))) must beOkValue(true -> true) })

  def mirrorTo =
    prop((store: HdfsStore, alternate: HdfsStore, paths: Paths) => clean(store, alternate, paths) { filepaths =>
      store.mirrorTo(alternate, FilePath.root, FilePath.root </> "mirror") >> alternate.list(FilePath.root </> "mirror") must beOkValue(filepaths.map("mirror" </> _)) })

  def checksum =
    prop((store: HdfsStore, m: Entry) => clean(store, Paths(m :: Nil)) { _ =>
      store.checksum(m.full.toFilePath, MD5) must beOkValue(Checksum.string(m.value.toString, MD5)) })

  def bytes =
    prop((store: HdfsStore, m: Entry, bytes: Array[Byte]) => clean(store, Paths(m :: Nil)) { _ =>
      (store.bytes.write(m.full.toFilePath, ByteVector(bytes)) >> store.bytes.read(m.full.toFilePath)) must beOkValue(ByteVector(bytes)) })

  def strings =
    prop((store: HdfsStore, m: Entry, s: String) => clean(store, Paths(m :: Nil)) { _ =>
      (store.strings.write(m.full.toFilePath, s, Codec.UTF8) >> store.strings.read(m.full.toFilePath, Codec.UTF8)) must beOkValue(s) })

  def utf8Strings =
    prop((store: HdfsStore, m: Entry, s: String) => clean(store, Paths(m :: Nil)) { _ =>
      (store.utf8.write(m.full.toFilePath, s) >> store.utf8.read(m.full.toFilePath)) must beOkValue(s) })

  def lines =
    prop((store: HdfsStore, m: Entry, s: List[Int]) => clean(store, Paths(m :: Nil)) { _ =>
      (store.lines.write(m.full.toFilePath, s.map(_.toString), Codec.UTF8) >> store.lines.read(m.full.toFilePath, Codec.UTF8)) must beOkValue(s.map(_.toString)) })

  def utf8Lines =
    prop((store: HdfsStore, m: Entry, s: List[Int]) => clean(store, Paths(m :: Nil)) { _ =>
      (store.linesUtf8.write(m.full.toFilePath, s.map(_.toString)) >> store.linesUtf8.read(m.full.toFilePath)) must beOkValue(s.map(_.toString)) })

  def files(paths: Paths): List[FilePath] =
    paths.entries.map(e => e.full.toFilePath).sortBy(_.path)

  def create(store: HdfsStore, paths: Paths): ResultT[IO, Unit] =
    paths.entries.traverseU(e =>
      Hdfs.writeWith[Unit](new Path((store.base </> e.full).path), out => ResultT.safe[IO, Unit] { out.write(e.value.toString.getBytes("UTF-8")) }).run(conf)).void

  def clean[A](store: HdfsStore, paths: Paths)(run: List[FilePath] => A): A = {
    create(store, paths).run.unsafePerformIO
    try run(files(paths))
    finally store.deleteAll(FilePath.root).run.unsafePerformIO
  }

  def clean[A](store: HdfsStore, alternate: HdfsStore, paths: Paths)(run: List[FilePath] => A): A = {
    create(store, paths).run.unsafePerformIO
    try run(files(paths))
    finally (store.deleteAll(FilePath.root) >> alternate.deleteAll(FilePath.root)).run.unsafePerformIO
  }
}
