Uploaded image for project: 'Couchbase Documentation'
  1. Couchbase Documentation
  2. DOC-5833

Show how to deliver Replicator notifications on a custom executor

    XMLWordPrintable

Details

    • Task
    • Resolution: Fixed
    • Critical
    • Mobile 2.6
    • Mobile 2.6
    • couchbase-lite
    • None
    • DOC-2020-S1-Jan13
    • 1

    Description

      The following code snippets show how to use a custom executor for delivery of replicator notifications:

      /**
       * This version guarantees in order delivery and is parsimonious with space
       * The listener does not need to be thread safe (at least as far as this code is concerned).
       * It will run on only thread (the Executor's thread) and must return from a given call
       * before the next call commences.  Events may be delivered arbitrarily late, though,
       * depending on how long it takes the listener to run.
       */
      public class InOrderExample {
          private static final ExecutorService IN_ORDER_EXEC = Executors.newSingleThreadExecutor();
       
          public Replicator runReplicator(Database db1, Database db2, ReplicatorChangeListener listener)
              throws CouchbaseLiteException {
              ReplicatorConfiguration config = new ReplicatorConfiguration(db1, new DatabaseEndpoint(db2));
              config.setReplicatorType(ReplicatorConfiguration.ReplicatorType.PUSH_AND_PULL);
              config.setContinuous(false);
       
              Replicator repl = new Replicator(config);
              ListenerToken token = repl.addChangeListener(IN_ORDER_EXEC, listener::changed);
       
              repl.start();
       
              return repl;
          }
      }
      

      /**
       * This version maximizes throughput.  It will deliver change notifications as quickly
       * as CPU availability allows. It may deliver change notifications out of order.
       * Listeners must be thread safe because they may be called from multiple threads.
       * In fact, they must be re-entrant because a given listener may be running on mutiple threads
       * simultaneously.  In addition, when notifications swamp the processors, notifications awaiting
       * a processor will be queued as Threads, (instead of as Runnables) with accompanying memory
       * and GC impact.
       */
      public class MaxThroughputExample {
          private static final ExecutorService MAX_THROUGHPUT_EXEC = Executors.newCachedThreadPool();
       
          public Replicator runReplicator(Database db1, Database db2, ReplicatorChangeListener listener)
              throws CouchbaseLiteException {
              ReplicatorConfiguration config = new ReplicatorConfiguration(db1, new DatabaseEndpoint(db2));
              config.setReplicatorType(ReplicatorConfiguration.ReplicatorType.PUSH_AND_PULL);
              config.setContinuous(false);
       
              Replicator repl = new Replicator(config);
              ListenerToken token = repl.addChangeListener(MAX_THROUGHPUT_EXEC, listener::changed);
       
              repl.start();
       
              return repl;
          }
      }
      

      /**
       * This version demonstrates the extreme configurability of the CouchBase Lite replicator callback system.
       * It may deliver updates out of order and does require thread-safe and re-entrant listeners
       * (though it does correctly synchronizes tasks passed to it using a SynchronousQueue).
       * The thread pool executor shown here is configured for the sweet spot for number of threads per CPU.
       * In a real system, this single executor might be used by the entire application and be passed to
       * this module, thus establishing a reasonable app-wide threading policy.
       * In an emergency (Rejected Execution) it lazily creates a backup executor with an unbounded queue
       * in front of it.  It, thus, may deliver notifications late, as well as out of order.
       */
      public class PolicyExample {
          private static final int CPUS = Runtime.getRuntime().availableProcessors();
       
          private static ThreadPoolExecutor BACKUP_EXEC;
       
          private static final RejectedExecutionHandler BACKUP_EXECUTION
              = new RejectedExecutionHandler() {
              public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
                  synchronized (this) {
                      if (BACKUP_EXEC == null) { BACKUP_EXEC = createBackupExecutor(); }
                  }
                  BACKUP_EXEC.execute(r);
              }
          };
       
          private static ThreadPoolExecutor createBackupExecutor() {
              ThreadPoolExecutor exec
                  = new ThreadPoolExecutor(CPUS + 1, 2 * CPUS + 1, 30, TimeUnit.SECONDS, new LinkedBlockingQueue<Runnable>());
              exec.allowCoreThreadTimeOut(true);
              return exec;
          }
       
          private static final ThreadPoolExecutor STANDARD_EXEC
              = new ThreadPoolExecutor(CPUS + 1, 2 * CPUS + 1, 30, TimeUnit.SECONDS, new SynchronousQueue<Runnable>());
       
          static { STANDARD_EXEC.setRejectedExecutionHandler(BACKUP_EXECUTION); }
       
          public Replicator runReplicator(Database db1, Database db2, ReplicatorChangeListener listener)
              throws CouchbaseLiteException {
              ReplicatorConfiguration config = new ReplicatorConfiguration(db1, new DatabaseEndpoint(db2));
              config.setReplicatorType(ReplicatorConfiguration.ReplicatorType.PUSH_AND_PULL);
              config.setContinuous(false);
       
              Replicator repl = new Replicator(config);
              ListenerToken token = repl.addChangeListener(STANDARD_EXEC, listener::changed);
       
              repl.start();
       
              return repl;
          }
      }
      

      Here is a list, I believe exhaustive, of calls on which client code might want to specify an Executor:

      Query.addChangeListener
      MessageEndpointListerner.addChangeListener
      LiveQuery.addChangeListener
      AbstractReplicator.addDocumentReplicationListener
      AbstractReplicator.addChangeListener
      Database.addChangeListener
      Database.addDocumentChangeListener
      Database.addDatabaseChangeListener
      Database.addChangeListener
      

      Explanation: I’ve changed our executor strategy. tl;dr: clients used to use our executors to run their business logic.

      In 2.5, we spun up multiple single-threaded executors (an executor manages a pool of threads and, perhaps, a queue in front of the) to handle the asynchronous callbacks initiated by the calls above. In addition to causing deadlocks in our own code (ok, ok, not strictly “dead”, but blocking for things that shouldn’t have caused blocks), they had the double downside of both delivering events out of order (because there were multiple queues) AND spinning up way to many threads (because there were multiple executors).

      As of 2.6.0 CBL policy is that the spins up a number of thread proportional to the number of CPUs. 2 x CPUs + n, where n is small, is a well know sweet spot for applications. Remember, though, that we are only a library in somebody else’s multi-threaded application. I spin up 2 x CPUs – n and, unless in a specifically in-order context, do not put any queue in front of the executor.

      The result is that some applications may see RejectedExecutionExceptions aborting their change listeners. This happened both to NCL and to our own beloved TestServer. It happens when there are more than 2 x CPUs – n events being processed at the same time.

      The solution to this problem is for the client code to specify an executor in the overloaded versions of the above calls. The client executor can enforce an application policy for ordering/# of threads. I sent code that describes how to create the appropriate executor, depending on policy, along to documentation. I can copy it to you if you want it.

      Attachments

        No reviews matched the request. Check your Options in the drop-down menu of this sections header.

        Activity

          People

            jamiltz James Nocentini
            blake.meike Blake Meike
            Votes:
            0 Vote for this issue
            Watchers:
            3 Start watching this issue

            Dates

              Created:
              Updated:
              Resolved:

              Gerrit Reviews

                There are no open Gerrit changes

                PagerDuty