240 lines
		
	
	
		
			4.3 KiB
		
	
	
	
		
			Go
		
	
	
	
	
	
			
		
		
	
	
			240 lines
		
	
	
		
			4.3 KiB
		
	
	
	
		
			Go
		
	
	
	
	
	
// Copyright (c) 2014, Suryandaru Triandana <syndtr@gmail.com>
 | 
						|
// All rights reserved.
 | 
						|
//
 | 
						|
// Use of this source code is governed by a BSD-style license that can be
 | 
						|
// found in the LICENSE file.
 | 
						|
 | 
						|
package util
 | 
						|
 | 
						|
import (
 | 
						|
	"fmt"
 | 
						|
	"sync"
 | 
						|
	"sync/atomic"
 | 
						|
	"time"
 | 
						|
)
 | 
						|
 | 
						|
type buffer struct {
 | 
						|
	b    []byte
 | 
						|
	miss int
 | 
						|
}
 | 
						|
 | 
						|
// BufferPool is a 'buffer pool'.
 | 
						|
type BufferPool struct {
 | 
						|
	pool      [6]chan []byte
 | 
						|
	size      [5]uint32
 | 
						|
	sizeMiss  [5]uint32
 | 
						|
	sizeHalf  [5]uint32
 | 
						|
	baseline  [4]int
 | 
						|
	baseline0 int
 | 
						|
 | 
						|
	mu     sync.RWMutex
 | 
						|
	closed bool
 | 
						|
	closeC chan struct{}
 | 
						|
 | 
						|
	get     uint32
 | 
						|
	put     uint32
 | 
						|
	half    uint32
 | 
						|
	less    uint32
 | 
						|
	equal   uint32
 | 
						|
	greater uint32
 | 
						|
	miss    uint32
 | 
						|
}
 | 
						|
 | 
						|
func (p *BufferPool) poolNum(n int) int {
 | 
						|
	if n <= p.baseline0 && n > p.baseline0/2 {
 | 
						|
		return 0
 | 
						|
	}
 | 
						|
	for i, x := range p.baseline {
 | 
						|
		if n <= x {
 | 
						|
			return i + 1
 | 
						|
		}
 | 
						|
	}
 | 
						|
	return len(p.baseline) + 1
 | 
						|
}
 | 
						|
 | 
						|
// Get returns buffer with length of n.
 | 
						|
func (p *BufferPool) Get(n int) []byte {
 | 
						|
	if p == nil {
 | 
						|
		return make([]byte, n)
 | 
						|
	}
 | 
						|
 | 
						|
	p.mu.RLock()
 | 
						|
	defer p.mu.RUnlock()
 | 
						|
 | 
						|
	if p.closed {
 | 
						|
		return make([]byte, n)
 | 
						|
	}
 | 
						|
 | 
						|
	atomic.AddUint32(&p.get, 1)
 | 
						|
 | 
						|
	poolNum := p.poolNum(n)
 | 
						|
	pool := p.pool[poolNum]
 | 
						|
	if poolNum == 0 {
 | 
						|
		// Fast path.
 | 
						|
		select {
 | 
						|
		case b := <-pool:
 | 
						|
			switch {
 | 
						|
			case cap(b) > n:
 | 
						|
				if cap(b)-n >= n {
 | 
						|
					atomic.AddUint32(&p.half, 1)
 | 
						|
					select {
 | 
						|
					case pool <- b:
 | 
						|
					default:
 | 
						|
					}
 | 
						|
					return make([]byte, n)
 | 
						|
				} else {
 | 
						|
					atomic.AddUint32(&p.less, 1)
 | 
						|
					return b[:n]
 | 
						|
				}
 | 
						|
			case cap(b) == n:
 | 
						|
				atomic.AddUint32(&p.equal, 1)
 | 
						|
				return b[:n]
 | 
						|
			default:
 | 
						|
				atomic.AddUint32(&p.greater, 1)
 | 
						|
			}
 | 
						|
		default:
 | 
						|
			atomic.AddUint32(&p.miss, 1)
 | 
						|
		}
 | 
						|
 | 
						|
		return make([]byte, n, p.baseline0)
 | 
						|
	} else {
 | 
						|
		sizePtr := &p.size[poolNum-1]
 | 
						|
 | 
						|
		select {
 | 
						|
		case b := <-pool:
 | 
						|
			switch {
 | 
						|
			case cap(b) > n:
 | 
						|
				if cap(b)-n >= n {
 | 
						|
					atomic.AddUint32(&p.half, 1)
 | 
						|
					sizeHalfPtr := &p.sizeHalf[poolNum-1]
 | 
						|
					if atomic.AddUint32(sizeHalfPtr, 1) == 20 {
 | 
						|
						atomic.StoreUint32(sizePtr, uint32(cap(b)/2))
 | 
						|
						atomic.StoreUint32(sizeHalfPtr, 0)
 | 
						|
					} else {
 | 
						|
						select {
 | 
						|
						case pool <- b:
 | 
						|
						default:
 | 
						|
						}
 | 
						|
					}
 | 
						|
					return make([]byte, n)
 | 
						|
				} else {
 | 
						|
					atomic.AddUint32(&p.less, 1)
 | 
						|
					return b[:n]
 | 
						|
				}
 | 
						|
			case cap(b) == n:
 | 
						|
				atomic.AddUint32(&p.equal, 1)
 | 
						|
				return b[:n]
 | 
						|
			default:
 | 
						|
				atomic.AddUint32(&p.greater, 1)
 | 
						|
				if uint32(cap(b)) >= atomic.LoadUint32(sizePtr) {
 | 
						|
					select {
 | 
						|
					case pool <- b:
 | 
						|
					default:
 | 
						|
					}
 | 
						|
				}
 | 
						|
			}
 | 
						|
		default:
 | 
						|
			atomic.AddUint32(&p.miss, 1)
 | 
						|
		}
 | 
						|
 | 
						|
		if size := atomic.LoadUint32(sizePtr); uint32(n) > size {
 | 
						|
			if size == 0 {
 | 
						|
				atomic.CompareAndSwapUint32(sizePtr, 0, uint32(n))
 | 
						|
			} else {
 | 
						|
				sizeMissPtr := &p.sizeMiss[poolNum-1]
 | 
						|
				if atomic.AddUint32(sizeMissPtr, 1) == 20 {
 | 
						|
					atomic.StoreUint32(sizePtr, uint32(n))
 | 
						|
					atomic.StoreUint32(sizeMissPtr, 0)
 | 
						|
				}
 | 
						|
			}
 | 
						|
			return make([]byte, n)
 | 
						|
		} else {
 | 
						|
			return make([]byte, n, size)
 | 
						|
		}
 | 
						|
	}
 | 
						|
}
 | 
						|
 | 
						|
// Put adds given buffer to the pool.
 | 
						|
func (p *BufferPool) Put(b []byte) {
 | 
						|
	if p == nil {
 | 
						|
		return
 | 
						|
	}
 | 
						|
 | 
						|
	p.mu.RLock()
 | 
						|
	defer p.mu.RUnlock()
 | 
						|
 | 
						|
	if p.closed {
 | 
						|
		return
 | 
						|
	}
 | 
						|
 | 
						|
	atomic.AddUint32(&p.put, 1)
 | 
						|
 | 
						|
	pool := p.pool[p.poolNum(cap(b))]
 | 
						|
	select {
 | 
						|
	case pool <- b:
 | 
						|
	default:
 | 
						|
	}
 | 
						|
 | 
						|
}
 | 
						|
 | 
						|
func (p *BufferPool) Close() {
 | 
						|
	if p == nil {
 | 
						|
		return
 | 
						|
	}
 | 
						|
 | 
						|
	p.mu.Lock()
 | 
						|
	if !p.closed {
 | 
						|
		p.closed = true
 | 
						|
		p.closeC <- struct{}{}
 | 
						|
	}
 | 
						|
	p.mu.Unlock()
 | 
						|
}
 | 
						|
 | 
						|
func (p *BufferPool) String() string {
 | 
						|
	if p == nil {
 | 
						|
		return "<nil>"
 | 
						|
	}
 | 
						|
 | 
						|
	return fmt.Sprintf("BufferPool{B·%d Z·%v Zm·%v Zh·%v G·%d P·%d H·%d <·%d =·%d >·%d M·%d}",
 | 
						|
		p.baseline0, p.size, p.sizeMiss, p.sizeHalf, p.get, p.put, p.half, p.less, p.equal, p.greater, p.miss)
 | 
						|
}
 | 
						|
 | 
						|
func (p *BufferPool) drain() {
 | 
						|
	ticker := time.NewTicker(2 * time.Second)
 | 
						|
	defer ticker.Stop()
 | 
						|
	for {
 | 
						|
		select {
 | 
						|
		case <-ticker.C:
 | 
						|
			for _, ch := range p.pool {
 | 
						|
				select {
 | 
						|
				case <-ch:
 | 
						|
				default:
 | 
						|
				}
 | 
						|
			}
 | 
						|
		case <-p.closeC:
 | 
						|
			close(p.closeC)
 | 
						|
			for _, ch := range p.pool {
 | 
						|
				close(ch)
 | 
						|
			}
 | 
						|
			return
 | 
						|
		}
 | 
						|
	}
 | 
						|
}
 | 
						|
 | 
						|
// NewBufferPool creates a new initialized 'buffer pool'.
 | 
						|
func NewBufferPool(baseline int) *BufferPool {
 | 
						|
	if baseline <= 0 {
 | 
						|
		panic("baseline can't be <= 0")
 | 
						|
	}
 | 
						|
	p := &BufferPool{
 | 
						|
		baseline0: baseline,
 | 
						|
		baseline:  [...]int{baseline / 4, baseline / 2, baseline * 2, baseline * 4},
 | 
						|
		closeC:    make(chan struct{}, 1),
 | 
						|
	}
 | 
						|
	for i, cap := range []int{2, 2, 4, 4, 2, 1} {
 | 
						|
		p.pool[i] = make(chan []byte, cap)
 | 
						|
	}
 | 
						|
	go p.drain()
 | 
						|
	return p
 | 
						|
}
 |