Unverified Commit 9d25a0ae authored by Erik Johnston's avatar Erik Johnston Committed by GitHub
Browse files

Split presence out of master (#9820)

parent d924827d
Add experimental support for handling presence on a worker.
......@@ -634,8 +634,11 @@ class Porter(object):
"device_inbox_sequence", ("device_inbox", "device_federation_outbox")
)
await self._setup_sequence(
"account_data_sequence", ("room_account_data", "room_tags_revisions", "account_data"))
await self._setup_sequence("receipts_sequence", ("receipts_linearized", ))
"account_data_sequence",
("room_account_data", "room_tags_revisions", "account_data"),
)
await self._setup_sequence("receipts_sequence", ("receipts_linearized",))
await self._setup_sequence("presence_stream_sequence", ("presence_stream",))
await self._setup_auth_chain_sequence()
# Step 3. Get tables.
......
......@@ -55,7 +55,6 @@ from synapse.replication.slave.storage.events import SlavedEventStore
from synapse.replication.slave.storage.filtering import SlavedFilteringStore
from synapse.replication.slave.storage.groups import SlavedGroupServerStore
from synapse.replication.slave.storage.keys import SlavedKeyStore
from synapse.replication.slave.storage.presence import SlavedPresenceStore
from synapse.replication.slave.storage.profile import SlavedProfileStore
from synapse.replication.slave.storage.push_rule import SlavedPushRuleStore
from synapse.replication.slave.storage.pushers import SlavedPusherStore
......@@ -64,7 +63,7 @@ from synapse.replication.slave.storage.registration import SlavedRegistrationSto
from synapse.replication.slave.storage.room import RoomStore
from synapse.replication.slave.storage.transactions import SlavedTransactionStore
from synapse.rest.admin import register_servlets_for_media_repo
from synapse.rest.client.v1 import events, login, room
from synapse.rest.client.v1 import events, login, presence, room
from synapse.rest.client.v1.initial_sync import InitialSyncRestServlet
from synapse.rest.client.v1.profile import (
ProfileAvatarURLRestServlet,
......@@ -110,6 +109,7 @@ from synapse.storage.databases.main.metrics import ServerMetricsStore
from synapse.storage.databases.main.monthly_active_users import (
MonthlyActiveUsersWorkerStore,
)
from synapse.storage.databases.main.presence import PresenceStore
from synapse.storage.databases.main.search import SearchWorkerStore
from synapse.storage.databases.main.stats import StatsStore
from synapse.storage.databases.main.transactions import TransactionWorkerStore
......@@ -121,26 +121,6 @@ from synapse.util.versionstring import get_version_string
logger = logging.getLogger("synapse.app.generic_worker")
class PresenceStatusStubServlet(RestServlet):
"""If presence is disabled this servlet can be used to stub out setting
presence status.
"""
PATTERNS = client_patterns("/presence/(?P<user_id>[^/]*)/status")
def __init__(self, hs):
super().__init__()
self.auth = hs.get_auth()
async def on_GET(self, request, user_id):
await self.auth.get_user_by_req(request)
return 200, {"presence": "offline"}
async def on_PUT(self, request, user_id):
await self.auth.get_user_by_req(request)
return 200, {}
class KeyUploadServlet(RestServlet):
"""An implementation of the `KeyUploadServlet` that responds to read only
requests, but otherwise proxies through to the master instance.
......@@ -241,6 +221,7 @@ class GenericWorkerSlavedStore(
StatsStore,
UIAuthWorkerStore,
EndToEndRoomKeyStore,
PresenceStore,
SlavedDeviceInboxStore,
SlavedDeviceStore,
SlavedReceiptsStore,
......@@ -259,7 +240,6 @@ class GenericWorkerSlavedStore(
SlavedTransactionStore,
SlavedProfileStore,
SlavedClientIpStore,
SlavedPresenceStore,
SlavedFilteringStore,
MonthlyActiveUsersWorkerStore,
MediaRepositoryStore,
......@@ -327,10 +307,7 @@ class GenericWorkerServer(HomeServer):
user_directory.register_servlets(self, resource)
# If presence is disabled, use the stub servlet that does
# not allow sending presence
if not self.config.use_presence:
PresenceStatusStubServlet(self).register(resource)
presence.register_servlets(self, resource)
groups.register_servlets(self, resource)
......
......@@ -64,6 +64,14 @@ class WriterLocations:
Attributes:
events: The instances that write to the event and backfill streams.
typing: The instance that writes to the typing stream.
to_device: The instances that write to the to_device stream. Currently
can only be a single instance.
account_data: The instances that write to the account data streams. Currently
can only be a single instance.
receipts: The instances that write to the receipts stream. Currently
can only be a single instance.
presence: The instances that write to the presence stream. Currently
can only be a single instance.
"""
events = attr.ib(
......@@ -85,6 +93,11 @@ class WriterLocations:
type=List[str],
converter=_instance_to_list_converter,
)
presence = attr.ib(
default=["master"],
type=List[str],
converter=_instance_to_list_converter,
)
class WorkerConfig(Config):
......@@ -188,7 +201,14 @@ class WorkerConfig(Config):
# Check that the configured writers for events and typing also appears in
# `instance_map`.
for stream in ("events", "typing", "to_device", "account_data", "receipts"):
for stream in (
"events",
"typing",
"to_device",
"account_data",
"receipts",
"presence",
):
instances = _instance_to_list_converter(getattr(self.writers, stream))
for instance in instances:
if instance != "master" and instance not in self.instance_map:
......@@ -215,6 +235,11 @@ class WorkerConfig(Config):
if len(self.writers.events) == 0:
raise ConfigError("Must specify at least one instance to handle `events`.")
if len(self.writers.presence) != 1:
raise ConfigError(
"Must only specify one instance to handle `presence` messages."
)
self.events_shard_config = RoutableShardedWorkerHandlingConfig(
self.writers.events
)
......
......@@ -122,7 +122,8 @@ assert LAST_ACTIVE_GRANULARITY < IDLE_TIMER
class BasePresenceHandler(abc.ABC):
"""Parts of the PresenceHandler that are shared between workers and master"""
"""Parts of the PresenceHandler that are shared between workers and presence
writer"""
def __init__(self, hs: "HomeServer"):
self.clock = hs.get_clock()
......@@ -309,8 +310,16 @@ class WorkerPresenceHandler(BasePresenceHandler):
super().__init__(hs)
self.hs = hs
self._presence_writer_instance = hs.config.worker.writers.presence[0]
self._presence_enabled = hs.config.use_presence
# Route presence EDUs to the right worker
hs.get_federation_registry().register_instances_for_edu(
"m.presence",
hs.config.worker.writers.presence,
)
# The number of ongoing syncs on this process, by user id.
# Empty if _presence_enabled is false.
self._user_to_num_current_syncs = {} # type: Dict[str, int]
......@@ -318,8 +327,8 @@ class WorkerPresenceHandler(BasePresenceHandler):
self.notifier = hs.get_notifier()
self.instance_id = hs.get_instance_id()
# user_id -> last_sync_ms. Lists the users that have stopped syncing
# but we haven't notified the master of that yet
# user_id -> last_sync_ms. Lists the users that have stopped syncing but
# we haven't notified the presence writer of that yet
self.users_going_offline = {}
self._bump_active_client = ReplicationBumpPresenceActiveTime.make_client(hs)
......@@ -352,22 +361,23 @@ class WorkerPresenceHandler(BasePresenceHandler):
)
def mark_as_coming_online(self, user_id):
"""A user has started syncing. Send a UserSync to the master, unless they
had recently stopped syncing.
"""A user has started syncing. Send a UserSync to the presence writer,
unless they had recently stopped syncing.
Args:
user_id (str)
"""
going_offline = self.users_going_offline.pop(user_id, None)
if not going_offline:
# Safe to skip because we haven't yet told the master they were offline
# Safe to skip because we haven't yet told the presence writer they
# were offline
self.send_user_sync(user_id, True, self.clock.time_msec())
def mark_as_going_offline(self, user_id):
"""A user has stopped syncing. We wait before notifying the master as
its likely they'll come back soon. This allows us to avoid sending
a stopped syncing immediately followed by a started syncing notification
to the master
"""A user has stopped syncing. We wait before notifying the presence
writer as its likely they'll come back soon. This allows us to avoid
sending a stopped syncing immediately followed by a started syncing
notification to the presence writer
Args:
user_id (str)
......@@ -375,8 +385,8 @@ class WorkerPresenceHandler(BasePresenceHandler):
self.users_going_offline[user_id] = self.clock.time_msec()
def send_stop_syncing(self):
"""Check if there are any users who have stopped syncing a while ago
and haven't come back yet. If there are poke the master about them.
"""Check if there are any users who have stopped syncing a while ago and
haven't come back yet. If there are poke the presence writer about them.
"""
now = self.clock.time_msec()
for user_id, last_sync_ms in list(self.users_going_offline.items()):
......@@ -492,9 +502,12 @@ class WorkerPresenceHandler(BasePresenceHandler):
if not self.hs.config.use_presence:
return
# Proxy request to master
# Proxy request to instance that writes presence
await self._set_state_client(
user_id=user_id, state=state, ignore_status_msg=ignore_status_msg
instance_name=self._presence_writer_instance,
user_id=user_id,
state=state,
ignore_status_msg=ignore_status_msg,
)
async def bump_presence_active_time(self, user):
......@@ -505,9 +518,11 @@ class WorkerPresenceHandler(BasePresenceHandler):
if not self.hs.config.use_presence:
return
# Proxy request to master
# Proxy request to instance that writes presence
user_id = user.to_string()
await self._bump_active_client(user_id=user_id)
await self._bump_active_client(
instance_name=self._presence_writer_instance, user_id=user_id
)
class PresenceHandler(BasePresenceHandler):
......@@ -1909,7 +1924,7 @@ class PresenceFederationQueue:
self._queue_presence_updates = True
# Whether this instance is a presence writer.
self._presence_writer = hs.config.worker.worker_app is None
self._presence_writer = self._instance_name in hs.config.worker.writers.presence
# The FederationSender instance, if this process sends federation traffic directly.
self._federation = None
......@@ -1957,7 +1972,7 @@ class PresenceFederationQueue:
Will forward to the local federation sender (if there is one) and queue
to send over replication (if there are other federation sender instances.).
Must only be called on the master process.
Must only be called on the presence writer process.
"""
# This should only be called on a presence writer.
......@@ -2003,10 +2018,11 @@ class PresenceFederationQueue:
We return rows in the form of `(destination, user_id)` to keep the size
of each row bounded (rather than returning the sets in a row).
On workers this will query the master process via HTTP replication.
On workers this will query the presence writer process via HTTP replication.
"""
if instance_name != self._instance_name:
# If not local we query over http replication from the master
# If not local we query over http replication from the presence
# writer
result = await self._repl_client(
instance_name=instance_name,
stream_name=PresenceFederationStream.NAME,
......
......@@ -158,7 +158,10 @@ class ReplicationEndpoint(metaclass=abc.ABCMeta):
def make_client(cls, hs):
"""Create a client that makes requests.
Returns a callable that accepts the same parameters as `_serialize_payload`.
Returns a callable that accepts the same parameters as
`_serialize_payload`, and also accepts an optional `instance_name`
parameter to specify which instance to hit (the instance must be in
the `instance_map` config).
"""
clock = hs.get_clock()
client = hs.get_simple_http_client()
......
# Copyright 2016 OpenMarket Ltd
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from synapse.replication.tcp.streams import PresenceStream
from synapse.storage import DataStore
from synapse.storage.database import DatabasePool
from synapse.storage.databases.main.presence import PresenceStore
from synapse.util.caches.stream_change_cache import StreamChangeCache
from ._base import BaseSlavedStore
from ._slaved_id_tracker import SlavedIdTracker
class SlavedPresenceStore(BaseSlavedStore):
def __init__(self, database: DatabasePool, db_conn, hs):
super().__init__(database, db_conn, hs)
self._presence_id_gen = SlavedIdTracker(db_conn, "presence_stream", "stream_id")
self._presence_on_startup = self._get_active_presence(db_conn) # type: ignore
self.presence_stream_cache = StreamChangeCache(
"PresenceStreamChangeCache", self._presence_id_gen.get_current_token()
)
_get_active_presence = DataStore._get_active_presence
take_presence_startup_info = DataStore.take_presence_startup_info
_get_presence_for_user = PresenceStore.__dict__["_get_presence_for_user"]
get_presence_for_users = PresenceStore.__dict__["get_presence_for_users"]
def get_current_presence_token(self):
return self._presence_id_gen.get_current_token()
def process_replication_rows(self, stream_name, instance_name, token, rows):
if stream_name == PresenceStream.NAME:
self._presence_id_gen.advance(instance_name, token)
for row in rows:
self.presence_stream_cache.entity_has_changed(row.user_id, token)
self._get_presence_for_user.invalidate((row.user_id,))
return super().process_replication_rows(stream_name, instance_name, token, rows)
......@@ -55,6 +55,8 @@ from synapse.replication.tcp.streams import (
CachesStream,
EventsStream,
FederationStream,
PresenceFederationStream,
PresenceStream,
ReceiptsStream,
Stream,
TagAccountDataStream,
......@@ -99,6 +101,10 @@ class ReplicationCommandHandler:
self._instance_id = hs.get_instance_id()
self._instance_name = hs.get_instance_name()
self._is_presence_writer = (
hs.get_instance_name() in hs.config.worker.writers.presence
)
self._streams = {
stream.NAME: stream(hs) for stream in STREAMS_MAP.values()
} # type: Dict[str, Stream]
......@@ -153,6 +159,14 @@ class ReplicationCommandHandler:
continue
if isinstance(stream, (PresenceStream, PresenceFederationStream)):
# Only add PresenceStream as a source on the instance in charge
# of presence.
if self._is_presence_writer:
self._streams_to_replicate.append(stream)
continue
# Only add any other streams if we're on master.
if hs.config.worker_app is not None:
continue
......@@ -350,7 +364,7 @@ class ReplicationCommandHandler:
) -> Optional[Awaitable[None]]:
user_sync_counter.inc()
if self._is_master:
if self._is_presence_writer:
return self._presence_handler.update_external_syncs_row(
cmd.instance_id, cmd.user_id, cmd.is_syncing, cmd.last_sync_ms
)
......@@ -360,7 +374,7 @@ class ReplicationCommandHandler:
def on_CLEAR_USER_SYNC(
self, conn: IReplicationConnection, cmd: ClearUserSyncsCommand
) -> Optional[Awaitable[None]]:
if self._is_master:
if self._is_presence_writer:
return self._presence_handler.update_external_syncs_clear(cmd.instance_id)
else:
return None
......
......@@ -272,15 +272,22 @@ class PresenceStream(Stream):
NAME = "presence"
ROW_TYPE = PresenceStreamRow
def __init__(self, hs):
def __init__(self, hs: "HomeServer"):
store = hs.get_datastore()
if hs.config.worker_app is None:
# on the master, query the presence handler
if hs.get_instance_name() in hs.config.worker.writers.presence:
# on the presence writer, query the presence handler
presence_handler = hs.get_presence_handler()
update_function = presence_handler.get_all_presence_updates
from synapse.handlers.presence import PresenceHandler
assert isinstance(presence_handler, PresenceHandler)
update_function = (
presence_handler.get_all_presence_updates
) # type: UpdateFunction
else:
# Query master process
# Query presence writer process
update_function = make_http_update_function(hs, self.NAME)
super().__init__(
......
......@@ -35,10 +35,15 @@ class PresenceStatusRestServlet(RestServlet):
self.clock = hs.get_clock()
self.auth = hs.get_auth()
self._use_presence = hs.config.server.use_presence
async def on_GET(self, request, user_id):
requester = await self.auth.get_user_by_req(request)
user = UserID.from_string(user_id)
if not self._use_presence:
return 200, {"presence": "offline"}
if requester.user != user:
allowed = await self.presence_handler.is_visible(
observed_user=user, observer_user=requester.user
......@@ -80,7 +85,7 @@ class PresenceStatusRestServlet(RestServlet):
except Exception:
raise SynapseError(400, "Unable to parse state")
if self.hs.config.use_presence:
if self._use_presence:
await self.presence_handler.set_state(user, state)
return 200, {}
......
......@@ -418,10 +418,10 @@ class HomeServer(metaclass=abc.ABCMeta):
@cache_in_self
def get_presence_handler(self) -> BasePresenceHandler:
if self.config.worker_app:
return WorkerPresenceHandler(self)
else:
if self.get_instance_name() in self.config.worker.writers.presence:
return PresenceHandler(self)
else:
return WorkerPresenceHandler(self)
@cache_in_self
def get_typing_writer_handler(self) -> TypingWriterHandler:
......
......@@ -17,7 +17,6 @@
import logging
from typing import List, Optional, Tuple
from synapse.api.constants import PresenceState
from synapse.config.homeserver import HomeServerConfig
from synapse.storage.database import DatabasePool
from synapse.storage.databases.main.stats import UserSortOrder
......@@ -51,7 +50,7 @@ from .media_repository import MediaRepositoryStore
from .metrics import ServerMetricsStore
from .monthly_active_users import MonthlyActiveUsersStore
from .openid import OpenIdStore
from .presence import PresenceStore, UserPresenceState
from .presence import PresenceStore
from .profile import ProfileStore
from .purge_events import PurgeEventsStore
from .push_rule import PushRuleStore
......@@ -126,9 +125,6 @@ class DataStore(
self._clock = hs.get_clock()
self.database_engine = database.engine
self._presence_id_gen = StreamIdGenerator(
db_conn, "presence_stream", "stream_id"
)
self._public_room_id_gen = StreamIdGenerator(
db_conn, "public_room_list_stream", "stream_id"
)
......@@ -177,21 +173,6 @@ class DataStore(
super().__init__(database, db_conn, hs)
self._presence_on_startup = self._get_active_presence(db_conn)
presence_cache_prefill, min_presence_val = self.db_pool.get_cache_dict(
db_conn,
"presence_stream",
entity_column="user_id",
stream_column="stream_id",
max_value=self._presence_id_gen.get_current_token(),
)
self.presence_stream_cache = StreamChangeCache(
"PresenceStreamChangeCache",
min_presence_val,
prefilled_cache=presence_cache_prefill,
)
device_list_max = self._device_list_id_gen.get_current_token()
self._device_list_stream_cache = StreamChangeCache(
"DeviceListStreamChangeCache", device_list_max
......@@ -238,32 +219,6 @@ class DataStore(
def get_device_stream_token(self) -> int:
return self._device_list_id_gen.get_current_token()
def take_presence_startup_info(self):
active_on_startup = self._presence_on_startup
self._presence_on_startup = None
return active_on_startup
def _get_active_presence(self, db_conn):
"""Fetch non-offline presence from the database so that we can register
the appropriate time outs.
"""
sql = (
"SELECT user_id, state, last_active_ts, last_federation_update_ts,"
" last_user_sync_ts, status_msg, currently_active FROM presence_stream"
" WHERE state != ?"
)
txn = db_conn.cursor()
txn.execute(sql, (PresenceState.OFFLINE,))
rows = self.db_pool.cursor_to_dict(txn)
txn.close()
for row in rows:
row["currently_active"] = bool(row["currently_active"])
return [UserPresenceState(**row) for row in rows]
async def get_users(self) -> List[JsonDict]:
"""Function to retrieve a list of users in users table.
......
......@@ -12,16 +12,69 @@
# See the License for the specific language governing permissions and
# limitations under the License.
from typing import Dict, List, Tuple
from typing import TYPE_CHECKING, Dict, List, Tuple
from synapse.api.presence import UserPresenceState
from synapse.api.presence import PresenceState, UserPresenceState
from synapse.replication.tcp.streams import PresenceStream
from synapse.storage._base import SQLBaseStore, make_in_list_sql_clause
from synapse.storage.database import DatabasePool
from synapse.storage.engines import PostgresEngine
from synapse.storage.types import Connection
from synapse.storage.util.id_generators import MultiWriterIdGenerator, StreamIdGenerator
from synapse.util.caches.descriptors import cached, cachedList
from synapse.util.caches.stream_change_cache import StreamChangeCache
from synapse.util.iterutils import batch_iter
if TYPE_CHECKING:
from synapse.server import HomeServer
class PresenceStore(SQLBaseStore):
def __init__(
self,
database: DatabasePool,
db_conn: Connection,
hs: "HomeServer",
):
super().__init__(database, db_conn, hs)
self._can_persist_presence = (
hs.get_instance_name() in hs.config.worker.writers.presence
)
if isinstance(database.engine, PostgresEngine):
self._presence_id_gen = MultiWriterIdGenerator(
db_conn=db_conn,
db=database,
stream_name="presence_stream",
instance_name=self._instance_name,
tables=[("presence_stream", "instance_name", "stream_id")],
sequence_name="presence_stream_sequence",
writers=hs.config.worker.writers.to_device,
)
else:
self._presence_id_gen = StreamIdGenerator(
db_conn, "presence_stream", "stream_id"
)
self._presence_on_startup = self._get_active_presence(db_conn)
presence_cache_prefill, min_presence_val = self.db_pool.get_cache_dict(
db_conn,
"presence_stream",
entity_column="user_id",
stream_column="stream_id",