271 lines
		
	
	
		
			5.8 KiB
		
	
	
	
		
			Go
		
	
	
		
			Executable File
		
	
	
	
	
			
		
		
	
	
			271 lines
		
	
	
		
			5.8 KiB
		
	
	
	
		
			Go
		
	
	
		
			Executable File
		
	
	
	
	
| package alils
 | |
| 
 | |
| import (
 | |
| 	"encoding/json"
 | |
| 	"fmt"
 | |
| 	"io"
 | |
| 	"net/http"
 | |
| 	"net/http/httputil"
 | |
| 	"strconv"
 | |
| 
 | |
| 	lz4 "github.com/cloudflare/golz4"
 | |
| 	"github.com/gogo/protobuf/proto"
 | |
| )
 | |
| 
 | |
| // LogStore stores the logs
 | |
| type LogStore struct {
 | |
| 	Name       string `json:"logstoreName"`
 | |
| 	TTL        int
 | |
| 	ShardCount int
 | |
| 
 | |
| 	CreateTime     uint32
 | |
| 	LastModifyTime uint32
 | |
| 
 | |
| 	project *LogProject
 | |
| }
 | |
| 
 | |
| // Shard defines the Log Shard
 | |
| type Shard struct {
 | |
| 	ShardID int `json:"shardID"`
 | |
| }
 | |
| 
 | |
| // ListShards returns shard id list of this logstore.
 | |
| func (s *LogStore) ListShards() (shardIDs []int, err error) {
 | |
| 	h := map[string]string{
 | |
| 		"x-sls-bodyrawsize": "0",
 | |
| 	}
 | |
| 
 | |
| 	uri := fmt.Sprintf("/logstores/%v/shards", s.Name)
 | |
| 	r, err := request(s.project, "GET", uri, h, nil)
 | |
| 	if err != nil {
 | |
| 		return
 | |
| 	}
 | |
| 
 | |
| 	buf, err := io.ReadAll(r.Body)
 | |
| 	if err != nil {
 | |
| 		return
 | |
| 	}
 | |
| 
 | |
| 	if r.StatusCode != http.StatusOK {
 | |
| 		errMsg := &errorMessage{}
 | |
| 		err = json.Unmarshal(buf, errMsg)
 | |
| 		if err != nil {
 | |
| 			err = fmt.Errorf("failed to list logstore")
 | |
| 			dump, _ := httputil.DumpResponse(r, true)
 | |
| 			fmt.Println(dump)
 | |
| 			return
 | |
| 		}
 | |
| 		err = fmt.Errorf("%v:%v", errMsg.Code, errMsg.Message)
 | |
| 		return
 | |
| 	}
 | |
| 
 | |
| 	var shards []*Shard
 | |
| 	err = json.Unmarshal(buf, &shards)
 | |
| 	if err != nil {
 | |
| 		return
 | |
| 	}
 | |
| 
 | |
| 	for _, v := range shards {
 | |
| 		shardIDs = append(shardIDs, v.ShardID)
 | |
| 	}
 | |
| 	return
 | |
| }
 | |
| 
 | |
| // PutLogs puts logs into logstore.
 | |
| // The callers should transform user logs into LogGroup.
 | |
| func (s *LogStore) PutLogs(lg *LogGroup) (err error) {
 | |
| 	body, err := proto.Marshal(lg)
 | |
| 	if err != nil {
 | |
| 		return
 | |
| 	}
 | |
| 
 | |
| 	// Compress body with lz4
 | |
| 	out := make([]byte, lz4.CompressBound(body))
 | |
| 	n, err := lz4.Compress(body, out)
 | |
| 	if err != nil {
 | |
| 		return
 | |
| 	}
 | |
| 
 | |
| 	h := map[string]string{
 | |
| 		"x-sls-compresstype": "lz4",
 | |
| 		"x-sls-bodyrawsize":  fmt.Sprintf("%v", len(body)),
 | |
| 		"Content-Type":       "application/x-protobuf",
 | |
| 	}
 | |
| 
 | |
| 	uri := fmt.Sprintf("/logstores/%v", s.Name)
 | |
| 	r, err := request(s.project, "POST", uri, h, out[:n])
 | |
| 	if err != nil {
 | |
| 		return
 | |
| 	}
 | |
| 
 | |
| 	buf, err := io.ReadAll(r.Body)
 | |
| 	if err != nil {
 | |
| 		return
 | |
| 	}
 | |
| 
 | |
| 	if r.StatusCode != http.StatusOK {
 | |
| 		errMsg := &errorMessage{}
 | |
| 		err = json.Unmarshal(buf, errMsg)
 | |
| 		if err != nil {
 | |
| 			err = fmt.Errorf("failed to put logs")
 | |
| 			dump, _ := httputil.DumpResponse(r, true)
 | |
| 			fmt.Println(dump)
 | |
| 			return
 | |
| 		}
 | |
| 		err = fmt.Errorf("%v:%v", errMsg.Code, errMsg.Message)
 | |
| 		return
 | |
| 	}
 | |
| 	return
 | |
| }
 | |
| 
 | |
| // GetCursor gets log cursor of one shard specified by shardID.
 | |
| // The from can be in three form: a) unix timestamp in seccond, b) "begin", c) "end".
 | |
| // For more detail please read: http://gitlab.alibaba-inc.com/sls/doc/blob/master/api/shard.md#logstore
 | |
| func (s *LogStore) GetCursor(shardID int, from string) (cursor string, err error) {
 | |
| 	h := map[string]string{
 | |
| 		"x-sls-bodyrawsize": "0",
 | |
| 	}
 | |
| 
 | |
| 	uri := fmt.Sprintf("/logstores/%v/shards/%v?type=cursor&from=%v",
 | |
| 		s.Name, shardID, from)
 | |
| 
 | |
| 	r, err := request(s.project, "GET", uri, h, nil)
 | |
| 	if err != nil {
 | |
| 		return
 | |
| 	}
 | |
| 
 | |
| 	buf, err := io.ReadAll(r.Body)
 | |
| 	if err != nil {
 | |
| 		return
 | |
| 	}
 | |
| 
 | |
| 	if r.StatusCode != http.StatusOK {
 | |
| 		errMsg := &errorMessage{}
 | |
| 		err = json.Unmarshal(buf, errMsg)
 | |
| 		if err != nil {
 | |
| 			err = fmt.Errorf("failed to get cursor")
 | |
| 			dump, _ := httputil.DumpResponse(r, true)
 | |
| 			fmt.Println(dump)
 | |
| 			return
 | |
| 		}
 | |
| 		err = fmt.Errorf("%v:%v", errMsg.Code, errMsg.Message)
 | |
| 		return
 | |
| 	}
 | |
| 
 | |
| 	type Body struct {
 | |
| 		Cursor string
 | |
| 	}
 | |
| 	body := &Body{}
 | |
| 
 | |
| 	err = json.Unmarshal(buf, body)
 | |
| 	if err != nil {
 | |
| 		return
 | |
| 	}
 | |
| 	cursor = body.Cursor
 | |
| 	return
 | |
| }
 | |
| 
 | |
| // GetLogsBytes gets logs binary data from shard specified by shardID according cursor.
 | |
| // The logGroupMaxCount is the max number of logGroup could be returned.
 | |
| // The nextCursor is the next curosr can be used to read logs at next time.
 | |
