334 lines
		
	
	
		
			8.2 KiB
		
	
	
	
		
			Go
		
	
	
	
	
	
			
		
		
	
	
			334 lines
		
	
	
		
			8.2 KiB
		
	
	
	
		
			Go
		
	
	
	
	
	
package memcached
 | 
						|
 | 
						|
import (
 | 
						|
	"bytes"
 | 
						|
	"encoding/binary"
 | 
						|
	"fmt"
 | 
						|
	"io"
 | 
						|
	"math"
 | 
						|
 | 
						|
	"github.com/couchbase/gomemcached"
 | 
						|
	"github.com/couchbase/goutils/logging"
 | 
						|
)
 | 
						|
 | 
						|
// TAP protocol docs: <http://www.couchbase.com/wiki/display/couchbase/TAP+Protocol>
 | 
						|
 | 
						|
// TapOpcode is the tap operation type (found in TapEvent)
 | 
						|
type TapOpcode uint8
 | 
						|
 | 
						|
// Tap opcode values.
 | 
						|
const (
 | 
						|
	TapBeginBackfill = TapOpcode(iota)
 | 
						|
	TapEndBackfill
 | 
						|
	TapMutation
 | 
						|
	TapDeletion
 | 
						|
	TapCheckpointStart
 | 
						|
	TapCheckpointEnd
 | 
						|
	tapEndStream
 | 
						|
)
 | 
						|
 | 
						|
const tapMutationExtraLen = 16
 | 
						|
 | 
						|
var tapOpcodeNames map[TapOpcode]string
 | 
						|
 | 
						|
func init() {
 | 
						|
	tapOpcodeNames = map[TapOpcode]string{
 | 
						|
		TapBeginBackfill:   "BeginBackfill",
 | 
						|
		TapEndBackfill:     "EndBackfill",
 | 
						|
		TapMutation:        "Mutation",
 | 
						|
		TapDeletion:        "Deletion",
 | 
						|
		TapCheckpointStart: "TapCheckpointStart",
 | 
						|
		TapCheckpointEnd:   "TapCheckpointEnd",
 | 
						|
		tapEndStream:       "EndStream",
 | 
						|
	}
 | 
						|
}
 | 
						|
 | 
						|
func (opcode TapOpcode) String() string {
 | 
						|
	name := tapOpcodeNames[opcode]
 | 
						|
	if name == "" {
 | 
						|
		name = fmt.Sprintf("#%d", opcode)
 | 
						|
	}
 | 
						|
	return name
 | 
						|
}
 | 
						|
 | 
						|
// TapEvent is a TAP notification of an operation on the server.
 | 
						|
type TapEvent struct {
 | 
						|
	Opcode     TapOpcode // Type of event
 | 
						|
	VBucket    uint16    // VBucket this event applies to
 | 
						|
	Flags      uint32    // Item flags
 | 
						|
	Expiry     uint32    // Item expiration time
 | 
						|
	Key, Value []byte    // Item key/value
 | 
						|
	Cas        uint64
 | 
						|
}
 | 
						|
 | 
						|
func makeTapEvent(req gomemcached.MCRequest) *TapEvent {
 | 
						|
	event := TapEvent{
 | 
						|
		VBucket: req.VBucket,
 | 
						|
	}
 | 
						|
	switch req.Opcode {
 | 
						|
	case gomemcached.TAP_MUTATION:
 | 
						|
		event.Opcode = TapMutation
 | 
						|
		event.Key = req.Key
 | 
						|
		event.Value = req.Body
 | 
						|
		event.Cas = req.Cas
 | 
						|
	case gomemcached.TAP_DELETE:
 | 
						|
		event.Opcode = TapDeletion
 | 
						|
		event.Key = req.Key
 | 
						|
		event.Cas = req.Cas
 | 
						|
	case gomemcached.TAP_CHECKPOINT_START:
 | 
						|
		event.Opcode = TapCheckpointStart
 | 
						|
	case gomemcached.TAP_CHECKPOINT_END:
 | 
						|
		event.Opcode = TapCheckpointEnd
 | 
						|
	case gomemcached.TAP_OPAQUE:
 | 
						|
		if len(req.Extras) < 8+4 {
 | 
						|
			return nil
 | 
						|
		}
 | 
						|
		switch op := int(binary.BigEndian.Uint32(req.Extras[8:])); op {
 | 
						|
		case gomemcached.TAP_OPAQUE_INITIAL_VBUCKET_STREAM:
 | 
						|
			event.Opcode = TapBeginBackfill
 | 
						|
		case gomemcached.TAP_OPAQUE_CLOSE_BACKFILL:
 | 
						|
			event.Opcode = TapEndBackfill
 | 
						|
		case gomemcached.TAP_OPAQUE_CLOSE_TAP_STREAM:
 | 
						|
			event.Opcode = tapEndStream
 | 
						|
		case gomemcached.TAP_OPAQUE_ENABLE_AUTO_NACK:
 | 
						|
			return nil
 | 
						|
		case gomemcached.TAP_OPAQUE_ENABLE_CHECKPOINT_SYNC:
 | 
						|
			return nil
 | 
						|
		default:
 | 
						|
			logging.Infof("TapFeed: Ignoring TAP_OPAQUE/%d", op)
 | 
						|
			return nil // unknown opaque event
 | 
						|
		}
 | 
						|
	case gomemcached.NOOP:
 | 
						|
		return nil // ignore
 | 
						|
	default:
 | 
						|
		logging.Infof("TapFeed: Ignoring %s", req.Opcode)
 | 
						|
		return nil // unknown event
 | 
						|
	}
 | 
						|
 | 
						|
	if len(req.Extras) >= tapMutationExtraLen &&
 | 
						|
		(event.Opcode == TapMutation || event.Opcode == TapDeletion) {
 | 
						|
 | 
						|
		event.Flags = binary.BigEndian.Uint32(req.Extras[8:])
 | 
						|
		event.Expiry = binary.BigEndian.Uint32(req.Extras[12:])
 | 
						|
	}
 | 
						|
 | 
						|
	return &event
 | 
						|
}
 | 
						|
 | 
						|
func (event TapEvent) String() string {
 | 
						|
	switch event.Opcode {
 | 
						|
	case TapBeginBackfill, TapEndBackfill, TapCheckpointStart, TapCheckpointEnd:
 | 
						|
		return fmt.Sprintf("<TapEvent %s, vbucket=%d>",
 | 
						|
			event.Opcode, event.VBucket)
 | 
						|
	default:
 | 
						|
		return fmt.Sprintf("<TapEvent %s, key=%q (%d bytes) flags=%x, exp=%d>",
 | 
						|
			event.Opcode, event.Key, len(event.Value),
 | 
						|
			event.Flags, event.Expiry)
 | 
						|
	}
 | 
						|
}
 | 
						|
 | 
						|
// TapArguments are parameters for requesting a TAP feed.
 | 
						|
//
 | 
						|
// Call DefaultTapArguments to get a default one.
 | 
						|
type TapArguments struct {
 | 
						|
	// Timestamp of oldest item to send.
 | 
						|
	//
 | 
						|
	// Use TapNoBackfill to suppress all past items.
 | 
						|
	Backfill uint64
 | 
						|
	// If set, server will disconnect after sending existing items.
 | 
						|
	Dump bool
 | 
						|
	// The indices of the vbuckets to watch; empty/nil to watch all.
 | 
						|
	VBuckets []uint16
 | 
						|
	// Transfers ownership of vbuckets during cluster rebalance.
 | 
						|
	Takeover bool
 | 
						|
	// If true, server will wait for client ACK after every notification.
 | 
						|
	SupportAck bool
 | 
						|
	// If true, client doesn't want values so server shouldn't send them.
 | 
						|
	KeysOnly bool
 | 
						|
	// If true, client wants the server to send checkpoint events.
 | 
						|
	Checkpoint bool
 | 
						|
	// Optional identifier to use for this client, to allow reconnects
 | 
						|
	ClientName string
 | 
						|
	// Registers this client (by name) till explicitly deregistered.
 | 
						|
	RegisteredClient bool
 | 
						|
}
 | 
						|
 | 
						|
// Value for TapArguments.Backfill denoting that no past events at all
 | 
						|
// should be sent.
 | 
						|
const TapNoBackfill = math.MaxUint64
 | 
						|
 | 
						|
// DefaultTapArguments returns a default set of parameter values to
 | 
						|
// pass to StartTapFeed.
 | 
						|
func DefaultTapArguments() TapArguments {
 | 
						|
	return TapArguments{
 | 
						|
		Backfill: TapNoBackfill,
 | 
						|
	}
 | 
						|
}
 | 
						|
 | 
						|
func (args *TapArguments) flags() []byte {
 | 
						|
	var flags gomemcached.TapConnectFlag
 | 
						|
	if args.Backfill != 0 {
 | 
						|
		flags |= gomemcached.BACKFILL
 | 
						|
	}
 | 
						|
	if args.Dump {
 | 
						|
		flags |= gomemcached.DUMP
 | 
						|
	}
 | 
						|
	if len(args.VBuckets) > 0 {
 | 
						|
		flags |= gomemcached.LIST_VBUCKETS
 | 
						|
	}
 | 
						|
	if args.Takeover {
 | 
						|
		flags |= gomemcached.TAKEOVER_VBUCKETS
 | 
						|
	}
 | 
						|
	if args.SupportAck {
 | 
						|
		flags |= gomemcached.SUPPORT_ACK
 | 
						|
	}
 | 
						|
	if args.KeysOnly {
 | 
						|
		flags |= gomemcached.REQUEST_KEYS_ONLY
 | 
						|
	}
 | 
						|
	if args.Checkpoint {
 | 
						|
		flags |= gomemcached.CHECKPOINT
 | 
						|
	}
 | 
						|
	if args.RegisteredClient {
 | 
						|
		flags |= gomemcached.REGISTERED_CLIENT
 | 
						|
	}
 | 
						|
	encoded := make([]byte, 4)
 | 
						|
	binary.BigEndian.PutUint32(encoded, uint32(flags))
 | 
						|
	return encoded
 | 
						|
}
 | 
						|
 | 
						|
func must(err error) {
 | 
						|
	if err != nil {
 | 
						|
		panic(err)
 | 
						|
	}
 | 
						|
}
 | 
						|
 | 
						|
func (args *TapArguments) bytes() (rv []byte) {
 | 
						|
	buf := bytes.NewBuffer([]byte{})
 | 
						|
 | 
						|
	if args.Backfill > 0 {
 | 
						|
		must(binary.Write(buf, binary.BigEndian, uint64(args.Backfill)))
 | 
						|
	}
 | 
						|
 | 
						|
	if len(args.VBuckets) > 0 {
 | 
						|
		must(binary.Write(buf, binary.BigEndian, uint16(len(args.VBuckets))))
 | 
						|
		for i := 0; i < len(args.VBuckets); i++ {
 | 
						|
			must(binary.Write(buf, binary.BigEndian, uint16(args.VBuckets[i])))
 | 
						|
		}
 | 
						|
	}
 | 
						|
	return buf.Bytes()
 | 
						|
}
 | 
						|
 | 
						|
// TapFeed represents a stream of events from a server.
 | 
						|
type TapFeed struct {
 | 
						|
	C      <-chan TapEvent
 | 
						|
	Error  error
 | 
						|
	closer chan bool
 | 
						|
}
 | 
						|
 | 
						|
// StartTapFeed starts a TAP feed on a client connection.
 | 
						|
//
 | 
						|
// The events can be read from the returned channel.  The connection
 | 
						|
// can no longer be used for other purposes; it's now reserved for
 | 
						|
// receiving the TAP messages. To stop receiving events, close the
 | 
						|
// client connection.
 | 
						|
func (mc *Client) StartTapFeed(args TapArguments) (*TapFeed, error) {
 | 
						|
	rq := &gomemcached.MCRequest{
 | 
						|
		Opcode: gomemcached.TAP_CONNECT,
 | 
						|
		Key:    []byte(args.ClientName),
 | 
						|
		Extras: args.flags(),
 | 
						|
		Body:   args.bytes()}
 | 
						|
 | 
						|
	err := mc.Transmit(rq)
 | 
						|
	if err != nil {
 | 
						|
		return nil, err
 | 
						|
	}
 | 
						|
 | 
						|
	ch := make(chan TapEvent)
 | 
						|
	feed := &TapFeed{
 | 
						|
		C:      ch,
 | 
						|
		closer: make(chan bool),
 | 
						|
	}
 | 
						|
	go mc.runFeed(ch, feed)
 | 
						|
	return feed, nil
 | 
						|
}
 | 
						|
 | 
						|
// TapRecvHook is called after every incoming tap packet is received.
 | 
						|
var TapRecvHook func(*gomemcached.MCRequest, int, error)
 | 
						|
 | 
						|
// Internal goroutine that reads from the socket and writes events to
 | 
						|
// the channel
 | 
						|
func (mc *Client) runFeed(ch chan TapEvent, feed *TapFeed) {
 | 
						|
	defer close(ch)
 | 
						|
	var headerBuf [gomemcached.HDR_LEN]byte
 | 
						|
loop:
 | 
						|
	for {
 | 
						|
		// Read the next request from the server.
 | 
						|
		//
 | 
						|
		//  (Can't call mc.Receive() because it reads a
 | 
						|
		//  _response_ not a request.)
 | 
						|
		var pkt gomemcached.MCRequest
 | 
						|
		n, err := pkt.Receive(mc.conn, headerBuf[:])
 | 
						|
		if TapRecvHook != nil {
 | 
						|
			TapRecvHook(&pkt, n, err)
 | 
						|
		}
 | 
						|
 | 
						|
		if err != nil {
 | 
						|
			if err != io.EOF {
 | 
						|
				feed.Error = err
 | 
						|
			}
 | 
						|
			break loop
 | 
						|
		}
 | 
						|
 | 
						|
		//logging.Infof("** TapFeed received %#v : %q", pkt, pkt.Body)
 | 
						|
 | 
						|
		if pkt.Opcode == gomemcached.TAP_CONNECT {
 | 
						|
			// This is not an event from the server; it's
 | 
						|
			// an error response to my connect request.
 | 
						|
			feed.Error = fmt.Errorf("tap connection failed: %s", pkt.Body)
 | 
						|
			break loop
 | 
						|
		}
 | 
						|
 | 
						|
		event := makeTapEvent(pkt)
 | 
						|
		if event != nil {
 | 
						|
			if event.Opcode == tapEndStream {
 | 
						|
				break loop
 | 
						|
			}
 | 
						|
 | 
						|
			select {
 | 
						|
			case ch <- *event:
 | 
						|
			case <-feed.closer:
 | 
						|
				break loop
 | 
						|
			}
 | 
						|
		}
 | 
						|
 | 
						|
		if len(pkt.Extras) >= 4 {
 | 
						|
			reqFlags := binary.BigEndian.Uint16(pkt.Extras[2:])
 | 
						|
			if reqFlags&gomemcached.TAP_ACK != 0 {
 | 
						|
				if _, err := mc.sendAck(&pkt); err != nil {
 | 
						|
					feed.Error = err
 | 
						|
					break loop
 | 
						|
				}
 | 
						|
			}
 | 
						|
		}
 | 
						|
	}
 | 
						|
	if err := mc.Close(); err != nil {
 | 
						|
		logging.Errorf("Error closing memcached client:  %v", err)
 | 
						|
	}
 | 
						|
}
 | 
						|
 | 
						|
func (mc *Client) sendAck(pkt *gomemcached.MCRequest) (int, error) {
 | 
						|
	res := gomemcached.MCResponse{
 | 
						|
		Opcode: pkt.Opcode,
 | 
						|
		Opaque: pkt.Opaque,
 | 
						|
		Status: gomemcached.SUCCESS,
 | 
						|
	}
 | 
						|
	return res.Transmit(mc.conn)
 | 
						|
}
 | 
						|
 | 
						|
// Close terminates a TapFeed.
 | 
						|
//
 | 
						|
//  Call this if you stop using a TapFeed before its channel ends.
 | 
						|
func (feed *TapFeed) Close() {
 | 
						|
	close(feed.closer)
 | 
						|
}
 |