The reclaimList is cleaned by destroyWCtxs, which is invoked when we close the old plasma instance after recovery is done. However, there are also associated page buffers (sCtx.pgBuffers) in the underlying sCtx to which the gCtx is bound. There is no leak in the good case:

```
fmt.Printf("swapped reclaimList sizes old :%d new : %d\n",
    len(recovered.gCtx.reclaimList), len(p.gCtx.reclaimList))
if p.gCtx.sCtx != nil {
    fmt.Printf("page buffers :%d/%d\n",
        recovered.gCtx.sCtx.getBufferMemUsed(), p.gCtx.sCtx.getBufferMemUsed())
}
```

Call path that creates the gCtx:

(s *Shard) doRecovery
(s *Shard) recoverFromFullDataScan
(s *Shard) AddInstance

```
// Create gCtx before adding instance to dbInstances, so that stats can be gathered.
// After recovery, this instance may be swapped with the recovered instance and this
// gCtx is destroyed in closeForRecovery.
if p.shouldPersist {
    p.gCtx = p.newWCtx()
}
```

globalContextFree (closeForRecovery call sites):
---------------------------

func (s *Shard) recoverFromShard
func (s *Shard) doRecovery

```
// Close plasmas if returning error
defer func() {
    if err != nil {
        for _, p := range recoveredPlasmas {
            p.closeForRecovery()
        }
        recoveredPlasmas = nil
    }
}()
```

```
cleanPlasma := func(lss *LSSCtx) {
    var corrupted []PlasmaId
    for plasmaId, p := range recoveredPlasmas {
        // Close the plasma instance if it is corrupted.
        if lss.isInstanceCorrupted(plasmaId) != nil {
            p.closeForRecovery()
            corrupted = append(corrupted, plasmaId)
        }
    }
    for _, plasmaId := range corrupted {
        delete(recoveredPlasmas, plasmaId)
    }
}
```

```
// No error recovering the instance via full data scan.
// Clean up the old corrupted instance.
p.closeForRecovery()
p = p2
```

```
for _, p := range corrupted {
    p.closeForRecovery()
    delete(recoveredPlasmas, p.plasmaId)
}
```

func (s *Shard) recoverFromFullDataScan

```
// Perform a full scan on the data log to recover the plasma instance.
if err = s.recoverFromDataReplay(dataLSSCtx, 0, getPlasma, nil); err != nil {
    p.closeForRecovery()
    return nil, err
}
```

func (s *Shard) recoveryDone()

```
for _, inst := range s.recoveredInsts {
    inst.closeForRecovery()
}
```

---

```
func (s *Plasma) addWCtxToList(ctx *wCtx) {
    node := &wCtxNode{
        ptr:  ctx,
        next: s.getWCtxList(),
    }
    atomic.StorePointer((*unsafe.Pointer)(unsafe.Pointer(&s.wCtxList)), unsafe.Pointer(node))
}
```

```
func (s *Plasma) destroyWCtxs() {
    for w := s.getWCtxList(); w != nil; w = w.next {
        s.destroyObjects([][]reclaimObject{w.ptr.reclaimList})
        w.ptr.Plasma = nil
    }
}
```

```
// FreeObjects performs a safe memory access session flush.
func (s *Plasma) FreeObjects(lists [][]reclaimObject) {
    if len(lists) > 0 {
        s.Skiplist.GetAccesBarrier().FlushSession(unsafe.Pointer(&lists))
    }
}
```

```
// trySMRObjects performs an SMR session flush by batching unlinked objects.
func (s *Plasma) trySMRObjects(ctx *wCtx, numObjects int) {
    if s.Config.activeMaxSMRPendingObjs < numObjects {
        numObjects = s.Config.activeMaxSMRPendingObjs
    }

    if len(ctx.reclaimList) > numObjects ||
        ctx.sts.ReclaimListSize > int64(s.Config.activeMaxSMRPendingMem) {
        s.FreeObjects([][]reclaimObject{ctx.reclaimList})
        ctx.sts.ReclaimListSize = 0
        ctx.sts.ReclaimListCount = 0
        ctx.reclaimList = nil
    }
}
```

```
// newBSDestroyCallback sets the callback for performing deallocation of safe-free objects.
func (s *Plasma) newBSDestroyCallback() skiplist.BarrierSessionDestructor {
    return func(ref unsafe.Pointer) {
        if s.smrChan != nil {
            s.smrChan <- ref
        } else {
            s.logInfo(fmt.Sprintf("smr channel is nil (useMemMgmt %v)", s.deallocCtx.useMemMgmt))
        }
    }
}
```
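addWCtxToList and destroyWCtxs above both go through getWCtxList, whose body is not shown here. Since writers publish new head nodes with atomic.StorePointer, the read side is presumably just the matching atomic load; a minimal sketch, assuming wCtxList is the head pointer of the singly linked list:

```
// Sketch only (assumed, not the actual source): load the wCtx list head
// with atomic.LoadPointer so list walkers observe nodes published by
// addWCtxToList's atomic.StorePointer.
func (s *Plasma) getWCtxList() *wCtxNode {
    return (*wCtxNode)(atomic.LoadPointer(
        (*unsafe.Pointer)(unsafe.Pointer(&s.wCtxList))))
}
```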
Work contexts for plasma:
-------------------------

All work contexts reside in a wCtxList protected by a lock. They also reside on a freeList backed by a concurrent queue; it is from this freeList that a wCtx is fetched and released back to.

```
s.wCtxList = nil
s.wCtxFree = nil
```

```
ctx.freePages(frees, s.gCtx.sts)
ctx.freePtrs(fptrs, s.gCtx.sts)
```

reclaimStats:

```
sts.CacheHitRatio = s.gCtx.sts.CacheHitRatio
sts.ReaderCacheHitRatio = s.gCtx.sts.ReaderCacheHitRatio
```

```
func (s *Plasma) releaseWCtx(hashkey int64, ctx *wCtx) {
    if ctx != nil {
        ctx.unbind(hashkey, true)
        s.freeWCtx(hashkey, ctx)
    }
}

func (s *Plasma) freeWCtx(hashkey int64, ctx *wCtx) {
    threshold := writerSMRBufferSize
    if s.purgeSMRPendingMem {
        threshold = 0
    }
    s.trySMRObjects(ctx, threshold)

    if cnt := atomic.AddInt64(&ctx.refCnt, -1); cnt != 0 {
        panic(fmt.Sprintf("wCtx cnt is not 0 when freed (%v)", cnt))
    }
    ctx.pgAllocCtx.Reset()
    s.wCtxFree.free(hashkey%wCtxFreeSize, ctx.typ, ctx)
}

func (s *Plasma) getFreeWCtx(hashkey int64, wt workerType) *wCtx {
    e := s.wCtxFree.get(hashkey%wCtxFreeSize, wt)
    if e != nil {
        ctx := e.(*wCtx)
        if cnt := atomic.AddInt64(&ctx.refCnt, 1); cnt != 1 {
            panic(fmt.Sprintf("wCtx cnt is not 1 when allocated (%v)", cnt))
        }
        if !ctx.pgAllocCtx.IsReset() {
            panic("wCtx is not empty when allocated")
        }
        if ctx.Plasma != s {
            panic("wCtx does not match plasma instance")
        }
        if ctx.typ != wt {
            panic(fmt.Sprintf("wCtx does not match worker type %v != %v", wt, ctx.typ))
        }
        return ctx
    }
    return nil
}
```

```
func newBuffer(doAlignment bool, mode IOMode, useMemMgmt bool) *Buffer {
    var needsAligned bool
    if doAlignment && mode == DirectIO {
        needsAligned = true
    }
    return &Buffer{
        isAligned:  needsAligned,
        useMemMgmt: useMemMgmt,
    }
}
```

```
func (ctx *sCtx) GetBuffer(id int) *Buffer {
    if ctx == nil {
        return nil
    }
    if ctx.pgBuffers[id] == nil {
        ctx.pgBuffers[id] = newBuffer(true, ctx.lssCtx.IOMode, ctx.lssCtx.UseMemoryMgmt)
    }
    return ctx.pgBuffers[id]
}
```

```
func (ctx *sCtx) freeBuffers() {
    if ctx == nil {
        return
    }
    for i := 0; i < maxCtxBuffers; i++ {
        ctx.pgBuffers[i].Reset()
    }
}
```

```
func (s *LSSCtx) trimSCtx(sampleOnly bool) {
    allFreeCtxs := s.sCtxFree.trim(sampleOnly)
    if len(allFreeCtxs) == 0 {
        return
    }

    deleted := make([]*sCtx, len(allFreeCtxs))
    added := make([]*sCtx, numWorkerType)
    for i, e := range allFreeCtxs {
        ctx := e.(*sCtx)
        deleted[i] = ctx
        if added[ctx.wtyp] == nil {
            added[ctx.wtyp] = newSCtx()
            added[ctx.wtyp].init(s, ctx.wtyp)
            atomic.AddInt64(&s.numActiveSCtx, 1)
        }
        /*imp*/ ctx.freeBuffers()
        added[ctx.wtyp].sts.AddTrimStats(ctx.sts)
    }
    s.addRemoveSCtxList(deleted, added)

    for _, ctx := range added {
        if ctx != nil {
            s.freeSCtx(rand.Int63(), ctx.wtyp, ctx)
        }
    }
}
```

a) closeForRecovery is mostly invoked on errors in the recovery code path. The issue looks like we are not releasing the memory held by the pageBuffers of the associated sCtx when closeForRecovery is called. On an error in the recovery path, or when we swap in the recovered instance, we need to disassociate the sCtx from the gCtx and release any associated memory. When we unbind the sCtx, it is added back to the freeList; the next time LSSCtx.trimSCtx is called, the sCtxNode is removed from the sCtxList and the pageBuffer memory is released via sCtx.freeBuffers. We also need to reclaim the objects on the wCtx reclaimList when we release the gCtx to the wCtxFree list; this takes care of destroyWCtxs and also purges the wCtxNode from plasma.wCtxList. There is a chance that, once the gCtx is on the wCtxFree list, a subsequent newWCtx call hands back the gCtx as the new wCtx; I do not see any issue with that. A hedged sketch of this cleanup follows.
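A minimal sketch of the cleanup proposed in (a), using only the functions shown above; releaseRecoveryCtxs is a hypothetical helper name, and the hashkey plumbing is an assumption:

```
// Hypothetical helper (not in the actual source): on a recovery error or
// instance swap, flush the gCtx reclaimList through SMR and return the
// gCtx to the wCtxFree list. releaseWCtx -> unbind returns the bound sCtx
// to its freeList, so a later LSSCtx.trimSCtx can release its pgBuffers.
func (s *Plasma) releaseRecoveryCtxs(hashkey int64) {
    if s.gCtx == nil {
        return
    }
    // Flush pending reclaim objects so the reclaimList does not leak;
    // FreeObjects pushes them through the access-barrier session flush.
    s.FreeObjects([][]reclaimObject{s.gCtx.reclaimList})
    s.gCtx.reclaimList = nil
    s.gCtx.sts.ReclaimListSize = 0
    s.gCtx.sts.ReclaimListCount = 0
    // Unbind the sCtx and hand the gCtx back to the free list; it may be
    // handed out again by a later newWCtx, which is acceptable.
    s.releaseWCtx(hashkey, s.gCtx)
    s.gCtx = nil
}
```

Note that freeWCtx already calls trySMRObjects, but unless purgeSMRPendingMem is set the writerSMRBufferSize threshold can leave a short reclaimList behind, hence the explicit FreeObjects in the sketch.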
b) closeForRecovery can also be called on a happy path, recoveryDone. Today this function is invoked only from tests. Nevertheless, since we already unbind the sCtx once doRecovery is complete, calling unbind again from closeForRecovery may be an issue (refCnt), and the same concern applies to calling sCtx.freeBuffers a second time. A guard for this double release is sketched at the end of these notes.

Test Cases:
-----------
a) Inject recovery error

```
Plasma: Plasma.Close: Done closing plasma instance
reclaimList size :0
CountMin :-9223372036854775808
CountMin :-9223372036854775808
CountMin :-9223372036854775808
trimCtx :0 numAvail :13 Count :17
sCtx size :33057 freeList :13 numSCtx :17 numFreeSCtx :13
```

```
func (s *LSSCtx) getStats(cached bool) *LSSStats {
```

Stats touched via the gCtx during recovery:

```
p.gCtx.sts.RecoveryRepairAdd++
p.gCtx.sts.RecoveryRepairDelete++
p.gCtx.addHdrLSSReadBytes(int64(len(bs)))
p.gCtx.addHdrLSSBlkReadBytes(int64(bytesRead))
p.gCtx.memUsed.Add(int64(memUsed))
```

```
func (ctx *wCtx) addFlushHdrSz(size int64) {
    if ctx.recoveryLSS != nil {
        ctx.sts.FlushHdrSz += size
        ctx.recoveryLSS.addFlushDataSz2(int64(ctx.plasmaId), size)
    }
}
```

```
func (s *Shard) recoverFromHeaderReplay
```

```
p.gCtx.memUsed.Add(int64(memUsed))
```

```
func (s *LSSCtx) LSSDataSize() int64 {
    return s.FlushDataSz() + s.maxSnDataSize + s.rpDataSize + s.checkpointSize
}
```

LSSFlush Data -> LSSDataSize

```
--- FAIL: TestCpu (12.88s) 15:49:37
    mem_test.go:79: process cpu is 0
```
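Returning to point (b): a minimal sketch of an idempotence guard for the double unbind, assuming the gCtx's sCtx pointer can be nilled out on first release (the sCtx field on wCtx is taken from the snippets above; everything else here is an assumption, not the actual fix):

```
// Hypothetical sketch: make the sCtx release idempotent so that the
// happy-path unbind in doRecovery followed by closeForRecovery cannot
// double-release the sCtx refCnt or free its pgBuffers twice.
func (ctx *wCtx) unbindSCtxOnce(hashkey int64) {
    if ctx.sCtx == nil {
        return // already unbound by doRecovery; this call is a no-op
    }
    ctx.unbind(hashkey, true) // returns the sCtx to its freeList once
    ctx.sCtx = nil            // guard against a second release
}
```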