diff --git a/contrib/ondisk.py b/contrib/ondisk.py index 26a95f44f5..81ec71941c 100644 --- a/contrib/ondisk.py +++ b/contrib/ondisk.py @@ -11,7 +11,7 @@ def merge_ondisk( - trained_index: faiss.Index, shard_fnames: List[str], ivfdata_fname: str + trained_index: faiss.Index, shard_fnames: List[str], ivfdata_fname: str, shift_ids=False ) -> None: """Add the contents of the indexes stored in shard_fnames into the index trained_index. The on-disk data is stored in ivfdata_fname""" @@ -51,7 +51,7 @@ def merge_ondisk( ivf_vector.push_back(ivf) LOG.info("merge %d inverted lists " % ivf_vector.size()) - ntotal = invlists.merge_from(ivf_vector.data(), ivf_vector.size()) + ntotal = invlists.merge_from_multiple(ivf_vector.data(), ivf_vector.size(), shift_ids) # now replace the inverted lists in the output index index.ntotal = index_ivf.ntotal = ntotal diff --git a/faiss/invlists/OnDiskInvertedLists.cpp b/faiss/invlists/OnDiskInvertedLists.cpp index 3017d164c6..dc17fe67f6 100644 --- a/faiss/invlists/OnDiskInvertedLists.cpp +++ b/faiss/invlists/OnDiskInvertedLists.cpp @@ -565,15 +565,16 @@ void OnDiskInvertedLists::free_slot(size_t offset, size_t capacity) { /***************************************** * Compact form *****************************************/ - -size_t OnDiskInvertedLists::merge_from( +size_t OnDiskInvertedLists::merge_from_multiple( const InvertedLists** ils, int n_il, + bool shift_ids, bool verbose) { FAISS_THROW_IF_NOT_MSG( totsize == 0, "works only on an empty InvertedLists"); std::vector<size_t> sizes(nlist); + std::vector<size_t> shift_id_offsets(n_il); for (int i = 0; i < n_il; i++) { const InvertedLists* il = ils[i]; FAISS_THROW_IF_NOT(il->nlist == nlist && il->code_size == code_size); @@ -581,6 +582,10 @@ size_t OnDiskInvertedLists::merge_from( for (size_t j = 0; j < nlist; j++) { sizes[j] += il->list_size(j); } + + size_t il_totsize = il->compute_ntotal(); + shift_id_offsets[i] = + (shift_ids && i > 0) ? 
shift_id_offsets[i - 1] + il_totsize : 0; } size_t cums = 0; @@ -605,11 +610,21 @@ size_t OnDiskInvertedLists::merge_from( const InvertedLists* il = ils[i]; size_t n_entry = il->list_size(j); l.size += n_entry; + ScopedIds scope_ids(il, j); + const idx_t* scope_ids_data = scope_ids.get(); + std::vector<idx_t> new_ids; + if (shift_ids) { + new_ids.resize(n_entry); + for (size_t k = 0; k < n_entry; k++) { + new_ids[k] = scope_ids[k] + shift_id_offsets[i]; + } + scope_ids_data = new_ids.data(); + } update_entries( j, l.size - n_entry, n_entry, - ScopedIds(il, j).get(), + scope_ids_data, ScopedCodes(il, j).get()); } assert(l.size == l.capacity); @@ -638,7 +653,7 @@ size_t OnDiskInvertedLists::merge_from( size_t OnDiskInvertedLists::merge_from_1( const InvertedLists* ils, bool verbose) { - return merge_from(&ils, 1, verbose); + return merge_from_multiple(&ils, 1, false, verbose); } void OnDiskInvertedLists::crop_invlists(size_t l0, size_t l1) { diff --git a/faiss/invlists/OnDiskInvertedLists.h b/faiss/invlists/OnDiskInvertedLists.h index 98cb653a7a..01c7f3481e 100644 --- a/faiss/invlists/OnDiskInvertedLists.h +++ b/faiss/invlists/OnDiskInvertedLists.h @@ -101,9 +101,10 @@ struct OnDiskInvertedLists : InvertedLists { // copy all inverted lists into *this, in compact form (without // allocating slots) - size_t merge_from( + size_t merge_from_multiple( const InvertedLists** ils, int n_il, + bool shift_ids = false, bool verbose = false); /// same as merge_from for a single invlist diff --git a/tests/test_contrib.py b/tests/test_contrib.py index 84b90a4e5f..0e7cbbfb03 100644 --- a/tests/test_contrib.py +++ b/tests/test_contrib.py @@ -9,6 +9,7 @@ import platform import os import random +import shutil import tempfile from faiss.contrib import datasets @@ -17,15 +18,13 @@ from faiss.contrib import ivf_tools from faiss.contrib import clustering from faiss.contrib import big_batch_search +from faiss.contrib.ondisk import merge_ondisk from common_faiss_tests import get_dataset_2 -try: - from 
faiss.contrib.exhaustive_search import \ - knn_ground_truth, knn, range_ground_truth, \ - range_search_max_results, exponential_query_iterator -except: - pass # Submodule import broken in python 2. - +from faiss.contrib.exhaustive_search import \ + knn_ground_truth, knn, range_ground_truth, \ + range_search_max_results, exponential_query_iterator +from contextlib import contextmanager @unittest.skipIf(platform.python_version_tuple()[0] < '3', 'Submodule import broken in python 2.') @@ -674,3 +673,63 @@ def test_code_set(self): np.testing.assert_equal( np.sort(np.unique(codes, axis=0), axis=None), np.sort(codes[inserted], axis=None)) + + +@unittest.skipIf(platform.system() == 'Windows', + 'OnDiskInvertedLists is unsupported on Windows.') +class TestMerge(unittest.TestCase): + @contextmanager + def temp_directory(self): + temp_dir = tempfile.mkdtemp() + try: + yield temp_dir + finally: + shutil.rmtree(temp_dir) + + def do_test_ondisk_merge(self, shift_ids=False): + with self.temp_directory() as tmpdir: + # only train and add index to disk without adding elements. + # this will create empty inverted lists. 
+ ds = datasets.SyntheticDataset(32, 2000, 200, 20) + index = faiss.index_factory(ds.d, "IVF32,Flat") + index.train(ds.get_train()) + faiss.write_index(index, tmpdir + "/trained.index") + + # create 4 shards and add elements to them + ns = 4 # number of shards + + for bno in range(ns): + index = faiss.read_index(tmpdir + "/trained.index") + i0, i1 = int(bno * ds.nb / ns), int((bno + 1) * ds.nb / ns) + if shift_ids: + index.add_with_ids(ds.xb[i0:i1], np.arange(0, ds.nb / ns)) + else: + index.add_with_ids(ds.xb[i0:i1], np.arange(i0, i1)) + faiss.write_index(index, tmpdir + "/block_%d.index" % bno) + + # construct the output index and merge them on disk + index = faiss.read_index(tmpdir + "/trained.index") + block_fnames = [tmpdir + "/block_%d.index" % bno for bno in range(4)] + + merge_ondisk( + index, block_fnames, tmpdir + "/merged_index.ivfdata", shift_ids + ) + faiss.write_index(index, tmpdir + "/populated.index") + + # perform a search from index on disk + index = faiss.read_index(tmpdir + "/populated.index") + index.nprobe = 5 + D, I = index.search(ds.xq, 5) + + # ground-truth + gtI = ds.get_groundtruth(5) + + recall_at_1 = (I[:, :1] == gtI[:, :1]).sum() / float(ds.xq.shape[0]) + self.assertGreaterEqual(recall_at_1, 0.5) + + def test_ondisk_merge(self): + self.do_test_ondisk_merge() + + def test_ondisk_merge_with_shift_ids(self): + # verified that recall is same for test_ondisk_merge and + # test_ondisk_merge_with_shift_ids + self.do_test_ondisk_merge(True) diff --git a/tests/test_merge.cpp b/tests/test_merge.cpp index 5a1d08cfba..edbe2a03a6 100644 --- a/tests/test_merge.cpp +++ b/tests/test_merge.cpp @@ -32,6 +32,7 @@ size_t nq = 100; int nindex = 4; int k = 10; int nlist = 40; +int shard_size = nb / nindex; struct CommonData { std::vector<float> database; @@ -100,7 +101,7 @@ int compare_merged( auto il = new faiss::OnDiskInvertedLists( index0->nlist, index0->code_size, filename.c_str()); - il->merge_from(lists.data(), lists.size()); + il->merge_from_multiple(lists.data(), lists.size(), shift_ids); 
index0->replace_invlists(il, true); index0->ntotal = ntotal; @@ -110,11 +111,14 @@ int compare_merged( nq, cd.queries.data(), k, newD.data(), newI.data()); size_t ndiff = 0; + bool adjust_ids = shift_ids && !standard_merge; for (size_t i = 0; i < k * nq; i++) { - if (refI[i] != newI[i]) { + idx_t new_id = adjust_ids ? newI[i] % shard_size : newI[i]; + if (refI[i] != new_id) { ndiff++; } } + return ndiff; } @@ -220,3 +224,23 @@ TEST(MERGE, merge_flat_ondisk_2) { int ndiff = compare_merged(&index_shards, false, false); EXPECT_GE(0, ndiff); } + +// now use ondisk specific merge and use shift ids +TEST(MERGE, merge_flat_ondisk_3) { + faiss::IndexShards index_shards(d, false, false); + index_shards.own_indices = true; + + std::vector<idx_t> ids; + for (int i = 0; i < nb; ++i) { + int id = i % shard_size; + ids.push_back(id); + } + for (int i = 0; i < nindex; i++) { + index_shards.add_shard( + new faiss::IndexIVFFlat(&cd.quantizer, d, nlist)); + } + EXPECT_TRUE(index_shards.is_trained); + index_shards.add_with_ids(nb, cd.database.data(), ids.data()); + int ndiff = compare_merged(&index_shards, true, false); + EXPECT_GE(0, ndiff); +}