File: | home/avsej/code/libcouchbase/src/n1ql/ixmgmt.cc |
Warning: | line 551, column 16 Attempt to delete released memory |
1 | /* -*- Mode: C++; tab-width: 4; c-basic-offset: 4; indent-tabs-mode: nil -*- */ | |||
2 | /* | |||
3 | * Copyright 2016 Couchbase, Inc. | |||
4 | * | |||
5 | * Licensed under the Apache License, Version 2.0 (the "License"); | |||
6 | * you may not use this file except in compliance with the License. | |||
7 | * You may obtain a copy of the License at | |||
8 | * | |||
9 | * http://www.apache.org/licenses/LICENSE-2.0 | |||
10 | * | |||
11 | * Unless required by applicable law or agreed to in writing, software | |||
12 | * distributed under the License is distributed on an "AS IS" BASIS, | |||
13 | * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | |||
14 | * See the License for the specific language governing permissions and | |||
15 | * limitations under the License. | |||
16 | */ | |||
17 | #define NOMINMAX | |||
18 | #include <libcouchbase/ixmgmt.h> | |||
19 | #include <string> | |||
20 | #include <set> | |||
21 | #include <algorithm> | |||
22 | ||||
23 | #include "contrib/lcb-jsoncpp/lcb-jsoncpp.h" | |||
24 | #include "lcbio/lcbio.h" | |||
25 | #include "lcbio/timer-ng.h" | |||
26 | #include "settings.h" | |||
27 | #include "internal.h" | |||
28 | ||||
29 | #define LOGFMT"(mgreq=%p) " "(mgreq=%p) " | |||
30 | #define LOGID(req)static_cast<const void*>(req) static_cast<const void*>(req) | |||
31 | #define LOGARGS(req, lvl)(req)->m_instance->settings, "ixmgmt", LCB_LOG_lvl, "/home/avsej/code/libcouchbase/src/n1ql/ixmgmt.cc" , 31 (req)->m_instance->settings, "ixmgmt", LCB_LOG_##lvl, __FILE__"/home/avsej/code/libcouchbase/src/n1ql/ixmgmt.cc", __LINE__31 | |||
32 | ||||
33 | using std::vector; | |||
34 | using std::string; | |||
35 | ||||
36 | static const char * | |||
37 | ixtype_2_str(unsigned ixtype) | |||
38 | { | |||
39 | if (ixtype == LCB_N1XSPEC_T_GSI1) { | |||
40 | return "gsi"; | |||
41 | } else if (ixtype == LCB_N1XSPEC_T_VIEW2) { | |||
42 | return "view"; | |||
43 | } else { | |||
44 | return NULL__null; | |||
45 | } | |||
46 | } | |||
47 | ||||
48 | struct IndexOpCtx { | |||
49 | lcb_N1XMGMTCALLBACK callback; | |||
50 | void *cookie; | |||
51 | }; | |||
52 | ||||
/** One entry of the "errors" array found in a N1QL response. */
struct ErrorSpec {
    // Value of the entry's "msg" member
    std::string msg;
    // Value of the entry's "code" member
    unsigned code;
};
57 | ||||
/**
 * Function form of the delete expression, so that it can be handed to
 * algorithms such as std::for_each.
 * @param doomed pointer to delete (T is the pointer type itself)
 */
template <typename T>
void my_delete(T doomed)
{
    delete doomed;
}
61 | ||||
62 | template <typename T> lcb_error_t | |||
63 | extract_n1ql_errors(const char *s, size_t n, T& err_out) | |||
64 | { | |||
65 | Json::Value jresp; | |||
66 | if (!Json::Reader().parse(s, s + n, jresp)) { | |||
67 | return LCB_PROTOCOL_ERROR; | |||
68 | } | |||
69 | if (jresp["status"].asString() == "success") { | |||
70 | return LCB_SUCCESS; | |||
71 | } | |||
72 | ||||
73 | Json::Value& errors = jresp["errors"]; | |||
74 | if (errors.isNull()) { | |||
75 | return LCB_SUCCESS; | |||
76 | } else if (!errors.isArray()) { | |||
77 | return LCB_PROTOCOL_ERROR; | |||
78 | } | |||
79 | ||||
80 | if (errors.empty()) { | |||
81 | return LCB_SUCCESS; | |||
82 | } | |||
83 | ||||
84 | for (Json::ArrayIndex ii = 0; ii < errors.size(); ++ii) { | |||
85 | const Json::Value& err = errors[ii]; | |||
86 | if (!err.isObject()) { | |||
87 | continue; // expected an object! | |||
88 | } | |||
89 | ErrorSpec spec; | |||
90 | spec.msg = err["msg"].asString(); | |||
91 | spec.code = err["code"].asUInt(); | |||
92 | err_out.insert(err_out.end(), spec); | |||
93 | } | |||
94 | return LCB_ERROR; | |||
95 | } | |||
96 | ||||
97 | static lcb_error_t | |||
98 | get_n1ql_error(const char *s, size_t n) | |||
99 | { | |||
100 | std::vector<ErrorSpec> dummy; | |||
101 | return extract_n1ql_errors(s, n, dummy); | |||
102 | } | |||
103 | ||||
104 | // Called for generic operations and establishes existence or lack thereof | |||
105 | static void | |||
106 | cb_generic(lcb_t instance, int, const lcb_RESPN1QL *resp) | |||
107 | { | |||
108 | // Get the real cookie.. | |||
109 | if (!(resp->rflags & LCB_RESP_F_FINAL)) { | |||
110 | return; | |||
111 | } | |||
112 | ||||
113 | IndexOpCtx *ctx = reinterpret_cast<IndexOpCtx*>(resp->cookie); | |||
114 | lcb_RESPN1XMGMT w_resp = { 0 }; | |||
115 | w_resp.cookie = ctx->cookie; | |||
116 | ||||
117 | if ((w_resp.rc = resp->rc) == LCB_SUCCESS || resp->rc == LCB_HTTP_ERROR) { | |||
118 | // Check if the top-level N1QL response succeeded, and then | |||
119 | // descend to determine additional errors. This is primarily | |||
120 | // required to support EEXIST for GSI primary indexes | |||
121 | ||||
122 | vector<ErrorSpec> errors; | |||
123 | lcb_error_t rc = extract_n1ql_errors(resp->row, resp->nrow, errors); | |||
124 | if (rc == LCB_ERROR) { | |||
125 | w_resp.rc = LCB_QUERY_ERROR; | |||
126 | for (size_t ii = 0; ii < errors.size(); ++ii) { | |||
127 | const std::string& msg = errors[ii].msg; | |||
128 | if (msg.find("already exist") != string::npos) { | |||
129 | w_resp.rc = LCB_KEY_EEXISTS; // Index entry already exists | |||
130 | } else if (msg.find("not found") != string::npos) { | |||
131 | w_resp.rc = LCB_KEY_ENOENT; | |||
132 | } | |||
133 | } | |||
134 | } else { | |||
135 | w_resp.rc = rc; | |||
136 | } | |||
137 | } | |||
138 | ||||
139 | w_resp.inner = resp; | |||
140 | w_resp.specs = NULL__null; | |||
141 | w_resp.nspecs = 0; | |||
142 | ctx->callback(instance, LCB_CALLBACK_IXMGMT-3, &w_resp); | |||
143 | delete ctx; | |||
144 | } | |||
145 | ||||
146 | /** | |||
147 | * Dispatch the actual operation using a N1QL query | |||
148 | * @param instance | |||
149 | * @param cookie User cookie | |||
150 | * @param u_callback User callback (to assign to new context) | |||
151 | * @param i_callback Internal callback to be invoked when N1QL response | |||
152 | * is done | |||
153 | * @param s N1QL request payload | |||
154 | * @param n N1QL request length | |||
155 | * @param obj Internal context. Created with new if NULL | |||
156 | * @return | |||
157 | * | |||
158 | * See other overload for passing just the query string w/o extra parameters | |||
159 | */ | |||
160 | template <typename T> lcb_error_t | |||
161 | dispatch_common(lcb_t instance, | |||
162 | const void *cookie, lcb_N1XMGMTCALLBACK u_callback, | |||
163 | lcb_N1QLCALLBACK i_callback, const char *s, size_t n, T *obj) | |||
164 | { | |||
165 | lcb_error_t rc = LCB_SUCCESS; | |||
166 | bool our_alloc = false; | |||
167 | lcb_CMDN1QL cmd = { 0 }; | |||
168 | struct { lcb_t m_instance; } ixwrap = { instance }; // For logging | |||
169 | ||||
170 | if (obj == NULL__null) { | |||
171 | obj = new T(); | |||
172 | our_alloc = true; | |||
173 | } | |||
174 | ||||
175 | if (!(obj->callback = u_callback)) { | |||
176 | rc = LCB_EINVAL; | |||
177 | goto GT_ERROR; | |||
178 | } | |||
179 | ||||
180 | obj->cookie = const_cast<void*>(cookie); | |||
181 | ||||
182 | cmd.query = s; | |||
183 | cmd.nquery = n; | |||
184 | cmd.callback = i_callback; | |||
185 | lcb_log(LOGARGS(&ixwrap, DEBUG)(&ixwrap)->m_instance->settings, "ixmgmt", LCB_LOG_DEBUG , "/home/avsej/code/libcouchbase/src/n1ql/ixmgmt.cc", 185, LOGFMT"(mgreq=%p) " "Issuing query %.*s", LOGID(obj)static_cast<const void*>(obj), (int)n, s); | |||
186 | rc = lcb_n1ql_query(instance, obj, &cmd); | |||
187 | ||||
188 | GT_ERROR: | |||
189 | if (rc != LCB_SUCCESS && our_alloc) { | |||
190 | delete obj; | |||
191 | } | |||
192 | return rc; | |||
193 | } | |||
194 | ||||
195 | template <typename T> lcb_error_t | |||
196 | dispatch_common(lcb_t instance, | |||
197 | const void *cookie, lcb_N1XMGMTCALLBACK u_callback, | |||
198 | lcb_N1QLCALLBACK i_callback, const string& ss, T *obj = NULL__null) | |||
199 | { | |||
200 | Json::Value root; | |||
201 | root["statement"] = ss; | |||
202 | string reqbuf = Json::FastWriter().write(root); | |||
203 | return dispatch_common<T>(instance, | |||
204 | cookie, u_callback, i_callback, | |||
205 | reqbuf.c_str(), reqbuf.size()-1 /*newline*/, obj); | |||
206 | } | |||
207 | ||||
208 | ||||
209 | // Class to back the storage for the actual lcb_IXSPEC without doing too much | |||
210 | // mind-numbing buffer copies. Maybe this can be done via a macro instead? | |||
211 | class IndexSpec : public lcb_N1XSPEC { | |||
212 | public: | |||
213 | IndexSpec(const char *s, size_t n) : lcb_N1XSPEC() { | |||
214 | load_json(s, n); | |||
215 | } | |||
216 | inline IndexSpec(const lcb_N1XSPEC *spec); | |||
217 | static inline void to_key(const lcb_N1XSPEC *spec, std::string& out); | |||
218 | bool is_primary() const { return flags & LCB_N1XSPEC_F_PRIMARY1<<16; } | |||
219 | bool is_defer() const { return flags & LCB_N1XSPEC_F_DEFER1<<17; } | |||
220 | void ensure_keyspace(lcb_t instance) { | |||
221 | if (nkeyspace) { | |||
222 | return; | |||
223 | } | |||
224 | keyspace = LCBT_SETTING(instance, bucket)(instance)->settings->bucket; | |||
225 | nkeyspace = strlen(keyspace); | |||
226 | } | |||
227 | ||||
228 | private: | |||
229 | // Load fields from a JSON string | |||
230 | inline void load_json(const char *s, size_t n); | |||
231 | ||||
232 | // Load all fields | |||
233 | inline size_t load_fields(const Json::Value& root, bool do_copy); | |||
234 | ||||
235 | size_t total_fields_size(const Json::Value& src) { | |||
236 | return load_fields(src, false); | |||
237 | } | |||
238 | ||||
239 | // Load field from a JSON object | |||
240 | inline size_t load_json_field( | |||
241 | const Json::Value& root, | |||
242 | const char *name, const char **tgt_ptr, size_t *tgt_len, bool do_copy); | |||
243 | ||||
244 | // Load field from another pointer | |||
245 | void load_field(const char **dest, const char *src, size_t n) { | |||
246 | m_buf.append(src, n); | |||
247 | if (n) { | |||
248 | *dest = &m_buf.c_str()[m_buf.size()-n]; | |||
249 | } else { | |||
250 | *dest = NULL__null; | |||
251 | } | |||
252 | } | |||
253 | ||||
254 | string m_buf; | |||
255 | IndexSpec(const IndexSpec&); | |||
256 | }; | |||
257 | ||||
258 | LIBCOUCHBASE_API__attribute__ ((visibility("default"))) | |||
259 | lcb_error_t | |||
260 | lcb_n1x_create(lcb_t instance, const void *cookie, const lcb_CMDN1XMGMT *cmd) | |||
261 | { | |||
262 | string ss; | |||
263 | IndexSpec spec(&cmd->spec); | |||
264 | spec.ensure_keyspace(instance); | |||
265 | ||||
266 | ss = "CREATE"; | |||
267 | if (spec.is_primary()) { | |||
268 | ss += " PRIMARY"; | |||
269 | } else if (!spec.nname) { | |||
270 | return LCB_EMPTY_KEY; | |||
271 | } | |||
272 | ss.append(" INDEX"); | |||
273 | if (spec.nname) { | |||
274 | ss.append(" `").append(spec.name, spec.nname).append("` "); | |||
275 | } | |||
276 | ss.append(" ON `").append(spec.keyspace, spec.nkeyspace).append("`"); | |||
277 | ||||
278 | if (!spec.is_primary()) { | |||
279 | if (!spec.nfields) { | |||
280 | return LCB_EMPTY_KEY; | |||
281 | } | |||
282 | ||||
283 | // See if we can parse 'fields' properly. First, try to parse as | |||
284 | // JSON: | |||
285 | Json::Value fields_arr; | |||
286 | Json::Reader r; | |||
287 | if (!r.parse(spec.fields, spec.fields + spec.nfields, fields_arr)) { | |||
288 | // Invalid JSON! | |||
289 | return LCB_EINVAL; | |||
290 | } | |||
291 | ||||
292 | ss.append(" ("); | |||
293 | if (fields_arr.isArray()) { | |||
294 | if (!fields_arr.size()) { | |||
295 | return LCB_EMPTY_KEY; | |||
296 | } | |||
297 | for (size_t ii = 0; ii < fields_arr.size(); ++ii) { | |||
298 | static Json::Value empty; | |||
299 | const Json::Value& field = fields_arr.get(ii, empty); | |||
300 | if (!field.isString()) { | |||
301 | return LCB_EINVAL; | |||
302 | } | |||
303 | ss.append(field.asString()); | |||
304 | if (ii != fields_arr.size()-1) { | |||
305 | ss.append(","); | |||
306 | } | |||
307 | } | |||
308 | } else if (fields_arr.isString()) { | |||
309 | std::string field_list = fields_arr.asString(); | |||
310 | if (field_list.empty()) { | |||
311 | return LCB_EMPTY_KEY; | |||
312 | } | |||
313 | ss.append(field_list); | |||
314 | } else { | |||
315 | return LCB_EINVAL; | |||
316 | } | |||
317 | ss.append(") "); | |||
318 | } | |||
319 | ||||
320 | if (spec.ncond) { | |||
321 | if (spec.is_primary()) { | |||
322 | return LCB_EINVAL; | |||
323 | } | |||
324 | ss.append(" WHERE ").append(spec.cond, spec.ncond).append(" "); | |||
325 | } | |||
326 | ||||
327 | if (spec.ixtype) { | |||
328 | const char *ixtype = ixtype_2_str(spec.ixtype); | |||
329 | if (!ixtype) { | |||
330 | return LCB_EINVAL; | |||
331 | } | |||
332 | ss.append(" USING ").append(ixtype); | |||
333 | } | |||
334 | ||||
335 | if (spec.is_defer()) { | |||
336 | ss.append(" WITH {\"defer_build\": true}"); | |||
337 | } | |||
338 | ||||
339 | return dispatch_common<IndexOpCtx>(instance, cookie, cmd->callback, cb_generic, ss); | |||
340 | } | |||
341 | ||||
342 | ||||
343 | class ListIndexCtx : public IndexOpCtx { | |||
344 | public: | |||
345 | vector<IndexSpec*> specs; | |||
346 | ||||
347 | virtual void invoke(lcb_t instance, lcb_RESPN1XMGMT *resp) { | |||
348 | finish(instance, resp); | |||
349 | } | |||
350 | ||||
351 | void finish(lcb_t instance, lcb_RESPN1XMGMT *resp = NULL__null) { | |||
352 | lcb_RESPN1XMGMT w_resp = { 0 }; | |||
353 | if (resp == NULL__null) { | |||
354 | resp = &w_resp; | |||
355 | resp->rc = LCB_SUCCESS; | |||
356 | } | |||
357 | resp->cookie = cookie; | |||
358 | lcb_N1XSPEC **speclist = reinterpret_cast<lcb_N1XSPEC**>(&specs[0]); | |||
359 | resp->specs = speclist; | |||
360 | resp->nspecs = specs.size(); | |||
361 | callback(instance, LCB_CALLBACK_IXMGMT-3, resp); | |||
362 | delete this; | |||
363 | } | |||
364 | ||||
365 | virtual ~ListIndexCtx() { | |||
366 | for (size_t ii = 0; ii < specs.size(); ++ii) { | |||
367 | delete specs[ii]; | |||
368 | } | |||
369 | specs.clear(); | |||
370 | } | |||
371 | }; | |||
372 | ||||
373 | static void | |||
374 | cb_index_list(lcb_t instance, int, const lcb_RESPN1QL *resp) | |||
375 | { | |||
376 | ListIndexCtx *ctx = reinterpret_cast<ListIndexCtx *>(resp->cookie); | |||
377 | if (!(resp->rflags & LCB_RESP_F_FINAL)) { | |||
378 | ctx->specs.push_back(new IndexSpec(resp->row, resp->nrow)); | |||
379 | return; | |||
380 | } | |||
381 | ||||
382 | lcb_RESPN1XMGMT w_resp = { 0 }; | |||
383 | if ((w_resp.rc = resp->rc) == LCB_SUCCESS) { | |||
384 | w_resp.rc = get_n1ql_error(resp->row, resp->nrow); | |||
385 | } | |||
386 | w_resp.inner = resp; | |||
387 | ctx->invoke(instance, &w_resp); | |||
388 | } | |||
389 | ||||
390 | static lcb_error_t | |||
391 | do_index_list(lcb_t instance, const void *cookie, const lcb_CMDN1XMGMT *cmd, | |||
392 | ListIndexCtx *ctx) | |||
393 | { | |||
394 | string ss; | |||
395 | IndexSpec spec(&cmd->spec); | |||
396 | ss = "SELECT idx.* FROM system:indexes idx WHERE"; | |||
397 | ||||
398 | if (spec.flags & LCB_N1XSPEC_F_PRIMARY1<<16) { | |||
399 | ss.append(" is_primary=true AND"); | |||
400 | } | |||
401 | if (spec.nkeyspace) { | |||
402 | ss.append(" keyspace_id=\"").append(spec.keyspace, spec.nkeyspace).append("\" AND"); | |||
403 | } | |||
404 | if (spec.nnspace) { | |||
405 | ss.append(" namespace_id=\"").append(spec.nspace, spec.nnspace).append("\" AND"); | |||
406 | } | |||
407 | if (spec.ixtype) { | |||
408 | const char *s_ixtype = ixtype_2_str(spec.ixtype); | |||
409 | if (s_ixtype == NULL__null) { | |||
410 | if (ctx != NULL__null) { | |||
411 | delete ctx; | |||
412 | } | |||
413 | return LCB_EINVAL; | |||
414 | } | |||
415 | ss.append(" using=\"").append(s_ixtype).append("\" AND"); | |||
416 | } | |||
417 | if (spec.nname) { | |||
418 | ss.append(" name=\"").append(spec.name, spec.nname).append("\" AND"); | |||
419 | } | |||
420 | ||||
421 | // WHERE <.....> true | |||
422 | ss.append(" true"); | |||
423 | ss.append(" ORDER BY is_primary DESC, name ASC"); | |||
424 | ||||
425 | return dispatch_common<ListIndexCtx>(instance, | |||
426 | cookie, cmd->callback, cb_index_list, ss, ctx); | |||
427 | } | |||
428 | ||||
429 | LIBCOUCHBASE_API__attribute__ ((visibility("default"))) | |||
430 | lcb_error_t | |||
431 | lcb_n1x_list(lcb_t instance, const void *cookie, const lcb_CMDN1XMGMT *cmd) | |||
432 | { | |||
433 | return do_index_list(instance, cookie, cmd, NULL__null); | |||
434 | } | |||
435 | ||||
436 | LIBCOUCHBASE_API__attribute__ ((visibility("default"))) | |||
437 | lcb_error_t | |||
438 | lcb_n1x_drop(lcb_t instance, const void *cookie, const lcb_CMDN1XMGMT *cmd) | |||
439 | { | |||
440 | string ss; | |||
441 | IndexSpec spec(&cmd->spec); | |||
442 | spec.ensure_keyspace(instance); | |||
443 | ||||
444 | if (spec.nname) { | |||
445 | ss = "DROP INDEX"; | |||
446 | ss.append(" `").append(spec.keyspace, spec.nkeyspace).append("`"); | |||
447 | ss.append(".`").append(spec.name, spec.nname).append("`"); | |||
448 | } else if (spec.flags & LCB_N1XSPEC_F_PRIMARY1<<16) { | |||
449 | ss = "DROP PRIMARY INDEX ON"; | |||
450 | ss.append(" `").append(spec.keyspace, spec.nkeyspace).append("`"); | |||
451 | } else { | |||
452 | return LCB_EMPTY_KEY; | |||
453 | } | |||
454 | ||||
455 | if (spec.ixtype) { | |||
456 | const char *stype = ixtype_2_str(spec.ixtype); | |||
457 | if (!stype) { | |||
458 | return LCB_EINVAL; | |||
459 | } | |||
460 | ss.append(" USING ").append(stype); | |||
461 | } | |||
462 | ||||
463 | return dispatch_common<IndexOpCtx>(instance, cookie, cmd->callback, cb_generic, ss); | |||
464 | } | |||
465 | ||||
466 | class ListIndexCtx_BuildIndex : public ListIndexCtx { | |||
467 | public: | |||
468 | virtual inline void invoke(lcb_t instance, lcb_RESPN1XMGMT *resp); | |||
469 | inline lcb_error_t try_build(lcb_t instance); | |||
470 | }; | |||
471 | ||||
472 | static void | |||
473 | cb_build_submitted(lcb_t instance, int, const lcb_RESPN1QL *resp) | |||
474 | { | |||
475 | ListIndexCtx *ctx = reinterpret_cast<ListIndexCtx*>(resp->cookie); | |||
476 | ||||
477 | if (resp->rflags & LCB_RESP_F_FINAL) { | |||
478 | lcb_RESPN1XMGMT w_resp = { 0 }; | |||
479 | if ((w_resp.rc = resp->rc) == LCB_SUCCESS) { | |||
480 | w_resp.rc = get_n1ql_error(resp->row, resp->nrow); | |||
481 | } | |||
482 | ctx->finish(instance, &w_resp); | |||
483 | } | |||
484 | } | |||
485 | ||||
486 | lcb_error_t | |||
487 | ListIndexCtx_BuildIndex::try_build(lcb_t instance) | |||
488 | { | |||
489 | vector<IndexSpec*> pending; | |||
490 | for (size_t ii = 0; ii < specs.size(); ++ii) { | |||
491 | IndexSpec* spec = specs[ii]; | |||
492 | if (strncmp(spec->state, "pending", spec->nstate) == 0 || | |||
493 | strncmp(spec->state, "deferred", spec->nstate) == 0) { | |||
494 | pending.push_back(spec); | |||
495 | } | |||
496 | } | |||
497 | ||||
498 | if (pending.empty()) { | |||
499 | return LCB_KEY_ENOENT; | |||
500 | } | |||
501 | ||||
502 | string ss; | |||
503 | ss = "BUILD INDEX ON `"; | |||
504 | ||||
505 | ss.append(pending[0]->keyspace, pending[0]->nkeyspace).append("`"); | |||
506 | ss += '('; | |||
507 | for (size_t ii = 0; ii < pending.size(); ++ii) { | |||
508 | ss += '`'; | |||
509 | ss.append(pending[ii]->name, pending[ii]->nname); | |||
510 | ss += '`'; | |||
511 | if (ii+1 < pending.size()) { | |||
512 | ss += ','; | |||
513 | } | |||
514 | } | |||
515 | ss += ')'; | |||
516 | ||||
517 | lcb_error_t rc = dispatch_common<ListIndexCtx_BuildIndex>( | |||
518 | instance, cookie, callback, cb_build_submitted, ss, | |||
519 | this); | |||
520 | ||||
521 | if (rc == LCB_SUCCESS) { | |||
522 | std::set<IndexSpec*> to_remove(specs.begin(), specs.end()); | |||
523 | for (size_t ii = 0; ii < pending.size(); ++ii) { | |||
524 | to_remove.erase(pending[ii]); | |||
525 | } | |||
526 | ||||
527 | std::for_each(to_remove.begin(), to_remove.end(), my_delete<IndexSpec*>); | |||
528 | ||||
529 | specs = pending; | |||
530 | } | |||
531 | return rc; | |||
532 | } | |||
533 | ||||
534 | void | |||
535 | ListIndexCtx_BuildIndex::invoke(lcb_t instance, lcb_RESPN1XMGMT *resp) | |||
536 | { | |||
537 | if (resp->rc == LCB_SUCCESS && | |||
538 | (resp->rc = try_build(instance)) == LCB_SUCCESS) { | |||
539 | return; | |||
540 | } | |||
541 | finish(instance, resp); | |||
542 | } | |||
543 | ||||
544 | LIBCOUCHBASE_API__attribute__ ((visibility("default"))) | |||
545 | lcb_error_t | |||
546 | lcb_n1x_startbuild(lcb_t instance, const void *cookie, const lcb_CMDN1XMGMT *cmd) | |||
547 | { | |||
548 | ListIndexCtx_BuildIndex *ctx = new ListIndexCtx_BuildIndex(); | |||
| ||||
549 | lcb_error_t rc = do_index_list(instance, cookie, cmd, ctx); | |||
550 | if (rc != LCB_SUCCESS) { | |||
551 | delete ctx; | |||
| ||||
552 | } | |||
553 | return rc; | |||
554 | } | |||
555 | ||||
556 | struct WatchIndexCtx : public IndexOpCtx { | |||
557 | // Interval timer | |||
558 | lcbio_pTIMER m_timer; | |||
559 | uint32_t m_interval; | |||
560 | uint64_t m_tsend; | |||
561 | lcb_t m_instance; | |||
562 | std::map<std::string,IndexSpec*> m_defspend; | |||
563 | std::vector<IndexSpec*> m_defsok; | |||
564 | ||||
565 | inline void read_state(const lcb_RESPN1XMGMT *resp); | |||
566 | inline void reschedule(); | |||
567 | inline lcb_error_t do_poll(); | |||
568 | inline lcb_error_t load_defs(const lcb_CMDN1XWATCH *); | |||
569 | inline WatchIndexCtx(lcb_t, const void *, const lcb_CMDN1XWATCH*); | |||
570 | inline ~WatchIndexCtx(); | |||
571 | inline void finish(lcb_error_t rc, const lcb_RESPN1XMGMT *); | |||
572 | }; | |||
573 | ||||
574 | static void | |||
575 | cb_watchix_tm(void *arg) | |||
576 | { | |||
577 | WatchIndexCtx *ctx = reinterpret_cast<WatchIndexCtx*>(arg); | |||
578 | uint64_t now = lcb_nstime(); | |||
579 | if (now >= ctx->m_tsend) { | |||
580 | ctx->finish(LCB_ETIMEDOUT, NULL__null); | |||
581 | } else { | |||
582 | ctx->do_poll(); | |||
583 | } | |||
584 | } | |||
585 | ||||
586 | #define DEFAULT_WATCH_TIMEOUT(((lcb_uint32_t)30) * 1000000) LCB_S2US(30)(((lcb_uint32_t)30) * 1000000) | |||
587 | #define DEFAULT_WATCH_INTERVAL((500) * 1000) LCB_MS2US(500)((500) * 1000) | |||
588 | ||||
589 | WatchIndexCtx::WatchIndexCtx(lcb_t instance, const void *cookie_, const lcb_CMDN1XWATCH *cmd) | |||
590 | : m_instance(instance) | |||
591 | { | |||
592 | uint64_t now = lcb_nstime(); | |||
593 | uint32_t timeout = cmd->timeout ? cmd->timeout : DEFAULT_WATCH_TIMEOUT(((lcb_uint32_t)30) * 1000000); | |||
594 | m_interval = cmd->interval ? cmd->interval : DEFAULT_WATCH_INTERVAL((500) * 1000); | |||
595 | m_interval = std::min(m_interval, timeout); | |||
596 | m_tsend = now + LCB_US2NS(timeout)(((hrtime_t)timeout) * 1000); | |||
597 | ||||
598 | this->callback = cmd->callback; | |||
599 | this->cookie = const_cast<void*>(cookie_); | |||
600 | ||||
601 | m_timer = lcbio_timer_new(instance->iotable, this, cb_watchix_tm); | |||
602 | lcb_aspend_add(&instance->pendops, LCB_PENDTYPE_COUNTER, NULL__null); | |||
603 | } | |||
604 | ||||
605 | WatchIndexCtx::~WatchIndexCtx() | |||
606 | { | |||
607 | if (m_timer) { | |||
608 | lcbio_timer_destroy(m_timer); | |||
609 | } | |||
610 | if (m_instance) { | |||
611 | lcb_aspend_del(&m_instance->pendops, LCB_PENDTYPE_COUNTER, NULL__null); | |||
612 | lcb_maybe_breakout(m_instance); | |||
613 | } | |||
614 | ||||
615 | std::for_each(m_defsok.begin(), m_defsok.end(), my_delete<IndexSpec*>); | |||
616 | for (std::map<string,IndexSpec*>::iterator ii = m_defspend.begin(); | |||
617 | ii != m_defspend.end(); ++ii) { | |||
618 | delete ii->second; | |||
619 | } | |||
620 | } | |||
621 | ||||
622 | void | |||
623 | IndexSpec::to_key(const lcb_N1XSPEC* spec, std::string& s) | |||
624 | { | |||
625 | // Identity is: | |||
626 | // {keyspace,name,is_primary,type} | |||
627 | s.append(spec->nspace, spec->nnspace).append(" "); | |||
628 | s.append(spec->keyspace, spec->nkeyspace).append(" "); | |||
629 | s.append(spec->name, spec->nname).append(" "); | |||
630 | const char *type_s = ixtype_2_str(spec->ixtype); | |||
631 | if (!type_s) { | |||
632 | type_s = "<UNKNOWN>"; | |||
633 | } | |||
634 | s.append(type_s); | |||
635 | } | |||
636 | ||||
637 | void | |||
638 | WatchIndexCtx::read_state(const lcb_RESPN1XMGMT *resp) | |||
639 | { | |||
640 | // We examine the indexes here to see which ones have been completed | |||
641 | // Make them all into an std::map | |||
642 | if (resp->rc != LCB_SUCCESS) { | |||
643 | lcb_log(LOGARGS(this, INFO)(this)->m_instance->settings, "ixmgmt", LCB_LOG_INFO, "/home/avsej/code/libcouchbase/src/n1ql/ixmgmt.cc" , 643, LOGFMT"(mgreq=%p) " "Error 0x%x while listing indexes. Rescheduling", LOGID(this)static_cast<const void*>(this), resp->rc); | |||
644 | reschedule(); | |||
645 | return; | |||
646 | } | |||
647 | ||||
648 | std::map<std::string, const lcb_N1XSPEC*> in_specs; | |||
649 | for (size_t ii = 0; ii < resp->nspecs; ++ii) { | |||
650 | std::string key; | |||
651 | IndexSpec::to_key(resp->specs[ii], key); | |||
652 | in_specs[key] = resp->specs[ii]; | |||
653 | } | |||
654 | ||||
655 | std::map<std::string, IndexSpec*>::iterator it_remain = m_defspend.begin(); | |||
656 | while (it_remain != m_defspend.end()) { | |||
657 | // See if the index is 'online' yet! | |||
658 | std::map<std::string,const lcb_N1XSPEC*>::iterator res; | |||
659 | res = in_specs.find(it_remain->first); | |||
660 | if (res == in_specs.end()) { | |||
661 | lcb_log(LOGARGS(this, INFO)(this)->m_instance->settings, "ixmgmt", LCB_LOG_INFO, "/home/avsej/code/libcouchbase/src/n1ql/ixmgmt.cc" , 661, LOGFMT"(mgreq=%p) " "Index [%s] not in cluster", LOGID(this)static_cast<const void*>(this), it_remain->first.c_str()); | |||
662 | // We can't find our own index. Someone else deleted it. Bail! | |||
663 | finish(LCB_KEY_ENOENT, resp); | |||
664 | return; | |||
665 | } | |||
666 | ||||
667 | std::string s_state(res->second->state, res->second->nstate); | |||
668 | if (s_state == "online") { | |||
669 | lcb_log(LOGARGS(this, DEBUG)(this)->m_instance->settings, "ixmgmt", LCB_LOG_DEBUG, "/home/avsej/code/libcouchbase/src/n1ql/ixmgmt.cc" , 669, LOGFMT"(mgreq=%p) " "Index [%s] is ready", LOGID(this)static_cast<const void*>(this), it_remain->first.c_str()); | |||
670 | m_defsok.push_back(it_remain->second); | |||
671 | m_defspend.erase(it_remain++); | |||
672 | } else { | |||
673 | ++it_remain; | |||
674 | } | |||
675 | } | |||
676 | ||||
677 | if (m_defspend.empty()) { | |||
678 | finish(LCB_SUCCESS, resp); | |||
679 | } else { | |||
680 | reschedule(); | |||
681 | } | |||
682 | } | |||
683 | ||||
684 | lcb_error_t | |||
685 | WatchIndexCtx::load_defs(const lcb_CMDN1XWATCH *cmd) | |||
686 | { | |||
687 | for (size_t ii = 0; ii < cmd->nspec; ++ii) { | |||
688 | std::string key; | |||
689 | IndexSpec *extspec = new IndexSpec(cmd->specs[ii]); | |||
690 | IndexSpec::to_key(extspec, key); | |||
691 | m_defspend[key] = extspec; | |||
692 | } | |||
693 | if (m_defspend.empty()) { | |||
694 | return LCB_ENO_COMMANDS; | |||
695 | } | |||
696 | return LCB_SUCCESS; | |||
697 | } | |||
698 | ||||
699 | void | |||
700 | WatchIndexCtx::finish(lcb_error_t rc, const lcb_RESPN1XMGMT *resp) | |||
701 | { | |||
702 | lcb_RESPN1XMGMT my_resp = { 0 }; | |||
703 | my_resp.cookie = cookie; | |||
704 | my_resp.rc = rc; | |||
705 | ||||
706 | if (resp) { | |||
707 | my_resp.inner = resp->inner; | |||
708 | } | |||
709 | ||||
710 | lcb_N1XSPEC **speclist = reinterpret_cast<lcb_N1XSPEC**>(&m_defsok[0]); | |||
711 | my_resp.specs = speclist; | |||
712 | my_resp.nspecs = m_defsok.size(); | |||
713 | callback(m_instance, LCB_CALLBACK_IXMGMT-3, &my_resp); | |||
714 | delete this; | |||
715 | } | |||
716 | ||||
717 | void | |||
718 | WatchIndexCtx::reschedule() | |||
719 | { | |||
720 | // Next interval! | |||
721 | uint64_t now = lcb_nstime(); | |||
722 | if (now + LCB_US2NS(m_interval)(((hrtime_t)m_interval) * 1000) >= m_tsend) { | |||
723 | finish(LCB_ETIMEDOUT, NULL__null); | |||
724 | } else { | |||
725 | lcbio_timer_rearm(m_timer, m_interval); | |||
726 | } | |||
727 | } | |||
728 | ||||
729 | static void | |||
730 | cb_watch_gotlist(lcb_t, int, const lcb_RESPN1XMGMT *resp) | |||
731 | { | |||
732 | WatchIndexCtx *ctx = reinterpret_cast<WatchIndexCtx*>(resp->cookie); | |||
733 | ctx->read_state(resp); | |||
734 | } | |||
735 | ||||
736 | lcb_error_t | |||
737 | WatchIndexCtx::do_poll() | |||
738 | { | |||
739 | lcb_CMDN1XMGMT cmd; | |||
740 | memset(&cmd, 0, sizeof cmd); | |||
741 | cmd.callback = cb_watch_gotlist; | |||
742 | lcb_log(LOGARGS(this, DEBUG)(this)->m_instance->settings, "ixmgmt", LCB_LOG_DEBUG, "/home/avsej/code/libcouchbase/src/n1ql/ixmgmt.cc" , 742, LOGFMT"(mgreq=%p) " "Will check for index readiness of %lu indexes. %lu completed", LOGID(this)static_cast<const void*>(this), m_defspend.size(), m_defsok.size()); | |||
743 | return lcb_n1x_list(m_instance, this, &cmd); | |||
744 | } | |||
745 | ||||
746 | LIBCOUCHBASE_API__attribute__ ((visibility("default"))) | |||
747 | lcb_error_t | |||
748 | lcb_n1x_watchbuild(lcb_t instance, const void *cookie, const lcb_CMDN1XWATCH *cmd) | |||
749 | { | |||
750 | WatchIndexCtx *ctx = new WatchIndexCtx(instance, cookie, cmd); | |||
751 | lcb_error_t rc; | |||
752 | if ((rc = ctx->load_defs(cmd)) != LCB_SUCCESS) { | |||
753 | delete ctx; | |||
754 | return rc; | |||
755 | } | |||
756 | if ((rc = ctx->do_poll()) != LCB_SUCCESS) { | |||
757 | delete ctx; | |||
758 | return rc; | |||
759 | } | |||
760 | return LCB_SUCCESS; | |||
761 | } | |||
762 | ||||
763 | void | |||
764 | IndexSpec::load_json(const char *s, size_t n) { | |||
765 | Json::Value root; | |||
766 | // Set the JSON first! | |||
767 | m_buf.assign(s, n); | |||
768 | nrawjson = n; | |||
769 | ||||
770 | if (!Json::Reader().parse(s, s + n, root)) { | |||
771 | rawjson = m_buf.c_str(); | |||
772 | return; | |||
773 | } | |||
774 | ||||
775 | m_buf.reserve(n + total_fields_size(root)); | |||
776 | load_fields(root, true); | |||
777 | ||||
778 | // Once all the fields are loaded, it's time to actually assign the | |||
779 | // rawjson field, which is simply the beginning of the buffer | |||
780 | rawjson = m_buf.c_str(); | |||
781 | ||||
782 | // Get the index type | |||
783 | string ixtype_s = root["using"].asString(); | |||
784 | if (ixtype_s == "gsi") { | |||
785 | ixtype = LCB_N1XSPEC_T_GSI1; | |||
786 | } else if (ixtype_s == "view") { | |||
787 | ixtype = LCB_N1XSPEC_T_VIEW2; | |||
788 | } | |||
789 | if (root["is_primary"].asBool()) { | |||
790 | flags |= LCB_N1XSPEC_F_PRIMARY1<<16; | |||
791 | } | |||
792 | } | |||
793 | ||||
794 | // IndexSpec stuff | |||
795 | IndexSpec::IndexSpec(const lcb_N1XSPEC *spec) | |||
796 | { | |||
797 | *static_cast<lcb_N1XSPEC*>(this) = *spec; | |||
798 | if (spec->nrawjson) { | |||
799 | load_json(spec->rawjson, spec->nrawjson); | |||
800 | return; | |||
801 | } | |||
802 | // Initialize the bufs | |||
803 | m_buf.reserve(nname + nkeyspace + nnspace + nstate + nfields + nrawjson + nstate + ncond); | |||
804 | load_field(&rawjson, spec->rawjson, nrawjson); | |||
805 | load_field(&name, spec->name, nname); | |||
806 | load_field(&keyspace, spec->keyspace, nkeyspace); | |||
807 | load_field(&nspace, spec->nspace, nnspace); | |||
808 | load_field(&state, spec->state, nstate); | |||
809 | load_field(&fields, spec->fields, nfields); | |||
810 | load_field(&cond, spec->cond, ncond); | |||
811 | } | |||
812 | ||||
813 | size_t | |||
814 | IndexSpec::load_fields(const Json::Value& root, bool do_copy) | |||
815 | { | |||
816 | size_t size = 0; | |||
817 | size += load_json_field(root, "name", &name, &nname, do_copy); | |||
818 | size += load_json_field(root, "keyspace_id", &keyspace, &nkeyspace, do_copy); | |||
819 | size += load_json_field(root, "namespace_id", &nspace, &nnspace, do_copy); | |||
820 | size += load_json_field(root, "state", &state, &nstate, do_copy); | |||
821 | size += load_json_field(root, "index_key", &fields, &nfields, do_copy); | |||
822 | size += load_json_field(root, "condition", &cond, &ncond, do_copy); | |||
823 | return size; | |||
824 | } | |||
825 | ||||
826 | size_t | |||
827 | IndexSpec::load_json_field(const Json::Value& root, | |||
828 | const char *name_, const char **tgt_ptr, size_t *tgt_len, bool do_copy) | |||
829 | { | |||
830 | size_t namelen = strlen(name_); | |||
831 | const Json::Value *val = root.find(name_, name_ + namelen); | |||
832 | size_t n = 0; | |||
833 | ||||
834 | if (val == NULL__null) { | |||
835 | return 0; | |||
836 | } | |||
837 | ||||
838 | if (val->isString()) { | |||
839 | const char *s_begin, *s_end; | |||
840 | if (val->getString(&s_begin, &s_end) && (n = s_end - s_begin) && do_copy) { | |||
841 | m_buf.insert(m_buf.end(), s_begin, s_end); | |||
842 | } | |||
843 | } else { | |||
844 | std::string frag = Json::FastWriter().write(*val); | |||
845 | n = frag.size(); | |||
846 | if (do_copy) { | |||
847 | m_buf.append(frag); | |||
848 | } | |||
849 | } | |||
850 | ||||
851 | if (n) { | |||
852 | *tgt_ptr = &(m_buf.c_str()[m_buf.size()-n]); | |||
853 | *tgt_len = n; | |||
854 | } else { | |||
855 | *tgt_ptr = NULL__null; | |||
856 | *tgt_len = 0; | |||
857 | } | |||
858 | return n; | |||
859 | } |