240 lines
		
	
	
		
			4.7 KiB
		
	
	
	
		
			Go
		
	
	
	
	
	
			
		
		
	
	
			240 lines
		
	
	
		
			4.7 KiB
		
	
	
	
		
			Go
		
	
	
	
	
	
// Copyright (c) 2013, Suryandaru Triandana <syndtr@gmail.com>
 | 
						|
// All rights reserved.
 | 
						|
//
 | 
						|
// Use of this source code is governed by a BSD-style license that can be
 | 
						|
// found in the LICENSE file.
 | 
						|
 | 
						|
package leveldb
 | 
						|
 | 
						|
import (
 | 
						|
	"errors"
 | 
						|
	"sync/atomic"
 | 
						|
	"time"
 | 
						|
 | 
						|
	"github.com/syndtr/goleveldb/leveldb/journal"
 | 
						|
	"github.com/syndtr/goleveldb/leveldb/memdb"
 | 
						|
	"github.com/syndtr/goleveldb/leveldb/storage"
 | 
						|
)
 | 
						|
 | 
						|
var (
 | 
						|
	errHasFrozenMem = errors.New("has frozen mem")
 | 
						|
)
 | 
						|
 | 
						|
type memDB struct {
 | 
						|
	db *DB
 | 
						|
	*memdb.DB
 | 
						|
	ref int32
 | 
						|
}
 | 
						|
 | 
						|
func (m *memDB) getref() int32 {
 | 
						|
	return atomic.LoadInt32(&m.ref)
 | 
						|
}
 | 
						|
 | 
						|
func (m *memDB) incref() {
 | 
						|
	atomic.AddInt32(&m.ref, 1)
 | 
						|
}
 | 
						|
 | 
						|
func (m *memDB) decref() {
 | 
						|
	if ref := atomic.AddInt32(&m.ref, -1); ref == 0 {
 | 
						|
		// Only put back memdb with std capacity.
 | 
						|
		if m.Capacity() == m.db.s.o.GetWriteBuffer() {
 | 
						|
			m.Reset()
 | 
						|
			m.db.mpoolPut(m.DB)
 | 
						|
		}
 | 
						|
		m.db = nil
 | 
						|
		m.DB = nil
 | 
						|
	} else if ref < 0 {
 | 
						|
		panic("negative memdb ref")
 | 
						|
	}
 | 
						|
}
 | 
						|
 | 
						|
// Get latest sequence number.
 | 
						|
func (db *DB) getSeq() uint64 {
 | 
						|
	return atomic.LoadUint64(&db.seq)
 | 
						|
}
 | 
						|
 | 
						|
// Atomically adds delta to seq.
 | 
						|
func (db *DB) addSeq(delta uint64) {
 | 
						|
	atomic.AddUint64(&db.seq, delta)
 | 
						|
}
 | 
						|
 | 
						|
func (db *DB) setSeq(seq uint64) {
 | 
						|
	atomic.StoreUint64(&db.seq, seq)
 | 
						|
}
 | 
						|
 | 
						|
func (db *DB) sampleSeek(ikey internalKey) {
 | 
						|
	v := db.s.version()
 | 
						|
	if v.sampleSeek(ikey) {
 | 
						|
		// Trigger table compaction.
 | 
						|
		db.compTrigger(db.tcompCmdC)
 | 
						|
	}
 | 
						|
	v.release()
 | 
						|
}
 | 
						|
 | 
						|
func (db *DB) mpoolPut(mem *memdb.DB) {
 | 
						|
	if !db.isClosed() {
 | 
						|
		select {
 | 
						|
		case db.memPool <- mem:
 | 
						|
		default:
 | 
						|
		}
 | 
						|
	}
 | 
						|
}
 | 
						|
 | 
						|
func (db *DB) mpoolGet(n int) *memDB {
 | 
						|
	var mdb *memdb.DB
 | 
						|
	select {
 | 
						|
	case mdb = <-db.memPool:
 | 
						|
	default:
 | 
						|
	}
 | 
						|
	if mdb == nil || mdb.Capacity() < n {
 | 
						|
		mdb = memdb.New(db.s.icmp, maxInt(db.s.o.GetWriteBuffer(), n))
 | 
						|
	}
 | 
						|
	return &memDB{
 | 
						|
		db: db,
 | 
						|
		DB: mdb,
 | 
						|
	}
 | 
						|
}
 | 
						|
 | 
						|
func (db *DB) mpoolDrain() {
 | 
						|
	ticker := time.NewTicker(30 * time.Second)
 | 
						|
	for {
 | 
						|
		select {
 | 
						|
		case <-ticker.C:
 | 
						|
			select {
 | 
						|
			case <-db.memPool:
 | 
						|
			default:
 | 
						|
			}
 | 
						|
		case <-db.closeC:
 | 
						|
			ticker.Stop()
 | 
						|
			// Make sure the pool is drained.
 | 
						|
			select {
 | 
						|
			case <-db.memPool:
 | 
						|
			case <-time.After(time.Second):
 | 
						|
			}
 | 
						|
			close(db.memPool)
 | 
						|
			return
 | 
						|
		}
 | 
						|
	}
 | 
						|
}
 | 
						|
 | 
						|
// Create new memdb and froze the old one; need external synchronization.
 | 
						|
// newMem only called synchronously by the writer.
 | 
						|
