File: | src/mc/mcreq.c |
Warning: | line 875, column 20 Result of 'calloc' is converted to a pointer of type 'mc_PIPELINE', which is incompatible with sizeof operand type 'mc_FALLBACKPL' |
1 | /* -*- Mode: C; tab-width: 4; c-basic-offset: 4; indent-tabs-mode: nil -*- */ |
2 | /* |
3 | * Copyright 2014 Couchbase, Inc. |
4 | * |
5 | * Licensed under the Apache License, Version 2.0 (the "License"); |
6 | * you may not use this file except in compliance with the License. |
7 | * You may obtain a copy of the License at |
8 | * |
9 | * http://www.apache.org/licenses/LICENSE-2.0 |
10 | * |
11 | * Unless required by applicable law or agreed to in writing, software |
12 | * distributed under the License is distributed on an "AS IS" BASIS, |
13 | * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
14 | * See the License for the specific language governing permissions and |
15 | * limitations under the License. |
16 | */ |
17 | |
18 | #include "mcreq.h" |
19 | #include "compress.h" |
20 | #include "sllist-inl.h" |
21 | #include "internal.h" |
22 | |
/**
 * Number of bytes occupied by a packet's header: the fixed base size
 * plus the length of the packet's extras.
 */
#define PKT_HDRSIZE(pkt) (MCREQ_PKT_BASESIZE + (pkt)->extlen)
24 | |
25 | lcb_error_t |
26 | mcreq_reserve_header( |
27 | mc_PIPELINE *pipeline, mc_PACKET *packet, uint8_t hdrsize) |
28 | { |
29 | int rv; |
30 | packet->extlen = hdrsize - MCREQ_PKT_BASESIZE24; |
31 | packet->kh_span.size = hdrsize; |
32 | rv = netbuf_mblock_reserve(&pipeline->nbmgr, &packet->kh_span); |
33 | if (rv != 0) { |
34 | return LCB_CLIENT_ENOMEM; |
35 | } |
36 | return LCB_SUCCESS; |
37 | } |
38 | |
39 | lcb_error_t |
40 | mcreq_reserve_key( |
41 | mc_PIPELINE *pipeline, mc_PACKET *packet, uint8_t hdrsize, |
42 | const lcb_KEYBUF *kreq) |
43 | { |
44 | const struct lcb_CONTIGBUF *contig = &kreq->contig; |
45 | int rv; |
46 | |
47 | /** Set the key offset which is the start of the key from the buffer */ |
48 | packet->extlen = hdrsize - MCREQ_PKT_BASESIZE24; |
49 | packet->kh_span.size = kreq->contig.nbytes; |
50 | |
51 | if (kreq->type == LCB_KV_COPY) { |
52 | /** |
53 | * If the key is to be copied then just allocate the span size |
54 | * for the key+24+extras |
55 | */ |
56 | packet->kh_span.size += hdrsize; |
57 | rv = netbuf_mblock_reserve(&pipeline->nbmgr, &packet->kh_span); |
58 | if (rv != 0) { |
59 | return LCB_CLIENT_ENOMEM; |
60 | } |
61 | |
62 | /** |
63 | * Copy the key into the packet starting at the extras end |
64 | */ |
65 | memcpy(SPAN_BUFFER(&packet->kh_span)(((&packet->kh_span)->offset == (nb_SIZE)-1) ? ((char *)(&packet->kh_span)->parent) : ((&packet-> kh_span)->parent->root + (&packet->kh_span)-> offset)) + hdrsize, |
66 | contig->bytes, |
67 | contig->nbytes); |
68 | |
69 | } else if (kreq->type == LCB_KV_CONTIG) { |
70 | /** |
71 | * Don't do any copying. |
72 | * Assume the key buffer has enough space for the packet as well. |
73 | */ |
74 | CREATE_STANDALONE_SPAN(&packet->kh_span, contig->bytes, contig->nbytes)(&packet->kh_span)->parent = (nb_MBLOCK *) (void *) contig->bytes; (&packet->kh_span)->offset = (nb_SIZE )-1; (&packet->kh_span)->size = contig->nbytes;; |
75 | packet->flags |= MCREQ_F_KEY_NOCOPY; |
76 | |
77 | } else { |
78 | /** IOVs not supported for keys */ |
79 | return LCB_EINVAL; |
80 | } |
81 | |
82 | return LCB_SUCCESS; |
83 | } |
84 | |
85 | lcb_error_t |
86 | mcreq_reserve_value2(mc_PIPELINE *pl, mc_PACKET *pkt, lcb_size_t n) |
87 | { |
88 | int rv; |
89 | pkt->u_value.single.size = n; |
90 | if (!n) { |
91 | return LCB_SUCCESS; |
92 | } |
93 | |
94 | pkt->flags |= MCREQ_F_HASVALUE; |
95 | rv = netbuf_mblock_reserve(&pl->nbmgr, &pkt->u_value.single); |
96 | if (rv) { |
97 | return LCB_CLIENT_ENOMEM; |
98 | } |
99 | return LCB_SUCCESS; |
100 | } |
101 | |
102 | lcb_error_t |
103 | mcreq_reserve_value( |
104 | mc_PIPELINE *pipeline, mc_PACKET *packet, const lcb_VALBUF *vreq) |
105 | { |
106 | const lcb_CONTIGBUF *contig = &vreq->u_buf.contig; |
107 | nb_SPAN *vspan = &packet->u_value.single; |
108 | int rv; |
109 | |
110 | if (vreq->vtype == LCB_KV_COPY) { |
111 | /** Copy the value into a single SPAN */ |
112 | if (! (vspan->size = vreq->u_buf.contig.nbytes)) { |
113 | return LCB_SUCCESS; |
114 | } |
115 | rv = netbuf_mblock_reserve(&pipeline->nbmgr, vspan); |
116 | |
117 | if (rv != 0) { |
118 | return LCB_CLIENT_ENOMEM; |
119 | } |
120 | |
121 | memcpy(SPAN_BUFFER(vspan)(((vspan)->offset == (nb_SIZE)-1) ? ((char *)(vspan)->parent ) : ((vspan)->parent->root + (vspan)->offset)), contig->bytes, contig->nbytes); |
122 | |
123 | } else if (vreq->vtype == LCB_KV_CONTIG) { |
124 | /** It's still contiguous so make it a 'standalone' span */ |
125 | CREATE_STANDALONE_SPAN(vspan, contig->bytes, contig->nbytes)(vspan)->parent = (nb_MBLOCK *) (void *)contig->bytes; ( vspan)->offset = (nb_SIZE)-1; (vspan)->size = contig-> nbytes;; |
126 | packet->flags |= MCREQ_F_VALUE_NOCOPY; |
127 | |
128 | } else if (vreq->vtype == LCB_KV_IOV) { |
129 | /** Multiple spans, no copy */ |
130 | unsigned int ii; |
131 | const lcb_FRAGBUF *msrc = &vreq->u_buf.multi; |
132 | lcb_FRAGBUF *mdst = &packet->u_value.multi; |
133 | |
134 | packet->flags |= MCREQ_F_VALUE_IOV | MCREQ_F_VALUE_NOCOPY; |
135 | mdst->niov = msrc->niov; |
136 | mdst->iov = malloc(mdst->niov * sizeof(*mdst->iov)); |
137 | mdst->total_length = 0; |
138 | |
139 | for (ii = 0; ii < mdst->niov; ii++) { |
140 | mdst->iov[ii] = msrc->iov[ii]; |
141 | mdst->total_length += mdst->iov[ii].iov_len; |
142 | } |
143 | } else if (vreq->vtype == LCB_KV_IOVCOPY) { |
144 | /** Multiple input buffers, normal copying output buffer */ |
145 | unsigned int ii, cur_offset; |
146 | const lcb_FRAGBUF *msrc = &vreq->u_buf.multi; |
147 | |
148 | if (msrc->total_length) { |
149 | vspan->size = msrc->total_length; |
150 | } else { |
151 | vspan->size = 0; |
152 | for (ii = 0; ii < msrc->niov; ii++) { |
153 | vspan->size += msrc->iov[ii].iov_len; |
154 | } |
155 | } |
156 | |
157 | rv = netbuf_mblock_reserve(&pipeline->nbmgr, vspan); |
158 | if (rv != 0) { |
159 | return LCB_CLIENT_ENOMEM; |
160 | } |
161 | |
162 | for (ii = 0, cur_offset = 0; ii < msrc->niov; ii++) { |
163 | char *buf = SPAN_BUFFER(vspan)(((vspan)->offset == (nb_SIZE)-1) ? ((char *)(vspan)->parent ) : ((vspan)->parent->root + (vspan)->offset)) + cur_offset; |
164 | memcpy(buf, msrc->iov[ii].iov_base, msrc->iov[ii].iov_len); |
165 | cur_offset += msrc->iov[ii].iov_len; |
166 | } |
167 | } |
168 | |
169 | packet->flags |= MCREQ_F_HASVALUE; |
170 | return LCB_SUCCESS; |
171 | } |
172 | |
173 | static int |
174 | pkt_tmo_compar(sllist_node *a, sllist_node *b) |
175 | { |
176 | mc_PACKET *pa, *pb; |
177 | hrtime_t tmo_a, tmo_b; |
178 | |
179 | pa = SLLIST_ITEM(a, mc_PACKET, slnode)((mc_PACKET *) (void *) ((char *)(a) - __builtin_offsetof(mc_PACKET , slnode))); |
180 | pb = SLLIST_ITEM(b, mc_PACKET, slnode)((mc_PACKET *) (void *) ((char *)(b) - __builtin_offsetof(mc_PACKET , slnode))); |
181 | |
182 | tmo_a = MCREQ_PKT_RDATA(pa)(((pa)->flags & MCREQ_F_REQEXT) ? ((mc_REQDATA *)(pa)-> u_rdata.exdata) : (&(pa)->u_rdata.reqdata))->start; |
183 | tmo_b = MCREQ_PKT_RDATA(pb)(((pb)->flags & MCREQ_F_REQEXT) ? ((mc_REQDATA *)(pb)-> u_rdata.exdata) : (&(pb)->u_rdata.reqdata))->start; |
184 | |
185 | if (tmo_a == tmo_b) { |
186 | return 0; |
187 | } else if (tmo_a < tmo_b) { |
188 | return -1; |
189 | } else { |
190 | return 1; |
191 | } |
192 | } |
193 | |
194 | void |
195 | mcreq_reenqueue_packet(mc_PIPELINE *pipeline, mc_PACKET *packet) |
196 | { |
197 | sllist_root *reqs = &pipeline->requests; |
198 | mcreq_enqueue_packet(pipeline, packet); |
199 | sllist_remove(reqs, &packet->slnode); |
200 | sllist_insert_sorted(reqs, &packet->slnode, pkt_tmo_compar); |
201 | } |
202 | |
203 | void |
204 | mcreq_enqueue_packet(mc_PIPELINE *pipeline, mc_PACKET *packet) |
205 | { |
206 | nb_SPAN *vspan = &packet->u_value.single; |
207 | sllist_append(&pipeline->requests, &packet->slnode); |
208 | netbuf_enqueue_span(&pipeline->nbmgr, &packet->kh_span); |
209 | MC_INCR_METRIC(pipeline, bytes_queued, packet->kh_span.size)do { if ((pipeline)->metrics) { (pipeline)->metrics-> bytes_queued += packet->kh_span.size; } } while (0); |
210 | |
211 | if (!(packet->flags & MCREQ_F_HASVALUE)) { |
212 | goto GT_ENQUEUE_PDU; |
213 | } |
214 | |
215 | if (packet->flags & MCREQ_F_VALUE_IOV) { |
216 | unsigned int ii; |
217 | lcb_FRAGBUF *multi = &packet->u_value.multi; |
218 | for (ii = 0; ii < multi->niov; ii++) { |
219 | netbuf_enqueue(&pipeline->nbmgr, (nb_IOV *)multi->iov + ii); |
220 | MC_INCR_METRIC(pipeline, bytes_queued, multi->iov[ii].iov_len)do { if ((pipeline)->metrics) { (pipeline)->metrics-> bytes_queued += multi->iov[ii].iov_len; } } while (0); |
221 | } |
222 | |
223 | } else if (vspan->size) { |
224 | MC_INCR_METRIC(pipeline, bytes_queued, vspan->size)do { if ((pipeline)->metrics) { (pipeline)->metrics-> bytes_queued += vspan->size; } } while (0); |
225 | netbuf_enqueue_span(&pipeline->nbmgr, vspan); |
226 | } |
227 | |
228 | GT_ENQUEUE_PDU: |
229 | netbuf_pdu_enqueue(&pipeline->nbmgr, packet, offsetof(mc_PACKET, sl_flushq)__builtin_offsetof(mc_PACKET, sl_flushq)); |
230 | MC_INCR_METRIC(pipeline, packets_queued, 1)do { if ((pipeline)->metrics) { (pipeline)->metrics-> packets_queued += 1; } } while (0); |
231 | } |
232 | |
233 | void |
234 | mcreq_wipe_packet(mc_PIPELINE *pipeline, mc_PACKET *packet) |
235 | { |
236 | if (! (packet->flags & MCREQ_F_KEY_NOCOPY)) { |
237 | if (packet->flags & MCREQ_F_DETACHED) { |
238 | free(SPAN_BUFFER(&packet->kh_span)(((&packet->kh_span)->offset == (nb_SIZE)-1) ? ((char *)(&packet->kh_span)->parent) : ((&packet-> kh_span)->parent->root + (&packet->kh_span)-> offset))); |
239 | } else { |
240 | netbuf_mblock_release(&pipeline->nbmgr, &packet->kh_span); |
241 | } |
242 | } |
243 | |
244 | if (! (packet->flags & MCREQ_F_HASVALUE)) { |
245 | return; |
246 | } |
247 | |
248 | if (packet->flags & MCREQ_F_VALUE_NOCOPY) { |
249 | if (packet->flags & MCREQ_F_VALUE_IOV) { |
250 | free(packet->u_value.multi.iov); |
251 | } |
252 | |
253 | return; |
254 | } |
255 | |
256 | if (packet->flags & MCREQ_F_DETACHED) { |
257 | free(SPAN_BUFFER(&packet->u_value.single)(((&packet->u_value.single)->offset == (nb_SIZE)-1) ? ((char *)(&packet->u_value.single)->parent) : (( &packet->u_value.single)->parent->root + (&packet ->u_value.single)->offset))); |
258 | } else { |
259 | netbuf_mblock_release(&pipeline->nbmgr, &packet->u_value.single); |
260 | } |
261 | |
262 | } |
263 | |
264 | mc_PACKET * |
265 | mcreq_allocate_packet(mc_PIPELINE *pipeline) |
266 | { |
267 | nb_SPAN span; |
268 | int rv; |
269 | mc_PACKET *ret; |
270 | span.size = sizeof(*ret); |
271 | |
272 | rv = netbuf_mblock_reserve(&pipeline->reqpool, &span); |
273 | if (rv != 0) { |
274 | return NULL((void*)0); |
275 | } |
276 | |
277 | ret = (void *) SPAN_MBUFFER_NC(&span)((&span)->parent->root + (&span)->offset); |
278 | ret->alloc_parent = span.parent; |
279 | ret->flags = 0; |
280 | ret->retries = 0; |
281 | ret->opaque = pipeline->parent->seq++; |
282 | return ret; |
283 | } |
284 | |
285 | void |
286 | mcreq_release_packet(mc_PIPELINE *pipeline, mc_PACKET *packet) |
287 | { |
288 | nb_SPAN span; |
289 | if (packet->flags & MCREQ_F_DETACHED) { |
290 | sllist_iterator iter; |
291 | mc_EXPACKET *epkt = (mc_EXPACKET *)packet; |
292 | |
293 | SLLIST_ITERFOR(&epkt->data, &iter)for (slist_iter_init(&epkt->data, &iter); !((& iter)->cur == ((void*)0)); slist_iter_incr(&epkt->data , &iter)) { |
294 | mc_EPKTDATUM *d = SLLIST_ITEM(iter.cur, mc_EPKTDATUM, slnode)((mc_EPKTDATUM *) (void *) ((char *)(iter.cur) - __builtin_offsetof (mc_EPKTDATUM, slnode))); |
295 | sllist_iter_remove(&epkt->data, &iter); |
296 | d->dtorfn(d); |
297 | } |
298 | free(epkt); |
299 | return; |
300 | } |
301 | |
302 | span.size = sizeof(*packet); |
303 | span.parent = packet->alloc_parent; |
304 | span.offset = (char *)packet - packet->alloc_parent->root; |
305 | |
306 | netbuf_mblock_release(&pipeline->reqpool, &span); |
307 | } |
308 | |
/* NOTE(review): no use of this constant is visible in this part of the
 * file; confirm its intended meaning against its users. */
