Summary > Report 35e510

Bug Summary

File: /home/avsej/code/libcouchbase/src/n1ql/ixmgmt.cc
Warning: line 551, column 16
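Attempt to delete released memory
Explanation: `ctx` is allocated at line 548 in `lcb_n1x_startbuild` and passed to `do_index_list`, which deletes it at line 411 when `ixtype_2_str` returns NULL and then returns LCB_EINVAL; because the return code is not LCB_SUCCESS, the caller deletes the same pointer again at line 551.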
Report Bug

Annotated Source Code

1/* -*- Mode: C++; tab-width: 4; c-basic-offset: 4; indent-tabs-mode: nil -*- */
2/*
3 * Copyright 2016 Couchbase, Inc.
4 *
5 * Licensed under the Apache License, Version 2.0 (the "License");
6 * you may not use this file except in compliance with the License.
7 * You may obtain a copy of the License at
8 *
9 * http://www.apache.org/licenses/LICENSE-2.0
10 *
11 * Unless required by applicable law or agreed to in writing, software
12 * distributed under the License is distributed on an "AS IS" BASIS,
13 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 * See the License for the specific language governing permissions and
15 * limitations under the License.
16 */
17#define NOMINMAX
18#include <libcouchbase/ixmgmt.h>
19#include <string>
20#include <set>
21#include <algorithm>
22
23#include "contrib/lcb-jsoncpp/lcb-jsoncpp.h"
24#include "lcbio/lcbio.h"
25#include "lcbio/timer-ng.h"
26#include "settings.h"
27#include "internal.h"
28
29#define LOGFMT"(mgreq=%p) " "(mgreq=%p) "
30#define LOGID(req)static_cast<const void*>(req) static_cast<const void*>(req)
31#define LOGARGS(req, lvl)(req)->m_instance->settings, "ixmgmt", LCB_LOG_lvl, "/home/avsej/code/libcouchbase/src/n1ql/ixmgmt.cc"
, 31
(req)->m_instance->settings, "ixmgmt", LCB_LOG_##lvl, __FILE__"/home/avsej/code/libcouchbase/src/n1ql/ixmgmt.cc", __LINE__31
32
33using std::vector;
34using std::string;
35
36static const char *
37ixtype_2_str(unsigned ixtype)
38{
39 if (ixtype == LCB_N1XSPEC_T_GSI1) {
40 return "gsi";
41 } else if (ixtype == LCB_N1XSPEC_T_VIEW2) {
42 return "view";
43 } else {
44 return NULL__null;
45 }
46}
47
48struct IndexOpCtx {
49 lcb_N1XMGMTCALLBACK callback;
50 void *cookie;
51};
52
53struct ErrorSpec {
54 string msg;
55 unsigned code;
56};
57
58template <typename T> void my_delete(T p) {
59 delete p;
60}
61
62template <typename T> lcb_error_t
63extract_n1ql_errors(const char *s, size_t n, T& err_out)
64{
65 Json::Value jresp;
66 if (!Json::Reader().parse(s, s + n, jresp)) {
67 return LCB_PROTOCOL_ERROR;
68 }
69 if (jresp["status"].asString() == "success") {
70 return LCB_SUCCESS;
71 }
72
73 Json::Value& errors = jresp["errors"];
74 if (errors.isNull()) {
75 return LCB_SUCCESS;
76 } else if (!errors.isArray()) {
77 return LCB_PROTOCOL_ERROR;
78 }
79
80 if (errors.empty()) {
81 return LCB_SUCCESS;
82 }
83
84 for (Json::ArrayIndex ii = 0; ii < errors.size(); ++ii) {
85 const Json::Value& err = errors[ii];
86 if (!err.isObject()) {
87 continue; // expected an object!
88 }
89 ErrorSpec spec;
90 spec.msg = err["msg"].asString();
91 spec.code = err["code"].asUInt();
92 err_out.insert(err_out.end(), spec);
93 }
94 return LCB_ERROR;
95}
96
97static lcb_error_t
98get_n1ql_error(const char *s, size_t n)
99{
100 std::vector<ErrorSpec> dummy;
101 return extract_n1ql_errors(s, n, dummy);
102}
103
104// Called for generic operations and establishes existence or lack thereof
105static void
106cb_generic(lcb_t instance, int, const lcb_RESPN1QL *resp)
107{
108 // Get the real cookie..
109 if (!(resp->rflags & LCB_RESP_F_FINAL)) {
110 return;
111 }
112
113 IndexOpCtx *ctx = reinterpret_cast<IndexOpCtx*>(resp->cookie);
114 lcb_RESPN1XMGMT w_resp = { 0 };
115 w_resp.cookie = ctx->cookie;
116
117 if ((w_resp.rc = resp->rc) == LCB_SUCCESS || resp->rc == LCB_HTTP_ERROR) {
118 // Check if the top-level N1QL response succeeded, and then
119 // descend to determine additional errors. This is primarily
120 // required to support EEXIST for GSI primary indexes
121
122 vector<ErrorSpec> errors;
123 lcb_error_t rc = extract_n1ql_errors(resp->row, resp->nrow, errors);
124 if (rc == LCB_ERROR) {
125 w_resp.rc = LCB_QUERY_ERROR;
126 for (size_t ii = 0; ii < errors.size(); ++ii) {
127 const std::string& msg = errors[ii].msg;
128 if (msg.find("already exist") != string::npos) {
129 w_resp.rc = LCB_KEY_EEXISTS; // Index entry already exists
130 } else if (msg.find("not found") != string::npos) {
131 w_resp.rc = LCB_KEY_ENOENT;
132 }
133 }
134 } else {
135 w_resp.rc = rc;
136 }
137 }
138
139 w_resp.inner = resp;
140 w_resp.specs = NULL__null;
141 w_resp.nspecs = 0;
142 ctx->callback(instance, LCB_CALLBACK_IXMGMT-3, &w_resp);
143 delete ctx;
144}
145
146/**
147 * Dispatch the actual operation using a N1QL query
148 * @param instance
149 * @param cookie User cookie
150 * @param u_callback User callback (to assign to new context)
151 * @param i_callback Internal callback to be invoked when N1QL response
152 * is done
153 * @param s N1QL request payload
154 * @param n N1QL request length
155 * @param obj Internal context. Created with new if NULL
156 * @return
157 *
158 * See other overload for passing just the query string w/o extra parameters
159 */
160template <typename T> lcb_error_t
161dispatch_common(lcb_t instance,
162 const void *cookie, lcb_N1XMGMTCALLBACK u_callback,
163 lcb_N1QLCALLBACK i_callback, const char *s, size_t n, T *obj)
164{
165 lcb_error_t rc = LCB_SUCCESS;
166 bool our_alloc = false;
167 lcb_CMDN1QL cmd = { 0 };
168 struct { lcb_t m_instance; } ixwrap = { instance }; // For logging
169
170 if (obj == NULL__null) {
171 obj = new T();
172 our_alloc = true;
173 }
174
175 if (!(obj->callback = u_callback)) {
176 rc = LCB_EINVAL;
177 goto GT_ERROR;
178 }
179
180 obj->cookie = const_cast<void*>(cookie);
181
182 cmd.query = s;
183 cmd.nquery = n;
184 cmd.callback = i_callback;
185 lcb_log(LOGARGS(&ixwrap, DEBUG)(&ixwrap)->m_instance->settings, "ixmgmt", LCB_LOG_DEBUG
, "/home/avsej/code/libcouchbase/src/n1ql/ixmgmt.cc", 185
, LOGFMT"(mgreq=%p) " "Issuing query %.*s", LOGID(obj)static_cast<const void*>(obj), (int)n, s);
186 rc = lcb_n1ql_query(instance, obj, &cmd);
187
188 GT_ERROR:
189 if (rc != LCB_SUCCESS && our_alloc) {
190 delete obj;
191 }
192 return rc;
193}
194
195template <typename T> lcb_error_t
196dispatch_common(lcb_t instance,
197 const void *cookie, lcb_N1XMGMTCALLBACK u_callback,
198 lcb_N1QLCALLBACK i_callback, const string& ss, T *obj = NULL__null)
199{
200 Json::Value root;
201 root["statement"] = ss;
202 string reqbuf = Json::FastWriter().write(root);
203 return dispatch_common<T>(instance,
204 cookie, u_callback, i_callback,
205 reqbuf.c_str(), reqbuf.size()-1 /*newline*/, obj);
206}
207
208
209// Class to back the storage for the actual lcb_IXSPEC without doing too much
210// mind-numbing buffer copies. Maybe this can be done via a macro instead?
211class IndexSpec : public lcb_N1XSPEC {
212public:
213 IndexSpec(const char *s, size_t n) : lcb_N1XSPEC() {
214 load_json(s, n);
215 }
216 inline IndexSpec(const lcb_N1XSPEC *spec);
217 static inline void to_key(const lcb_N1XSPEC *spec, std::string& out);
218 bool is_primary() const { return flags & LCB_N1XSPEC_F_PRIMARY1<<16; }
219 bool is_defer() const { return flags & LCB_N1XSPEC_F_DEFER1<<17; }
220 void ensure_keyspace(lcb_t instance) {
221 if (nkeyspace) {
222 return;
223 }
224 keyspace = LCBT_SETTING(instance, bucket)(instance)->settings->bucket;
225 nkeyspace = strlen(keyspace);
226 }
227
228private:
229 // Load fields from a JSON string
230 inline void load_json(const char *s, size_t n);
231
232 // Load all fields
233 inline size_t load_fields(const Json::Value& root, bool do_copy);
234
235 size_t total_fields_size(const Json::Value& src) {
236 return load_fields(src, false);
237 }
238
239 // Load field from a JSON object
240 inline size_t load_json_field(
241 const Json::Value& root,
242 const char *name, const char **tgt_ptr, size_t *tgt_len, bool do_copy);
243
244 // Load field from another pointer
245 void load_field(const char **dest, const char *src, size_t n) {
246 m_buf.append(src, n);
247 if (n) {
248 *dest = &m_buf.c_str()[m_buf.size()-n];
249 } else {
250 *dest = NULL__null;
251 }
252 }
253
254 string m_buf;
255 IndexSpec(const IndexSpec&);
256};
257
258LIBCOUCHBASE_API__attribute__ ((visibility("default")))
259lcb_error_t
260lcb_n1x_create(lcb_t instance, const void *cookie, const lcb_CMDN1XMGMT *cmd)
261{
262 string ss;
263 IndexSpec spec(&cmd->spec);
264 spec.ensure_keyspace(instance);
265
266 ss = "CREATE";
267 if (spec.is_primary()) {
268 ss += " PRIMARY";
269 } else if (!spec.nname) {
270 return LCB_EMPTY_KEY;
271 }
272 ss.append(" INDEX");
273 if (spec.nname) {
274 ss.append(" `").append(spec.name, spec.nname).append("` ");
275 }
276 ss.append(" ON `").append(spec.keyspace, spec.nkeyspace).append("`");
277
278 if (!spec.is_primary()) {
279 if (!spec.nfields) {
280 return LCB_EMPTY_KEY;
281 }
282
283 // See if we can parse 'fields' properly. First, try to parse as
284 // JSON:
285 Json::Value fields_arr;
286 Json::Reader r;
287 if (!r.parse(spec.fields, spec.fields + spec.nfields, fields_arr)) {
288 // Invalid JSON!
289 return LCB_EINVAL;
290 }
291
292 ss.append(" (");
293 if (fields_arr.isArray()) {
294 if (!fields_arr.size()) {
295 return LCB_EMPTY_KEY;
296 }
297 for (size_t ii = 0; ii < fields_arr.size(); ++ii) {
298 static Json::Value empty;
299 const Json::Value& field = fields_arr.get(ii, empty);
300 if (!field.isString()) {
301 return LCB_EINVAL;
302 }
303 ss.append(field.asString());
304 if (ii != fields_arr.size()-1) {
305 ss.append(",");
306 }
307 }
308 } else if (fields_arr.isString()) {
309 std::string field_list = fields_arr.asString();
310 if (field_list.empty()) {
311 return LCB_EMPTY_KEY;
312 }
313 ss.append(field_list);
314 } else {
315 return LCB_EINVAL;
316 }
317 ss.append(") ");
318 }
319
320 if (spec.ncond) {
321 if (spec.is_primary()) {
322 return LCB_EINVAL;
323 }
324 ss.append(" WHERE ").append(spec.cond, spec.ncond).append(" ");
325 }
326
327 if (spec.ixtype) {
328 const char *ixtype = ixtype_2_str(spec.ixtype);
329 if (!ixtype) {
330 return LCB_EINVAL;
331 }
332 ss.append(" USING ").append(ixtype);
333 }
334
335 if (spec.is_defer()) {
336 ss.append(" WITH {\"defer_build\": true}");
337 }
338
339 return dispatch_common<IndexOpCtx>(instance, cookie, cmd->callback, cb_generic, ss);
340}
341
342
343class ListIndexCtx : public IndexOpCtx {
344public:
345 vector<IndexSpec*> specs;
346
347 virtual void invoke(lcb_t instance, lcb_RESPN1XMGMT *resp) {
348 finish(instance, resp);
349 }
350
351 void finish(lcb_t instance, lcb_RESPN1XMGMT *resp = NULL__null) {
352 lcb_RESPN1XMGMT w_resp = { 0 };
353 if (resp == NULL__null) {
354 resp = &w_resp;
355 resp->rc = LCB_SUCCESS;
356 }
357 resp->cookie = cookie;
358 lcb_N1XSPEC **speclist = reinterpret_cast<lcb_N1XSPEC**>(&specs[0]);
359 resp->specs = speclist;
360 resp->nspecs = specs.size();
361 callback(instance, LCB_CALLBACK_IXMGMT-3, resp);
362 delete this;
363 }
364
365 virtual ~ListIndexCtx() {
366 for (size_t ii = 0; ii < specs.size(); ++ii) {
367 delete specs[ii];
368 }
369 specs.clear();
370 }
371};
372
373static void
374cb_index_list(lcb_t instance, int, const lcb_RESPN1QL *resp)
375{
376 ListIndexCtx *ctx = reinterpret_cast<ListIndexCtx *>(resp->cookie);
377 if (!(resp->rflags & LCB_RESP_F_FINAL)) {
378 ctx->specs.push_back(new IndexSpec(resp->row, resp->nrow));
379 return;
380 }
381
382 lcb_RESPN1XMGMT w_resp = { 0 };
383 if ((w_resp.rc = resp->rc) == LCB_SUCCESS) {
384 w_resp.rc = get_n1ql_error(resp->row, resp->nrow);
385 }
386 w_resp.inner = resp;
387 ctx->invoke(instance, &w_resp);
388}
389
390static lcb_error_t
391do_index_list(lcb_t instance, const void *cookie, const lcb_CMDN1XMGMT *cmd,
392 ListIndexCtx *ctx)
393{
394 string ss;
395 IndexSpec spec(&cmd->spec);
396 ss = "SELECT idx.* FROM system:indexes idx WHERE";
397
398 if (spec.flags & LCB_N1XSPEC_F_PRIMARY1<<16) {
[step 3] Assuming the condition is false
[step 4] Taking false branch
399 ss.append(" is_primary=true AND");
400 }
401 if (spec.nkeyspace) {
[step 5] Assuming the condition is false
[step 6] Taking false branch
402 ss.append(" keyspace_id=\"").append(spec.keyspace, spec.nkeyspace).append("\" AND");
403 }
404 if (spec.nnspace) {
[step 7] Assuming the condition is false
[step 8] Taking false branch
405 ss.append(" namespace_id=\"").append(spec.nspace, spec.nnspace).append("\" AND");
406 }
407 if (spec.ixtype) {
[step 9] Assuming the condition is true
[step 10] Taking true branch
408 const char *s_ixtype = ixtype_2_str(spec.ixtype);
409 if (s_ixtype == NULL__null) {
[step 11] Taking true branch
410 if (ctx != NULL__null) {
[step 12] Taking true branch
411 delete ctx;
[step 13] Memory is released
412 }
413 return LCB_EINVAL;
414 }
415 ss.append(" using=\"").append(s_ixtype).append("\" AND");
416 }
417 if (spec.nname) {
418 ss.append(" name=\"").append(spec.name, spec.nname).append("\" AND");
419 }
420
421 // WHERE <.....> true
422 ss.append(" true");
423 ss.append(" ORDER BY is_primary DESC, name ASC");
424
425 return dispatch_common<ListIndexCtx>(instance,
426 cookie, cmd->callback, cb_index_list, ss, ctx);
427}
428
429LIBCOUCHBASE_API__attribute__ ((visibility("default")))
430lcb_error_t
431lcb_n1x_list(lcb_t instance, const void *cookie, const lcb_CMDN1XMGMT *cmd)
432{
433 return do_index_list(instance, cookie, cmd, NULL__null);
434}
435
436LIBCOUCHBASE_API__attribute__ ((visibility("default")))
437lcb_error_t
438lcb_n1x_drop(lcb_t instance, const void *cookie, const lcb_CMDN1XMGMT *cmd)
439{
440 string ss;
441 IndexSpec spec(&cmd->spec);
442 spec.ensure_keyspace(instance);
443
444 if (spec.nname) {
445 ss = "DROP INDEX";
446 ss.append(" `").append(spec.keyspace, spec.nkeyspace).append("`");
447 ss.append(".`").append(spec.name, spec.nname).append("`");
448 } else if (spec.flags & LCB_N1XSPEC_F_PRIMARY1<<16) {
449 ss = "DROP PRIMARY INDEX ON";
450 ss.append(" `").append(spec.keyspace, spec.nkeyspace).append("`");
451 } else {
452 return LCB_EMPTY_KEY;
453 }
454
455 if (spec.ixtype) {
456 const char *stype = ixtype_2_str(spec.ixtype);
457 if (!stype) {
458 return LCB_EINVAL;
459 }
460 ss.append(" USING ").append(stype);
461 }
462
463 return dispatch_common<IndexOpCtx>(instance, cookie, cmd->callback, cb_generic, ss);
464}
465
466class ListIndexCtx_BuildIndex : public ListIndexCtx {
467public:
468 virtual inline void invoke(lcb_t instance, lcb_RESPN1XMGMT *resp);
469 inline lcb_error_t try_build(lcb_t instance);
470};
471
472static void
473cb_build_submitted(lcb_t instance, int, const lcb_RESPN1QL *resp)
474{
475 ListIndexCtx *ctx = reinterpret_cast<ListIndexCtx*>(resp->cookie);
476
477 if (resp->rflags & LCB_RESP_F_FINAL) {
478 lcb_RESPN1XMGMT w_resp = { 0 };
479 if ((w_resp.rc = resp->rc) == LCB_SUCCESS) {
480 w_resp.rc = get_n1ql_error(resp->row, resp->nrow);
481 }
482 ctx->finish(instance, &w_resp);
483 }
484}
485
486lcb_error_t
487ListIndexCtx_BuildIndex::try_build(lcb_t instance)
488{
489 vector<IndexSpec*> pending;
490 for (size_t ii = 0; ii < specs.size(); ++ii) {
491 IndexSpec* spec = specs[ii];
492 if (strncmp(spec->state, "pending", spec->nstate) == 0 ||
493 strncmp(spec->state, "deferred", spec->nstate) == 0) {
494 pending.push_back(spec);
495 }
496 }
497
498 if (pending.empty()) {
499 return LCB_KEY_ENOENT;
500 }
501
502 string ss;
503 ss = "BUILD INDEX ON `";
504
505 ss.append(pending[0]->keyspace, pending[0]->nkeyspace).append("`");
506 ss += '(';
507 for (size_t ii = 0; ii < pending.size(); ++ii) {
508 ss += '`';
509 ss.append(pending[ii]->name, pending[ii]->nname);
510 ss += '`';
511 if (ii+1 < pending.size()) {
512 ss += ',';
513 }
514 }
515 ss += ')';
516
517 lcb_error_t rc = dispatch_common<ListIndexCtx_BuildIndex>(
518 instance, cookie, callback, cb_build_submitted, ss,
519 this);
520
521 if (rc == LCB_SUCCESS) {
522 std::set<IndexSpec*> to_remove(specs.begin(), specs.end());
523 for (size_t ii = 0; ii < pending.size(); ++ii) {
524 to_remove.erase(pending[ii]);
525 }
526
527 std::for_each(to_remove.begin(), to_remove.end(), my_delete<IndexSpec*>);
528
529 specs = pending;
530 }
531 return rc;
532}
533
534void
535ListIndexCtx_BuildIndex::invoke(lcb_t instance, lcb_RESPN1XMGMT *resp)
536{
537 if (resp->rc == LCB_SUCCESS &&
538 (resp->rc = try_build(instance)) == LCB_SUCCESS) {
539 return;
540 }
541 finish(instance, resp);
542}
543
544LIBCOUCHBASE_API__attribute__ ((visibility("default")))
545lcb_error_t
546lcb_n1x_startbuild(lcb_t instance, const void *cookie, const lcb_CMDN1XMGMT *cmd)
547{
548 ListIndexCtx_BuildIndex *ctx = new ListIndexCtx_BuildIndex();
[step 1] Memory is allocated
549 lcb_error_t rc = do_index_list(instance, cookie, cmd, ctx);
[step 2] Calling 'do_index_list'
[step 14] Returning; memory was released via 4th parameter
550 if (rc != LCB_SUCCESS) {
[step 15] Taking true branch
551 delete ctx;
[step 16] Attempt to delete released memory
552 }
553 return rc;
554}
555
556struct WatchIndexCtx : public IndexOpCtx {
557 // Interval timer
558 lcbio_pTIMER m_timer;
559 uint32_t m_interval;
560 uint64_t m_tsend;
561 lcb_t m_instance;
562 std::map<std::string,IndexSpec*> m_defspend;
563 std::vector<IndexSpec*> m_defsok;
564
565 inline void read_state(const lcb_RESPN1XMGMT *resp);
566 inline void reschedule();
567 inline lcb_error_t do_poll();
568 inline lcb_error_t load_defs(const lcb_CMDN1XWATCH *);
569 inline WatchIndexCtx(lcb_t, const void *, const lcb_CMDN1XWATCH*);
570 inline ~WatchIndexCtx();
571 inline void finish(lcb_error_t rc, const lcb_RESPN1XMGMT *);
572};
573
574static void
575cb_watchix_tm(void *arg)
576{
577 WatchIndexCtx *ctx = reinterpret_cast<WatchIndexCtx*>(arg);
578 uint64_t now = lcb_nstime();
579 if (now >= ctx->m_tsend) {
580 ctx->finish(LCB_ETIMEDOUT, NULL__null);
581 } else {
582 ctx->do_poll();
583 }
584}
585
586#define DEFAULT_WATCH_TIMEOUT(((lcb_uint32_t)30) * 1000000) LCB_S2US(30)(((lcb_uint32_t)30) * 1000000)
587#define DEFAULT_WATCH_INTERVAL((500) * 1000) LCB_MS2US(500)((500) * 1000)
588
589WatchIndexCtx::WatchIndexCtx(lcb_t instance, const void *cookie_, const lcb_CMDN1XWATCH *cmd)
590: m_instance(instance)
591{
592 uint64_t now = lcb_nstime();
593 uint32_t timeout = cmd->timeout ? cmd->timeout : DEFAULT_WATCH_TIMEOUT(((lcb_uint32_t)30) * 1000000);
594 m_interval = cmd->interval ? cmd->interval : DEFAULT_WATCH_INTERVAL((500) * 1000);
595 m_interval = std::min(m_interval, timeout);
596 m_tsend = now + LCB_US2NS(timeout)(((hrtime_t)timeout) * 1000);
597
598 this->callback = cmd->callback;
599 this->cookie = const_cast<void*>(cookie_);
600
601 m_timer = lcbio_timer_new(instance->iotable, this, cb_watchix_tm);
602 lcb_aspend_add(&instance->pendops, LCB_PENDTYPE_COUNTER, NULL__null);
603}
604
605WatchIndexCtx::~WatchIndexCtx()
606{
607 if (m_timer) {
608 lcbio_timer_destroy(m_timer);
609 }
610 if (m_instance) {
611 lcb_aspend_del(&m_instance->pendops, LCB_PENDTYPE_COUNTER, NULL__null);
612 lcb_maybe_breakout(m_instance);
613 }
614
615 std::for_each(m_defsok.begin(), m_defsok.end(), my_delete<IndexSpec*>);
616 for (std::map<string,IndexSpec*>::iterator ii = m_defspend.begin();
617 ii != m_defspend.end(); ++ii) {
618 delete ii->second;
619 }
620}
621
622void
623IndexSpec::to_key(const lcb_N1XSPEC* spec, std::string& s)
624{
625 // Identity is:
626 // {keyspace,name,is_primary,type}
627 s.append(spec->nspace, spec->nnspace).append(" ");
628 s.append(spec->keyspace, spec->nkeyspace).append(" ");
629 s.append(spec->name, spec->nname).append(" ");
630 const char *type_s = ixtype_2_str(spec->ixtype);
631 if (!type_s) {
632 type_s = "<UNKNOWN>";
633 }
634 s.append(type_s);
635}
636
637void
638WatchIndexCtx::read_state(const lcb_RESPN1XMGMT *resp)
639{
640 // We examine the indexes here to see which ones have been completed
641 // Make them all into an std::map
642 if (resp->rc != LCB_SUCCESS) {
643 lcb_log(LOGARGS(this, INFO)(this)->m_instance->settings, "ixmgmt", LCB_LOG_INFO, "/home/avsej/code/libcouchbase/src/n1ql/ixmgmt.cc"
, 643
, LOGFMT"(mgreq=%p) " "Error 0x%x while listing indexes. Rescheduling", LOGID(this)static_cast<const void*>(this), resp->rc);
644 reschedule();
645 return;
646 }
647
648 std::map<std::string, const lcb_N1XSPEC*> in_specs;
649 for (size_t ii = 0; ii < resp->nspecs; ++ii) {
650 std::string key;
651 IndexSpec::to_key(resp->specs[ii], key);
652 in_specs[key] = resp->specs[ii];
653 }
654
655 std::map<std::string, IndexSpec*>::iterator it_remain = m_defspend.begin();
656 while (it_remain != m_defspend.end()) {
657 // See if the index is 'online' yet!
658 std::map<std::string,const lcb_N1XSPEC*>::iterator res;
659 res = in_specs.find(it_remain->first);
660 if (res == in_specs.end()) {
661 lcb_log(LOGARGS(this, INFO)(this)->m_instance->settings, "ixmgmt", LCB_LOG_INFO, "/home/avsej/code/libcouchbase/src/n1ql/ixmgmt.cc"
, 661
, LOGFMT"(mgreq=%p) " "Index [%s] not in cluster", LOGID(this)static_cast<const void*>(this), it_remain->first.c_str());
662 // We can't find our own index. Someone else deleted it. Bail!
663 finish(LCB_KEY_ENOENT, resp);
664 return;
665 }
666
667 std::string s_state(res->second->state, res->second->nstate);
668 if (s_state == "online") {
669 lcb_log(LOGARGS(this, DEBUG)(this)->m_instance->settings, "ixmgmt", LCB_LOG_DEBUG, "/home/avsej/code/libcouchbase/src/n1ql/ixmgmt.cc"
, 669
, LOGFMT"(mgreq=%p) " "Index [%s] is ready", LOGID(this)static_cast<const void*>(this), it_remain->first.c_str());
670 m_defsok.push_back(it_remain->second);
671 m_defspend.erase(it_remain++);
672 } else {
673 ++it_remain;
674 }
675 }
676
677 if (m_defspend.empty()) {
678 finish(LCB_SUCCESS, resp);
679 } else {
680 reschedule();
681 }
682}
683
684lcb_error_t
685WatchIndexCtx::load_defs(const lcb_CMDN1XWATCH *cmd)
686{
687 for (size_t ii = 0; ii < cmd->nspec; ++ii) {
688 std::string key;
689 IndexSpec *extspec = new IndexSpec(cmd->specs[ii]);
690 IndexSpec::to_key(extspec, key);
691 m_defspend[key] = extspec;
692 }
693 if (m_defspend.empty()) {
694 return LCB_ENO_COMMANDS;
695 }
696 return LCB_SUCCESS;
697}
698
699void
700WatchIndexCtx::finish(lcb_error_t rc, const lcb_RESPN1XMGMT *resp)
701{
702 lcb_RESPN1XMGMT my_resp = { 0 };
703 my_resp.cookie = cookie;
704 my_resp.rc = rc;
705
706 if (resp) {
707 my_resp.inner = resp->inner;
708 }
709
710 lcb_N1XSPEC **speclist = reinterpret_cast<lcb_N1XSPEC**>(&m_defsok[0]);
711 my_resp.specs = speclist;
712 my_resp.nspecs = m_defsok.size();
713 callback(m_instance, LCB_CALLBACK_IXMGMT-3, &my_resp);
714 delete this;
715}
716
717void
718WatchIndexCtx::reschedule()
719{
720 // Next interval!
721 uint64_t now = lcb_nstime();
722 if (now + LCB_US2NS(m_interval)(((hrtime_t)m_interval) * 1000) >= m_tsend) {
723 finish(LCB_ETIMEDOUT, NULL__null);
724 } else {
725 lcbio_timer_rearm(m_timer, m_interval);
726 }
727}
728
729static void
730cb_watch_gotlist(lcb_t, int, const lcb_RESPN1XMGMT *resp)
731{
732 WatchIndexCtx *ctx = reinterpret_cast<WatchIndexCtx*>(resp->cookie);
733 ctx->read_state(resp);
734}
735
736lcb_error_t
737WatchIndexCtx::do_poll()
738{
739 lcb_CMDN1XMGMT cmd;
740 memset(&cmd, 0, sizeof cmd);
741 cmd.callback = cb_watch_gotlist;
742 lcb_log(LOGARGS(this, DEBUG)(this)->m_instance->settings, "ixmgmt", LCB_LOG_DEBUG, "/home/avsej/code/libcouchbase/src/n1ql/ixmgmt.cc"
, 742
, LOGFMT"(mgreq=%p) " "Will check for index readiness of %lu indexes. %lu completed", LOGID(this)static_cast<const void*>(this), m_defspend.size(), m_defsok.size());
743 return lcb_n1x_list(m_instance, this, &cmd);
744}
745
746LIBCOUCHBASE_API__attribute__ ((visibility("default")))
747lcb_error_t
748lcb_n1x_watchbuild(lcb_t instance, const void *cookie, const lcb_CMDN1XWATCH *cmd)
749{
750 WatchIndexCtx *ctx = new WatchIndexCtx(instance, cookie, cmd);
751 lcb_error_t rc;
752 if ((rc = ctx->load_defs(cmd)) != LCB_SUCCESS) {
753 delete ctx;
754 return rc;
755 }
756 if ((rc = ctx->do_poll()) != LCB_SUCCESS) {
757 delete ctx;
758 return rc;
759 }
760 return LCB_SUCCESS;
761}
762
763void
764IndexSpec::load_json(const char *s, size_t n) {
765 Json::Value root;
766 // Set the JSON first!
767 m_buf.assign(s, n);
768 nrawjson = n;
769
770 if (!Json::Reader().parse(s, s + n, root)) {
771 rawjson = m_buf.c_str();
772 return;
773 }
774
775 m_buf.reserve(n + total_fields_size(root));
776 load_fields(root, true);
777
778 // Once all the fields are loaded, it's time to actually assign the
779 // rawjson field, which is simply the beginning of the buffer
780 rawjson = m_buf.c_str();
781
782 // Get the index type
783 string ixtype_s = root["using"].asString();
784 if (ixtype_s == "gsi") {
785 ixtype = LCB_N1XSPEC_T_GSI1;
786 } else if (ixtype_s == "view") {
787 ixtype = LCB_N1XSPEC_T_VIEW2;
788 }
789 if (root["is_primary"].asBool()) {
790 flags |= LCB_N1XSPEC_F_PRIMARY1<<16;
791 }
792}
793
794// IndexSpec stuff
795IndexSpec::IndexSpec(const lcb_N1XSPEC *spec)
796{
797 *static_cast<lcb_N1XSPEC*>(this) = *spec;
798 if (spec->nrawjson) {
799 load_json(spec->rawjson, spec->nrawjson);
800 return;
801 }
802 // Initialize the bufs
803 m_buf.reserve(nname + nkeyspace + nnspace + nstate + nfields + nrawjson + nstate + ncond);
804 load_field(&rawjson, spec->rawjson, nrawjson);
805 load_field(&name, spec->name, nname);
806 load_field(&keyspace, spec->keyspace, nkeyspace);
807 load_field(&nspace, spec->nspace, nnspace);
808 load_field(&state, spec->state, nstate);
809 load_field(&fields, spec->fields, nfields);
810 load_field(&cond, spec->cond, ncond);
811}
812
813size_t
814IndexSpec::load_fields(const Json::Value& root, bool do_copy)
815{
816 size_t size = 0;
817 size += load_json_field(root, "name", &name, &nname, do_copy);
818 size += load_json_field(root, "keyspace_id", &keyspace, &nkeyspace, do_copy);
819 size += load_json_field(root, "namespace_id", &nspace, &nnspace, do_copy);
820 size += load_json_field(root, "state", &state, &nstate, do_copy);
821 size += load_json_field(root, "index_key", &fields, &nfields, do_copy);
822 size += load_json_field(root, "condition", &cond, &ncond, do_copy);
823 return size;
824}
825
826size_t
827IndexSpec::load_json_field(const Json::Value& root,
828 const char *name_, const char **tgt_ptr, size_t *tgt_len, bool do_copy)
829{
830 size_t namelen = strlen(name_);
831 const Json::Value *val = root.find(name_, name_ + namelen);
832 size_t n = 0;
833
834 if (val == NULL__null) {
835 return 0;
836 }
837
838 if (val->isString()) {
839 const char *s_begin, *s_end;
840 if (val->getString(&s_begin, &s_end) && (n = s_end - s_begin) && do_copy) {
841 m_buf.insert(m_buf.end(), s_begin, s_end);
842 }
843 } else {
844 std::string frag = Json::FastWriter().write(*val);
845 n = frag.size();
846 if (do_copy) {
847 m_buf.append(frag);
848 }
849 }
850
851 if (n) {
852 *tgt_ptr = &(m_buf.c_str()[m_buf.size()-n]);
853 *tgt_len = n;
854 } else {
855 *tgt_ptr = NULL__null;
856 *tgt_len = 0;
857 }
858 return n;
859}