Summary > Report e1de1f

Bug Summary

File:home/avsej/code/libcouchbase/src/mc/mcreq.c
Warning:line 48, column 20
Access to field 'extlen' results in a dereference of a null pointer (loaded from variable 'packet')
Report Bug

Annotated Source Code

1/* -*- Mode: C; tab-width: 4; c-basic-offset: 4; indent-tabs-mode: nil -*- */
2/*
3 * Copyright 2014 Couchbase, Inc.
4 *
5 * Licensed under the Apache License, Version 2.0 (the "License");
6 * you may not use this file except in compliance with the License.
7 * You may obtain a copy of the License at
8 *
9 * http://www.apache.org/licenses/LICENSE-2.0
10 *
11 * Unless required by applicable law or agreed to in writing, software
12 * distributed under the License is distributed on an "AS IS" BASIS,
13 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 * See the License for the specific language governing permissions and
15 * limitations under the License.
16 */
17
18#include "mcreq.h"
19#include "compress.h"
20#include "sllist-inl.h"
21#include "internal.h"
22
/** Size of the fixed packet header plus the per-packet extras length. */
#define PKT_HDRSIZE(pkt) (MCREQ_PKT_BASESIZE + (pkt)->extlen)
24
25lcb_error_t
26mcreq_reserve_header(
27 mc_PIPELINE *pipeline, mc_PACKET *packet, uint8_t hdrsize)
28{
29 int rv;
30 packet->extlen = hdrsize - MCREQ_PKT_BASESIZE24;
31 packet->kh_span.size = hdrsize;
32 rv = netbuf_mblock_reserve(&pipeline->nbmgr, &packet->kh_span);
33 if (rv != 0) {
34 return LCB_CLIENT_ENOMEM;
35 }
36 return LCB_SUCCESS;
37}
38
39lcb_error_t
40mcreq_reserve_key(
41 mc_PIPELINE *pipeline, mc_PACKET *packet, uint8_t hdrsize,
42 const lcb_KEYBUF *kreq)
43{
44 const struct lcb_CONTIGBUF *contig = &kreq->contig;
45 int rv;
46
47 /** Set the key offset which is the start of the key from the buffer */
48 packet->extlen = hdrsize - MCREQ_PKT_BASESIZE24;
9
Access to field 'extlen' results in a dereference of a null pointer (loaded from variable 'packet')
49 packet->kh_span.size = kreq->contig.nbytes;
50
51 if (kreq->type == LCB_KV_COPY) {
52 /**
53 * If the key is to be copied then just allocate the span size
54 * for the key+24+extras
55 */
56 packet->kh_span.size += hdrsize;
57 rv = netbuf_mblock_reserve(&pipeline->nbmgr, &packet->kh_span);
58 if (rv != 0) {
59 return LCB_CLIENT_ENOMEM;
60 }
61
62 /**
63 * Copy the key into the packet starting at the extras end
64 */
65 memcpy(SPAN_BUFFER(&packet->kh_span)(((&packet->kh_span)->offset == (nb_SIZE)-1) ? ((char
*)(&packet->kh_span)->parent) : ((&packet->
kh_span)->parent->root + (&packet->kh_span)->
offset))
+ hdrsize,
66 contig->bytes,
67 contig->nbytes);
68
69 } else if (kreq->type == LCB_KV_CONTIG) {
70 /**
71 * Don't do any copying.
72 * Assume the key buffer has enough space for the packet as well.
73 */
74 CREATE_STANDALONE_SPAN(&packet->kh_span, contig->bytes, contig->nbytes)(&packet->kh_span)->parent = (nb_MBLOCK *) (void *)
contig->bytes; (&packet->kh_span)->offset = (nb_SIZE
)-1; (&packet->kh_span)->size = contig->nbytes;
;
75 packet->flags |= MCREQ_F_KEY_NOCOPY;
76
77 } else {
78 /** IOVs not supported for keys */
79 return LCB_EINVAL;
80 }
81
82 return LCB_SUCCESS;
83}
84
85lcb_error_t
86mcreq_reserve_value2(mc_PIPELINE *pl, mc_PACKET *pkt, lcb_size_t n)
87{
88 int rv;
89 pkt->u_value.single.size = n;
90 if (!n) {
91 return LCB_SUCCESS;
92 }
93
94 pkt->flags |= MCREQ_F_HASVALUE;
95 rv = netbuf_mblock_reserve(&pl->nbmgr, &pkt->u_value.single);
96 if (rv) {
97 return LCB_CLIENT_ENOMEM;
98 }
99 return LCB_SUCCESS;
100}
101
102lcb_error_t
103mcreq_reserve_value(
104 mc_PIPELINE *pipeline, mc_PACKET *packet, const lcb_VALBUF *vreq)
105{
106 const lcb_CONTIGBUF *contig = &vreq->u_buf.contig;
107 nb_SPAN *vspan = &packet->u_value.single;
108 int rv;
109
110 if (vreq->vtype == LCB_KV_COPY) {
111 /** Copy the value into a single SPAN */
112 if (! (vspan->size = vreq->u_buf.contig.nbytes)) {
113 return LCB_SUCCESS;
114 }
115 rv = netbuf_mblock_reserve(&pipeline->nbmgr, vspan);
116
117 if (rv != 0) {
118 return LCB_CLIENT_ENOMEM;
119 }
120
121 memcpy(SPAN_BUFFER(vspan)(((vspan)->offset == (nb_SIZE)-1) ? ((char *)(vspan)->parent
) : ((vspan)->parent->root + (vspan)->offset))
, contig->bytes, contig->nbytes);
122
123 } else if (vreq->vtype == LCB_KV_CONTIG) {
124 /** It's still contiguous so make it a 'standalone' span */
125 CREATE_STANDALONE_SPAN(vspan, contig->bytes, contig->nbytes)(vspan)->parent = (nb_MBLOCK *) (void *)contig->bytes; (
vspan)->offset = (nb_SIZE)-1; (vspan)->size = contig->
nbytes;
;
126 packet->flags |= MCREQ_F_VALUE_NOCOPY;
127
128 } else if (vreq->vtype == LCB_KV_IOV) {
129 /** Multiple spans, no copy */
130 unsigned int ii;
131 const lcb_FRAGBUF *msrc = &vreq->u_buf.multi;
132 lcb_FRAGBUF *mdst = &packet->u_value.multi;
133
134 packet->flags |= MCREQ_F_VALUE_IOV | MCREQ_F_VALUE_NOCOPY;
135 mdst->niov = msrc->niov;
136 mdst->iov = malloc(mdst->niov * sizeof(*mdst->iov));
137 mdst->total_length = 0;
138
139 for (ii = 0; ii < mdst->niov; ii++) {
140 mdst->iov[ii] = msrc->iov[ii];
141 mdst->total_length += mdst->iov[ii].iov_len;
142 }
143 } else if (vreq->vtype == LCB_KV_IOVCOPY) {
144 /** Multiple input buffers, normal copying output buffer */
145 unsigned int ii, cur_offset;
146 const lcb_FRAGBUF *msrc = &vreq->u_buf.multi;
147
148 if (msrc->total_length) {
149 vspan->size = msrc->total_length;
150 } else {
151 vspan->size = 0;
152 for (ii = 0; ii < msrc->niov; ii++) {
153 vspan->size += msrc->iov[ii].iov_len;
154 }
155 }
156
157 rv = netbuf_mblock_reserve(&pipeline->nbmgr, vspan);
158 if (rv != 0) {
159 return LCB_CLIENT_ENOMEM;
160 }
161
162 for (ii = 0, cur_offset = 0; ii < msrc->niov; ii++) {
163 char *buf = SPAN_BUFFER(vspan)(((vspan)->offset == (nb_SIZE)-1) ? ((char *)(vspan)->parent
) : ((vspan)->parent->root + (vspan)->offset))
+ cur_offset;
164 memcpy(buf, msrc->iov[ii].iov_base, msrc->iov[ii].iov_len);
165 cur_offset += msrc->iov[ii].iov_len;
166 }
167 }
168
169 packet->flags |= MCREQ_F_HASVALUE;
170 return LCB_SUCCESS;
171}
172
173static int
174pkt_tmo_compar(sllist_node *a, sllist_node *b)
175{
176 mc_PACKET *pa, *pb;
177 hrtime_t tmo_a, tmo_b;
178
179 pa = SLLIST_ITEM(a, mc_PACKET, slnode)((mc_PACKET *) (void *) ((char *)(a) - __builtin_offsetof(mc_PACKET
, slnode)))
;
180 pb = SLLIST_ITEM(b, mc_PACKET, slnode)((mc_PACKET *) (void *) ((char *)(b) - __builtin_offsetof(mc_PACKET
, slnode)))
;
181
182 tmo_a = MCREQ_PKT_RDATA(pa)(((pa)->flags & MCREQ_F_REQEXT) ? ((mc_REQDATA *)(pa)->
u_rdata.exdata) : (&(pa)->u_rdata.reqdata))
->start;
183 tmo_b = MCREQ_PKT_RDATA(pb)(((pb)->flags & MCREQ_F_REQEXT) ? ((mc_REQDATA *)(pb)->
u_rdata.exdata) : (&(pb)->u_rdata.reqdata))
->start;
184
185 if (tmo_a == tmo_b) {
186 return 0;
187 } else if (tmo_a < tmo_b) {
188 return -1;
189 } else {
190 return 1;
191 }
192}
193
194void
195mcreq_reenqueue_packet(mc_PIPELINE *pipeline, mc_PACKET *packet)
196{
197 sllist_root *reqs = &pipeline->requests;
198 mcreq_enqueue_packet(pipeline, packet);
199 sllist_remove(reqs, &packet->slnode);
200 sllist_insert_sorted(reqs, &packet->slnode, pkt_tmo_compar);
201}
202
203void
204mcreq_enqueue_packet(mc_PIPELINE *pipeline, mc_PACKET *packet)
205{
206 nb_SPAN *vspan = &packet->u_value.single;
207 sllist_append(&pipeline->requests, &packet->slnode);
208 netbuf_enqueue_span(&pipeline->nbmgr, &packet->kh_span);
209
210 if (!(packet->flags & MCREQ_F_HASVALUE)) {
211 goto GT_ENQUEUE_PDU;
212 }
213
214 if (packet->flags & MCREQ_F_VALUE_IOV) {
215 unsigned int ii;
216 lcb_FRAGBUF *multi = &packet->u_value.multi;
217 for (ii = 0; ii < multi->niov; ii++) {
218 netbuf_enqueue(&pipeline->nbmgr, (nb_IOV *)multi->iov + ii);
219 }
220
221 } else if (vspan->size) {
222 netbuf_enqueue_span(&pipeline->nbmgr, vspan);
223 }
224
225 GT_ENQUEUE_PDU:
226 netbuf_pdu_enqueue(&pipeline->nbmgr, packet, offsetof(mc_PACKET, sl_flushq)__builtin_offsetof(mc_PACKET, sl_flushq));
227}
228
229void
230mcreq_wipe_packet(mc_PIPELINE *pipeline, mc_PACKET *packet)
231{
232 if (! (packet->flags & MCREQ_F_KEY_NOCOPY)) {
233 if (packet->flags & MCREQ_F_DETACHED) {
234 free(SPAN_BUFFER(&packet->kh_span)(((&packet->kh_span)->offset == (nb_SIZE)-1) ? ((char
*)(&packet->kh_span)->parent) : ((&packet->
kh_span)->parent->root + (&packet->kh_span)->
offset))
);
235 } else {
236 netbuf_mblock_release(&pipeline->nbmgr, &packet->kh_span);
237 }
238 }
239
240 if (! (packet->flags & MCREQ_F_HASVALUE)) {
241 return;
242 }
243
244 if (packet->flags & MCREQ_F_VALUE_NOCOPY) {
245 if (packet->flags & MCREQ_F_VALUE_IOV) {
246 free(packet->u_value.multi.iov);
247 }
248
249 return;
250 }
251
252 if (packet->flags & MCREQ_F_DETACHED) {
253 free(SPAN_BUFFER(&packet->u_value.single)(((&packet->u_value.single)->offset == (nb_SIZE)-1)
? ((char *)(&packet->u_value.single)->parent) : ((
&packet->u_value.single)->parent->root + (&packet
->u_value.single)->offset))
);
254 } else {
255 netbuf_mblock_release(&pipeline->nbmgr, &packet->u_value.single);
256 }
257
258}
259
260mc_PACKET *
261mcreq_allocate_packet(mc_PIPELINE *pipeline)
262{
263 nb_SPAN span;
264 int rv;
265 mc_PACKET *ret;
266 span.size = sizeof(*ret);
267
268 rv = netbuf_mblock_reserve(&pipeline->reqpool, &span);
269 if (rv != 0) {
270 return NULL((void*)0);
271 }
272
273 ret = (void *) SPAN_MBUFFER_NC(&span)((&span)->parent->root + (&span)->offset);
274 ret->alloc_parent = span.parent;
275 ret->flags = 0;
276 ret->retries = 0;
277 ret->opaque = pipeline->parent->seq++;
278 return ret;
279}
280
281void
282mcreq_release_packet(mc_PIPELINE *pipeline, mc_PACKET *packet)
283{
284 nb_SPAN span;
285 if (packet->flags & MCREQ_F_DETACHED) {
286 sllist_iterator iter;
287 mc_EXPACKET *epkt = (mc_EXPACKET *)packet;
288
289 SLLIST_ITERFOR(&epkt->data, &iter)for (slist_iter_init(&epkt->data, &iter); !((&
iter)->cur == ((void*)0)); slist_iter_incr(&epkt->data
, &iter))
{
290 mc_EPKTDATUM *d = SLLIST_ITEM(iter.cur, mc_EPKTDATUM, slnode)((mc_EPKTDATUM *) (void *) ((char *)(iter.cur) - __builtin_offsetof
(mc_EPKTDATUM, slnode)))
;
291 sllist_iter_remove(&epkt->data, &iter);
292 d->dtorfn(d);
293 }
294 free(epkt);
295 return;
296 }
297
298 span.size = sizeof(*packet);
299 span.parent = packet->alloc_parent;
300 span.offset = (char *)packet - packet->alloc_parent->root;
301
302 netbuf_mblock_release(&pipeline->reqpool, &span);
303}
304
/* Not referenced anywhere in the visible part of this file. */
#define MCREQ_DETACH_WIPESRC 1
306
307mc_PACKET *
308mcreq_renew_packet(const mc_PACKET *src)
309{
310 char *kdata, *vdata;
311 unsigned nvdata;
312 mc_PACKET *dst;
313 mc_EXPACKET *edst = calloc(1, sizeof(*edst));
314
315 dst = &edst->base;
316 *dst = *src;
317
318 kdata = malloc(src->kh_span.size);
319 memcpy(kdata, SPAN_BUFFER(&src->kh_span)(((&src->kh_span)->offset == (nb_SIZE)-1) ? ((char *
)(&src->kh_span)->parent) : ((&src->kh_span)
->parent->root + (&src->kh_span)->offset))
, src->kh_span.size);
320 CREATE_STANDALONE_SPAN(&dst->kh_span, kdata, src->kh_span.size)(&dst->kh_span)->parent = (nb_MBLOCK *) (void *)kdata
; (&dst->kh_span)->offset = (nb_SIZE)-1; (&dst->
kh_span)->size = src->kh_span.size;
;
321
322 dst->flags &= ~(MCREQ_F_KEY_NOCOPY|MCREQ_F_VALUE_NOCOPY|MCREQ_F_VALUE_IOV);
323 dst->flags |= MCREQ_F_DETACHED;
324 dst->alloc_parent = NULL((void*)0);
325 dst->sl_flushq.next = NULL((void*)0);
326 dst->slnode.next = NULL((void*)0);
327 dst->retries = src->retries;
328
329 if (src->flags & MCREQ_F_HASVALUE) {
330 /** Get the length */
331 if (src->flags & MCREQ_F_VALUE_IOV) {
332 unsigned ii;
333 unsigned offset = 0;
334
335 nvdata = src->u_value.multi.total_length;
336 vdata = malloc(nvdata);
337 for (ii = 0; ii < src->u_value.multi.niov; ii++) {
338 const lcb_IOV *iov = src->u_value.multi.iov + ii;
339
340 memcpy(vdata + offset, iov->iov_base, iov->iov_len);
341 offset += iov->iov_len;
342 }
343 } else {
344 protocol_binary_request_header hdr;
345 const nb_SPAN *origspan = &src->u_value.single;
346 mcreq_read_hdr(dst, &hdr)memcpy( (&hdr)->bytes, (((&(dst)->kh_span)->
offset == (nb_SIZE)-1) ? ((char *)(&(dst)->kh_span)->
parent) : ((&(dst)->kh_span)->parent->root + (&
(dst)->kh_span)->offset)), sizeof((&hdr)->bytes)
)
;
347
348 if (hdr.request.datatype & PROTOCOL_BINARY_DATATYPE_COMPRESSED) {
349 /* For compressed payloads we need to uncompress it first
350 * because it may be forwarded to a server without compression.
351 * TODO: might be more clever to check a setting flag somewhere
352 * and see if we should do this. */
353
354 lcb_SIZE n_inflated;
355 const void *inflated;
356 int rv;
357
358 vdata = NULL((void*)0);
359 rv = mcreq_inflate_value(SPAN_BUFFER(origspan)(((origspan)->offset == (nb_SIZE)-1) ? ((char *)(origspan)
->parent) : ((origspan)->parent->root + (origspan)->
offset))
, origspan->size,
360 &inflated, &n_inflated, (void**)&vdata);
361
362 assert(vdata == inflated)((void) sizeof ((vdata == inflated) ? 1 : 0), __extension__ (
{ if (vdata == inflated) ; else __assert_fail ("vdata == inflated"
, "/home/avsej/code/libcouchbase/src/mc/mcreq.c", 362, __extension__
__PRETTY_FUNCTION__); }))
;
363
364 if (rv != 0) {
365 return NULL((void*)0);
366 }
367 nvdata = n_inflated;
368 hdr.request.datatype &= ~PROTOCOL_BINARY_DATATYPE_COMPRESSED;
369 hdr.request.bodylen = htonl(
370 ntohs(hdr.request.keylen) +
371 hdr.request.extlen +
372 n_inflated);
373 mcreq_write_hdr(dst, &hdr)memcpy( (((&(dst)->kh_span)->offset == (nb_SIZE)-1)
? ((char *)(&(dst)->kh_span)->parent) : ((&(dst
)->kh_span)->parent->root + (&(dst)->kh_span)
->offset)), (&hdr)->bytes, sizeof((&hdr)->bytes
) )
;
374
375 } else {
376 nvdata = origspan->size;
377 vdata = malloc(nvdata);
378 memcpy(vdata, SPAN_BUFFER(origspan)(((origspan)->offset == (nb_SIZE)-1) ? ((char *)(origspan)
->parent) : ((origspan)->parent->root + (origspan)->
offset))
, nvdata);
379 }
380 }
381
382 /* Declare the value as a standalone malloc'd span */
383 CREATE_STANDALONE_SPAN(&dst->u_value.single, vdata, nvdata)(&dst->u_value.single)->parent = (nb_MBLOCK *) (void
*)vdata; (&dst->u_value.single)->offset = (nb_SIZE
)-1; (&dst->u_value.single)->size = nvdata;
;
384 }
385
386 if (src->flags & MCREQ_F_DETACHED) {
387 mc_EXPACKET *esrc = (mc_EXPACKET *)src;
388 sllist_iterator iter;
389 SLLIST_ITERFOR(&esrc->data, &iter)for (slist_iter_init(&esrc->data, &iter); !((&
iter)->cur == ((void*)0)); slist_iter_incr(&esrc->data
, &iter))
{
390 sllist_node *cur = iter.cur;
391 sllist_iter_remove(&esrc->data, &iter);
392 sllist_append(&edst->data, cur);
393 }
394 }
395 return dst;
396}
397
398int
399mcreq_epkt_insert(mc_EXPACKET *ep, mc_EPKTDATUM *datum)
400{
401 if (!(ep->base.flags & MCREQ_F_DETACHED)) {
402 return -1;
403 }
404 assert(!sllist_contains(&ep->data, &datum->slnode))((void) sizeof ((!sllist_contains(&ep->data, &datum
->slnode)) ? 1 : 0), __extension__ ({ if (!sllist_contains
(&ep->data, &datum->slnode)) ; else __assert_fail
("!sllist_contains(&ep->data, &datum->slnode)"
, "/home/avsej/code/libcouchbase/src/mc/mcreq.c", 404, __extension__
__PRETTY_FUNCTION__); }))
;
405 sllist_append(&ep->data, &datum->slnode);
406 return 0;
407}
408
409mc_EPKTDATUM *
410mcreq_epkt_find(mc_EXPACKET *ep, const char *key)
411{
412 sllist_iterator iter;
413 SLLIST_ITERFOR(&ep->data, &iter)for (slist_iter_init(&ep->data, &iter); !((&iter
)->cur == ((void*)0)); slist_iter_incr(&ep->data, &
iter))
{
414 mc_EPKTDATUM *d = SLLIST_ITEM(iter.cur, mc_EPKTDATUM, slnode)((mc_EPKTDATUM *) (void *) ((char *)(iter.cur) - __builtin_offsetof
(mc_EPKTDATUM, slnode)))
;
415 if (!strcmp(key, d->key)) {
416 return d;
417 }
418 }
419 return NULL((void*)0);
420}
421
422void
423mcreq_map_key(mc_CMDQUEUE *queue,
424 const lcb_KEYBUF *key, const lcb_KEYBUF *hashkey,
425 unsigned nhdr, int *vbid, int *srvix)
426{
427 const void *hk;
428 size_t nhk = 0;
429 if (hashkey) {
430 if (hashkey->type == LCB_KV_COPY && hashkey->contig.bytes != NULL((void*)0)) {
431 hk = hashkey->contig.bytes;
432 nhk = hashkey->contig.nbytes;
433 } else if (hashkey->type == LCB_KV_VBID) {
434 *vbid = hashkey->contig.nbytes;
435 *srvix = lcbvb_vbmaster(queue->config, *vbid);
436 return;
437 }
438 }
439 if (!nhk) {
440 if (key->type == LCB_KV_COPY) {
441 hk = key->contig.bytes;
442 nhk = key->contig.nbytes;
443 } else {
444 const char *buf = key->contig.bytes;
445 buf += nhdr;
446 hk = buf;
447 nhk = key->contig.nbytes - nhdr;
448 }
449 }
450 lcbvb_map_key(queue->config, hk, nhk, vbid, srvix);
451}
452
453lcb_error_t
454mcreq_basic_packet(
455 mc_CMDQUEUE *queue, const lcb_CMDBASE *cmd,
456 protocol_binary_request_header *req, lcb_uint8_t extlen,
457 mc_PACKET **packet, mc_PIPELINE **pipeline, int options)
458{
459 int vb, srvix;
460
461 if (!queue->config) {
1
Assuming the condition is false
2
Taking false branch
462 return LCB_CLIENT_ETMPFAILLCB_CLIENT_ENOCONF;
463 }
464
465 mcreq_map_key(queue, &(cmd->key), &(cmd->_hashkey),
466 sizeof(*req) + extlen, &vb, &srvix);
467 if (srvix > -1 && srvix < (int)queue->npipelines) {
3
Assuming the condition is false
468 *pipeline = queue->pipelines[srvix];
469
470 } else {
471 if ((options & MCREQ_BASICPACKET_F_FALLBACKOK0x01) && queue->fallback) {
4
Assuming the condition is true
5
Assuming the condition is true
6
Taking true branch
472 *pipeline = queue->fallback;
473 } else {
474 return LCB_NO_MATCHING_SERVER;
475 }
476 }
477
478 *packet = mcreq_allocate_packet(*pipeline);
479
480 mcreq_reserve_key(*pipeline, *packet, sizeof(*req) + extlen, &cmd->key);
7
Passing null pointer value via 2nd parameter 'packet'
8
Calling 'mcreq_reserve_key'
481
482 req->request.keylen = htons((*packet)->kh_span.size - PKT_HDRSIZE(*packet)(24 + (*packet)->extlen));
483 req->request.vbucket = htons(vb);
484 req->request.extlen = extlen;
485 return LCB_SUCCESS;
486}
487
488void
489mcreq_get_key(const mc_PACKET *packet, const void **key, lcb_size_t *nkey)
490{
491 *key = SPAN_BUFFER(&packet->kh_span)(((&packet->kh_span)->offset == (nb_SIZE)-1) ? ((char
*)(&packet->kh_span)->parent) : ((&packet->
kh_span)->parent->root + (&packet->kh_span)->
offset))
+ PKT_HDRSIZE(packet)(24 + (packet)->extlen);
492 *nkey = packet->kh_span.size - PKT_HDRSIZE(packet)(24 + (packet)->extlen);
493}
494
495lcb_uint32_t
496mcreq_get_bodysize(const mc_PACKET *packet)
497{
498 lcb_uint32_t ret;
499 char *retptr = SPAN_BUFFER(&packet->kh_span)(((&packet->kh_span)->offset == (nb_SIZE)-1) ? ((char
*)(&packet->kh_span)->parent) : ((&packet->
kh_span)->parent->root + (&packet->kh_span)->
offset))
+ 8;
500 if ((uintptr_t)retptr % sizeof(ret) == 0) {
501 return ntohl(*(lcb_uint32_t*) (void *)retptr);
502 } else {
503 memcpy(&ret, retptr, sizeof(ret));
504 return ntohl(ret);
505 }
506}
507
508uint16_t
509mcreq_get_vbucket(const mc_PACKET *packet)
510{
511 uint16_t ret;
512 char *retptr = SPAN_BUFFER(&packet->kh_span)(((&packet->kh_span)->offset == (nb_SIZE)-1) ? ((char
*)(&packet->kh_span)->parent) : ((&packet->
kh_span)->parent->root + (&packet->kh_span)->
offset))
+ 6;
513 if ((uintptr_t)retptr % sizeof(ret) == 0) {
514 return ntohs(*(uint16_t*)(void*)retptr);
515 } else {
516 memcpy(&ret, retptr, sizeof ret);
517 return ntohs(ret);
518 }
519}
520
521uint32_t
522mcreq_get_size(const mc_PACKET *packet)
523{
524 uint32_t sz = packet->kh_span.size;
525 if (packet->flags & MCREQ_F_HASVALUE) {
526 if (packet->flags & MCREQ_F_VALUE_IOV) {
527 sz += packet->u_value.multi.total_length;
528 } else {
529 sz += packet->u_value.single.size;
530 }
531 }
532 return sz;
533}
534
535void
536mcreq_pipeline_cleanup(mc_PIPELINE *pipeline)
537{
538 netbuf_cleanup(&pipeline->nbmgr);
539 netbuf_cleanup(&pipeline->reqpool);
540}
541
542int
543mcreq_pipeline_init(mc_PIPELINE *pipeline)
544{
545 nb_SETTINGS settings;
546
547 /* Initialize all members to 0 */
548 memset(&pipeline->requests, 0, sizeof pipeline->requests);
549 pipeline->parent = NULL((void*)0);
550 pipeline->flush_start = NULL((void*)0);
551 pipeline->index = 0;
552 memset(&pipeline->ctxqueued, 0, sizeof pipeline->ctxqueued);
553 pipeline->buf_done_callback = NULL((void*)0);
554
555 netbuf_default_settings(&settings);
556
557 /** Initialize datapool */
558 netbuf_init(&pipeline->nbmgr, &settings);
559
560 /** Initialize request pool */
561 settings.data_basealloc = sizeof(mc_PACKET) * 32;
562 netbuf_init(&pipeline->reqpool, &settings);;
563 return 0;
564}
565
566void
567mcreq_queue_add_pipelines(
568 mc_CMDQUEUE *queue, mc_PIPELINE * const *pipelines, unsigned npipelines,
569 lcbvb_CONFIG* config)
570{
571 unsigned ii;
572
573 lcb_assert(queue->pipelines == NULL)((void) sizeof ((queue->pipelines == ((void*)0)) ? 1 : 0),
__extension__ ({ if (queue->pipelines == ((void*)0)) ; else
__assert_fail ("queue->pipelines == ((void*)0)", "/home/avsej/code/libcouchbase/src/mc/mcreq.c"
, 573, __extension__ __PRETTY_FUNCTION__); }))
;
574 queue->npipelines = npipelines;
575 queue->_npipelines_ex = queue->npipelines;
576 queue->pipelines = malloc(sizeof(*pipelines) * (npipelines + 1));
577 queue->config = config;
578
579 memcpy(queue->pipelines, pipelines, sizeof(*pipelines) * npipelines);
580
581 free(queue->scheds);
582 queue->scheds = calloc(npipelines+1, 1);
583
584 for (ii = 0; ii < npipelines; ii++) {
585 pipelines[ii]->parent = queue;
586 pipelines[ii]->index = ii;
587 }
588
589 if (queue->fallback) {
590 queue->fallback->index = npipelines;
591 queue->pipelines[queue->npipelines] = queue->fallback;
592 queue->_npipelines_ex++;
593 }
594}
595
596mc_PIPELINE **
597mcreq_queue_take_pipelines(mc_CMDQUEUE *queue, unsigned *count)
598{
599 mc_PIPELINE **ret = queue->pipelines;
600 *count = queue->npipelines;
601 queue->pipelines = NULL((void*)0);
602 queue->npipelines = 0;
603 return ret;
604}
605
606int
607mcreq_queue_init(mc_CMDQUEUE *queue)
608{
609 queue->seq = 0;
610 queue->pipelines = NULL((void*)0);
611 queue->scheds = NULL((void*)0);
612 queue->fallback = NULL((void*)0);
613 queue->npipelines = 0;
614 return 0;
615}
616
617void
618mcreq_queue_cleanup(mc_CMDQUEUE *queue)
619{
620 if (queue->fallback) {
621 mcreq_pipeline_cleanup(queue->fallback);
622 free(queue->fallback);
623 queue->fallback = NULL((void*)0);
624 }
625 free(queue->scheds);
626 free(queue->pipelines);
627 queue->scheds = NULL((void*)0);
628}
629
630void
631mcreq_sched_enter(mc_CMDQUEUE *queue)
632{
633 queue->ctxenter = 1;
634}
635
636
637
638static void
639queuectx_leave(mc_CMDQUEUE *queue, int success, int flush)
640{
641 unsigned ii;
642
643 if (queue->ctxenter) {
644 queue->ctxenter = 0;
645 }
646
647 for (ii = 0; ii < queue->_npipelines_ex; ii++) {
648 mc_PIPELINE *pipeline;
649 sllist_node *ll_next, *ll;
650
651 if (!queue->scheds[ii]) {
652 continue;
653 }
654
655 pipeline = queue->pipelines[ii];
656 ll = SLLIST_FIRST(&pipeline->ctxqueued)(&pipeline->ctxqueued)->first_prev.next;
657
658 while (ll) {
659 mc_PACKET *pkt = SLLIST_ITEM(ll, mc_PACKET, slnode)((mc_PACKET *) (void *) ((char *)(ll) - __builtin_offsetof(mc_PACKET
, slnode)))
;
660 ll_next = ll->next;
661
662 if (success) {
663 mcreq_enqueue_packet(pipeline, pkt);
664 } else {
665 if (pkt->flags & MCREQ_F_REQEXT) {
666 mc_REQDATAEX *rd = pkt->u_rdata.exdata;
667 if (rd->procs->fail_dtor) {
668 rd->procs->fail_dtor(pkt);
669 }
670 }
671 mcreq_wipe_packet(pipeline, pkt);
672 mcreq_release_packet(pipeline, pkt);
673 }
674
675 ll = ll_next;
676 }
677 SLLIST_FIRST(&pipeline->ctxqueued)(&pipeline->ctxqueued)->first_prev.next = pipeline->ctxqueued.last = NULL((void*)0);
678 if (flush) {
679 pipeline->flush_start(pipeline);
680 }
681 queue->scheds[ii] = 0;
682 }
683}
684
685void
686mcreq_sched_leave(mc_CMDQUEUE *queue, int do_flush)
687{
688 queuectx_leave(queue, 1, do_flush);
689}
690
691void
692mcreq_sched_fail(mc_CMDQUEUE *queue)
693{
694 queuectx_leave(queue, 0, 0);
695}
696
697void
698mcreq_sched_add(mc_PIPELINE *pipeline, mc_PACKET *pkt)
699{
700 mc_CMDQUEUE *cq = pipeline->parent;
701 if (!cq->scheds[pipeline->index]) {
702 cq->scheds[pipeline->index] = 1;
703 }
704 sllist_append(&pipeline->ctxqueued, &pkt->slnode);
705}
706
707static mc_PACKET *
708pipeline_find(mc_PIPELINE *pipeline, lcb_uint32_t opaque, int do_remove)
709{
710 sllist_iterator iter;
711 SLLIST_ITERFOR(&pipeline->requests, &iter)for (slist_iter_init(&pipeline->requests, &iter); !
((&iter)->cur == ((void*)0)); slist_iter_incr(&pipeline
->requests, &iter))
{
712 mc_PACKET *pkt = SLLIST_ITEM(iter.cur, mc_PACKET, slnode)((mc_PACKET *) (void *) ((char *)(iter.cur) - __builtin_offsetof
(mc_PACKET, slnode)))
;
713 if (pkt->opaque == opaque) {
714 if (do_remove) {
715 sllist_iter_remove(&pipeline->requests, &iter);
716 }
717 return pkt;
718 }
719 }
720 return NULL((void*)0);
721}
722
723mc_PACKET *
724mcreq_pipeline_find(mc_PIPELINE *pipeline, lcb_uint32_t opaque)
725{
726 return pipeline_find(pipeline, opaque, 0);
727}
728
729mc_PACKET *
730mcreq_pipeline_remove(mc_PIPELINE *pipeline, lcb_uint32_t opaque)
731{
732 return pipeline_find(pipeline, opaque, 1);
733}
734
735void
736mcreq_packet_done(mc_PIPELINE *pipeline, mc_PACKET *pkt)
737{
738 assert(pkt->flags & MCREQ_F_FLUSHED)((void) sizeof ((pkt->flags & MCREQ_F_FLUSHED) ? 1 : 0
), __extension__ ({ if (pkt->flags & MCREQ_F_FLUSHED) ;
else __assert_fail ("pkt->flags & MCREQ_F_FLUSHED", "/home/avsej/code/libcouchbase/src/mc/mcreq.c"
, 738, __extension__ __PRETTY_FUNCTION__); }))
;
739 assert(pkt->flags & MCREQ_F_INVOKED)((void) sizeof ((pkt->flags & MCREQ_F_INVOKED) ? 1 : 0
), __extension__ ({ if (pkt->flags & MCREQ_F_INVOKED) ;
else __assert_fail ("pkt->flags & MCREQ_F_INVOKED", "/home/avsej/code/libcouchbase/src/mc/mcreq.c"
, 739, __extension__ __PRETTY_FUNCTION__); }))
;
740 if (pkt->flags & MCREQ_UBUF_FLAGS(MCREQ_F_KEY_NOCOPY|MCREQ_F_VALUE_NOCOPY)) {
741 void *kbuf, *vbuf;
742 const void *cookie;
743
744 cookie = MCREQ_PKT_COOKIE(pkt)(((pkt)->flags & MCREQ_F_REQEXT) ? ((mc_REQDATA *)(pkt
)->u_rdata.exdata) : (&(pkt)->u_rdata.reqdata))->
cookie
;
745 if (pkt->flags & MCREQ_F_KEY_NOCOPY) {
746 kbuf = SPAN_BUFFER(&pkt->kh_span)(((&pkt->kh_span)->offset == (nb_SIZE)-1) ? ((char *
)(&pkt->kh_span)->parent) : ((&pkt->kh_span)
->parent->root + (&pkt->kh_span)->offset))
;
747 } else {
748 kbuf = NULL((void*)0);
749 }
750 if (pkt->flags & MCREQ_F_VALUE_NOCOPY) {
751 if (pkt->flags & MCREQ_F_VALUE_IOV) {
752 vbuf = pkt->u_value.multi.iov->iov_base;
753 } else {
754 vbuf = SPAN_SABUFFER_NC(&pkt->u_value.single)((char *)(&pkt->u_value.single)->parent);
755 }
756 } else {
757 vbuf = NULL((void*)0);
758 }
759
760 pipeline->buf_done_callback(pipeline, cookie, kbuf, vbuf);
761 }
762 mcreq_wipe_packet(pipeline, pkt);
763 mcreq_release_packet(pipeline, pkt);
764}
765
766void
767mcreq_reset_timeouts(mc_PIPELINE *pl, lcb_U64 nstime)
768{
769 sllist_node *nn;
770 SLLIST_ITERBASIC(&pl->requests, nn)for (nn = (&pl->requests)->first_prev.next; nn; nn =
nn->next)
{
771 mc_PACKET *pkt = SLLIST_ITEM(nn, mc_PACKET, slnode)((mc_PACKET *) (void *) ((char *)(nn) - __builtin_offsetof(mc_PACKET
, slnode)))
;
772 MCREQ_PKT_RDATA(pkt)(((pkt)->flags & MCREQ_F_REQEXT) ? ((mc_REQDATA *)(pkt
)->u_rdata.exdata) : (&(pkt)->u_rdata.reqdata))
->start = nstime;
773 }
774}
775
776unsigned
777mcreq_pipeline_timeout(
778 mc_PIPELINE *pl, lcb_error_t err, mcreq_pktfail_fn failcb, void *cbarg,
779 hrtime_t oldest_valid, hrtime_t *oldest_start)
780{
781 sllist_iterator iter;
782 unsigned count = 0;
783
784 SLLIST_ITERFOR(&pl->requests, &iter)for (slist_iter_init(&pl->requests, &iter); !((&
iter)->cur == ((void*)0)); slist_iter_incr(&pl->requests
, &iter))
{
785 mc_PACKET *pkt = SLLIST_ITEM(iter.cur, mc_PACKET, slnode)((mc_PACKET *) (void *) ((char *)(iter.cur) - __builtin_offsetof
(mc_PACKET, slnode)))
;
786 mc_REQDATA *rd = MCREQ_PKT_RDATA(pkt)(((pkt)->flags & MCREQ_F_REQEXT) ? ((mc_REQDATA *)(pkt
)->u_rdata.exdata) : (&(pkt)->u_rdata.reqdata))
;
787
788 /**
789 * oldest_valid contains the LOWEST timestamp we can admit to being
790 * acceptable. If the current command is newer (i.e. has a higher
791 * timestamp) then we break the iteration and return.
792 */
793 if (oldest_valid && rd->start > oldest_valid) {
794 if (oldest_start) {
795 *oldest_start = rd->start;
796 }
797 return count;
798 }
799
800 sllist_iter_remove(&pl->requests, &iter);
801 failcb(pl, pkt, err, cbarg);
802 mcreq_packet_handled(pl, pkt)do { (pkt)->flags |= MCREQ_F_INVOKED; if ((pkt)->flags &
MCREQ_F_FLUSHED) { mcreq_packet_done(pl, pkt); } } while (0)
;
;
803 count++;
804 }
805 return count;
806}
807
808unsigned
809mcreq_pipeline_fail(
810 mc_PIPELINE *pl, lcb_error_t err, mcreq_pktfail_fn failcb, void *arg)
811{
812 return mcreq_pipeline_timeout(pl, err, failcb, arg, 0, NULL((void*)0));
813}
814
815void
816mcreq_iterwipe(mc_CMDQUEUE *queue, mc_PIPELINE *src,
817 mcreq_iterwipe_fn callback, void *arg)
818{
819 sllist_iterator iter;
820
821 SLLIST_ITERFOR(&src->requests, &iter)for (slist_iter_init(&src->requests, &iter); !((&
iter)->cur == ((void*)0)); slist_iter_incr(&src->requests
, &iter))
{
822 int rv;
823 mc_PACKET *orig = SLLIST_ITEM(iter.cur, mc_PACKET, slnode)((mc_PACKET *) (void *) ((char *)(iter.cur) - __builtin_offsetof
(mc_PACKET, slnode)))
;
824 rv = callback(queue, src, orig, arg);
825 if (rv == MCREQ_REMOVE_PACKET2) {
826 sllist_iter_remove(&src->requests, &iter);
827 }
828 }
829}
830
831#include "mcreq-flush-inl.h"
832typedef struct {
833 mc_PIPELINE base;
834 mcreq_fallback_cb handler;
835} mc_FALLBACKPL;
836
837static void
838do_fallback_flush(mc_PIPELINE *pipeline)
839{
840 nb_IOV iov;
841 unsigned nb;
842 int nused;
843 sllist_iterator iter;
844 mc_FALLBACKPL *fpl = (mc_FALLBACKPL*)pipeline;
845
846 while ((nb = mcreq_flush_iov_fill(pipeline, &iov, 1, &nused))) {
847 mcreq_flush_done(pipeline, nb, nb);
848 }
849 /* Now handle all the packets, for real */
850 SLLIST_ITERFOR(&pipeline->requests, &iter)for (slist_iter_init(&pipeline->requests, &iter); !
((&iter)->cur == ((void*)0)); slist_iter_incr(&pipeline
->requests, &iter))
{
851 mc_PACKET *pkt = SLLIST_ITEM(iter.cur, mc_PACKET, slnode)((mc_PACKET *) (void *) ((char *)(iter.cur) - __builtin_offsetof
(mc_PACKET, slnode)))
;
852 fpl->handler(pipeline->parent, pkt);
853 sllist_iter_remove(&pipeline->requests, &iter);
854 mcreq_packet_handled(pipeline, pkt)do { (pkt)->flags |= MCREQ_F_INVOKED; if ((pkt)->flags &
MCREQ_F_FLUSHED) { mcreq_packet_done(pipeline, pkt); } } while
(0);
;
855 }
856}
857
858void
859mcreq_set_fallback_handler(mc_CMDQUEUE *cq, mcreq_fallback_cb handler)
860{
861 assert(!cq->fallback)((void) sizeof ((!cq->fallback) ? 1 : 0), __extension__ ({
if (!cq->fallback) ; else __assert_fail ("!cq->fallback"
, "/home/avsej/code/libcouchbase/src/mc/mcreq.c", 861, __extension__
__PRETTY_FUNCTION__); }))
;
862 cq->fallback = calloc(1, sizeof (mc_FALLBACKPL));
863 mcreq_pipeline_init(cq->fallback);
864 cq->fallback->parent = cq;
865 cq->fallback->index = cq->npipelines;
866 ((mc_FALLBACKPL*)cq->fallback)->handler = handler;
867 cq->fallback->flush_start = do_fallback_flush;
868}
869
/** Payload dump callback that deliberately prints nothing. */
static void
noop_dumpfn(const void *d, unsigned n, FILE *fp)
{
    (void)d;
    (void)n;
    (void)fp;
}
872
/**
 * X-macro listing the suffix of every MCREQ_F_* packet flag; X is applied
 * to each suffix in turn.
 */
