get_any_replica returns corrupted response

Description

I am seeing the following value for the beer-sample bucket's 21st_amendment_brewery_cafe document after a get_any_replica read. See the comment below for a sample C++ program. The output log is attached.

\x9a\U00000005\xf0\xb1{\"name\":\"21st Amendment Brewery Cafe\",\"city\":\"San Francisco\",\"state\":\"California\",\"code\":\"94107\",\"country\":\"United States\",\"phone\":\"1-415-369-0900\",\"website\":\"http://www.21st-ame\t\xa1(.com/\",\"typ\U00000001\xbe

Expected value:

{ "name": "21st Amendment Brewery Cafe", "city": "San Francisco", "state": "California", "code": "94107", "country": "United States", "phone": "1-415-369-0900", "website": "http://www.21st-amendment.com/", "type": "brewery", "updated": "2010-10-24 13:54:07", "description": "The 21st Amendment Brewery offers a variety of award winning house made brews and American grilled cuisine in a comfortable loft like setting. Join us before and after Giants baseball games in our outdoor beer garden. A great location for functions and parties in our semi-private Brewers Loft. See you soon at the 21A!", "address": [ "563 Second Street" ], "geo": { "accuracy": "ROOFTOP", "lat": 37.7825, "lon": -122.393 } }
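The leading bytes of the observed value can be checked against the Snappy raw format, which starts with the uncompressed length as a little-endian varint followed by tagged elements. The sketch below decodes that preamble. It assumes that the `\x9a\U00000005\xf0\xb1` prefix in the dump stands for the raw bytes 0x9a 0x05 0xf0 0xb1; `decode_varint32`, `is_literal_tag` and `observed_prefix` are hypothetical names for illustration, not SDK functions.

```cpp
#include <cstddef>
#include <cstdint>

struct varint_result {
    std::uint32_t value;    // decoded number
    std::size_t bytes_used; // 0 means the input was truncated or too long
};

// Decode a little-endian base-128 varint (at most 5 bytes for 32 bits),
// which is how a Snappy raw stream encodes its uncompressed length.
constexpr varint_result decode_varint32(const std::uint8_t* data, std::size_t size)
{
    std::uint32_t value = 0;
    for (std::size_t i = 0; i < size && i < 5; ++i) {
        value |= static_cast<std::uint32_t>(data[i] & 0x7f) << (7 * i);
        if ((data[i] & 0x80) == 0) {
            return { value, i + 1 }; // high bit clear: last byte of the varint
        }
    }
    return { 0, 0 };
}

// In the Snappy format, the two low bits of a tag byte select the element
// type; 00 means "literal".
constexpr bool is_literal_tag(std::uint8_t tag)
{
    return (tag & 0x03) == 0;
}

// Assumption: first four bytes of the observed value, read from the dump.
constexpr std::uint8_t observed_prefix[] = { 0x9a, 0x05, 0xf0, 0xb1 };
```

Under that assumption, the preamble decodes to an uncompressed length of 666 bytes and the next byte is a literal tag, which is what a Snappy-compressed body would look like and not what a JSON document (starting with `{`) would look like.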

Repro Steps:

  1. Set up a 2-node cluster.

  2. Determine which node has the active doc for a key in the beer-sample bucket (I chose 21st_amendment_brewery_cafe for my testing).

  3. Start the looping program.

  4. Stop the node with the active doc.

  5. When the regular get fails, the program attempts to get the replica, and that is when it fails to parse the JSON (probably because the response is not valid JSON 🙂).

Environment

None

Release Notes Description

None

Attachments

1
  • 21 Oct 2022, 03:06 AM

Activity

Sergey Auseyau October 22, 2022 at 7:46 AM

Sergey Auseyau October 21, 2022 at 6:11 PM

So according to these lines

[2022-10-20 22:02:38.736] [99845,4433160] [trace] 61ms, [f70306ed-ed88-457a-0f0a-0bccfd79cd24/f794bb6c-4f77-4df1-9e7c-1a192f54fd7c/plain/beer-sample] <ec2-34-212-171-20.us-west-2.compute.amazonaws.com/34.212.171.20:11210> MCBP recv, opaque=14, 18 83 03 1c 04 03 00 00 00 00 02 94 0e 00 00 00 17 1e 63 ad d5 21 00 00
[2022-10-20 22:02:38.736] [99845,4433160] [trace] 0ms, [f70306ed-ed88-457a-0f0a-0bccfd79cd24/f794bb6c-4f77-4df1-9e7c-1a192f54fd7c/plain/beer-sample] <ec2-34-212-171-20.us-west-2.compute.amazonaws.com/34.212.171.20:11210> MCBP invoke operation handler: opcode=get_replica (0x83), opaque=14, status=0 (success (0x00)), ec=Undefined error: 0
libc++abi: terminating with uncaught exception of type tao::pegtl::parse_error: tao::json::events::from_string:1:1: no valid JSON

The server sends the response with datatype 0x03 (the sixth byte of the header in the recv line), which is "0x01 (json) | 0x02 (snappy)". I guess we somehow skipped the Snappy decompression for the replica read.
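The check described above can be sketched as follows. This is a minimal illustration, not SDK code: it copies the 24 header bytes from the "MCBP recv, opaque=14" trace line, reads the datatype byte at offset 5, and tests the JSON (0x01) and Snappy (0x02) bits. The names `header_datatype`, `is_json`, `needs_snappy_decompression` and `traced_header` are hypothetical.

```cpp
#include <array>
#include <cstddef>
#include <cstdint>

// Datatype bits of the memcached binary protocol (MCBP) used by Couchbase KV.
constexpr std::uint8_t datatype_json = 0x01;
constexpr std::uint8_t datatype_snappy = 0x02;

// Offset of the datatype byte within the 24-byte MCBP header.
constexpr std::size_t datatype_offset = 5;

// Hypothetical helper: extract the datatype byte from a raw header.
constexpr std::uint8_t header_datatype(const std::array<std::uint8_t, 24>& header)
{
    return header[datatype_offset];
}

constexpr bool is_json(std::uint8_t datatype)
{
    return (datatype & datatype_json) != 0;
}

// A body with this bit set has to be Snappy-decompressed before parsing.
constexpr bool needs_snappy_decompression(std::uint8_t datatype)
{
    return (datatype & datatype_snappy) != 0;
}

// Header bytes copied from the "MCBP recv, opaque=14" trace line.
constexpr std::array<std::uint8_t, 24> traced_header{
    0x18, 0x83, 0x03, 0x1c, 0x04, 0x03, 0x00, 0x00, 0x00, 0x00, 0x02, 0x94,
    0x0e, 0x00, 0x00, 0x00, 0x17, 0x1e, 0x63, 0xad, 0xd5, 0x21, 0x00, 0x00,
};
```

For the traced header both predicates are true, so handing the body straight to the JSON parser fails as it does in the log. A response handler would presumably run the Snappy step first whenever `needs_snappy_decompression` holds.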

Jared Casey October 21, 2022 at 3:05 AM

Sample program (probably could just use the kv_loader w/ a tweak)

#include <chrono>
#include <csignal>
#include <future>
#include <iostream>
#include <list>
#include <memory>
#include <string>
#include <system_error>
#include <thread>

#include <core/cluster.hxx>
#include <core/utils/json.hxx>

namespace
{
// Cleared by the signal handler to stop the main loop.
volatile std::sig_atomic_t running{ 1 };
} // namespace

void
sigint_handler(int /* signal */)
{
    running = 0;
}

// Owns the io_context, the cluster object and the IO threads.
struct connection {
    asio::io_context io_;
    std::shared_ptr<couchbase::core::cluster> cluster_;
    std::list<std::thread> io_threads_;

    connection(int num_io_threads)
    {
        cluster_ = couchbase::core::cluster::create(io_);
        for (int i = 0; i < num_io_threads; i++) {
            io_threads_.emplace_back([&] { io_.run(); });
        }
    }
};

// Blocks until the cluster connection is established; throws on failure.
void
open_cluster(couchbase::core::cluster& cluster, const couchbase::core::origin& origin)
{
    auto barrier = std::make_shared<std::promise<std::error_code>>();
    auto f = barrier->get_future();
    cluster.open(origin, [barrier](std::error_code ec) mutable { barrier->set_value(ec); });
    auto rc = f.get();
    if (rc) {
        LOG_CRITICAL("unable to connect to the cluster: {}, nodes={}", rc.message(), fmt::join(origin.get_nodes(), ", "));
        throw std::system_error(rc);
    }
}

void
close_cluster(couchbase::core::cluster& cluster)
{
    auto barrier = std::make_shared<std::promise<void>>();
    auto f = barrier->get_future();
    cluster.close([barrier]() { barrier->set_value(); });
    f.get();
}

// Blocks until the bucket is open; throws on failure.
void
open_bucket(couchbase::core::cluster& cluster, const std::string& bucket_name)
{
    auto barrier = std::make_shared<std::promise<std::error_code>>();
    auto f = barrier->get_future();
    cluster.open_bucket(bucket_name, [barrier](std::error_code ec) mutable { barrier->set_value(ec); });
    auto rc = f.get();
    if (rc) {
        LOG_CRITICAL("unable to open bucket: {}, name={}", rc.message(), bucket_name);
        throw std::system_error(rc);
    }
}

// Runs a request synchronously and returns its response.
template<class Request>
auto
execute(couchbase::core::cluster& cluster, Request request)
{
    using response_type = typename Request::response_type;
    auto barrier = std::make_shared<std::promise<response_type>>();
    auto f = barrier->get_future();
    cluster.execute(request, [barrier](response_type resp) { barrier->set_value(std::move(resp)); });
    auto resp = f.get();
    return resp;
}

int
main()
{
    couchbase::core::logger::create_console_logger();
    couchbase::core::logger::set_log_levels(couchbase::core::logger::level::trace);

    auto connection_str = couchbase::core::utils::parse_connection_string("couchbase://{{endpoint}}");
    std::string bucket_name = "beer-sample";
    couchbase::core::cluster_credentials auth{};
    auth.username = "Administrator";
    auth.password = "password";

    connection* const conn = new connection(1);
    auto conn_opts = couchbase::core::origin(auth, connection_str);
    open_cluster(*conn->cluster_, conn_opts);
    open_bucket(*conn->cluster_, bucket_name.c_str());

    std::signal(SIGINT, sigint_handler);
    std::signal(SIGTERM, sigint_handler);

    auto id = couchbase::core::document_id{ "beer-sample", "_default", "_default", "21st_amendment_brewery_cafe" };
    while (running != 0) {
        couchbase::core::operations::get_request req{ id };
        auto resp = execute(*conn->cluster_, req);
        if (resp.ctx.ec()) {
            if (resp.ctx.ec() == couchbase::errc::common::unambiguous_timeout) {
                // The active node is down: fall back to a replica read.
                couchbase::core::operations::get_any_replica_request replica_req{ id, std::chrono::milliseconds(5000) };
                auto replica_resp = execute(*conn->cluster_, replica_req);
                if (replica_resp.ctx.ec()) {
                    LOG_ERROR("Error executing get_any_replica: {}", replica_resp.ctx.ec().message());
                } else {
                    // This parse throws when the replica value is not valid JSON.
                    auto json = couchbase::core::utils::json::parse_binary(replica_resp.value);
                    LOG_INFO("Got replica: {}", couchbase::core::utils::json::generate(json));
                }
            } else {
                LOG_ERROR("Error executing get: {}", resp.ctx.ec().message());
            }
        } else {
            auto json = couchbase::core::utils::json::parse_binary(resp.value);
            LOG_INFO("Got: {}", couchbase::core::utils::json::generate(json));
        }
        std::this_thread::sleep_for(std::chrono::seconds(3));
    }

    close_cluster(*conn->cluster_);
    conn->io_.stop();
    for (auto& t : conn->io_threads_) {
        if (t.joinable()) {
            t.join();
        }
    }
    LOG_INFO_RAW("dealloc_conn completed");
    delete conn;
    return 0;
}
Fixed
Created October 21, 2022 at 3:04 AM
Updated October 22, 2022 at 9:10 AM
Resolved October 22, 2022 at 9:10 AM