Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

🚚 Support 'begins_with' condition in Dynamo #6031

Merged
merged 3 commits into from
Dec 9, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
75 changes: 74 additions & 1 deletion tests/test_dynamo.py
Original file line number Diff line number Diff line change
Expand Up @@ -250,7 +250,7 @@ def test_updates(self):
self.assertEqual(ret, dict(id='key', sort='s', x='x', y='y'))


class TestQueryInMemory(unittest.TestCase, Helpers):
class TestQueryInMemoryWithIntSortkey(unittest.TestCase, Helpers):
"""Test that the query work on an in-memory table."""

def setUp(self):
Expand Down Expand Up @@ -592,6 +592,79 @@ def test_get_many_iterator(self):
self.assertEqual(ret, expected)


class TestQueryInMemoryWithStringSortKey(unittest.TestCase, Helpers):
    """Test that `BeginsWith` queries work on an in-memory table with a string sort key."""

    def setUp(self):
        # Fresh in-memory table per test: string partition key 'id', string
        # sort key 'sort', plus a few optional columns and indexes.
        storage = dynamo.MemoryStorage()
        column_types = {
            'id': str,
            'sort': str,
            'str': OptionalOf(str),
            'x': OptionalOf(int),
            'y': OptionalOf(int),
            'm': OptionalOf(int),
            'n': OptionalOf(int),
        }
        secondary_indexes = [
            dynamo.Index('x', 'y'),
            dynamo.Index('m'),
            dynamo.Index('n', keys_only=True),
        ]
        self.table = dynamo.Table(
            storage,
            'table',
            partition_key='id',
            sort_key='sort',
            types=column_types,
            indexes=secondary_indexes)

    def test_begins_with_query(self):
        for sort_value in ('asdf', 'asd', 'as'):
            self.table.create({'id': 'key', 'sort': sort_value})

        page = self.table.get_many({
            'id': 'key',
            'sort': dynamo.BeginsWith('asd'),
        })

        # 'as' does not start with 'asd' and must be absent; the remaining
        # rows are expected in sort-key order, not insertion order.
        expected = [{'id': 'key', 'sort': sort_value} for sort_value in ('asd', 'asdf')]
        self.assertEqual(list(page), expected)

    def test_cannot_use_begin_with_on_nonkey_field(self):
        # 'str' is neither the partition key nor the sort key, so it cannot
        # carry a condition inside the key of a lookup.
        with self.assertRaises(ValueError):
            condition = dynamo.BeginsWith('asdf')
            self.table.get_many({'id': 'key', 'str': condition})

    def test_can_use_begin_with_as_server_side_filter(self):
        for value in ('asdf', 'asd', 'as'):
            self.table.create({'id': 'key', 'sort': value, 'str': value})

        page = self.table.get_many(
            {'id': 'key'},
            server_side_filter={'str': dynamo.BeginsWith('asd')})

        expected = [{'id': 'key', 'sort': value, 'str': value} for value in ('asd', 'asdf')]
        self.assertEqual(list(page), expected)

    def test_server_side_filter_may_not_filter_nonkey_attrs(self):
        # NOTE: despite the method name, this checks that a server-side filter
        # on a *key* attribute ('sort' is the table's sort key) is rejected.
        self.table.create({'id': 'key', 'sort': 'asdf'})

        with self.assertRaises(ValueError):
            key_filter = {'sort': dynamo.BeginsWith('asd')}
            self.table.get_many({'id': 'key'}, server_side_filter=key_filter)


class TestSortKeysAgainstAws(unittest.TestCase):
"""Test that the operations send out appropriate Dynamo requests."""

Expand Down
109 changes: 87 additions & 22 deletions website/dynamo.py
Original file line number Diff line number Diff line change
Expand Up @@ -346,37 +346,53 @@ def batch_get(self, keys):
return items

