File: | home/avsej/code/libcouchbase/src/mc/mcreq.c |
Warning: | line 365, column 28 Potential leak of memory pointed to by 'edst' |
1 | /* -*- Mode: C; tab-width: 4; c-basic-offset: 4; indent-tabs-mode: nil -*- */ | |||||
2 | /* | |||||
3 | * Copyright 2014 Couchbase, Inc. | |||||
4 | * | |||||
5 | * Licensed under the Apache License, Version 2.0 (the "License"); | |||||
6 | * you may not use this file except in compliance with the License. | |||||
7 | * You may obtain a copy of the License at | |||||
8 | * | |||||
9 | * http://www.apache.org/licenses/LICENSE-2.0 | |||||
10 | * | |||||
11 | * Unless required by applicable law or agreed to in writing, software | |||||
12 | * distributed under the License is distributed on an "AS IS" BASIS, | |||||
13 | * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | |||||
14 | * See the License for the specific language governing permissions and | |||||
15 | * limitations under the License. | |||||
16 | */ | |||||
17 | ||||||
18 | #include "mcreq.h" | |||||
19 | #include "compress.h" | |||||
20 | #include "sllist-inl.h" | |||||
21 | #include "internal.h" | |||||
22 | ||||||
23 | #define PKT_HDRSIZE(pkt)(24 + (pkt)->extlen) (MCREQ_PKT_BASESIZE24 + (pkt)->extlen) | |||||
24 | ||||||
25 | lcb_error_t | |||||
26 | mcreq_reserve_header( | |||||
27 | mc_PIPELINE *pipeline, mc_PACKET *packet, uint8_t hdrsize) | |||||
28 | { | |||||
29 | int rv; | |||||
30 | packet->extlen = hdrsize - MCREQ_PKT_BASESIZE24; | |||||
31 | packet->kh_span.size = hdrsize; | |||||
32 | rv = netbuf_mblock_reserve(&pipeline->nbmgr, &packet->kh_span); | |||||
33 | if (rv != 0) { | |||||
34 | return LCB_CLIENT_ENOMEM; | |||||
35 | } | |||||
36 | return LCB_SUCCESS; | |||||
37 | } | |||||
38 | ||||||
39 | lcb_error_t | |||||
40 | mcreq_reserve_key( | |||||
41 | mc_PIPELINE *pipeline, mc_PACKET *packet, uint8_t hdrsize, | |||||
42 | const lcb_KEYBUF *kreq) | |||||
43 | { | |||||
44 | const struct lcb_CONTIGBUF *contig = &kreq->contig; | |||||
45 | int rv; | |||||
46 | ||||||
47 | /** Set the key offset which is the start of the key from the buffer */ | |||||
48 | packet->extlen = hdrsize - MCREQ_PKT_BASESIZE24; | |||||
49 | packet->kh_span.size = kreq->contig.nbytes; | |||||
50 | ||||||
51 | if (kreq->type == LCB_KV_COPY) { | |||||
52 | /** | |||||
53 | * If the key is to be copied then just allocate the span size | |||||
54 | * for the key+24+extras | |||||
55 | */ | |||||
56 | packet->kh_span.size += hdrsize; | |||||
57 | rv = netbuf_mblock_reserve(&pipeline->nbmgr, &packet->kh_span); | |||||
58 | if (rv != 0) { | |||||
59 | return LCB_CLIENT_ENOMEM; | |||||
60 | } | |||||
61 | ||||||
62 | /** | |||||
63 | * Copy the key into the packet starting at the extras end | |||||
64 | */ | |||||
65 | memcpy(SPAN_BUFFER(&packet->kh_span)(((&packet->kh_span)->offset == (nb_SIZE)-1) ? ((char *)(&packet->kh_span)->parent) : ((&packet-> kh_span)->parent->root + (&packet->kh_span)-> offset)) + hdrsize, | |||||
66 | contig->bytes, | |||||
67 | contig->nbytes); | |||||
68 | ||||||
69 | } else if (kreq->type == LCB_KV_CONTIG) { | |||||
70 | /** | |||||
71 | * Don't do any copying. | |||||
72 | * Assume the key buffer has enough space for the packet as well. | |||||
73 | */ | |||||
74 | CREATE_STANDALONE_SPAN(&packet->kh_span, contig->bytes, contig->nbytes)(&packet->kh_span)->parent = (nb_MBLOCK *) (void *) contig->bytes; (&packet->kh_span)->offset = (nb_SIZE )-1; (&packet->kh_span)->size = contig->nbytes;; | |||||
75 | packet->flags |= MCREQ_F_KEY_NOCOPY; | |||||
76 | ||||||
77 | } else { | |||||
78 | /** IOVs not supported for keys */ | |||||
79 | return LCB_EINVAL; | |||||
80 | } | |||||
81 | ||||||
82 | return LCB_SUCCESS; | |||||
83 | } | |||||
84 | ||||||
85 | lcb_error_t | |||||
86 | mcreq_reserve_value2(mc_PIPELINE *pl, mc_PACKET *pkt, lcb_size_t n) | |||||
87 | { | |||||
88 | int rv; | |||||
89 | pkt->u_value.single.size = n; | |||||
90 | if (!n) { | |||||
91 | return LCB_SUCCESS; | |||||
92 | } | |||||
93 | ||||||
94 | pkt->flags |= MCREQ_F_HASVALUE; | |||||
95 | rv = netbuf_mblock_reserve(&pl->nbmgr, &pkt->u_value.single); | |||||
96 | if (rv) { | |||||
97 | return LCB_CLIENT_ENOMEM; | |||||
98 | } | |||||
99 | return LCB_SUCCESS; | |||||
100 | } | |||||
101 | ||||||
102 | lcb_error_t | |||||
103 | mcreq_reserve_value( | |||||
104 | mc_PIPELINE *pipeline, mc_PACKET *packet, const lcb_VALBUF *vreq) | |||||
105 | { | |||||
106 | const lcb_CONTIGBUF *contig = &vreq->u_buf.contig; | |||||
107 | nb_SPAN *vspan = &packet->u_value.single; | |||||
108 | int rv; | |||||
109 | ||||||
110 | if (vreq->vtype == LCB_KV_COPY) { | |||||
111 | /** Copy the value into a single SPAN */ | |||||
112 | if (! (vspan->size = vreq->u_buf.contig.nbytes)) { | |||||
113 | return LCB_SUCCESS; | |||||
114 | } | |||||
115 | rv = netbuf_mblock_reserve(&pipeline->nbmgr, vspan); | |||||
116 | ||||||
117 | if (rv != 0) { | |||||
118 | return LCB_CLIENT_ENOMEM; | |||||
119 | } | |||||
120 | ||||||
121 | memcpy(SPAN_BUFFER(vspan)(((vspan)->offset == (nb_SIZE)-1) ? ((char *)(vspan)->parent ) : ((vspan)->parent->root + (vspan)->offset)), contig->bytes, contig->nbytes); | |||||
122 | ||||||
123 | } else if (vreq->vtype == LCB_KV_CONTIG) { | |||||
124 | /** It's still contiguous so make it a 'standalone' span */ | |||||
125 | CREATE_STANDALONE_SPAN(vspan, contig->bytes, contig->nbytes)(vspan)->parent = (nb_MBLOCK *) (void *)contig->bytes; ( vspan)->offset = (nb_SIZE)-1; (vspan)->size = contig-> nbytes;; | |||||
126 | packet->flags |= MCREQ_F_VALUE_NOCOPY; | |||||
127 | ||||||
128 | } else if (vreq->vtype == LCB_KV_IOV) { | |||||
129 | /** Multiple spans, no copy */ | |||||
130 | unsigned int ii; | |||||
131 | const lcb_FRAGBUF *msrc = &vreq->u_buf.multi; | |||||
132 | lcb_FRAGBUF *mdst = &packet->u_value.multi; | |||||
133 | ||||||
134 | packet->flags |= MCREQ_F_VALUE_IOV | MCREQ_F_VALUE_NOCOPY; | |||||
135 | mdst->niov = msrc->niov; | |||||
136 | mdst->iov = malloc(mdst->niov * sizeof(*mdst->iov)); | |||||
137 | mdst->total_length = 0; | |||||
138 | ||||||
139 | for (ii = 0; ii < mdst->niov; ii++) { | |||||
140 | mdst->iov[ii] = msrc->iov[ii]; | |||||
141 | mdst->total_length += mdst->iov[ii].iov_len; | |||||
142 | } | |||||
143 | } else if (vreq->vtype == LCB_KV_IOVCOPY) { | |||||
144 | /** Multiple input buffers, normal copying output buffer */ | |||||
145 | unsigned int ii, cur_offset; | |||||
146 | const lcb_FRAGBUF *msrc = &vreq->u_buf.multi; | |||||
147 | ||||||
148 | if (msrc->total_length) { | |||||
149 | vspan->size = msrc->total_length; | |||||
150 | } else { | |||||
151 | vspan->size = 0; | |||||
152 | for (ii = 0; ii < msrc->niov; ii++) { | |||||
153 | vspan->size += msrc->iov[ii].iov_len; | |||||
154 | } | |||||
155 | } | |||||
156 | ||||||
157 | rv = netbuf_mblock_reserve(&pipeline->nbmgr, vspan); | |||||
158 | if (rv != 0) { | |||||
159 | return LCB_CLIENT_ENOMEM; | |||||
160 | } | |||||
161 | ||||||
162 | for (ii = 0, cur_offset = 0; ii < msrc->niov; ii++) { | |||||
163 | char *buf = SPAN_BUFFER(vspan)(((vspan)->offset == (nb_SIZE)-1) ? ((char *)(vspan)->parent ) : ((vspan)->parent->root + (vspan)->offset)) + cur_offset; | |||||
164 | memcpy(buf, msrc->iov[ii].iov_base, msrc->iov[ii].iov_len); | |||||
165 | cur_offset += msrc->iov[ii].iov_len; | |||||
166 | } | |||||
167 | } | |||||
168 | ||||||
169 | packet->flags |= MCREQ_F_HASVALUE; | |||||
170 | return LCB_SUCCESS; | |||||
171 | } | |||||
172 | ||||||
173 | static int | |||||
174 | pkt_tmo_compar(sllist_node *a, sllist_node *b) | |||||
175 | { | |||||
176 | mc_PACKET *pa, *pb; | |||||
177 | hrtime_t tmo_a, tmo_b; | |||||
178 | ||||||
179 | pa = SLLIST_ITEM(a, mc_PACKET, slnode)((mc_PACKET *) (void *) ((char *)(a) - __builtin_offsetof(mc_PACKET , slnode))); | |||||
180 | pb = SLLIST_ITEM(b, mc_PACKET, slnode)((mc_PACKET *) (void *) ((char *)(b) - __builtin_offsetof(mc_PACKET , slnode))); | |||||
181 | ||||||
182 | tmo_a = MCREQ_PKT_RDATA(pa)(((pa)->flags & MCREQ_F_REQEXT) ? ((mc_REQDATA *)(pa)-> u_rdata.exdata) : (&(pa)->u_rdata.reqdata))->start; | |||||
183 | tmo_b = MCREQ_PKT_RDATA(pb)(((pb)->flags & MCREQ_F_REQEXT) ? ((mc_REQDATA *)(pb)-> u_rdata.exdata) : (&(pb)->u_rdata.reqdata))->start; | |||||
184 | ||||||
185 | if (tmo_a == tmo_b) { | |||||
186 | return 0; | |||||
187 | } else if (tmo_a < tmo_b) { | |||||
188 | return -1; | |||||
189 | } else { | |||||
190 | return 1; | |||||
191 | } | |||||
192 | } | |||||
193 | ||||||
194 | void | |||||
195 | mcreq_reenqueue_packet(mc_PIPELINE *pipeline, mc_PACKET *packet) | |||||
196 | { | |||||
197 | sllist_root *reqs = &pipeline->requests; | |||||
198 | mcreq_enqueue_packet(pipeline, packet); | |||||
199 | sllist_remove(reqs, &packet->slnode); | |||||
200 | sllist_insert_sorted(reqs, &packet->slnode, pkt_tmo_compar); | |||||
201 | } | |||||
202 | ||||||
203 | void | |||||
204 | mcreq_enqueue_packet(mc_PIPELINE *pipeline, mc_PACKET *packet) | |||||
205 | { | |||||
206 | nb_SPAN *vspan = &packet->u_value.single; | |||||
207 | sllist_append(&pipeline->requests, &packet->slnode); | |||||
208 | netbuf_enqueue_span(&pipeline->nbmgr, &packet->kh_span); | |||||
209 | ||||||
210 | if (!(packet->flags & MCREQ_F_HASVALUE)) { | |||||
211 | goto GT_ENQUEUE_PDU; | |||||
212 | } | |||||
213 | ||||||
214 | if (packet->flags & MCREQ_F_VALUE_IOV) { | |||||
215 | unsigned int ii; | |||||
216 | lcb_FRAGBUF *multi = &packet->u_value.multi; | |||||
217 | for (ii = 0; ii < multi->niov; ii++) { | |||||
218 | netbuf_enqueue(&pipeline->nbmgr, (nb_IOV *)multi->iov + ii); | |||||
219 | } | |||||
220 | ||||||
221 | } else if (vspan->size) { | |||||
222 | netbuf_enqueue_span(&pipeline->nbmgr, vspan); | |||||
223 | } | |||||
224 | ||||||
225 | GT_ENQUEUE_PDU: | |||||
226 | netbuf_pdu_enqueue(&pipeline->nbmgr, packet, offsetof(mc_PACKET, sl_flushq)__builtin_offsetof(mc_PACKET, sl_flushq)); | |||||
227 | } | |||||
228 | ||||||
229 | void | |||||
230 | mcreq_wipe_packet(mc_PIPELINE *pipeline, mc_PACKET *packet) | |||||
231 | { | |||||
232 | if (! (packet->flags & MCREQ_F_KEY_NOCOPY)) { | |||||
233 | if (packet->flags & MCREQ_F_DETACHED) { | |||||
234 | free(SPAN_BUFFER(&packet->kh_span)(((&packet->kh_span)->offset == (nb_SIZE)-1) ? ((char *)(&packet->kh_span)->parent) : ((&packet-> kh_span)->parent->root + (&packet->kh_span)-> offset))); | |||||
235 | } else { | |||||
236 | netbuf_mblock_release(&pipeline->nbmgr, &packet->kh_span); | |||||
237 | } | |||||
238 | } | |||||
239 | ||||||
240 | if (! (packet->flags & MCREQ_F_HASVALUE)) { | |||||
241 | return; | |||||
242 | } | |||||
243 | ||||||
244 | if (packet->flags & MCREQ_F_VALUE_NOCOPY) { | |||||
245 | if (packet->flags & MCREQ_F_VALUE_IOV) { | |||||
246 | free(packet->u_value.multi.iov); | |||||
247 | } | |||||
248 | ||||||
249 | return; | |||||
250 | } | |||||
251 | ||||||
252 | if (packet->flags & MCREQ_F_DETACHED) { | |||||
253 | free(SPAN_BUFFER(&packet->u_value.single)(((&packet->u_value.single)->offset == (nb_SIZE)-1) ? ((char *)(&packet->u_value.single)->parent) : (( &packet->u_value.single)->parent->root + (&packet ->u_value.single)->offset))); | |||||
254 | } else { | |||||
255 | netbuf_mblock_release(&pipeline->nbmgr, &packet->u_value.single); | |||||
256 | } | |||||
257 | ||||||
258 | } | |||||
259 | ||||||
260 | mc_PACKET * | |||||
261 | mcreq_allocate_packet(mc_PIPELINE *pipeline) | |||||
262 | { | |||||
263 | nb_SPAN span; | |||||
264 | int rv; | |||||
265 | mc_PACKET *ret; | |||||
266 | span.size = sizeof(*ret); | |||||
267 | ||||||
268 | rv = netbuf_mblock_reserve(&pipeline->reqpool, &span); | |||||
269 | if (rv != 0) { | |||||
270 | return NULL((void*)0); | |||||
271 | } | |||||
272 | ||||||
273 | ret = (void *) SPAN_MBUFFER_NC(&span)((&span)->parent->root + (&span)->offset); | |||||
274 | ret->alloc_parent = span.parent; | |||||
275 | ret->flags = 0; | |||||
276 | ret->retries = 0; | |||||
277 | ret->opaque = pipeline->parent->seq++; | |||||
278 | return ret; | |||||
279 | } | |||||
280 | ||||||
281 | void | |||||
282 | mcreq_release_packet(mc_PIPELINE *pipeline, mc_PACKET *packet) | |||||
283 | { | |||||
284 | nb_SPAN span; | |||||
285 | if (packet->flags & MCREQ_F_DETACHED) { | |||||
286 | sllist_iterator iter; | |||||
287 | mc_EXPACKET *epkt = (mc_EXPACKET *)packet; | |||||
288 | ||||||
289 | SLLIST_ITERFOR(&epkt->data, &iter)for (slist_iter_init(&epkt->data, &iter); !((& iter)->cur == ((void*)0)); slist_iter_incr(&epkt->data , &iter)) { | |||||
290 | mc_EPKTDATUM *d = SLLIST_ITEM(iter.cur, mc_EPKTDATUM, slnode)((mc_EPKTDATUM *) (void *) ((char *)(iter.cur) - __builtin_offsetof (mc_EPKTDATUM, slnode))); | |||||
291 | sllist_iter_remove(&epkt->data, &iter); | |||||
292 | d->dtorfn(d); | |||||
293 | } | |||||
294 | free(epkt); | |||||
295 | return; | |||||
296 | } | |||||
297 | ||||||
298 | span.size = sizeof(*packet); | |||||
299 | span.parent = packet->alloc_parent; | |||||
300 | span.offset = (char *)packet - packet->alloc_parent->root; | |||||
301 | ||||||
302 | netbuf_mblock_release(&pipeline->reqpool, &span); | |||||
303 | } | |||||
304 | ||||||
305 | #define MCREQ_DETACH_WIPESRC1 1 | |||||
306 | ||||||
307 | mc_PACKET * | |||||
308 | mcreq_renew_packet(const mc_PACKET *src) | |||||
309 | { | |||||
310 | char *kdata, *vdata; | |||||
311 | unsigned nvdata; | |||||
312 | mc_PACKET *dst; | |||||
313 | mc_EXPACKET *edst = calloc(1, sizeof(*edst)); | |||||
| ||||||
314 | ||||||
315 | dst = &edst->base; | |||||
316 | *dst = *src; | |||||
317 | ||||||
318 | kdata = malloc(src->kh_span.size); | |||||
319 | memcpy(kdata, SPAN_BUFFER(&src->kh_span)(((&src->kh_span)->offset == (nb_SIZE)-1) ? ((char * )(&src->kh_span)->parent) : ((&src->kh_span) ->parent->root + (&src->kh_span)->offset)), src->kh_span.size); | |||||
320 | CREATE_STANDALONE_SPAN(&dst->kh_span, kdata, src->kh_span.size)(&dst->kh_span)->parent = (nb_MBLOCK *) (void *)kdata ; (&dst->kh_span)->offset = (nb_SIZE)-1; (&dst-> kh_span)->size = src->kh_span.size;; | |||||
321 | ||||||
322 | dst->flags &= ~(MCREQ_F_KEY_NOCOPY|MCREQ_F_VALUE_NOCOPY|MCREQ_F_VALUE_IOV); | |||||
323 | dst->flags |= MCREQ_F_DETACHED; | |||||
324 | dst->alloc_parent = NULL((void*)0); | |||||
325 | dst->sl_flushq.next = NULL((void*)0); | |||||
326 | dst->slnode.next = NULL((void*)0); | |||||
327 | dst->retries = src->retries; | |||||
328 | ||||||
329 | if (src->flags & MCREQ_F_HASVALUE) { | |||||
330 | /** Get the length */ | |||||
331 | if (src->flags & MCREQ_F_VALUE_IOV) { | |||||
332 | unsigned ii; | |||||
333 | unsigned offset = 0; | |||||
334 | ||||||
335 | nvdata = src->u_value.multi.total_length; | |||||
336 | vdata = malloc(nvdata); | |||||
337 | for (ii = 0; ii < src->u_value.multi.niov; ii++) { | |||||
338 | const lcb_IOV *iov = src->u_value.multi.iov + ii; | |||||
339 | ||||||
340 | memcpy(vdata + offset, iov->iov_base, iov->iov_len); | |||||
341 | offset += iov->iov_len; | |||||
342 | } | |||||
343 | } else { | |||||
344 | protocol_binary_request_header hdr; | |||||
345 | const nb_SPAN *origspan = &src->u_value.single; | |||||
346 | mcreq_read_hdr(dst, &hdr)memcpy( (&hdr)->bytes, (((&(dst)->kh_span)-> offset == (nb_SIZE)-1) ? ((char *)(&(dst)->kh_span)-> parent) : ((&(dst)->kh_span)->parent->root + (& (dst)->kh_span)->offset)), sizeof((&hdr)->bytes) ); | |||||
347 | ||||||
348 | if (hdr.request.datatype & PROTOCOL_BINARY_DATATYPE_COMPRESSED) { | |||||
349 | /* For compressed payloads we need to uncompress it first | |||||
350 | * because it may be forwarded to a server without compression. | |||||
351 | * TODO: might be more clever to check a setting flag somewhere | |||||
352 | * and see if we should do this. */ | |||||
353 | ||||||
354 | lcb_SIZE n_inflated; | |||||
355 | const void *inflated; | |||||
356 | int rv; | |||||
357 | ||||||
358 | vdata = NULL((void*)0); | |||||
359 | rv = mcreq_inflate_value(SPAN_BUFFER(origspan)(((origspan)->offset == (nb_SIZE)-1) ? ((char *)(origspan) ->parent) : ((origspan)->parent->root + (origspan)-> offset)), origspan->size, | |||||
360 | &inflated, &n_inflated, (void**)&vdata); | |||||
361 | ||||||
362 | assert(vdata == inflated)((void) sizeof ((vdata == inflated) ? 1 : 0), __extension__ ( { if (vdata == inflated) ; else __assert_fail ("vdata == inflated" , "/home/avsej/code/libcouchbase/src/mc/mcreq.c", 362, __extension__ __PRETTY_FUNCTION__); })); | |||||
363 | ||||||
364 | if (rv != 0) { | |||||
365 | return NULL((void*)0); | |||||
| ||||||
366 | } | |||||
367 | nvdata = n_inflated; | |||||
368 | hdr.request.datatype &= ~PROTOCOL_BINARY_DATATYPE_COMPRESSED; | |||||
369 | hdr.request.bodylen = htonl( | |||||
370 | ntohs(hdr.request.keylen) + | |||||
371 | hdr.request.extlen + | |||||
372 | n_inflated); | |||||
373 | mcreq_write_hdr(dst, &hdr)memcpy( (((&(dst)->kh_span)->offset == (nb_SIZE)-1) ? ((char *)(&(dst)->kh_span)->parent) : ((&(dst )->kh_span)->parent->root + (&(dst)->kh_span) ->offset)), (&hdr)->bytes, sizeof((&hdr)->bytes ) ); | |||||
374 | ||||||
375 | } else { | |||||
376 | nvdata = origspan->size; | |||||
377 | vdata = malloc(nvdata); | |||||
378 | memcpy(vdata, SPAN_BUFFER(origspan)(((origspan)->offset == (nb_SIZE)-1) ? ((char *)(origspan) ->parent) : ((origspan)->parent->root + (origspan)-> offset)), nvdata); | |||||
379 | } | |||||
380 | } | |||||
381 | ||||||
382 | /* Declare the value as a standalone malloc'd span */ | |||||
383 | CREATE_STANDALONE_SPAN(&dst->u_value.single, vdata, nvdata)(&dst->u_value.single)->parent = (nb_MBLOCK *) (void *)vdata; (&dst->u_value.single)->offset = (nb_SIZE )-1; (&dst->u_value.single)->size = nvdata;; | |||||
384 | } | |||||
385 | ||||||
386 | if (src->flags & MCREQ_F_DETACHED) { | |||||
387 | mc_EXPACKET *esrc = (mc_EXPACKET *)src; | |||||
388 | sllist_iterator iter; | |||||
389 | SLLIST_ITERFOR(&esrc->data, &iter)for (slist_iter_init(&esrc->data, &iter); !((& iter)->cur == ((void*)0)); slist_iter_incr(&esrc->data , &iter)) { | |||||
390 | sllist_node *cur = iter.cur; | |||||
391 | sllist_iter_remove(&esrc->data, &iter); | |||||
392 | sllist_append(&edst->data, cur); | |||||
393 | } | |||||
394 | } | |||||
395 | return dst; | |||||
396 | } | |||||
397 | ||||||
398 | int | |||||
399 | mcreq_epkt_insert(mc_EXPACKET *ep, mc_EPKTDATUM *datum) | |||||
400 | { | |||||
401 | if (!(ep->base.flags & MCREQ_F_DETACHED)) { | |||||
402 | return -1; | |||||
403 | } | |||||
404 | assert(!sllist_contains(&ep->data, &datum->slnode))((void) sizeof ((!sllist_contains(&ep->data, &datum ->slnode)) ? 1 : 0), __extension__ ({ if (!sllist_contains (&ep->data, &datum->slnode)) ; else __assert_fail ("!sllist_contains(&ep->data, &datum->slnode)" , "/home/avsej/code/libcouchbase/src/mc/mcreq.c", 404, __extension__ __PRETTY_FUNCTION__); })); | |||||
405 | sllist_append(&ep->data, &datum->slnode); | |||||
406 | return 0; | |||||
407 | } | |||||
408 | ||||||
409 | mc_EPKTDATUM * | |||||
410 | mcreq_epkt_find(mc_EXPACKET *ep, const char *key) | |||||
411 | { | |||||
412 | sllist_iterator iter; | |||||
413 | SLLIST_ITERFOR(&ep->data, &iter)for (slist_iter_init(&ep->data, &iter); !((&iter )->cur == ((void*)0)); slist_iter_incr(&ep->data, & iter)) { | |||||
414 | mc_EPKTDATUM *d = SLLIST_ITEM(iter.cur, mc_EPKTDATUM, slnode)((mc_EPKTDATUM *) (void *) ((char *)(iter.cur) - __builtin_offsetof (mc_EPKTDATUM, slnode))); | |||||
415 | if (!strcmp(key, d->key)) { | |||||
416 | return d; | |||||
417 | } | |||||
418 | } | |||||
419 | return NULL((void*)0); | |||||
420 | } | |||||
421 | ||||||
422 | void | |||||
423 | mcreq_map_key(mc_CMDQUEUE *queue, | |||||
424 | const lcb_KEYBUF *key, const lcb_KEYBUF *hashkey, | |||||
425 | unsigned nhdr, int *vbid, int *srvix) | |||||
426 | { | |||||
427 | const void *hk; | |||||
428 | size_t nhk = 0; | |||||
429 | if (hashkey) { | |||||
430 | if (hashkey->type == LCB_KV_COPY && hashkey->contig.bytes != NULL((void*)0)) { | |||||
431 | hk = hashkey->contig.bytes; | |||||
432 | nhk = hashkey->contig.nbytes; | |||||
433 | } else if (hashkey->type == LCB_KV_VBID) { | |||||
434 | *vbid = hashkey->contig.nbytes; | |||||
435 | *srvix = lcbvb_vbmaster(queue->config, *vbid); | |||||
436 | return; | |||||
437 | } | |||||
438 | } | |||||
439 | if (!nhk) { | |||||
440 | if (key->type == LCB_KV_COPY) { | |||||
441 | hk = key->contig.bytes; | |||||
442 | nhk = key->contig.nbytes; | |||||
443 | } else { | |||||
444 | const char *buf = key->contig.bytes; | |||||
445 | buf += nhdr; | |||||
446 | hk = buf; | |||||
447 | nhk = key->contig.nbytes - nhdr; | |||||
448 | } | |||||
449 | } | |||||
450 | lcbvb_map_key(queue->config, hk, nhk, vbid, srvix); | |||||
451 | } | |||||
452 | ||||||
453 | lcb_error_t | |||||
454 | mcreq_basic_packet( | |||||
455 | mc_CMDQUEUE *queue, const lcb_CMDBASE *cmd, | |||||
456 | protocol_binary_request_header *req, lcb_uint8_t extlen, | |||||
457 | mc_PACKET **packet, mc_PIPELINE **pipeline, int options) | |||||
458 | { | |||||
459 | int vb, srvix; | |||||
460 | ||||||
461 | if (!queue->config) { | |||||
462 | return LCB_CLIENT_ETMPFAILLCB_CLIENT_ENOCONF; | |||||
463 | } | |||||
464 | if (!cmd) { | |||||
465 | return LCB_EINVAL; | |||||
466 | } | |||||
467 | ||||||
468 | mcreq_map_key(queue, &cmd->key, &cmd->_hashkey, | |||||
469 | sizeof(*req) + extlen, &vb, &srvix); | |||||
470 | if (srvix > -1 && srvix < (int)queue->npipelines) { | |||||
471 | *pipeline = queue->pipelines[srvix]; | |||||
472 | ||||||
473 | } else { | |||||
474 | if ((options & MCREQ_BASICPACKET_F_FALLBACKOK0x01) && queue->fallback) { | |||||
475 | *pipeline = queue->fallback; | |||||
476 | } else { | |||||
477 | return LCB_NO_MATCHING_SERVER; | |||||
478 | } | |||||
479 | } | |||||
480 | ||||||
481 | *packet = mcreq_allocate_packet(*pipeline); | |||||
482 | if (*packet == NULL((void*)0)) { | |||||
483 | return LCB_CLIENT_ENOMEM; | |||||
484 | } | |||||
485 | ||||||
486 | mcreq_reserve_key(*pipeline, *packet, sizeof(*req) + extlen, &cmd->key); | |||||
487 | ||||||
488 | req->request.keylen = htons((*packet)->kh_span.size - PKT_HDRSIZE(*packet)(24 + (*packet)->extlen)); | |||||
489 | req->request.vbucket = htons(vb); | |||||
490 | req->request.extlen = extlen; | |||||
491 | return LCB_SUCCESS; | |||||
492 | } | |||||
493 | ||||||
494 | void | |||||
495 | mcreq_get_key(const mc_PACKET *packet, const void **key, lcb_size_t *nkey) | |||||
496 | { | |||||
497 | *key = SPAN_BUFFER(&packet->kh_span)(((&packet->kh_span)->offset == (nb_SIZE)-1) ? ((char *)(&packet->kh_span)->parent) : ((&packet-> kh_span)->parent->root + (&packet->kh_span)-> offset)) + PKT_HDRSIZE(packet)(24 + (packet)->extlen); | |||||
498 | *nkey = packet->kh_span.size - PKT_HDRSIZE(packet)(24 + (packet)->extlen); | |||||
499 | } | |||||
500 | ||||||
501 | lcb_uint32_t | |||||
502 | mcreq_get_bodysize(const mc_PACKET *packet) | |||||
503 | { | |||||
504 | lcb_uint32_t ret; | |||||
505 | char *retptr = SPAN_BUFFER(&packet->kh_span)(((&packet->kh_span)->offset == (nb_SIZE)-1) ? ((char *)(&packet->kh_span)->parent) : ((&packet-> kh_span)->parent->root + (&packet->kh_span)-> offset)) + 8; | |||||
506 | if ((uintptr_t)retptr % sizeof(ret) == 0) { | |||||
507 | return ntohl(*(lcb_uint32_t*) (void *)retptr); | |||||
508 | } else { | |||||
509 | memcpy(&ret, retptr, sizeof(ret)); | |||||
510 | return ntohl(ret); | |||||
511 | } | |||||
512 | } | |||||
513 | ||||||
514 | uint16_t | |||||
515 | mcreq_get_vbucket(const mc_PACKET *packet) | |||||
516 | { | |||||
517 | uint16_t ret; | |||||
518 | char *retptr = SPAN_BUFFER(&packet->kh_span)(((&packet->kh_span)->offset == (nb_SIZE)-1) ? ((char *)(&packet->kh_span)->parent) : ((&packet-> kh_span)->parent->root + (&packet->kh_span)-> offset)) + 6; | |||||
519 | if ((uintptr_t)retptr % sizeof(ret) == 0) { | |||||
520 | return ntohs(*(uint16_t*)(void*)retptr); | |||||
521 | } else { | |||||
522 | memcpy(&ret, retptr, sizeof ret); | |||||
523 | return ntohs(ret); | |||||
524 | } | |||||
525 | } | |||||
526 | ||||||
527 | uint32_t | |||||
528 | mcreq_get_size(const mc_PACKET *packet) | |||||
529 | { | |||||
530 | uint32_t sz = packet->kh_span.size; | |||||
531 | if (packet->flags & MCREQ_F_HASVALUE) { | |||||
532 | if (packet->flags & MCREQ_F_VALUE_IOV) { | |||||
533 | sz += packet->u_value.multi.total_length; | |||||
534 | } else { | |||||
535 | sz += packet->u_value.single.size; | |||||
536 | } | |||||
537 | } | |||||
538 | return sz; | |||||
539 | } | |||||
540 | ||||||
541 | void | |||||
542 | mcreq_pipeline_cleanup(mc_PIPELINE *pipeline) | |||||
543 | { | |||||
544 | netbuf_cleanup(&pipeline->nbmgr); | |||||
545 | netbuf_cleanup(&pipeline->reqpool); | |||||
546 | } | |||||
547 | ||||||
548 | int | |||||
549 | mcreq_pipeline_init(mc_PIPELINE *pipeline) | |||||
550 | { | |||||
551 | nb_SETTINGS settings; | |||||
552 | ||||||
553 | /* Initialize all members to 0 */ | |||||
554 | memset(&pipeline->requests, 0, sizeof pipeline->requests); | |||||
555 | pipeline->parent = NULL((void*)0); | |||||
556 | pipeline->flush_start = NULL((void*)0); | |||||
557 | pipeline->index = 0; | |||||
558 | memset(&pipeline->ctxqueued, 0, sizeof pipeline->ctxqueued); | |||||
559 | pipeline->buf_done_callback = NULL((void*)0); | |||||
560 | ||||||
561 | netbuf_default_settings(&settings); | |||||
562 | ||||||
563 | /** Initialize datapool */ | |||||
564 | netbuf_init(&pipeline->nbmgr, &settings); | |||||
565 | ||||||
566 | /** Initialize request pool */ | |||||
567 | settings.data_basealloc = sizeof(mc_PACKET) * 32; | |||||
568 | netbuf_init(&pipeline->reqpool, &settings);; | |||||
569 | return 0; | |||||
570 | } | |||||
571 | ||||||
572 | void | |||||
573 | mcreq_queue_add_pipelines( | |||||
574 | mc_CMDQUEUE *queue, mc_PIPELINE * const *pipelines, unsigned npipelines, | |||||
575 | lcbvb_CONFIG* config) | |||||
576 | { | |||||
577 | unsigned ii; | |||||
578 | ||||||
579 | lcb_assert(queue->pipelines == NULL)((void) sizeof ((queue->pipelines == ((void*)0)) ? 1 : 0), __extension__ ({ if (queue->pipelines == ((void*)0)) ; else __assert_fail ("queue->pipelines == ((void*)0)", "/home/avsej/code/libcouchbase/src/mc/mcreq.c" , 579, __extension__ __PRETTY_FUNCTION__); })); | |||||
580 | queue->npipelines = npipelines; | |||||
581 | queue->_npipelines_ex = queue->npipelines; | |||||
582 | queue->pipelines = malloc(sizeof(*pipelines) * (npipelines + 1)); | |||||
583 | queue->config = config; | |||||
584 | ||||||
585 | memcpy(queue->pipelines, pipelines, sizeof(*pipelines) * npipelines); | |||||
586 | ||||||
587 | free(queue->scheds); | |||||
588 | queue->scheds = calloc(npipelines+1, 1); | |||||
589 | ||||||
590 | for (ii = 0; ii < npipelines; ii++) { | |||||
591 | pipelines[ii]->parent = queue; | |||||
592 | pipelines[ii]->index = ii; | |||||
593 | } | |||||
594 | ||||||
595 | if (queue->fallback) { | |||||
596 | queue->fallback->index = npipelines; | |||||
597 | queue->pipelines[queue->npipelines] = queue->fallback; | |||||
598 | queue->_npipelines_ex++; | |||||
599 | } | |||||
600 | } | |||||
601 | ||||||
602 | mc_PIPELINE ** | |||||
603 | mcreq_queue_take_pipelines(mc_CMDQUEUE *queue, unsigned *count) | |||||
604 | { | |||||
605 | mc_PIPELINE **ret = queue->pipelines; | |||||
606 | *count = queue->npipelines; | |||||
607 | queue->pipelines = NULL((void*)0); | |||||
608 | queue->npipelines = 0; | |||||
609 | return ret; | |||||
610 | } | |||||
611 | ||||||
612 | int | |||||
613 | mcreq_queue_init(mc_CMDQUEUE *queue) | |||||
614 | { | |||||
615 | queue->seq = 0; | |||||
616 | queue->pipelines = NULL((void*)0); | |||||
617 | queue->scheds = NULL((void*)0); | |||||
618 | queue->fallback = NULL((void*)0); | |||||
619 | queue->npipelines = 0; | |||||
620 | return 0; | |||||
621 | } | |||||
622 | ||||||
623 | void | |||||
624 | mcreq_queue_cleanup(mc_CMDQUEUE *queue) | |||||
625 | { | |||||
626 | if (queue->fallback) { | |||||
627 | mcreq_pipeline_cleanup(queue->fallback); | |||||
628 | free(queue->fallback); | |||||
629 | queue->fallback = NULL((void*)0); | |||||
630 | } | |||||
631 | free(queue->scheds); | |||||
632 | free(queue->pipelines); | |||||
633 | queue->scheds = NULL((void*)0); | |||||
634 | } | |||||
635 | ||||||
636 | void | |||||
637 | mcreq_sched_enter(mc_CMDQUEUE *queue) | |||||
638 | { | |||||
639 | queue->ctxenter = 1; | |||||
640 | } | |||||
641 | ||||||
642 | ||||||
643 | ||||||
644 | static void | |||||
645 | queuectx_leave(mc_CMDQUEUE *queue, int success, int flush) | |||||
646 | { | |||||
647 | unsigned ii; | |||||
648 | ||||||
649 | if (queue->ctxenter) { | |||||
650 | queue->ctxenter = 0; | |||||
651 | } | |||||
652 | ||||||
653 | for (ii = 0; ii < queue->_npipelines_ex; ii++) { | |||||
654 | mc_PIPELINE *pipeline; | |||||
655 | sllist_node *ll_next, *ll; | |||||
656 | ||||||
657 | if (!queue->scheds[ii]) { | |||||
658 | continue; | |||||
659 | } | |||||
660 | ||||||
661 | pipeline = queue->pipelines[ii]; | |||||
662 | ll = SLLIST_FIRST(&pipeline->ctxqueued)(&pipeline->ctxqueued)->first_prev.next; | |||||
663 | ||||||
664 | while (ll) { | |||||
665 | mc_PACKET *pkt = SLLIST_ITEM(ll, mc_PACKET, slnode)((mc_PACKET *) (void *) ((char *)(ll) - __builtin_offsetof(mc_PACKET , slnode))); | |||||
666 | ll_next = ll->next; | |||||
667 | ||||||
668 | if (success) { | |||||
669 | mcreq_enqueue_packet(pipeline, pkt); | |||||
670 | } else { | |||||
671 | if (pkt->flags & MCREQ_F_REQEXT) { | |||||
672 | mc_REQDATAEX *rd = pkt->u_rdata.exdata; | |||||
673 | if (rd->procs->fail_dtor) { | |||||
674 | rd->procs->fail_dtor(pkt); | |||||
675 | } | |||||
676 | } | |||||
677 | mcreq_wipe_packet(pipeline, pkt); | |||||
678 | mcreq_release_packet(pipeline, pkt); | |||||
679 | } | |||||
680 | ||||||
681 | ll = ll_next; | |||||
682 | } | |||||
683 | SLLIST_FIRST(&pipeline->ctxqueued)(&pipeline->ctxqueued)->first_prev.next = pipeline->ctxqueued.last = NULL((void*)0); | |||||
684 | if (flush) { | |||||
685 | pipeline->flush_start(pipeline); | |||||
686 | } | |||||
687 | queue->scheds[ii] = 0; | |||||
688 | } | |||||
689 | } | |||||
690 | ||||||
691 | void | |||||
692 | mcreq_sched_leave(mc_CMDQUEUE *queue, int do_flush) | |||||
693 | { | |||||
694 | queuectx_leave(queue, 1, do_flush); | |||||
695 | } | |||||
696 | ||||||
697 | void | |||||
698 | mcreq_sched_fail(mc_CMDQUEUE *queue) | |||||
699 | { | |||||
700 | queuectx_leave(queue, 0, 0); | |||||
701 | } | |||||
702 | ||||||
703 | void | |||||
704 | mcreq_sched_add(mc_PIPELINE *pipeline, mc_PACKET *pkt) | |||||
705 | { | |||||
706 | mc_CMDQUEUE *cq = pipeline->parent; | |||||
707 | if (!cq->scheds[pipeline->index]) { | |||||
708 | cq->scheds[pipeline->index] = 1; | |||||
709 | } | |||||
710 | sllist_append(&pipeline->ctxqueued, &pkt->slnode); | |||||
711 | } | |||||
712 | ||||||
713 | static mc_PACKET * | |||||
714 | pipeline_find(mc_PIPELINE *pipeline, lcb_uint32_t opaque, int do_remove) | |||||
715 | { | |||||
716 | sllist_iterator iter; | |||||
717 | SLLIST_ITERFOR(&pipeline->requests, &iter)for (slist_iter_init(&pipeline->requests, &iter); ! ((&iter)->cur == ((void*)0)); slist_iter_incr(&pipeline ->requests, &iter)) { | |||||
718 | mc_PACKET *pkt = SLLIST_ITEM(iter.cur, mc_PACKET, slnode)((mc_PACKET *) (void *) ((char *)(iter.cur) - __builtin_offsetof (mc_PACKET, slnode))); | |||||
719 | if (pkt->opaque == opaque) { | |||||
720 | if (do_remove) { | |||||
721 | sllist_iter_remove(&pipeline->requests, &iter); | |||||
722 | } | |||||
723 | return pkt; | |||||
724 | } | |||||
725 | } | |||||
726 | return NULL((void*)0); | |||||
727 | } | |||||
728 | ||||||
729 | mc_PACKET * | |||||
730 | mcreq_pipeline_find(mc_PIPELINE *pipeline, lcb_uint32_t opaque) | |||||
731 | { | |||||
732 | return pipeline_find(pipeline, opaque, 0); | |||||
733 | } | |||||
734 | ||||||
735 | mc_PACKET * | |||||
736 | mcreq_pipeline_remove(mc_PIPELINE *pipeline, lcb_uint32_t opaque) | |||||
737 | { | |||||
738 | return pipeline_find(pipeline, opaque, 1); | |||||
739 | } | |||||
740 | ||||||
741 | void | |||||
742 | mcreq_packet_done(mc_PIPELINE *pipeline, mc_PACKET *pkt) | |||||
743 | { | |||||
744 | assert(pkt->flags & MCREQ_F_FLUSHED)((void) sizeof ((pkt->flags & MCREQ_F_FLUSHED) ? 1 : 0 ), __extension__ ({ if (pkt->flags & MCREQ_F_FLUSHED) ; else __assert_fail ("pkt->flags & MCREQ_F_FLUSHED", "/home/avsej/code/libcouchbase/src/mc/mcreq.c" , 744, __extension__ __PRETTY_FUNCTION__); })); | |||||
745 | assert(pkt->flags & MCREQ_F_INVOKED)((void) sizeof ((pkt->flags & MCREQ_F_INVOKED) ? 1 : 0 ), __extension__ ({ if (pkt->flags & MCREQ_F_INVOKED) ; else __assert_fail ("pkt->flags & MCREQ_F_INVOKED", "/home/avsej/code/libcouchbase/src/mc/mcreq.c" , 745, __extension__ __PRETTY_FUNCTION__); })); | |||||
746 | if (pkt->flags & MCREQ_UBUF_FLAGS(MCREQ_F_KEY_NOCOPY|MCREQ_F_VALUE_NOCOPY)) { | |||||
747 | void *kbuf, *vbuf; | |||||
748 | const void *cookie; | |||||
749 | ||||||
750 | cookie = MCREQ_PKT_COOKIE(pkt)(((pkt)->flags & MCREQ_F_REQEXT) ? ((mc_REQDATA *)(pkt )->u_rdata.exdata) : (&(pkt)->u_rdata.reqdata))-> cookie; | |||||
751 | if (pkt->flags & MCREQ_F_KEY_NOCOPY) { | |||||
752 | kbuf = SPAN_BUFFER(&pkt->kh_span)(((&pkt->kh_span)->offset == (nb_SIZE)-1) ? ((char * )(&pkt->kh_span)->parent) : ((&pkt->kh_span) ->parent->root + (&pkt->kh_span)->offset)); | |||||
753 | } else { | |||||
754 | kbuf = NULL((void*)0); | |||||
755 | } | |||||
756 | if (pkt->flags & MCREQ_F_VALUE_NOCOPY) { | |||||
757 | if (pkt->flags & MCREQ_F_VALUE_IOV) { | |||||
758 | vbuf = pkt->u_value.multi.iov->iov_base; | |||||
759 | } else { | |||||
760 | vbuf = SPAN_SABUFFER_NC(&pkt->u_value.single)((char *)(&pkt->u_value.single)->parent); | |||||
761 | } | |||||
762 | } else { | |||||
763 | vbuf = NULL((void*)0); | |||||
764 | } | |||||
765 | ||||||
766 | pipeline->buf_done_callback(pipeline, cookie, kbuf, vbuf); | |||||
767 | } | |||||
768 | mcreq_wipe_packet(pipeline, pkt); | |||||
769 | mcreq_release_packet(pipeline, pkt); | |||||
770 | } | |||||
771 | ||||||
772 | void | |||||
773 | mcreq_reset_timeouts(mc_PIPELINE *pl, lcb_U64 nstime) | |||||
774 | { | |||||
775 | sllist_node *nn; | |||||
776 | SLLIST_ITERBASIC(&pl->requests, nn)for (nn = (&pl->requests)->first_prev.next; nn; nn = nn->next) { | |||||
777 | mc_PACKET *pkt = SLLIST_ITEM(nn, mc_PACKET, slnode)((mc_PACKET *) (void *) ((char *)(nn) - __builtin_offsetof(mc_PACKET , slnode))); | |||||
778 | MCREQ_PKT_RDATA(pkt)(((pkt)->flags & MCREQ_F_REQEXT) ? ((mc_REQDATA *)(pkt )->u_rdata.exdata) : (&(pkt)->u_rdata.reqdata))->start = nstime; | |||||
779 | } | |||||
780 | } | |||||
781 | ||||||
782 | unsigned | |||||
783 | mcreq_pipeline_timeout( | |||||
784 | mc_PIPELINE *pl, lcb_error_t err, mcreq_pktfail_fn failcb, void *cbarg, | |||||
785 | hrtime_t oldest_valid, hrtime_t *oldest_start) | |||||
786 | { | |||||
787 | sllist_iterator iter; | |||||
788 | unsigned count = 0; | |||||
789 | ||||||
790 | SLLIST_ITERFOR(&pl->requests, &iter)for (slist_iter_init(&pl->requests, &iter); !((& iter)->cur == ((void*)0)); slist_iter_incr(&pl->requests , &iter)) { | |||||
791 | mc_PACKET *pkt = SLLIST_ITEM(iter.cur, mc_PACKET, slnode)((mc_PACKET *) (void *) ((char *)(iter.cur) - __builtin_offsetof (mc_PACKET, slnode))); | |||||
792 | mc_REQDATA *rd = MCREQ_PKT_RDATA(pkt)(((pkt)->flags & MCREQ_F_REQEXT) ? ((mc_REQDATA *)(pkt )->u_rdata.exdata) : (&(pkt)->u_rdata.reqdata)); | |||||
793 | ||||||
794 | /** | |||||
795 | * oldest_valid contains the LOWEST timestamp we can admit to being | |||||
796 | * acceptable. If the current command is newer (i.e. has a higher | |||||
797 | * timestamp) then we break the iteration and return. | |||||
798 | */ | |||||
799 | if (oldest_valid && rd->start > oldest_valid) { | |||||
800 | if (oldest_start) { | |||||
801 | *oldest_start = rd->start; | |||||
802 | } | |||||
803 | return count; | |||||
804 | } | |||||
805 | ||||||
806 | sllist_iter_remove(&pl->requests, &iter); | |||||
807 | failcb(pl, pkt, err, cbarg); | |||||
808 | mcreq_packet_handled(pl, pkt)do { (pkt)->flags |= MCREQ_F_INVOKED; if ((pkt)->flags & MCREQ_F_FLUSHED) { mcreq_packet_done(pl, pkt); } } while (0) ;; | |||||
809 | count++; | |||||
810 | } | |||||
811 | return count; | |||||
812 | } | |||||
813 | ||||||
814 | unsigned | |||||
815 | mcreq_pipeline_fail( | |||||
816 | mc_PIPELINE *pl, lcb_error_t err, mcreq_pktfail_fn failcb, void *arg) | |||||
817 | { | |||||
818 | return mcreq_pipeline_timeout(pl, err, failcb, arg, 0, NULL((void*)0)); | |||||
819 | } | |||||
820 | ||||||
821 | void | |||||
822 | mcreq_iterwipe(mc_CMDQUEUE *queue, mc_PIPELINE *src, | |||||
823 | mcreq_iterwipe_fn callback, void *arg) | |||||
824 | { | |||||
825 | sllist_iterator iter; | |||||
826 | ||||||
827 | SLLIST_ITERFOR(&src->requests, &iter)for (slist_iter_init(&src->requests, &iter); !((& iter)->cur == ((void*)0)); slist_iter_incr(&src->requests , &iter)) { | |||||
828 | int rv; | |||||
829 | mc_PACKET *orig = SLLIST_ITEM(iter.cur, mc_PACKET, slnode)((mc_PACKET *) (void *) ((char *)(iter.cur) - __builtin_offsetof (mc_PACKET, slnode))); | |||||
830 | rv = callback(queue, src, orig, arg); | |||||
831 | if (rv == MCREQ_REMOVE_PACKET2) { | |||||
832 | sllist_iter_remove(&src->requests, &iter); | |||||
833 | } | |||||
834 | } | |||||
835 | } | |||||
836 | ||||||
837 | #include "mcreq-flush-inl.h" | |||||
838 | typedef struct { | |||||
839 | mc_PIPELINE base; | |||||
840 | mcreq_fallback_cb handler; | |||||
841 | } mc_FALLBACKPL; | |||||
842 | ||||||
843 | static void | |||||
844 | do_fallback_flush(mc_PIPELINE *pipeline) | |||||
845 | { | |||||
846 | nb_IOV iov; | |||||
847 | unsigned nb; | |||||
848 | int nused; | |||||
849 | sllist_iterator iter; | |||||
850 | mc_FALLBACKPL *fpl = (mc_FALLBACKPL*)pipeline; | |||||
851 | ||||||
852 | while ((nb = mcreq_flush_iov_fill(pipeline, &iov, 1, &nused))) { | |||||
853 | mcreq_flush_done(pipeline, nb, nb); | |||||
854 | } | |||||
855 | /* Now handle all the packets, for real */ | |||||
856 | SLLIST_ITERFOR(&pipeline->requests, &iter)for (slist_iter_init(&pipeline->requests, &iter); ! ((&iter)->cur == ((void*)0)); slist_iter_incr(&pipeline ->requests, &iter)) { | |||||
857 | mc_PACKET *pkt = SLLIST_ITEM(iter.cur, mc_PACKET, slnode)((mc_PACKET *) (void *) ((char *)(iter.cur) - __builtin_offsetof (mc_PACKET, slnode))); | |||||
858 | fpl->handler(pipeline->parent, pkt); | |||||
859 | sllist_iter_remove(&pipeline->requests, &iter); | |||||
860 | mcreq_packet_handled(pipeline, pkt)do { (pkt)->flags |= MCREQ_F_INVOKED; if ((pkt)->flags & MCREQ_F_FLUSHED) { mcreq_packet_done(pipeline, pkt); } } while (0);; | |||||
861 | } | |||||
862 | } | |||||
863 | ||||||
864 | void | |||||
865 | mcreq_set_fallback_handler(mc_CMDQUEUE *cq, mcreq_fallback_cb handler) | |||||
866 | { | |||||
867 | assert(!cq->fallback)((void) sizeof ((!cq->fallback) ? 1 : 0), __extension__ ({ if (!cq->fallback) ; else __assert_fail ("!cq->fallback" , "/home/avsej/code/libcouchbase/src/mc/mcreq.c", 867, __extension__ __PRETTY_FUNCTION__); })); | |||||
868 | cq->fallback = calloc(1, sizeof (mc_FALLBACKPL)); | |||||
869 | mcreq_pipeline_init(cq->fallback); | |||||
870 | cq->fallback->parent = cq; | |||||
871 | cq->fallback->index = cq->npipelines; | |||||
872 | ((mc_FALLBACKPL*)cq->fallback)->handler = handler; | |||||
873 | cq->fallback->flush_start = do_fallback_flush; | |||||
874 | } | |||||
875 | ||||||
876 | static void | |||||
877 | noop_dumpfn(const void *d, unsigned n, FILE *fp) { (void)d;(void)n;(void)fp; } | |||||
878 | ||||||
879 | #define MCREQ_XFLAGS(X)X(KEY_NOCOPY) X(VALUE_NOCOPY) X(VALUE_IOV) X(HASVALUE) X(REQEXT ) X(UFWD) X(FLUSHED) X(INVOKED) X(DETACHED) \ | |||||
880 | X(KEY_NOCOPY) \ | |||||
881 | X(VALUE_NOCOPY) \ | |||||
882 | X(VALUE_IOV) \ | |||||
883 | X(HASVALUE) \ | |||||
884 | X(REQEXT) \ | |||||
885 | X(UFWD) \ | |||||
886 | X(FLUSHED) \ | |||||
887 | X(INVOKED) \ | |||||
888 | X(DETACHED) | |||||
889 | ||||||
890 | void | |||||
891 | mcreq_dump_packet(const mc_PACKET *packet, FILE *fp, mcreq_payload_dump_fn dumpfn) | |||||
892 | { | |||||
893 | const char *indent = " "; | |||||
894 | const mc_REQDATA *rdata = MCREQ_PKT_RDATA(packet)(((packet)->flags & MCREQ_F_REQEXT) ? ((mc_REQDATA *)( packet)->u_rdata.exdata) : (&(packet)->u_rdata.reqdata )); | |||||
895 | ||||||
896 | if (!dumpfn) { | |||||
897 | dumpfn = noop_dumpfn; | |||||
898 | } | |||||
899 | if (!fp) { | |||||
900 | fp = stderrstderr; | |||||
901 | } | |||||
902 | ||||||
903 | fprintf(fp, "Packet @%p\n", (void *)packet); | |||||
904 | fprintf(fp, "%sOPAQUE: %u\n", indent, (unsigned int)packet->opaque); | |||||
905 | ||||||
906 | fprintf(fp, "%sPKTFLAGS: 0x%x ", indent, packet->flags); | |||||
907 | #define X(base) \ | |||||
908 | if (packet->flags & MCREQ_F_##base) { fprintf(fp, "%s, ", #base); } | |||||
909 | MCREQ_XFLAGS(X)X(KEY_NOCOPY) X(VALUE_NOCOPY) X(VALUE_IOV) X(HASVALUE) X(REQEXT ) X(UFWD) X(FLUSHED) X(INVOKED) X(DETACHED) | |||||
910 | #undef X | |||||
911 | fprintf(fp, "\n"); | |||||
912 | ||||||
913 | fprintf(fp, "%sKey+Header Size: %u\n", indent, (unsigned int)packet->kh_span.size); | |||||
914 | fprintf(fp, "%sKey Offset: %u\n", indent, MCREQ_PKT_BASESIZE24 + packet->extlen); | |||||
915 | ||||||
916 | ||||||
917 | if (packet->flags & MCREQ_F_HASVALUE) { | |||||
918 | if (packet->flags & MCREQ_F_VALUE_IOV) { | |||||
919 | fprintf(fp, "%sValue Length: %u\n", indent, | |||||
920 | packet->u_value.multi.total_length); | |||||
921 | ||||||
922 | fprintf(fp, "%sValue IOV: [start=%p, n=%d]\n", indent, | |||||
923 | (void *)packet->u_value.multi.iov, packet->u_value.multi.niov); | |||||
924 | } else { | |||||
925 | if (packet->flags & MCREQ_F_VALUE_NOCOPY) { | |||||
926 | fprintf(fp, "%sValue is user allocated\n", indent); | |||||
927 | } | |||||
928 | fprintf(fp, "%sValue: %p, %u bytes\n", indent, | |||||
929 | (void *)SPAN_BUFFER(&packet->u_value.single)(((&packet->u_value.single)->offset == (nb_SIZE)-1) ? ((char *)(&packet->u_value.single)->parent) : (( &packet->u_value.single)->parent->root + (&packet ->u_value.single)->offset)), packet->u_value.single.size); | |||||
930 | } | |||||
931 | } | |||||
932 | ||||||
933 | fprintf(fp, "%sRDATA(%s): %p\n", indent, | |||||
934 | (packet->flags & MCREQ_F_REQEXT) ? "ALLOC" : "EMBEDDED", (void *)rdata); | |||||
935 | ||||||
936 | indent = " "; | |||||
937 | fprintf(fp, "%sStart: %lu\n", indent, (unsigned long)rdata->start); | |||||
938 | fprintf(fp, "%sCookie: %p\n", indent, rdata->cookie); | |||||
939 | ||||||
940 | indent = " "; | |||||
941 | fprintf(fp, "%sNEXT: %p\n", indent, (void *)packet->slnode.next); | |||||
942 | if (dumpfn != noop_dumpfn) { | |||||
943 | fprintf(fp, "PACKET CONTENTS:\n"); | |||||
944 | } | |||||
945 | ||||||
946 | fwrite(SPAN_BUFFER(&packet->kh_span)(((&packet->kh_span)->offset == (nb_SIZE)-1) ? ((char *)(&packet->kh_span)->parent) : ((&packet-> kh_span)->parent->root + (&packet->kh_span)-> offset)), 1, packet->kh_span.size, fp); | |||||
947 | if (packet->flags & MCREQ_F_HASVALUE) { | |||||
948 | if (packet->flags & MCREQ_F_VALUE_IOV) { | |||||
949 | const lcb_IOV *iovs = packet->u_value.multi.iov; | |||||
950 | unsigned ii, ixmax = packet->u_value.multi.niov; | |||||
951 | for (ii = 0; ii < ixmax; ii++) { | |||||
952 | dumpfn(iovs[ii].iov_base, iovs[ii].iov_len, fp); | |||||
953 | } | |||||
954 | } else { | |||||
955 | const nb_SPAN *vspan = &packet->u_value.single; | |||||
956 | dumpfn(SPAN_BUFFER(vspan)(((vspan)->offset == (nb_SIZE)-1) ? ((char *)(vspan)->parent ) : ((vspan)->parent->root + (vspan)->offset)), vspan->size, fp); | |||||
957 | } | |||||
958 | } | |||||
959 | } | |||||
960 | ||||||
961 | void | |||||
962 | mcreq_dump_chain(const mc_PIPELINE *pipeline, FILE *fp, mcreq_payload_dump_fn dumpfn) | |||||
963 | { | |||||
964 | sllist_node *ll; | |||||
965 | SLLIST_FOREACH(&pipeline->requests, ll)for (ll = (&pipeline->requests)->first_prev.next; ll ; ll = ll->next) { | |||||
966 | const mc_PACKET *pkt = SLLIST_ITEM(ll, mc_PACKET, slnode)((mc_PACKET *) (void *) ((char *)(ll) - __builtin_offsetof(mc_PACKET , slnode))); | |||||
967 | mcreq_dump_packet(pkt, fp, dumpfn); | |||||
968 | } | |||||
969 | } |