Skip to content
Snippets Groups Projects
Commit 71dcb275 authored by Richard van der Hoff's avatar Richard van der Hoff
Browse files

move FederationStream out to its own file

parent aa1e0178
No related branches found
No related tags found
No related merge requests found
......@@ -30,7 +30,8 @@ from synapse.metrics.background_process_metrics import run_as_background_process
from synapse.util.metrics import Measure, measure_func
from .protocol import ServerReplicationStreamProtocol
from .streams import STREAMS_MAP, FederationStream
from .streams import STREAMS_MAP
from .streams.federation import FederationStream
stream_updates_counter = Counter("synapse_replication_tcp_resource_stream_updates",
"", ["stream_name"])
......
......@@ -25,7 +25,7 @@ Each stream is defined by the following information:
update_function: The function that returns a list of updates between two tokens
"""
from . import _base, events
from . import _base, events, federation
STREAMS_MAP = {
stream.NAME: stream
......@@ -41,7 +41,7 @@ STREAMS_MAP = {
_base.PublicRoomsStream,
_base.DeviceListsStream,
_base.ToDeviceStream,
_base.FederationStream,
federation.FederationStream,
_base.TagAccountDataStream,
_base.AccountDataStream,
_base.CurrentStateDeltaStream,
......
......@@ -80,10 +80,6 @@ DeviceListsStreamRow = namedtuple("DeviceListsStreamRow", (
ToDeviceStreamRow = namedtuple("ToDeviceStreamRow", (
"entity", # str
))
FederationStreamRow = namedtuple("FederationStreamRow", (
"type", # str, the type of data as defined in the BaseFederationRows
"data", # dict, serialization of a federation.send_queue.BaseFederationRow
))
TagAccountDataStreamRow = namedtuple("TagAccountDataStreamRow", (
"user_id", # str
"room_id", # str
......@@ -374,22 +370,6 @@ class ToDeviceStream(Stream):
super(ToDeviceStream, self).__init__(hs)
class FederationStream(Stream):
"""Data to be sent over federation. Only available when master has federation
sending disabled.
"""
NAME = "federation"
ROW_TYPE = FederationStreamRow
def __init__(self, hs):
federation_sender = hs.get_federation_sender()
self.current_token = federation_sender.get_current_token
self.update_function = federation_sender.get_replication_rows
super(FederationStream, self).__init__(hs)
class TagAccountDataStream(Stream):
"""Someone added/removed a tag for a room
"""
......
# -*- coding: utf-8 -*-
# Copyright 2017 Vector Creations Ltd
# Copyright 2019 New Vector Ltd
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from collections import namedtuple
from ._base import Stream
FederationStreamRow = namedtuple("FederationStreamRow", (
"type", # str, the type of data as defined in the BaseFederationRows
"data", # dict, serialization of a federation.send_queue.BaseFederationRow
))
class FederationStream(Stream):
"""Data to be sent over federation. Only available when master has federation
sending disabled.
"""
NAME = "federation"
ROW_TYPE = FederationStreamRow
def __init__(self, hs):
federation_sender = hs.get_federation_sender()
self.current_token = federation_sender.get_current_token
self.update_function = federation_sender.get_replication_rows
super(FederationStream, self).__init__(hs)
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment