477 lines
		
	
	
		
			8.3 KiB
		
	
	
	
		
			Go
		
	
	
	
	
	
			
		
		
	
	
			477 lines
		
	
	
		
			8.3 KiB
		
	
	
	
		
			Go
		
	
	
	
	
	
| package pool
 | |
| 
 | |
| import (
 | |
| 	"errors"
 | |
| 	"net"
 | |
| 	"sync"
 | |
| 	"sync/atomic"
 | |
| 	"time"
 | |
| 
 | |
| 	"github.com/go-redis/redis/internal"
 | |
| )
 | |
| 
 | |
| var ErrClosed = errors.New("redis: client is closed")
 | |
| var ErrPoolTimeout = errors.New("redis: connection pool timeout")
 | |
| 
 | |
| var timers = sync.Pool{
 | |
| 	New: func() interface{} {
 | |
| 		t := time.NewTimer(time.Hour)
 | |
| 		t.Stop()
 | |
| 		return t
 | |
| 	},
 | |
| }
 | |
| 
 | |
| // Stats contains pool state information and accumulated stats.
 | |
| type Stats struct {
 | |
| 	Hits     uint32 // number of times free connection was found in the pool
 | |
| 	Misses   uint32 // number of times free connection was NOT found in the pool
 | |
| 	Timeouts uint32 // number of times a wait timeout occurred
 | |
| 
 | |
| 	TotalConns uint32 // number of total connections in the pool
 | |
| 	IdleConns  uint32 // number of idle connections in the pool
 | |
| 	StaleConns uint32 // number of stale connections removed from the pool
 | |
| }
 | |
| 
 | |
| type Pooler interface {
 | |
| 	NewConn() (*Conn, error)
 | |
| 	CloseConn(*Conn) error
 | |
| 
 | |
| 	Get() (*Conn, error)
 | |
| 	Put(*Conn)
 | |
| 	Remove(*Conn)
 | |
| 
 | |
| 	Len() int
 | |
| 	IdleLen() int
 | |
| 	Stats() *Stats
 | |
| 
 | |
| 	Close() error
 | |
| }
 | |
| 
 | |
| type Options struct {
 | |
| 	Dialer  func() (net.Conn, error)
 | |
| 	OnClose func(*Conn) error
 | |
| 
 | |
| 	PoolSize           int
 | |
| 	MinIdleConns       int
 | |
| 	MaxConnAge         time.Duration
 | |
| 	PoolTimeout        time.Duration
 | |
| 	IdleTimeout        time.Duration
 | |
| 	IdleCheckFrequency time.Duration
 | |
| }
 | |
| 
 | |
| type ConnPool struct {
 | |
| 	opt *Options
 | |
| 
 | |
| 	dialErrorsNum uint32 // atomic
 | |
| 
 | |
| 	lastDialErrorMu sync.RWMutex
 | |
| 	lastDialError   error
 | |
| 
 | |
| 	queue chan struct{}
 | |
| 
 | |
| 	connsMu      sync.Mutex
 | |
| 	conns        []*Conn
 | |
| 	idleConns    []*Conn
 | |
| 	poolSize     int
 | |
| 	idleConnsLen int
 | |
| 
 | |
| 	stats Stats
 | |
| 
 | |
| 	_closed uint32 // atomic
 | |
| }
 | |
| 
 | |
| var _ Pooler = (*ConnPool)(nil)
 | |
| 
 | |
| func NewConnPool(opt *Options) *ConnPool {
 | |
| 	p := &ConnPool{
 | |
| 		opt: opt,
 | |
| 
 | |
| 		queue:     make(chan struct{}, opt.PoolSize),
 | |
| 		conns:     make([]*Conn, 0, opt.PoolSize),
 | |
| 		idleConns: make([]*Conn, 0, opt.PoolSize),
 | |
| 	}
 | |
| 
 | |
| 	for i := 0; i < opt.MinIdleConns; i++ {
 | |
| 		p.checkMinIdleConns()
 | |
| 	}
 | |
| 
 | |
| 	if opt.IdleTimeout > 0 && opt.IdleCheckFrequency > 0 {
 | |
| 		go p.reaper(opt.IdleCheckFrequency)
 | |
| 	}
 | |
| 
 | |
| 	return p
 | |
| }
 | |
| 
 | |
| func (p *ConnPool) checkMinIdleConns() {
 | |
| 	if p.opt.MinIdleConns == 0 {
 | |
| 		return
 | |
| 	}
 | |
| 	if p.poolSize < p.opt.PoolSize && p.idleConnsLen < p.opt.MinIdleConns {
 | |
| 		p.poolSize++
 | |
| 		p.idleConnsLen++
 | |
| 		go p.addIdleConn()
 | |
| 	}
 | |
| }
 | |
| 
 | |
| func (p *ConnPool) addIdleConn() {
 | |
| 	cn, err := p.newConn(true)
 | |
| 	if err != nil {
 | |
| 		return
 | |
| 	}
 | |
| 
 | |
| 	p.connsMu.Lock()
 | |
| 	p.conns = append(p.conns, cn)
 | |
| 	p.idleConns = append(p.idleConns, cn)
 | |
| 	p.connsMu.Unlock()
 | |
| }
 | |
| 
 | |
| func (p *ConnPool) NewConn() (*Conn, error) {
 | |
| 	return p._NewConn(false)
 | |
| }
 | |
| 
 | |
| func (p *ConnPool) _NewConn(pooled bool) (*Conn, error) {
 | |
| 	cn, err := p.newConn(pooled)
 | |
| 	if err != nil {
 | |
| 		return nil, err
 | |
| 	}
 | |
| 
 | |
| 	p.connsMu.Lock()
 | |
| 	p.conns = append(p.conns, cn)
 | |
| 	if pooled {
 | |
| 		if p.poolSize < p.opt.PoolSize {
 | |
| 			p.poolSize++
 | |
| 		} else {
 | |
| 			cn.pooled = false
 | |
| 		}
 | |
| 	}
 | |
| 	p.connsMu.Unlock()
 | |
| 	return cn, nil
 | |
| }
 | |
| 
 | |
| func (p *ConnPool) newConn(pooled bool) (*Conn, error) {
 | |
| 	if p.closed() {
 | |
| 		return nil, ErrClosed
 | |
| 	}
 | |
| 
 | |
| 	if atomic.LoadUint32(&p.dialErrorsNum) >= uint32(p.opt.PoolSize) {
 | |
| 		return nil, p.getLastDialError()
 | |
| 	}
 | |
| 
 | |
| 	netConn, err := p.opt.Dialer()
 | |
| 	if err != nil {
 | |
| 		p.setLastDialError(err)
 | |
| 		if atomic.AddUint32(&p.dialErrorsNum, 1) == uint32(p.opt.PoolSize) {
 | |
| 			go p.tryDial()
 | |
| 		}
 | |
| 		return nil, err
 | |
| 	}
 | |
| 
 | |
| 	cn := NewConn(netConn)
 | |
| 	cn.pooled = pooled
 | |
| 	return cn, nil
 | |
| }
 | |
| 
 | |
| func (p *ConnPool) tryDial() {
 | |
| 	for {
 | |
| 		if p.closed() {
 | |
| 			return
 | |
| 		}
 | |
| 
 | |
| 		conn, err := p.opt.Dialer()
 | |
| 		if err != nil {
 | |
| 			p.setLastDialError(err)
 | |
| 			time.Sleep(time.Second)
 | |
| 			continue
 | |
| 		}
 | |
| 
 | |
| 		atomic.StoreUint32(&p.dialErrorsNum, 0)
 | |
| 		_ = conn.Close()
 | |
| 		return
 | |
| 	}
 | |
| }
 | |
| 
 | |
| func (p *ConnPool) setLastDialError(err error) {
 | |
| 	p.lastDialErrorMu.Lock()
 | |
| 	p.lastDialError = err
 | |
| 	p.lastDialErrorMu.Unlock()
 | |
| }
 | |
| 
 | |
| func (p *ConnPool) getLastDialError() error {
 | |
| 	p.lastDialErrorMu.RLock()
 | |
| 	err := p.lastDialError
 | |
| 	p.lastDialErrorMu.RUnlock()
 | |
| 	return err
 | |
| }
 | |
| 
 | |
| // Get returns existed connection from the pool or creates a new one.
 | |
| func (p *ConnPool) Get() (*Conn, error) {
 | |
| 	if p.closed() {
 | |
| 		return nil, ErrClosed
 | |
| 	}
 | |
| 
 | |
| 	err := p.waitTurn()
 | |
| 	if err != nil {
 | |
| 		return nil, err
 | |
| 	}
 | |
| 
 | |
| 	for {
 | |
| 		p.connsMu.Lock()
 | |
| 		cn := p.popIdle()
 | |
| 		p.connsMu.Unlock()
 | |
| 
 | |
| 		if cn == nil {
 | |
| 			break
 | |
| 		}
 | |
| 
 | |
| 		if p.isStaleConn(cn) {
 | |
| 			_ = p.CloseConn(cn)
 | |
| 			continue
 | |
| 		}
 | |
| 
 | |
| 		atomic.AddUint32(&p.stats.Hits, 1)
 | |
| 		return cn, nil
 | |
| 	}
 | |
| 
 | |
| 	atomic.AddUint32(&p.stats.Misses, 1)
 | |
| 
 | |
| 	newcn, err := p._NewConn(true)
 | |
| 	if err != nil {
 | |
| 		p.freeTurn()
 | |
| 		return nil, err
 | |
| 	}
 | |
| 
 | |
| 	return newcn, nil
 | |
| }
 | |
| 
 | |
| func (p *ConnPool) getTurn() {
 | |
| 	p.queue <- struct{}{}
 | |
| }
 | |
| 
 | |
| func (p *ConnPool) waitTurn() error {
 | |
| 	select {
 | |
| 	case p.queue <- struct{}{}:
 | |
| 		return nil
 | |
| 	default:
 | |
| 		timer := timers.Get().(*time.Timer)
 | |
| 		timer.Reset(p.opt.PoolTimeout)
 | |
| 
 | |
| 		select {
 | |
| 		case p.queue <- struct{}{}:
 | |
| 			if !timer.Stop() {
 | |
| 				<-timer.C
 | |
| 			}
 | |
| 			timers.Put(timer)
 | |
| 			return nil
 | |
| 		case <-timer.C:
 | |
| 			timers.Put(timer)
 | |
| 			atomic.AddUint32(&p.stats.Timeouts, 1)
 | |
| 			return ErrPoolTimeout
 | |
| 		}
 | |
| 	}
 | |
| }
 | |
| 
 | |
| func (p *ConnPool) freeTurn() {
 | |
| 	<-p.queue
 | |
| }
 | |
| 
 | |
| func (p *ConnPool) popIdle() *Conn {
 | |
| 	if len(p.idleConns) == 0 {
 | |
| 		return nil
 | |
| 	}
 | |
| 
 | |
| 	idx := len(p.idleConns) - 1
 | |
| 	cn := p.idleConns[idx]
 | |
| 	p.idleConns = p.idleConns[:idx]
 | |
| 	p.idleConnsLen--
 | |
| 	p.checkMinIdleConns()
 | |
| 	return cn
 | |
| }
 | |
| 
 | |
| func (p *ConnPool) Put(cn *Conn) {
 | |
| 	if !cn.pooled {
 | |
| 		p.Remove(cn)
 | |
| 		return
 | |
| 	}
 | |
| 
 | |
| 	p.connsMu.Lock()
 | |
| 	p.idleConns = append(p.idleConns, cn)
 | |
| 	p.idleConnsLen++
 | |
| 	p.connsMu.Unlock()
 | |
| 	p.freeTurn()
 | |
| }
 | |
| 
 | |
| func (p *ConnPool) Remove(cn *Conn) {
 | |
| 	p.removeConn(cn)
 | |
| 	p.freeTurn()
 | |
| 	_ = p.closeConn(cn)
 | |
| }
 | |
| 
 | |
| func (p *ConnPool) CloseConn(cn *Conn) error {
 | |
| 	p.removeConn(cn)
 | |
| 	return p.closeConn(cn)
 | |
| }
 | |
| 
 | |
| func (p *ConnPool) removeConn(cn *Conn) {
 | |
| 	p.connsMu.Lock()
 | |
| 	for i, c := range p.conns {
 | |
| 		if c == cn {
 | |
| 			p.conns = append(p.conns[:i], p.conns[i+1:]...)
 | |
| 			if cn.pooled {
 | |
| 				p.poolSize--
 | |
| 				p.checkMinIdleConns()
 | |
| 			}
 | |
| 			break
 | |
| 		}
 | |
| 	}
 | |
| 	p.connsMu.Unlock()
 | |
| }
 | |
| 
 | |
| func (p *ConnPool) closeConn(cn *Conn) error {
 | |
| 	if p.opt.OnClose != nil {
 | |
| 		_ = p.opt.OnClose(cn)
 | |
| 	}
 | |
| 	return cn.Close()
 | |
| }
 | |
| 
 | |
| // Len returns total number of connections.
 | |
| func (p *ConnPool) Len() int {
 | |
| 	p.connsMu.Lock()
 | |
| 	n := len(p.conns)
 | |
| 	p.connsMu.Unlock()
 | |
| 	return n
 | |
| }
 | |
| 
 | |
| // IdleLen returns number of idle connections.
 | |
| func (p *ConnPool) IdleLen() int {
 | |
| 	p.connsMu.Lock()
 | |
| 	n := p.idleConnsLen
 | |
| 	p.connsMu.Unlock()
 | |
| 	return n
 | |
| }
 | |
| 
 | |
| func (p *ConnPool) Stats() *Stats {
 | |
| 	idleLen := p.IdleLen()
 | |
| 	return &Stats{
 | |
| 		Hits:     atomic.LoadUint32(&p.stats.Hits),
 | |
| 		Misses:   atomic.LoadUint32(&p.stats.Misses),
 | |
| 		Timeouts: atomic.LoadUint32(&p.stats.Timeouts),
 | |
| 
 | |
| 		TotalConns: uint32(p.Len()),
 | |
| 		IdleConns:  uint32(idleLen),
 | |
| 		StaleConns: atomic.LoadUint32(&p.stats.StaleConns),
 | |
| 	}
 | |
| }
 | |
