From 6408372234eef2d72a13ee838c07199751c56378 Mon Sep 17 00:00:00 2001
From: Andrew Morgan <>
Date: Thu, 21 Oct 2021 17:42:25 +0100
Subject: [PATCH] Improve docstrings for methods related to sending EDUs to
 application services (#11138)

 changelog.d/11138.misc         |  1 +
 synapse/handlers/ | 94 +++++++++++++++++++++++++++++-----
 synapse/handlers/     |  4 ++
 synapse/handlers/   | 34 ++++++++++--
 synapse/handlers/   |  8 ++-
 synapse/handlers/     | 12 +++--
 synapse/            | 18 ++++++-
 7 files changed, 148 insertions(+), 23 deletions(-)
 create mode 100644 changelog.d/11138.misc

diff --git a/changelog.d/11138.misc b/changelog.d/11138.misc
new file mode 100644
index 0000000000..79b7776975
--- /dev/null
+++ b/changelog.d/11138.misc
@@ -0,0 +1 @@
+Add docstrings and comments to the application service ephemeral event sending code.
\ No newline at end of file
diff --git a/synapse/handlers/ b/synapse/handlers/
index 163278708c..36c206dae6 100644
--- a/synapse/handlers/
+++ b/synapse/handlers/
@@ -185,19 +185,26 @@ class ApplicationServicesHandler:
         new_token: Optional[int],
         users: Optional[Collection[Union[str, UserID]]] = None,
     ) -> None:
-        """This is called by the notifier in the background
-        when a ephemeral event handled by the homeserver.
-        This will determine which appservices
-        are interested in the event, and submit them.
+        """
+        This is called by the notifier in the background when an ephemeral event is handled
+        by the homeserver.
-        Events will only be pushed to appservices
-        that have opted into ephemeral events
+        This will determine which appservices are interested in the event, and submit them.
             stream_key: The stream the event came from.
-            new_token: The latest stream token
-            users: The user(s) involved with the event.
+                `stream_key` can be "typing_key", "receipt_key" or "presence_key". Any other
+                value for `stream_key` will cause this function to return early.
+                Ephemeral events will only be pushed to appservices that have opted into
+                them.
+                Appservices will only receive ephemeral events that fall within their
+                registered user and room namespaces.
+            new_token: The latest stream token.
+            users: The users that should be informed of the new event, if any.
         if not self.notify_appservices:
@@ -232,21 +239,32 @@ class ApplicationServicesHandler:
             for service in services:
                 # Only handle typing if we have the latest token
                 if stream_key == "typing_key" and new_token is not None:
+                    # Note that we don't persist the token (via set_type_stream_id_for_appservice)
+                    # for typing_key due to performance reasons and due to their highly
+                    # ephemeral nature.
+                    #
+                    # Instead we simply grab the latest typing updates in _handle_typing
+                    # and, if they apply to this application service, send it off.
                     events = await self._handle_typing(service, new_token)
                     if events:
                         self.scheduler.submit_ephemeral_events_for_as(service, events)
-                    # We don't persist the token for typing_key for performance reasons
                 elif stream_key == "receipt_key":
                     events = await self._handle_receipts(service)
                     if events:
                         self.scheduler.submit_ephemeral_events_for_as(service, events)
+                    # Persist the latest handled stream token for this appservice
                         service, "read_receipt", new_token
                 elif stream_key == "presence_key":
                     events = await self._handle_presence(service, users)
                     if events:
                         self.scheduler.submit_ephemeral_events_for_as(service, events)
+                    # Persist the latest handled stream token for this appservice
                         service, "presence", new_token
@@ -254,18 +272,54 @@ class ApplicationServicesHandler:
     async def _handle_typing(
         self, service: ApplicationService, new_token: int
     ) -> List[JsonDict]:
+        """
+        Return the typing events since the given stream token that the given application
+        service should receive.
+        First fetch all typing events between the given typing stream token (non-inclusive)
+        and the latest typing event stream token (inclusive). Then return only those typing
+        events that the given application service may be interested in.
+        Args:
+            service: The application service to check for which events it should receive.
+            new_token: A typing event stream token.
+        Returns:
+            A list of JSON dictionaries containing data derived from the typing events that
+            should be sent to the given application service.
+        """
         typing_source = self.event_sources.sources.typing
         # Get the typing events from just before current
         typing, _ = await typing_source.get_new_events_as(
             # For performance reasons, we don't persist the previous
-            # token in the DB and instead fetch the latest typing information
+            # token in the DB and instead fetch the latest typing event
             # for appservices.
+            # TODO: It'd likely be more efficient to simply fetch the
+            #  typing event with the given 'new_token' stream token and
+            #  check if the given service was interested, rather than
+            #  iterating over all typing events and only grabbing the
+            #  latest few.
             from_key=new_token - 1,
         return typing
     async def _handle_receipts(self, service: ApplicationService) -> List[JsonDict]:
+        """
+        Return the latest read receipts that the given application service should receive.
+        First fetch all read receipts between the last receipt stream token that this
+        application service should have previously received (non-inclusive) and the
+        latest read receipt stream token (inclusive). Then from that set, return only
+        those read receipts that the given application service may be interested in.
+        Args:
+            service: The application service to check for which events it should receive.
+        Returns:
+            A list of JSON dictionaries containing data derived from the read receipts that
+            should be sent to the given application service.
+        """
         from_key = await
             service, "read_receipt"
@@ -278,6 +332,22 @@ class ApplicationServicesHandler:
     async def _handle_presence(
         self, service: ApplicationService, users: Collection[Union[str, UserID]]
     ) -> List[JsonDict]:
+        """
+        Return the latest presence updates that the given application service should receive.
+        First, filter the given users list to those that the application service is
+        interested in. Then retrieve the latest presence updates since the
+        the last-known previously received presence stream token for the given
+        application service. Return those presence updates.
+        Args:
+            service: The application service that ephemeral events are being sent to.
+            users: The users that should receive the presence update.
+        Returns:
+            A list of json dictionaries containing data derived from the presence events
+            that should be sent to the given application service.
+        """
         events: List[JsonDict] = []
         presence_source = self.event_sources.sources.presence
         from_key = await
@@ -290,9 +360,9 @@ class ApplicationServicesHandler:
             interested = await service.is_interested_in_presence(user,
             if not interested:
             presence_events, _ = await presence_source.get_new_events(
-                service=service,
             time_now = self.clock.time_msec()
diff --git a/synapse/handlers/ b/synapse/handlers/
index 6eafbea25d..68b446eb66 100644
--- a/synapse/handlers/
+++ b/synapse/handlers/
@@ -454,6 +454,10 @@ class DeviceHandler(DeviceWorkerHandler):
     ) -> None:
         """Notify that a user's device(s) has changed. Pokes the notifier, and
         remote servers if the user is local.
+        Args:
+            user_id: The Matrix ID of the user who's device list has been updated.
+            device_ids: The device IDs that have changed.
         if not device_ids:
             # No changes to notify about, so this is a no-op.
diff --git a/synapse/handlers/ b/synapse/handlers/
index b5968e047b..fdab50da37 100644
--- a/synapse/handlers/
+++ b/synapse/handlers/
@@ -52,7 +52,6 @@ import synapse.metrics
 from synapse.api.constants import EventTypes, Membership, PresenceState
 from synapse.api.errors import SynapseError
 from synapse.api.presence import UserPresenceState
-from synapse.appservice import ApplicationService
 from import PresenceRouter
 from synapse.logging.context import run_in_background
 from synapse.logging.utils import log_function
@@ -1483,11 +1482,37 @@ def should_notify(old_state: UserPresenceState, new_state: UserPresenceState) ->
 def format_user_presence_state(
     state: UserPresenceState, now: int, include_user_id: bool = True
 ) -> JsonDict:
-    """Convert UserPresenceState to a format that can be sent down to clients
+    """Convert UserPresenceState to a JSON format that can be sent down to clients
     and to other servers.
-    The "user_id" is optional so that this function can be used to format presence
-    updates for client /sync responses and for federation /send requests.
+    Args:
+        state: The user presence state to format.
+        now: The current timestamp since the epoch in ms.
+        include_user_id: Whether to include `user_id` in the returned dictionary.
+            As this function can be used both to format presence updates for client /sync
+            responses and for federation /send requests, only the latter needs the include
+            the `user_id` field.
+    Returns:
+        A JSON dictionary with the following keys:
+            * presence: The presence state as a str.
+            * user_id: Optional. Included if `include_user_id` is truthy. The canonical
+                Matrix ID of the user.
+            * last_active_ago: Optional. Included if `last_active_ts` is set on `state`.
+                The timestamp that the user was last active.
+            * status_msg: Optional. Included if `status_msg` is set on `state`. The user's
+                status.
+            * currently_active: Optional. Included only if `state.state` is "online".
+        Example:
+        {
+            "presence": "online",
+            "user_id": "",
+            "last_active_ago": 16783813918,
+            "status_msg": "Hello world!",
+            "currently_active": True
+        }
     content: JsonDict = {"presence": state.state}
     if include_user_id:
@@ -1526,7 +1551,6 @@ class PresenceEventSource(EventSource[int, UserPresenceState]):
         is_guest: bool = False,
         explicit_room_id: Optional[str] = None,
         include_offline: bool = True,
-        service: Optional[ApplicationService] = None,
     ) -> Tuple[List[UserPresenceState], int]:
         # The process for getting presence events are:
         #  1. Get the rooms the user is in.
diff --git a/synapse/handlers/ b/synapse/handlers/
index 374e961e3b..4911a11535 100644
--- a/synapse/handlers/
+++ b/synapse/handlers/
@@ -241,12 +241,18 @@ class ReceiptEventSource(EventSource[int, JsonDict]):
     async def get_new_events_as(
         self, from_key: int, service: ApplicationService
     ) -> Tuple[List[JsonDict], int]:
-        """Returns a set of new receipt events that an appservice
+        """Returns a set of new read receipt events that an appservice
         may be interested in.
             from_key: the stream position at which events should be fetched from
             service: The appservice which may be interested
+        Returns:
+            A two-tuple containing the following:
+                * A list of json dictionaries derived from read receipts that the
+                  appservice may be interested in.
+                * The current read receipt stream token.
         from_key = int(from_key)
         to_key = self.get_current_key()
diff --git a/synapse/handlers/ b/synapse/handlers/
index d10e9b8ec4..c411d69924 100644
--- a/synapse/handlers/
+++ b/synapse/handlers/
@@ -465,17 +465,23 @@ class TypingNotificationEventSource(EventSource[int, JsonDict]):
         may be interested in.
-            from_key: the stream position at which events should be fetched from
-            service: The appservice which may be interested
+            from_key: the stream position at which events should be fetched from.
+            service: The appservice which may be interested.
+        Returns:
+            A two-tuple containing the following:
+                * A list of json dictionaries derived from typing events that the
+                  appservice may be interested in.
+                * The latest known room serial.
         with Measure(self.clock, "typing.get_new_events_as"):
-            from_key = int(from_key)
             handler = self.get_typing_handler()
             events = []
             for room_id in handler._room_serials.keys():
                 if handler._room_serials[room_id] <= from_key:
                 if not await service.matches_user_in_member_list(
diff --git a/synapse/ b/synapse/
index 1a9f84ba45..1acd899fab 100644
--- a/synapse/
+++ b/synapse/
@@ -379,7 +379,14 @@ class Notifier:
         stream_key: str,
         new_token: Union[int, RoomStreamToken],
         users: Optional[Collection[Union[str, UserID]]] = None,
-    ):
+    ) -> None:
+        """Notify application services of ephemeral event activity.
+        Args:
+            stream_key: The stream the event came from.
+            new_token: The value of the new stream token.
+            users: The users that should be informed of the new event, if any.
+        """
             stream_token = None
             if isinstance(new_token, int):
@@ -402,10 +409,17 @@ class Notifier:
         new_token: Union[int, RoomStreamToken],
         users: Optional[Collection[Union[str, UserID]]] = None,
         rooms: Optional[Collection[str]] = None,
-    ):
+    ) -> None:
         """Used to inform listeners that something has happened event wise.
         Will wake up all listeners for the given users and rooms.
+        Args:
+            stream_key: The stream the event came from.
+            new_token: The value of the new stream token.
+            users: The users that should be informed of the new event.
+            rooms: A collection of room IDs for which each joined member will be
+                informed of the new event.
         users = users or []
         rooms = rooms or []