Unverified Commit 180d8ff0 authored by Nick Mills-Barrett, committed by GitHub

Retry some http replication failures (#12182)

This allows the target process to be down for around a minute,
which provides time for restarts during Synapse upgrades/config updates.

Closes: #12178

Signed-off-by: Nick Mills-Barrett <nick@beeper.com>
parent dc8d825e
Retry HTTP replication failures; this should prevent 502s when restarting stateful workers (main, event persisters, stream writers). Contributed by Nick @ Beeper.
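
As a quick sanity check on the "around a minute" figure in the commit message: connection-error retries back off exponentially, waiting 2 ** attempts seconds before each retry, so the default of 5 retry attempts gives a total window of 1 + 2 + 4 + 8 + 16 + 32 = 63 seconds (2 ** 6 - 1), matching the comment on RETRY_ON_CONNECT_ERROR_ATTEMPTS in the diff below. A minimal, standalone sketch of that schedule (plain Python arithmetic, not Synapse code):

# Standalone sketch of the retry schedule added by this commit; the constant
# mirrors ReplicationEndpoint.RETRY_ON_CONNECT_ERROR_ATTEMPTS in the diff.
RETRY_ON_CONNECT_ERROR_ATTEMPTS = 5

# Retries happen while attempts <= RETRY_ON_CONNECT_ERROR_ATTEMPTS, i.e. for
# attempts 0..5, sleeping 2 ** attempts seconds each time before retrying.
delays = [2 ** attempts for attempts in range(RETRY_ON_CONNECT_ERROR_ATTEMPTS + 1)]

print(delays)       # [1, 2, 4, 8, 16, 32]
print(sum(delays))  # 63 seconds -> roughly the one-minute restart window
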
@@ -21,6 +21,7 @@ from typing import TYPE_CHECKING, Any, Awaitable, Callable, Dict, List, Tuple
from prometheus_client import Counter, Gauge
from twisted.internet.error import ConnectError, DNSLookupError
from twisted.web.server import Request
from synapse.api.errors import HttpResponseException, SynapseError
@@ -87,6 +88,10 @@ class ReplicationEndpoint(metaclass=abc.ABCMeta):
`_handle_request` must return a Deferred.
RETRY_ON_TIMEOUT(bool): Whether or not to retry the request when a 504
is received.
RETRY_ON_CONNECT_ERROR (bool): Whether or not to retry the request when
a connection error is received.
RETRY_ON_CONNECT_ERROR_ATTEMPTS (int): Number of attempts to retry when
receiving connection errors; each retry will back off exponentially longer.
"""
NAME: str = abc.abstractproperty() # type: ignore
@@ -94,6 +99,8 @@ class ReplicationEndpoint(metaclass=abc.ABCMeta):
METHOD = "POST"
CACHE = True
RETRY_ON_TIMEOUT = True
RETRY_ON_CONNECT_ERROR = True
RETRY_ON_CONNECT_ERROR_ATTEMPTS = 5 # =63s (2^6-1)
def __init__(self, hs: "HomeServer"):
if self.CACHE:
@@ -236,18 +243,20 @@ class ReplicationEndpoint(metaclass=abc.ABCMeta):
"/".join(url_args),
)
try:
# Keep track of attempts made so we can bail if we don't manage to
# connect to the target after N tries.
attempts = 0
# We keep retrying the same request for timeouts. This is so that we
# have a good idea that the request has either succeeded or failed
# on the master, and so whether we should clean up or not.
while True:
headers: Dict[bytes, List[bytes]] = {}
# Add an authorization header, if configured.
if replication_secret:
headers[b"Authorization"] = [
b"Bearer " + replication_secret
]
opentracing.inject_header_dict(headers, check_destination=False)
try:
result = await request_func(uri, data, headers=headers)
break
@@ -255,11 +264,27 @@ class ReplicationEndpoint(metaclass=abc.ABCMeta):
if not cls.RETRY_ON_TIMEOUT:
raise
logger.warning("%s request timed out; retrying", cls.NAME)
logger.warning("%s request timed out; retrying", cls.NAME)
# If we timed out we probably don't need to worry about backing
# off too much, but lets just wait a little anyway.
await clock.sleep(1)
except (ConnectError, DNSLookupError) as e:
if not cls.RETRY_ON_CONNECT_ERROR:
raise
if attempts > cls.RETRY_ON_CONNECT_ERROR_ATTEMPTS:
raise
delay = 2 ** attempts
logger.warning(
"%s request connection failed; retrying in %ds: %r",
cls.NAME,
delay,
e,
)
await clock.sleep(delay)
attempts += 1
except HttpResponseException as e:
# We convert to SynapseError as we know that it was a SynapseError
# on the main process that we should send to the client. (And
......
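
For completeness, the docstring above treats these as ordinary class attributes, so an individual endpoint can opt out of (or shorten) the new retry behaviour the same way RETRY_ON_TIMEOUT is already overridden. A hypothetical fragment, not part of this commit (the endpoint name is made up and the abstract methods such as _handle_request are elided):

class ReplicationExampleServlet(ReplicationEndpoint):
    # Hypothetical endpoint, for illustration only.
    NAME = "example"
    PATH_ARGS = ()

    # Fail fast instead of retrying when the target worker is unreachable.
    RETRY_ON_CONNECT_ERROR = False

    # Or keep retries but shorten the backoff window to 1 + 2 + 4 = 7 seconds:
    # RETRY_ON_CONNECT_ERROR_ATTEMPTS = 2

    # _serialize_payload / _handle_request implemented as usual (elided).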