Skip to content
Snippets Groups Projects
Commit 2f18a264 authored by Erik Johnston's avatar Erik Johnston
Browse files

Make all fields private

parent dc519602
No related branches found
No related tags found
No related merge requests found
......@@ -35,24 +35,24 @@ class BackgroundFileConsumer(object):
_RESUME_ON_QUEUE_SIZE = 2
def __init__(self, file_obj):
self.file_obj = file_obj
self._file_obj = file_obj
# Producer we're registered with
self.producer = None
self._producer = None
# True if PushProducer, false if PullProducer
self.streaming = False
# For PushProducers, indicates whether we've paused the producer and
# need to call resumeProducing before we get more data.
self.paused_producer = False
self._paused_producer = False
# Queue of slices of bytes to be written. When producer calls
# unregister a final None is sent.
self.bytes_queue = Queue.Queue()
self._bytes_queue = Queue.Queue()
# Deferred that is resolved when finished writing
self.finished_deferred = None
self._finished_deferred = None
# If the _writer thread throws an exception it gets stored here.
self._write_exception = None
......@@ -69,21 +69,21 @@ class BackgroundFileConsumer(object):
streaming (bool): True if push based producer, False if pull
based.
"""
if self.producer:
if self._producer:
raise Exception("registerProducer called twice")
self.producer = producer
self._producer = producer
self.streaming = streaming
self.finished_deferred = threads.deferToThread(self._writer)
self._finished_deferred = threads.deferToThread(self._writer)
if not streaming:
self.producer.resumeProducing()
self._producer.resumeProducing()
def unregisterProducer(self):
"""Part of IProducer interface
"""
self.producer = None
if not self.finished_deferred.called:
self.bytes_queue.put_nowait(None)
self._producer = None
if not self._finished_deferred.called:
self._bytes_queue.put_nowait(None)
def write(self, bytes):
"""Part of IProducer interface
......@@ -91,65 +91,65 @@ class BackgroundFileConsumer(object):
if self._write_exception:
raise self._write_exception
if self.finished_deferred.called:
if self._finished_deferred.called:
raise Exception("consumer has closed")
self.bytes_queue.put_nowait(bytes)
self._bytes_queue.put_nowait(bytes)
# If this is a PushProducer and the queue is getting behind
# then we pause the producer.
if self.streaming and self.bytes_queue.qsize() >= self._PAUSE_ON_QUEUE_SIZE:
self.paused_producer = True
self.producer.pauseProducing()
if self.streaming and self._bytes_queue.qsize() >= self._PAUSE_ON_QUEUE_SIZE:
self._paused_producer = True
self._producer.pauseProducing()
def _writer(self):
"""This is run in a background thread to write to the file.
"""
try:
while self.producer or not self.bytes_queue.empty():
while self._producer or not self._bytes_queue.empty():
# If we've paused the producer check if we should resume the
# producer.
if self.producer and self.paused_producer:
if self.bytes_queue.qsize() <= self._RESUME_ON_QUEUE_SIZE:
if self._producer and self._paused_producer:
if self._bytes_queue.qsize() <= self._RESUME_ON_QUEUE_SIZE:
reactor.callFromThread(self._resume_paused_producer)
if self._notify_empty_deferred and self.bytes_queue.empty():
if self._notify_empty_deferred and self._bytes_queue.empty():
reactor.callFromThread(self._notify_empty)
bytes = self.bytes_queue.get()
bytes = self._bytes_queue.get()
# If we get a None (or empty list) then that's a signal used
# to indicate we should check if we should stop.
if bytes:
self.file_obj.write(bytes)
self._file_obj.write(bytes)
# If its a pull producer then we need to explicitly ask for
# more stuff.
if not self.streaming and self.producer:
reactor.callFromThread(self.producer.resumeProducing)
if not self.streaming and self._producer:
reactor.callFromThread(self._producer.resumeProducing)
except Exception as e:
self._write_exception = e
raise
finally:
self.file_obj.close()
self._file_obj.close()
def wait(self):
"""Returns a deferred that resolves when finished writing to file
"""
return make_deferred_yieldable(self.finished_deferred)
return make_deferred_yieldable(self._finished_deferred)
def _resume_paused_producer(self):
"""Gets called if we should resume producing after being paused
"""
if self.paused_producer and self.producer:
self.paused_producer = False
self.producer.resumeProducing()
if self._paused_producer and self._producer:
self._paused_producer = False
self._producer.resumeProducing()
def _notify_empty(self):
"""Called when the _writer thread thinks the queue may be empty and
we should notify anything waiting on `wait_for_writes`
"""
if self._notify_empty_deferred and self.bytes_queue.empty():
if self._notify_empty_deferred and self._bytes_queue.empty():
d = self._notify_empty_deferred
self._notify_empty_deferred = None
d.callback(None)
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment