
GH-34785: [C++][Parquet] Parquet Bloom Filter Writer Implementation #37400

Status: Open. Wants to merge 69 commits into base: main.

Commits (69):
f1c6dc0
Parquet: Implement skeleton for BloomFilter
mapleFU Aug 26, 2023
6ebd6da
tiny fixing
mapleFU Aug 26, 2023
70c9267
tiny update test
mapleFU Aug 26, 2023
48350d8
trying to fix ci
mapleFU Aug 26, 2023
d2a659e
fix lint
mapleFU Aug 26, 2023
41236d8
fix some style problem
mapleFU Aug 26, 2023
8afba81
add file roundtrip test
mapleFU Aug 26, 2023
96c6691
add file roundtrip test
mapleFU Aug 26, 2023
c131341
fix document and ci
mapleFU Aug 26, 2023
220b58e
Update: tiny style fix
mapleFU Aug 26, 2023
ad96c48
Merge branch 'main' into parquet/support-write-bloom-filter
mapleFU Sep 2, 2023
b756241
Bloom Filter Resolve comments:
mapleFU Sep 2, 2023
f43505b
make space writing a batched writing
mapleFU Sep 2, 2023
3497f4a
update bloom_filter builder interface
mapleFU Sep 2, 2023
fecd0f0
update BloomFilterBuilder arguments
mapleFU Sep 2, 2023
29cc1c1
fix compile
mapleFU Sep 2, 2023
ffbb491
try to satisfy win compiler
mapleFU Sep 2, 2023
4d63428
change all to vector
mapleFU Sep 2, 2023
f689716
Merge branch 'main' into parquet/support-write-bloom-filter
mapleFU Sep 11, 2023
8e9cb16
resolve comment
mapleFU Sep 11, 2023
7fd47be
Merge branch 'main' into parquet/support-write-bloom-filter
mapleFU Oct 2, 2023
7c4ff4e
Merge branch 'main' into parquet/support-write-bloom-filter
mapleFU Oct 10, 2023
feccee9
fix some comment
mapleFU Oct 10, 2023
90245e7
add cached version test
mapleFU Oct 10, 2023
d924e36
cleaning the code for column-props
mapleFU Oct 10, 2023
0340193
optimize get bf
mapleFU Oct 10, 2023
b78eed0
Merge branch 'main' into parquet/support-write-bloom-filter
mapleFU Mar 16, 2024
23828e1
comment minor fix
mapleFU Mar 16, 2024
6fd57dc
fix comment and add bloom-filter-length
mapleFU Mar 16, 2024
86a8760
Fix a bf bug
mapleFU Mar 16, 2024
f8e724c
trying to use std::map for RowGroup filter
mapleFU Mar 17, 2024
447badf
trying to fix msvc compile
mapleFU Mar 17, 2024
0c1065c
fix comment
mapleFU Mar 17, 2024
5225e08
add test case for 2 row-groups
mapleFU Mar 17, 2024
a779982
add test case for dictionary
mapleFU Mar 17, 2024
4195406
minor update style for file_writer.cc
mapleFU Mar 17, 2024
ed267bd
Merge branch 'main' into parquet/support-write-bloom-filter
mapleFU Mar 26, 2024
478889d
resolve comment
mapleFU Mar 26, 2024
2992072
fix comment for boolean col, and add test
mapleFU Mar 26, 2024
4852261
trying to add bloom boolean test
mapleFU Mar 26, 2024
add1afd
fix test
mapleFU Mar 26, 2024
f627e30
Merge branch 'main' into parquet/support-write-bloom-filter
mapleFU Apr 8, 2024
bb8d4a5
fix some comments
mapleFU Apr 8, 2024
ad0f1af
Merge branch 'parquet/support-write-bloom-filter' of github.com:maple…
mapleFU Apr 8, 2024
e1de5bc
fix lint
mapleFU Apr 8, 2024
430742a
switch to anonymous namespace
mapleFU Apr 9, 2024
00f176e
fix comment for column_writer.cc
mapleFU Apr 26, 2024
17f4951
fix comment in other parts
mapleFU Apr 26, 2024
de27ce4
Merge branch 'main' into parquet/support-write-bloom-filter
mapleFU Apr 26, 2024
259f15b
Merge branch 'main' into parquet/support-write-bloom-filter
mapleFU Apr 26, 2024
057b542
Merge branch 'main' into parquet/support-write-bloom-filter
mapleFU Jun 10, 2024
34a4c28
trying to fix the ci build
mapleFU Jun 10, 2024
70e3508
Merge branch 'main' into parquet/support-write-bloom-filter
mapleFU Jul 3, 2024
c587568
resolve comments
mapleFU Jul 3, 2024
2223423
Merge branch 'main' into parquet/support-write-bloom-filter
mapleFU Nov 11, 2024
22030db
change the bloom filter from vector to map
mapleFU Nov 11, 2024
e9c550a
fix lint
mapleFU Nov 11, 2024
23fb3fa
fix lint
mapleFU Nov 14, 2024
d892819
fix comment
mapleFU Nov 15, 2024
ef3291d
Merge branch 'main' into parquet/support-write-bloom-filter
mapleFU Dec 20, 2024
7aee7dd
Merge branch 'main' into parquet/support-write-bloom-filter
mapleFU Jan 13, 2025
c5b1fb1
Resolve comments
mapleFU Jan 13, 2025
0898466
Merge branch 'main' into parquet/support-write-bloom-filter
mapleFU Feb 5, 2025
71f5906
Merge branch 'main' into parquet/support-write-bloom-filter
mapleFU Feb 7, 2025
d57ceea
minor fix
mapleFU Feb 7, 2025
26c2d07
address some comments
mapleFU Feb 7, 2025
d422ffa
Merge branch 'main' into parquet/support-write-bloom-filter
mapleFU Mar 10, 2025
e6bc6e1
Minor fix
mapleFU Mar 10, 2025
dfaf0e8
try to fix lint
mapleFU Mar 10, 2025
3 changes: 2 additions & 1 deletion cpp/src/parquet/CMakeLists.txt
@@ -159,6 +159,7 @@ set(PARQUET_SRCS
arrow/schema_internal.cc
arrow/writer.cc
bloom_filter.cc
bloom_filter_builder.cc
bloom_filter_reader.cc
column_reader.cc
column_scanner.cc
@@ -367,7 +368,7 @@ install(FILES "${CMAKE_CURRENT_BINARY_DIR}/parquet_version.h"
add_parquet_test(internals-test
SOURCES
bloom_filter_test.cc
bloom_filter_reader_test.cc
bloom_filter_reader_writer_test.cc
properties_test.cc
statistics_test.cc
encoding_test.cc
251 changes: 249 additions & 2 deletions cpp/src/parquet/arrow/arrow_reader_writer_test.cc
@@ -67,6 +67,8 @@
#include "parquet/arrow/schema.h"
#include "parquet/arrow/test_util.h"
#include "parquet/arrow/writer.h"
#include "parquet/bloom_filter.h"
#include "parquet/bloom_filter_reader.h"
#include "parquet/column_writer.h"
#include "parquet/file_writer.h"
#include "parquet/page_index.h"
@@ -5648,7 +5650,7 @@ auto encode_double = [](double value) {

} // namespace

class ParquetPageIndexRoundTripTest : public ::testing::Test {
class TestingWithPageIndex {
public:
void WriteFile(const std::shared_ptr<WriterProperties>& writer_properties,
const std::shared_ptr<::arrow::Table>& table) {
@@ -5672,6 +5674,13 @@ class ParquetPageIndexRoundTripTest : public ::testing::Test {
ASSERT_OK_AND_ASSIGN(buffer_, sink->Finish());
}

protected:
std::shared_ptr<Buffer> buffer_;
};

class ParquetPageIndexRoundTripTest : public ::testing::Test,
public TestingWithPageIndex {
public:
void ReadPageIndexes(int expect_num_row_groups, int expect_num_pages,
const std::set<int>& expect_columns_without_index = {}) {
auto read_properties = default_arrow_reader_properties();
@@ -5740,7 +5749,6 @@ }
}

protected:
std::shared_ptr<Buffer> buffer_;
std::vector<ColumnIndexObject> column_indexes_;
};

@@ -5976,5 +5984,244 @@ TEST_F(ParquetPageIndexRoundTripTest, EnablePerColumn) {
/*null_counts=*/{0}}));
}

class ParquetBloomFilterRoundTripTest : public ::testing::Test,
public TestingWithPageIndex {
public:
void ReadBloomFilters(int expect_num_row_groups,
const std::set<int>& expect_columns_without_filter = {}) {
auto reader = ParquetFileReader::Open(std::make_shared<BufferReader>(buffer_));

auto metadata = reader->metadata();
ASSERT_EQ(expect_num_row_groups, metadata->num_row_groups());

auto& bloom_filter_reader = reader->GetBloomFilterReader();

for (int rg = 0; rg < metadata->num_row_groups(); ++rg) {
auto row_group_reader = bloom_filter_reader.RowGroup(rg);
ASSERT_NE(row_group_reader, nullptr);

for (int col = 0; col < metadata->num_columns(); ++col) {
bool expect_no_bloom_filter = expect_columns_without_filter.find(col) !=
expect_columns_without_filter.cend();

auto bloom_filter = row_group_reader->GetColumnBloomFilter(col);
if (expect_no_bloom_filter) {
ASSERT_EQ(nullptr, bloom_filter);
} else {
ASSERT_NE(nullptr, bloom_filter);
bloom_filters_.push_back(std::move(bloom_filter));
[Review comment, Member]: What about changing bloom_filters_ to be an output parameter of ReadBloomFilters instead of a class member variable?
}
}
}
}

template <typename ArrowType>
void VerifyBloomFilterContains(const BloomFilter* bloom_filter,
const ::arrow::ChunkedArray& chunked_array) {
for (auto value : ::arrow::stl::Iterate<ArrowType>(chunked_array)) {
if (value == std::nullopt) {
continue;
}
EXPECT_TRUE(bloom_filter->FindHash(bloom_filter->Hash(value.value())));
}
}

template <typename ArrowType>
void VerifyBloomFilterNotContains(const BloomFilter* bloom_filter,
const ::arrow::ChunkedArray& chunked_array) {
for (auto value : ::arrow::stl::Iterate<ArrowType>(chunked_array)) {
if (value == std::nullopt) {
continue;
}
EXPECT_FALSE(bloom_filter->FindHash(bloom_filter->Hash(value.value())));
}
}

protected:
std::vector<std::unique_ptr<BloomFilter>> bloom_filters_;
};

TEST_F(ParquetBloomFilterRoundTripTest, SimpleRoundTrip) {
[Review comment, Member]: The three test cases below share a lot of common logic (with exactly the same data). Should we refactor them to eliminate the duplication?

auto schema = ::arrow::schema(
{::arrow::field("c0", ::arrow::int64()), ::arrow::field("c1", ::arrow::utf8())});
BloomFilterOptions options;
options.ndv = 10;
auto writer_properties = WriterProperties::Builder()
.enable_bloom_filter_options(options, "c0")
->enable_bloom_filter_options(options, "c1")
->max_row_group_length(4)
->build();
auto table = ::arrow::TableFromJSON(schema, {R"([
[1, "a"],
[2, "b"],
[3, "c"],
[null, "d"],
[5, null],
[6, "f"]
])"});
WriteFile(writer_properties, table);

ReadBloomFilters(/*expect_num_row_groups=*/2);
ASSERT_EQ(4, bloom_filters_.size());
std::vector<int64_t> row_group_row_count{4, 2};
int64_t current_row = 0;
int64_t bloom_filter_idx = 0; // current index in `bloom_filters_`
for (int64_t row_group_id = 0; row_group_id < 2; ++row_group_id) {
{
// The bloom filter for same column in another row-group.
int64_t bloom_filter_idx_another_rg =
row_group_id == 0 ? bloom_filter_idx + 2 : bloom_filter_idx - 2;
ASSERT_NE(nullptr, bloom_filters_[bloom_filter_idx]);
auto col = table->column(0)->Slice(current_row, row_group_row_count[row_group_id]);
VerifyBloomFilterContains<::arrow::Int64Type>(
bloom_filters_[bloom_filter_idx].get(), *col);
VerifyBloomFilterNotContains<::arrow::Int64Type>(
bloom_filters_[bloom_filter_idx_another_rg].get(), *col);
++bloom_filter_idx;
}
{
int64_t bloom_filter_idx_another_rg =
row_group_id == 0 ? bloom_filter_idx + 2 : bloom_filter_idx - 2;
ASSERT_NE(nullptr, bloom_filters_[bloom_filter_idx]);
auto col = table->column(1)->Slice(current_row, row_group_row_count[row_group_id]);
VerifyBloomFilterContains<::arrow::StringType>(
bloom_filters_[bloom_filter_idx].get(), *col);
VerifyBloomFilterNotContains<::arrow::StringType>(
bloom_filters_[bloom_filter_idx_another_rg].get(), *col);
++bloom_filter_idx;
}
current_row += row_group_row_count[row_group_id];
}
}
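The test above walks bloom_filters_ as a row-major flattening: filters are appended column by column within each row group, so with two columns per row group the "same column, other row group" entry sits num_columns slots away. A minimal sketch of that index arithmetic (FlatFilterIndex and OtherRowGroupIndex are hypothetical helper names, not part of the PR):

```cpp
#include <cassert>
#include <cstdint>

// Flat index of (row_group, column) when filters are appended
// column-by-column within each row group (row-group major order).
inline int64_t FlatFilterIndex(int64_t row_group, int64_t column,
                               int64_t num_columns) {
  return row_group * num_columns + column;
}

// The "same column in the other row group" index used by the two-row-group
// round-trip tests: an offset of +/- num_columns from the current slot.
inline int64_t OtherRowGroupIndex(int64_t flat_idx, int64_t row_group,
                                  int64_t num_columns) {
  return row_group == 0 ? flat_idx + num_columns : flat_idx - num_columns;
}
```

For the tests' layout (2 row groups, 2 columns) this yields indices 0..3, matching the `bloom_filter_idx + 2 : bloom_filter_idx - 2` expression in the loop.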

TEST_F(ParquetBloomFilterRoundTripTest, SimpleRoundTripDictionary) {
auto origin_schema = ::arrow::schema(
{::arrow::field("c0", ::arrow::int64()), ::arrow::field("c1", ::arrow::utf8())});
auto schema = ::arrow::schema(
{::arrow::field("c0", ::arrow::dictionary(::arrow::int64(), ::arrow::int64())),
::arrow::field("c1", ::arrow::dictionary(::arrow::int64(), ::arrow::utf8()))});
bloom_filters_.clear();
BloomFilterOptions options;
options.ndv = 10;
auto writer_properties = WriterProperties::Builder()
.enable_bloom_filter_options(options, "c0")
->enable_bloom_filter_options(options, "c1")
->max_row_group_length(4)
->build();
std::vector<std::string> contents = {R"([
[1, "a"],
[2, "b"],
[3, "c"],
[null, "d"],
[5, null],
[6, "f"]
])"};
auto dict_encoded_table = ::arrow::TableFromJSON(schema, contents);
// `table` holds the non-dictionary equivalent, used with interfaces that
// don't support dictionary arrays.
auto table = ::arrow::TableFromJSON(origin_schema, contents);
WriteFile(writer_properties, dict_encoded_table);

ReadBloomFilters(/*expect_num_row_groups=*/2);
ASSERT_EQ(4, bloom_filters_.size());
std::vector<int64_t> row_group_row_count{4, 2};
int64_t current_row = 0;
int64_t bloom_filter_idx = 0; // current index in `bloom_filters_`
for (int64_t row_group_id = 0; row_group_id < 2; ++row_group_id) {
{
// The bloom filter for same column in another row-group.
int64_t bloom_filter_idx_another_rg =
row_group_id == 0 ? bloom_filter_idx + 2 : bloom_filter_idx - 2;
ASSERT_NE(nullptr, bloom_filters_[bloom_filter_idx]);
auto col = table->column(0)->Slice(current_row, row_group_row_count[row_group_id]);
VerifyBloomFilterContains<::arrow::Int64Type>(
bloom_filters_[bloom_filter_idx].get(), *col);
VerifyBloomFilterNotContains<::arrow::Int64Type>(
bloom_filters_[bloom_filter_idx_another_rg].get(), *col);
++bloom_filter_idx;
}
{
int64_t bloom_filter_idx_another_rg =
row_group_id == 0 ? bloom_filter_idx + 2 : bloom_filter_idx - 2;
ASSERT_NE(nullptr, bloom_filters_[bloom_filter_idx]);
auto col = table->column(1)->Slice(current_row, row_group_row_count[row_group_id]);
VerifyBloomFilterContains<::arrow::StringType>(
bloom_filters_[bloom_filter_idx].get(), *col);
VerifyBloomFilterNotContains<::arrow::StringType>(
bloom_filters_[bloom_filter_idx_another_rg].get(), *col);
++bloom_filter_idx;
}
current_row += row_group_row_count[row_group_id];
}
}

TEST_F(ParquetBloomFilterRoundTripTest, SimpleRoundTripWithOneFilter) {
auto schema = ::arrow::schema(
{::arrow::field("c0", ::arrow::int64()), ::arrow::field("c1", ::arrow::utf8())});
BloomFilterOptions options;
options.ndv = 10;
auto writer_properties = WriterProperties::Builder()
.enable_bloom_filter_options(options, "c0")
->disable_bloom_filter("c1")
->max_row_group_length(4)
->build();
auto table = ::arrow::TableFromJSON(schema, {R"([
[1, "a"],
[2, "b"],
[3, "c"],
[null, "d"],
[5, null],
[6, "f"]
])"});
WriteFile(writer_properties, table);

ReadBloomFilters(/*expect_num_row_groups=*/2, /*expect_columns_without_filter=*/{1});
ASSERT_EQ(2, bloom_filters_.size());
std::vector<int64_t> row_group_row_count{4, 2};
int64_t current_row = 0;
int64_t bloom_filter_idx = 0; // current index in `bloom_filters_`
for (int64_t row_group_id = 0; row_group_id < 2; ++row_group_id) {
{
ASSERT_NE(nullptr, bloom_filters_[bloom_filter_idx]);
auto col = table->column(0)->Slice(current_row, row_group_row_count[row_group_id]);
VerifyBloomFilterContains<::arrow::Int64Type>(
bloom_filters_[bloom_filter_idx].get(), *col);
++bloom_filter_idx;
}
current_row += row_group_row_count[row_group_id];
}
}

TEST_F(ParquetBloomFilterRoundTripTest, ThrowForBoolean) {
auto schema = ::arrow::schema({::arrow::field("boolean_col", ::arrow::boolean())});
BloomFilterOptions options;
options.ndv = 10;
auto writer_properties = WriterProperties::Builder()
.enable_bloom_filter_options(options, "boolean_col")
->max_row_group_length(4)
->build();
auto table = ::arrow::TableFromJSON(schema, {R"([
[true],
[null],
[false]
])"});
std::shared_ptr<SchemaDescriptor> parquet_schema;
auto arrow_writer_properties = default_arrow_writer_properties();
ASSERT_OK_NO_THROW(ToParquetSchema(schema.get(), *writer_properties,
*arrow_writer_properties, &parquet_schema));
auto schema_node = std::static_pointer_cast<GroupNode>(parquet_schema->schema_root());

// Write table to buffer.
auto sink = CreateOutputStream();
auto pool = ::arrow::default_memory_pool();
auto writer = ParquetFileWriter::Open(sink, schema_node, writer_properties);
std::unique_ptr<FileWriter> arrow_writer;
ASSERT_OK(FileWriter::Make(pool, std::move(writer), schema, arrow_writer_properties,
&arrow_writer));
auto s = arrow_writer->WriteTable(*table);
EXPECT_TRUE(s.IsIOError());
EXPECT_THAT(s.message(),
::testing::HasSubstr("BloomFilterBuilder does not support boolean type"));
}

} // namespace arrow
} // namespace parquet
28 changes: 28 additions & 0 deletions cpp/src/parquet/bloom_filter.h
@@ -106,6 +106,34 @@ class PARQUET_EXPORT BloomFilter {
/// @return hash result.
virtual uint64_t Hash(const FLBA* value, uint32_t len) const = 0;

// Variants taking const references instead of pointers, to facilitate use
// from templated code.

/// Compute hash for ByteArray value by using its plain encoding result.
///
/// @param value the value to hash.
uint64_t Hash(const ByteArray& value) const { return Hash(&value); }

/// Compute hash for fixed byte array value by using its plain encoding result.
///
/// @param value the value to hash.
/// @param type_len the value length.
uint64_t Hash(const FLBA& value, uint32_t type_len) const {
return Hash(&value, type_len);
}

/// Compute hash for Int96 value by using its plain encoding result.
///
/// @param value the value to hash.
uint64_t Hash(const Int96& value) const { return Hash(&value); }

/// Compute hash for std::string_view value by using its plain encoding result.
///
/// @param value the value to hash.
uint64_t Hash(std::string_view value) const {
ByteArray ba(value);
return Hash(&ba);
}

/// Batch compute hashes for 32 bits values by using its plain encoding result.
///
/// @param values values a pointer to the values to hash.
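The Hash(...) overloads added above feed values into Parquet's split-block bloom filter, where a 64-bit hash selects a 256-bit block and then sets one bit in each of the block's eight 32-bit words. A rough, self-contained sketch of that membership scheme (assumptions: SplitBlockFilter and HashValue are illustrative names, and std::hash stands in for the XXH64 hash the Parquet format actually specifies, so bit patterns here will not match real files):

```cpp
#include <array>
#include <cstdint>
#include <functional>
#include <string>
#include <vector>

class SplitBlockFilter {
 public:
  explicit SplitBlockFilter(size_t num_blocks) : blocks_(num_blocks) {}

  void Insert(uint64_t hash) {
    Block& b = blocks_[BlockIndex(hash)];
    const uint32_t key = static_cast<uint32_t>(hash);  // low 32 bits pick bits
    for (int i = 0; i < 8; ++i) b[i] |= Mask(key, i);
  }

  bool Check(uint64_t hash) const {
    const Block& b = blocks_[BlockIndex(hash)];
    const uint32_t key = static_cast<uint32_t>(hash);
    for (int i = 0; i < 8; ++i) {
      if ((b[i] & Mask(key, i)) == 0) return false;  // definitely absent
    }
    return true;  // possibly present (false positives allowed)
  }

 private:
  using Block = std::array<uint32_t, 8>;  // 256-bit block: 8 x 32-bit words

  // Per-word salt constants from the split-block bloom filter design used
  // by the Parquet spec; each word gets exactly one bit set per value.
  static constexpr std::array<uint32_t, 8> kSalt = {
      0x47b6137bU, 0x44974d91U, 0x8824ad5bU, 0xa2b7289dU,
      0x705495c7U, 0x2df1424bU, 0x9efc4947U, 0x5c6bfb31U};

  size_t BlockIndex(uint64_t hash) const {
    // High 32 bits of the hash choose the block.
    return static_cast<size_t>((hash >> 32) % blocks_.size());
  }

  static uint32_t Mask(uint32_t key, int i) {
    // Top 5 bits of the salted product select a bit position 0..31.
    return 1U << ((kSalt[i] * key) >> 27);
  }

  std::vector<Block> blocks_;
};

// Stand-in hasher for the sketch; real Parquet filters hash the value's
// plain-encoded bytes with XXH64 (as the Hash() overloads above arrange).
inline uint64_t HashValue(const std::string& v) {
  return std::hash<std::string>{}(v);
}
```

A "no" from Check is authoritative while a "yes" is only probable, which is exactly what lets readers skip row groups whose filters reject a queried value.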