Skip to content
Snippets Groups Projects
Commit 90bcb869 authored by Erik Johnston's avatar Erik Johnston
Browse files

Support running porting script multiple times

parent 1bede478
No related branches found
No related tags found
No related merge requests found
......@@ -51,16 +51,52 @@ UNICODE_COLUMNS = {
}
APPEND_ONLY_TABLES = [
"event_content_hashes",
"event_reference_hashes",
"event_signatures",
"event_edge_hashes",
"events",
"event_json",
"state_events",
"room_memberships",
"feedback",
"topics",
"room_names",
"rooms",
"local_media_repository",
"local_media_repository_thumbnails",
"remote_media_cache",
"remote_media_cache_thumbnails",
"redactions",
"event_edges",
"event_auth",
"received_transactions",
"sent_transactions",
"transaction_id_to_pdu",
"users",
"state_groups",
"state_groups_state",
"event_to_state_groups",
"rejections",
]
class Store(object):
def __init__(self, db_pool, engine):
self.db_pool = db_pool
self.engine = engine
self.database_engine = engine
_simple_insert_txn = SQLBaseStore.__dict__["_simple_insert_txn"]
_simple_insert = SQLBaseStore.__dict__["_simple_insert"]
_simple_select_onecol_txn = SQLBaseStore.__dict__["_simple_select_onecol_txn"]
_simple_select_onecol = SQLBaseStore.__dict__["_simple_select_onecol"]
_simple_select_one_onecol = SQLBaseStore.__dict__["_simple_select_one_onecol"]
_simple_select_one_onecol_txn = SQLBaseStore.__dict__["_simple_select_one_onecol_txn"]
_simple_update_one = SQLBaseStore.__dict__["_simple_update_one"]
_simple_update_one_txn = SQLBaseStore.__dict__["_simple_update_one_txn"]
_execute_and_decode = SQLBaseStore.__dict__["_execute_and_decode"]
......@@ -73,11 +109,11 @@ class Store(object):
try:
txn = conn.cursor()
return func(
LoggingTransaction(txn, desc, self.engine),
LoggingTransaction(txn, desc, self.database_engine),
*args, **kwargs
)
except self.engine.module.DatabaseError as e:
if self.engine.is_deadlock(e):
except self.database_engine.module.DatabaseError as e:
if self.database_engine.is_deadlock(e):
logger.warn("[TXN DEADLOCK] {%s} %d/%d", desc, i, N)
if i < N:
i += 1
......@@ -117,16 +153,50 @@ def chunks(n):
@defer.inlineCallbacks
def handle_table(table, sqlite_store, mysql_store):
N = 1000
if table in APPEND_ONLY_TABLES:
# It's safe to just carry on inserting.
next_chunk = yield mysql_store._simple_select_one_onecol(
table="port_from_sqlite3",
keyvalues={"table_name": table},
retcol="rowid",
allow_none=True,
)
if next_chunk is None:
yield mysql_store._simple_insert(
table="port_from_sqlite3",
values={"table_name": table, "rowid": 0}
)
next_chunk = 0
else:
def delete_all(txn):
txn.execute(
"DELETE FROM port_from_sqlite3 WHERE table_name = %s",
(table,)
)
txn.execute("DELETE FROM %s" % (table,))
mysql_store._simple_insert_txn(
txn,
table="port_from_sqlite3",
values={"table_name": table, "rowid": 0}
)
yield mysql_store.runInteraction(
"delete_non_append_only", delete_all
)
next_chunk = 0
N = 5000
select = "SELECT rowid, * FROM %s WHERE rowid >= ? ORDER BY rowid LIMIT ?" % (table,)
uni_col_names = UNICODE_COLUMNS.get(table, [])
def conv_uni(c):
return sqlite_store.engine.load_unicode(c)
return sqlite_store.database_engine.load_unicode(c)
next_chunk = 0
while True:
def r(txn):
txn.execute(select, (next_chunk, N,))
......@@ -145,7 +215,7 @@ def handle_table(table, sqlite_store, mysql_store):
for i, row in enumerate(rows):
rows[i] = tuple(
mysql_store.engine.encode_parameter(
mysql_store.database_engine.encode_parameter(
conv_uni(col) if j in uni_cols else col
)
for j, col in enumerate(row)
......@@ -153,6 +223,12 @@ def handle_table(table, sqlite_store, mysql_store):
)
yield mysql_store.insert_many(table, headers[1:], rows)
yield mysql_store._simple_update_one(
table="port_from_sqlite3",
keyvalues={"table_name": table},
updatevalues={"rowid": next_chunk},
)
else:
return
......@@ -208,6 +284,22 @@ def main(sqlite_config, mysql_config):
logger.info("Found %d tables", len(tables))
def create_port_table(txn):
try:
txn.execute(
"CREATE TABLE port_from_sqlite3 ("
" `table_name` varchar(100) NOT NULL UNIQUE,"
" `rowid` bigint unsigned NOT NULL"
")"
)
except mysql_engine.module.DatabaseError as e:
if e.errno != mysql_engine.module.errorcode.ER_TABLE_EXISTS_ERROR:
raise
yield mysql_store.runInteraction(
"create_port_table", create_port_table
)
# Process tables.
yield defer.gatherResults(
[
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment