Skip to content
Snippets Groups Projects
Commit 3d6df846 authored by Richard van der Hoff's avatar Richard van der Hoff
Browse files

Test and fix support for cancellation in Linearizer

parent 4f676236
No related branches found
No related tags found
No related merge requests found
......@@ -184,13 +184,13 @@ class Linearizer(object):
# key_to_defer is a map from the key to a 2 element list where
# the first element is the number of things executing, and
# the second element is a deque of deferreds for the things blocked from
# executing.
# the second element is an OrderedDict, where the keys are deferreds for the
# things blocked from executing.
self.key_to_defer = {}
@defer.inlineCallbacks
def queue(self, key):
entry = self.key_to_defer.setdefault(key, [0, collections.deque()])
entry = self.key_to_defer.setdefault(key, [0, collections.OrderedDict()])
# If the number of things executing is greater than the maximum
# then add a deferred to the list of blocked items
......@@ -198,12 +198,28 @@ class Linearizer(object):
# this item so that it can continue executing.
if entry[0] >= self.max_count:
new_defer = defer.Deferred()
entry[1].append(new_defer)
entry[1][new_defer] = 1
logger.info(
"Waiting to acquire linearizer lock %r for key %r", self.name, key,
)
yield make_deferred_yieldable(new_defer)
try:
yield make_deferred_yieldable(new_defer)
except Exception as e:
if isinstance(e, CancelledError):
logger.info(
"Cancelling wait for linearizer lock %r for key %r",
self.name, key,
)
else:
logger.warn(
"Unexpected exception waiting for linearizer lock %r for key %r",
self.name, key,
)
# we just have to take ourselves back out of the queue.
del entry[1][new_defer]
raise
logger.info("Acquired linearizer lock %r for key %r", self.name, key)
entry[0] += 1
......@@ -238,7 +254,7 @@ class Linearizer(object):
entry[0] -= 1
if entry[1]:
next_def = entry[1].popleft()
(next_def, _) = entry[1].popitem(last=False)
# we need to run the next thing in the sentinel context.
with PreserveLoggingContext():
......
......@@ -17,6 +17,7 @@
from six.moves import range
from twisted.internet import defer, reactor
from twisted.internet.defer import CancelledError
from synapse.util import Clock, logcontext
from synapse.util.async import Linearizer
......@@ -112,3 +113,33 @@ class LinearizerTestCase(unittest.TestCase):
d6 = limiter.queue(key)
with (yield d6):
pass
@defer.inlineCallbacks
def test_cancellation(self):
linearizer = Linearizer()
key = object()
d1 = linearizer.queue(key)
cm1 = yield d1
d2 = linearizer.queue(key)
self.assertFalse(d2.called)
d3 = linearizer.queue(key)
self.assertFalse(d3.called)
d2.cancel()
with cm1:
pass
self.assertTrue(d2.called)
try:
yield d2
self.fail("Expected d2 to raise CancelledError")
except CancelledError:
pass
with (yield d3):
pass
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment