Details
-
Bug
-
Resolution: Fixed
-
Critical
-
7.1.5, 7.2.2
-
Untriaged
-
0
-
Unknown
Description
Indexer scan pipeline doesn't terminate storage iterator if there is a scan timeout or a client cancel request.
1. Scan timeout is handled by cancel callback.
func (c *CancelCb) Run() {
|
go func() {
|
select {
|
case <-c.done:
|
case <-c.cancel:
|
c.callb(common.ErrClientCancel)
|
case <-c.timeout:
|
c.callb(common.ErrScanTimedOut)
|
}
|
}()
|
}
|
2. Cancel callback calls scan pipeline Cancel
cancelCb := NewCancelCallback(req, func(e error) {
|
scanPipeline.Cancel(e)
|
})
|
3. Scan pipeline Cancel calls Shutdown on source.
func (p *ScanPipeline) Cancel(err error) {
|
p.src.Shutdown(err)
|
}
|
4. This only sets the w.err
func (w *ItemWriter) Shutdown(err error) {
|
w.errLock.Lock()
|
defer w.errLock.Unlock()
|
w.err = err
|
}
|
5. This w.err is only utilized by HasShutdown method
func (w *ItemWriter) HasShutdown() error {
|
w.errLock.Lock()
|
defer w.errLock.Unlock()
|
|
return w.err
|
}
|
6. w.HasShutdown is utilized by CloseWrite (called in defer in scan pipeline)
func (w *ItemWriter) CloseWrite() error {
|
if w.closed {
|
return nil
|
}
|
|
err := w.HasShutdown()
|
or during WriteItem if there is ErrNoBlockSpace.
func (w *ItemWriter) WriteItem(itm ...[]byte) error {
|
if w.wr.Put(itm...) == ErrNoBlockSpace {
|
err = w.HasShutdown()
|
If above condition is not true, e.g. if scan pipeline is scanning/filtering a lot of items without returning the results. The storage iterator will continue in such a case.
If there are long running scans, this can waste a lot of CPU resources as pipeline keeps processing scans which have already timed out as well as block next set of scans from getting started.
Attachments
Issue Links
- is a backport of
-
MB-59256 Terminate storage iterator after scan timeout/client cancel
- Closed