From 4b9a6bbfa7921db974ae5624672e82240147c53a Mon Sep 17 00:00:00 2001 From: Dustin Sallings Date: Wed, 21 Dec 2011 18:16:06 -0800 Subject: [PATCH 1/3] Asynchronously store incremental backup data. Change-Id: Id4fc816475886cad1c7ccab213b72a9428d1a380 --- management/mbbackup-incremental | 71 ++++++++++++++++++++++++++++---------- 1 files changed, 52 insertions(+), 19 deletions(-) diff --git a/management/mbbackup-incremental b/management/mbbackup-incremental index 180f2c1..c1a17f2 100755 --- a/management/mbbackup-incremental +++ b/management/mbbackup-incremental @@ -11,6 +11,8 @@ import string import struct import time import traceback +import threading +import Queue import mc_bin_client import memcacheConstants @@ -36,6 +38,24 @@ Usage: %s [-h %s[:%s]] [-o %s] [-T num_secs] [-r] [-c] [-t transaction_size] [-v DEFAULT_HOST_PORT[0], DEFAULT_HOST_PORT[1], DEFAULT_FILE) sys.exit(err) +def db_worker(q, file_name): + db = sqlite3.connect(file_name) + db.text_factory = str + c = db.cursor() + + while True: + task = q.get() + if task[0] == 'quit': + q.task_done() + break + elif task[0] == 'execute': + c.execute(task[1], task[2]) + elif task[0] == 'lambda': + task[1](db, c, *task[2:]) + elif task[0] == 'commit': + db.commit() + q.task_done() + def parse_args(args): host_port = DEFAULT_HOST_PORT file = DEFAULT_FILE @@ -134,8 +154,9 @@ def main(): db = sqlite3.connect(file) # TODO: Revisit isolation level db.text_factory = str createSchema(db) + db.close() - loop(mc, db, timeout, txn_size, ':'.join(host_port) + '-' + name) + loop(mc, file, timeout, txn_size, ':'.join(host_port) + '-' + name) except NameError as ne: sys.exit("ERROR: " + str(ne)) @@ -177,7 +198,7 @@ def check_incremental_backup_file(file_name): if db: db.close() -def loop(mc, db, timeout, txn_size, source): +def loop(mc, db_file, timeout, txn_size, source): vbmap = {} # Key is vbucketId, value is [checkpointId, seq]. cmdInfo = { @@ -186,9 +207,12 @@ def loop(mc, db, timeout, txn_size, source): memcacheConstants.CMD_TAP_FLUSH: ('flush', 'f'), } + q = Queue.Queue() + wt = threading.Thread(target=db_worker, args=(q, db_file)) + wt.start() + try: sinput = [mc.s] - c = db.cursor() update_count = 0 while True: @@ -227,14 +251,14 @@ def loop(mc, db, timeout, txn_size, source): s = "INSERT into cpoint_op" \ "(vbucket_id, cpoint_id, seq, op, key, flg, exp, cas, val)" \ " VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?)" - c.execute(s, (vbucketId, checkpointId, seq, cmdOp, - key, flg, exp, cas, sqlite3.Binary(val))) + q.put(['execute', s, (vbucketId, checkpointId, seq, cmdOp, + key, flg, exp, cas, sqlite3.Binary(val))]) else: s = "INSERT into cpoint_op" \ "(vbucket_id, cpoint_id, seq, op, key, cas, val)" \ " VALUES (?, ?, ?, ?, ?, ?, ?)" - c.execute(s, (vbucketId, checkpointId, seq, cmdOp, - key, cas, sqlite3.Binary(val))) + q.put(['execute', s, (vbucketId, checkpointId, seq, cmdOp, + key, cas, sqlite3.Binary(val))]) update_count = update_count + 1 needAck = flags & memcacheConstants.TAP_FLAG_ACK @@ -243,6 +267,7 @@ def loop(mc, db, timeout, txn_size, source): if len(ext) > 0: eng_length, flags, ttl, flg, exp, needAck = parseTapExt(ext) checkpoint_id = struct.unpack(">Q", val) + log(1, "Started checkpoint %d" % checkpoint_id) checkpointStartExists = False if vbucketId in vbmap: if vbmap[vbucketId][0] == checkpoint_id[0]: @@ -257,13 +282,14 @@ def loop(mc, db, timeout, txn_size, source): s = "INSERT into cpoint_state" \ "(vbucket_id, cpoint_id, prev_cpoint_id, state, source, updated_at)" \ " VALUES (?, ?, ?, \"open\", ?, ?)" - c.execute(s, (vbucketId, checkpoint_id[0], -1, source, t)) - db.commit() + q.put(['execute', s, (vbucketId, checkpoint_id[0], -1, source, t)]) + q.put(['commit']) elif cmd == memcacheConstants.CMD_TAP_CHECKPOINT_END: - db.commit() + q.put(['commit']) update_count = 0 checkpoint_id = struct.unpack(">Q", val) + log(1, "Completed checkpoint %d" % checkpoint_id) if not vbucketId in vbmap: sys.exit("ERROR: unmatched checkpoint end: %s vb: %s" % (checkpoint_id[0], vbucketId)) @@ -282,11 +308,14 @@ def loop(mc, db, timeout, txn_size, source): s = "UPDATE cpoint_state" \ " SET state=\"closed\", updated_at=?" \ " WHERE vbucket_id=? AND cpoint_id=? AND state=\"open\" AND source=?" - r = c.execute(s, (t, vbucketId, current_checkpoint_id, source)) - db.commit() - if r.rowcount != 1: - sys.exit("ERROR: unexpected rowcount during update: " - + ",".join([t, vbucketId, checkpointId, source])) + + def do_end(db, c): + r = c.execute(s, (t, vbucketId, current_checkpoint_id, source)) + db.commit() + if r.rowcount != 1: + sys.exit("ERROR: unexpected rowcount during update: " + + ",".join([t, vbucketId, checkpointId, source])) + q.put(['lambda', do_end]) elif cmd == memcacheConstants.CMD_TAP_OPAQUE: if len(ext) > 0: @@ -294,13 +323,13 @@ def loop(mc, db, timeout, txn_size, source): opaque_opcode = struct.unpack(">I" , val[0:eng_length]) if opaque_opcode[0] == memcacheConstants.TAP_OPAQUE_OPEN_CHECKPOINT: if update_count > 0: - db.commit() + q.put(['commit']) log(1, "Incremental backup is currently at the open checkpoint. Exit...") - exit(0) + break elif cmd == memcacheConstants.CMD_TAP_CONNECT: if update_count > 0: - db.commit() + q.put(['commit']) sys.exit("ERROR: TAP_CONNECT error: " + str(key)) elif cmd == memcacheConstants.CMD_NOOP: @@ -310,7 +339,7 @@ def loop(mc, db, timeout, txn_size, source): sys.exit("ERROR: unhandled cmd " + str(cmd)) if update_count == txn_size: - db.commit() + q.put(['commit']) update_count = 0 if needAck: @@ -322,6 +351,10 @@ def loop(mc, db, timeout, txn_size, source): except exceptions.EOFError: pass + q.put(['quit']) + q.join() + wt.join() + def readTap(mc): ext = '' key = '' -- 1.7.4.4