Skip to content
Snippets Groups Projects
Commit 973d67a0 authored by Erik Johnston's avatar Erik Johnston Committed by GitHub
Browse files

Merge pull request #1019 from matrix-org/erikj/appservice_clean

Clean up _ServiceQueuer
parents e8850245 7321f454
No related branches found
No related tags found
No related merge requests found
......@@ -48,9 +48,12 @@ UP & quit +---------- YES SUCCESS
This is all tied together by the AppServiceScheduler which DIs the required
components.
"""
from twisted.internet import defer
from synapse.appservice import ApplicationServiceState
from twisted.internet import defer
from synapse.util.logcontext import preserve_fn
from synapse.util.metrics import Measure
import logging
logger = logging.getLogger(__name__)
......@@ -73,7 +76,7 @@ class ApplicationServiceScheduler(object):
self.txn_ctrl = _TransactionController(
self.clock, self.store, self.as_api, create_recoverer
)
self.queuer = _ServiceQueuer(self.txn_ctrl)
self.queuer = _ServiceQueuer(self.txn_ctrl, self.clock)
@defer.inlineCallbacks
def start(self):
......@@ -94,38 +97,36 @@ class _ServiceQueuer(object):
this schedules any other events in the queue to run.
"""
def __init__(self, txn_ctrl):
def __init__(self, txn_ctrl, clock):
self.queued_events = {} # dict of {service_id: [events]}
self.pending_requests = {} # dict of {service_id: Deferred}
self.requests_in_flight = set()
self.txn_ctrl = txn_ctrl
self.clock = clock
def enqueue(self, service, event):
# if this service isn't being sent something
if not self.pending_requests.get(service.id):
self._send_request(service, [event])
else:
# add to queue for this service
if service.id not in self.queued_events:
self.queued_events[service.id] = []
self.queued_events[service.id].append(event)
def _send_request(self, service, events):
# send request and add callbacks
d = self.txn_ctrl.send(service, events)
d.addBoth(self._on_request_finish)
d.addErrback(self._on_request_fail)
self.pending_requests[service.id] = d
def _on_request_finish(self, service):
self.pending_requests[service.id] = None
# if there are queued events, then send them.
if (service.id in self.queued_events
and len(self.queued_events[service.id]) > 0):
self._send_request(service, self.queued_events[service.id])
self.queued_events[service.id] = []
def _on_request_fail(self, err):
logger.error("AS request failed: %s", err)
self.queued_events.setdefault(service.id, []).append(event)
preserve_fn(self._send_request)(service)
@defer.inlineCallbacks
def _send_request(self, service):
if service.id in self.requests_in_flight:
return
with Measure(self.clock, "_ServiceQueuer._send_request"):
self.requests_in_flight.add(service.id)
try:
while True:
events = self.queued_events.pop(service.id, [])
if not events:
return
try:
yield self.txn_ctrl.send(service, events)
except:
logger.exception("AS request failed")
finally:
self.requests_in_flight.discard(service.id)
class _TransactionController(object):
......@@ -155,8 +156,6 @@ class _TransactionController(object):
except Exception as e:
logger.exception(e)
self._start_recoverer(service)
# request has finished
defer.returnValue(service)
@defer.inlineCallbacks
def on_recovered(self, recoverer):
......
......@@ -193,7 +193,7 @@ class ApplicationServiceSchedulerQueuerTestCase(unittest.TestCase):
def setUp(self):
self.txn_ctrl = Mock()
self.queuer = _ServiceQueuer(self.txn_ctrl)
self.queuer = _ServiceQueuer(self.txn_ctrl, MockClock())
def test_send_single_event_no_queue(self):
# Expect the event to be sent immediately.
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment