* Update Request Header Prefixes and Upgrade API Version to 0.6.0 - Changed all HTTP request header prefixes from `x-sls-` to `x-log-` to comply with the latest API specifications. - Adjusted header prefix validation in `signature.go` to support `x-log-` and `x-acs-`. - Replaced `x-sls-bodyrawsize` with `x-log-bodyrawsize` in multiple files (`log_project.go`, `request.go`, etc.). - Updated header fields for compression type and signing method (e.g., `x-sls-compresstype` to `x-log-compresstype`). - Modified authorization logic to replace `SLS` with `LOG`. * Refactor signature generation logic for improved readability and maintainability Split the signature generation logic into several independent functions, including header extraction, standardized header construction, and resource string generation. This modular design enhances code clarity, reduces redundant logic, and improves error handling mechanisms. Additionally, comments and code structure have been optimized to ensure easier extension and debugging. --------- Co-authored-by: 优胜 <zhushaofei.zsf@alibaba-inc.com>
271 lines
5.8 KiB
Go
Executable File
271 lines
5.8 KiB
Go
Executable File
package alils
|
|
|
|
import (
|
|
"encoding/json"
|
|
"fmt"
|
|
"io"
|
|
"net/http"
|
|
"net/http/httputil"
|
|
"strconv"
|
|
|
|
lz4 "github.com/cloudflare/golz4"
|
|
"github.com/gogo/protobuf/proto"
|
|
)
|
|
|
|
// LogStore stores the logs
|
|
type LogStore struct {
|
|
Name string `json:"logstoreName"`
|
|
TTL int
|
|
ShardCount int
|
|
|
|
CreateTime uint32
|
|
LastModifyTime uint32
|
|
|
|
project *LogProject
|
|
}
|
|
|
|
// Shard is a single shard entry of a logstore, in the shape ListShards
// decodes from the service response.
type Shard struct {
	// ShardID is the numeric identifier of the shard, read from the
	// "shardID" JSON key.
	ShardID int `json:"shardID"`
}
|
|
|
|
// ListShards returns shard id list of this logstore.
|
|
func (s *LogStore) ListShards() (shardIDs []int, err error) {
|
|
h := map[string]string{
|
|
"x-log-bodyrawsize": "0",
|
|
}
|
|
|
|
uri := fmt.Sprintf("/logstores/%v/shards", s.Name)
|
|
r, err := request(s.project, "GET", uri, h, nil)
|
|
if err != nil {
|
|
return
|
|
}
|
|
|
|
buf, err := io.ReadAll(r.Body)
|
|
if err != nil {
|
|
return
|
|
}
|
|
|
|
if r.StatusCode != http.StatusOK {
|
|
errMsg := &errorMessage{}
|
|
err = json.Unmarshal(buf, errMsg)
|
|
if err != nil {
|
|
err = fmt.Errorf("failed to list logstore")
|
|
dump, _ := httputil.DumpResponse(r, true)
|
|
fmt.Println(dump)
|
|
return
|
|
}
|
|
err = fmt.Errorf("%v:%v", errMsg.Code, errMsg.Message)
|
|
return
|
|
}
|
|
|
|
var shards []*Shard
|
|
err = json.Unmarshal(buf, &shards)
|
|
if err != nil {
|
|
return
|
|
}
|
|
|
|
for _, v := range shards {
|
|
shardIDs = append(shardIDs, v.ShardID)
|
|
}
|
|
return
|
|
}
|
|
|
|
// PutLogs puts logs into logstore.
|
|
// The callers should transform user logs into LogGroup.
|
|
func (s *LogStore) PutLogs(lg *LogGroup) (err error) {
|
|
body, err := proto.Marshal(lg)
|
|
if err != nil {
|
|
return
|
|
}
|
|
|
|
// Compress body with lz4
|
|
out := make([]byte, lz4.CompressBound(body))
|
|
n, err := lz4.Compress(body, out)
|
|
if err != nil {
|
|
return
|
|
}
|
|
|
|
h := map[string]string{
|
|
"x-log-compresstype": "lz4",
|
|
"x-log-bodyrawsize": fmt.Sprintf("%v", len(body)),
|
|
"Content-Type": "application/x-protobuf",
|
|
}
|
|
|
|
uri := fmt.Sprintf("/logstores/%v", s.Name)
|
|
r, err := request(s.project, "POST", uri, h, out[:n])
|
|
if err != nil {
|
|
return
|
|
}
|
|
|
|
buf, err := io.ReadAll(r.Body)
|
|
if err != nil {
|
|
return
|
|
}
|
|
|
|
if r.StatusCode != http.StatusOK {
|
|
errMsg := &errorMessage{}
|
|
err = json.Unmarshal(buf, errMsg)
|
|
if err != nil {
|
|
err = fmt.Errorf("failed to put logs")
|
|
dump, _ := httputil.DumpResponse(r, true)
|
|
fmt.Println(dump)
|
|
return
|
|
}
|
|
err = fmt.Errorf("%v:%v", errMsg.Code, errMsg.Message)
|
|
return
|
|
}
|
|
return
|
|
}
|
|
|
|
// GetCursor gets log cursor of one shard specified by shardID.
|
|
// The from can be in three form: a) unix timestamp in seccond, b) "begin", c) "end".
|
|
// For more detail please read: http://gitlab.alibaba-inc.com/sls/doc/blob/master/api/shard.md#logstore
|
|
func (s *LogStore) GetCursor(shardID int, from string) (cursor string, err error) {
|
|
h := map[string]string{
|
|
"x-log-bodyrawsize": "0",
|
|
}
|
|
|
|
uri := fmt.Sprintf("/logstores/%v/shards/%v?type=cursor&from=%v",
|
|
s.Name, shardID, from)
|
|
|
|
r, err := request(s.project, "GET", uri, h, nil)
|
|
if err != nil {
|
|
return
|
|
}
|
|
|
|
buf, err := io.ReadAll(r.Body)
|
|
if err != nil {
|
|
return
|
|
}
|
|
|
|
if r.StatusCode != http.StatusOK {
|
|
errMsg := &errorMessage{}
|
|
err = json.Unmarshal(buf, errMsg)
|
|
if err != nil {
|
|
err = fmt.Errorf("failed to get cursor")
|
|
dump, _ := httputil.DumpResponse(r, true)
|
|
fmt.Println(dump)
|
|
return
|
|
}
|
|
err = fmt.Errorf("%v:%v", errMsg.Code, errMsg.Message)
|
|
return
|
|
}
|
|
|
|
type Body struct {
|
|
Cursor string
|
|
}
|
|
body := &Body{}
|
|
|
|
err = json.Unmarshal(buf, body)
|
|
if err != nil {
|
|
return
|
|
}
|
|
cursor = body.Cursor
|
|
return
|
|
}
|
|
|
|
// GetLogsBytes gets logs binary data from shard specified by shardID according cursor.
|
|
// The logGroupMaxCount is the max number of logGroup could be returned.
|
|
// The nextCursor is the next curosr can be used to read logs at next time.
|
|
func (s *LogStore) GetLogsBytes(shardID int, cursor string,
|
|
logGroupMaxCount int) (out []byte, nextCursor string, err error) {
|
|
|
|
h := map[string]string{
|
|
"x-log-bodyrawsize": "0",
|
|
"Accept": "application/x-protobuf",
|
|
"Accept-Encoding": "lz4",
|
|
}
|
|
|
|
uri := fmt.Sprintf("/logstores/%v/shards/%v?type=logs&cursor=%v&count=%v",
|
|
s.Name, shardID, cursor, logGroupMaxCount)
|
|
|
|
r, err := request(s.project, "GET", uri, h, nil)
|
|
if err != nil {
|
|
return
|
|
}
|
|
|
|
buf, err := io.ReadAll(r.Body)
|
|
if err != nil {
|
|
return
|
|
}
|
|
|
|
if r.StatusCode != http.StatusOK {
|
|
errMsg := &errorMessage{}
|
|
err = json.Unmarshal(buf, errMsg)
|
|
if err != nil {
|
|
err = fmt.Errorf("failed to get cursor")
|
|
dump, _ := httputil.DumpResponse(r, true)
|
|
fmt.Println(dump)
|
|
return
|
|
}
|
|
err = fmt.Errorf("%v:%v", errMsg.Code, errMsg.Message)
|
|
return
|
|
}
|
|
|
|
v, ok := r.Header["X-Log-Compresstype"]
|
|
if !ok || len(v) == 0 {
|
|
err = fmt.Errorf("can't find 'x-log-compresstype' header")
|
|
return
|
|
}
|
|
if v[0] != "lz4" {
|
|
err = fmt.Errorf("unexpected compress type:%v", v[0])
|
|
return
|
|
}
|
|
|
|
v, ok = r.Header["X-Log-Cursor"]
|
|
if !ok || len(v) == 0 {
|
|
err = fmt.Errorf("can't find 'x-log-cursor' header")
|
|
return
|
|
}
|
|
nextCursor = v[0]
|
|
|
|
v, ok = r.Header["X-Log-Bodyrawsize"]
|
|
if !ok || len(v) == 0 {
|
|
err = fmt.Errorf("can't find 'x-log-bodyrawsize' header")
|
|
return
|
|
}
|
|
bodyRawSize, err := strconv.Atoi(v[0])
|
|
if err != nil {
|
|
return
|
|
}
|
|
|
|
out = make([]byte, bodyRawSize)
|
|
err = lz4.Uncompress(buf, out)
|
|
if err != nil {
|
|
return
|
|
}
|
|
|
|
return
|
|
}
|
|
|
|
// LogsBytesDecode decodes logs binary data returned by GetLogsBytes API
|
|
func LogsBytesDecode(data []byte) (gl *LogGroupList, err error) {
|
|
gl = &LogGroupList{}
|
|
err = proto.Unmarshal(data, gl)
|
|
if err != nil {
|
|
return
|
|
}
|
|
|
|
return
|
|
}
|
|
|
|
// GetLogs gets logs from shard specified by shardID according cursor.
|
|
// The logGroupMaxCount is the max number of logGroup could be returned.
|
|
// The nextCursor is the next curosr can be used to read logs at next time.
|
|
func (s *LogStore) GetLogs(shardID int, cursor string,
|
|
logGroupMaxCount int) (gl *LogGroupList, nextCursor string, err error) {
|
|
|
|
out, nextCursor, err := s.GetLogsBytes(shardID, cursor, logGroupMaxCount)
|
|
if err != nil {
|
|
return
|
|
}
|
|
|
|
gl, err = LogsBytesDecode(out)
|
|
if err != nil {
|
|
return
|
|
}
|
|
|
|
return
|
|
}
|