@querylog.timed_as("db_get_many")
def get_many(self, key, reverse=False, limit=None, pagination_token=None, filter=None):
def get_many(self, key, reverse=False, limit=None, pagination_token=None, filter=None, server_side_filter=None):
"""Gets a list of items by key from the database.

The key must be a dict with a single entry which references the
partition key or an index key.

`get_many` reads up to 1MB of data from the database, or a maximum of `limit`
records, whichever one is hit first. 'filter' is a dictionary of values
that will be applied server-side after reading. The response may contain
less than `limit` rows if 'filter' is used.
`get_many` reads up to 1MB of data from the database, or a maximum of
`limit` records, whichever one is hit first.

Filtering saves bytes sent over the wire, but still costs time and
money, and may lead in receiving nearly no records. It is still
important to pick a good key/index to read.

'filter' is a dictionary with a set of values that will be applied as a server-side
filter. The values should be either literal strings, ints or bools that are compared to the
values in the database literally, or instances of subclasses of `DynamoCondition`; currently
only `Between` exists as a Condition class.
'server_side_filter' can be used to filter down the max 1MB of data read
from the database, to avoid sending useless bytes over the internet.

The result object will have `next_page_token` and `prev_page_token` members
which can be used to paginate through the result set.

# On filtering

- 'server_side_filter' is a dictionary of values that will be applied
server-side after reading. The values should be either literal
strings, ints or bools that are compared to the values in the database
literally, or instances of subclasses of `DynamoCondition`.
- The response may contain less than `limit` rows if
'server_side_filter' is used. The response may contain 0 rows, yet still
have a next page, if all rows read from disk are filtered out.
- Filtering saves bytes sent over the wire, but still costs time and
money, and may result in receiving almost no records. It is still
important to pick a good key/index to read. Filters will not magically
make a table scan efficient!

'filter' is also accepted as a deprecated spelling of
'server_side_filter' (but 'server_side_filter' is preferred for
consistency with 'get_page').
"""
querylog.log_counter(f"db_get_many:{self.table_name}")

if filter is not None and server_side_filter is not None:
raise ValueError("Only one of 'filter' and 'server_side_filter' may be specified")
server_side_filter = server_side_filter or filter

inverse_page, pagination_token = decode_page_token(pagination_token)
if inverse_page:
reverse = not reverse

lookup = self._determine_lookup(key, many=True)
if isinstance(lookup, TableLookup):
validate_filter_nonkey_columns(server_side_filter, self.key_schema)

