Report 56454b

Bug Summary

File: /home/avsej/code/libcouchbase/src/mc/mcreq.c
Warning: line 365, column 28
Potential leak of memory pointed to by 'edst'

In mcreq_renew_packet(), 'edst' is allocated with calloc() at line 313; if the source packet carries a compressed value and mcreq_inflate_value() fails, the function returns NULL at line 365 without freeing it. The key copy 'kdata' (line 318) is reachable only through 'edst', so it appears to leak on the same path. A hedged fix sketch follows the flagged function in the listing below.

Annotated Source Code

1/* -*- Mode: C; tab-width: 4; c-basic-offset: 4; indent-tabs-mode: nil -*- */
2/*
3 * Copyright 2014 Couchbase, Inc.
4 *
5 * Licensed under the Apache License, Version 2.0 (the "License");
6 * you may not use this file except in compliance with the License.
7 * You may obtain a copy of the License at
8 *
9 * http://www.apache.org/licenses/LICENSE-2.0
10 *
11 * Unless required by applicable law or agreed to in writing, software
12 * distributed under the License is distributed on an "AS IS" BASIS,
13 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 * See the License for the specific language governing permissions and
15 * limitations under the License.
16 */
17
18#include "mcreq.h"
19#include "compress.h"
20#include "sllist-inl.h"
21#include "internal.h"
22
23#define PKT_HDRSIZE(pkt) (MCREQ_PKT_BASESIZE + (pkt)->extlen)
24
25lcb_error_t
26mcreq_reserve_header(
27 mc_PIPELINE *pipeline, mc_PACKET *packet, uint8_t hdrsize)
28{
29 int rv;
30    packet->extlen = hdrsize - MCREQ_PKT_BASESIZE;
31 packet->kh_span.size = hdrsize;
32 rv = netbuf_mblock_reserve(&pipeline->nbmgr, &packet->kh_span);
33 if (rv != 0) {
34 return LCB_CLIENT_ENOMEM;
35 }
36 return LCB_SUCCESS;
37}
38
39lcb_error_t
40mcreq_reserve_key(
41 mc_PIPELINE *pipeline, mc_PACKET *packet, uint8_t hdrsize,
42 const lcb_KEYBUF *kreq)
43{
44 const struct lcb_CONTIGBUF *contig = &kreq->contig;
45 int rv;
46
47 /** Set the key offset which is the start of the key from the buffer */
48    packet->extlen = hdrsize - MCREQ_PKT_BASESIZE;
49 packet->kh_span.size = kreq->contig.nbytes;
50
51 if (kreq->type == LCB_KV_COPY) {
52 /**
53 * If the key is to be copied then just allocate the span size
54 * for the key+24+extras
55 */
56 packet->kh_span.size += hdrsize;
57 rv = netbuf_mblock_reserve(&pipeline->nbmgr, &packet->kh_span);
58 if (rv != 0) {
59 return LCB_CLIENT_ENOMEM;
60 }
61
62 /**
63 * Copy the key into the packet starting at the extras end
64 */
65        memcpy(SPAN_BUFFER(&packet->kh_span) + hdrsize,
66 contig->bytes,
67 contig->nbytes);
68
69 } else if (kreq->type == LCB_KV_CONTIG) {
70 /**
71 * Don't do any copying.
72 * Assume the key buffer has enough space for the packet as well.
73 */
74        CREATE_STANDALONE_SPAN(&packet->kh_span, contig->bytes, contig->nbytes);
75 packet->flags |= MCREQ_F_KEY_NOCOPY;
76
77 } else {
78 /** IOVs not supported for keys */
79 return LCB_EINVAL;
80 }
81
82 return LCB_SUCCESS;
83}
84
85lcb_error_t
86mcreq_reserve_value2(mc_PIPELINE *pl, mc_PACKET *pkt, lcb_size_t n)
87{
88 int rv;
89 pkt->u_value.single.size = n;
90 if (!n) {
91 return LCB_SUCCESS;
92 }
93
94 pkt->flags |= MCREQ_F_HASVALUE;
95 rv = netbuf_mblock_reserve(&pl->nbmgr, &pkt->u_value.single);
96 if (rv) {
97 return LCB_CLIENT_ENOMEM;
98 }
99 return LCB_SUCCESS;
100}
101
102lcb_error_t
103mcreq_reserve_value(
104 mc_PIPELINE *pipeline, mc_PACKET *packet, const lcb_VALBUF *vreq)
105{
106 const lcb_CONTIGBUF *contig = &vreq->u_buf.contig;
107 nb_SPAN *vspan = &packet->u_value.single;
108 int rv;
109
110 if (vreq->vtype == LCB_KV_COPY) {
111 /** Copy the value into a single SPAN */
112 if (! (vspan->size = vreq->u_buf.contig.nbytes)) {
113 return LCB_SUCCESS;
114 }
115 rv = netbuf_mblock_reserve(&pipeline->nbmgr, vspan);
116
117 if (rv != 0) {
118 return LCB_CLIENT_ENOMEM;
119 }
120
121        memcpy(SPAN_BUFFER(vspan), contig->bytes, contig->nbytes);
122
123 } else if (vreq->vtype == LCB_KV_CONTIG) {
124 /** It's still contiguous so make it a 'standalone' span */
125        CREATE_STANDALONE_SPAN(vspan, contig->bytes, contig->nbytes);
126 packet->flags |= MCREQ_F_VALUE_NOCOPY;
127
128 } else if (vreq->vtype == LCB_KV_IOV) {
129 /** Multiple spans, no copy */
130 unsigned int ii;
131 const lcb_FRAGBUF *msrc = &vreq->u_buf.multi;
132 lcb_FRAGBUF *mdst = &packet->u_value.multi;
133
134 packet->flags |= MCREQ_F_VALUE_IOV | MCREQ_F_VALUE_NOCOPY;
135 mdst->niov = msrc->niov;
136 mdst->iov = malloc(mdst->niov * sizeof(*mdst->iov));
137 mdst->total_length = 0;
138
139 for (ii = 0; ii < mdst->niov; ii++) {
140 mdst->iov[ii] = msrc->iov[ii];
141 mdst->total_length += mdst->iov[ii].iov_len;
142 }
143 } else if (vreq->vtype == LCB_KV_IOVCOPY) {
144 /** Multiple input buffers, normal copying output buffer */
145 unsigned int ii, cur_offset;
146 const lcb_FRAGBUF *msrc = &vreq->u_buf.multi;
147
148 if (msrc->total_length) {
149 vspan->size = msrc->total_length;
150 } else {
151 vspan->size = 0;
152 for (ii = 0; ii < msrc->niov; ii++) {
153 vspan->size += msrc->iov[ii].iov_len;
154 }
155 }
156
157 rv = netbuf_mblock_reserve(&pipeline->nbmgr, vspan);
158 if (rv != 0) {
159 return LCB_CLIENT_ENOMEM;
160 }
161
162 for (ii = 0, cur_offset = 0; ii < msrc->niov; ii++) {
163            char *buf = SPAN_BUFFER(vspan) + cur_offset;
164 memcpy(buf, msrc->iov[ii].iov_base, msrc->iov[ii].iov_len);
165 cur_offset += msrc->iov[ii].iov_len;
166 }
167 }
168
169 packet->flags |= MCREQ_F_HASVALUE;
170 return LCB_SUCCESS;
171}
172
173static int
174pkt_tmo_compar(sllist_node *a, sllist_node *b)
175{
176 mc_PACKET *pa, *pb;
177 hrtime_t tmo_a, tmo_b;
178
179    pa = SLLIST_ITEM(a, mc_PACKET, slnode);
180    pb = SLLIST_ITEM(b, mc_PACKET, slnode);
181
182    tmo_a = MCREQ_PKT_RDATA(pa)->start;
183    tmo_b = MCREQ_PKT_RDATA(pb)->start;
184
185 if (tmo_a == tmo_b) {
186 return 0;
187 } else if (tmo_a < tmo_b) {
188 return -1;
189 } else {
190 return 1;
191 }
192}
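
Editor's note: the SLLIST_* and MCREQ_PKT_RDATA helpers above are macros whose expansions scan-build printed inline. Reconstructed from that expansion text (the definitions in sllist-inl.h and mcreq.h are authoritative), they are the usual intrusive-list container_of idiom plus a flags-gated union accessor:

    /* Recover the enclosing struct from a pointer to its embedded
     * sllist_node member. */
    #define SLLIST_ITEM(ptr, type, member) \
        ((type *)(void *)((char *)(ptr) - offsetof(type, member)))

    /* Request data lives either inline in the packet or, when
     * MCREQ_F_REQEXT is set, in an externally allocated mc_REQDATAEX. */
    #define MCREQ_PKT_RDATA(pkt) \
        (((pkt)->flags & MCREQ_F_REQEXT) \
            ? ((mc_REQDATA *)(pkt)->u_rdata.exdata) \
            : (&(pkt)->u_rdata.reqdata))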
193
194void
195mcreq_reenqueue_packet(mc_PIPELINE *pipeline, mc_PACKET *packet)
196{
197 sllist_root *reqs = &pipeline->requests;
198 mcreq_enqueue_packet(pipeline, packet);
199 sllist_remove(reqs, &packet->slnode);
200 sllist_insert_sorted(reqs, &packet->slnode, pkt_tmo_compar);
201}
202
203void
204mcreq_enqueue_packet(mc_PIPELINE *pipeline, mc_PACKET *packet)
205{
206 nb_SPAN *vspan = &packet->u_value.single;
207 sllist_append(&pipeline->requests, &packet->slnode);
208 netbuf_enqueue_span(&pipeline->nbmgr, &packet->kh_span);
209
210 if (!(packet->flags & MCREQ_F_HASVALUE)) {
211 goto GT_ENQUEUE_PDU;
212 }
213
214 if (packet->flags & MCREQ_F_VALUE_IOV) {
215 unsigned int ii;
216 lcb_FRAGBUF *multi = &packet->u_value.multi;
217 for (ii = 0; ii < multi->niov; ii++) {
218 netbuf_enqueue(&pipeline->nbmgr, (nb_IOV *)multi->iov + ii);
219 }
220
221 } else if (vspan->size) {
222 netbuf_enqueue_span(&pipeline->nbmgr, vspan);
223 }
224
225 GT_ENQUEUE_PDU:
226    netbuf_pdu_enqueue(&pipeline->nbmgr, packet, offsetof(mc_PACKET, sl_flushq));
227}
228
229void
230mcreq_wipe_packet(mc_PIPELINE *pipeline, mc_PACKET *packet)
231{
232 if (! (packet->flags & MCREQ_F_KEY_NOCOPY)) {
233 if (packet->flags & MCREQ_F_DETACHED) {
234            free(SPAN_BUFFER(&packet->kh_span));
235 } else {
236 netbuf_mblock_release(&pipeline->nbmgr, &packet->kh_span);
237 }
238 }
239
240 if (! (packet->flags & MCREQ_F_HASVALUE)) {
241 return;
242 }
243
244 if (packet->flags & MCREQ_F_VALUE_NOCOPY) {
245 if (packet->flags & MCREQ_F_VALUE_IOV) {
246 free(packet->u_value.multi.iov);
247 }
248
249 return;
250 }
251
252 if (packet->flags & MCREQ_F_DETACHED) {
253        free(SPAN_BUFFER(&packet->u_value.single));
254 } else {
255 netbuf_mblock_release(&pipeline->nbmgr, &packet->u_value.single);
256 }
257
258}
259
260mc_PACKET *
261mcreq_allocate_packet(mc_PIPELINE *pipeline)
262{
263 nb_SPAN span;
264 int rv;
265 mc_PACKET *ret;
266 span.size = sizeof(*ret);
267
268 rv = netbuf_mblock_reserve(&pipeline->reqpool, &span);
269 if (rv != 0) {
270        return NULL;
271 }
272
273    ret = (void *) SPAN_MBUFFER_NC(&span);
274 ret->alloc_parent = span.parent;
275 ret->flags = 0;
276 ret->retries = 0;
277 ret->opaque = pipeline->parent->seq++;
278 return ret;
279}
280
281void
282mcreq_release_packet(mc_PIPELINE *pipeline, mc_PACKET *packet)
283{
284 nb_SPAN span;
285 if (packet->flags & MCREQ_F_DETACHED) {
286 sllist_iterator iter;
287 mc_EXPACKET *epkt = (mc_EXPACKET *)packet;
288
289        SLLIST_ITERFOR(&epkt->data, &iter) {
290            mc_EPKTDATUM *d = SLLIST_ITEM(iter.cur, mc_EPKTDATUM, slnode);
291 sllist_iter_remove(&epkt->data, &iter);
292 d->dtorfn(d);
293 }
294 free(epkt);
295 return;
296 }
297
298 span.size = sizeof(*packet);
299 span.parent = packet->alloc_parent;
300 span.offset = (char *)packet - packet->alloc_parent->root;
301
302 netbuf_mblock_release(&pipeline->reqpool, &span);
303}
304
305#define MCREQ_DETACH_WIPESRC 1
306
307mc_PACKET *
308mcreq_renew_packet(const mc_PACKET *src)
309{
310 char *kdata, *vdata;
311 unsigned nvdata;
312 mc_PACKET *dst;
313 mc_EXPACKET *edst = calloc(1, sizeof(*edst));
    (1) Memory is allocated
314
315 dst = &edst->base;
316 *dst = *src;
317
318 kdata = malloc(src->kh_span.size);
319    memcpy(kdata, SPAN_BUFFER(&src->kh_span), src->kh_span.size);
320    CREATE_STANDALONE_SPAN(&dst->kh_span, kdata, src->kh_span.size);
321
322 dst->flags &= ~(MCREQ_F_KEY_NOCOPY|MCREQ_F_VALUE_NOCOPY|MCREQ_F_VALUE_IOV);
323 dst->flags |= MCREQ_F_DETACHED;
324    dst->alloc_parent = NULL;
325    dst->sl_flushq.next = NULL;
326    dst->slnode.next = NULL;
327 dst->retries = src->retries;
328
329 if (src->flags & MCREQ_F_HASVALUE) {
    (2) Assuming the condition is true
    (3) Taking true branch
330 /** Get the length */
331 if (src->flags & MCREQ_F_VALUE_IOV) {
    (4) Assuming the condition is false
    (5) Taking false branch
332 unsigned ii;
333 unsigned offset = 0;
334
335 nvdata = src->u_value.multi.total_length;
336 vdata = malloc(nvdata);
337 for (ii = 0; ii < src->u_value.multi.niov; ii++) {
338 const lcb_IOV *iov = src->u_value.multi.iov + ii;
339
340 memcpy(vdata + offset, iov->iov_base, iov->iov_len);
341 offset += iov->iov_len;
342 }
343 } else {
344 protocol_binary_request_header hdr;
345 const nb_SPAN *origspan = &src->u_value.single;
346            mcreq_read_hdr(dst, &hdr);
347
348 if (hdr.request.datatype & PROTOCOL_BINARY_DATATYPE_COMPRESSED) {
    (6) Taking true branch
349 /* For compressed payloads we need to uncompress it first
350 * because it may be forwarded to a server without compression.
351 * TODO: might be more clever to check a setting flag somewhere
352 * and see if we should do this. */
353
354 lcb_SIZE n_inflated;
355 const void *inflated;
356 int rv;
357
358                vdata = NULL;
359                rv = mcreq_inflate_value(SPAN_BUFFER(origspan), origspan->size,
360                                         &inflated, &n_inflated, (void**)&vdata);
361
362                assert(vdata == inflated);
363
364 if (rv != 0) {
    (7) Assuming 'rv' is not equal to 0
    (8) Taking true branch
365                return NULL;
    (9) Potential leak of memory pointed to by 'edst'
366 }
367 nvdata = n_inflated;
368 hdr.request.datatype &= ~PROTOCOL_BINARY_DATATYPE_COMPRESSED;
369 hdr.request.bodylen = htonl(
370 ntohs(hdr.request.keylen) +
371 hdr.request.extlen +
372 n_inflated);
373                mcreq_write_hdr(dst, &hdr);
374
375 } else {
376 nvdata = origspan->size;
377 vdata = malloc(nvdata);
378                memcpy(vdata, SPAN_BUFFER(origspan), nvdata);
379 }
380 }
381
382 /* Declare the value as a standalone malloc'd span */
383        CREATE_STANDALONE_SPAN(&dst->u_value.single, vdata, nvdata);
384 }
385
386 if (src->flags & MCREQ_F_DETACHED) {
387 mc_EXPACKET *esrc = (mc_EXPACKET *)src;
388 sllist_iterator iter;
389        SLLIST_ITERFOR(&esrc->data, &iter) {
390 sllist_node *cur = iter.cur;
391 sllist_iter_remove(&esrc->data, &iter);
392 sllist_append(&edst->data, cur);
393 }
394 }
395 return dst;
396}
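
Editor's note: the leak reported at line 365 happens because 'edst' (calloc'd at line 313) owns every buffer built so far, yet the inflate-failure path returns without releasing anything. A minimal sketch of one possible fix, assuming nothing else holds a reference to 'edst' or 'kdata' at that point (an illustration, not the project's actual patch):

    if (rv != 0) {
        /* Release what this function has allocated so far: 'kdata' is
         * reachable only through dst->kh_span, which lives inside
         * 'edst', so both would otherwise leak. */
        free(kdata);
        free(edst);
        return NULL;
    }

Any early exit added after the calloc at line 313 needs the same treatment, or the cleanup can be centralized under a single error label.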
397
398int
399mcreq_epkt_insert(mc_EXPACKET *ep, mc_EPKTDATUM *datum)
400{
401 if (!(ep->base.flags & MCREQ_F_DETACHED)) {
402 return -1;
403 }
404    assert(!sllist_contains(&ep->data, &datum->slnode));
405 sllist_append(&ep->data, &datum->slnode);
406 return 0;
407}
408
409mc_EPKTDATUM *
410mcreq_epkt_find(mc_EXPACKET *ep, const char *key)
411{
412 sllist_iterator iter;
413    SLLIST_ITERFOR(&ep->data, &iter) {
414        mc_EPKTDATUM *d = SLLIST_ITEM(iter.cur, mc_EPKTDATUM, slnode);
415 if (!strcmp(key, d->key)) {
416 return d;
417 }
418 }
419    return NULL;
420}
421
422void
423mcreq_map_key(mc_CMDQUEUE *queue,
424 const lcb_KEYBUF *key, const lcb_KEYBUF *hashkey,
425 unsigned nhdr, int *vbid, int *srvix)
426{
427 const void *hk;
428 size_t nhk = 0;
429 if (hashkey) {
430        if (hashkey->type == LCB_KV_COPY && hashkey->contig.bytes != NULL) {
431 hk = hashkey->contig.bytes;
432 nhk = hashkey->contig.nbytes;
433 } else if (hashkey->type == LCB_KV_VBID) {
434 *vbid = hashkey->contig.nbytes;
435 *srvix = lcbvb_vbmaster(queue->config, *vbid);
436 return;
437 }
438 }
439 if (!nhk) {
440 if (key->type == LCB_KV_COPY) {
441 hk = key->contig.bytes;
442 nhk = key->contig.nbytes;
443 } else {
444 const char *buf = key->contig.bytes;
445 buf += nhdr;
446 hk = buf;
447 nhk = key->contig.nbytes - nhdr;
448 }
449 }
450 lcbvb_map_key(queue->config, hk, nhk, vbid, srvix);
451}
452
453lcb_error_t
454mcreq_basic_packet(
455 mc_CMDQUEUE *queue, const lcb_CMDBASE *cmd,
456 protocol_binary_request_header *req, lcb_uint8_t extlen,
457 mc_PACKET **packet, mc_PIPELINE **pipeline, int options)
458{
459 int vb, srvix;
460
461 if (!queue->config) {
462        return LCB_CLIENT_ETMPFAIL;
463 }
464 if (!cmd) {
465 return LCB_EINVAL;
466 }
467
468 mcreq_map_key(queue, &cmd->key, &cmd->_hashkey,
469 sizeof(*req) + extlen, &vb, &srvix);
470 if (srvix > -1 && srvix < (int)queue->npipelines) {
471 *pipeline = queue->pipelines[srvix];
472
473 } else {
474        if ((options & MCREQ_BASICPACKET_F_FALLBACKOK) && queue->fallback) {
475 *pipeline = queue->fallback;
476 } else {
477 return LCB_NO_MATCHING_SERVER;
478 }
479 }
480
481 *packet = mcreq_allocate_packet(*pipeline);
482    if (*packet == NULL) {
483 return LCB_CLIENT_ENOMEM;
484 }
485
486 mcreq_reserve_key(*pipeline, *packet, sizeof(*req) + extlen, &cmd->key);
487
488    req->request.keylen = htons((*packet)->kh_span.size - PKT_HDRSIZE(*packet));
489 req->request.vbucket = htons(vb);
490 req->request.extlen = extlen;
491 return LCB_SUCCESS;
492}
493
494void
495mcreq_get_key(const mc_PACKET *packet, const void **key, lcb_size_t *nkey)
496{
497    *key = SPAN_BUFFER(&packet->kh_span) + PKT_HDRSIZE(packet);
498    *nkey = packet->kh_span.size - PKT_HDRSIZE(packet);
499}
500
501lcb_uint32_t
502mcreq_get_bodysize(const mc_PACKET *packet)
503{
504 lcb_uint32_t ret;
505    char *retptr = SPAN_BUFFER(&packet->kh_span) + 8;
506 if ((uintptr_t)retptr % sizeof(ret) == 0) {
507 return ntohl(*(lcb_uint32_t*) (void *)retptr);
508 } else {
509 memcpy(&ret, retptr, sizeof(ret));
510 return ntohl(ret);
511 }
512}
513
514uint16_t
515mcreq_get_vbucket(const mc_PACKET *packet)
516{
517 uint16_t ret;
518    char *retptr = SPAN_BUFFER(&packet->kh_span) + 6;
519 if ((uintptr_t)retptr % sizeof(ret) == 0) {
520 return ntohs(*(uint16_t*)(void*)retptr);
521 } else {
522 memcpy(&ret, retptr, sizeof ret);
523 return ntohs(ret);
524 }
525}
526
527uint32_t
528mcreq_get_size(const mc_PACKET *packet)
529{
530 uint32_t sz = packet->kh_span.size;
531 if (packet->flags & MCREQ_F_HASVALUE) {
532 if (packet->flags & MCREQ_F_VALUE_IOV) {
533 sz += packet->u_value.multi.total_length;
534 } else {
535 sz += packet->u_value.single.size;
536 }
537 }
538 return sz;
539}
540
541void
542mcreq_pipeline_cleanup(mc_PIPELINE *pipeline)
543{
544 netbuf_cleanup(&pipeline->nbmgr);
545 netbuf_cleanup(&pipeline->reqpool);
546}
547
548int
549mcreq_pipeline_init(mc_PIPELINE *pipeline)
550{
551 nb_SETTINGS settings;
552
553 /* Initialize all members to 0 */
554 memset(&pipeline->requests, 0, sizeof pipeline->requests);
555    pipeline->parent = NULL;
556    pipeline->flush_start = NULL;
557 pipeline->index = 0;
558 memset(&pipeline->ctxqueued, 0, sizeof pipeline->ctxqueued);
559    pipeline->buf_done_callback = NULL;
560
561 netbuf_default_settings(&settings);
562
563 /** Initialize datapool */
564 netbuf_init(&pipeline->nbmgr, &settings);
565
566 /** Initialize request pool */
567 settings.data_basealloc = sizeof(mc_PACKET) * 32;
568    netbuf_init(&pipeline->reqpool, &settings);
569 return 0;
570}
571
572void
573mcreq_queue_add_pipelines(
574 mc_CMDQUEUE *queue, mc_PIPELINE * const *pipelines, unsigned npipelines,
575 lcbvb_CONFIG* config)
576{
577 unsigned ii;
578
579    lcb_assert(queue->pipelines == NULL);
580 queue->npipelines = npipelines;
581 queue->_npipelines_ex = queue->npipelines;
582 queue->pipelines = malloc(sizeof(*pipelines) * (npipelines + 1));
583 queue->config = config;
584
585 memcpy(queue->pipelines, pipelines, sizeof(*pipelines) * npipelines);
586
587 free(queue->scheds);
588 queue->scheds = calloc(npipelines+1, 1);
589
590 for (ii = 0; ii < npipelines; ii++) {
591 pipelines[ii]->parent = queue;
592 pipelines[ii]->index = ii;
593 }
594
595 if (queue->fallback) {
596 queue->fallback->index = npipelines;
597 queue->pipelines[queue->npipelines] = queue->fallback;
598 queue->_npipelines_ex++;
599 }
600}
601
602mc_PIPELINE **
603mcreq_queue_take_pipelines(mc_CMDQUEUE *queue, unsigned *count)
604{
605 mc_PIPELINE **ret = queue->pipelines;
606 *count = queue->npipelines;
607    queue->pipelines = NULL;
608 queue->npipelines = 0;
609 return ret;
610}
611
612int
613mcreq_queue_init(mc_CMDQUEUE *queue)
614{
615 queue->seq = 0;
616    queue->pipelines = NULL;
617    queue->scheds = NULL;
618    queue->fallback = NULL;
619 queue->npipelines = 0;
620 return 0;
621}
622
623void
624mcreq_queue_cleanup(mc_CMDQUEUE *queue)
625{
626 if (queue->fallback) {
627 mcreq_pipeline_cleanup(queue->fallback);
628 free(queue->fallback);
629 queue->fallback = NULL((void*)0);
630 }
631 free(queue->scheds);
632 free(queue->pipelines);
633 queue->scheds = NULL((void*)0);
634}
635
636void
637mcreq_sched_enter(mc_CMDQUEUE *queue)
638{
639 queue->ctxenter = 1;
640}
641
642
643
644static void
645queuectx_leave(mc_CMDQUEUE *queue, int success, int flush)
646{
647 unsigned ii;
648
649 if (queue->ctxenter) {
650 queue->ctxenter = 0;
651 }
652
653 for (ii = 0; ii < queue->_npipelines_ex; ii++) {
654 mc_PIPELINE *pipeline;
655 sllist_node *ll_next, *ll;
656
657 if (!queue->scheds[ii]) {
658 continue;
659 }
660
661 pipeline = queue->pipelines[ii];
662        ll = SLLIST_FIRST(&pipeline->ctxqueued);
663
664 while (ll) {
665            mc_PACKET *pkt = SLLIST_ITEM(ll, mc_PACKET, slnode);
666 ll_next = ll->next;
667
668 if (success) {
669 mcreq_enqueue_packet(pipeline, pkt);
670 } else {
671 if (pkt->flags & MCREQ_F_REQEXT) {
672 mc_REQDATAEX *rd = pkt->u_rdata.exdata;
673 if (rd->procs->fail_dtor) {
674 rd->procs->fail_dtor(pkt);
675 }
676 }
677 mcreq_wipe_packet(pipeline, pkt);
678 mcreq_release_packet(pipeline, pkt);
679 }
680
681 ll = ll_next;
682 }
683        SLLIST_FIRST(&pipeline->ctxqueued) = pipeline->ctxqueued.last = NULL;
684 if (flush) {
685 pipeline->flush_start(pipeline);
686 }
687 queue->scheds[ii] = 0;
688 }
689}
690
691void
692mcreq_sched_leave(mc_CMDQUEUE *queue, int do_flush)
693{
694 queuectx_leave(queue, 1, do_flush);
695}
696
697void
698mcreq_sched_fail(mc_CMDQUEUE *queue)
699{
700 queuectx_leave(queue, 0, 0);
701}
702
703void
704mcreq_sched_add(mc_PIPELINE *pipeline, mc_PACKET *pkt)
705{
706 mc_CMDQUEUE *cq = pipeline->parent;
707 if (!cq->scheds[pipeline->index]) {
708 cq->scheds[pipeline->index] = 1;
709 }
710 sllist_append(&pipeline->ctxqueued, &pkt->slnode);
711}
712
713static mc_PACKET *
714pipeline_find(mc_PIPELINE *pipeline, lcb_uint32_t opaque, int do_remove)
715{
716 sllist_iterator iter;
717    SLLIST_ITERFOR(&pipeline->requests, &iter) {
718        mc_PACKET *pkt = SLLIST_ITEM(iter.cur, mc_PACKET, slnode);
719 if (pkt->opaque == opaque) {
720 if (do_remove) {
721 sllist_iter_remove(&pipeline->requests, &iter);
722 }
723 return pkt;
724 }
725 }
726    return NULL;
727}
728
729mc_PACKET *
730mcreq_pipeline_find(mc_PIPELINE *pipeline, lcb_uint32_t opaque)
731{
732 return pipeline_find(pipeline, opaque, 0);
733}
734
735mc_PACKET *
736mcreq_pipeline_remove(mc_PIPELINE *pipeline, lcb_uint32_t opaque)
737{
738 return pipeline_find(pipeline, opaque, 1);
739}
740
741void
742mcreq_packet_done(mc_PIPELINE *pipeline, mc_PACKET *pkt)
743{
744    assert(pkt->flags & MCREQ_F_FLUSHED);
745    assert(pkt->flags & MCREQ_F_INVOKED);
746    if (pkt->flags & MCREQ_UBUF_FLAGS) {
747 void *kbuf, *vbuf;
748 const void *cookie;
749
750        cookie = MCREQ_PKT_COOKIE(pkt);
751 if (pkt->flags & MCREQ_F_KEY_NOCOPY) {
752            kbuf = SPAN_BUFFER(&pkt->kh_span);
753 } else {
754            kbuf = NULL;
755 }
756 if (pkt->flags & MCREQ_F_VALUE_NOCOPY) {
757 if (pkt->flags & MCREQ_F_VALUE_IOV) {
758 vbuf = pkt->u_value.multi.iov->iov_base;
759 } else {
760                vbuf = SPAN_SABUFFER_NC(&pkt->u_value.single);
761 }
762 } else {
763            vbuf = NULL;
764 }
765
766 pipeline->buf_done_callback(pipeline, cookie, kbuf, vbuf);
767 }
768 mcreq_wipe_packet(pipeline, pkt);
769 mcreq_release_packet(pipeline, pkt);
770}
771
772void
773mcreq_reset_timeouts(mc_PIPELINE *pl, lcb_U64 nstime)
774{
775 sllist_node *nn;
776    SLLIST_ITERBASIC(&pl->requests, nn) {
777        mc_PACKET *pkt = SLLIST_ITEM(nn, mc_PACKET, slnode);
778        MCREQ_PKT_RDATA(pkt)->start = nstime;
779 }
780}
781
782unsigned
783mcreq_pipeline_timeout(
784 mc_PIPELINE *pl, lcb_error_t err, mcreq_pktfail_fn failcb, void *cbarg,
785 hrtime_t oldest_valid, hrtime_t *oldest_start)
786{
787 sllist_iterator iter;
788 unsigned count = 0;
789
790    SLLIST_ITERFOR(&pl->requests, &iter) {
791        mc_PACKET *pkt = SLLIST_ITEM(iter.cur, mc_PACKET, slnode);
792        mc_REQDATA *rd = MCREQ_PKT_RDATA(pkt);
793
794 /**
795 * oldest_valid contains the LOWEST timestamp we can admit to being
796 * acceptable. If the current command is newer (i.e. has a higher
797 * timestamp) then we break the iteration and return.
798 */
799 if (oldest_valid && rd->start > oldest_valid) {
800 if (oldest_start) {
801 *oldest_start = rd->start;
802 }
803 return count;
804 }
805
806 sllist_iter_remove(&pl->requests, &iter);
807 failcb(pl, pkt, err, cbarg);
808        mcreq_packet_handled(pl, pkt);
809 count++;
810 }
811 return count;
812}
813
814unsigned
815mcreq_pipeline_fail(
816 mc_PIPELINE *pl, lcb_error_t err, mcreq_pktfail_fn failcb, void *arg)
817{
818    return mcreq_pipeline_timeout(pl, err, failcb, arg, 0, NULL);
819}
820
821void
822mcreq_iterwipe(mc_CMDQUEUE *queue, mc_PIPELINE *src,
823 mcreq_iterwipe_fn callback, void *arg)
824{
825 sllist_iterator iter;
826
827    SLLIST_ITERFOR(&src->requests, &iter) {
828 int rv;
829        mc_PACKET *orig = SLLIST_ITEM(iter.cur, mc_PACKET, slnode);
830 rv = callback(queue, src, orig, arg);
831        if (rv == MCREQ_REMOVE_PACKET) {
832 sllist_iter_remove(&src->requests, &iter);
833 }
834 }
835}
836
837#include "mcreq-flush-inl.h"
838typedef struct {
839 mc_PIPELINE base;
840 mcreq_fallback_cb handler;
841} mc_FALLBACKPL;
842
843static void
844do_fallback_flush(mc_PIPELINE *pipeline)
845{
846 nb_IOV iov;
847 unsigned nb;
848 int nused;
849 sllist_iterator iter;
850 mc_FALLBACKPL *fpl = (mc_FALLBACKPL*)pipeline;
851
852 while ((nb = mcreq_flush_iov_fill(pipeline, &iov, 1, &nused))) {
853 mcreq_flush_done(pipeline, nb, nb);
854 }
855 /* Now handle all the packets, for real */
856    SLLIST_ITERFOR(&pipeline->requests, &iter) {
857        mc_PACKET *pkt = SLLIST_ITEM(iter.cur, mc_PACKET, slnode);
858 fpl->handler(pipeline->parent, pkt);
859 sllist_iter_remove(&pipeline->requests, &iter);
860        mcreq_packet_handled(pipeline, pkt);
861 }
862}
863
864void
865mcreq_set_fallback_handler(mc_CMDQUEUE *cq, mcreq_fallback_cb handler)
866{
867    assert(!cq->fallback);
868 cq->fallback = calloc(1, sizeof (mc_FALLBACKPL));
869 mcreq_pipeline_init(cq->fallback);
870 cq->fallback->parent = cq;
871 cq->fallback->index = cq->npipelines;
872 ((mc_FALLBACKPL*)cq->fallback)->handler = handler;
873 cq->fallback->flush_start = do_fallback_flush;
874}
875
876static void
877noop_dumpfn(const void *d, unsigned n, FILE *fp) { (void)d;(void)n;(void)fp; }
878
879#define MCREQ_XFLAGS(X) \
880 X(KEY_NOCOPY) \
881 X(VALUE_NOCOPY) \
882 X(VALUE_IOV) \
883 X(HASVALUE) \
884 X(REQEXT) \
885 X(UFWD) \
886 X(FLUSHED) \
887 X(INVOKED) \
888 X(DETACHED)
889
890void
891mcreq_dump_packet(const mc_PACKET *packet, FILE *fp, mcreq_payload_dump_fn dumpfn)
892{
893 const char *indent = " ";
894    const mc_REQDATA *rdata = MCREQ_PKT_RDATA(packet);
895
896 if (!dumpfn) {
897 dumpfn = noop_dumpfn;
898 }
899 if (!fp) {
900        fp = stderr;
901 }
902
903 fprintf(fp, "Packet @%p\n", (void *)packet);
904 fprintf(fp, "%sOPAQUE: %u\n", indent, (unsigned int)packet->opaque);
905
906 fprintf(fp, "%sPKTFLAGS: 0x%x ", indent, packet->flags);
907 #define X(base) \
908 if (packet->flags & MCREQ_F_##base) { fprintf(fp, "%s, ", #base); }
909    MCREQ_XFLAGS(X)
911 fprintf(fp, "\n");
912
913 fprintf(fp, "%sKey+Header Size: %u\n", indent, (unsigned int)packet->kh_span.size);
914 fprintf(fp, "%sKey Offset: %u\n", indent, MCREQ_PKT_BASESIZE24 + packet->extlen);
915
916
917 if (packet->flags & MCREQ_F_HASVALUE) {
918 if (packet->flags & MCREQ_F_VALUE_IOV) {
919 fprintf(fp, "%sValue Length: %u\n", indent,
920 packet->u_value.multi.total_length);
921
922 fprintf(fp, "%sValue IOV: [start=%p, n=%d]\n", indent,
923 (void *)packet->u_value.multi.iov, packet->u_value.multi.niov);
924 } else {
925 if (packet->flags & MCREQ_F_VALUE_NOCOPY) {
926 fprintf(fp, "%sValue is user allocated\n", indent);
927 }
928 fprintf(fp, "%sValue: %p, %u bytes\n", indent,
929                    (void *)SPAN_BUFFER(&packet->u_value.single), packet->u_value.single.size);
930 }
931 }
932
933 fprintf(fp, "%sRDATA(%s): %p\n", indent,
934 (packet->flags & MCREQ_F_REQEXT) ? "ALLOC" : "EMBEDDED", (void *)rdata);
935
936 indent = " ";
937 fprintf(fp, "%sStart: %lu\n", indent, (unsigned long)rdata->start);
938 fprintf(fp, "%sCookie: %p\n", indent, rdata->cookie);
939
940 indent = " ";
941 fprintf(fp, "%sNEXT: %p\n", indent, (void *)packet->slnode.next);
942 if (dumpfn != noop_dumpfn) {
943 fprintf(fp, "PACKET CONTENTS:\n");
944 }
945
946    fwrite(SPAN_BUFFER(&packet->kh_span), 1, packet->kh_span.size, fp);
947 if (packet->flags & MCREQ_F_HASVALUE) {
948 if (packet->flags & MCREQ_F_VALUE_IOV) {
949 const lcb_IOV *iovs = packet->u_value.multi.iov;
950 unsigned ii, ixmax = packet->u_value.multi.niov;
951 for (ii = 0; ii < ixmax; ii++) {
952 dumpfn(iovs[ii].iov_base, iovs[ii].iov_len, fp);
953 }
954 } else {
955 const nb_SPAN *vspan = &packet->u_value.single;
956            dumpfn(SPAN_BUFFER(vspan), vspan->size, fp);
957 }
958 }
959}
960
961void
962mcreq_dump_chain(const mc_PIPELINE *pipeline, FILE *fp, mcreq_payload_dump_fn dumpfn)
963{
964 sllist_node *ll;
965    SLLIST_FOREACH(&pipeline->requests, ll) {
966        const mc_PACKET *pkt = SLLIST_ITEM(ll, mc_PACKET, slnode);
967 mcreq_dump_packet(pkt, fp, dumpfn);
968 }
969}