-
Notifications
You must be signed in to change notification settings - Fork 3.7k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
ARROW-14182: [C++][Compute] Hash Join performance improvement v2 #13493
Conversation
Thanks for opening a pull request! If this is not a minor PR. Could you open an issue for this pull request on JIRA? https://issues.apache.org/jira/browse/ARROW Opening JIRAs ahead of time contributes to the Openness of the Apache Arrow project. Then could you also rename pull request title in the following format?
or
See also: |
d79c6c1
to
01a41a8
Compare
19ae4bb
to
1659cdb
Compare
0534dc1
to
e422dbf
Compare
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I created some tests on SwissTableWithKeys
today and have a few initial questions.
|
||
Status Init(int64_t hardware_flags, MemoryPool* pool); | ||
|
||
void InitCallbacks(); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Why does this need to be called separately from Init
? Can Init
call InitCallbacks
?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
There are two ways SwissTableWithKeys is used:
a) There is per partition object used by one thread at a time, that grows from zero rows as new keys are inserted.
b) There is a shared, read-only copy created during the process of merging per partition objects.
In a) Init is called, it calls InitCallback and then initializes SwissTable object with default number of buckets.
In b) only InitCallback is called, and SwissTableMerge initializes SwissTable object by allocating a pre-computed number of buckets that is different than the default. SwissTableMerge does not interact with SwissTableWithKeys, only with SwissTable, so it cannot call SwissTableWithKeys::Init.
One thing could be to rename InitCallbacks as Init, remove the other Init, call map_.init() instead in the appropriate place in the code.
void MapReadOnly(Input* input, const uint32_t* hashes, uint8_t* match_bitvector, | ||
uint32_t* key_ids); | ||
Status MapWithInserts(Input* input, const uint32_t* hashes, uint32_t* key_ids); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
It's not obvious at a first glance what the difference is between the Hash
call and the MapXyz
call. It's also not obvious what a key id is.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
SwissTableWithKeys maps keys (made of arbitrary number of columns of supported data types) into integer ids from 0 to N for N + 1 inserted keys. Key id is the integer assigned to a given key that has been inserted into the hash table.
Hash() computes hashes for input keys. Hash() needs to be called before calling MapReadOnly() or MapWithInserts() but the computed hashes can be reused outside of MapReadOnly and MapWithInserts or can be shared between both of these calls.
MapReadOnly() uses both input keys and hashes to lookup existing keys in the hash table. Keys that are not in a hash table will be marked in a bit-vector.
MapWithInserts() does everything MapReadOnly() does, but it will also allocate new key ids for new keys that are not present in the hash table yet and insert these new keys. Not thread-safe, unlike MapReadOnly().
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I looked over this change pretty thoroughly today. I can't pretend I fully some of the AVX2 routines or the inner workings of a swiss table but I have a pretty solid grasp now of the public API and I'm pretty sure I understand the flow of the algorithm and could fix issues / maintain the code as needed. I have a few comments / questions but I think this is probably ready for merge (I don't see any significant changes needed). Please re-request review when you've addressed the comments.
Overall the code is already pretty much following Arrow style and fairly clean, so thank you for that.
Generally, I'm a little concerned about the proliferation of "context information" that needs to be passed around, in particular things like hardware flags and the memory pool. I think we need to start figuring out a central place to manage all of this pretty soon so every node doesn't have to create it's own copy of things like util::TempVectorStack. However, I think @save-buffer already has some work in progress here so that will help.
I know there is a hash-join design doc and I'd like to help get that integrated into the code / documentation somewhere as that will be essential for helping newcomers get started on these changes. Also, a number of the utilities you've created here (e.g. RowArray
, ExecBatchBuilder
) are likely to be useful to other nodes as well.
Some of these utilities could use independent unit tests, both to help with maintenance, as well as to demonstrate usage. I've started that work here: https://github.com/westonpace/arrow/tree/feature/ARROW-17022--add-tests-for-swiss-join and I plan on creating a PR soon (I want to add some tests for ExecBatchBuilder
).
Finally, thank you! I know this has taken a while to get merged in but there is some significant work behind this PR and I appreciate the addition. I've done some local benchmarking and the improvements to hash join seem pretty considerable though we may to address some bottlenecks elsewhere before we see the changes end-to-end.
79850cb
to
c366308
Compare
c366308
to
11bfbc4
Compare
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
One minor question
Benchmark runs are scheduled for baseline = 0024962 and contender = 96a3af4. 96a3af4 is a master commit associated with this PR. Results will be available as each benchmark for each run completes. |
Faster implementation of hash join.