static unsigned purge_single_server(lcb_server_t *server, lcb_error_t error, hrtime_t min_nonstale, hrtime_t *tmo_next) { protocol_binary_request_header req; struct lcb_command_data_st ct; lcb_size_t nr; char *packet; lcb_size_t packetsize; char *keyptr; int npurged = 0; ringbuffer_t *stream = &server->cmd_log; ringbuffer_t *cookies; ringbuffer_t *mirror = NULL; /* mirror buffer should be purged with main stream */ lcb_connection_t conn = &server->connection; lcb_size_t send_size = 0; lcb_size_t stream_size = ringbuffer_get_nbytes(stream); hrtime_t now = gethrtime(); if (server->connection_ready) { cookies = &server->output_cookies; } else { cookies = &server->pending_cookies; mirror = &server->pending; } lcb_uint32_t send_seq = 0; lcb_uint32_t stream_seq = 0; if (conn->output && conn->output->nbytes >= sizeof(protocol_binary_request_header)) { protocol_binary_request_header send_req; ringbuffer_peek(conn->output, &send_req, sizeof(send_req)); send_seq = send_req.request.opaque; } if (conn->output) { /* This will usually be false for v1 */ send_size = ringbuffer_get_nbytes(conn->output); } do { int allocated = 0; lcb_uint32_t headersize; lcb_uint16_t nkey; nr = ringbuffer_peek(cookies, &ct, sizeof(ct)); if (nr != sizeof(ct)) { break; } nr = ringbuffer_peek(stream, req.bytes, sizeof(req)); if (nr != sizeof(req)) { break; } packetsize = (lcb_uint32_t)sizeof(req) + ntohl(req.request.bodylen); stream_seq = req.request.opaque; if (stream->nbytes < packetsize) { break; } if (min_nonstale && ct.start >= min_nonstale) { if (npurged) { lcb_log(LOGARGS(server, INFO), "Still have %d ms remaining for command", (ct.start - min_nonstale) / 1000000); } if (tmo_next) { *tmo_next = (ct.start - min_nonstale) + 1; } break; } lcb_log(LOGARGS(server, INFO), "Command with cookie=%p failed with err=0x%x server %s:%s", ct.cookie, error, server->curhost.host, server->curhost.port); npurged++; ringbuffer_consumed(cookies, sizeof(ct)); lcb_assert(nr == sizeof(req)); packet = stream->read_head; if (server->instance->histogram) { lcb_record_metrics(server->instance, now - ct.start, req.request.opcode); } // if the request packet being failed is still at the head of the // output buffer, consume it. if (send_seq == stream_seq && send_size >= packetsize) { ringbuffer_consumed(conn->output, packetsize); send_size -= packetsize; if (conn->output && conn->output->nbytes >= sizeof(protocol_binary_request_header)) { protocol_binary_request_header send_req; ringbuffer_peek(conn->output, &send_req, sizeof(send_req)); send_seq = send_req.request.opaque; } } stream_size -= packetsize; headersize = (lcb_uint32_t)sizeof(req) + req.request.extlen + htons(req.request.keylen); if (!ringbuffer_is_continous(stream, RINGBUFFER_READ, headersize)) { packet = malloc(headersize); if (packet == NULL) { lcb_error_handler(server->instance, LCB_CLIENT_ENOMEM, NULL); abort(); } nr = ringbuffer_peek(stream, packet, headersize); if (nr != headersize) { lcb_error_handler(server->instance, LCB_EINTERNAL, NULL); free(packet); abort(); } allocated = 1; } keyptr = packet + sizeof(req) + req.request.extlen; nkey = ntohs(req.request.keylen); failout_single_request(server, &req, &ct, error, keyptr, nkey, packet); if (allocated) { free(packet); } ringbuffer_consumed(stream, packetsize); if (mirror) { ringbuffer_consumed(mirror, packetsize); } } while (1); /* CONSTCOND */ lcb_maybe_breakout(server->instance); return npurged; }