From 320e77d413c2ed51dce3e152e9621d2d0cfe5622 Mon Sep 17 00:00:00 2001 From: Dustin Sallings Date: Thu, 22 Dec 2011 00:38:30 -0800 Subject: [PATCH 3/3] Use the python tap lib for doing tap in mbbackup Change-Id: Ife44576c67315380e7334164c873c2ae8271a729 --- management/mbbackup-incremental | 113 +++++++++++---------------------------- management/memcacheConstants.py | 2 + 2 files changed, 34 insertions(+), 81 deletions(-) diff --git a/management/mbbackup-incremental b/management/mbbackup-incremental index 0810f22..69fea15 100755 --- a/management/mbbackup-incremental +++ b/management/mbbackup-incremental @@ -4,6 +4,7 @@ import os import sys import glob import getopt +import asyncore import exceptions import datetime import select @@ -14,6 +15,7 @@ import traceback import threading import Queue +import tap import mc_bin_client import memcacheConstants import util @@ -121,42 +123,38 @@ def main(): else: log(1, "registered_client_name = " + name) - mc = None - db = None - try: if only_check: check_incremental_backup_file(name) sys.exit(0) - mc = mc_bin_client.MemcachedClient(host_port[0], int(host_port[1])) - - ext, val = encodeTAPConnectOpts({ - memcacheConstants.TAP_FLAG_CHECKPOINT: '', - memcacheConstants.TAP_FLAG_SUPPORT_ACK: '', - memcacheConstants.TAP_FLAG_REGISTERED_CLIENT: 0x01, # "value > 0" means "closed checkpoints only" - memcacheConstants.TAP_FLAG_BACKFILL: 0xffffffff - }) - - mc._sendCmd(memcacheConstants.CMD_TAP_CONNECT, name, val, 0, ext) - - if only_name: - cmd, opaque, cas, vbucketId, key, ext, val = readTap(mc) - if cmd == memcacheConstants.CMD_TAP_OPAQUE: - sys.exit(0); - sys.exit("ERROR: could not register name: " + name) - file = util.expand_file_pattern(file) log(1, "file actual = " + file) if os.path.exists(file): sys.exit("ERROR: file exists already: " + file) - db = sqlite3.connect(file) # TODO: Revisit isolation level - db.text_factory = str - createSchema(db) - db.close() + hostspec = "%s@%s:%s" % (name, host_port[0], host_port[1]) + source = ':'.join(host_port) + '-' + name + + opts = { + memcacheConstants.TAP_FLAG_CHECKPOINT: '', + memcacheConstants.TAP_FLAG_SUPPORT_ACK: '', + # "value > 0" means "closed checkpoints only" + memcacheConstants.TAP_FLAG_REGISTERED_CLIENT: 1, + memcacheConstants.TAP_FLAG_BACKFILL: 0xffffffff + } + + if only_name: + def one_shot(identifier, cmd, extra, key, vb, val, cas): + if cmd == memcacheConstants.CMD_TAP_OPAQUE: + sys.exit(0) + sys.exit("Error registering " + name) + th = one_shot + else: + th = TapHandler(file, txn_size, source) - loop(mc, file, timeout, txn_size, ':'.join(host_port) + '-' + name) + import tap_example + tap_example.mainLoop([hostspec], th, opts) except NameError as ne: sys.exit("ERROR: " + str(ne)) @@ -165,11 +163,6 @@ def main(): traceback.print_exc(file=sys.stdout) print e sys.exit("ERROR: " + str(e)) - finally: - if mc: - mc.close() - if db: - db.close() def check_incremental_backup_file(file_name): db = None @@ -214,6 +207,12 @@ class TapHandler(object): self.vbmap = {} self.txn_size = txn_size self.source = source + + db = sqlite3.connect(db_file) + db.text_factory = str + createSchema(db) + db.close() + self.q = Queue.Queue() self.wt = threading.Thread(target=db_worker, args=(self.q, db_file)) self.wt.start() @@ -327,6 +326,8 @@ class TapHandler(object): self.q.put(['commit']) log(1, "Incremental backup is currently at the open checkpoint. Exit...") self.finished = True + self.q.put(['quit']) + sys.exit() elif cmd == memcacheConstants.CMD_TAP_CONNECT: if self.update_count > 0: @@ -344,57 +345,7 @@ class TapHandler(object): self.update_count = 0 if needAck: - return 0, 0, 0 - - -def loop(mc, db_file, timeout, txn_size, source): - - try: - sinput = [mc.s] - th = TapHandler(db_file, txn_size, source) - - while not th.finished: - if timeout > 0: - iready, oready, eready = select.select(sinput, [], [], timeout) - if (not iready) and (not oready) and (not eready): - log(1, "EXIT: timeout after " + str(timeout) + " seconds of inactivity") - sys.exit(0) - - cmd, opaque, cas, vbucketId, key, ext, val = readTap(mc) - rv = th(None, cmd, ext, key, vbucketId, val, cas) - if rv: - mc._sendMsg(cmd, '', '', opaque, - vbucketId=0, - fmt=memcacheConstants.RES_PKT_FMT, - magic=memcacheConstants.RES_MAGIC_BYTE) - - th.shutdown() - - except exceptions.EOFError: - pass - -def readTap(mc): - ext = '' - key = '' - val = '' - cmd, vbucketId, opaque, cas, keylen, extlen, data = mc._recvMsg() - if data: - ext = data[0:extlen] - key = data[extlen:extlen+keylen] - val = data[extlen+keylen:] - return cmd, opaque, cas, vbucketId, key, ext, val - -def encodeTAPConnectOpts(opts): - header = 0 - val = [] - for op in sorted(opts.keys()): - header |= op - if op in memcacheConstants.TAP_FLAG_TYPES: - val.append(struct.pack(memcacheConstants.TAP_FLAG_TYPES[op], - opts[op])) - else: - val.append(opts[op]) - return struct.pack(">I", header), ''.join(val) + return 0, 0, '' def parseTapExt(ext): if len(ext) == 8: diff --git a/management/memcacheConstants.py b/management/memcacheConstants.py index 7b58af2..59dfa1b 100644 --- a/management/memcacheConstants.py +++ b/management/memcacheConstants.py @@ -182,6 +182,8 @@ EXTRA_HDR_FMTS={ CMD_TAP_OPAQUE: TAP_GENERAL_PKT_FMT, CMD_TAP_VBUCKET_SET: TAP_GENERAL_PKT_FMT, CMD_SET_VBUCKET_STATE: VB_SET_PKT_FMT, + CMD_TAP_CHECKPOINT_START: '>IIII', + CMD_TAP_CHECKPOINT_END: '>IIII' } EXTRA_HDR_SIZES=dict( -- 1.7.4.4