func (db *DB) newMem(n int) (mem *memDB, err error) {
 | 
						|
	fd := storage.FileDesc{Type: storage.TypeJournal, Num: db.s.allocFileNum()}
 | 
						|
	w, err := db.s.stor.Create(fd)
 | 
						|
	if err != nil {
 | 
						|
		db.s.reuseFileNum(fd.Num)
 | 
						|
		return
 | 
						|
	}
 | 
						|
 | 
						|
	db.memMu.Lock()
 | 
						|
	defer db.memMu.Unlock()
 | 
						|
 | 
						|
	if db.frozenMem != nil {
 | 
						|
		return nil, errHasFrozenMem
 | 
						|
	}
 | 
						|
 | 
						|
	if db.journal == nil {
 | 
						|
		db.journal = journal.NewWriter(w)
 | 
						|
	} else {
 | 
						|
		db.journal.Reset(w)
 | 
						|
		db.journalWriter.Close()
 | 
						|
		db.frozenJournalFd = db.journalFd
 | 
						|
	}
 | 
						|
	db.journalWriter = w
 | 
						|
	db.journalFd = fd
 | 
						|
	db.frozenMem = db.mem
 | 
						|
	mem = db.mpoolGet(n)
 | 
						|
	mem.incref() // for self
 | 
						|
	mem.incref() // for caller
 | 
						|
	db.mem = mem
 | 
						|
	// The seq only incremented by the writer. And whoever called newMem
 | 
						|
	// should hold write lock, so no need additional synchronization here.
 | 
						|
	db.frozenSeq = db.seq
 | 
						|
	return
 | 
						|
}
 | 
						|
 | 
						|
// Get all memdbs.
 | 
						|
func (db *DB) getMems() (e, f *memDB) {
 | 
						|
	db.memMu.RLock()
 | 
						|
	defer db.memMu.RUnlock()
 | 
						|
	if db.mem != nil {
 | 
						|
		db.mem.incref()
 | 
						|
	} else if !db.isClosed() {
 | 
						|
		panic("nil effective mem")
 | 
						|
	}
 | 
						|
	if db.frozenMem != nil {
 | 
						|
		db.frozenMem.incref()
 | 
						|
	}
 | 
						|
	return db.mem, db.frozenMem
 | 
						|
}
 | 
						|
 | 
						|
// Get effective memdb.
 | 
						|
func (db *DB) getEffectiveMem() *memDB {
 | 
						|
	db.memMu.RLock()
 | 
						|
	defer db.memMu.RUnlock()
 | 
						|
	if db.mem != nil {
 | 
						|
		db.mem.incref()
 | 
						|
	} else if !db.isClosed() {
 | 
						|
		panic("nil effective mem")
 | 
						|
	}
 | 
						|
	return db.mem
 | 
						|
}
 | 
						|
 | 
						|
// Check whether we has frozen memdb.
 | 
						|
func (db *DB) hasFrozenMem() bool {
 | 
						|
	db.memMu.RLock()
 | 
						|
	defer db.memMu.RUnlock()
 | 
						|
	return db.frozenMem != nil
 | 
						|
}
 | 
						|
 | 
						|
// Get frozen memdb.
 | 
						|
func (db *DB) getFrozenMem() *memDB {
 | 
						|
	db.memMu.RLock()
 | 
						|
	defer db.memMu.RUnlock()
 | 
						|
	if db.frozenMem != nil {
 | 
						|
		db.frozenMem.incref()
 | 
						|
	}
 | 
						|
	return db.frozenMem
 | 
						|
}
 | 
						|
 | 
						|
// Drop frozen memdb; assume that frozen memdb isn't nil.
 | 
						|
func (db *DB) dropFrozenMem() {
 | 
						|
	db.memMu.Lock()
 | 
						|
	if err := db.s.stor.Remove(db.frozenJournalFd); err != nil {
 | 
						|
		db.logf("journal@remove removing @%d %q", db.frozenJournalFd.Num, err)
 | 
						|
	} else {
 | 
						|
		db.logf("journal@remove removed @%d", db.frozenJournalFd.Num)
 | 
						|
	}
 | 
						|
	db.frozenJournalFd = storage.FileDesc{}
 | 
						|
	db.frozenMem.decref()
 | 
						|
	db.frozenMem = nil
 | 
						|
	db.memMu.Unlock()
 | 
						|
}
 | 
						|
 | 
						|
// Clear mems ptr; used by DB.Close().
 | 
						|
func (db *DB) clearMems() {
 | 
						|
	db.memMu.Lock()
 | 
						|
	db.mem = nil
 | 
						|
	db.frozenMem = nil
 | 
						|
	db.memMu.Unlock()
 | 
						|
}
 | 
						|
 | 
						|
// Set closed flag; return true if not already closed.
 | 
						|
func (db *DB) setClosed() bool {
 | 
						|
	return atomic.CompareAndSwapUint32(&db.closed, 0, 1)
 | 
						|
}
 | 
						|
 | 
						|
// Check whether DB was closed.
 | 
						|
func (db *DB) isClosed() bool {
 | 
						|
	return atomic.LoadUint32(&db.closed) != 0
 | 
						|
}
 | 
						|
 | 
						|
// Check read ok status.
 | 
						|
func (db *DB) ok() error {
 | 
						|
	if db.isClosed() {
 | 
						|
		return ErrClosed
 | 
						|
	}
 | 
						|
	return nil
 | 
						|
}
 |