| func (s *LogStore) GetLogsBytes(shardID int, cursor string,
 | |
| 	logGroupMaxCount int) (out []byte, nextCursor string, err error) {
 | |
| 
 | |
| 	h := map[string]string{
 | |
| 		"x-sls-bodyrawsize": "0",
 | |
| 		"Accept":            "application/x-protobuf",
 | |
| 		"Accept-Encoding":   "lz4",
 | |
| 	}
 | |
| 
 | |
| 	uri := fmt.Sprintf("/logstores/%v/shards/%v?type=logs&cursor=%v&count=%v",
 | |
| 		s.Name, shardID, cursor, logGroupMaxCount)
 | |
| 
 | |
| 	r, err := request(s.project, "GET", uri, h, nil)
 | |
| 	if err != nil {
 | |
| 		return
 | |
| 	}
 | |
| 
 | |
| 	buf, err := io.ReadAll(r.Body)
 | |
| 	if err != nil {
 | |
| 		return
 | |
| 	}
 | |
| 
 | |
| 	if r.StatusCode != http.StatusOK {
 | |
| 		errMsg := &errorMessage{}
 | |
| 		err = json.Unmarshal(buf, errMsg)
 | |
| 		if err != nil {
 | |
| 			err = fmt.Errorf("failed to get cursor")
 | |
| 			dump, _ := httputil.DumpResponse(r, true)
 | |
| 			fmt.Println(dump)
 | |
| 			return
 | |
| 		}
 | |
| 		err = fmt.Errorf("%v:%v", errMsg.Code, errMsg.Message)
 | |
| 		return
 | |
| 	}
 | |
| 
 | |
| 	v, ok := r.Header["X-Sls-Compresstype"]
 | |
| 	if !ok || len(v) == 0 {
 | |
| 		err = fmt.Errorf("can't find 'x-sls-compresstype' header")
 | |
| 		return
 | |
| 	}
 | |
| 	if v[0] != "lz4" {
 | |
| 		err = fmt.Errorf("unexpected compress type:%v", v[0])
 | |
| 		return
 | |
| 	}
 | |
| 
 | |
| 	v, ok = r.Header["X-Sls-Cursor"]
 | |
| 	if !ok || len(v) == 0 {
 | |
| 		err = fmt.Errorf("can't find 'x-sls-cursor' header")
 | |
| 		return
 | |
| 	}
 | |
| 	nextCursor = v[0]
 | |
| 
 | |
| 	v, ok = r.Header["X-Sls-Bodyrawsize"]
 | |
| 	if !ok || len(v) == 0 {
 | |
| 		err = fmt.Errorf("can't find 'x-sls-bodyrawsize' header")
 | |
| 		return
 | |
| 	}
 | |
| 	bodyRawSize, err := strconv.Atoi(v[0])
 | |
| 	if err != nil {
 | |
| 		return
 | |
| 	}
 | |
| 
 | |
| 	out = make([]byte, bodyRawSize)
 | |
| 	err = lz4.Uncompress(buf, out)
 | |
| 	if err != nil {
 | |
| 		return
 | |
| 	}
 | |
| 
 | |
| 	return
 | |
| }
 | |
| 
 | |
| // LogsBytesDecode decodes logs binary data returned by GetLogsBytes API
 | |
| func LogsBytesDecode(data []byte) (gl *LogGroupList, err error) {
 | |
| 	gl = &LogGroupList{}
 | |
| 	err = proto.Unmarshal(data, gl)
 | |
| 	if err != nil {
 | |
| 		return
 | |
| 	}
 | |
| 
 | |
| 	return
 | |
| }
 | |
| 
 | |
| // GetLogs gets logs from shard specified by shardID according cursor.
 | |
| // The logGroupMaxCount is the max number of logGroup could be returned.
 | |
| // The nextCursor is the next curosr can be used to read logs at next time.
 | |
| func (s *LogStore) GetLogs(shardID int, cursor string,
 | |
| 	logGroupMaxCount int) (gl *LogGroupList, nextCursor string, err error) {
 | |
| 
 | |
| 	out, nextCursor, err := s.GetLogsBytes(shardID, cursor, logGroupMaxCount)
 | |
| 	if err != nil {
 | |
| 		return
 | |
| 	}
 | |
| 
 | |
| 	gl, err = LogsBytesDecode(out)
 | |
| 	if err != nil {
 | |
| 		return
 | |
| 	}
 | |
| 
 | |
| 	return
 | |
| }
 |