diff --git a/changelog.d/8112.misc b/changelog.d/8112.misc new file mode 100644 index 0000000000000000000000000000000000000000..80045dde1af13c6af431fd3db903a9fff96fa086 --- /dev/null +++ b/changelog.d/8112.misc @@ -0,0 +1 @@ +Return the previous stream token if a non-member event is a duplicate. diff --git a/synapse/handlers/message.py b/synapse/handlers/message.py index f242d3c6acb7b84ddb59564ed748138b7e65bfd8..532fc30681b0a690bbb44e5adb9060df06a6f5b3 100644 --- a/synapse/handlers/message.py +++ b/synapse/handlers/message.py @@ -674,7 +674,7 @@ class EventCreationHandler(object): event.event_id, prev_event.event_id, ) - return await self.store.get_stream_token_for_event(prev_event.event_id) + return await self.store.get_stream_id_for_event(prev_event.event_id) return await self.handle_new_client_event( requester=requester, event=event, context=context, ratelimit=ratelimit diff --git a/synapse/storage/databases/main/stream.py b/synapse/storage/databases/main/stream.py index 8ccfb8fc46f61f93fca1c412e8c3f053c892020b..4377bddb8caec1b9eaaa7266a3db1f7f058d80da 100644 --- a/synapse/storage/databases/main/stream.py +++ b/synapse/storage/databases/main/stream.py @@ -582,6 +582,19 @@ class StreamWorkerStore(EventsWorkerStore, SQLBaseStore): ) return "t%d-%d" % (topo, token) + async def get_stream_id_for_event(self, event_id: str) -> int: + """The stream ID for an event + Args: + event_id: The id of the event to look up a stream token for. + Raises: + StoreError if the event wasn't in the database. + Returns: + A stream ID. + """ + return await self.db_pool.simple_select_one_onecol( + table="events", keyvalues={"event_id": event_id}, retcol="stream_ordering" + ) + async def get_stream_token_for_event(self, event_id: str) -> str: """The stream token for an event Args: @@ -591,10 +604,8 @@ class StreamWorkerStore(EventsWorkerStore, SQLBaseStore): Returns: A "s%d" stream token. """ - row = await self.db_pool.simple_select_one_onecol( - table="events", keyvalues={"event_id": event_id}, retcol="stream_ordering" - ) - return "s%d" % (row,) + stream_id = await self.get_stream_id_for_event(event_id) + return "s%d" % (stream_id,) async def get_topological_token_for_event(self, event_id: str) -> str: """The stream token for an event