Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Support of shift_ids in merge_from_multiple function of OnDiskInvertedLists #3327

Closed
wants to merge 1 commit into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions contrib/ondisk.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@


def merge_ondisk(
trained_index: faiss.Index, shard_fnames: List[str], ivfdata_fname: str
trained_index: faiss.Index, shard_fnames: List[str], ivfdata_fname: str, shift_ids=False
) -> None:
"""Add the contents of the indexes stored in shard_fnames into the index
trained_index. The on-disk data is stored in ivfdata_fname"""
Expand Down Expand Up @@ -51,7 +51,7 @@ def merge_ondisk(
ivf_vector.push_back(ivf)

LOG.info("merge %d inverted lists " % ivf_vector.size())
ntotal = invlists.merge_from(ivf_vector.data(), ivf_vector.size())
ntotal = invlists.merge_from_multiple(ivf_vector.data(), ivf_vector.size(), shift_ids)

# now replace the inverted lists in the output index
index.ntotal = index_ivf.ntotal = ntotal
Expand Down
23 changes: 19 additions & 4 deletions faiss/invlists/OnDiskInvertedLists.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -565,22 +565,27 @@ void OnDiskInvertedLists::free_slot(size_t offset, size_t capacity) {
/*****************************************
* Compact form
*****************************************/

size_t OnDiskInvertedLists::merge_from(
size_t OnDiskInvertedLists::merge_from_multiple(
const InvertedLists** ils,
int n_il,
bool shift_ids,
bool verbose) {
FAISS_THROW_IF_NOT_MSG(
totsize == 0, "works only on an empty InvertedLists");

std::vector<size_t> sizes(nlist);
std::vector<size_t> shift_id_offsets(n_il);
for (int i = 0; i < n_il; i++) {
const InvertedLists* il = ils[i];
FAISS_THROW_IF_NOT(il->nlist == nlist && il->code_size == code_size);

for (size_t j = 0; j < nlist; j++) {
sizes[j] += il->list_size(j);
}

size_t il_totsize = il->compute_ntotal();
shift_id_offsets[i] =
(shift_ids && i > 0) ? shift_id_offsets[i - 1] + il_totsize : 0;
}

size_t cums = 0;
Expand All @@ -605,11 +610,21 @@ size_t OnDiskInvertedLists::merge_from(
const InvertedLists* il = ils[i];
size_t n_entry = il->list_size(j);
l.size += n_entry;
ScopedIds scope_ids(il, j);
const idx_t* scope_ids_data = scope_ids.get();
std::vector<idx_t> new_ids;
if (shift_ids) {
new_ids.resize(n_entry);
for (size_t k = 0; k < n_entry; k++) {
new_ids[k] = scope_ids[k] + shift_id_offsets[i];
}
scope_ids_data = new_ids.data();
}
update_entries(
j,
l.size - n_entry,
n_entry,
ScopedIds(il, j).get(),
scope_ids_data,
ScopedCodes(il, j).get());
}
assert(l.size == l.capacity);
Expand Down Expand Up @@ -638,7 +653,7 @@ size_t OnDiskInvertedLists::merge_from(
size_t OnDiskInvertedLists::merge_from_1(
        const InvertedLists* ils,
        bool verbose) {
    // Single-source convenience wrapper around merge_from_multiple.
    // merge_from_multiple takes (ils, n_il, shift_ids, verbose): pass
    // shift_ids = false explicitly so that `verbose` is not silently
    // consumed by the shift_ids parameter (the previous 3-argument call
    // bound `verbose` to shift_ids and dropped verbosity entirely).
    // Shifting is meaningless for a single list anyway: ids must stay
    // untouched.
    return merge_from_multiple(&ils, 1, false, verbose);
}

void OnDiskInvertedLists::crop_invlists(size_t l0, size_t l1) {
Expand Down
3 changes: 2 additions & 1 deletion faiss/invlists/OnDiskInvertedLists.h
Original file line number Diff line number Diff line change
Expand Up @@ -101,9 +101,10 @@ struct OnDiskInvertedLists : InvertedLists {

// copy all inverted lists into *this, in compact form (without
// allocating slots)
size_t merge_from(
size_t merge_from_multiple(
const InvertedLists** ils,
int n_il,
bool shift_ids = false,
bool verbose = false);

/// same as merge_from for a single invlist
Expand Down
73 changes: 66 additions & 7 deletions tests/test_contrib.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
import platform
import os
import random
import shutil
import tempfile

from faiss.contrib import datasets
Expand All @@ -17,15 +18,13 @@
from faiss.contrib import ivf_tools
from faiss.contrib import clustering
from faiss.contrib import big_batch_search
from faiss.contrib.ondisk import merge_ondisk

from common_faiss_tests import get_dataset_2
try:
from faiss.contrib.exhaustive_search import \
knn_ground_truth, knn, range_ground_truth, \
range_search_max_results, exponential_query_iterator
except:
pass # Submodule import broken in python 2.

from faiss.contrib.exhaustive_search import \
knn_ground_truth, knn, range_ground_truth, \
range_search_max_results, exponential_query_iterator
from contextlib import contextmanager

@unittest.skipIf(platform.python_version_tuple()[0] < '3',
'Submodule import broken in python 2.')
Expand Down Expand Up @@ -674,3 +673,63 @@ def test_code_set(self):
np.testing.assert_equal(
np.sort(np.unique(codes, axis=0), axis=None),
np.sort(codes[inserted], axis=None))


@unittest.skipIf(platform.system() == 'Windows',
                 'OnDiskInvertedLists is unsupported on Windows.')
class TestMerge(unittest.TestCase):
    """Exercises merge_ondisk: merging IVF index shards into a single
    on-disk index, both with globally unique ids and with per-shard
    local ids that merge_ondisk must shift back to unique values."""

    @contextmanager
    def temp_directory(self):
        # Temporary directory that is removed even if the test body raises.
        temp_dir = tempfile.mkdtemp()
        try:
            yield temp_dir
        finally:
            shutil.rmtree(temp_dir)

    def do_test_ondisk_merge(self, shift_ids=False):
        with self.temp_directory() as tmpdir:
            # Train the index and write it out without adding any
            # vectors: this creates empty inverted lists that the
            # shards will be merged into.
            ds = datasets.SyntheticDataset(32, 2000, 200, 20)
            index = faiss.index_factory(ds.d, "IVF32,Flat")
            index.train(ds.get_train())
            faiss.write_index(index, tmpdir + "/trained.index")

            # create ns shards, each holding one slice of the database
            ns = 4  # number of shards
            shard_size = ds.nb // ns

            for bno in range(ns):
                index = faiss.read_index(tmpdir + "/trained.index")
                i0, i1 = bno * shard_size, (bno + 1) * shard_size
                if shift_ids:
                    # Every shard reuses the local id range
                    # [0, shard_size); merge_ondisk(shift_ids=True)
                    # must offset them back to globally unique ids.
                    # Use an integer range (not a float endpoint like
                    # ds.nb / ns) so the id array is int64, as
                    # add_with_ids requires.
                    index.add_with_ids(ds.xb[i0:i1], np.arange(i1 - i0))
                else:
                    # globally unique ids, no shifting needed
                    index.add_with_ids(ds.xb[i0:i1], np.arange(i0, i1))
                faiss.write_index(index, tmpdir + "/block_%d.index" % bno)

            # construct the output index and merge the shards on disk
            index = faiss.read_index(tmpdir + "/trained.index")
            block_fnames = [
                tmpdir + "/block_%d.index" % bno for bno in range(ns)
            ]

            merge_ondisk(
                index, block_fnames, tmpdir + "/merged_index.ivfdata", shift_ids
            )
            faiss.write_index(index, tmpdir + "/populated.index")

            # perform a search from the on-disk index
            index = faiss.read_index(tmpdir + "/populated.index")
            index.nprobe = 5
            D, I = index.search(ds.xq, 5)

            # ground-truth labels are the global vector positions, so
            # recall is only meaningful if the merged ids are global
            gtI = ds.get_groundtruth(5)

            recall_at_1 = (I[:, :1] == gtI[:, :1]).sum() / float(ds.xq.shape[0])
            self.assertGreaterEqual(recall_at_1, 0.5)

    def test_ondisk_merge(self):
        self.do_test_ondisk_merge()

    def test_ondisk_merge_with_shift_ids(self):
        # Recall should match test_ondisk_merge: shifting the per-shard
        # local ids reconstructs the same global ids as the unshifted case.
        self.do_test_ondisk_merge(True)
28 changes: 26 additions & 2 deletions tests/test_merge.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ size_t nq = 100;
int nindex = 4;
int k = 10;
int nlist = 40;
int shard_size = nb / nindex;

struct CommonData {
std::vector<float> database;
Expand Down Expand Up @@ -100,7 +101,7 @@ int compare_merged(
auto il = new faiss::OnDiskInvertedLists(
index0->nlist, index0->code_size, filename.c_str());

il->merge_from(lists.data(), lists.size());
il->merge_from_multiple(lists.data(), lists.size(), shift_ids);

index0->replace_invlists(il, true);
index0->ntotal = ntotal;
Expand All @@ -110,11 +111,14 @@ int compare_merged(
nq, cd.queries.data(), k, newD.data(), newI.data());

size_t ndiff = 0;
bool adjust_ids = shift_ids && !standard_merge;
for (size_t i = 0; i < k * nq; i++) {
if (refI[i] != newI[i]) {
idx_t new_id = adjust_ids ? refI[i] % shard_size : refI[i];
if (refI[i] != new_id) {
ndiff++;
}
}

return ndiff;
}

Expand Down Expand Up @@ -220,3 +224,23 @@ TEST(MERGE, merge_flat_ondisk_2) {
int ndiff = compare_merged(&index_shards, false, false);
EXPECT_GE(0, ndiff);
}

// On-disk merge with shift_ids enabled: every shard is assigned the
// same local id range [0, shard_size), and the merge is expected to
// offset them back into globally unique ids.
TEST(MERGE, merge_flat_ondisk_3) {
    faiss::IndexShards index_shards(d, false, false);
    index_shards.own_indices = true;

    // per-shard local ids: 0 .. shard_size-1, repeated for each shard
    std::vector<idx_t> ids;
    ids.reserve(nb);
    for (int i = 0; i < nb; ++i) {
        ids.push_back(i % shard_size);
    }

    for (int shard = 0; shard < nindex; shard++) {
        index_shards.add_shard(
                new faiss::IndexIVFFlat(&cd.quantizer, d, nlist));
    }
    EXPECT_TRUE(index_shards.is_trained);
    index_shards.add_with_ids(nb, cd.database.data(), ids.data());

    // shift_ids = true, standard_merge = false
    int ndiff = compare_merged(&index_shards, true, false);
    EXPECT_GE(0, ndiff);
}