From 4c38152513c2eeb4841ceb083efe0155f85bda33 Mon Sep 17 00:00:00 2001 From: Dustin Sallings Date: Wed, 21 Dec 2011 19:00:28 -0800 Subject: [PATCH 2/3] Moved the tap handling to a new class. Change-Id: Ia17415301c524a185040af7ddfb63ab20dfac3ce --- management/mbbackup-incremental | 292 +++++++++++++++++++++------------------ 1 files changed, 155 insertions(+), 137 deletions(-) diff --git a/management/mbbackup-incremental b/management/mbbackup-incremental index c1a17f2..0810f22 100755 --- a/management/mbbackup-incremental +++ b/management/mbbackup-incremental @@ -132,10 +132,10 @@ def main(): mc = mc_bin_client.MemcachedClient(host_port[0], int(host_port[1])) ext, val = encodeTAPConnectOpts({ - memcacheConstants.TAP_FLAG_CHECKPOINT: '', - memcacheConstants.TAP_FLAG_SUPPORT_ACK: '', - memcacheConstants.TAP_FLAG_REGISTERED_CLIENT: 0x01, # "value > 0" means "closed checkpoints only" - memcacheConstants.TAP_FLAG_BACKFILL: 0xffffffff + memcacheConstants.TAP_FLAG_CHECKPOINT: '', + memcacheConstants.TAP_FLAG_SUPPORT_ACK: '', + memcacheConstants.TAP_FLAG_REGISTERED_CLIENT: 0x01, # "value > 0" means "closed checkpoints only" + memcacheConstants.TAP_FLAG_BACKFILL: 0xffffffff }) mc._sendCmd(memcacheConstants.CMD_TAP_CONNECT, name, val, 0, ext) @@ -149,7 +149,7 @@ def main(): file = util.expand_file_pattern(file) log(1, "file actual = " + file) if os.path.exists(file): - sys.exit("ERROR: file exists already: " + file) + sys.exit("ERROR: file exists already: " + file) db = sqlite3.connect(file) # TODO: Revisit isolation level db.text_factory = str @@ -198,24 +198,162 @@ def check_incremental_backup_file(file_name): if db: db.close() -def loop(mc, db_file, timeout, txn_size, source): - vbmap = {} # Key is vbucketId, value is [checkpointId, seq]. + +class TapHandler(object): cmdInfo = { memcacheConstants.CMD_TAP_MUTATION: ('mutation', 'm'), memcacheConstants.CMD_TAP_DELETE: ('delete', 'd'), memcacheConstants.CMD_TAP_FLUSH: ('flush', 'f'), - } + } + + finished = False + update_count = 0 + + def __init__(self, db_file, txn_size, source): + self.vbmap = {} + self.txn_size = txn_size + self.source = source + self.q = Queue.Queue() + self.wt = threading.Thread(target=db_worker, args=(self.q, db_file)) + self.wt.start() + + def shutdown(self): + self.q.put(['quit']) + self.q.join() + self.wt.join() + + def __call__(self, identifier, cmd, ext, key, vbucketId, val, cas): + log(2, "got " + str(cmd) + " k:" + key + " vlen:" + str(len(val)) + + " elen:" + str(len(ext)) + + " vbid:" + str(vbucketId)) + if len(val) > 0 and len(val) < 64: + log(2, " val: <<" + str(val) + ">>") + + needAck = False + + if (cmd == memcacheConstants.CMD_TAP_MUTATION or + cmd == memcacheConstants.CMD_TAP_DELETE or + cmd == memcacheConstants.CMD_TAP_FLUSH): + cmdName, cmdOp = self.cmdInfo[cmd] + if not vbucketId in self.vbmap: + log(2, "%s with unknown vbucketId: %s" % (cmdName, vbucketId)) + sys.exit("ERROR: received %s without checkpoint in vbucket: %s\n" + "Perhaps the server is an older version?" + % (cmdName, vbucketId)) + + c_s = self.vbmap[vbucketId] + checkpointId = c_s[0] + seq = c_s[1] = c_s[1] + 1 + + eng_length, flags, ttl, flg, exp, needAck = parseTapExt(ext) + + if (cmd == memcacheConstants.CMD_TAP_MUTATION): + s = "INSERT into cpoint_op" \ + "(vbucket_id, cpoint_id, seq, op, key, flg, exp, cas, val)" \ + " VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?)" + self.q.put(['execute', s, (vbucketId, checkpointId, seq, cmdOp, + key, flg, exp, cas, sqlite3.Binary(val))]) + else: + s = "INSERT into cpoint_op" \ + "(vbucket_id, cpoint_id, seq, op, key, cas, val)" \ + " VALUES (?, ?, ?, ?, ?, ?, ?)" + self.q.put(['execute', s, (vbucketId, checkpointId, seq, cmdOp, + key, cas, sqlite3.Binary(val))]) + + self.update_count = self.update_count + 1 + needAck = flags & memcacheConstants.TAP_FLAG_ACK + + elif cmd == memcacheConstants.CMD_TAP_CHECKPOINT_START: + if len(ext) > 0: + eng_length, flags, ttl, flg, exp, needAck = parseTapExt(ext) + checkpoint_id = struct.unpack(">Q", val) + log(1, "Started checkpoint %d" % checkpoint_id) + checkpointStartExists = False + if vbucketId in self.vbmap: + if self.vbmap[vbucketId][0] == checkpoint_id[0]: + checkpointStartExists = True + else: + sys.exit("ERROR: CHECKPOINT_START with checkpoint Id %s arrived" \ + " before receiving CHECKPOINT_END with checkpoint Id $s" + % (checkpoint_id[0], self.vbmap[vbucketId][0])) + if checkpointStartExists == False: + self.vbmap[vbucketId] = [checkpoint_id[0], 0] + t = datetime.datetime.now().strftime('%Y%m%d%H%M%S') + s = "INSERT into cpoint_state" \ + "(vbucket_id, cpoint_id, prev_cpoint_id, state, source, updated_at)" \ + " VALUES (?, ?, ?, \"open\", ?, ?)" + self.q.put(['execute', s, (vbucketId, checkpoint_id[0], -1, self.source, t)]) + self.q.put(['commit']) + + elif cmd == memcacheConstants.CMD_TAP_CHECKPOINT_END: + self.q.put(['commit']) + self.update_count = 0 + checkpoint_id = struct.unpack(">Q", val) + log(1, "Completed checkpoint %d" % checkpoint_id) + if not vbucketId in self.vbmap: + sys.exit("ERROR: unmatched checkpoint end: %s vb: %s" + % (checkpoint_id[0], vbucketId)) + + current_checkpoint_id, seq = self.vbmap[vbucketId] + if current_checkpoint_id != checkpoint_id[0]: + sys.exit("ERROR: unmatched checkpoint end id: %s vb: %s cp: %s" + % (checkpoint_id[0], vbucketId, current_checkpoint_id)) + + if len(ext) > 0: + eng_length, flags, ttl, flg, exp, needAck = parseTapExt(ext) + + del self.vbmap[vbucketId] + + t = datetime.datetime.now().strftime('%Y%m%d%H%M%S') + s = "UPDATE cpoint_state" \ + " SET state=\"closed\", updated_at=?" \ + " WHERE vbucket_id=? AND cpoint_id=? AND state=\"open\" AND source=?" + + def do_end(db, c): + r = c.execute(s, (t, vbucketId, current_checkpoint_id, self.source)) + db.commit() + if r.rowcount != 1: + sys.exit("ERROR: unexpected rowcount during update: " + + ",".join([t, vbucketId, checkpointId, self.source])) + self.q.put(['lambda', do_end]) + + elif cmd == memcacheConstants.CMD_TAP_OPAQUE: + if len(ext) > 0: + eng_length, flags, ttl, flg, exp, needAck = parseTapExt(ext) + opaque_opcode = struct.unpack(">I" , val[0:eng_length]) + if opaque_opcode[0] == memcacheConstants.TAP_OPAQUE_OPEN_CHECKPOINT: + if self.update_count > 0: + self.q.put(['commit']) + log(1, "Incremental backup is currently at the open checkpoint. Exit...") + self.finished = True + + elif cmd == memcacheConstants.CMD_TAP_CONNECT: + if self.update_count > 0: + self.q.put(['commit']) + sys.exit("ERROR: TAP_CONNECT error: " + str(key)) + + elif cmd == memcacheConstants.CMD_NOOP: + pass + + else: + sys.exit("ERROR: unhandled cmd " + str(cmd)) + + if self.update_count == self.txn_size: + self.q.put(['commit']) + self.update_count = 0 + + if needAck: + return 0, 0, 0 - q = Queue.Queue() - wt = threading.Thread(target=db_worker, args=(q, db_file)) - wt.start() + +def loop(mc, db_file, timeout, txn_size, source): try: sinput = [mc.s] - update_count = 0 + th = TapHandler(db_file, txn_size, source) - while True: + while not th.finished: if timeout > 0: iready, oready, eready = select.select(sinput, [], [], timeout) if (not iready) and (not oready) and (not eready): @@ -223,138 +361,18 @@ def loop(mc, db_file, timeout, txn_size, source): sys.exit(0) cmd, opaque, cas, vbucketId, key, ext, val = readTap(mc) - log(2, "got " + str(cmd) + " k:" + key + " vlen:" + str(len(val)) - + " elen:" + str(len(ext)) - + " vbid:" + str(vbucketId)) - if len(val) > 0 and len(val) < 64: - log(2, " val: <<" + str(val) + ">>") - - needAck = False - - if (cmd == memcacheConstants.CMD_TAP_MUTATION or - cmd == memcacheConstants.CMD_TAP_DELETE or - cmd == memcacheConstants.CMD_TAP_FLUSH): - cmdName, cmdOp = cmdInfo[cmd] - if not vbucketId in vbmap: - log(2, "%s with unknown vbucketId: %s" % (cmdName, vbucketId)) - sys.exit("ERROR: received %s without checkpoint in vbucket: %s\n" \ - "Perhaps the server is an older version?" - % (cmdName, vbucketId)) - - c_s = vbmap[vbucketId] - checkpointId = c_s[0] - seq = c_s[1] = c_s[1] + 1 - - eng_length, flags, ttl, flg, exp, needAck = parseTapExt(ext) - - if (cmd == memcacheConstants.CMD_TAP_MUTATION): - s = "INSERT into cpoint_op" \ - "(vbucket_id, cpoint_id, seq, op, key, flg, exp, cas, val)" \ - " VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?)" - q.put(['execute', s, (vbucketId, checkpointId, seq, cmdOp, - key, flg, exp, cas, sqlite3.Binary(val))]) - else: - s = "INSERT into cpoint_op" \ - "(vbucket_id, cpoint_id, seq, op, key, cas, val)" \ - " VALUES (?, ?, ?, ?, ?, ?, ?)" - q.put(['execute', s, (vbucketId, checkpointId, seq, cmdOp, - key, cas, sqlite3.Binary(val))]) - - update_count = update_count + 1 - needAck = flags & memcacheConstants.TAP_FLAG_ACK - - elif cmd == memcacheConstants.CMD_TAP_CHECKPOINT_START: - if len(ext) > 0: - eng_length, flags, ttl, flg, exp, needAck = parseTapExt(ext) - checkpoint_id = struct.unpack(">Q", val) - log(1, "Started checkpoint %d" % checkpoint_id) - checkpointStartExists = False - if vbucketId in vbmap: - if vbmap[vbucketId][0] == checkpoint_id[0]: - checkpointStartExists = True - else: - sys.exit("ERROR: CHECKPOINT_START with checkpoint Id %s arrived" \ - " before receiving CHECKPOINT_END with checkpoint Id $s" - % (checkpoint_id[0], vbmap[vbucketId][0])) - if checkpointStartExists == False: - vbmap[vbucketId] = [checkpoint_id[0], 0] - t = datetime.datetime.now().strftime('%Y%m%d%H%M%S') - s = "INSERT into cpoint_state" \ - "(vbucket_id, cpoint_id, prev_cpoint_id, state, source, updated_at)" \ - " VALUES (?, ?, ?, \"open\", ?, ?)" - q.put(['execute', s, (vbucketId, checkpoint_id[0], -1, source, t)]) - q.put(['commit']) - - elif cmd == memcacheConstants.CMD_TAP_CHECKPOINT_END: - q.put(['commit']) - update_count = 0 - checkpoint_id = struct.unpack(">Q", val) - log(1, "Completed checkpoint %d" % checkpoint_id) - if not vbucketId in vbmap: - sys.exit("ERROR: unmatched checkpoint end: %s vb: %s" - % (checkpoint_id[0], vbucketId)) - - current_checkpoint_id, seq = vbmap[vbucketId] - if current_checkpoint_id != checkpoint_id[0]: - sys.exit("ERROR: unmatched checkpoint end id: %s vb: %s cp: %s" - % (checkpoint_id[0], vbucketId, current_checkpoint_id)) - - if len(ext) > 0: - eng_length, flags, ttl, flg, exp, needAck = parseTapExt(ext) - - del vbmap[vbucketId] - - t = datetime.datetime.now().strftime('%Y%m%d%H%M%S') - s = "UPDATE cpoint_state" \ - " SET state=\"closed\", updated_at=?" \ - " WHERE vbucket_id=? AND cpoint_id=? AND state=\"open\" AND source=?" - - def do_end(db, c): - r = c.execute(s, (t, vbucketId, current_checkpoint_id, source)) - db.commit() - if r.rowcount != 1: - sys.exit("ERROR: unexpected rowcount during update: " - + ",".join([t, vbucketId, checkpointId, source])) - q.put(['lambda', do_end]) - - elif cmd == memcacheConstants.CMD_TAP_OPAQUE: - if len(ext) > 0: - eng_length, flags, ttl, flg, exp, needAck = parseTapExt(ext) - opaque_opcode = struct.unpack(">I" , val[0:eng_length]) - if opaque_opcode[0] == memcacheConstants.TAP_OPAQUE_OPEN_CHECKPOINT: - if update_count > 0: - q.put(['commit']) - log(1, "Incremental backup is currently at the open checkpoint. Exit...") - break - - elif cmd == memcacheConstants.CMD_TAP_CONNECT: - if update_count > 0: - q.put(['commit']) - sys.exit("ERROR: TAP_CONNECT error: " + str(key)) - - elif cmd == memcacheConstants.CMD_NOOP: - pass - - else: - sys.exit("ERROR: unhandled cmd " + str(cmd)) - - if update_count == txn_size: - q.put(['commit']) - update_count = 0 - - if needAck: + rv = th(None, cmd, ext, key, vbucketId, val, cas) + if rv: mc._sendMsg(cmd, '', '', opaque, vbucketId=0, fmt=memcacheConstants.RES_PKT_FMT, magic=memcacheConstants.RES_MAGIC_BYTE) + th.shutdown() + except exceptions.EOFError: pass - q.put(['quit']) - q.join() - wt.join() - def readTap(mc): ext = '' key = '' -- 1.7.4.4