Skip to content

Commit

Permalink
Merge pull request #529 from samply/rocksdb-tests
Browse files Browse the repository at this point in the history
Add RocksDB Tests
  • Loading branch information
alexanderkiel authored Nov 18, 2021
2 parents 99b2f09 + cf52bd1 commit 9702223
Show file tree
Hide file tree
Showing 15 changed files with 1,398 additions and 172 deletions.
2 changes: 2 additions & 0 deletions .github/workflows/build.yml
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,7 @@ jobs:
- page-store-cassandra
- rest-api
- rest-util
- rocksdb
- scheduler
- search-param-registry
- thread-pool-executor-collector
Expand Down Expand Up @@ -129,6 +130,7 @@ jobs:
- page-store-cassandra
- rest-api
- rest-util
- rocksdb
- scheduler
- search-param-registry
- thread-pool-executor-collector
Expand Down
4 changes: 2 additions & 2 deletions dev/blaze/dev/rocksdb.clj
Original file line number Diff line number Diff line change
Expand Up @@ -11,13 +11,13 @@


(comment
(rocksdb/compact-range
(rocksdb/compact-range!
(get system :blaze.db.kv/rocksdb)
:resource-as-of-index
true
1)

(rocksdb/compact-range
(rocksdb/compact-range!
(get system :blaze.db.kv/rocksdb)
:search-param-value-index
true
Expand Down
6 changes: 5 additions & 1 deletion modules/rocksdb/.clj-kondo/config.edn
Original file line number Diff line number Diff line change
@@ -1,4 +1,8 @@
{:linters
{:lint-as
{blaze.db.kv.rocksdb-test/with-system-data clojure.core/with-open
blaze.test-util/with-system clojure.core/with-open}

:linters
{:unsorted-required-namespaces
{:level :error}
:single-key-in
Expand Down
6 changes: 3 additions & 3 deletions modules/rocksdb/Makefile
Original file line number Diff line number Diff line change
@@ -1,11 +1,11 @@
lint:
clj-kondo --lint src deps.edn
clj-kondo --lint src test deps.edn

test:
true
clojure -M:test --profile :ci

test-coverage:
true
clojure -M:test-coverage

clean:
rm -rf .clj-kondo/.cache .cpcache target
Expand Down
28 changes: 27 additions & 1 deletion modules/rocksdb/deps.edn
Original file line number Diff line number Diff line change
Expand Up @@ -9,4 +9,30 @@
{:local/root "../module-base"}

org.rocksdb/rocksdbjni
{:mvn/version "6.25.3"}}}
{:mvn/version "6.25.3"}}

:aliases
{:test
{:extra-paths ["test"]

:extra-deps
{blaze/test-util
{:local/root "../test-util"}

lambdaisland/kaocha
{:mvn/version "1.60.945"}}

:main-opts ["-m" "kaocha.runner"]}

:test-coverage
{:extra-paths ["test"]

:extra-deps
{blaze/test-util
{:local/root "../test-util"}

cloverage/cloverage
{:mvn/version "1.2.2"}}

:main-opts ["-m" "cloverage.coverage" "--codecov" "-p" "src" "-s" "test"
"-e" ".+spec"]}}}
177 changes: 31 additions & 146 deletions modules/rocksdb/src/blaze/db/kv/rocksdb.clj
Original file line number Diff line number Diff line change
@@ -1,22 +1,20 @@
(ns blaze.db.kv.rocksdb
(:require
[blaze.anomaly :as ba :refer [throw-anom]]
[blaze.db.kv :as kv]
[blaze.db.kv.rocksdb.impl :as impl]
[blaze.db.kv.rocksdb.metrics :as metrics]
[blaze.db.kv.rocksdb.spec]
[clojure.spec.alpha :as s]
[integrant.core :as ig]
[taoensso.timbre :as log])
(:import
[clojure.lang Indexed]
[java.lang AutoCloseable]
[java.io Closeable]
[java.nio ByteBuffer]
[java.util ArrayList EnumSet]
[org.rocksdb
RocksDB RocksIterator WriteOptions WriteBatch Options ColumnFamilyHandle
DBOptions ColumnFamilyDescriptor CompressionType ColumnFamilyOptions
BlockBasedTableConfig Statistics LRUCache BloomFilter CompactRangeOptions
Snapshot ReadOptions BuiltinComparator StatsLevel HistogramType]))
DBOptions Statistics LRUCache CompactRangeOptions Snapshot ReadOptions
StatsLevel HistogramType]))


(set! *warn-on-reflection* true)
Expand Down Expand Up @@ -60,36 +58,27 @@
(-value [_ buf]
(.value i buf))

Closeable
AutoCloseable
(close [_]
(.close i)))


(defn- column-family-not-found-msg [column-family]
(format "column family `%s` not found" (name column-family)))


(defn- get-cfh ^ColumnFamilyHandle [cfhs column-family]
(or (cfhs column-family)
(throw-anom (ba/not-found (column-family-not-found-msg column-family)))))


(deftype RocksKvSnapshot
[^RocksDB db ^Snapshot snapshot ^ReadOptions read-opts cfhs]
kv/KvSnapshot
(-new-iterator [_]
(->RocksKvIterator (.newIterator db read-opts)))

(-new-iterator [_ column-family]
(->RocksKvIterator (.newIterator db (get-cfh cfhs column-family) read-opts)))
(->RocksKvIterator (.newIterator db (impl/get-cfh cfhs column-family) read-opts)))

(-snapshot-get [_ k]
(.get db read-opts ^bytes k))

(-snapshot-get [_ column-family k]
(.get db (get-cfh cfhs column-family) read-opts ^bytes k))
(.get db (impl/get-cfh cfhs column-family) read-opts ^bytes k))

Closeable
AutoCloseable
(close [_]
(.close read-opts)
(.releaseSnapshot db snapshot)))
Expand All @@ -116,7 +105,7 @@
(.get db k))

(-get [_ column-family k]
(.get db ^ColumnFamilyHandle (get-cfh cfhs column-family) ^bytes k))
(.get db (impl/get-cfh cfhs column-family) ^bytes k))

