// Copyright (c) 2012, Suryandaru Triandana <syndtr@gmail.com>
// All rights reserved.
//
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file.

package leveldb

import (
	"sync"
	"time"

	"github.com/syndtr/goleveldb/leveldb/errors"
	"github.com/syndtr/goleveldb/leveldb/opt"
	"github.com/syndtr/goleveldb/leveldb/storage"
)

var (
	errCompactionTransactExiting = errors.New("leveldb: compaction transact exiting")
)

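// cStat holds accumulated compaction statistics for a single level:
// total duration, bytes read and bytes written.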
type cStat struct {
	duration time.Duration
	read     int64
	write    int64
}

func (p *cStat) add(n *cStatStaging) {
	p.duration += n.duration
	p.read += n.read
	p.write += n.write
}

func (p *cStat) get() (duration time.Duration, read, write int64) {
	return p.duration, p.read, p.write
}

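// cStatStaging collects statistics for an in-progress compaction; the timer
// can be started and stopped repeatedly, and the result is merged into a
// per-level cStat via cStats.addStat.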
type cStatStaging struct {
	start    time.Time
	duration time.Duration
	on       bool
	read     int64
	write    int64
}

func (p *cStatStaging) startTimer() {
	if !p.on {
		p.start = time.Now()
		p.on = true
	}
}

func (p *cStatStaging) stopTimer() {
	if p.on {
		p.duration += time.Since(p.start)
		p.on = false
	}
}

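// cStats holds per-level compaction statistics, guarded by a mutex.
// The stats slice grows on demand as higher levels are touched.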
type cStats struct {
	lk    sync.Mutex
	stats []cStat
}

func (p *cStats) addStat(level int, n *cStatStaging) {
	p.lk.Lock()
	if level >= len(p.stats) {
		newStats := make([]cStat, level+1)
		copy(newStats, p.stats)
		p.stats = newStats
	}
	p.stats[level].add(n)
	p.lk.Unlock()
}

func (p *cStats) getStat(level int) (duration time.Duration, read, write int64) {
	p.lk.Lock()
	defer p.lk.Unlock()
	if level < len(p.stats) {
		return p.stats[level].get()
	}
	return
}

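// compactionError runs in its own goroutine and manages the compaction error
// state. Errors received on compErrSetC are classified as none, transient or
// persistent (read-only or corruption) and exposed on compErrC/compPerErrC
// accordingly; for persistent errors the write lock is also acquired so that
// writes stop passing through.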
func (db *DB) compactionError() {
	var err error
noerr:
	// No error.
	for {
		select {
		case err = <-db.compErrSetC:
			switch {
			case err == nil:
			case err == ErrReadOnly, errors.IsCorrupted(err):
				goto hasperr
			default:
				goto haserr
			}
		case <-db.closeC:
			return
		}
	}
haserr:
	// Transient error.
	for {
		select {
		case db.compErrC <- err:
		case err = <-db.compErrSetC:
			switch {
			case err == nil:
				goto noerr
			case err == ErrReadOnly, errors.IsCorrupted(err):
				goto hasperr
			default:
			}
		case <-db.closeC:
			return
		}
	}
hasperr:
	// Persistent error.
	for {
		select {
		case db.compErrC <- err:
		case db.compPerErrC <- err:
		case db.writeLockC <- struct{}{}:
			// Hold the write lock, so that writes won't pass through.
			db.compWriteLocking = true
		case <-db.closeC:
			if db.compWriteLocking {
				// We should release the lock or Close will hang.
				<-db.writeLockC
			}
			return
		}
	}
}

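// compactionTransactCounter counts iterations performed by a compaction
// transaction; it is used to detect progress and reset the retry backoff.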
type compactionTransactCounter int

func (cnt *compactionTransactCounter) incr() {
	*cnt++
}

type compactionTransactInterface interface {
	run(cnt *compactionTransactCounter) error
	revert() error
}

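// compactionTransact runs the given transaction, retrying with exponential
// backoff on transient errors. It exits the transact (by panicking with
// errCompactionTransactExiting, which triggers t.revert in the deferred
// handler) when the DB is closed, a persistent error has been set, or the
// error is a corruption.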
func (db *DB) compactionTransact(name string, t compactionTransactInterface) {
	defer func() {
		if x := recover(); x != nil {
			if x == errCompactionTransactExiting {
				if err := t.revert(); err != nil {
					db.logf("%s revert error %q", name, err)
				}
			}
			panic(x)
		}
	}()

	const (
		backoffMin = 1 * time.Second
		backoffMax = 8 * time.Second
		backoffMul = 2 * time.Second
	)
	var (
		backoff  = backoffMin
		backoffT = time.NewTimer(backoff)
		lastCnt  = compactionTransactCounter(0)

		disableBackoff = db.s.o.GetDisableCompactionBackoff()
	)
	for n := 0; ; n++ {
		// Check whether the DB is closed.
		if db.isClosed() {
			db.logf("%s exiting", name)
			db.compactionExitTransact()
		} else if n > 0 {
			db.logf("%s retrying N·%d", name, n)
		}

		// Execute.
		cnt := compactionTransactCounter(0)
		err := t.run(&cnt)
		if err != nil {
			db.logf("%s error I·%d %q", name, cnt, err)
		}

		// Set compaction error status.
		select {
		case db.compErrSetC <- err:
		case perr := <-db.compPerErrC:
			if err != nil {
				db.logf("%s exiting (persistent error %q)", name, perr)
				db.compactionExitTransact()
			}
		case <-db.closeC:
			db.logf("%s exiting", name)
			db.compactionExitTransact()
		}
		if err == nil {
			return
		}
		if errors.IsCorrupted(err) {
			db.logf("%s exiting (corruption detected)", name)
			db.compactionExitTransact()
		}

		if !disableBackoff {
			// Reset backoff duration if counter is advancing.
			if cnt > lastCnt {
				backoff = backoffMin
				lastCnt = cnt
			}

			// Backoff.
			backoffT.Reset(backoff)
			if backoff < backoffMax {
				backoff *= backoffMul
				if backoff > backoffMax {
					backoff = backoffMax
				}
			}
			select {
			case <-backoffT.C:
			case <-db.closeC:
				db.logf("%s exiting", name)
				db.compactionExitTransact()
			}
		}
	}
}

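// compactionTransactFunc adapts plain run/revert functions to the
// compactionTransactInterface.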
type compactionTransactFunc struct {
	runFunc    func(cnt *compactionTransactCounter) error
	revertFunc func() error
}

func (t *compactionTransactFunc) run(cnt *compactionTransactCounter) error {
	return t.runFunc(cnt)
}

func (t *compactionTransactFunc) revert() error {
	if t.revertFunc != nil {
		return t.revertFunc()
	}
	return nil
}

func (db *DB) compactionTransactFunc(name string, run func(cnt *compactionTransactCounter) error, revert func() error) {
	db.compactionTransact(name, &compactionTransactFunc{run, revert})
}

func (db *DB) compactionExitTransact() {
	panic(errCompactionTransactExiting)
}

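// compactionCommit commits the given session record under the commit lock,
// retrying through compactionTransact until it succeeds.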
func (db *DB) compactionCommit(name string, rec *sessionRecord) {
	db.compCommitLk.Lock()
	defer db.compCommitLk.Unlock() // Defer is necessary.
	db.compactionTransactFunc(name+"@commit", func(cnt *compactionTransactCounter) error {
		return db.s.commit(rec)
	}, nil)
}

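// memCompaction flushes the frozen memdb into tables on disk, commits the
// resulting session record and then triggers a table compaction. Table
// compaction is paused for the duration of the flush.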
func (db *DB) memCompaction() {
	mdb := db.getFrozenMem()
	if mdb == nil {
		return
	}
	defer mdb.decref()

	db.logf("memdb@flush N·%d S·%s", mdb.Len(), shortenb(mdb.Size()))

	// Don't compact empty memdb.
	if mdb.Len() == 0 {
		db.logf("memdb@flush skipping")
		// drop frozen memdb
		db.dropFrozenMem()
		return
	}

	// Pause table compaction.
	resumeC := make(chan struct{})
	select {
	case db.tcompPauseC <- (chan<- struct{})(resumeC):
	case <-db.compPerErrC:
		close(resumeC)
		resumeC = nil
	case <-db.closeC:
		db.compactionExitTransact()
	}

	var (
		rec        = &sessionRecord{}
		stats      = &cStatStaging{}
		flushLevel int
	)

	// Generate tables.
	db.compactionTransactFunc("memdb@flush", func(cnt *compactionTransactCounter) (err error) {
		stats.startTimer()
		flushLevel, err = db.s.flushMemdb(rec, mdb.DB, db.memdbMaxLevel)
		stats.stopTimer()
		return
	}, func() error {
		for _, r := range rec.addedTables {
			db.logf("memdb@flush revert @%d", r.num)
			if err := db.s.stor.Remove(storage.FileDesc{Type: storage.TypeTable, Num: r.num}); err != nil {
				return err
			}
		}
		return nil
	})

	rec.setJournalNum(db.journalFd.Num)
	rec.setSeqNum(db.frozenSeq)

	// Commit.
	stats.startTimer()
	db.compactionCommit("memdb", rec)
	stats.stopTimer()

	db.logf("memdb@flush committed F·%d T·%v", len(rec.addedTables), stats.duration)

	for _, r := range rec.addedTables {
		stats.write += r.size
	}
	db.compStats.addStat(flushLevel, stats)

	// Drop frozen memdb.
	db.dropFrozenMem()

	// Resume table compaction.
	if resumeC != nil {
		select {
		case <-resumeC:
			close(resumeC)
		case <-db.closeC:
			db.compactionExitTransact()
		}
	}

	// Trigger table compaction.
	db.compTrigger(db.tcompCmdC)
}

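// tableCompactionBuilder implements compactionTransactInterface for table
// compaction. The snap* fields snapshot the iteration state after each
// flushed table so that a retried run can resume where it left off.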
type tableCompactionBuilder struct {
	db           *DB
	s            *session
	c            *compaction
	rec          *sessionRecord
	stat0, stat1 *cStatStaging

	snapHasLastUkey bool
	snapLastUkey    []byte
	snapLastSeq     uint64
	snapIter        int
	snapKerrCnt     int
	snapDropCnt     int

	kerrCnt int
	dropCnt int

	minSeq    uint64
	strict    bool
	tableSize int

	tw *tWriter
}

func (b *tableCompactionBuilder) appendKV(key, value []byte) error {
	// Create a new table if one hasn't been created yet.
	if b.tw == nil {
		// Check for pause event.
		if b.db != nil {
			select {
			case ch := <-b.db.tcompPauseC:
				b.db.pauseCompaction(ch)
			case <-b.db.closeC:
				b.db.compactionExitTransact()
			default:
			}
		}

		// Create new table.
		var err error
		b.tw, err = b.s.tops.create()
		if err != nil {
			return err
		}
	}

	// Write key/value into table.
	return b.tw.append(key, value)
}

func (b *tableCompactionBuilder) needFlush() bool {
	return b.tw.tw.BytesLen() >= b.tableSize
}

func (b *tableCompactionBuilder) flush() error {
	t, err := b.tw.finish()
	if err != nil {
		return err
	}
	b.rec.addTableFile(b.c.sourceLevel+1, t)
	b.stat1.write += t.size
	b.s.logf("table@build created L%d@%d N·%d S·%s %q:%q", b.c.sourceLevel+1, t.fd.Num, b.tw.tw.EntriesLen(), shortenb(int(t.size)), t.imin, t.imax)
	b.tw = nil
	return nil
}

func (b *tableCompactionBuilder) cleanup() {
	if b.tw != nil {
		b.tw.drop()
		b.tw = nil
	}
}

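// run iterates over the compaction's merged inputs, dropping shadowed entries
// and obsolete deletion markers, and writes the surviving keys into new
// tables at sourceLevel+1, rotating tables once they reach the configured
// size or a stop point is hit.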
func (b *tableCompactionBuilder) run(cnt *compactionTransactCounter) error {
	snapResumed := b.snapIter > 0
	hasLastUkey := b.snapHasLastUkey // The key might have zero length, so this is necessary.
	lastUkey := append([]byte{}, b.snapLastUkey...)
	lastSeq := b.snapLastSeq
	b.kerrCnt = b.snapKerrCnt
	b.dropCnt = b.snapDropCnt
	// Restore compaction state.
	b.c.restore()

	defer b.cleanup()

	b.stat1.startTimer()
	defer b.stat1.stopTimer()

	iter := b.c.newIterator()
	defer iter.Release()
	for i := 0; iter.Next(); i++ {
		// Incr transact counter.
		cnt.incr()

		// Skip until last state.
		if i < b.snapIter {
			continue
		}

		resumed := false
		if snapResumed {
			resumed = true
			snapResumed = false
		}

		ikey := iter.Key()
		ukey, seq, kt, kerr := parseInternalKey(ikey)

		if kerr == nil {
			shouldStop := !resumed && b.c.shouldStopBefore(ikey)

			if !hasLastUkey || b.s.icmp.uCompare(lastUkey, ukey) != 0 {
				// First occurrence of this user key.

				// Only rotate tables if ukey doesn't hop across.
				if b.tw != nil && (shouldStop || b.needFlush()) {
					if err := b.flush(); err != nil {
						return err
					}

					// Create a snapshot of the state.
					b.c.save()
					b.snapHasLastUkey = hasLastUkey
					b.snapLastUkey = append(b.snapLastUkey[:0], lastUkey...)
					b.snapLastSeq = lastSeq
					b.snapIter = i
					b.snapKerrCnt = b.kerrCnt
					b.snapDropCnt = b.dropCnt
				}

				hasLastUkey = true
				lastUkey = append(lastUkey[:0], ukey...)
				lastSeq = keyMaxSeq
			}

			switch {
			case lastSeq <= b.minSeq:
				// Dropped because a newer entry for the same user key exists.
				fallthrough // (A)
			case kt == keyTypeDel && seq <= b.minSeq && b.c.baseLevelForKey(lastUkey):
				// For this user key:
				// (1) there is no data in higher levels
				// (2) data in lower levels will have larger seq numbers
				// (3) data in layers that are being compacted here and have
				//     smaller seq numbers will be dropped in the next
				//     few iterations of this loop (by rule (A) above).
				// Therefore this deletion marker is obsolete and can be dropped.
				lastSeq = seq
				b.dropCnt++
				continue
			default:
				lastSeq = seq
			}
		} else {
			if b.strict {
				return kerr
			}

			// Don't drop corrupted keys.
			hasLastUkey = false
			lastUkey = lastUkey[:0]
			lastSeq = keyMaxSeq
			b.kerrCnt++
		}

		if err := b.appendKV(ikey, iter.Value()); err != nil {
			return err
		}
	}

	if err := iter.Error(); err != nil {
		return err
	}

	// Finish last table.
	if b.tw != nil && !b.tw.empty() {
		return b.flush()
	}
	return nil
}

func (b *tableCompactionBuilder) revert() error {
	for _, at := range b.rec.addedTables {
		b.s.logf("table@build revert @%d", at.num)
		if err := b.s.stor.Remove(storage.FileDesc{Type: storage.TypeTable, Num: at.num}); err != nil {
			return err
		}
	}
	return nil
}

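// tableCompaction performs the given table compaction. Unless noTrivial is
// set, a trivial compaction is handled by simply moving the single input
// table to the next level; otherwise the inputs are rewritten through
// tableCompactionBuilder and the result is committed.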
func (db *DB) tableCompaction(c *compaction, noTrivial bool) {
	defer c.release()

	rec := &sessionRecord{}
	rec.addCompPtr(c.sourceLevel, c.imax)

	if !noTrivial && c.trivial() {
		t := c.levels[0][0]
		db.logf("table@move L%d@%d -> L%d", c.sourceLevel, t.fd.Num, c.sourceLevel+1)
		rec.delTable(c.sourceLevel, t.fd.Num)
		rec.addTableFile(c.sourceLevel+1, t)
		db.compactionCommit("table-move", rec)
		return
	}

	var stats [2]cStatStaging
	for i, tables := range c.levels {
		for _, t := range tables {
			stats[i].read += t.size
			// Insert deleted tables into record
			rec.delTable(c.sourceLevel+i, t.fd.Num)
		}
	}
	sourceSize := int(stats[0].read + stats[1].read)
	minSeq := db.minSeq()
	db.logf("table@compaction L%d·%d -> L%d·%d S·%s Q·%d", c.sourceLevel, len(c.levels[0]), c.sourceLevel+1, len(c.levels[1]), shortenb(sourceSize), minSeq)

	b := &tableCompactionBuilder{
		db:        db,
		s:         db.s,
		c:         c,
		rec:       rec,
		stat1:     &stats[1],
		minSeq:    minSeq,
		strict:    db.s.o.GetStrict(opt.StrictCompaction),
		tableSize: db.s.o.GetCompactionTableSize(c.sourceLevel + 1),
	}
	db.compactionTransact("table@build", b)

	// Commit.
	stats[1].startTimer()
	db.compactionCommit("table", rec)
	stats[1].stopTimer()

	resultSize := int(stats[1].write)
	db.logf("table@compaction committed F%s S%s Ke·%d D·%d T·%v", sint(len(rec.addedTables)-len(rec.deletedTables)), sshortenb(resultSize-sourceSize), b.kerrCnt, b.dropCnt, stats[1].duration)

	// Save compaction stats
	for i := range stats {
		db.compStats.addStat(c.sourceLevel+1, &stats[i])
	}
}

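// tableRangeCompaction compacts tables overlapping the given user-key range.
// If level is negative, it repeatedly compacts all levels below the highest
// level that still overlaps the range, until nothing is left to compact.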
func (db *DB) tableRangeCompaction(level int, umin, umax []byte) error {
	db.logf("table@compaction range L%d %q:%q", level, umin, umax)
	if level >= 0 {
		if c := db.s.getCompactionRange(level, umin, umax, true); c != nil {
			db.tableCompaction(c, true)
		}
	} else {
		// Retry until nothing to compact.
		for {
			compacted := false

			// Scan for maximum level with overlapped tables.
			v := db.s.version()
			m := 1
			for i := m; i < len(v.levels); i++ {
				tables := v.levels[i]
				if tables.overlaps(db.s.icmp, umin, umax, false) {
					m = i
				}
			}
			v.release()

			for level := 0; level < m; level++ {
				if c := db.s.getCompactionRange(level, umin, umax, false); c != nil {
					db.tableCompaction(c, true)
					compacted = true
				}
			}

			if !compacted {
				break
			}
		}
	}

	return nil
}

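// tableAutoCompaction picks a compaction, if one is needed, and performs it.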
func (db *DB) tableAutoCompaction() {
	if c := db.s.pickCompaction(); c != nil {
		db.tableCompaction(c, false)
	}
}

func (db *DB) tableNeedCompaction() bool {
	v := db.s.version()
	defer v.release()
	return v.needCompaction()
}

// resumeWrite reports whether paused write operations can be resumed, i.e.
// the number of level-0 tables has dropped below the write pause trigger.
func (db *DB) resumeWrite() bool {
	v := db.s.version()
	defer v.release()
	if v.tLen(0) < db.s.o.GetWriteL0PauseTrigger() {
		return true
	}
	return false
}

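// pauseCompaction pauses the calling compaction by blocking on a send to ch
// until the pauser receives from it (resuming the compaction), or exits the
// transact if the DB is closed.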
func (db *DB) pauseCompaction(ch chan<- struct{}) {
	select {
	case ch <- struct{}{}:
	case <-db.closeC:
		db.compactionExitTransact()
	}
}

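// cCmd is a command sent to a compaction goroutine; ack reports the result
// back to the sender.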
type cCmd interface {
	ack(err error)
}

type cAuto struct {
	// Note: for table compaction, a nil ackC means this is a compaction-wait command.
	ackC chan<- error
}

func (r cAuto) ack(err error) {
	if r.ackC != nil {
		defer func() {
			recover()
		}()
		r.ackC <- err
	}
}

type cRange struct {
	level    int
	min, max []byte
	ackC     chan<- error
}

func (r cRange) ack(err error) {
	if r.ackC != nil {
		defer func() {
			recover()
		}()
		r.ackC <- err
	}
}

// This will trigger auto compaction but will not wait for it.
func (db *DB) compTrigger(compC chan<- cCmd) {
	select {
	case compC <- cAuto{}:
	default:
	}
}

// This will trigger auto compaction and/or wait for all compactions to be done.
func (db *DB) compTriggerWait(compC chan<- cCmd) (err error) {
	ch := make(chan error)
	defer close(ch)
	// Send cmd.
	select {
	case compC <- cAuto{ch}:
	case err = <-db.compErrC:
		return
	case <-db.closeC:
		return ErrClosed
	}
	// Wait cmd.
	select {
	case err = <-ch:
	case err = <-db.compErrC:
	case <-db.closeC:
		return ErrClosed
	}
	return err
}

// Send range compaction request.
func (db *DB) compTriggerRange(compC chan<- cCmd, level int, min, max []byte) (err error) {
	ch := make(chan error)
	defer close(ch)
	// Send cmd.
	select {
	case compC <- cRange{level, min, max, ch}:
	case err := <-db.compErrC:
		return err
	case <-db.closeC:
		return ErrClosed
	}
	// Wait cmd.
	select {
	case err = <-ch:
	case err = <-db.compErrC:
	case <-db.closeC:
		return ErrClosed
	}
	return err
}

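// mCompaction is the memdb compaction goroutine; it serves commands from
// mcompCmdC until the DB is closed.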
func (db *DB) mCompaction() {
	var x cCmd

	defer func() {
		if x := recover(); x != nil {
			if x != errCompactionTransactExiting {
				panic(x)
			}
		}
		if x != nil {
			x.ack(ErrClosed)
		}
		db.closeW.Done()
	}()

	for {
		select {
		case x = <-db.mcompCmdC:
			switch x.(type) {
			case cAuto:
				db.memCompaction()
				x.ack(nil)
				x = nil
			default:
				panic("leveldb: unknown command")
			}
		case <-db.closeC:
			return
		}
	}
}

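// tCompaction is the table compaction goroutine. It serves commands from
// tcompCmdC, honours pause requests on tcompPauseC and keeps running auto
// compactions while they are needed; waiting commands are acknowledged as
// soon as enough level-0 tables have been compacted for writes to resume.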
func (db *DB) tCompaction() {
	var (
		x           cCmd
		ackQ, waitQ []cCmd
	)

	defer func() {
		if x := recover(); x != nil {
			if x != errCompactionTransactExiting {
				panic(x)
			}
		}
		for i := range ackQ {
			ackQ[i].ack(ErrClosed)
			ackQ[i] = nil
		}
		for i := range waitQ {
			waitQ[i].ack(ErrClosed)
			waitQ[i] = nil
		}
		if x != nil {
			x.ack(ErrClosed)
		}
		db.closeW.Done()
	}()

	for {
		if db.tableNeedCompaction() {
			select {
			case x = <-db.tcompCmdC:
			case ch := <-db.tcompPauseC:
				db.pauseCompaction(ch)
				continue
			case <-db.closeC:
				return
			default:
			}
			// Resume write operation as soon as possible.
			if len(waitQ) > 0 && db.resumeWrite() {
				for i := range waitQ {
					waitQ[i].ack(nil)
					waitQ[i] = nil
				}
				waitQ = waitQ[:0]
			}
		} else {
			for i := range ackQ {
				ackQ[i].ack(nil)
				ackQ[i] = nil
			}
			ackQ = ackQ[:0]
			for i := range waitQ {
				waitQ[i].ack(nil)
				waitQ[i] = nil
			}
			waitQ = waitQ[:0]
			select {
			case x = <-db.tcompCmdC:
			case ch := <-db.tcompPauseC:
				db.pauseCompaction(ch)
				continue
			case <-db.closeC:
				return
			}
		}
		if x != nil {
			switch cmd := x.(type) {
			case cAuto:
				if cmd.ackC != nil {
					waitQ = append(waitQ, x)
				} else {
					ackQ = append(ackQ, x)
				}
			case cRange:
				x.ack(db.tableRangeCompaction(cmd.level, cmd.min, cmd.max))
			default:
				panic("leveldb: unknown command")
			}
			x = nil
		}
		db.tableAutoCompaction()
	}
}