#define MCREQ_DETACH_WIPESRC 1
310 | |
311 | mc_PACKET * |
312 | mcreq_renew_packet(const mc_PACKET *src) |
313 | { |
314 | char *kdata, *vdata; |
315 | unsigned nvdata; |
316 | mc_PACKET *dst; |
317 | mc_EXPACKET *edst = calloc(1, sizeof(*edst)); |
318 | |
319 | dst = &edst->base; |
320 | *dst = *src; |
321 | |
322 | kdata = malloc(src->kh_span.size); |
323 | memcpy(kdata, SPAN_BUFFER(&src->kh_span)(((&src->kh_span)->offset == (nb_SIZE)-1) ? ((char * )(&src->kh_span)->parent) : ((&src->kh_span) ->parent->root + (&src->kh_span)->offset)), src->kh_span.size); |
324 | CREATE_STANDALONE_SPAN(&dst->kh_span, kdata, src->kh_span.size)(&dst->kh_span)->parent = (nb_MBLOCK *) (void *)kdata ; (&dst->kh_span)->offset = (nb_SIZE)-1; (&dst-> kh_span)->size = src->kh_span.size;; |
325 | |
326 | dst->flags &= ~(MCREQ_F_KEY_NOCOPY|MCREQ_F_VALUE_NOCOPY|MCREQ_F_VALUE_IOV); |
327 | dst->flags |= MCREQ_F_DETACHED; |
328 | dst->alloc_parent = NULL((void*)0); |
329 | dst->sl_flushq.next = NULL((void*)0); |
330 | dst->slnode.next = NULL((void*)0); |
331 | dst->retries = src->retries; |
332 | |
333 | if (src->flags & MCREQ_F_HASVALUE) { |
334 | /** Get the length */ |
335 | if (src->flags & MCREQ_F_VALUE_IOV) { |
336 | unsigned ii; |
337 | unsigned offset = 0; |
338 | |
339 | nvdata = src->u_value.multi.total_length; |
340 | vdata = malloc(nvdata); |
341 | for (ii = 0; ii < src->u_value.multi.niov; ii++) { |
342 | const lcb_IOV *iov = src->u_value.multi.iov + ii; |
343 | |
344 | memcpy(vdata + offset, iov->iov_base, iov->iov_len); |
345 | offset += iov->iov_len; |
346 | } |
347 | } else { |
348 | protocol_binary_request_header hdr; |
349 | const nb_SPAN *origspan = &src->u_value.single; |
350 | mcreq_read_hdr(dst, &hdr)memcpy( (&hdr)->bytes, (((&(dst)->kh_span)-> offset == (nb_SIZE)-1) ? ((char *)(&(dst)->kh_span)-> parent) : ((&(dst)->kh_span)->parent->root + (& (dst)->kh_span)->offset)), sizeof((&hdr)->bytes) ); |
351 | |
352 | if (hdr.request.datatype & PROTOCOL_BINARY_DATATYPE_COMPRESSED) { |
353 | /* For compressed payloads we need to uncompress it first |
354 | * because it may be forwarded to a server without compression. |
355 | * TODO: might be more clever to check a setting flag somewhere |
356 | * and see if we should do this. */ |
357 | |
358 | lcb_SIZE n_inflated; |
359 | const void *inflated; |
360 | int rv; |
361 | |
362 | vdata = NULL((void*)0); |
363 | rv = mcreq_inflate_value(SPAN_BUFFER(origspan)(((origspan)->offset == (nb_SIZE)-1) ? ((char *)(origspan) ->parent) : ((origspan)->parent->root + (origspan)-> offset)), origspan->size, |
364 | &inflated, &n_inflated, (void**)&vdata); |
365 | |
366 | assert(vdata == inflated)((void) sizeof ((vdata == inflated) ? 1 : 0), __extension__ ( { if (vdata == inflated) ; else __assert_fail ("vdata == inflated" , "/home/avsej/code/libcouchbase/src/mc/mcreq.c", 366, __extension__ __PRETTY_FUNCTION__); })); |
367 | |
368 | if (rv != 0) { |
369 | /* TODO: log error details when snappy will be enabled */ |
370 | free(edst); |
371 | return NULL((void*)0); |
372 | } |
373 | nvdata = n_inflated; |
374 | hdr.request.datatype &= ~PROTOCOL_BINARY_DATATYPE_COMPRESSED; |
375 | hdr.request.bodylen = htonl( |
376 | ntohs(hdr.request.keylen) + |
377 | hdr.request.extlen + |
378 | n_inflated); |
379 | mcreq_write_hdr(dst, &hdr)memcpy( (((&(dst)->kh_span)->offset == (nb_SIZE)-1) ? ((char *)(&(dst)->kh_span)->parent) : ((&(dst )->kh_span)->parent->root + (&(dst)->kh_span) ->offset)), (&hdr)->bytes, sizeof((&hdr)->bytes ) ); |
380 | |
381 | } else { |
382 | nvdata = origspan->size; |
383 | vdata = malloc(nvdata); |
384 | memcpy(vdata, SPAN_BUFFER(origspan)(((origspan)->offset == (nb_SIZE)-1) ? ((char *)(origspan) ->parent) : ((origspan)->parent->root + (origspan)-> offset)), nvdata); |
385 | } |
386 | } |
387 | |
388 | /* Declare the value as a standalone malloc'd span */ |
389 | CREATE_STANDALONE_SPAN(&dst->u_value.single, vdata, nvdata)(&dst->u_value.single)->parent = (nb_MBLOCK *) (void *)vdata; (&dst->u_value.single)->offset = (nb_SIZE )-1; (&dst->u_value.single)->size = nvdata;; |
390 | } |
391 | |
392 | if (src->flags & MCREQ_F_DETACHED) { |
393 | mc_EXPACKET *esrc = (mc_EXPACKET *)src; |
394 | sllist_iterator iter; |
395 | SLLIST_ITERFOR(&esrc->data, &iter)for (slist_iter_init(&esrc->data, &iter); !((& iter)->cur == ((void*)0)); slist_iter_incr(&esrc->data , &iter)) { |
396 | sllist_node *cur = iter.cur; |
397 | sllist_iter_remove(&esrc->data, &iter); |
398 | sllist_append(&edst->data, cur); |
399 | } |
400 | } |
401 | return dst; |
402 | } |
403 | |
404 | int |
405 | mcreq_epkt_insert(mc_EXPACKET *ep, mc_EPKTDATUM *datum) |
406 | { |
407 | if (!(ep->base.flags & MCREQ_F_DETACHED)) { |
408 | return -1; |
409 | } |
410 | assert(!sllist_contains(&ep->data, &datum->slnode))((void) sizeof ((!sllist_contains(&ep->data, &datum ->slnode)) ? 1 : 0), __extension__ ({ if (!sllist_contains (&ep->data, &datum->slnode)) ; else __assert_fail ("!sllist_contains(&ep->data, &datum->slnode)" , "/home/avsej/code/libcouchbase/src/mc/mcreq.c", 410, __extension__ __PRETTY_FUNCTION__); })); |
411 | sllist_append(&ep->data, &datum->slnode); |
412 | return 0; |
413 | } |
414 | |
415 | mc_EPKTDATUM * |
416 | mcreq_epkt_find(mc_EXPACKET *ep, const char *key) |
417 | { |
418 | sllist_iterator iter; |
419 | SLLIST_ITERFOR(&ep->data, &iter)for (slist_iter_init(&ep->data, &iter); !((&iter )->cur == ((void*)0)); slist_iter_incr(&ep->data, & iter)) { |
420 | mc_EPKTDATUM *d = SLLIST_ITEM(iter.cur, mc_EPKTDATUM, slnode)((mc_EPKTDATUM *) (void *) ((char *)(iter.cur) - __builtin_offsetof (mc_EPKTDATUM, slnode))); |
421 | if (!strcmp(key, d->key)) { |
422 | return d; |
423 | } |
424 | } |
425 | return NULL((void*)0); |
426 | } |
427 | |
428 | void |
429 | mcreq_map_key(mc_CMDQUEUE *queue, |
430 | const lcb_KEYBUF *key, const lcb_KEYBUF *hashkey, |
431 | unsigned nhdr, int *vbid, int *srvix) |
432 | { |
433 | const void *hk; |
434 | size_t nhk = 0; |
435 | if (hashkey) { |
436 | if (hashkey->type == LCB_KV_COPY && hashkey->contig.bytes != NULL((void*)0)) { |
437 | hk = hashkey->contig.bytes; |
438 | nhk = hashkey->contig.nbytes; |
439 | } else if (hashkey->type == LCB_KV_VBID) { |
440 | *vbid = hashkey->contig.nbytes; |
441 | *srvix = lcbvb_vbmaster(queue->config, *vbid); |
442 | return; |
443 | } |
444 | } |
445 | if (!nhk) { |
446 | if (key->type == LCB_KV_COPY) { |
447 | hk = key->contig.bytes; |
448 | nhk = key->contig.nbytes; |
449 | } else { |
450 | const char *buf = key->contig.bytes; |
451 | buf += nhdr; |
452 | hk = buf; |
453 | nhk = key->contig.nbytes - nhdr; |
454 | } |
455 | } |
456 | lcbvb_map_key(queue->config, hk, nhk, vbid, srvix); |
457 | } |
458 | |
459 | lcb_error_t |
460 | mcreq_basic_packet( |
461 | mc_CMDQUEUE *queue, const lcb_CMDBASE *cmd, |
462 | protocol_binary_request_header *req, lcb_uint8_t extlen, |
463 | mc_PACKET **packet, mc_PIPELINE **pipeline, int options) |
464 | { |
465 | int vb, srvix; |
466 | |
467 | if (!queue->config) { |
468 | return LCB_CLIENT_ETMPFAILLCB_CLIENT_ENOCONF; |
469 | } |
470 | if (!cmd) { |
471 | return LCB_EINVAL; |
472 | } |
473 | |
474 | mcreq_map_key(queue, &cmd->key, &cmd->_hashkey, |
475 | sizeof(*req) + extlen, &vb, &srvix); |
476 | if (srvix > -1 && srvix < (int)queue->npipelines) { |
477 | *pipeline = queue->pipelines[srvix]; |
478 | |
479 | } else { |
480 | if ((options & MCREQ_BASICPACKET_F_FALLBACKOK0x01) && queue->fallback) { |
481 | *pipeline = queue->fallback; |
482 | } else { |
483 | return LCB_NO_MATCHING_SERVER; |
484 | } |
485 | } |
486 | |
487 | *packet = mcreq_allocate_packet(*pipeline); |
488 | if (*packet == NULL((void*)0)) { |
489 | return LCB_CLIENT_ENOMEM; |
490 | } |
491 | |
492 | mcreq_reserve_key(*pipeline, *packet, sizeof(*req) + extlen, &cmd->key); |
493 | |
494 | req->request.keylen = htons((*packet)->kh_span.size - PKT_HDRSIZE(*packet)(24 + (*packet)->extlen)); |
495 | req->request.vbucket = htons(vb); |
496 | req->request.extlen = extlen; |
497 | return LCB_SUCCESS; |
498 | } |
499 | |
500 | void |
501 | mcreq_get_key(const mc_PACKET *packet, const void **key, lcb_size_t *nkey) |
502 | { |
503 | *key = SPAN_BUFFER(&packet->kh_span)(((&packet->kh_span)->offset == (nb_SIZE)-1) ? ((char *)(&packet->kh_span)->parent) : ((&packet-> kh_span)->parent->root + (&packet->kh_span)-> offset)) + PKT_HDRSIZE(packet)(24 + (packet)->extlen); |
504 | *nkey = packet->kh_span.size - PKT_HDRSIZE(packet)(24 + (packet)->extlen); |
505 | } |
506 | |
507 | lcb_uint32_t |
508 | mcreq_get_bodysize(const mc_PACKET *packet) |
509 | { |
510 | lcb_uint32_t ret; |
511 | char *retptr = SPAN_BUFFER(&packet->kh_span)(((&packet->kh_span)->offset == (nb_SIZE)-1) ? ((char *)(&packet->kh_span)->parent) : ((&packet-> kh_span)->parent->root + (&packet->kh_span)-> offset)) + 8; |
512 | if ((uintptr_t)retptr % sizeof(ret) == 0) { |
513 | return ntohl(*(lcb_uint32_t*) (void *)retptr); |
514 | } else { |
515 | memcpy(&ret, retptr, sizeof(ret)); |
516 | return ntohl(ret); |
517 | } |
518 | } |
519 | |
520 | uint16_t |
521 | mcreq_get_vbucket(const mc_PACKET *packet) |
522 | { |
523 | uint16_t ret; |
524 | char *retptr = SPAN_BUFFER(&packet->kh_span)(((&packet->kh_span)->offset == (nb_SIZE)-1) ? ((char *)(&packet->kh_span)->parent) : ((&packet-> kh_span)->parent->root + (&packet->kh_span)-> offset)) + 6; |
525 | if ((uintptr_t)retptr % sizeof(ret) == 0) { |
526 | return ntohs(*(uint16_t*)(void*)retptr); |
527 | } else { |
528 | memcpy(&ret, retptr, sizeof ret); |
529 | return ntohs(ret); |
530 | } |
531 | } |
532 | |
533 | uint32_t |
534 | mcreq_get_size(const mc_PACKET *packet) |
535 | { |
536 | uint32_t sz = packet->kh_span.size; |
537 | if (packet->flags & MCREQ_F_HASVALUE) { |
538 | if (packet->flags & MCREQ_F_VALUE_IOV) { |
539 | sz += packet->u_value.multi.total_length; |
540 | } else { |
541 | sz += packet->u_value.single.size; |
542 | } |
543 | } |
544 | return sz; |
545 | } |
546 | |
547 | void |
548 | mcreq_pipeline_cleanup(mc_PIPELINE *pipeline) |
549 | { |
550 | netbuf_cleanup(&pipeline->nbmgr); |
551 | netbuf_cleanup(&pipeline->reqpool); |
552 | } |
553 | |
554 | int |
555 | mcreq_pipeline_init(mc_PIPELINE *pipeline) |
556 | { |
557 | nb_SETTINGS settings; |
558 | |
559 | /* Initialize all members to 0 */ |
560 | memset(&pipeline->requests, 0, sizeof pipeline->requests); |
561 | pipeline->parent = NULL((void*)0); |
562 | pipeline->flush_start = NULL((void*)0); |
563 | pipeline->index = 0; |
564 | memset(&pipeline->ctxqueued, 0, sizeof pipeline->ctxqueued); |
565 | pipeline->buf_done_callback = NULL((void*)0); |
566 | |
567 | netbuf_default_settings(&settings); |
568 | |
569 | /** Initialize datapool */ |
570 | netbuf_init(&pipeline->nbmgr, &settings); |
571 | |
572 | /** Initialize request pool */ |
573 | settings.data_basealloc = sizeof(mc_PACKET) * 32; |
574 | netbuf_init(&pipeline->reqpool, &settings);; |
575 | pipeline->metrics = NULL((void*)0); |
576 | return 0; |
577 | } |
578 | |
579 | void |
580 | mcreq_queue_add_pipelines( |
581 | mc_CMDQUEUE *queue, mc_PIPELINE * const *pipelines, unsigned npipelines, |
582 | lcbvb_CONFIG* config) |
583 | { |
584 | unsigned ii; |
585 | |
586 | lcb_assert(queue->pipelines == NULL)((void) sizeof ((queue->pipelines == ((void*)0)) ? 1 : 0), __extension__ ({ if (queue->pipelines == ((void*)0)) ; else __assert_fail ("queue->pipelines == ((void*)0)", "/home/avsej/code/libcouchbase/src/mc/mcreq.c" , 586, __extension__ __PRETTY_FUNCTION__); })); |
587 | queue->npipelines = npipelines; |
588 | queue->_npipelines_ex = queue->npipelines; |
589 | queue->pipelines = malloc(sizeof(*pipelines) * (npipelines + 1)); |
590 | queue->config = config; |
591 | |
592 | memcpy(queue->pipelines, pipelines, sizeof(*pipelines) * npipelines); |
593 | |
594 | free(queue->scheds); |
595 | queue->scheds = calloc(npipelines+1, 1); |
596 | |
597 | for (ii = 0; ii < npipelines; ii++) { |
598 | pipelines[ii]->parent = queue; |
599 | pipelines[ii]->index = ii; |
600 | } |
601 | |
602 | if (queue->fallback) { |
603 | queue->fallback->index = npipelines; |
604 | queue->pipelines[queue->npipelines] = queue->fallback; |
605 | queue->_npipelines_ex++; |
606 | } |
607 | } |
608 | |
609 | mc_PIPELINE ** |
610 | mcreq_queue_take_pipelines(mc_CMDQUEUE *queue, unsigned *count) |
611 | { |
612 | mc_PIPELINE **ret = queue->pipelines; |
613 | *count = queue->npipelines; |
614 | queue->pipelines = NULL((void*)0); |
615 | queue->npipelines = 0; |
616 | return ret; |
617 | } |
618 | |
619 | int |
620 | mcreq_queue_init(mc_CMDQUEUE *queue) |
621 | { |
622 | queue->seq = 0; |
623 | queue->pipelines = NULL((void*)0); |
624 | queue->scheds = NULL((void*)0); |
625 | queue->fallback = NULL((void*)0); |
626 | queue->npipelines = 0; |
627 | return 0; |
628 | } |
629 | |
630 | void |
631 | mcreq_queue_cleanup(mc_CMDQUEUE *queue) |
632 | { |
633 | if (queue->fallback) { |
634 | mcreq_pipeline_cleanup(queue->fallback); |
635 | free(queue->fallback); |
636 | queue->fallback = NULL((void*)0); |
637 | } |
638 | free(queue->scheds); |
639 | free(queue->pipelines); |
640 | queue->scheds = NULL((void*)0); |
641 | } |
642 | |
643 | void |
644 | mcreq_sched_enter(mc_CMDQUEUE *queue) |
645 | { |
646 | queue->ctxenter = 1; |
647 | } |
648 | |
649 | |
650 | |
651 | static void |
652 | queuectx_leave(mc_CMDQUEUE *queue, int success, int flush) |
653 | { |
654 | unsigned ii; |
655 | |
656 | if (queue->ctxenter) { |
657 | queue->ctxenter = 0; |
658 | } |
659 | |
660 | for (ii = 0; ii < queue->_npipelines_ex; ii++) { |
661 | mc_PIPELINE *pipeline; |
662 | sllist_node *ll_next, *ll; |
663 | |
664 | if (!queue->scheds[ii]) { |
665 | continue; |
666 | } |
667 | |
668 | pipeline = queue->pipelines[ii]; |
669 | ll = SLLIST_FIRST(&pipeline->ctxqueued)(&pipeline->ctxqueued)->first_prev.next; |
670 | |
671 | while (ll) { |
672 | mc_PACKET *pkt = SLLIST_ITEM(ll, mc_PACKET, slnode)((mc_PACKET *) (void *) ((char *)(ll) - __builtin_offsetof(mc_PACKET , slnode))); |
673 | ll_next = ll->next; |
674 | |
675 | if (success) { |
676 | mcreq_enqueue_packet(pipeline, pkt); |
677 | } else { |
678 | if (pkt->flags & MCREQ_F_REQEXT) { |
679 | mc_REQDATAEX *rd = pkt->u_rdata.exdata; |
680 | if (rd->procs->fail_dtor) { |
681 | rd->procs->fail_dtor(pkt); |
682 | } |
683 | } |
684 | mcreq_wipe_packet(pipeline, pkt); |
685 | mcreq_release_packet(pipeline, pkt); |
686 | } |
687 | |
688 | ll = ll_next; |
689 | } |
690 | SLLIST_FIRST(&pipeline->ctxqueued)(&pipeline->ctxqueued)->first_prev.next = pipeline->ctxqueued.last = NULL((void*)0); |
691 | if (flush) { |
692 | pipeline->flush_start(pipeline); |
693 | } |
694 | queue->scheds[ii] = 0; |
695 | } |
696 | } |
697 | |
698 | void |
699 | mcreq_sched_leave(mc_CMDQUEUE *queue, int do_flush) |
700 | { |
701 | queuectx_leave(queue, 1, do_flush); |
702 | } |
703 | |
704 | void |
705 | mcreq_sched_fail(mc_CMDQUEUE *queue) |
706 | { |
707 | queuectx_leave(queue, 0, 0); |
708 | } |
709 | |
710 | void |
711 | mcreq_sched_add(mc_PIPELINE *pipeline, mc_PACKET *pkt) |
712 | { |
713 | mc_CMDQUEUE *cq = pipeline->parent; |
714 | if (!cq->scheds[pipeline->index]) { |
715 | cq->scheds[pipeline->index] = 1; |
716 | } |
717 | sllist_append(&pipeline->ctxqueued, &pkt->slnode); |
718 | } |
719 | |
720 | static mc_PACKET * |
721 | pipeline_find(mc_PIPELINE *pipeline, lcb_uint32_t opaque, int do_remove) |
722 | { |
723 | sllist_iterator iter; |
724 | SLLIST_ITERFOR(&pipeline->requests, &iter)for (slist_iter_init(&pipeline->requests, &iter); ! ((&iter)->cur == ((void*)0)); slist_iter_incr(&pipeline ->requests, &iter)) { |
725 | mc_PACKET *pkt = SLLIST_ITEM(iter.cur, mc_PACKET, slnode)((mc_PACKET *) (void *) ((char *)(iter.cur) - __builtin_offsetof (mc_PACKET, slnode))); |
726 | if (pkt->opaque == opaque) { |
727 | if (do_remove) { |
728 | sllist_iter_remove(&pipeline->requests, &iter); |
729 | } |
730 | return pkt; |
731 | } |
732 | } |
733 | return NULL((void*)0); |
734 | } |
735 | |
736 | mc_PACKET * |
737 | mcreq_pipeline_find(mc_PIPELINE *pipeline, lcb_uint32_t opaque) |
738 | { |
739 | return pipeline_find(pipeline, opaque, 0); |
740 | } |
741 | |
742 | mc_PACKET * |
743 | mcreq_pipeline_remove(mc_PIPELINE *pipeline, lcb_uint32_t opaque) |
744 | { |
745 | return pipeline_find(pipeline, opaque, 1); |
746 | } |
747 | |
748 | void |
749 | mcreq_packet_done(mc_PIPELINE *pipeline, mc_PACKET *pkt) |
750 | { |
751 | assert(pkt->flags & MCREQ_F_FLUSHED)((void) sizeof ((pkt->flags & MCREQ_F_FLUSHED) ? 1 : 0 ), __extension__ ({ if (pkt->flags & MCREQ_F_FLUSHED) ; else __assert_fail ("pkt->flags & MCREQ_F_FLUSHED", "/home/avsej/code/libcouchbase/src/mc/mcreq.c" , 751, __extension__ __PRETTY_FUNCTION__); })); |
752 | assert(pkt->flags & MCREQ_F_INVOKED)((void) sizeof ((pkt->flags & MCREQ_F_INVOKED) ? 1 : 0 ), __extension__ ({ if (pkt->flags & MCREQ_F_INVOKED) ; else __assert_fail ("pkt->flags & MCREQ_F_INVOKED", "/home/avsej/code/libcouchbase/src/mc/mcreq.c" , 752, __extension__ __PRETTY_FUNCTION__); })); |
753 | if (pkt->flags & MCREQ_UBUF_FLAGS(MCREQ_F_KEY_NOCOPY|MCREQ_F_VALUE_NOCOPY)) { |
754 | void *kbuf, *vbuf; |
755 | const void *cookie; |
756 | |
757 | cookie = MCREQ_PKT_COOKIE(pkt)(((pkt)->flags & MCREQ_F_REQEXT) ? ((mc_REQDATA *)(pkt )->u_rdata.exdata) : (&(pkt)->u_rdata.reqdata))-> cookie; |
758 | if (pkt->flags & MCREQ_F_KEY_NOCOPY) { |
759 | kbuf = SPAN_BUFFER(&pkt->kh_span)(((&pkt->kh_span)->offset == (nb_SIZE)-1) ? ((char * )(&pkt->kh_span)->parent) : ((&pkt->kh_span) ->parent->root + (&pkt->kh_span)->offset)); |
760 | } else { |
761 | kbuf = NULL((void*)0); |
762 | } |
763 | if (pkt->flags & MCREQ_F_VALUE_NOCOPY) { |
764 | if (pkt->flags & MCREQ_F_VALUE_IOV) { |
765 | vbuf = pkt->u_value.multi.iov->iov_base; |
766 | } else { |
767 | vbuf = SPAN_SABUFFER_NC(&pkt->u_value.single)((char *)(&pkt->u_value.single)->parent); |
768 | } |
769 | } else { |
770 | vbuf = NULL((void*)0); |
771 | } |
772 | |
773 | pipeline->buf_done_callback(pipeline, cookie, kbuf, vbuf); |
774 | } |
775 | mcreq_wipe_packet(pipeline, pkt); |
776 | mcreq_release_packet(pipeline, pkt); |
777 | } |
778 | |
779 | void |
780 | mcreq_reset_timeouts(mc_PIPELINE *pl, lcb_U64 nstime) |
781 | { |
782 | sllist_node *nn; |
783 | SLLIST_ITERBASIC(&pl->requests, nn)for (nn = (&pl->requests)->first_prev.next; nn; nn = nn->next) { |
784 | mc_PACKET *pkt = SLLIST_ITEM(nn, mc_PACKET, slnode)((mc_PACKET *) (void *) ((char *)(nn) - __builtin_offsetof(mc_PACKET , slnode))); |
785 | MCREQ_PKT_RDATA(pkt)(((pkt)->flags & MCREQ_F_REQEXT) ? ((mc_REQDATA *)(pkt )->u_rdata.exdata) : (&(pkt)->u_rdata.reqdata))->start = nstime; |
786 | } |
787 | } |
788 | |
789 | unsigned |
790 | mcreq_pipeline_timeout( |
791 | mc_PIPELINE *pl, lcb_error_t err, mcreq_pktfail_fn failcb, void *cbarg, |
792 | hrtime_t oldest_valid, hrtime_t *oldest_start) |
793 | { |
794 | sllist_iterator iter; |
795 | unsigned count = 0; |
796 | |
797 | SLLIST_ITERFOR(&pl->requests, &iter)for (slist_iter_init(&pl->requests, &iter); !((& iter)->cur == ((void*)0)); slist_iter_incr(&pl->requests , &iter)) { |
798 | mc_PACKET *pkt = SLLIST_ITEM(iter.cur, mc_PACKET, slnode)((mc_PACKET *) (void *) ((char *)(iter.cur) - __builtin_offsetof (mc_PACKET, slnode))); |
799 | mc_REQDATA *rd = MCREQ_PKT_RDATA(pkt)(((pkt)->flags & MCREQ_F_REQEXT) ? ((mc_REQDATA *)(pkt )->u_rdata.exdata) : (&(pkt)->u_rdata.reqdata)); |
800 | |
801 | /** |
802 | * oldest_valid contains the LOWEST timestamp we can admit to being |
803 | * acceptable. If the current command is newer (i.e. has a higher |
804 | * timestamp) then we break the iteration and return. |
805 | */ |
806 | if (oldest_valid && rd->start > oldest_valid) { |
807 | if (oldest_start) { |
808 | *oldest_start = rd->start; |
809 | } |
810 | return count; |
811 | } |
812 | |
813 | sllist_iter_remove(&pl->requests, &iter); |
814 | failcb(pl, pkt, err, cbarg); |
815 | mcreq_packet_handled(pl, pkt)do { (pkt)->flags |= MCREQ_F_INVOKED; if ((pkt)->flags & MCREQ_F_FLUSHED) { mcreq_packet_done(pl, pkt); } } while (0) ;; |
816 | count++; |
817 | } |
818 | return count; |
819 | } |
820 | |
821 | unsigned |
822 | mcreq_pipeline_fail( |
823 | mc_PIPELINE *pl, lcb_error_t err, mcreq_pktfail_fn failcb, void *arg) |
824 | { |
825 | return mcreq_pipeline_timeout(pl, err, failcb, arg, 0, NULL((void*)0)); |
826 | } |
827 | |
828 | void |
829 | mcreq_iterwipe(mc_CMDQUEUE *queue, mc_PIPELINE *src, |
830 | mcreq_iterwipe_fn callback, void *arg) |
831 | { |
832 | sllist_iterator iter; |
833 | |
834 | SLLIST_ITERFOR(&src->requests, &iter)for (slist_iter_init(&src->requests, &iter); !((& iter)->cur == ((void*)0)); slist_iter_incr(&src->requests , &iter)) { |
835 | int rv; |
836 | mc_PACKET *orig = SLLIST_ITEM(iter.cur, mc_PACKET, slnode)((mc_PACKET *) (void *) ((char *)(iter.cur) - __builtin_offsetof (mc_PACKET, slnode))); |
837 | rv = callback(queue, src, orig, arg); |
838 | if (rv == MCREQ_REMOVE_PACKET2) { |
839 | sllist_iter_remove(&src->requests, &iter); |
840 | } |
841 | } |
842 | } |
843 | |
844 | #include "mcreq-flush-inl.h" |
845 | typedef struct { |
846 | mc_PIPELINE base; |
847 | mcreq_fallback_cb handler; |
848 | } mc_FALLBACKPL; |
849 | |
850 | static void |
851 | do_fallback_flush(mc_PIPELINE *pipeline) |
852 | { |
853 | nb_IOV iov; |
854 | unsigned nb; |
855 | int nused; |
856 | sllist_iterator iter; |
857 | mc_FALLBACKPL *fpl = (mc_FALLBACKPL*)pipeline; |
858 | |
859 | while ((nb = mcreq_flush_iov_fill(pipeline, &iov, 1, &nused))) { |
860 | mcreq_flush_done(pipeline, nb, nb); |
861 | } |
862 | /* Now handle all the packets, for real */ |
863 | SLLIST_ITERFOR(&pipeline->requests, &iter)for (slist_iter_init(&pipeline->requests, &iter); ! ((&iter)->cur == ((void*)0)); slist_iter_incr(&pipeline ->requests, &iter)) { |
864 | mc_PACKET *pkt = SLLIST_ITEM(iter.cur, mc_PACKET, slnode)((mc_PACKET *) (void *) ((char *)(iter.cur) - __builtin_offsetof (mc_PACKET, slnode))); |
865 | fpl->handler(pipeline->parent, pkt); |
866 | sllist_iter_remove(&pipeline->requests, &iter); |
867 | mcreq_packet_handled(pipeline, pkt)do { (pkt)->flags |= MCREQ_F_INVOKED; if ((pkt)->flags & MCREQ_F_FLUSHED) { mcreq_packet_done(pipeline, pkt); } } while (0);; |
868 | } |
869 | } |
870 | |
871 | void |
872 | mcreq_set_fallback_handler(mc_CMDQUEUE *cq, mcreq_fallback_cb handler) |
873 | { |
874 | assert(!cq->fallback)((void) sizeof ((!cq->fallback) ? 1 : 0), __extension__ ({ if (!cq->fallback) ; else __assert_fail ("!cq->fallback" , "/home/avsej/code/libcouchbase/src/mc/mcreq.c", 874, __extension__ __PRETTY_FUNCTION__); })); |
875 | cq->fallback = calloc(1, sizeof (mc_FALLBACKPL)); |
Result
of 'calloc' is converted to a pointer of type 'mc_PIPELINE', which is
incompatible with sizeof operand type 'mc_FALLBACKPL' | |
876 | mcreq_pipeline_init(cq->fallback); |
877 | cq->fallback->parent = cq; |
878 | cq->fallback->index = cq->npipelines; |
879 | ((mc_FALLBACKPL*)cq->fallback)->handler = handler; |
880 | cq->fallback->flush_start = do_fallback_flush; |
881 | } |
882 | |
/**
 * Payload dump callback that does nothing.
 * Used as the default when the caller supplies no dump function.
 */
static void
noop_dumpfn(const void *d, unsigned n, FILE *fp)
{
    /* All parameters are intentionally unused */
    (void)fp;
    (void)n;
    (void)d;
}
885 | |
/*
 * X-macro listing the packet flag names without their MCREQ_F_ prefix.
 * The caller supplies X(name), which is expanded once per flag, in the
 * order given here.
 */
#define MCREQ_XFLAGS(X) \
    X(KEY_NOCOPY) \
    X(VALUE_NOCOPY) \
    X(VALUE_IOV) \
    X(HASVALUE) \
    X(REQEXT) \
    X(UFWD) \
    X(FLUSHED) \
    X(INVOKED) \
    X(DETACHED)
896 | |
897 | void |
898 | mcreq_dump_packet(const mc_PACKET *packet, FILE *fp, mcreq_payload_dump_fn dumpfn) |
899 | { |
900 | const char *indent = " "; |
901 | const mc_REQDATA *rdata = MCREQ_PKT_RDATA(packet)(((packet)->flags & MCREQ_F_REQEXT) ? ((mc_REQDATA *)( packet)->u_rdata.exdata) : (&(packet)->u_rdata.reqdata )); |
902 | |
903 | if (!dumpfn) { |
904 | dumpfn = noop_dumpfn; |
905 | } |
906 | if (!fp) { |
907 | fp = stderrstderr; |
908 | } |
909 | |
910 | fprintf(fp, "Packet @%p\n", (void *)packet); |
911 | fprintf(fp, "%sOPAQUE: %u\n", indent, (unsigned int)packet->opaque); |
912 | |
913 | fprintf(fp, "%sPKTFLAGS: 0x%x ", indent, packet->flags); |
914 | #define X(base) \ |
915 | if (packet->flags & MCREQ_F_##base) { fprintf(fp, "%s, ", #base); } |
916 | MCREQ_XFLAGS(X)X(KEY_NOCOPY) X(VALUE_NOCOPY) X(VALUE_IOV) X(HASVALUE) X(REQEXT ) X(UFWD) X(FLUSHED) X(INVOKED) X(DETACHED) |
917 | #undef X |
918 | fprintf(fp, "\n"); |
919 | |
920 | fprintf(fp, "%sKey+Header Size: %u\n", indent, (unsigned int)packet->kh_span.size); |
921 | fprintf(fp, "%sKey Offset: %u\n", indent, MCREQ_PKT_BASESIZE24 + packet->extlen); |
922 | |
923 | |
924 | if (packet->flags & MCREQ_F_HASVALUE) { |
925 | if (packet->flags & MCREQ_F_VALUE_IOV) { |
926 | fprintf(fp, "%sValue Length: %u\n", indent, |
927 | packet->u_value.multi.total_length); |
928 | |
929 | fprintf(fp, "%sValue IOV: [start=%p, n=%d]\n", indent, |
930 | (void *)packet->u_value.multi.iov, packet->u_value.multi.niov); |
931 | } else { |
932 | if (packet->flags & MCREQ_F_VALUE_NOCOPY) { |
933 | fprintf(fp, "%sValue is user allocated\n", indent); |
934 | } |
935 | fprintf(fp, "%sValue: %p, %u bytes\n", indent, |
936 | (void *)SPAN_BUFFER(&packet->u_value.single)(((&packet->u_value.single)->offset == (nb_SIZE)-1) ? ((char *)(&packet->u_value.single)->parent) : (( &packet->u_value.single)->parent->root + (&packet ->u_value.single)->offset)), packet->u_value.single.size); |
937 | } |
938 | } |
939 | |
940 | fprintf(fp, "%sRDATA(%s): %p\n", indent, |
941 | (packet->flags & MCREQ_F_REQEXT) ? "ALLOC" : "EMBEDDED", (void *)rdata); |
942 | |
943 | indent = " "; |
944 | fprintf(fp, "%sStart: %lu\n", indent, (unsigned long)rdata->start); |
945 | fprintf(fp, "%sCookie: %p\n", indent, rdata->cookie); |
946 | |
947 | indent = " "; |
948 | fprintf(fp, "%sNEXT: %p\n", indent, (void *)packet->slnode.next); |
949 | if (dumpfn != noop_dumpfn) { |
950 | fprintf(fp, "PACKET CONTENTS:\n"); |
951 | } |
952 | |
953 | fwrite(SPAN_BUFFER(&packet->kh_span)(((&packet->kh_span)->offset == (nb_SIZE)-1) ? ((char *)(&packet->kh_span)->parent) : ((&packet-> kh_span)->parent->root + (&packet->kh_span)-> offset)), 1, packet->kh_span.size, fp); |
954 | if (packet->flags & MCREQ_F_HASVALUE) { |
955 | if (packet->flags & MCREQ_F_VALUE_IOV) { |
956 | const lcb_IOV *iovs = packet->u_value.multi.iov; |
957 | unsigned ii, ixmax = packet->u_value.multi.niov; |
958 | for (ii = 0; ii < ixmax; ii++) { |
959 | dumpfn(iovs[ii].iov_base, iovs[ii].iov_len, fp); |
960 | } |
961 | } else { |
962 | const nb_SPAN *vspan = &packet->u_value.single; |
963 | dumpfn(SPAN_BUFFER(vspan)(((vspan)->offset == (nb_SIZE)-1) ? ((char *)(vspan)->parent ) : ((vspan)->parent->root + (vspan)->offset)), vspan->size, fp); |
964 | } |
965 | } |
966 | } |
967 | |
968 | void |
969 | mcreq_dump_chain(const mc_PIPELINE *pipeline, FILE *fp, mcreq_payload_dump_fn dumpfn) |
970 | { |
971 | sllist_node *ll; |
972 | SLLIST_FOREACH(&pipeline->requests, ll)for (ll = (&pipeline->requests)->first_prev.next; ll ; ll = ll->next) { |
973 | const mc_PACKET *pkt = SLLIST_ITEM(ll, mc_PACKET, slnode)((mc_PACKET *) (void *) ((char *)(ll) - __builtin_offsetof(mc_PACKET , slnode))); |
974 | mcreq_dump_packet(pkt, fp, dumpfn); |
975 | } |
976 | } |