Skip to content
Snippets Groups Projects
Commit 940a1611 authored by Mark Haines's avatar Mark Haines
Browse files

Fix the background update

parent 90b50321
No related branches found
No related tags found
No related merge requests found
......@@ -93,16 +93,19 @@ class BackgroundUpdateStore(SQLBaseStore):
sleep = defer.Deferred()
self._background_update_timer = self._clock.call_later(
self.BACKGROUND_UPDATE_INTERVAL_MS / 1000., sleep.callback
self.BACKGROUND_UPDATE_INTERVAL_MS / 1000., sleep.callback, None
)
try:
yield sleep
finally:
self._background_update_timer = None
result = yield self.do_background_update(
self.BACKGROUND_UPDATE_DURATION_MS
)
try:
result = yield self.do_background_update(
self.BACKGROUND_UPDATE_DURATION_MS
)
except:
logger.exception("Error doing update")
if result is None:
logger.info(
......@@ -169,7 +172,7 @@ class BackgroundUpdateStore(SQLBaseStore):
duration_ms = time_stop - time_start
logger.info(
"Updating %. Updated %r items in %rms."
"Updating %r. Updated %r items in %rms."
" (total_rate=%r/ms, current_rate=%r/ms, total_updated=%r)",
update_name, items_updated, duration_ms,
performance.total_items_per_ms(),
......
......@@ -47,11 +47,10 @@ def run_upgrade(cur, database_engine, *args, **kwargs):
if isinstance(database_engine, PostgresEngine):
for statement in get_statements(POSTGRES_TABLE.splitlines()):
cur.execute(statement)
return
if isinstance(database_engine, Sqlite3Engine):
elif isinstance(database_engine, Sqlite3Engine):
cur.execute(SQLITE_TABLE)
return
else:
raise Exception("Unrecognized database engine")
cur.execute("SELECT MIN(stream_ordering) FROM events")
rows = cur.fetchall()
......
......@@ -46,18 +46,18 @@ class SearchStore(BackgroundUpdateStore):
def reindex_search_txn(txn):
sql = (
"SELECT stream_id, event_id FROM events"
"SELECT stream_ordering, event_id FROM events"
" WHERE ? <= stream_ordering AND stream_ordering < ?"
" AND (%s)"
" ORDER BY stream_ordering DESC"
" LIMIT ?"
) % (" OR ".join("type = '%s'" % TYPES),)
) % (" OR ".join("type = '%s'" % (t,) for t in TYPES),)
txn.execute(sql, target_min_stream_id, max_stream_id, batch_size)
txn.execute(sql, (target_min_stream_id, max_stream_id, batch_size))
rows = txn.fetch_all()
rows = txn.fetchall()
if not rows:
return None
return 0
min_stream_id = rows[-1][0]
event_ids = [row[1] for row in rows]
......@@ -102,7 +102,7 @@ class SearchStore(BackgroundUpdateStore):
for index in range(0, len(event_search_rows), INSERT_CLUMP_SIZE):
clump = event_search_rows[index:index + INSERT_CLUMP_SIZE]
txn.execute_many(sql, clump)
txn.executemany(sql, clump)
progress = {
"target_min_stream_id_inclusive": target_min_stream_id,
......@@ -116,11 +116,11 @@ class SearchStore(BackgroundUpdateStore):
return len(event_search_rows)
result = yield self.runInteration(
result = yield self.runInteraction(
self.EVENT_SEARCH_UPDATE_NAME, reindex_search_txn
)
if result is None:
if not result:
yield self._end_background_update(self.EVENT_SEARCH_UPDATE_NAME)
defer.returnValue(result)
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment