  1. Couchbase Documentation
  2. DOC-3543

Requesting best practices for long-running async N1QL query with Java client

Details

    • DOC-S17-Jun01, DOC-S18-Jun19, DOC-S19-Jun29, DOC-S21-Jul20[Vulcan GA]

    Description

      (Raising as requested by Richard Smedley)

      Both end users and I could use guidance on how to perform a long-running N1QL query that streams (rather than buffers) the rows and handles errors correctly. I've experimented with three approaches, none of which I love:

      1. The example in our N1QL docs (https://developer.couchbase.com/documentation/server/current/sdk/java/n1ql-queries-with-sdk.html) does this:

      bucket.async()
          .query(select("*").from(i("beer-sample")).limit(10))
          .flatMap(result ->
              result.errors()
              .flatMap(e -> Observable.<AsyncN1qlQueryRow>error(new CouchbaseException("N1QL Error/Warning: " + e)))
              .switchIfEmpty(result.rows())
          )
          .map(AsyncN1qlQueryRow::value)
          .subscribe(
              rowContent -> System.out.println(rowContent),
              runtimeError -> runtimeError.printStackTrace()
          );
      

      This works, but (I think) it has to wait for the entire response to complete, buffering it, before it can know for certain that errors() is empty and switch to returning rows() instead. Since my example app fetches 2 million rows, this isn't ideal and is likely to end in an out-of-memory error.

      2. I can subscribe() to both errors() and rows() independently, e.g.

      s.errors().subscribe(new Subscriber<JsonObject>() {
          // ...
      });

      s.rows().subscribe(new Subscriber<AsyncN1qlQueryRow>() {
          // ...
      });
      

      This also works, and it streams both rows and errors nicely with no buffering. But it's not immediately clear (to a newcomer to RxJava) how to do anything useful with it. For example, the customer wants to stream the rows() back to their frontend and interrupt that stream on an error. What's the best way of doing that? I'll figure it out eventually, but right now it's unclear to me and perhaps to others.
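
      One possible shape for "forward rows, stop on the first error" is sketched below using only the JDK, so it is not tied to any SDK version. RowForwarder is a hypothetical helper, not part of the Couchbase SDK or RxJava. The assumption is that the rows() subscriber calls row(...) from onNext and the errors() subscriber calls error(...) from onNext, with both sharing one instance.

```java
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.Consumer;

/** Hypothetical helper: forwards rows one at a time and stops after the first error. */
final class RowForwarder<R, E> {
    private final Consumer<R> sink;       // e.g. writes one row to the frontend
    private final Consumer<E> onFailure;  // invoked once, for the first error only
    private final AtomicBoolean failed = new AtomicBoolean(false);

    RowForwarder(Consumer<R> sink, Consumer<E> onFailure) {
        this.sink = sink;
        this.onFailure = onFailure;
    }

    /**
     * Intended to be called from the rows subscriber's onNext (an assumption).
     * Returns false once an error has been seen, so the caller can stop,
     * for example by unsubscribing. A row already in flight on another
     * thread may still be delivered.
     */
    boolean row(R row) {
        if (failed.get()) {
            return false;
        }
        sink.accept(row);
        return true;
    }

    /** Intended to be called from the errors subscriber's onNext (an assumption). */
    void error(E error) {
        // compareAndSet ensures only the first error triggers the failure callback.
        if (failed.compareAndSet(false, true)) {
            onFailure.accept(error);
        }
    }

    boolean hasFailed() {
        return failed.get();
    }
}
```

      Nothing is buffered here: each row goes straight to the sink, and the first error flips a flag that later rows observe. Whether this is preferable to an RxJava-native composition is exactly the kind of thing the docs could settle.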

      I want to capture these moments of confusion before all this is old hat to me, in the hope that it will help improve the docs.

      3. Finally, I thought of merging the errors() and rows() streams like this:

      Observable.merge(s.errors(), s.rows())
              .subscribe(new Subscriber<Object>() {
                  @Override
                  public void onNext(Object o) {
                      if (o instanceof JsonObject) {
                          System.out.println("error: " + o);
                      } else if (o instanceof DefaultAsyncN1qlQueryRow) {
                          // …
                      }
                  }
                  // … (onCompleted() and onError() not shown)
              });

      This makes it easy to combine the two streams, but now I have to use Object and instanceof. Gross.
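
      One way to avoid Object and instanceof in a merged stream is to wrap each element in a small tagged type before merging. The sketch below is JDK-only, and QueryEvent is a hypothetical name, not an SDK class. The assumption is that each stream would be mapped into this type first (for example with RxJava's map), so the merged stream carries QueryEvent values rather than Object.

```java
import java.util.function.Function;

/** Hypothetical wrapper: one element of a merged stream, either a row or an error. */
final class QueryEvent<R, E> {
    private final R row;
    private final E error;
    private final boolean isError;

    private QueryEvent(R row, E error, boolean isError) {
        this.row = row;
        this.error = error;
        this.isError = isError;
    }

    /** Wraps a row, e.g. an element of rows(). */
    static <R, E> QueryEvent<R, E> ofRow(R row) {
        return new QueryEvent<>(row, null, false);
    }

    /** Wraps an error, e.g. an element of errors(). */
    static <R, E> QueryEvent<R, E> ofError(E error) {
        return new QueryEvent<>(null, error, true);
    }

    boolean isError() {
        return isError;
    }

    /** Handles either case without instanceof; exactly one function is applied. */
    <T> T fold(Function<? super R, ? extends T> onRow,
               Function<? super E, ? extends T> onError) {
        return isError ? onError.apply(error) : onRow.apply(row);
    }
}
```

      A subscriber to the merged stream could then call fold(...) in onNext and keep full static typing for both cases, at the cost of one small allocation per element.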

      Maybe (hopefully) I've missed a much better way of doing things. Either way, I couldn't find it documented.

          People

            richard.smedley Richard Smedley
            graham.pople Graham Pople