| 
 | |
| func (p *ConnPool) closed() bool {
 | |
| 	return atomic.LoadUint32(&p._closed) == 1
 | |
| }
 | |
| 
 | |
| func (p *ConnPool) Filter(fn func(*Conn) bool) error {
 | |
| 	var firstErr error
 | |
| 	p.connsMu.Lock()
 | |
| 	for _, cn := range p.conns {
 | |
| 		if fn(cn) {
 | |
| 			if err := p.closeConn(cn); err != nil && firstErr == nil {
 | |
| 				firstErr = err
 | |
| 			}
 | |
| 		}
 | |
| 	}
 | |
| 	p.connsMu.Unlock()
 | |
| 	return firstErr
 | |
| }
 | |
| 
 | |
| func (p *ConnPool) Close() error {
 | |
| 	if !atomic.CompareAndSwapUint32(&p._closed, 0, 1) {
 | |
| 		return ErrClosed
 | |
| 	}
 | |
| 
 | |
| 	var firstErr error
 | |
| 	p.connsMu.Lock()
 | |
| 	for _, cn := range p.conns {
 | |
| 		if err := p.closeConn(cn); err != nil && firstErr == nil {
 | |
| 			firstErr = err
 | |
| 		}
 | |
| 	}
 | |
| 	p.conns = nil
 | |
| 	p.poolSize = 0
 | |
| 	p.idleConns = nil
 | |
| 	p.idleConnsLen = 0
 | |
| 	p.connsMu.Unlock()
 | |
| 
 | |
| 	return firstErr
 | |
| }
 | |
| 
 | |
| func (p *ConnPool) reapStaleConn() *Conn {
 | |
| 	if len(p.idleConns) == 0 {
 | |
| 		return nil
 | |
| 	}
 | |
| 
 | |
| 	cn := p.idleConns[0]
 | |
| 	if !p.isStaleConn(cn) {
 | |
| 		return nil
 | |
| 	}
 | |
| 
 | |
| 	p.idleConns = append(p.idleConns[:0], p.idleConns[1:]...)
 | |
| 	p.idleConnsLen--
 | |
| 
 | |
| 	return cn
 | |
| }
 | |
| 
 | |
| func (p *ConnPool) ReapStaleConns() (int, error) {
 | |
| 	var n int
 | |
| 	for {
 | |
| 		p.getTurn()
 | |
| 
 | |
| 		p.connsMu.Lock()
 | |
| 		cn := p.reapStaleConn()
 | |
| 		p.connsMu.Unlock()
 | |
| 
 | |
| 		if cn != nil {
 | |
| 			p.removeConn(cn)
 | |
| 		}
 | |
| 
 | |
| 		p.freeTurn()
 | |
| 
 | |
| 		if cn != nil {
 | |
| 			p.closeConn(cn)
 | |
| 			n++
 | |
| 		} else {
 | |
| 			break
 | |
| 		}
 | |
| 	}
 | |
| 	return n, nil
 | |
| }
 | |
| 
 | |
| func (p *ConnPool) reaper(frequency time.Duration) {
 | |
| 	ticker := time.NewTicker(frequency)
 | |
| 	defer ticker.Stop()
 | |
| 
 | |
| 	for range ticker.C {
 | |
| 		if p.closed() {
 | |
| 			break
 | |
| 		}
 | |
| 		n, err := p.ReapStaleConns()
 | |
| 		if err != nil {
 | |
| 			internal.Logf("ReapStaleConns failed: %s", err)
 | |
| 			continue
 | |
| 		}
 | |
| 		atomic.AddUint32(&p.stats.StaleConns, uint32(n))
 | |
| 	}
 | |
| }
 | |
| 
 | |
| func (p *ConnPool) isStaleConn(cn *Conn) bool {
 | |
| 	if p.opt.IdleTimeout == 0 && p.opt.MaxConnAge == 0 {
 | |
| 		return false
 | |
| 	}
 | |
| 
 | |
| 	now := time.Now()
 | |
| 	if p.opt.IdleTimeout > 0 && now.Sub(cn.UsedAt()) >= p.opt.IdleTimeout {
 | |
| 		return true
 | |
| 	}
 | |
| 	if p.opt.MaxConnAge > 0 && now.Sub(cn.InitedAt) >= p.opt.MaxConnAge {
 | |
| 		return true
 | |
| 	}
 | |
| 
 | |
| 	return false
 | |
| }
 |