Report 379388

Bug Summary

File: src/mc/mcreq.c
Warning: line 875, column 20
Result of 'calloc' is converted to a pointer of type 'mc_PIPELINE', which is incompatible with sizeof operand type 'mc_FALLBACKPL'
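The flagged allocation is in mcreq_set_fallback_handler() (line 875): the result of calloc(1, sizeof (mc_FALLBACKPL)) is stored in cq->fallback, whose declared type is mc_PIPELINE *. Because mc_FALLBACKPL embeds a mc_PIPELINE as its first member, the larger allocation is intentional, but the checker only compares the sizeof operand against the destination pointer's pointee type and cannot see that relationship. A hedged sketch of a type-agreeing rewrite appears inline after line 875 in the annotated source below.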

Annotated Source Code

  1 /* -*- Mode: C; tab-width: 4; c-basic-offset: 4; indent-tabs-mode: nil -*- */
  2 /*
  3  * Copyright 2014 Couchbase, Inc.
  4  *
  5  * Licensed under the Apache License, Version 2.0 (the "License");
  6  * you may not use this file except in compliance with the License.
  7  * You may obtain a copy of the License at
  8  *
  9  *     http://www.apache.org/licenses/LICENSE-2.0
 10  *
 11  * Unless required by applicable law or agreed to in writing, software
 12  * distributed under the License is distributed on an "AS IS" BASIS,
 13  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 14  * See the License for the specific language governing permissions and
 15  * limitations under the License.
 16  */
 17
 18 #include "mcreq.h"
 19 #include "compress.h"
 20 #include "sllist-inl.h"
 21 #include "internal.h"
 22
 23 #define PKT_HDRSIZE(pkt) (MCREQ_PKT_BASESIZE + (pkt)->extlen)
 24
 25 lcb_error_t
 26 mcreq_reserve_header(
 27         mc_PIPELINE *pipeline, mc_PACKET *packet, uint8_t hdrsize)
 28 {
 29     int rv;
 30     packet->extlen = hdrsize - MCREQ_PKT_BASESIZE;
 31     packet->kh_span.size = hdrsize;
 32     rv = netbuf_mblock_reserve(&pipeline->nbmgr, &packet->kh_span);
 33     if (rv != 0) {
 34         return LCB_CLIENT_ENOMEM;
 35     }
 36     return LCB_SUCCESS;
 37 }
 38
 39 lcb_error_t
 40 mcreq_reserve_key(
 41         mc_PIPELINE *pipeline, mc_PACKET *packet, uint8_t hdrsize,
 42         const lcb_KEYBUF *kreq)
 43 {
 44     const struct lcb_CONTIGBUF *contig = &kreq->contig;
 45     int rv;
 46
 47     /** Set the key offset which is the start of the key from the buffer */
 48     packet->extlen = hdrsize - MCREQ_PKT_BASESIZE;
 49     packet->kh_span.size = kreq->contig.nbytes;
 50
 51     if (kreq->type == LCB_KV_COPY) {
 52         /**
 53          * If the key is to be copied then just allocate the span size
 54          * for the key+24+extras
 55          */
 56         packet->kh_span.size += hdrsize;
 57         rv = netbuf_mblock_reserve(&pipeline->nbmgr, &packet->kh_span);
 58         if (rv != 0) {
 59             return LCB_CLIENT_ENOMEM;
 60         }
 61
 62         /**
 63          * Copy the key into the packet starting at the extras end
 64          */
 65         memcpy(SPAN_BUFFER(&packet->kh_span) + hdrsize,
 66                contig->bytes,
 67                contig->nbytes);
 68
 69     } else if (kreq->type == LCB_KV_CONTIG) {
 70         /**
 71          * Don't do any copying.
 72          * Assume the key buffer has enough space for the packet as well.
 73          */
 74         CREATE_STANDALONE_SPAN(&packet->kh_span, contig->bytes, contig->nbytes);
 75         packet->flags |= MCREQ_F_KEY_NOCOPY;
 76
 77     } else {
 78         /** IOVs not supported for keys */
 79         return LCB_EINVAL;
 80     }
 81
 82     return LCB_SUCCESS;
 83 }
 84
 85 lcb_error_t
 86 mcreq_reserve_value2(mc_PIPELINE *pl, mc_PACKET *pkt, lcb_size_t n)
 87 {
 88     int rv;
 89     pkt->u_value.single.size = n;
 90     if (!n) {
 91         return LCB_SUCCESS;
 92     }
 93
 94     pkt->flags |= MCREQ_F_HASVALUE;
 95     rv = netbuf_mblock_reserve(&pl->nbmgr, &pkt->u_value.single);
 96     if (rv) {
 97         return LCB_CLIENT_ENOMEM;
 98     }
 99     return LCB_SUCCESS;
100 }
101
102 lcb_error_t
103 mcreq_reserve_value(
104         mc_PIPELINE *pipeline, mc_PACKET *packet, const lcb_VALBUF *vreq)
105 {
106     const lcb_CONTIGBUF *contig = &vreq->u_buf.contig;
107     nb_SPAN *vspan = &packet->u_value.single;
108     int rv;
109
110     if (vreq->vtype == LCB_KV_COPY) {
111         /** Copy the value into a single SPAN */
112         if (! (vspan->size = vreq->u_buf.contig.nbytes)) {
113             return LCB_SUCCESS;
114         }
115         rv = netbuf_mblock_reserve(&pipeline->nbmgr, vspan);
116
117         if (rv != 0) {
118             return LCB_CLIENT_ENOMEM;
119         }
120
121         memcpy(SPAN_BUFFER(vspan), contig->bytes, contig->nbytes);
122
123     } else if (vreq->vtype == LCB_KV_CONTIG) {
124         /** It's still contiguous so make it a 'standalone' span */
125         CREATE_STANDALONE_SPAN(vspan, contig->bytes, contig->nbytes);
126         packet->flags |= MCREQ_F_VALUE_NOCOPY;
127
128     } else if (vreq->vtype == LCB_KV_IOV) {
129         /** Multiple spans, no copy */
130         unsigned int ii;
131         const lcb_FRAGBUF *msrc = &vreq->u_buf.multi;
132         lcb_FRAGBUF *mdst = &packet->u_value.multi;
133
134         packet->flags |= MCREQ_F_VALUE_IOV | MCREQ_F_VALUE_NOCOPY;
135         mdst->niov = msrc->niov;
136         mdst->iov = malloc(mdst->niov * sizeof(*mdst->iov));
137         mdst->total_length = 0;
138
139         for (ii = 0; ii < mdst->niov; ii++) {
140             mdst->iov[ii] = msrc->iov[ii];
141             mdst->total_length += mdst->iov[ii].iov_len;
142         }
143     } else if (vreq->vtype == LCB_KV_IOVCOPY) {
144         /** Multiple input buffers, normal copying output buffer */
145         unsigned int ii, cur_offset;
146         const lcb_FRAGBUF *msrc = &vreq->u_buf.multi;
147
148         if (msrc->total_length) {
149             vspan->size = msrc->total_length;
150         } else {
151             vspan->size = 0;
152             for (ii = 0; ii < msrc->niov; ii++) {
153                 vspan->size += msrc->iov[ii].iov_len;
154             }
155         }
156
157         rv = netbuf_mblock_reserve(&pipeline->nbmgr, vspan);
158         if (rv != 0) {
159             return LCB_CLIENT_ENOMEM;
160         }
161
162         for (ii = 0, cur_offset = 0; ii < msrc->niov; ii++) {
163             char *buf = SPAN_BUFFER(vspan) + cur_offset;
164             memcpy(buf, msrc->iov[ii].iov_base, msrc->iov[ii].iov_len);
165             cur_offset += msrc->iov[ii].iov_len;
166         }
167     }
168
169     packet->flags |= MCREQ_F_HASVALUE;
170     return LCB_SUCCESS;
171 }
172
173 static int
174 pkt_tmo_compar(sllist_node *a, sllist_node *b)
175 {
176     mc_PACKET *pa, *pb;
177     hrtime_t tmo_a, tmo_b;
178
179     pa = SLLIST_ITEM(a, mc_PACKET, slnode);
180     pb = SLLIST_ITEM(b, mc_PACKET, slnode);
181
182     tmo_a = MCREQ_PKT_RDATA(pa)->start;
183     tmo_b = MCREQ_PKT_RDATA(pb)->start;
184
185     if (tmo_a == tmo_b) {
186         return 0;
187     } else if (tmo_a < tmo_b) {
188         return -1;
189     } else {
190         return 1;
191     }
192 }
193
194 void
195 mcreq_reenqueue_packet(mc_PIPELINE *pipeline, mc_PACKET *packet)
196 {
197     sllist_root *reqs = &pipeline->requests;
198     mcreq_enqueue_packet(pipeline, packet);
199     sllist_remove(reqs, &packet->slnode);
200     sllist_insert_sorted(reqs, &packet->slnode, pkt_tmo_compar);
201 }
202
203 void
204 mcreq_enqueue_packet(mc_PIPELINE *pipeline, mc_PACKET *packet)
205 {
206     nb_SPAN *vspan = &packet->u_value.single;
207     sllist_append(&pipeline->requests, &packet->slnode);
208     netbuf_enqueue_span(&pipeline->nbmgr, &packet->kh_span);
209     MC_INCR_METRIC(pipeline, bytes_queued, packet->kh_span.size);
210
211     if (!(packet->flags & MCREQ_F_HASVALUE)) {
212         goto GT_ENQUEUE_PDU;
213     }
214
215     if (packet->flags & MCREQ_F_VALUE_IOV) {
216         unsigned int ii;
217         lcb_FRAGBUF *multi = &packet->u_value.multi;
218         for (ii = 0; ii < multi->niov; ii++) {
219             netbuf_enqueue(&pipeline->nbmgr, (nb_IOV *)multi->iov + ii);
220             MC_INCR_METRIC(pipeline, bytes_queued, multi->iov[ii].iov_len);
221         }
222
223     } else if (vspan->size) {
224         MC_INCR_METRIC(pipeline, bytes_queued, vspan->size);
225         netbuf_enqueue_span(&pipeline->nbmgr, vspan);
226     }
227
228     GT_ENQUEUE_PDU:
229     netbuf_pdu_enqueue(&pipeline->nbmgr, packet, offsetof(mc_PACKET, sl_flushq));
230     MC_INCR_METRIC(pipeline, packets_queued, 1);
231 }
232
233 void
234 mcreq_wipe_packet(mc_PIPELINE *pipeline, mc_PACKET *packet)
235 {
236     if (! (packet->flags & MCREQ_F_KEY_NOCOPY)) {
237         if (packet->flags & MCREQ_F_DETACHED) {
238             free(SPAN_BUFFER(&packet->kh_span));
239         } else {
240             netbuf_mblock_release(&pipeline->nbmgr, &packet->kh_span);
241         }
242     }
243
244     if (! (packet->flags & MCREQ_F_HASVALUE)) {
245         return;
246     }
247
248     if (packet->flags & MCREQ_F_VALUE_NOCOPY) {
249         if (packet->flags & MCREQ_F_VALUE_IOV) {
250             free(packet->u_value.multi.iov);
251         }
252
253         return;
254     }
255
256     if (packet->flags & MCREQ_F_DETACHED) {
257         free(SPAN_BUFFER(&packet->u_value.single));
258     } else {
259         netbuf_mblock_release(&pipeline->nbmgr, &packet->u_value.single);
260     }
261
262 }
263
264 mc_PACKET *
265 mcreq_allocate_packet(mc_PIPELINE *pipeline)
266 {
267     nb_SPAN span;
268     int rv;
269     mc_PACKET *ret;
270     span.size = sizeof(*ret);
271
272     rv = netbuf_mblock_reserve(&pipeline->reqpool, &span);
273     if (rv != 0) {
274         return NULL;
275     }
276
277     ret = (void *) SPAN_MBUFFER_NC(&span);
278     ret->alloc_parent = span.parent;
279     ret->flags = 0;
280     ret->retries = 0;
281     ret->opaque = pipeline->parent->seq++;
282     return ret;
283 }
284
285 void
286 mcreq_release_packet(mc_PIPELINE *pipeline, mc_PACKET *packet)
287 {
288     nb_SPAN span;
289     if (packet->flags & MCREQ_F_DETACHED) {
290         sllist_iterator iter;
291         mc_EXPACKET *epkt = (mc_EXPACKET *)packet;
292
293         SLLIST_ITERFOR(&epkt->data, &iter) {
294             mc_EPKTDATUM *d = SLLIST_ITEM(iter.cur, mc_EPKTDATUM, slnode);
295             sllist_iter_remove(&epkt->data, &iter);
296             d->dtorfn(d);
297         }
298         free(epkt);
299         return;
300     }
301
302     span.size = sizeof(*packet);
303     span.parent = packet->alloc_parent;
304     span.offset = (char *)packet - packet->alloc_parent->root;
305
306     netbuf_mblock_release(&pipeline->reqpool, &span);
307 }
308
309 #define MCREQ_DETACH_WIPESRC 1
310
311 mc_PACKET *
312 mcreq_renew_packet(const mc_PACKET *src)
313 {
314     char *kdata, *vdata;
315     unsigned nvdata;
316     mc_PACKET *dst;
317     mc_EXPACKET *edst = calloc(1, sizeof(*edst));
318
319     dst = &edst->base;
320     *dst = *src;
321
322     kdata = malloc(src->kh_span.size);
323     memcpy(kdata, SPAN_BUFFER(&src->kh_span), src->kh_span.size);
324     CREATE_STANDALONE_SPAN(&dst->kh_span, kdata, src->kh_span.size);
325
326     dst->flags &= ~(MCREQ_F_KEY_NOCOPY|MCREQ_F_VALUE_NOCOPY|MCREQ_F_VALUE_IOV);
327     dst->flags |= MCREQ_F_DETACHED;
328     dst->alloc_parent = NULL;
329     dst->sl_flushq.next = NULL;
330     dst->slnode.next = NULL;
331     dst->retries = src->retries;
332
333     if (src->flags & MCREQ_F_HASVALUE) {
334         /** Get the length */
335         if (src->flags & MCREQ_F_VALUE_IOV) {
336             unsigned ii;
337             unsigned offset = 0;
338
339             nvdata = src->u_value.multi.total_length;
340             vdata = malloc(nvdata);
341             for (ii = 0; ii < src->u_value.multi.niov; ii++) {
342                 const lcb_IOV *iov = src->u_value.multi.iov + ii;
343
344                 memcpy(vdata + offset, iov->iov_base, iov->iov_len);
345                 offset += iov->iov_len;
346             }
347         } else {
348             protocol_binary_request_header hdr;
349             const nb_SPAN *origspan = &src->u_value.single;
350             mcreq_read_hdr(dst, &hdr);
351
352             if (hdr.request.datatype & PROTOCOL_BINARY_DATATYPE_COMPRESSED) {
353                 /* For compressed payloads we need to uncompress it first
354                  * because it may be forwarded to a server without compression.
355                  * TODO: might be more clever to check a setting flag somewhere
356                  * and see if we should do this. */
357
358                 lcb_SIZE n_inflated;
359                 const void *inflated;
360                 int rv;
361
362                 vdata = NULL;
363                 rv = mcreq_inflate_value(SPAN_BUFFER(origspan), origspan->size,
364                     &inflated, &n_inflated, (void**)&vdata);
365
366                 assert(vdata == inflated);
367
368                 if (rv != 0) {
369                     /* TODO: log error details when snappy will be enabled */
370                     free(edst);
371                     return NULL;
372                 }
373                 nvdata = n_inflated;
374                 hdr.request.datatype &= ~PROTOCOL_BINARY_DATATYPE_COMPRESSED;
375                 hdr.request.bodylen = htonl(
376                     ntohs(hdr.request.keylen) +
377                     hdr.request.extlen +
378                     n_inflated);
379                 mcreq_write_hdr(dst, &hdr);
380
381             } else {
382                 nvdata = origspan->size;
383                 vdata = malloc(nvdata);
384                 memcpy(vdata, SPAN_BUFFER(origspan), nvdata);
385             }
386         }
387
388         /* Declare the value as a standalone malloc'd span */
389         CREATE_STANDALONE_SPAN(&dst->u_value.single, vdata, nvdata);
390     }
391
392     if (src->flags & MCREQ_F_DETACHED) {
393         mc_EXPACKET *esrc = (mc_EXPACKET *)src;
394         sllist_iterator iter;
395         SLLIST_ITERFOR(&esrc->data, &iter) {
396             sllist_node *cur = iter.cur;
397             sllist_iter_remove(&esrc->data, &iter);
398             sllist_append(&edst->data, cur);
399         }
400     }
401     return dst;
402 }
403
404 int
405 mcreq_epkt_insert(mc_EXPACKET *ep, mc_EPKTDATUM *datum)
406 {
407     if (!(ep->base.flags & MCREQ_F_DETACHED)) {
408         return -1;
409     }
410     assert(!sllist_contains(&ep->data, &datum->slnode));
411     sllist_append(&ep->data, &datum->slnode);
412     return 0;
413 }
414
415 mc_EPKTDATUM *
416 mcreq_epkt_find(mc_EXPACKET *ep, const char *key)
417 {
418     sllist_iterator iter;
419     SLLIST_ITERFOR(&ep->data, &iter) {
420         mc_EPKTDATUM *d = SLLIST_ITEM(iter.cur, mc_EPKTDATUM, slnode);
421         if (!strcmp(key, d->key)) {
422             return d;
423         }
424     }
425     return NULL;
426 }
427
428 void
429 mcreq_map_key(mc_CMDQUEUE *queue,
430     const lcb_KEYBUF *key, const lcb_KEYBUF *hashkey,
431     unsigned nhdr, int *vbid, int *srvix)
432 {
433     const void *hk;
434     size_t nhk = 0;
435     if (hashkey) {
436         if (hashkey->type == LCB_KV_COPY && hashkey->contig.bytes != NULL) {
437             hk = hashkey->contig.bytes;
438             nhk = hashkey->contig.nbytes;
439         } else if (hashkey->type == LCB_KV_VBID) {
440             *vbid = hashkey->contig.nbytes;
441             *srvix = lcbvb_vbmaster(queue->config, *vbid);
442             return;
443         }
444     }
445     if (!nhk) {
446         if (key->type == LCB_KV_COPY) {
447             hk = key->contig.bytes;
448             nhk = key->contig.nbytes;
449         } else {
450             const char *buf = key->contig.bytes;
451             buf += nhdr;
452             hk = buf;
453             nhk = key->contig.nbytes - nhdr;
454         }
455     }
456     lcbvb_map_key(queue->config, hk, nhk, vbid, srvix);
457 }
458
459 lcb_error_t
460 mcreq_basic_packet(
461     mc_CMDQUEUE *queue, const lcb_CMDBASE *cmd,
462     protocol_binary_request_header *req, lcb_uint8_t extlen,
463     mc_PACKET **packet, mc_PIPELINE **pipeline, int options)
464 {
465     int vb, srvix;
466
467     if (!queue->config) {
468         return LCB_CLIENT_ETMPFAIL;
469     }
470     if (!cmd) {
471         return LCB_EINVAL;
472     }
473
474     mcreq_map_key(queue, &cmd->key, &cmd->_hashkey,
475         sizeof(*req) + extlen, &vb, &srvix);
476     if (srvix > -1 && srvix < (int)queue->npipelines) {
477         *pipeline = queue->pipelines[srvix];
478
479     } else {
480         if ((options & MCREQ_BASICPACKET_F_FALLBACKOK) && queue->fallback) {
481             *pipeline = queue->fallback;
482         } else {
483             return LCB_NO_MATCHING_SERVER;
484         }
485     }
486
487     *packet = mcreq_allocate_packet(*pipeline);
488     if (*packet == NULL) {
489         return LCB_CLIENT_ENOMEM;
490     }
491
492     mcreq_reserve_key(*pipeline, *packet, sizeof(*req) + extlen, &cmd->key);
493
494     req->request.keylen = htons((*packet)->kh_span.size - PKT_HDRSIZE(*packet));
495     req->request.vbucket = htons(vb);
496     req->request.extlen = extlen;
497     return LCB_SUCCESS;
498 }
499
500 void
501 mcreq_get_key(const mc_PACKET *packet, const void **key, lcb_size_t *nkey)
502 {
503     *key = SPAN_BUFFER(&packet->kh_span) + PKT_HDRSIZE(packet);
504     *nkey = packet->kh_span.size - PKT_HDRSIZE(packet);
505 }
506
507 lcb_uint32_t
508 mcreq_get_bodysize(const mc_PACKET *packet)
509 {
510     lcb_uint32_t ret;
511     char *retptr = SPAN_BUFFER(&packet->kh_span) + 8;
512     if ((uintptr_t)retptr % sizeof(ret) == 0) {
513         return ntohl(*(lcb_uint32_t*) (void *)retptr);
514     } else {
515         memcpy(&ret, retptr, sizeof(ret));
516         return ntohl(ret);
517     }
518 }
519
520 uint16_t
521 mcreq_get_vbucket(const mc_PACKET *packet)
522 {
523     uint16_t ret;
524     char *retptr = SPAN_BUFFER(&packet->kh_span) + 6;
525     if ((uintptr_t)retptr % sizeof(ret) == 0) {
526         return ntohs(*(uint16_t*)(void*)retptr);
527     } else {
528         memcpy(&ret, retptr, sizeof ret);
529         return ntohs(ret);
530     }
531 }
532
533 uint32_t
534 mcreq_get_size(const mc_PACKET *packet)
535 {
536     uint32_t sz = packet->kh_span.size;
537     if (packet->flags & MCREQ_F_HASVALUE) {
538         if (packet->flags & MCREQ_F_VALUE_IOV) {
539             sz += packet->u_value.multi.total_length;
540         } else {
541             sz += packet->u_value.single.size;
542         }
543     }
544     return sz;
545 }
546
547 void
548 mcreq_pipeline_cleanup(mc_PIPELINE *pipeline)
549 {
550     netbuf_cleanup(&pipeline->nbmgr);
551     netbuf_cleanup(&pipeline->reqpool);
552 }
553
554 int
555 mcreq_pipeline_init(mc_PIPELINE *pipeline)
556 {
557     nb_SETTINGS settings;
558
559     /* Initialize all members to 0 */
560     memset(&pipeline->requests, 0, sizeof pipeline->requests);
561     pipeline->parent = NULL;
562     pipeline->flush_start = NULL;
563     pipeline->index = 0;
564     memset(&pipeline->ctxqueued, 0, sizeof pipeline->ctxqueued);
565     pipeline->buf_done_callback = NULL;
566
567     netbuf_default_settings(&settings);
568
569     /** Initialize datapool */
570     netbuf_init(&pipeline->nbmgr, &settings);
571
572     /** Initialize request pool */
573     settings.data_basealloc = sizeof(mc_PACKET) * 32;
574     netbuf_init(&pipeline->reqpool, &settings);
575     pipeline->metrics = NULL;
576     return 0;
577 }
578
579 void
580 mcreq_queue_add_pipelines(
581     mc_CMDQUEUE *queue, mc_PIPELINE * const *pipelines, unsigned npipelines,
582     lcbvb_CONFIG* config)
583 {
584     unsigned ii;
585
586     lcb_assert(queue->pipelines == NULL);
587     queue->npipelines = npipelines;
588     queue->_npipelines_ex = queue->npipelines;
589     queue->pipelines = malloc(sizeof(*pipelines) * (npipelines + 1));
590     queue->config = config;
591
592     memcpy(queue->pipelines, pipelines, sizeof(*pipelines) * npipelines);
593
594     free(queue->scheds);
595     queue->scheds = calloc(npipelines+1, 1);
596
597     for (ii = 0; ii < npipelines; ii++) {
598         pipelines[ii]->parent = queue;
599         pipelines[ii]->index = ii;
600     }
601
602     if (queue->fallback) {
603         queue->fallback->index = npipelines;
604         queue->pipelines[queue->npipelines] = queue->fallback;
605         queue->_npipelines_ex++;
606     }
607 }
608
609 mc_PIPELINE **
610 mcreq_queue_take_pipelines(mc_CMDQUEUE *queue, unsigned *count)
611 {
612     mc_PIPELINE **ret = queue->pipelines;
613     *count = queue->npipelines;
614     queue->pipelines = NULL;
615     queue->npipelines = 0;
616     return ret;
617 }
618
619 int
620 mcreq_queue_init(mc_CMDQUEUE *queue)
621 {
622     queue->seq = 0;
623     queue->pipelines = NULL;
624     queue->scheds = NULL;
625     queue->fallback = NULL;
626     queue->npipelines = 0;
627     return 0;
628 }
629
630 void
631 mcreq_queue_cleanup(mc_CMDQUEUE *queue)
632 {
633     if (queue->fallback) {
634         mcreq_pipeline_cleanup(queue->fallback);
635         free(queue->fallback);
636         queue->fallback = NULL;
637     }
638     free(queue->scheds);
639     free(queue->pipelines);
640     queue->scheds = NULL;
641 }
642
643 void
644 mcreq_sched_enter(mc_CMDQUEUE *queue)
645 {
646     queue->ctxenter = 1;
647 }
648
649
650
651 static void
652 queuectx_leave(mc_CMDQUEUE *queue, int success, int flush)
653 {
654     unsigned ii;
655
656     if (queue->ctxenter) {
657         queue->ctxenter = 0;
658     }
659
660     for (ii = 0; ii < queue->_npipelines_ex; ii++) {
661         mc_PIPELINE *pipeline;
662         sllist_node *ll_next, *ll;
663
664         if (!queue->scheds[ii]) {
665             continue;
666         }
667
668         pipeline = queue->pipelines[ii];
669         ll = SLLIST_FIRST(&pipeline->ctxqueued);
670
671         while (ll) {
672             mc_PACKET *pkt = SLLIST_ITEM(ll, mc_PACKET, slnode);
673             ll_next = ll->next;
674
675             if (success) {
676                 mcreq_enqueue_packet(pipeline, pkt);
677             } else {
678                 if (pkt->flags & MCREQ_F_REQEXT) {
679                     mc_REQDATAEX *rd = pkt->u_rdata.exdata;
680                     if (rd->procs->fail_dtor) {
681                         rd->procs->fail_dtor(pkt);
682                     }
683                 }
684                 mcreq_wipe_packet(pipeline, pkt);
685                 mcreq_release_packet(pipeline, pkt);
686             }
687
688             ll = ll_next;
689         }
690         SLLIST_FIRST(&pipeline->ctxqueued) = pipeline->ctxqueued.last = NULL;
691         if (flush) {
692             pipeline->flush_start(pipeline);
693         }
694         queue->scheds[ii] = 0;
695     }
696 }
697
698 void
699 mcreq_sched_leave(mc_CMDQUEUE *queue, int do_flush)
700 {
701     queuectx_leave(queue, 1, do_flush);
702 }
703
704 void
705 mcreq_sched_fail(mc_CMDQUEUE *queue)
706 {
707     queuectx_leave(queue, 0, 0);
708 }
709
710 void
711 mcreq_sched_add(mc_PIPELINE *pipeline, mc_PACKET *pkt)
712 {
713     mc_CMDQUEUE *cq = pipeline->parent;
714     if (!cq->scheds[pipeline->index]) {
715         cq->scheds[pipeline->index] = 1;
716     }
717     sllist_append(&pipeline->ctxqueued, &pkt->slnode);
718 }
719
720 static mc_PACKET *
721 pipeline_find(mc_PIPELINE *pipeline, lcb_uint32_t opaque, int do_remove)
722 {
723     sllist_iterator iter;
724     SLLIST_ITERFOR(&pipeline->requests, &iter) {
725         mc_PACKET *pkt = SLLIST_ITEM(iter.cur, mc_PACKET, slnode);
726         if (pkt->opaque == opaque) {
727             if (do_remove) {
728                 sllist_iter_remove(&pipeline->requests, &iter);
729             }
730             return pkt;
731         }
732     }
733     return NULL;
734 }
735
736 mc_PACKET *
737 mcreq_pipeline_find(mc_PIPELINE *pipeline, lcb_uint32_t opaque)
738 {
739     return pipeline_find(pipeline, opaque, 0);
740 }
741
742 mc_PACKET *
743 mcreq_pipeline_remove(mc_PIPELINE *pipeline, lcb_uint32_t opaque)
744 {
745     return pipeline_find(pipeline, opaque, 1);
746 }
747
748 void
749 mcreq_packet_done(mc_PIPELINE *pipeline, mc_PACKET *pkt)
750 {
751     assert(pkt->flags & MCREQ_F_FLUSHED);
752     assert(pkt->flags & MCREQ_F_INVOKED);
753     if (pkt->flags & MCREQ_UBUF_FLAGS) {
754         void *kbuf, *vbuf;
755         const void *cookie;
756
757         cookie = MCREQ_PKT_COOKIE(pkt);
758         if (pkt->flags & MCREQ_F_KEY_NOCOPY) {
759             kbuf = SPAN_BUFFER(&pkt->kh_span);
760         } else {
761             kbuf = NULL;
762         }
763         if (pkt->flags & MCREQ_F_VALUE_NOCOPY) {
764             if (pkt->flags & MCREQ_F_VALUE_IOV) {
765                 vbuf = pkt->u_value.multi.iov->iov_base;
766             } else {
767                 vbuf = SPAN_SABUFFER_NC(&pkt->u_value.single);
768             }
769         } else {
770             vbuf = NULL;
771         }
772
773         pipeline->buf_done_callback(pipeline, cookie, kbuf, vbuf);
774     }
775     mcreq_wipe_packet(pipeline, pkt);
776     mcreq_release_packet(pipeline, pkt);
777 }
778
779 void
780 mcreq_reset_timeouts(mc_PIPELINE *pl, lcb_U64 nstime)
781 {
782     sllist_node *nn;
783     SLLIST_ITERBASIC(&pl->requests, nn) {
784         mc_PACKET *pkt = SLLIST_ITEM(nn, mc_PACKET, slnode);
785         MCREQ_PKT_RDATA(pkt)->start = nstime;
786     }
787 }
788
789 unsigned
790 mcreq_pipeline_timeout(
791     mc_PIPELINE *pl, lcb_error_t err, mcreq_pktfail_fn failcb, void *cbarg,
792     hrtime_t oldest_valid, hrtime_t *oldest_start)
793 {
794     sllist_iterator iter;
795     unsigned count = 0;
796
797     SLLIST_ITERFOR(&pl->requests, &iter) {
798         mc_PACKET *pkt = SLLIST_ITEM(iter.cur, mc_PACKET, slnode);
799         mc_REQDATA *rd = MCREQ_PKT_RDATA(pkt);
800
801         /**
802          * oldest_valid contains the LOWEST timestamp we can admit to being
803          * acceptable. If the current command is newer (i.e. has a higher
804          * timestamp) then we break the iteration and return.
805          */
806         if (oldest_valid && rd->start > oldest_valid) {
807             if (oldest_start) {
808                 *oldest_start = rd->start;
809             }
810             return count;
811         }
812
813         sllist_iter_remove(&pl->requests, &iter);
814         failcb(pl, pkt, err, cbarg);
815         mcreq_packet_handled(pl, pkt);
816         count++;
817     }
818     return count;
819 }
820
821 unsigned
822 mcreq_pipeline_fail(
823     mc_PIPELINE *pl, lcb_error_t err, mcreq_pktfail_fn failcb, void *arg)
824 {
825     return mcreq_pipeline_timeout(pl, err, failcb, arg, 0, NULL);
826 }
827
828 void
829 mcreq_iterwipe(mc_CMDQUEUE *queue, mc_PIPELINE *src,
830     mcreq_iterwipe_fn callback, void *arg)
831 {
832     sllist_iterator iter;
833
834     SLLIST_ITERFOR(&src->requests, &iter) {
835         int rv;
836         mc_PACKET *orig = SLLIST_ITEM(iter.cur, mc_PACKET, slnode);
837         rv = callback(queue, src, orig, arg);
838         if (rv == MCREQ_REMOVE_PACKET) {
839             sllist_iter_remove(&src->requests, &iter);
840         }
841     }
842 }
843
844 #include "mcreq-flush-inl.h"
845 typedef struct {
846     mc_PIPELINE base;
847     mcreq_fallback_cb handler;
848 } mc_FALLBACKPL;
849
850 static void
851 do_fallback_flush(mc_PIPELINE *pipeline)
852 {
853     nb_IOV iov;
854     unsigned nb;
855     int nused;
856     sllist_iterator iter;
857     mc_FALLBACKPL *fpl = (mc_FALLBACKPL*)pipeline;
858
859     while ((nb = mcreq_flush_iov_fill(pipeline, &iov, 1, &nused))) {
860         mcreq_flush_done(pipeline, nb, nb);
861     }
862     /* Now handle all the packets, for real */
863     SLLIST_ITERFOR(&pipeline->requests, &iter) {
864         mc_PACKET *pkt = SLLIST_ITEM(iter.cur, mc_PACKET, slnode);
865         fpl->handler(pipeline->parent, pkt);
866         sllist_iter_remove(&pipeline->requests, &iter);
867         mcreq_packet_handled(pipeline, pkt);
868     }
869 }
870
871 void
872 mcreq_set_fallback_handler(mc_CMDQUEUE *cq, mcreq_fallback_cb handler)
873 {
874     assert(!cq->fallback);
875     cq->fallback = calloc(1, sizeof (mc_FALLBACKPL));
Result of 'calloc' is converted to a pointer of type 'mc_PIPELINE', which is incompatible with sizeof operand type 'mc_FALLBACKPL'
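Note: the over-allocation is deliberate. mc_FALLBACKPL (defined at line 845) embeds mc_PIPELINE as its first member, so cq->fallback really does point at a complete mc_FALLBACKPL; the checker only matches the sizeof operand against the destination pointer's type and cannot prove this. A minimal sketch of one way to keep the two in agreement, assuming no caller depends on the exact allocation expression (an illustration, not the upstream patch):

    mc_FALLBACKPL *fpl = calloc(1, sizeof(*fpl));
    if (!fpl) {
        return; /* allocation failed; leave cq->fallback unset */
    }
    cq->fallback = &fpl->base;    /* mc_PIPELINE is the first member */
    mcreq_pipeline_init(cq->fallback);
    cq->fallback->parent = cq;
    cq->fallback->index = cq->npipelines;
    fpl->handler = handler;
    cq->fallback->flush_start = do_fallback_flush;

Tying sizeof(*fpl) to the typed pointer makes the operand and the pointee type match, which is what this checker verifies, and the ((mc_FALLBACKPL*)cq->fallback)->handler cast at line 879 becomes unnecessary.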
876     mcreq_pipeline_init(cq->fallback);
877     cq->fallback->parent = cq;
878     cq->fallback->index = cq->npipelines;
879     ((mc_FALLBACKPL*)cq->fallback)->handler = handler;
880     cq->fallback->flush_start = do_fallback_flush;
881 }
882
883 static void
884 noop_dumpfn(const void *d, unsigned n, FILE *fp) { (void)d;(void)n;(void)fp; }
885
886 #define MCREQ_XFLAGS(X) \
887     X(KEY_NOCOPY) \
888     X(VALUE_NOCOPY) \
889     X(VALUE_IOV) \
890     X(HASVALUE) \
891     X(REQEXT) \
892     X(UFWD) \
893     X(FLUSHED) \
894     X(INVOKED) \
895     X(DETACHED)
896
897 void
898 mcreq_dump_packet(const mc_PACKET *packet, FILE *fp, mcreq_payload_dump_fn dumpfn)
899 {
900     const char *indent = "  ";
901     const mc_REQDATA *rdata = MCREQ_PKT_RDATA(packet);
902
903     if (!dumpfn) {
904         dumpfn = noop_dumpfn;
905     }
906     if (!fp) {
907         fp = stderr;
908     }
909
910     fprintf(fp, "Packet @%p\n", (void *)packet);
911     fprintf(fp, "%sOPAQUE: %u\n", indent, (unsigned int)packet->opaque);
912
913     fprintf(fp, "%sPKTFLAGS: 0x%x ", indent, packet->flags);
914     #define X(base) \
915     if (packet->flags & MCREQ_F_##base) { fprintf(fp, "%s, ", #base); }
916     MCREQ_XFLAGS(X)
917     #undef X
918     fprintf(fp, "\n");
919
920     fprintf(fp, "%sKey+Header Size: %u\n", indent, (unsigned int)packet->kh_span.size);
921     fprintf(fp, "%sKey Offset: %u\n", indent, MCREQ_PKT_BASESIZE + packet->extlen);
922
923
924     if (packet->flags & MCREQ_F_HASVALUE) {
925         if (packet->flags & MCREQ_F_VALUE_IOV) {
926             fprintf(fp, "%sValue Length: %u\n", indent,
927                     packet->u_value.multi.total_length);
928
929             fprintf(fp, "%sValue IOV: [start=%p, n=%d]\n", indent,
930                     (void *)packet->u_value.multi.iov, packet->u_value.multi.niov);
931         } else {
932             if (packet->flags & MCREQ_F_VALUE_NOCOPY) {
933                 fprintf(fp, "%sValue is user allocated\n", indent);
934             }
935             fprintf(fp, "%sValue: %p, %u bytes\n", indent,
936                     (void *)SPAN_BUFFER(&packet->u_value.single), packet->u_value.single.size);
937         }
938     }
939
940     fprintf(fp, "%sRDATA(%s): %p\n", indent,
941             (packet->flags & MCREQ_F_REQEXT) ? "ALLOC" : "EMBEDDED", (void *)rdata);
942
943     indent = "    ";
944     fprintf(fp, "%sStart: %lu\n", indent, (unsigned long)rdata->start);
945     fprintf(fp, "%sCookie: %p\n", indent, rdata->cookie);
946
947     indent = "  ";
948     fprintf(fp, "%sNEXT: %p\n", indent, (void *)packet->slnode.next);
949     if (dumpfn != noop_dumpfn) {
950         fprintf(fp, "PACKET CONTENTS:\n");
951     }
952
953     fwrite(SPAN_BUFFER(&packet->kh_span), 1, packet->kh_span.size, fp);
954     if (packet->flags & MCREQ_F_HASVALUE) {
955         if (packet->flags & MCREQ_F_VALUE_IOV) {
956             const lcb_IOV *iovs = packet->u_value.multi.iov;
957             unsigned ii, ixmax = packet->u_value.multi.niov;
958             for (ii = 0; ii < ixmax; ii++) {
959                 dumpfn(iovs[ii].iov_base, iovs[ii].iov_len, fp);
960             }
961         } else {
962             const nb_SPAN *vspan = &packet->u_value.single;
963             dumpfn(SPAN_BUFFER(vspan), vspan->size, fp);
964         }
965     }
966 }
967
968 void
969 mcreq_dump_chain(const mc_PIPELINE *pipeline, FILE *fp, mcreq_payload_dump_fn dumpfn)
970 {
971     sllist_node *ll;
972     SLLIST_FOREACH(&pipeline->requests, ll) {
973         const mc_PACKET *pkt = SLLIST_ITEM(ll, mc_PACKET, slnode);
974         mcreq_dump_packet(pkt, fp, dumpfn);
975     }
976 }