Skip to content
Snippets Groups Projects
Unverified Commit c1ef579b authored by Richard van der Hoff's avatar Richard van der Hoff Committed by GitHub
Browse files

Add prometheus metrics to track federation delays (#8430)

Add a pair of federation metrics to track the delays in sending PDUs to/from 
particular servers.
parent 7941372e
No related branches found
No related tags found
No related merge requests found
Add prometheus metrics to track federation delays.
...@@ -629,6 +629,7 @@ acme: ...@@ -629,6 +629,7 @@ acme:
#tls_fingerprints: [{"sha256": "<base64_encoded_sha256_fingerprint>"}] #tls_fingerprints: [{"sha256": "<base64_encoded_sha256_fingerprint>"}]
## Federation ##
# Restrict federation to the following whitelist of domains. # Restrict federation to the following whitelist of domains.
# N.B. we recommend also firewalling your federation listener to limit # N.B. we recommend also firewalling your federation listener to limit
...@@ -662,6 +663,17 @@ federation_ip_range_blacklist: ...@@ -662,6 +663,17 @@ federation_ip_range_blacklist:
- 'fe80::/64' - 'fe80::/64'
- 'fc00::/7' - 'fc00::/7'
# Report prometheus metrics on the age of PDUs being sent to and received from
# the following domains. This can be used to give an idea of "delay" on inbound
# and outbound federation, though be aware that any delay can be due to problems
# at either end or with the intermediate network.
#
# By default, no domains are monitored in this way.
#
#federation_metrics_domains:
# - matrix.org
# - example.com
## Caching ## ## Caching ##
......
...@@ -12,7 +12,7 @@ ...@@ -12,7 +12,7 @@
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and # See the License for the specific language governing permissions and
# limitations under the License. # limitations under the License.
from typing import Any, List from typing import Any, Iterable
import jsonschema import jsonschema
...@@ -20,7 +20,9 @@ from synapse.config._base import ConfigError ...@@ -20,7 +20,9 @@ from synapse.config._base import ConfigError
from synapse.types import JsonDict from synapse.types import JsonDict
def validate_config(json_schema: JsonDict, config: Any, config_path: List[str]) -> None: def validate_config(
json_schema: JsonDict, config: Any, config_path: Iterable[str]
) -> None:
"""Validates a config setting against a JsonSchema definition """Validates a config setting against a JsonSchema definition
This can be used to validate a section of the config file against a schema This can be used to validate a section of the config file against a schema
......
...@@ -17,7 +17,8 @@ from typing import Optional ...@@ -17,7 +17,8 @@ from typing import Optional
from netaddr import IPSet from netaddr import IPSet
from ._base import Config, ConfigError from synapse.config._base import Config, ConfigError
from synapse.config._util import validate_config
class FederationConfig(Config): class FederationConfig(Config):
...@@ -52,8 +53,18 @@ class FederationConfig(Config): ...@@ -52,8 +53,18 @@ class FederationConfig(Config):
"Invalid range(s) provided in federation_ip_range_blacklist: %s" % e "Invalid range(s) provided in federation_ip_range_blacklist: %s" % e
) )
federation_metrics_domains = config.get("federation_metrics_domains") or []
validate_config(
_METRICS_FOR_DOMAINS_SCHEMA,
federation_metrics_domains,
("federation_metrics_domains",),
)
self.federation_metrics_domains = set(federation_metrics_domains)
def generate_config_section(self, config_dir_path, server_name, **kwargs): def generate_config_section(self, config_dir_path, server_name, **kwargs):
return """\ return """\
## Federation ##
# Restrict federation to the following whitelist of domains. # Restrict federation to the following whitelist of domains.
# N.B. we recommend also firewalling your federation listener to limit # N.B. we recommend also firewalling your federation listener to limit
# inbound federation traffic as early as possible, rather than relying # inbound federation traffic as early as possible, rather than relying
...@@ -85,4 +96,18 @@ class FederationConfig(Config): ...@@ -85,4 +96,18 @@ class FederationConfig(Config):
- '::1/128' - '::1/128'
- 'fe80::/64' - 'fe80::/64'
- 'fc00::/7' - 'fc00::/7'
# Report prometheus metrics on the age of PDUs being sent to and received from
# the following domains. This can be used to give an idea of "delay" on inbound
# and outbound federation, though be aware that any delay can be due to problems
# at either end or with the intermediate network.
#
# By default, no domains are monitored in this way.
#
#federation_metrics_domains:
# - matrix.org
# - example.com
""" """
_METRICS_FOR_DOMAINS_SCHEMA = {"type": "array", "items": {"type": "string"}}
...@@ -92,5 +92,4 @@ class HomeServerConfig(RootConfig): ...@@ -92,5 +92,4 @@ class HomeServerConfig(RootConfig):
TracerConfig, TracerConfig,
WorkerConfig, WorkerConfig,
RedisConfig, RedisConfig,
FederationConfig,
] ]
...@@ -471,7 +471,6 @@ class TlsConfig(Config): ...@@ -471,7 +471,6 @@ class TlsConfig(Config):
# or by checking matrix.org/federationtester/api/report?server_name=$host # or by checking matrix.org/federationtester/api/report?server_name=$host
# #
#tls_fingerprints: [{"sha256": "<base64_encoded_sha256_fingerprint>"}] #tls_fingerprints: [{"sha256": "<base64_encoded_sha256_fingerprint>"}]
""" """
# Lowercase the string representation of boolean values # Lowercase the string representation of boolean values
% { % {
......
...@@ -28,7 +28,7 @@ from typing import ( ...@@ -28,7 +28,7 @@ from typing import (
Union, Union,
) )
from prometheus_client import Counter, Histogram from prometheus_client import Counter, Gauge, Histogram
from twisted.internet import defer from twisted.internet import defer
from twisted.internet.abstract import isIPAddress from twisted.internet.abstract import isIPAddress
...@@ -88,6 +88,13 @@ pdu_process_time = Histogram( ...@@ -88,6 +88,13 @@ pdu_process_time = Histogram(
) )
last_pdu_age_metric = Gauge(
"synapse_federation_last_received_pdu_age",
"The age (in seconds) of the last PDU successfully received from the given domain",
labelnames=("server_name",),
)
class FederationServer(FederationBase): class FederationServer(FederationBase):
def __init__(self, hs): def __init__(self, hs):
super().__init__(hs) super().__init__(hs)
...@@ -118,6 +125,10 @@ class FederationServer(FederationBase): ...@@ -118,6 +125,10 @@ class FederationServer(FederationBase):
hs, "state_ids_resp", timeout_ms=30000 hs, "state_ids_resp", timeout_ms=30000
) )
self._federation_metrics_domains = (
hs.get_config().federation.federation_metrics_domains
)
async def on_backfill_request( async def on_backfill_request(
self, origin: str, room_id: str, versions: List[str], limit: int self, origin: str, room_id: str, versions: List[str], limit: int
) -> Tuple[int, Dict[str, Any]]: ) -> Tuple[int, Dict[str, Any]]:
...@@ -262,7 +273,11 @@ class FederationServer(FederationBase): ...@@ -262,7 +273,11 @@ class FederationServer(FederationBase):
pdus_by_room = {} # type: Dict[str, List[EventBase]] pdus_by_room = {} # type: Dict[str, List[EventBase]]
newest_pdu_ts = 0
for p in transaction.pdus: # type: ignore for p in transaction.pdus: # type: ignore
# FIXME (richardv): I don't think this works:
# https://github.com/matrix-org/synapse/issues/8429
if "unsigned" in p: if "unsigned" in p:
unsigned = p["unsigned"] unsigned = p["unsigned"]
if "age" in unsigned: if "age" in unsigned:
...@@ -300,6 +315,9 @@ class FederationServer(FederationBase): ...@@ -300,6 +315,9 @@ class FederationServer(FederationBase):
event = event_from_pdu_json(p, room_version) event = event_from_pdu_json(p, room_version)
pdus_by_room.setdefault(room_id, []).append(event) pdus_by_room.setdefault(room_id, []).append(event)
if event.origin_server_ts > newest_pdu_ts:
newest_pdu_ts = event.origin_server_ts
pdu_results = {} pdu_results = {}
# we can process different rooms in parallel (which is useful if they # we can process different rooms in parallel (which is useful if they
...@@ -340,6 +358,10 @@ class FederationServer(FederationBase): ...@@ -340,6 +358,10 @@ class FederationServer(FederationBase):
process_pdus_for_room, pdus_by_room.keys(), TRANSACTION_CONCURRENCY_LIMIT process_pdus_for_room, pdus_by_room.keys(), TRANSACTION_CONCURRENCY_LIMIT
) )
if newest_pdu_ts and origin in self._federation_metrics_domains:
newest_pdu_age = self._clock.time_msec() - newest_pdu_ts
last_pdu_age_metric.labels(server_name=origin).set(newest_pdu_age / 1000)
return pdu_results return pdu_results
async def _handle_edus_in_txn(self, origin: str, transaction: Transaction): async def _handle_edus_in_txn(self, origin: str, transaction: Transaction):
......
...@@ -15,6 +15,8 @@ ...@@ -15,6 +15,8 @@
import logging import logging
from typing import TYPE_CHECKING, List from typing import TYPE_CHECKING, List
from prometheus_client import Gauge
from synapse.api.errors import HttpResponseException from synapse.api.errors import HttpResponseException
from synapse.events import EventBase from synapse.events import EventBase
from synapse.federation.persistence import TransactionActions from synapse.federation.persistence import TransactionActions
...@@ -34,6 +36,12 @@ if TYPE_CHECKING: ...@@ -34,6 +36,12 @@ if TYPE_CHECKING:
logger = logging.getLogger(__name__) logger = logging.getLogger(__name__)
last_pdu_age_metric = Gauge(
"synapse_federation_last_sent_pdu_age",
"The age (in seconds) of the last PDU successfully sent to the given domain",
labelnames=("server_name",),
)
class TransactionManager: class TransactionManager:
"""Helper class which handles building and sending transactions """Helper class which handles building and sending transactions
...@@ -48,6 +56,10 @@ class TransactionManager: ...@@ -48,6 +56,10 @@ class TransactionManager:
self._transaction_actions = TransactionActions(self._store) self._transaction_actions = TransactionActions(self._store)
self._transport_layer = hs.get_federation_transport_client() self._transport_layer = hs.get_federation_transport_client()
self._federation_metrics_domains = (
hs.get_config().federation.federation_metrics_domains
)
# HACK to get unique tx id # HACK to get unique tx id
self._next_txn_id = int(self.clock.time_msec()) self._next_txn_id = int(self.clock.time_msec())
...@@ -119,6 +131,9 @@ class TransactionManager: ...@@ -119,6 +131,9 @@ class TransactionManager:
# FIXME (erikj): This is a bit of a hack to make the Pdu age # FIXME (erikj): This is a bit of a hack to make the Pdu age
# keys work # keys work
# FIXME (richardv): I also believe it no longer works. We (now?) store
# "age_ts" in "unsigned" rather than at the top level. See
# https://github.com/matrix-org/synapse/issues/8429.
def json_data_cb(): def json_data_cb():
data = transaction.get_dict() data = transaction.get_dict()
now = int(self.clock.time_msec()) now = int(self.clock.time_msec())
...@@ -167,5 +182,12 @@ class TransactionManager: ...@@ -167,5 +182,12 @@ class TransactionManager:
) )
success = False success = False
if success and pdus and destination in self._federation_metrics_domains:
last_pdu = pdus[-1]
last_pdu_age = self.clock.time_msec() - last_pdu.origin_server_ts
last_pdu_age_metric.labels(server_name=destination).set(
last_pdu_age / 1000
)
set_tag(tags.ERROR, not success) set_tag(tags.ERROR, not success)
return success return success
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment