File: /home/avsej/code/libcouchbase/src/mc/mcreq.c
Warning: line 48, column 20 — Access to field 'extlen' results in a dereference of a null pointer (loaded from variable 'packet')
1 | /* -*- Mode: C; tab-width: 4; c-basic-offset: 4; indent-tabs-mode: nil -*- */ | |||
2 | /* | |||
3 | * Copyright 2014 Couchbase, Inc. | |||
4 | * | |||
5 | * Licensed under the Apache License, Version 2.0 (the "License"); | |||
6 | * you may not use this file except in compliance with the License. | |||
7 | * You may obtain a copy of the License at | |||
8 | * | |||
9 | * http://www.apache.org/licenses/LICENSE-2.0 | |||
10 | * | |||
11 | * Unless required by applicable law or agreed to in writing, software | |||
12 | * distributed under the License is distributed on an "AS IS" BASIS, | |||
13 | * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | |||
14 | * See the License for the specific language governing permissions and | |||
15 | * limitations under the License. | |||
16 | */ | |||
17 | ||||
18 | #include "mcreq.h" | |||
19 | #include "compress.h" | |||
20 | #include "sllist-inl.h" | |||
21 | #include "internal.h" | |||
22 | ||||
/** Total wire-header size for a packet: 24-byte memcached base header plus extras. */
#define PKT_HDRSIZE(pkt) (MCREQ_PKT_BASESIZE + (pkt)->extlen)
24 | ||||
25 | lcb_error_t | |||
26 | mcreq_reserve_header( | |||
27 | mc_PIPELINE *pipeline, mc_PACKET *packet, uint8_t hdrsize) | |||
28 | { | |||
29 | int rv; | |||
30 | packet->extlen = hdrsize - MCREQ_PKT_BASESIZE24; | |||
31 | packet->kh_span.size = hdrsize; | |||
32 | rv = netbuf_mblock_reserve(&pipeline->nbmgr, &packet->kh_span); | |||
33 | if (rv != 0) { | |||
34 | return LCB_CLIENT_ENOMEM; | |||
35 | } | |||
36 | return LCB_SUCCESS; | |||
37 | } | |||
38 | ||||
39 | lcb_error_t | |||
40 | mcreq_reserve_key( | |||
41 | mc_PIPELINE *pipeline, mc_PACKET *packet, uint8_t hdrsize, | |||
42 | const lcb_KEYBUF *kreq) | |||
43 | { | |||
44 | const struct lcb_CONTIGBUF *contig = &kreq->contig; | |||
45 | int rv; | |||
46 | ||||
47 | /** Set the key offset which is the start of the key from the buffer */ | |||
48 | packet->extlen = hdrsize - MCREQ_PKT_BASESIZE24; | |||
| ||||
49 | packet->kh_span.size = kreq->contig.nbytes; | |||
50 | ||||
51 | if (kreq->type == LCB_KV_COPY) { | |||
52 | /** | |||
53 | * If the key is to be copied then just allocate the span size | |||
54 | * for the key+24+extras | |||
55 | */ | |||
56 | packet->kh_span.size += hdrsize; | |||
57 | rv = netbuf_mblock_reserve(&pipeline->nbmgr, &packet->kh_span); | |||
58 | if (rv != 0) { | |||
59 | return LCB_CLIENT_ENOMEM; | |||
60 | } | |||
61 | ||||
62 | /** | |||
63 | * Copy the key into the packet starting at the extras end | |||
64 | */ | |||
65 | memcpy(SPAN_BUFFER(&packet->kh_span)(((&packet->kh_span)->offset == (nb_SIZE)-1) ? ((char *)(&packet->kh_span)->parent) : ((&packet-> kh_span)->parent->root + (&packet->kh_span)-> offset)) + hdrsize, | |||
66 | contig->bytes, | |||
67 | contig->nbytes); | |||
68 | ||||
69 | } else if (kreq->type == LCB_KV_CONTIG) { | |||
70 | /** | |||
71 | * Don't do any copying. | |||
72 | * Assume the key buffer has enough space for the packet as well. | |||
73 | */ | |||
74 | CREATE_STANDALONE_SPAN(&packet->kh_span, contig->bytes, contig->nbytes)(&packet->kh_span)->parent = (nb_MBLOCK *) (void *) contig->bytes; (&packet->kh_span)->offset = (nb_SIZE )-1; (&packet->kh_span)->size = contig->nbytes;; | |||
75 | packet->flags |= MCREQ_F_KEY_NOCOPY; | |||
76 | ||||
77 | } else { | |||
78 | /** IOVs not supported for keys */ | |||
79 | return LCB_EINVAL; | |||
80 | } | |||
81 | ||||
82 | return LCB_SUCCESS; | |||
83 | } | |||
84 | ||||
85 | lcb_error_t | |||
86 | mcreq_reserve_value2(mc_PIPELINE *pl, mc_PACKET *pkt, lcb_size_t n) | |||
87 | { | |||
88 | int rv; | |||
89 | pkt->u_value.single.size = n; | |||
90 | if (!n) { | |||
91 | return LCB_SUCCESS; | |||
92 | } | |||
93 | ||||
94 | pkt->flags |= MCREQ_F_HASVALUE; | |||
95 | rv = netbuf_mblock_reserve(&pl->nbmgr, &pkt->u_value.single); | |||
96 | if (rv) { | |||
97 | return LCB_CLIENT_ENOMEM; | |||
98 | } | |||
99 | return LCB_SUCCESS; | |||
100 | } | |||
101 | ||||
102 | lcb_error_t | |||
103 | mcreq_reserve_value( | |||
104 | mc_PIPELINE *pipeline, mc_PACKET *packet, const lcb_VALBUF *vreq) | |||
105 | { | |||
106 | const lcb_CONTIGBUF *contig = &vreq->u_buf.contig; | |||
107 | nb_SPAN *vspan = &packet->u_value.single; | |||
108 | int rv; | |||
109 | ||||
110 | if (vreq->vtype == LCB_KV_COPY) { | |||
111 | /** Copy the value into a single SPAN */ | |||
112 | if (! (vspan->size = vreq->u_buf.contig.nbytes)) { | |||
113 | return LCB_SUCCESS; | |||
114 | } | |||
115 | rv = netbuf_mblock_reserve(&pipeline->nbmgr, vspan); | |||
116 | ||||
117 | if (rv != 0) { | |||
118 | return LCB_CLIENT_ENOMEM; | |||
119 | } | |||
120 | ||||
121 | memcpy(SPAN_BUFFER(vspan)(((vspan)->offset == (nb_SIZE)-1) ? ((char *)(vspan)->parent ) : ((vspan)->parent->root + (vspan)->offset)), contig->bytes, contig->nbytes); | |||
122 | ||||
123 | } else if (vreq->vtype == LCB_KV_CONTIG) { | |||
124 | /** It's still contiguous so make it a 'standalone' span */ | |||
125 | CREATE_STANDALONE_SPAN(vspan, contig->bytes, contig->nbytes)(vspan)->parent = (nb_MBLOCK *) (void *)contig->bytes; ( vspan)->offset = (nb_SIZE)-1; (vspan)->size = contig-> nbytes;; | |||
126 | packet->flags |= MCREQ_F_VALUE_NOCOPY; | |||
127 | ||||
128 | } else if (vreq->vtype == LCB_KV_IOV) { | |||
129 | /** Multiple spans, no copy */ | |||
130 | unsigned int ii; | |||
131 | const lcb_FRAGBUF *msrc = &vreq->u_buf.multi; | |||
132 | lcb_FRAGBUF *mdst = &packet->u_value.multi; | |||
133 | ||||
134 | packet->flags |= MCREQ_F_VALUE_IOV | MCREQ_F_VALUE_NOCOPY; | |||
135 | mdst->niov = msrc->niov; | |||
136 | mdst->iov = malloc(mdst->niov * sizeof(*mdst->iov)); | |||
137 | mdst->total_length = 0; | |||
138 | ||||
139 | for (ii = 0; ii < mdst->niov; ii++) { | |||
140 | mdst->iov[ii] = msrc->iov[ii]; | |||
141 | mdst->total_length += mdst->iov[ii].iov_len; | |||
142 | } | |||
143 | } else if (vreq->vtype == LCB_KV_IOVCOPY) { | |||
144 | /** Multiple input buffers, normal copying output buffer */ | |||
145 | unsigned int ii, cur_offset; | |||
146 | const lcb_FRAGBUF *msrc = &vreq->u_buf.multi; | |||
147 | ||||
148 | if (msrc->total_length) { | |||
149 | vspan->size = msrc->total_length; | |||
150 | } else { | |||
151 | vspan->size = 0; | |||
152 | for (ii = 0; ii < msrc->niov; ii++) { | |||
153 | vspan->size += msrc->iov[ii].iov_len; | |||
154 | } | |||
155 | } | |||
156 | ||||
157 | rv = netbuf_mblock_reserve(&pipeline->nbmgr, vspan); | |||
158 | if (rv != 0) { | |||
159 | return LCB_CLIENT_ENOMEM; | |||
160 | } | |||
161 | ||||
162 | for (ii = 0, cur_offset = 0; ii < msrc->niov; ii++) { | |||
163 | char *buf = SPAN_BUFFER(vspan)(((vspan)->offset == (nb_SIZE)-1) ? ((char *)(vspan)->parent ) : ((vspan)->parent->root + (vspan)->offset)) + cur_offset; | |||
164 | memcpy(buf, msrc->iov[ii].iov_base, msrc->iov[ii].iov_len); | |||
165 | cur_offset += msrc->iov[ii].iov_len; | |||
166 | } | |||
167 | } | |||
168 | ||||
169 | packet->flags |= MCREQ_F_HASVALUE; | |||
170 | return LCB_SUCCESS; | |||
171 | } | |||
172 | ||||
173 | static int | |||
174 | pkt_tmo_compar(sllist_node *a, sllist_node *b) | |||
175 | { | |||
176 | mc_PACKET *pa, *pb; | |||
177 | hrtime_t tmo_a, tmo_b; | |||
178 | ||||
179 | pa = SLLIST_ITEM(a, mc_PACKET, slnode)((mc_PACKET *) (void *) ((char *)(a) - __builtin_offsetof(mc_PACKET , slnode))); | |||
180 | pb = SLLIST_ITEM(b, mc_PACKET, slnode)((mc_PACKET *) (void *) ((char *)(b) - __builtin_offsetof(mc_PACKET , slnode))); | |||
181 | ||||
182 | tmo_a = MCREQ_PKT_RDATA(pa)(((pa)->flags & MCREQ_F_REQEXT) ? ((mc_REQDATA *)(pa)-> u_rdata.exdata) : (&(pa)->u_rdata.reqdata))->start; | |||
183 | tmo_b = MCREQ_PKT_RDATA(pb)(((pb)->flags & MCREQ_F_REQEXT) ? ((mc_REQDATA *)(pb)-> u_rdata.exdata) : (&(pb)->u_rdata.reqdata))->start; | |||
184 | ||||
185 | if (tmo_a == tmo_b) { | |||
186 | return 0; | |||
187 | } else if (tmo_a < tmo_b) { | |||
188 | return -1; | |||
189 | } else { | |||
190 | return 1; | |||
191 | } | |||
192 | } | |||
193 | ||||
194 | void | |||
195 | mcreq_reenqueue_packet(mc_PIPELINE *pipeline, mc_PACKET *packet) | |||
196 | { | |||
197 | sllist_root *reqs = &pipeline->requests; | |||
198 | mcreq_enqueue_packet(pipeline, packet); | |||
199 | sllist_remove(reqs, &packet->slnode); | |||
200 | sllist_insert_sorted(reqs, &packet->slnode, pkt_tmo_compar); | |||
201 | } | |||
202 | ||||
203 | void | |||
204 | mcreq_enqueue_packet(mc_PIPELINE *pipeline, mc_PACKET *packet) | |||
205 | { | |||
206 | nb_SPAN *vspan = &packet->u_value.single; | |||
207 | sllist_append(&pipeline->requests, &packet->slnode); | |||
208 | netbuf_enqueue_span(&pipeline->nbmgr, &packet->kh_span); | |||
209 | ||||
210 | if (!(packet->flags & MCREQ_F_HASVALUE)) { | |||
211 | goto GT_ENQUEUE_PDU; | |||
212 | } | |||
213 | ||||
214 | if (packet->flags & MCREQ_F_VALUE_IOV) { | |||
215 | unsigned int ii; | |||
216 | lcb_FRAGBUF *multi = &packet->u_value.multi; | |||
217 | for (ii = 0; ii < multi->niov; ii++) { | |||
218 | netbuf_enqueue(&pipeline->nbmgr, (nb_IOV *)multi->iov + ii); | |||
219 | } | |||
220 | ||||
221 | } else if (vspan->size) { | |||
222 | netbuf_enqueue_span(&pipeline->nbmgr, vspan); | |||
223 | } | |||
224 | ||||
225 | GT_ENQUEUE_PDU: | |||
226 | netbuf_pdu_enqueue(&pipeline->nbmgr, packet, offsetof(mc_PACKET, sl_flushq)__builtin_offsetof(mc_PACKET, sl_flushq)); | |||
227 | } | |||
228 | ||||
229 | void | |||
230 | mcreq_wipe_packet(mc_PIPELINE *pipeline, mc_PACKET *packet) | |||
231 | { | |||
232 | if (! (packet->flags & MCREQ_F_KEY_NOCOPY)) { | |||
233 | if (packet->flags & MCREQ_F_DETACHED) { | |||
234 | free(SPAN_BUFFER(&packet->kh_span)(((&packet->kh_span)->offset == (nb_SIZE)-1) ? ((char *)(&packet->kh_span)->parent) : ((&packet-> kh_span)->parent->root + (&packet->kh_span)-> offset))); | |||
235 | } else { | |||
236 | netbuf_mblock_release(&pipeline->nbmgr, &packet->kh_span); | |||
237 | } | |||
238 | } | |||
239 | ||||
240 | if (! (packet->flags & MCREQ_F_HASVALUE)) { | |||
241 | return; | |||
242 | } | |||
243 | ||||
244 | if (packet->flags & MCREQ_F_VALUE_NOCOPY) { | |||
245 | if (packet->flags & MCREQ_F_VALUE_IOV) { | |||
246 | free(packet->u_value.multi.iov); | |||
247 | } | |||
248 | ||||
249 | return; | |||
250 | } | |||
251 | ||||
252 | if (packet->flags & MCREQ_F_DETACHED) { | |||
253 | free(SPAN_BUFFER(&packet->u_value.single)(((&packet->u_value.single)->offset == (nb_SIZE)-1) ? ((char *)(&packet->u_value.single)->parent) : (( &packet->u_value.single)->parent->root + (&packet ->u_value.single)->offset))); | |||
254 | } else { | |||
255 | netbuf_mblock_release(&pipeline->nbmgr, &packet->u_value.single); | |||
256 | } | |||
257 | ||||
258 | } | |||
259 | ||||
260 | mc_PACKET * | |||
261 | mcreq_allocate_packet(mc_PIPELINE *pipeline) | |||
262 | { | |||
263 | nb_SPAN span; | |||
264 | int rv; | |||
265 | mc_PACKET *ret; | |||
266 | span.size = sizeof(*ret); | |||
267 | ||||
268 | rv = netbuf_mblock_reserve(&pipeline->reqpool, &span); | |||
269 | if (rv != 0) { | |||
270 | return NULL((void*)0); | |||
271 | } | |||
272 | ||||
273 | ret = (void *) SPAN_MBUFFER_NC(&span)((&span)->parent->root + (&span)->offset); | |||
274 | ret->alloc_parent = span.parent; | |||
275 | ret->flags = 0; | |||
276 | ret->retries = 0; | |||
277 | ret->opaque = pipeline->parent->seq++; | |||
278 | return ret; | |||
279 | } | |||
280 | ||||
281 | void | |||
282 | mcreq_release_packet(mc_PIPELINE *pipeline, mc_PACKET *packet) | |||
283 | { | |||
284 | nb_SPAN span; | |||
285 | if (packet->flags & MCREQ_F_DETACHED) { | |||
286 | sllist_iterator iter; | |||
287 | mc_EXPACKET *epkt = (mc_EXPACKET *)packet; | |||
288 | ||||
289 | SLLIST_ITERFOR(&epkt->data, &iter)for (slist_iter_init(&epkt->data, &iter); !((& iter)->cur == ((void*)0)); slist_iter_incr(&epkt->data , &iter)) { | |||
290 | mc_EPKTDATUM *d = SLLIST_ITEM(iter.cur, mc_EPKTDATUM, slnode)((mc_EPKTDATUM *) (void *) ((char *)(iter.cur) - __builtin_offsetof (mc_EPKTDATUM, slnode))); | |||
291 | sllist_iter_remove(&epkt->data, &iter); | |||
292 | d->dtorfn(d); | |||
293 | } | |||
294 | free(epkt); | |||
295 | return; | |||
296 | } | |||
297 | ||||
298 | span.size = sizeof(*packet); | |||
299 | span.parent = packet->alloc_parent; | |||
300 | span.offset = (char *)packet - packet->alloc_parent->root; | |||
301 | ||||
302 | netbuf_mblock_release(&pipeline->reqpool, &span); | |||
303 | } | |||
304 | ||||
/** When set, mcreq_renew_packet() callers may wipe the source packet afterwards. */
#define MCREQ_DETACH_WIPESRC 1
306 | ||||
307 | mc_PACKET * | |||
308 | mcreq_renew_packet(const mc_PACKET *src) | |||
309 | { | |||
310 | char *kdata, *vdata; | |||
311 | unsigned nvdata; | |||
312 | mc_PACKET *dst; | |||
313 | mc_EXPACKET *edst = calloc(1, sizeof(*edst)); | |||
314 | ||||
315 | dst = &edst->base; | |||
316 | *dst = *src; | |||
317 | ||||
318 | kdata = malloc(src->kh_span.size); | |||
319 | memcpy(kdata, SPAN_BUFFER(&src->kh_span)(((&src->kh_span)->offset == (nb_SIZE)-1) ? ((char * )(&src->kh_span)->parent) : ((&src->kh_span) ->parent->root + (&src->kh_span)->offset)), src->kh_span.size); | |||
320 | CREATE_STANDALONE_SPAN(&dst->kh_span, kdata, src->kh_span.size)(&dst->kh_span)->parent = (nb_MBLOCK *) (void *)kdata ; (&dst->kh_span)->offset = (nb_SIZE)-1; (&dst-> kh_span)->size = src->kh_span.size;; | |||
321 | ||||
322 | dst->flags &= ~(MCREQ_F_KEY_NOCOPY|MCREQ_F_VALUE_NOCOPY|MCREQ_F_VALUE_IOV); | |||
323 | dst->flags |= MCREQ_F_DETACHED; | |||
324 | dst->alloc_parent = NULL((void*)0); | |||
325 | dst->sl_flushq.next = NULL((void*)0); | |||
326 | dst->slnode.next = NULL((void*)0); | |||
327 | dst->retries = src->retries; | |||
328 | ||||
329 | if (src->flags & MCREQ_F_HASVALUE) { | |||
330 | /** Get the length */ | |||
331 | if (src->flags & MCREQ_F_VALUE_IOV) { | |||
332 | unsigned ii; | |||
333 | unsigned offset = 0; | |||
334 | ||||
335 | nvdata = src->u_value.multi.total_length; | |||
336 | vdata = malloc(nvdata); | |||
337 | for (ii = 0; ii < src->u_value.multi.niov; ii++) { | |||
338 | const lcb_IOV *iov = src->u_value.multi.iov + ii; | |||
339 | ||||
340 | memcpy(vdata + offset, iov->iov_base, iov->iov_len); | |||
341 | offset += iov->iov_len; | |||
342 | } | |||
343 | } else { | |||
344 | protocol_binary_request_header hdr; | |||
345 | const nb_SPAN *origspan = &src->u_value.single; | |||
346 | mcreq_read_hdr(dst, &hdr)memcpy( (&hdr)->bytes, (((&(dst)->kh_span)-> offset == (nb_SIZE)-1) ? ((char *)(&(dst)->kh_span)-> parent) : ((&(dst)->kh_span)->parent->root + (& (dst)->kh_span)->offset)), sizeof((&hdr)->bytes) ); | |||
347 | ||||
348 | if (hdr.request.datatype & PROTOCOL_BINARY_DATATYPE_COMPRESSED) { | |||
349 | /* For compressed payloads we need to uncompress it first | |||
350 | * because it may be forwarded to a server without compression. | |||
351 | * TODO: might be more clever to check a setting flag somewhere | |||
352 | * and see if we should do this. */ | |||
353 | ||||
354 | lcb_SIZE n_inflated; | |||
355 | const void *inflated; | |||
356 | int rv; | |||
357 | ||||
358 | vdata = NULL((void*)0); | |||
359 | rv = mcreq_inflate_value(SPAN_BUFFER(origspan)(((origspan)->offset == (nb_SIZE)-1) ? ((char *)(origspan) ->parent) : ((origspan)->parent->root + (origspan)-> offset)), origspan->size, | |||
360 | &inflated, &n_inflated, (void**)&vdata); | |||
361 | ||||
362 | assert(vdata == inflated)((void) sizeof ((vdata == inflated) ? 1 : 0), __extension__ ( { if (vdata == inflated) ; else __assert_fail ("vdata == inflated" , "/home/avsej/code/libcouchbase/src/mc/mcreq.c", 362, __extension__ __PRETTY_FUNCTION__); })); | |||
363 | ||||
364 | if (rv != 0) { | |||
365 | return NULL((void*)0); | |||
366 | } | |||
367 | nvdata = n_inflated; | |||
368 | hdr.request.datatype &= ~PROTOCOL_BINARY_DATATYPE_COMPRESSED; | |||
369 | hdr.request.bodylen = htonl( | |||
370 | ntohs(hdr.request.keylen) + | |||
371 | hdr.request.extlen + | |||
372 | n_inflated); | |||
373 | mcreq_write_hdr(dst, &hdr)memcpy( (((&(dst)->kh_span)->offset == (nb_SIZE)-1) ? ((char *)(&(dst)->kh_span)->parent) : ((&(dst )->kh_span)->parent->root + (&(dst)->kh_span) ->offset)), (&hdr)->bytes, sizeof((&hdr)->bytes ) ); | |||
374 | ||||
375 | } else { | |||
376 | nvdata = origspan->size; | |||
377 | vdata = malloc(nvdata); | |||
378 | memcpy(vdata, SPAN_BUFFER(origspan)(((origspan)->offset == (nb_SIZE)-1) ? ((char *)(origspan) ->parent) : ((origspan)->parent->root + (origspan)-> offset)), nvdata); | |||
379 | } | |||
380 | } | |||
381 | ||||
382 | /* Declare the value as a standalone malloc'd span */ | |||
383 | CREATE_STANDALONE_SPAN(&dst->u_value.single, vdata, nvdata)(&dst->u_value.single)->parent = (nb_MBLOCK *) (void *)vdata; (&dst->u_value.single)->offset = (nb_SIZE )-1; (&dst->u_value.single)->size = nvdata;; | |||
384 | } | |||
385 | ||||
386 | if (src->flags & MCREQ_F_DETACHED) { | |||
387 | mc_EXPACKET *esrc = (mc_EXPACKET *)src; | |||
388 | sllist_iterator iter; | |||
389 | SLLIST_ITERFOR(&esrc->data, &iter)for (slist_iter_init(&esrc->data, &iter); !((& iter)->cur == ((void*)0)); slist_iter_incr(&esrc->data , &iter)) { | |||
390 | sllist_node *cur = iter.cur; | |||
391 | sllist_iter_remove(&esrc->data, &iter); | |||
392 | sllist_append(&edst->data, cur); | |||
393 | } | |||
394 | } | |||
395 | return dst; | |||
396 | } | |||
397 | ||||
398 | int | |||
399 | mcreq_epkt_insert(mc_EXPACKET *ep, mc_EPKTDATUM *datum) | |||
400 | { | |||
401 | if (!(ep->base.flags & MCREQ_F_DETACHED)) { | |||
402 | return -1; | |||
403 | } | |||
404 | assert(!sllist_contains(&ep->data, &datum->slnode))((void) sizeof ((!sllist_contains(&ep->data, &datum ->slnode)) ? 1 : 0), __extension__ ({ if (!sllist_contains (&ep->data, &datum->slnode)) ; else __assert_fail ("!sllist_contains(&ep->data, &datum->slnode)" , "/home/avsej/code/libcouchbase/src/mc/mcreq.c", 404, __extension__ __PRETTY_FUNCTION__); })); | |||
405 | sllist_append(&ep->data, &datum->slnode); | |||
406 | return 0; | |||
407 | } | |||
408 | ||||
409 | mc_EPKTDATUM * | |||
410 | mcreq_epkt_find(mc_EXPACKET *ep, const char *key) | |||
411 | { | |||
412 | sllist_iterator iter; | |||
413 | SLLIST_ITERFOR(&ep->data, &iter)for (slist_iter_init(&ep->data, &iter); !((&iter )->cur == ((void*)0)); slist_iter_incr(&ep->data, & iter)) { | |||
414 | mc_EPKTDATUM *d = SLLIST_ITEM(iter.cur, mc_EPKTDATUM, slnode)((mc_EPKTDATUM *) (void *) ((char *)(iter.cur) - __builtin_offsetof (mc_EPKTDATUM, slnode))); | |||
415 | if (!strcmp(key, d->key)) { | |||
416 | return d; | |||
417 | } | |||
418 | } | |||
419 | return NULL((void*)0); | |||
420 | } | |||
421 | ||||
422 | void | |||
423 | mcreq_map_key(mc_CMDQUEUE *queue, | |||
424 | const lcb_KEYBUF *key, const lcb_KEYBUF *hashkey, | |||
425 | unsigned nhdr, int *vbid, int *srvix) | |||
426 | { | |||
427 | const void *hk; | |||
428 | size_t nhk = 0; | |||
429 | if (hashkey) { | |||
430 | if (hashkey->type == LCB_KV_COPY && hashkey->contig.bytes != NULL((void*)0)) { | |||
431 | hk = hashkey->contig.bytes; | |||
432 | nhk = hashkey->contig.nbytes; | |||
433 | } else if (hashkey->type == LCB_KV_VBID) { | |||
434 | *vbid = hashkey->contig.nbytes; | |||
435 | *srvix = lcbvb_vbmaster(queue->config, *vbid); | |||
436 | return; | |||
437 | } | |||
438 | } | |||
439 | if (!nhk) { | |||
440 | if (key->type == LCB_KV_COPY) { | |||
441 | hk = key->contig.bytes; | |||
442 | nhk = key->contig.nbytes; | |||
443 | } else { | |||
444 | const char *buf = key->contig.bytes; | |||
445 | buf += nhdr; | |||
446 | hk = buf; | |||
447 | nhk = key->contig.nbytes - nhdr; | |||
448 | } | |||
449 | } | |||
450 | lcbvb_map_key(queue->config, hk, nhk, vbid, srvix); | |||
451 | } | |||
452 | ||||
453 | lcb_error_t | |||
454 | mcreq_basic_packet( | |||
455 | mc_CMDQUEUE *queue, const lcb_CMDBASE *cmd, | |||
456 | protocol_binary_request_header *req, lcb_uint8_t extlen, | |||
457 | mc_PACKET **packet, mc_PIPELINE **pipeline, int options) | |||
458 | { | |||
459 | int vb, srvix; | |||
460 | ||||
461 | if (!queue->config) { | |||
| ||||
462 | return LCB_CLIENT_ETMPFAILLCB_CLIENT_ENOCONF; | |||
463 | } | |||
464 | ||||
465 | mcreq_map_key(queue, &(cmd->key), &(cmd->_hashkey), | |||
466 | sizeof(*req) + extlen, &vb, &srvix); | |||
467 | if (srvix > -1 && srvix < (int)queue->npipelines) { | |||
468 | *pipeline = queue->pipelines[srvix]; | |||
469 | ||||
470 | } else { | |||
471 | if ((options & MCREQ_BASICPACKET_F_FALLBACKOK0x01) && queue->fallback) { | |||
472 | *pipeline = queue->fallback; | |||
473 | } else { | |||
474 | return LCB_NO_MATCHING_SERVER; | |||
475 | } | |||
476 | } | |||
477 | ||||
478 | *packet = mcreq_allocate_packet(*pipeline); | |||
479 | ||||
480 | mcreq_reserve_key(*pipeline, *packet, sizeof(*req) + extlen, &cmd->key); | |||
481 | ||||
482 | req->request.keylen = htons((*packet)->kh_span.size - PKT_HDRSIZE(*packet)(24 + (*packet)->extlen)); | |||
483 | req->request.vbucket = htons(vb); | |||
484 | req->request.extlen = extlen; | |||
485 | return LCB_SUCCESS; | |||
486 | } | |||
487 | ||||
488 | void | |||
489 | mcreq_get_key(const mc_PACKET *packet, const void **key, lcb_size_t *nkey) | |||
490 | { | |||
491 | *key = SPAN_BUFFER(&packet->kh_span)(((&packet->kh_span)->offset == (nb_SIZE)-1) ? ((char *)(&packet->kh_span)->parent) : ((&packet-> kh_span)->parent->root + (&packet->kh_span)-> offset)) + PKT_HDRSIZE(packet)(24 + (packet)->extlen); | |||
492 | *nkey = packet->kh_span.size - PKT_HDRSIZE(packet)(24 + (packet)->extlen); | |||
493 | } | |||
494 | ||||
495 | lcb_uint32_t | |||
496 | mcreq_get_bodysize(const mc_PACKET *packet) | |||
497 | { | |||
498 | lcb_uint32_t ret; | |||
499 | char *retptr = SPAN_BUFFER(&packet->kh_span)(((&packet->kh_span)->offset == (nb_SIZE)-1) ? ((char *)(&packet->kh_span)->parent) : ((&packet-> kh_span)->parent->root + (&packet->kh_span)-> offset)) + 8; | |||
500 | if ((uintptr_t)retptr % sizeof(ret) == 0) { | |||
501 | return ntohl(*(lcb_uint32_t*) (void *)retptr); | |||
502 | } else { | |||
503 | memcpy(&ret, retptr, sizeof(ret)); | |||
504 | return ntohl(ret); | |||
505 | } | |||
506 | } | |||
507 | ||||
508 | uint16_t | |||
509 | mcreq_get_vbucket(const mc_PACKET *packet) | |||
510 | { | |||
511 | uint16_t ret; | |||
512 | char *retptr = SPAN_BUFFER(&packet->kh_span)(((&packet->kh_span)->offset == (nb_SIZE)-1) ? ((char *)(&packet->kh_span)->parent) : ((&packet-> kh_span)->parent->root + (&packet->kh_span)-> offset)) + 6; | |||
513 | if ((uintptr_t)retptr % sizeof(ret) == 0) { | |||
514 | return ntohs(*(uint16_t*)(void*)retptr); | |||
515 | } else { | |||
516 | memcpy(&ret, retptr, sizeof ret); | |||
517 | return ntohs(ret); | |||
518 | } | |||
519 | } | |||
520 | ||||
521 | uint32_t | |||
522 | mcreq_get_size(const mc_PACKET *packet) | |||
523 | { | |||
524 | uint32_t sz = packet->kh_span.size; | |||
525 | if (packet->flags & MCREQ_F_HASVALUE) { | |||
526 | if (packet->flags & MCREQ_F_VALUE_IOV) { | |||
527 | sz += packet->u_value.multi.total_length; | |||
528 | } else { | |||
529 | sz += packet->u_value.single.size; | |||
530 | } | |||
531 | } | |||
532 | return sz; | |||
533 | } | |||
534 | ||||
535 | void | |||
536 | mcreq_pipeline_cleanup(mc_PIPELINE *pipeline) | |||
537 | { | |||
538 | netbuf_cleanup(&pipeline->nbmgr); | |||
539 | netbuf_cleanup(&pipeline->reqpool); | |||
540 | } | |||
541 | ||||
542 | int | |||
543 | mcreq_pipeline_init(mc_PIPELINE *pipeline) | |||
544 | { | |||
545 | nb_SETTINGS settings; | |||
546 | ||||
547 | /* Initialize all members to 0 */ | |||
548 | memset(&pipeline->requests, 0, sizeof pipeline->requests); | |||
549 | pipeline->parent = NULL((void*)0); | |||
550 | pipeline->flush_start = NULL((void*)0); | |||
551 | pipeline->index = 0; | |||
552 | memset(&pipeline->ctxqueued, 0, sizeof pipeline->ctxqueued); | |||
553 | pipeline->buf_done_callback = NULL((void*)0); | |||
554 | ||||
555 | netbuf_default_settings(&settings); | |||
556 | ||||
557 | /** Initialize datapool */ | |||
558 | netbuf_init(&pipeline->nbmgr, &settings); | |||
559 | ||||
560 | /** Initialize request pool */ | |||
561 | settings.data_basealloc = sizeof(mc_PACKET) * 32; | |||
562 | netbuf_init(&pipeline->reqpool, &settings);; | |||
563 | return 0; | |||
564 | } | |||
565 | ||||
566 | void | |||
567 | mcreq_queue_add_pipelines( | |||
568 | mc_CMDQUEUE *queue, mc_PIPELINE * const *pipelines, unsigned npipelines, | |||
569 | lcbvb_CONFIG* config) | |||
570 | { | |||
571 | unsigned ii; | |||
572 | ||||
573 | lcb_assert(queue->pipelines == NULL)((void) sizeof ((queue->pipelines == ((void*)0)) ? 1 : 0), __extension__ ({ if (queue->pipelines == ((void*)0)) ; else __assert_fail ("queue->pipelines == ((void*)0)", "/home/avsej/code/libcouchbase/src/mc/mcreq.c" , 573, __extension__ __PRETTY_FUNCTION__); })); | |||
574 | queue->npipelines = npipelines; | |||
575 | queue->_npipelines_ex = queue->npipelines; | |||
576 | queue->pipelines = malloc(sizeof(*pipelines) * (npipelines + 1)); | |||
577 | queue->config = config; | |||
578 | ||||
579 | memcpy(queue->pipelines, pipelines, sizeof(*pipelines) * npipelines); | |||
580 | ||||
581 | free(queue->scheds); | |||
582 | queue->scheds = calloc(npipelines+1, 1); | |||
583 | ||||
584 | for (ii = 0; ii < npipelines; ii++) { | |||
585 | pipelines[ii]->parent = queue; | |||
586 | pipelines[ii]->index = ii; | |||
587 | } | |||
588 | ||||
589 | if (queue->fallback) { | |||
590 | queue->fallback->index = npipelines; | |||
591 | queue->pipelines[queue->npipelines] = queue->fallback; | |||
592 | queue->_npipelines_ex++; | |||
593 | } | |||
594 | } | |||
595 | ||||
596 | mc_PIPELINE ** | |||
597 | mcreq_queue_take_pipelines(mc_CMDQUEUE *queue, unsigned *count) | |||
598 | { | |||
599 | mc_PIPELINE **ret = queue->pipelines; | |||
600 | *count = queue->npipelines; | |||
601 | queue->pipelines = NULL((void*)0); | |||
602 | queue->npipelines = 0; | |||
603 | return ret; | |||
604 | } | |||
605 | ||||
606 | int | |||
607 | mcreq_queue_init(mc_CMDQUEUE *queue) | |||
608 | { | |||
609 | queue->seq = 0; | |||
610 | queue->pipelines = NULL((void*)0); | |||
611 | queue->scheds = NULL((void*)0); | |||
612 | queue->fallback = NULL((void*)0); | |||
613 | queue->npipelines = 0; | |||
614 | return 0; | |||
615 | } | |||
616 | ||||
617 | void | |||
618 | mcreq_queue_cleanup(mc_CMDQUEUE *queue) | |||
619 | { | |||
620 | if (queue->fallback) { | |||
621 | mcreq_pipeline_cleanup(queue->fallback); | |||
622 | free(queue->fallback); | |||
623 | queue->fallback = NULL((void*)0); | |||
624 | } | |||
625 | free(queue->scheds); | |||
626 | free(queue->pipelines); | |||
627 | queue->scheds = NULL((void*)0); | |||
628 | } | |||
629 | ||||
630 | void | |||
631 | mcreq_sched_enter(mc_CMDQUEUE *queue) | |||
632 | { | |||
633 | queue->ctxenter = 1; | |||
634 | } | |||
635 | ||||
636 | ||||
637 | ||||
638 | static void | |||
639 | queuectx_leave(mc_CMDQUEUE *queue, int success, int flush) | |||
640 | { | |||
641 | unsigned ii; | |||
642 | ||||
643 | if (queue->ctxenter) { | |||
644 | queue->ctxenter = 0; | |||
645 | } | |||
646 | ||||
647 | for (ii = 0; ii < queue->_npipelines_ex; ii++) { | |||
648 | mc_PIPELINE *pipeline; | |||
649 | sllist_node *ll_next, *ll; | |||
650 | ||||
651 | if (!queue->scheds[ii]) { | |||
652 | continue; | |||
653 | } | |||
654 | ||||
655 | pipeline = queue->pipelines[ii]; | |||
656 | ll = SLLIST_FIRST(&pipeline->ctxqueued)(&pipeline->ctxqueued)->first_prev.next; | |||
657 | ||||
658 | while (ll) { | |||
659 | mc_PACKET *pkt = SLLIST_ITEM(ll, mc_PACKET, slnode)((mc_PACKET *) (void *) ((char *)(ll) - __builtin_offsetof(mc_PACKET , slnode))); | |||
660 | ll_next = ll->next; | |||
661 | ||||
662 | if (success) { | |||
663 | mcreq_enqueue_packet(pipeline, pkt); | |||
664 | } else { | |||
665 | if (pkt->flags & MCREQ_F_REQEXT) { | |||
666 | mc_REQDATAEX *rd = pkt->u_rdata.exdata; | |||
667 | if (rd->procs->fail_dtor) { | |||
668 | rd->procs->fail_dtor(pkt); | |||
669 | } | |||
670 | } | |||
671 | mcreq_wipe_packet(pipeline, pkt); | |||
672 | mcreq_release_packet(pipeline, pkt); | |||
673 | } | |||
674 | ||||
675 | ll = ll_next; | |||
676 | } | |||
677 | SLLIST_FIRST(&pipeline->ctxqueued)(&pipeline->ctxqueued)->first_prev.next = pipeline->ctxqueued.last = NULL((void*)0); | |||
678 | if (flush) { | |||
679 | pipeline->flush_start(pipeline); | |||
680 | } | |||
681 | queue->scheds[ii] = 0; | |||
682 | } | |||
683 | } | |||
684 | ||||
685 | void | |||
686 | mcreq_sched_leave(mc_CMDQUEUE *queue, int do_flush) | |||
687 | { | |||
688 | queuectx_leave(queue, 1, do_flush); | |||
689 | } | |||
690 | ||||
691 | void | |||
692 | mcreq_sched_fail(mc_CMDQUEUE *queue) | |||
693 | { | |||
694 | queuectx_leave(queue, 0, 0); | |||
695 | } | |||
696 | ||||
697 | void | |||
698 | mcreq_sched_add(mc_PIPELINE *pipeline, mc_PACKET *pkt) | |||
699 | { | |||
700 | mc_CMDQUEUE *cq = pipeline->parent; | |||
701 | if (!cq->scheds[pipeline->index]) { | |||
702 | cq->scheds[pipeline->index] = 1; | |||
703 | } | |||
704 | sllist_append(&pipeline->ctxqueued, &pkt->slnode); | |||
705 | } | |||
706 | ||||
707 | static mc_PACKET * | |||
708 | pipeline_find(mc_PIPELINE *pipeline, lcb_uint32_t opaque, int do_remove) | |||
709 | { | |||
710 | sllist_iterator iter; | |||
711 | SLLIST_ITERFOR(&pipeline->requests, &iter)for (slist_iter_init(&pipeline->requests, &iter); ! ((&iter)->cur == ((void*)0)); slist_iter_incr(&pipeline ->requests, &iter)) { | |||
712 | mc_PACKET *pkt = SLLIST_ITEM(iter.cur, mc_PACKET, slnode)((mc_PACKET *) (void *) ((char *)(iter.cur) - __builtin_offsetof (mc_PACKET, slnode))); | |||
713 | if (pkt->opaque == opaque) { | |||
714 | if (do_remove) { | |||
715 | sllist_iter_remove(&pipeline->requests, &iter); | |||
716 | } | |||
717 | return pkt; | |||
718 | } | |||
719 | } | |||
720 | return NULL((void*)0); | |||
721 | } | |||
722 | ||||
723 | mc_PACKET * | |||
724 | mcreq_pipeline_find(mc_PIPELINE *pipeline, lcb_uint32_t opaque) | |||
725 | { | |||
726 | return pipeline_find(pipeline, opaque, 0); | |||
727 | } | |||
728 | ||||
729 | mc_PACKET * | |||
730 | mcreq_pipeline_remove(mc_PIPELINE *pipeline, lcb_uint32_t opaque) | |||
731 | { | |||
732 | return pipeline_find(pipeline, opaque, 1); | |||
733 | } | |||
734 | ||||
735 | void | |||
736 | mcreq_packet_done(mc_PIPELINE *pipeline, mc_PACKET *pkt) | |||
737 | { | |||
738 | assert(pkt->flags & MCREQ_F_FLUSHED)((void) sizeof ((pkt->flags & MCREQ_F_FLUSHED) ? 1 : 0 ), __extension__ ({ if (pkt->flags & MCREQ_F_FLUSHED) ; else __assert_fail ("pkt->flags & MCREQ_F_FLUSHED", "/home/avsej/code/libcouchbase/src/mc/mcreq.c" , 738, __extension__ __PRETTY_FUNCTION__); })); | |||
739 | assert(pkt->flags & MCREQ_F_INVOKED)((void) sizeof ((pkt->flags & MCREQ_F_INVOKED) ? 1 : 0 ), __extension__ ({ if (pkt->flags & MCREQ_F_INVOKED) ; else __assert_fail ("pkt->flags & MCREQ_F_INVOKED", "/home/avsej/code/libcouchbase/src/mc/mcreq.c" , 739, __extension__ __PRETTY_FUNCTION__); })); | |||
740 | if (pkt->flags & MCREQ_UBUF_FLAGS(MCREQ_F_KEY_NOCOPY|MCREQ_F_VALUE_NOCOPY)) { | |||
741 | void *kbuf, *vbuf; | |||
742 | const void *cookie; | |||
743 | ||||
744 | cookie = MCREQ_PKT_COOKIE(pkt)(((pkt)->flags & MCREQ_F_REQEXT) ? ((mc_REQDATA *)(pkt )->u_rdata.exdata) : (&(pkt)->u_rdata.reqdata))-> cookie; | |||
745 | if (pkt->flags & MCREQ_F_KEY_NOCOPY) { | |||
746 | kbuf = SPAN_BUFFER(&pkt->kh_span)(((&pkt->kh_span)->offset == (nb_SIZE)-1) ? ((char * )(&pkt->kh_span)->parent) : ((&pkt->kh_span) ->parent->root + (&pkt->kh_span)->offset)); | |||
747 | } else { | |||
748 | kbuf = NULL((void*)0); | |||
749 | } | |||
750 | if (pkt->flags & MCREQ_F_VALUE_NOCOPY) { | |||
751 | if (pkt->flags & MCREQ_F_VALUE_IOV) { | |||
752 | vbuf = pkt->u_value.multi.iov->iov_base; | |||
753 | } else { | |||
754 | vbuf = SPAN_SABUFFER_NC(&pkt->u_value.single)((char *)(&pkt->u_value.single)->parent); | |||
755 | } | |||
756 | } else { | |||
757 | vbuf = NULL((void*)0); | |||
758 | } | |||
759 | ||||
760 | pipeline->buf_done_callback(pipeline, cookie, kbuf, vbuf); | |||
761 | } | |||
762 | mcreq_wipe_packet(pipeline, pkt); | |||
763 | mcreq_release_packet(pipeline, pkt); | |||
764 | } | |||
765 | ||||
766 | void | |||
767 | mcreq_reset_timeouts(mc_PIPELINE *pl, lcb_U64 nstime) | |||
768 | { | |||
769 | sllist_node *nn; | |||
770 | SLLIST_ITERBASIC(&pl->requests, nn)for (nn = (&pl->requests)->first_prev.next; nn; nn = nn->next) { | |||
771 | mc_PACKET *pkt = SLLIST_ITEM(nn, mc_PACKET, slnode)((mc_PACKET *) (void *) ((char *)(nn) - __builtin_offsetof(mc_PACKET , slnode))); | |||
772 | MCREQ_PKT_RDATA(pkt)(((pkt)->flags & MCREQ_F_REQEXT) ? ((mc_REQDATA *)(pkt )->u_rdata.exdata) : (&(pkt)->u_rdata.reqdata))->start = nstime; | |||
773 | } | |||
774 | } | |||
775 | ||||
776 | unsigned | |||
777 | mcreq_pipeline_timeout( | |||
778 | mc_PIPELINE *pl, lcb_error_t err, mcreq_pktfail_fn failcb, void *cbarg, | |||
779 | hrtime_t oldest_valid, hrtime_t *oldest_start) | |||
780 | { | |||
781 | sllist_iterator iter; | |||
782 | unsigned count = 0; | |||
783 | ||||
784 | SLLIST_ITERFOR(&pl->requests, &iter)for (slist_iter_init(&pl->requests, &iter); !((& iter)->cur == ((void*)0)); slist_iter_incr(&pl->requests , &iter)) { | |||
785 | mc_PACKET *pkt = SLLIST_ITEM(iter.cur, mc_PACKET, slnode)((mc_PACKET *) (void *) ((char *)(iter.cur) - __builtin_offsetof (mc_PACKET, slnode))); | |||
786 | mc_REQDATA *rd = MCREQ_PKT_RDATA(pkt)(((pkt)->flags & MCREQ_F_REQEXT) ? ((mc_REQDATA *)(pkt )->u_rdata.exdata) : (&(pkt)->u_rdata.reqdata)); | |||
787 | ||||
788 | /** | |||
789 | * oldest_valid contains the LOWEST timestamp we can admit to being | |||
790 | * acceptable. If the current command is newer (i.e. has a higher | |||
791 | * timestamp) then we break the iteration and return. | |||
792 | */ | |||
793 | if (oldest_valid && rd->start > oldest_valid) { | |||
794 | if (oldest_start) { | |||
795 | *oldest_start = rd->start; | |||
796 | } | |||
797 | return count; | |||
798 | } | |||
799 | ||||
800 | sllist_iter_remove(&pl->requests, &iter); | |||
801 | failcb(pl, pkt, err, cbarg); | |||
802 | mcreq_packet_handled(pl, pkt)do { (pkt)->flags |= MCREQ_F_INVOKED; if ((pkt)->flags & MCREQ_F_FLUSHED) { mcreq_packet_done(pl, pkt); } } while (0) ;; | |||
803 | count++; | |||
804 | } | |||
805 | return count; | |||
806 | } | |||
807 | ||||
808 | unsigned | |||
809 | mcreq_pipeline_fail( | |||
810 | mc_PIPELINE *pl, lcb_error_t err, mcreq_pktfail_fn failcb, void *arg) | |||
811 | { | |||
812 | return mcreq_pipeline_timeout(pl, err, failcb, arg, 0, NULL((void*)0)); | |||
813 | } | |||
814 | ||||
815 | void | |||
816 | mcreq_iterwipe(mc_CMDQUEUE *queue, mc_PIPELINE *src, | |||
817 | mcreq_iterwipe_fn callback, void *arg) | |||
818 | { | |||
819 | sllist_iterator iter; | |||
820 | ||||
821 | SLLIST_ITERFOR(&src->requests, &iter)for (slist_iter_init(&src->requests, &iter); !((& iter)->cur == ((void*)0)); slist_iter_incr(&src->requests , &iter)) { | |||
822 | int rv; | |||
823 | mc_PACKET *orig = SLLIST_ITEM(iter.cur, mc_PACKET, slnode)((mc_PACKET *) (void *) ((char *)(iter.cur) - __builtin_offsetof (mc_PACKET, slnode))); | |||
824 | rv = callback(queue, src, orig, arg); | |||
825 | if (rv == MCREQ_REMOVE_PACKET2) { | |||
826 | sllist_iter_remove(&src->requests, &iter); | |||
827 | } | |||
828 | } | |||
829 | } | |||
830 | ||||
#include "mcreq-flush-inl.h"

/* Pipeline subtype used as the command queue's fallback sink: a plain
 * mc_PIPELINE extended with the handler invoked for each packet routed to
 * it (see mcreq_set_fallback_handler / do_fallback_flush below). */
typedef struct {
    mc_PIPELINE base;           /* must be first: allows casting to/from mc_PIPELINE */
    mcreq_fallback_cb handler;  /* invoked once per packet during fallback flush */
} mc_FALLBACKPL;
836 | ||||
837 | static void | |||
838 | do_fallback_flush(mc_PIPELINE *pipeline) | |||
839 | { | |||
840 | nb_IOV iov; | |||
841 | unsigned nb; | |||
842 | int nused; | |||
843 | sllist_iterator iter; | |||
844 | mc_FALLBACKPL *fpl = (mc_FALLBACKPL*)pipeline; | |||
845 | ||||
846 | while ((nb = mcreq_flush_iov_fill(pipeline, &iov, 1, &nused))) { | |||
847 | mcreq_flush_done(pipeline, nb, nb); | |||
848 | } | |||
849 | /* Now handle all the packets, for real */ | |||
850 | SLLIST_ITERFOR(&pipeline->requests, &iter)for (slist_iter_init(&pipeline->requests, &iter); ! ((&iter)->cur == ((void*)0)); slist_iter_incr(&pipeline ->requests, &iter)) { | |||
851 | mc_PACKET *pkt = SLLIST_ITEM(iter.cur, mc_PACKET, slnode)((mc_PACKET *) (void *) ((char *)(iter.cur) - __builtin_offsetof (mc_PACKET, slnode))); | |||
852 | fpl->handler(pipeline->parent, pkt); | |||
853 | sllist_iter_remove(&pipeline->requests, &iter); | |||
854 | mcreq_packet_handled(pipeline, pkt)do { (pkt)->flags |= MCREQ_F_INVOKED; if ((pkt)->flags & MCREQ_F_FLUSHED) { mcreq_packet_done(pipeline, pkt); } } while (0);; | |||
855 | } | |||
856 | } | |||
857 | ||||
858 | void | |||
859 | mcreq_set_fallback_handler(mc_CMDQUEUE *cq, mcreq_fallback_cb handler) | |||
860 | { | |||
861 | assert(!cq->fallback)((void) sizeof ((!cq->fallback) ? 1 : 0), __extension__ ({ if (!cq->fallback) ; else __assert_fail ("!cq->fallback" , "/home/avsej/code/libcouchbase/src/mc/mcreq.c", 861, __extension__ __PRETTY_FUNCTION__); })); | |||
862 | cq->fallback = calloc(1, sizeof (mc_FALLBACKPL)); | |||
863 | mcreq_pipeline_init(cq->fallback); | |||
864 | cq->fallback->parent = cq; | |||
865 | cq->fallback->index = cq->npipelines; | |||
866 | ((mc_FALLBACKPL*)cq->fallback)->handler = handler; | |||
867 | cq->fallback->flush_start = do_fallback_flush; | |||
868 | } | |||
869 | ||||
/* Payload dump callback that ignores its arguments; substituted when the
 * caller of mcreq_dump_packet() supplies no dump function. */