pagination_key = PaginationKey.from_table(self.key_schema)
items, next_page_token = self.storage.query(
lookup.table_name,
Expand All @@ -386,9 +402,11 @@ def get_many(self, key, reverse=False, limit=None, pagination_token=None, filter
limit=limit + 1 if limit else None,
pagination_key=pagination_key,
pagination_token=pagination_token,
filter=filter
filter=server_side_filter
)
elif isinstance(lookup, IndexLookup):
validate_filter_nonkey_columns(server_side_filter, lookup.key_schema)

pagination_key = PaginationKey.from_index(lookup.key.keys(), lookup.sort_key, self.key_schema)
items, next_page_token = self.storage.query_index(
lookup.table_name,
Expand All @@ -401,7 +419,7 @@ def get_many(self, key, reverse=False, limit=None, pagination_token=None, filter
pagination_token=pagination_token,
keys_only=lookup.keys_only,
table_key_names=self.key_schema.key_names,
filter=filter,
filter=server_side_filter,
)
else:
assert False
Expand Down Expand Up @@ -436,18 +454,34 @@ def get_page(self, key, limit, reverse=False, pagination_token=None, server_side

'server_side_filter' is a dictionary with a set of values that will be applied as a server-side
filter. The values should be either literal strings, ints or bools that are compared to the
values in the database literally, or instances of subclasses of `DynamoCondition`; currently
only `Between` exists as a Condition class.
values in the database literally, or instances of subclasses of `DynamoCondition`.

'client_side_filter' should be either a dictionary of values, or a callable that will be called
for every row. If it is a dictionary, the values in the dictionary must match the values
in the records; if it is a callable, it will be invoked for every row and the callable
should return True or False to indicate whether that row should be included.
'client_side_filter' should be either a dictionary of values, or a
callable (function) that will be called for every row. If it is a
dictionary, the values in the dictionary must match the values in the
records; if it is a callable, it will be invoked for every row and the
callable should return True or False to indicate whether that row should
be included.

'fetch_factor' controls how many items are fetched per batch in order to try and fill
'limit' items. In combination with an estimate of how many rows would be
rejected due to filtering, this can be used to reduce the number of individual queries
necessary in order to come up with a given set of items (reducing latency slightly).
Ignore this if you are unsure about the right value to use.

# On server side filtering

- 'server_side_filter' is a dictionary of values that will be applied
server-side after reading. The values should be either literal
strings, ints or bools that are compared to the values in the database
literally, or instances of subclasses of `DynamoCondition`.
- The response may contain less than `limit` rows if
'server_side_filter' is used. The response may contain 0 rows, yet still
have a next page, if all rows read from disk are filtered out.
- Filtering saves bytes sent over the wire, but still costs time and
money, and may result in receiving almost no records. It is still
important to pick a good key/index to read. Filters will not magically
make a table scan efficient!
"""
if limit <= 0:
raise ValueError('limit must be positive')
Expand Down Expand Up @@ -641,7 +675,7 @@ def _determine_lookup(self, key_data, many):
for index in self.indexes:
if index.key_schema.matches(key_data):
return IndexLookup(self.table_name, index.index_name, key_data,
index.key_schema.sort_key, keys_only=index.keys_only)
index.key_schema.sort_key, keys_only=index.keys_only, key_schema=index.key_schema)

schemas = [self.key_schema] + [i.key_schema for i in self.indexes]
str_schemas = ', '.join(s.to_string(opt=True) for s in schemas)
Expand Down Expand Up @@ -708,6 +742,13 @@ def validate_value_against_validator(value, validator: 'Validator'):
return validator.is_valid(value)


def validate_filter_nonkey_columns(server_side_filter, key_schema):
    """Check that there are no filters that match columns in the key schema.

    'server_side_filter' is a mapping from column name to filter value (or
    condition). `None` or an empty mapping means "no filter" and always
    passes; in that case 'key_schema' is not inspected at all.

    'key_schema' must expose `key_names`, the names of the columns that make
    up the key being queried.

    Raises a ValueError for the first filtered column that is also a key
    column: such columns must be constrained via the key lookup instead.
    """
    if not server_side_filter:
        return

    # Read the key names once rather than on every loop iteration.
    key_names = key_schema.key_names
    for column in server_side_filter:
        if column in key_names:
            raise ValueError(f'Do not use server_side_filter on "{column}", use a key lookup instead.')


DDB_SERIALIZER = TypeSerializer()
DDB_DESERIALIZER = TypeDeserializer()

Expand Down Expand Up @@ -1223,6 +1264,7 @@ class IndexLookup:
key: dict
sort_key: Optional[str]
keys_only: bool
key_schema: KeySchema


class DynamoUpdate:
Expand Down Expand Up @@ -1338,6 +1380,11 @@ class DynamoCondition:

These encode any type of comparison supported by Dynamo except equality.

Conditions can be applied to sort keys for efficient lookup, or as a
`server_side_filter` as a post-retrieval, pre-download filter. Queries will
never fetch more than 1MB from disk, so your server-side filter should
not filter out more than ~50% of the rows.

When used in the key of a lookup, conditions only apply to sort keys.
"""

Expand Down Expand Up @@ -1400,6 +1447,24 @@ def matches(self, value):
return self.minval <= value <= self.maxval


class BeginsWith(DynamoCondition):
    """Condition that holds when a string value starts with a given prefix."""

    def __init__(self, prefix):
        self.prefix = prefix

    def to_dynamo_expression(self, field_name):
        # '#<field>' is the attribute-name placeholder, ':<field>_prefix' the
        # value placeholder that `to_dynamo_values` fills in.
        name_placeholder = f"#{field_name}"
        value_placeholder = f":{field_name}_prefix"
        return f"begins_with({name_placeholder}, {value_placeholder})"

    def to_dynamo_values(self, field_name):
        serialized_prefix = DDB_SERIALIZER.serialize(self.prefix)
        return {f":{field_name}_prefix": serialized_prefix}

    def matches(self, value):
        # Non-string values never match.
        if not isinstance(value, str):
            return False
        return value.startswith(self.prefix)


def replace_decimals(obj):
"""
Replace Decimals with native Python values.
Expand Down
Loading