#define MCREQ_XFLAGS(X) \
    X(KEY_NOCOPY) \
    X(VALUE_NOCOPY) \
    X(VALUE_IOV) \
    X(HASVALUE) \
    X(REQEXT) \
    X(UFWD) \
    X(FLUSHED) \
    X(INVOKED) \
    X(DETACHED)
883
/**
 * Print a human-readable description of a packet to fp.
 *
 * Writes the packet address, opaque, flag value with the names of the set
 * flags, key/header span size, key offset, value details (when the packet
 * has a value), request data location, start time, cookie and the next
 * list node. The raw key/header span is then written with fwrite and the
 * value is handed to dumpfn.
 *
 * @param packet packet to describe
 * @param fp output stream; stderr is used when NULL
 * @param dumpfn callback that prints the value payload; a no-op is used
 *        when NULL
 *
 * NOTE(review): the three indent literals below all read as a single
 * space in this listing; the whitespace may have been collapsed, so
 * confirm them against the real source before relying on them.
 */
884void
885mcreq_dump_packet(const mc_PACKET *packet, FILE *fp, mcreq_payload_dump_fn dumpfn)
886{
887 const char *indent = " ";
888 const mc_REQDATA *rdata = MCREQ_PKT_RDATA(packet)(((packet)->flags & MCREQ_F_REQEXT) ? ((mc_REQDATA *)(
packet)->u_rdata.exdata) : (&(packet)->u_rdata.reqdata
))
;
889
890 if (!dumpfn) {
891 dumpfn = noop_dumpfn;
892 }
893 if (!fp) {
894 fp = stderrstderr;
895 }
896
897 fprintf(fp, "Packet @%p\n", (void *)packet);
898 fprintf(fp, "%sOPAQUE: %u\n", indent, (unsigned int)packet->opaque);
899
900 fprintf(fp, "%sPKTFLAGS: 0x%x ", indent, packet->flags);
901 #define X(base) \
902 if (packet->flags & MCREQ_F_##base) { fprintf(fp, "%s, ", #base); }
903 MCREQ_XFLAGS(X)X(KEY_NOCOPY) X(VALUE_NOCOPY) X(VALUE_IOV) X(HASVALUE) X(REQEXT
) X(UFWD) X(FLUSHED) X(INVOKED) X(DETACHED)
904 #undef X
905 fprintf(fp, "\n");
906
907 fprintf(fp, "%sKey+Header Size: %u\n", indent, (unsigned int)packet->kh_span.size);
908 fprintf(fp, "%sKey Offset: %u\n", indent, MCREQ_PKT_BASESIZE24 + packet->extlen);
909
910
911 if (packet->flags & MCREQ_F_HASVALUE) {
912 if (packet->flags & MCREQ_F_VALUE_IOV) {
913 fprintf(fp, "%sValue Length: %u\n", indent,
914 packet->u_value.multi.total_length);
915
916 fprintf(fp, "%sValue IOV: [start=%p, n=%d]\n", indent,
917 (void *)packet->u_value.multi.iov, packet->u_value.multi.niov);
918 } else {
919 if (packet->flags & MCREQ_F_VALUE_NOCOPY) {
920 fprintf(fp, "%sValue is user allocated\n", indent);
921 }
922 fprintf(fp, "%sValue: %p, %u bytes\n", indent,
923 (void *)SPAN_BUFFER(&packet->u_value.single)(((&packet->u_value.single)->offset == (nb_SIZE)-1)
? ((char *)(&packet->u_value.single)->parent) : ((
&packet->u_value.single)->parent->root + (&packet
->u_value.single)->offset))
, packet->u_value.single.size);
924 }
925 }
926
927 fprintf(fp, "%sRDATA(%s): %p\n", indent,
928 (packet->flags & MCREQ_F_REQEXT) ? "ALLOC" : "EMBEDDED", (void *)rdata);
929
930 indent = " ";
931 fprintf(fp, "%sStart: %lu\n", indent, (unsigned long)rdata->start);
932 fprintf(fp, "%sCookie: %p\n", indent, rdata->cookie);
933
934 indent = " ";
935 fprintf(fp, "%sNEXT: %p\n", indent, (void *)packet->slnode.next);
936 if (dumpfn != noop_dumpfn) {
937 fprintf(fp, "PACKET CONTENTS:\n");
938 }
939
/* The key/header span is written raw; only the value goes through dumpfn */
940 fwrite(SPAN_BUFFER(&packet->kh_span)(((&packet->kh_span)->offset == (nb_SIZE)-1) ? ((char
*)(&packet->kh_span)->parent) : ((&packet->
kh_span)->parent->root + (&packet->kh_span)->
offset))
, 1, packet->kh_span.size, fp);
941 if (packet->flags & MCREQ_F_HASVALUE) {
942 if (packet->flags & MCREQ_F_VALUE_IOV) {
943 const lcb_IOV *iovs = packet->u_value.multi.iov;
944 unsigned ii, ixmax = packet->u_value.multi.niov;
945 for (ii = 0; ii < ixmax; ii++) {
946 dumpfn(iovs[ii].iov_base, iovs[ii].iov_len, fp);
947 }
948 } else {
949 const nb_SPAN *vspan = &packet->u_value.single;
950 dumpfn(SPAN_BUFFER(vspan)(((vspan)->offset == (nb_SIZE)-1) ? ((char *)(vspan)->parent
) : ((vspan)->parent->root + (vspan)->offset))
, vspan->size, fp);
951 }
952 }
953}
954
955void
956mcreq_dump_chain(const mc_PIPELINE *pipeline, FILE *fp, mcreq_payload_dump_fn dumpfn)
957{
958 sllist_node *ll;
959 SLLIST_FOREACH(&pipeline->requests, ll)for (ll = (&pipeline->requests)->first_prev.next; ll
; ll = ll->next)
{
960 const mc_PACKET *pkt = SLLIST_ITEM(ll, mc_PACKET, slnode)((mc_PACKET *) (void *) ((char *)(ll) - __builtin_offsetof(mc_PACKET
, slnode)))
;
961 mcreq_dump_packet(pkt, fp, dumpfn);
962 }
963}