static void
noop_dumpfn(const void *d, unsigned n, FILE *fp)
{
    (void)d;
    (void)n;
    (void)fp;
}
872 | ||||
/* X-macro enumerating every MCREQ_F_* packet flag name. Expand with a
 * one-argument macro X to generate per-flag code (used below to print the
 * symbolic names of the flags set on a packet). */
#define MCREQ_XFLAGS(X) \
    X(KEY_NOCOPY) \
    X(VALUE_NOCOPY) \
    X(VALUE_IOV) \
    X(HASVALUE) \
    X(REQEXT) \
    X(UFWD) \
    X(FLUSHED) \
    X(INVOKED) \
    X(DETACHED)
883 | ||||
/* Write a human-readable description of `packet` to `fp` (stderr when fp is
 * NULL): flags, sizes, request data, and — via `dumpfn` — the raw payload.
 * When dumpfn is NULL a no-op dumper is used, so only the metadata and the
 * raw key+header span are emitted. Intended for debugging only. */
void
mcreq_dump_packet(const mc_PACKET *packet, FILE *fp, mcreq_payload_dump_fn dumpfn)
{
    const char *indent = "  ";
    const mc_REQDATA *rdata = MCREQ_PKT_RDATA(packet);

    if (!dumpfn) {
        dumpfn = noop_dumpfn;
    }
    if (!fp) {
        fp = stderr;
    }

    fprintf(fp, "Packet @%p\n", (void *)packet);
    fprintf(fp, "%sOPAQUE: %u\n", indent, (unsigned int)packet->opaque);

    /* Print each set flag by its symbolic name via the X-macro list */
    fprintf(fp, "%sPKTFLAGS: 0x%x ", indent, packet->flags);
    #define X(base) \
    if (packet->flags & MCREQ_F_##base) { fprintf(fp, "%s, ", #base); }
    MCREQ_XFLAGS(X)
    #undef X
    fprintf(fp, "\n");

    fprintf(fp, "%sKey+Header Size: %u\n", indent, (unsigned int)packet->kh_span.size);
    /* Key begins after the fixed 24-byte header plus the extras section */
    fprintf(fp, "%sKey Offset: %u\n", indent, MCREQ_PKT_BASESIZE + packet->extlen);


    if (packet->flags & MCREQ_F_HASVALUE) {
        if (packet->flags & MCREQ_F_VALUE_IOV) {
            fprintf(fp, "%sValue Length: %u\n", indent,
                packet->u_value.multi.total_length);

            fprintf(fp, "%sValue IOV: [start=%p, n=%d]\n", indent,
                (void *)packet->u_value.multi.iov, packet->u_value.multi.niov);
        } else {
            if (packet->flags & MCREQ_F_VALUE_NOCOPY) {
                fprintf(fp, "%sValue is user allocated\n", indent);
            }
            fprintf(fp, "%sValue: %p, %u bytes\n", indent,
                (void *)SPAN_BUFFER(&packet->u_value.single), packet->u_value.single.size);
        }
    }

    fprintf(fp, "%sRDATA(%s): %p\n", indent,
        (packet->flags & MCREQ_F_REQEXT) ? "ALLOC" : "EMBEDDED", (void *)rdata);

    indent = "    ";
    fprintf(fp, "%sStart: %lu\n", indent, (unsigned long)rdata->start);
    fprintf(fp, "%sCookie: %p\n", indent, rdata->cookie);

    indent = "  ";
    fprintf(fp, "%sNEXT: %p\n", indent, (void *)packet->slnode.next);
    if (dumpfn != noop_dumpfn) {
        fprintf(fp, "PACKET CONTENTS:\n");
    }

    /* Raw key+header bytes are always written; the value only goes through
     * the caller-supplied dump function. */
    fwrite(SPAN_BUFFER(&packet->kh_span), 1, packet->kh_span.size, fp);
    if (packet->flags & MCREQ_F_HASVALUE) {
        if (packet->flags & MCREQ_F_VALUE_IOV) {
            const lcb_IOV *iovs = packet->u_value.multi.iov;
            unsigned ii, ixmax = packet->u_value.multi.niov;
            for (ii = 0; ii < ixmax; ii++) {
                dumpfn(iovs[ii].iov_base, iovs[ii].iov_len, fp);
            }
        } else {
            const nb_SPAN *vspan = &packet->u_value.single;
            dumpfn(SPAN_BUFFER(vspan), vspan->size, fp);
        }
    }
}
954 | ||||
955 | void | |||
956 | mcreq_dump_chain(const mc_PIPELINE *pipeline, FILE *fp, mcreq_payload_dump_fn dumpfn) | |||
957 | { | |||
958 | sllist_node *ll; | |||
959 | SLLIST_FOREACH(&pipeline->requests, ll)for (ll = (&pipeline->requests)->first_prev.next; ll ; ll = ll->next) { | |||
960 | const mc_PACKET *pkt = SLLIST_ITEM(ll, mc_PACKET, slnode)((mc_PACKET *) (void *) ((char *)(ll) - __builtin_offsetof(mc_PACKET , slnode))); | |||
961 | mcreq_dump_packet(pkt, fp, dumpfn); | |||
962 | } | |||
963 | } |