Skip to content

Commit f3fb859

Browse files
committed
Speed up /keys/changes
Follow on from #17537
1 parent 70b0e38 commit f3fb859

File tree

2 files changed

+75
-18
lines changed

2 files changed

+75
-18
lines changed

synapse/handlers/device.py

+13-16
Original file line numberDiff line numberDiff line change
@@ -269,29 +269,26 @@ async def get_user_ids_changed(
269269
# We now work out if any other users have since joined or left the rooms
270270
# the user is currently in. First we filter out rooms that we know
271271
# haven't changed recently.
272-
rooms_changed = self.store.get_rooms_that_changed(
273-
joined_room_ids, from_token.room_key
274-
)
275272

276273
# List of membership changes per room
277274
room_to_deltas: Dict[str, List[StateDelta]] = {}
278275
# The set of event IDs of membership events (so we can fetch their
279276
# associated membership).
280277
memberships_to_fetch: Set[str] = set()
281-
for room_id in rooms_changed:
282-
# TODO: Only pull out membership events?
283-
state_changes = await self.store.get_current_state_deltas_for_room(
284-
room_id, from_token=from_token.room_key, to_token=now_token.room_key
285-
)
286-
for delta in state_changes:
287-
if delta.event_type != EventTypes.Member:
288-
continue
289278

290-
room_to_deltas.setdefault(room_id, []).append(delta)
291-
if delta.event_id:
292-
memberships_to_fetch.add(delta.event_id)
293-
if delta.prev_event_id:
294-
memberships_to_fetch.add(delta.prev_event_id)
279+
# TODO: Only pull out membership events?
280+
state_changes = await self.store.get_current_state_deltas_for_rooms(
281+
joined_room_ids, from_token=from_token.room_key, to_token=now_token.room_key
282+
)
283+
for delta in state_changes:
284+
if delta.event_type != EventTypes.Member:
285+
continue
286+
287+
room_to_deltas.setdefault(delta.room_id, []).append(delta)
288+
if delta.event_id:
289+
memberships_to_fetch.add(delta.event_id)
290+
if delta.prev_event_id:
291+
memberships_to_fetch.add(delta.prev_event_id)
295292

296293
# Fetch all the memberships for the membership events
297294
event_id_to_memberships = await self.store.get_membership_from_event_ids(

synapse/storage/databases/main/state_deltas.py

+62-2
Original file line numberDiff line numberDiff line change
@@ -26,10 +26,11 @@
2626

2727
from synapse.logging.opentracing import trace
2828
from synapse.storage._base import SQLBaseStore
29-
from synapse.storage.database import LoggingTransaction
29+
from synapse.storage.database import LoggingTransaction, make_in_list_sql_clause
3030
from synapse.storage.databases.main.stream import _filter_results_by_stream
31-
from synapse.types import RoomStreamToken
31+
from synapse.types import RoomStreamToken, StrCollection
3232
from synapse.util.caches.stream_change_cache import StreamChangeCache
33+
from synapse.util.iterutils import batch_iter
3334

3435
logger = logging.getLogger(__name__)
3536

@@ -200,3 +201,62 @@ def get_current_state_deltas_for_room_txn(
200201
return await self.db_pool.runInteraction(
201202
"get_current_state_deltas_for_room", get_current_state_deltas_for_room_txn
202203
)
204+
205+
@trace
206+
async def get_current_state_deltas_for_rooms(
207+
self,
208+
room_ids: StrCollection,
209+
from_token: RoomStreamToken,
210+
to_token: RoomStreamToken,
211+
) -> List[StateDelta]:
212+
"""Get the state deltas between two tokens for the set of rooms."""
213+
214+
room_ids = self._curr_state_delta_stream_cache.get_entities_changed(
215+
room_ids, from_token.stream
216+
)
217+
if not room_ids:
218+
return []
219+
220+
def get_current_state_deltas_for_rooms_txn(
221+
txn: LoggingTransaction,
222+
room_ids: StrCollection,
223+
) -> List[StateDelta]:
224+
clause, args = make_in_list_sql_clause(
225+
self.database_engine, "room_id", room_ids
226+
)
227+
228+
sql = f"""
229+
SELECT instance_name, stream_id, room_id, type, state_key, event_id, prev_event_id
230+
FROM current_state_delta_stream
231+
WHERE {clause} AND ? < stream_id AND stream_id <= ?
232+
ORDER BY stream_id ASC
233+
"""
234+
args.append(from_token.stream)
235+
args.append(to_token.get_max_stream_pos())
236+
237+
txn.execute(sql, args)
238+
239+
return [
240+
StateDelta(
241+
stream_id=row[1],
242+
room_id=row[2],
243+
event_type=row[3],
244+
state_key=row[4],
245+
event_id=row[5],
246+
prev_event_id=row[6],
247+
)
248+
for row in txn
249+
if _filter_results_by_stream(from_token, to_token, row[0], row[1])
250+
]
251+
252+
results = []
253+
for batch in batch_iter(room_ids, 1000):
254+
deltas = await self.db_pool.runInteraction(
255+
"get_current_state_deltas_for_rooms",
256+
get_current_state_deltas_for_rooms_txn,
257+
batch,
258+
)
259+
260+
results.extend(deltas)
261+
262+
return results

0 commit comments

Comments
 (0)