(-multi-get [_ keys]
(loop [[k & ks] keys
Expand All @@ -130,69 +119,47 @@

(-put [_ entries]
(with-open [wb (WriteBatch.)]
(reduce
(fn [_ ^Indexed entry]
(let [column-family (.nth entry 0)]
(if (keyword? column-family)
(.put wb (get-cfh cfhs column-family) ^bytes (.nth entry 1)
^bytes (.nth entry 2))
(.put wb ^bytes column-family ^bytes (.nth entry 1)))))
nil
entries)
(impl/put-wb! cfhs wb entries)
(.write db write-opts wb)))

(-put [_ key value]
(.put db key value))

(-delete [_ ks]
(-delete [_ keys]
(with-open [wb (WriteBatch.)]
(doseq [k ks]
(if (vector? k)
(let [[column-family k] k]
(.delete wb (get-cfh cfhs column-family) k))
(.delete wb k)))
(impl/delete-wb! wb keys)
(.write db write-opts wb)))

(-write [_ entries]
(with-open [wb (WriteBatch.)]
(doseq [[op column-family k v] entries]
(if (keyword? column-family)
(case op
:put (.put wb (get-cfh cfhs column-family) ^bytes k ^bytes v)
:merge (.merge wb (get-cfh cfhs column-family) ^bytes k ^bytes v)
:delete (.delete wb (get-cfh cfhs column-family) ^bytes k))
(case op
:put (.put wb ^bytes column-family ^bytes k)
:merge (.merge wb ^bytes column-family ^bytes k)
:delete (.delete wb ^bytes column-family))))
(impl/write-wb! cfhs wb entries)
(.write db write-opts wb)))

Rocks
(-get-property [_ name]
(.getProperty db name))

(-get-property [_ column-family name]
(.getProperty db (get-cfh cfhs column-family) name))
(.getProperty db (impl/get-cfh cfhs column-family) name))

Closeable
AutoCloseable
(close [_]
(.close db)
(.close opts)
(.close write-opts)))


(defn compact-range
(defn compact-range!
"Range compaction of database.
Note: After the database has been compacted, all data will have been pushed
down to the last level containing any data."
([store]
(.compactRange ^RocksDB (.db ^RocksKvStore store)))
([^RocksKvStore store column-family change-level target-level]
(assert (get (.cfhs store) column-family))
([store column-family change-level target-level]
(.compactRange
^RocksDB (.db store)
^ColumnFamilyHandle (get (.cfhs store) column-family)
^RocksDB (.db ^RocksKvStore store)
(impl/get-cfh (.cfhs ^RocksKvStore store) column-family)
nil
nil
(doto (CompactRangeOptions.)
Expand All @@ -207,93 +174,6 @@
column-family-handles))


(defn- column-family-descriptor
{:arglists '([[key]] [block-cache [key opts]])}
([[key]]
(ColumnFamilyDescriptor.
(.getBytes (name key))))
([block-cache
[key {:keys [write-buffer-size-in-mb
max-write-buffer-number
level0-file-num-compaction-trigger
min-write-buffer-number-to-merge
max-bytes-for-level-base-in-mb
target-file-size-base-in-mb
block-size
bloom-filter?
optimize-filters-for-hits?
reverse-comparator?]
:or {write-buffer-size-in-mb 64
max-write-buffer-number 2
level0-file-num-compaction-trigger 4
min-write-buffer-number-to-merge 1
max-bytes-for-level-base-in-mb 256
target-file-size-base-in-mb 64
block-size (bit-shift-left 4 10)
bloom-filter? false
optimize-filters-for-hits? false
reverse-comparator? false}}]]
(ColumnFamilyDescriptor.
(.getBytes (name key))
(cond->
(doto (ColumnFamilyOptions.)
(.setLevelCompactionDynamicLevelBytes true)
(.setCompressionType CompressionType/LZ4_COMPRESSION)
(.setBottommostCompressionType CompressionType/ZSTD_COMPRESSION)
(.setWriteBufferSize (bit-shift-left ^long write-buffer-size-in-mb 20))
(.setMaxWriteBufferNumber ^long max-write-buffer-number)
(.setMaxBytesForLevelBase (bit-shift-left ^long max-bytes-for-level-base-in-mb 20))
(.setLevel0FileNumCompactionTrigger ^long level0-file-num-compaction-trigger)
(.setMinWriteBufferNumberToMerge ^long min-write-buffer-number-to-merge)
(.setTargetFileSizeBase (bit-shift-left ^long target-file-size-base-in-mb 20))
(.setTableFormatConfig
(cond->
(doto (BlockBasedTableConfig.)
(.setVerifyCompression false)
(.setCacheIndexAndFilterBlocks true)
(.setPinL0FilterAndIndexBlocksInCache true)
(.setBlockSize block-size)
(.setBlockCache block-cache))
bloom-filter?
(.setFilterPolicy (BloomFilter. 10 false)))))
optimize-filters-for-hits?
(.setOptimizeFiltersForHits true)
reverse-comparator?
(.setComparator BuiltinComparator/REVERSE_BYTEWISE_COMPARATOR)))))


(defn init-rocksdb-kv-store
[dir
block-cache
stats
{:keys [sync?
disable-wal?
max-background-jobs
compaction-readahead-size]
:or {max-background-jobs 2
compaction-readahead-size 0}}
column-families]
(let [opts (doto (DBOptions.)
(.setStatistics ^Statistics stats)
(.setMaxBackgroundJobs ^long max-background-jobs)
(.setCompactionReadaheadSize ^long compaction-readahead-size)
(.setEnablePipelinedWrite true)
(.setCreateIfMissing true)
(.setCreateMissingColumnFamilies true))
cfds (map (partial column-family-descriptor block-cache) column-families)
cfhs (ArrayList.)
db (try
(RocksDB/open opts dir cfds cfhs)
(finally (.close opts)))
write-opts (WriteOptions.)]

(when sync?
(.setSync write-opts true))
(when disable-wal?
(.setDisableWAL write-opts true))
(->RocksKvStore db opts write-opts (index-column-family-handles cfhs))))


(defmethod ig/init-key ::block-cache
[_ {:keys [size-in-mb] :or {size-in-mb 128}}]
(log/info (format "Init RocksDB block cache of %d MB" size-in-mb))
Expand All @@ -307,12 +187,8 @@
(.close ^AutoCloseable cache))


(s/def ::dir
string?)


(defmethod ig/pre-init-spec ::kv/rocksdb [_]
(s/keys :req-un [::dir]))
(s/keys :req-un [::dir ::block-cache ::stats]))


(defn- init-log-msg [dir opts]
Expand All @@ -323,13 +199,22 @@
(defmethod ig/init-key ::kv/rocksdb
[_ {:keys [dir block-cache stats opts column-families]}]
(log/info (init-log-msg dir opts))
(init-rocksdb-kv-store dir block-cache stats opts (merge {:default nil} column-families)))
(let [^DBOptions db-options (impl/db-options stats opts)
cfds (map
(partial impl/column-family-descriptor block-cache)
(merge {:default nil} column-families))
cfhs (ArrayList.)
db (try
(RocksDB/open db-options dir cfds cfhs)
(finally (.close db-options)))]
(->RocksKvStore db db-options (impl/write-options opts)
(index-column-family-handles cfhs))))


(defmethod ig/halt-key! ::kv/rocksdb
[_ store]
(log/info "Close RocksDB key-value store")
(.close ^Closeable store))
(.close ^AutoCloseable store))


(defmethod ig/init-key ::stats
Expand Down
Loading

0 comments on commit 9702223

Please sign in to comment.