Details
- Type: Bug
- Resolution: Fixed
- Priority: Major
- Fix version: master
- Triage: Untriaged
- Is this a Regression?: No
Description
Apologies if this is assigned to the wrong person; I couldn't find a clear owner for cbdatasource in Jira.
Repro:
- Start a 2+ node cluster (any version, I used v5.5.2 EE)
- Connect using cbdatasource.NewBucketDataSource()
- Hard-failover one node and rebalance to remove it from the cluster
- See via options.Logf logging that the vbucket/cluster map has updated properly (remaining node(s) now responsible for 1024 vbuckets)
- Also see that the ExponentialBackoffLoop is still indefinitely trying to connect to the removed node
Detail:
Under this scenario, cbdatasource falls into the following error block, but it lacks the logic to empty the worker channel that was applied in MB-25912. There's also a tweak required in emptyWorkerCh to properly terminate the retry loop when workerCh is closed.
Proposed fix:
I've fixed the issue locally using the following patch, but I'm unsure of potential side-effects. Please let me know what you think.
```diff
diff --git a/cbdatasource/cbdatasource.go b/cbdatasource/cbdatasource.go
index b5620a3..fac8f58 100644
--- a/cbdatasource/cbdatasource.go
+++ b/cbdatasource/cbdatasource.go
@@ -846,18 +846,19 @@ func (d *bucketDataSource) worker(server string, workerCh chan []uint16) int {
 		connect = memcached.Connect
 	}
 
-	emptyWorkerCh := func() {
+	emptyWorkerCh := func() int {
 		for {
 			select {
 			case _, ok := <-workerCh:
 				if !ok {
-					return
+					return -1 // workerCh was closed
 				}
 				// Else, keep looping to consume workerCh.
 			default:
-				return // Stop loop when workerCh is empty.
+				d.refreshWorkersCh <- "workerCh-emptied"
+				return 0 // Stop loop when workerCh is empty.
 			}
 		}
 	}
@@ -872,11 +873,11 @@ func (d *bucketDataSource) worker(server string, workerCh chan []uint16) int {
 		// or failed-over, so consume the workerCh so that the
 		// refresh-cluster goroutine will be unblocked and can receive
 		// our kick.
-		emptyWorkerCh()
+		ret := emptyWorkerCh()
 
 		d.Kick("worker-connect-err")
 
-		return 0
+		return ret
 	}
 	atomic.AddUint64(&d.stats.TotWorkerConnectOk, 1)
@@ -902,11 +903,11 @@ func (d *bucketDataSource) worker(server string, workerCh chan []uint16) int {
 			// rebalanced out, so consume the workerCh so that the
 			// refresh-cluster goroutine will be unblocked and can
 			// receive our kick.
-			emptyWorkerCh()
+			ret := emptyWorkerCh()
 
 			d.Kick("worker-auth-AuthenticateMemcachedConn")
 
-			return 0
+			return ret
 		}
 		atomic.AddUint64(&d.stats.TotWorkerAuthenticateMemcachedConnOk, 1)
 	} else if auth, ok := d.auth.(couchbase.AuthWithSaslHandler); ok {
@@ -922,7 +923,16 @@ func (d *bucketDataSource) worker(server string, workerCh chan []uint16) int {
 			atomic.AddUint64(&d.stats.TotWorkerAuthErr, 1)
 			d.receiver.OnError(fmt.Errorf("worker auth, server: %s,"+
 				" user: %s, err: %v", server, user, err))
-			return 0
+
+			// If we can't authenticate, then maybe a node was
+			// rebalanced out, so consume the workerCh so that the
+			// refresh-cluster goroutine will be unblocked and can
+			// receive our kick.
+			ret := emptyWorkerCh()
+
+			d.Kick("worker-auth-client")
+
+			return ret
 		}
 
 		if res.Status != gomemcached.SUCCESS {
```
Attachments

For Gerrit Dashboard: MB-32044

| # | Subject | Branch | Project | Status | CR | V |
|---|---------|--------|---------|--------|----|---|
| 101921,3 | MB-32044 Stop cbdatasource worker retry loop for removed nodes | master | go-couchbase | MERGED | +2 | +1 |