Reduce CPU and memory usage in the access log module (#172)
mrproliu authored Dec 25, 2024
1 parent 78fb9c4 commit 31659c5
Showing 12 changed files with 81 additions and 47 deletions.
1 change: 1 addition & 0 deletions CHANGES.md
@@ -20,6 +20,7 @@ Release Notes.
* Add gRPC sender to send the access log to the backend.
* Add warning log when the event queue is almost full in the access log module.
* Reduce unessential `conntrack` queries when detecting new connections.
* Reduce CPU and memory usage in the access log module.

#### Bug Fixes
* Fix that the base image cannot run on arm64.
8 changes: 4 additions & 4 deletions pkg/accesslog/collector/connection.go
@@ -84,13 +84,13 @@ func (c *ConnectCollector) Start(_ *module.Manager, ctx *common.AccessLogContext
c.eventQueue.RegisterReceiver(ctx.BPF.SocketConnectionEventQueue, int(perCPUBufferSize),
ctx.Config.ConnectionAnalyze.ParseParallels, func() interface{} {
return &events.SocketConnectEvent{}
}, func(data interface{}) string {
return fmt.Sprintf("%d", data.(*events.SocketConnectEvent).ConID)
}, func(data interface{}) int {
return int(data.(*events.SocketConnectEvent).ConID)
})
c.eventQueue.RegisterReceiver(ctx.BPF.SocketCloseEventQueue, int(perCPUBufferSize), ctx.Config.ConnectionAnalyze.ParseParallels, func() interface{} {
return &events.SocketCloseEvent{}
}, func(data interface{}) string {
return fmt.Sprintf("%d", data.(*events.SocketCloseEvent).ConnectionID)
}, func(data interface{}) int {
return int(data.(*events.SocketCloseEvent).ConnectionID)
})
c.eventQueue.Start(ctx.RuntimeContext, ctx.BPF.Linker)

15 changes: 10 additions & 5 deletions pkg/accesslog/collector/protocols/queue.go
@@ -23,6 +23,7 @@ import (
"fmt"
"os"
"sort"
"strconv"
"sync"
"time"

@@ -96,14 +97,14 @@ func (q *AnalyzeQueue) Start(ctx context.Context) {
q.eventQueue.RegisterReceiver(q.context.BPF.SocketDetailQueue, int(q.perCPUBuffer),
q.context.Config.ProtocolAnalyze.ParseParallels, func() interface{} {
return q.detailSupplier()
}, func(data interface{}) string {
return fmt.Sprintf("%d", data.(events.SocketDetail).GetConnectionID())
}, func(data interface{}) int {
return int(data.(events.SocketDetail).GetConnectionID())
})
q.eventQueue.RegisterReceiver(q.context.BPF.SocketDataUploadQueue, int(q.perCPUBuffer),
q.context.Config.ProtocolAnalyze.ParseParallels, func() interface{} {
return &events.SocketDataUploadEvent{}
}, func(data interface{}) string {
return fmt.Sprintf("%d", data.(*events.SocketDataUploadEvent).ConnectionID)
}, func(data interface{}) int {
return int(data.(*events.SocketDataUploadEvent).ConnectionID)
})

q.eventQueue.Start(ctx, q.context.BPF.Linker)
@@ -251,7 +252,11 @@ func (p *PartitionContext) getConnectionContext(connectionID, randomID uint64,
}

func (p *PartitionContext) buildConnectionKey(conID, ranID uint64) string {
return fmt.Sprintf("%d_%d", conID, ranID)
buf := make([]byte, 0, 42) // 21 + 1 + 21
buf = strconv.AppendUint(buf, conID, 10)
buf = append(buf, '_')
buf = strconv.AppendUint(buf, ranID, 10)
return string(buf)
}

func (p *PartitionContext) processEvents() {
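Note: the buildConnectionKey change above swaps fmt.Sprintf("%d_%d", ...) for strconv.AppendUint into a pre-sized byte slice, so the hot path no longer pays for fmt's interface boxing and intermediate allocations; only the final string conversion allocates. A minimal, self-contained sketch of the same technique (the buildKey name is illustrative, not from the repository):

package main

import (
    "fmt"
    "strconv"
)

// buildKey joins two uint64 IDs as "<conID>_<ranID>" without going through fmt.
// A uint64 needs at most 20 decimal digits, so 20+1+20 bytes of capacity is
// always enough and the slice never has to grow.
func buildKey(conID, ranID uint64) string {
    buf := make([]byte, 0, 41)
    buf = strconv.AppendUint(buf, conID, 10)
    buf = append(buf, '_')
    buf = strconv.AppendUint(buf, ranID, 10)
    return string(buf)
}

func main() {
    fmt.Println(buildKey(12345, 67890)) // 12345_67890
}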
3 changes: 2 additions & 1 deletion pkg/accesslog/collector/ztunnel.go
@@ -110,7 +110,8 @@ func (z *ZTunnelCollector) Start(mgr *module.Manager, ctx *common.AccessLogConte
}

func (z *ZTunnelCollector) ReadyToFlushConnection(connection *common.ConnectionInfo, _ events.Event) {
if connection == nil || connection.Socket == nil || connection.RPCConnection == nil || connection.RPCConnection.Attachment != nil {
if connection == nil || connection.Socket == nil || connection.RPCConnection == nil || connection.RPCConnection.Attachment != nil ||
z.ipMappingCache.Len() == 0 {
return
}
key := z.buildIPMappingCacheKey(connection.Socket.SrcIP, int(connection.Socket.SrcPort),
17 changes: 11 additions & 6 deletions pkg/process/finders/kubernetes/process.go
@@ -18,6 +18,8 @@
package kubernetes

import (
"sync"

"github.com/shirou/gopsutil/process"

"github.com/apache/skywalking-rover/pkg/process/api"
@@ -29,24 +31,23 @@ type Process struct {
original *process.Process

// process data
pid int32
cmd string
profiling *profiling.Info
podContainer *PodContainer
pid int32
cmd string
profilingOnce sync.Once
profiling *profiling.Info
podContainer *PodContainer

// entity for the backend
entity *api.ProcessEntity
}

func NewProcess(p *process.Process, cmdline string, pc *PodContainer, entity *api.ProcessEntity) *Process {
stat, _ := base.BuildProfilingStat(p)
return &Process{
original: p,
pid: p.Pid,
cmd: cmdline,
podContainer: pc,
entity: entity,
profiling: stat,
}
}

@@ -67,6 +68,10 @@ func (p *Process) DetectType() api.ProcessDetectType {
}

func (p *Process) ProfilingStat() *profiling.Info {
p.profilingOnce.Do(func() {
stat, _ := base.BuildProfilingStat(p.original)
p.profiling = stat
})
return p.profiling
}

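Note: the Process change above drops the eager base.BuildProfilingStat call from NewProcess and computes the profiling info lazily, guarded by a sync.Once, so processes whose stat is never requested cost nothing and concurrent callers still see it built exactly once. A minimal sketch of the same lazy-initialization pattern (the proc and expensiveStat names are illustrative):

package main

import (
    "fmt"
    "sync"
)

type stat struct{ samples int }

// proc computes its stat lazily: the first Stat() call pays the cost,
// later calls reuse the cached value, and concurrent callers are safe.
type proc struct {
    pid      int32
    statOnce sync.Once
    stat     *stat
}

func expensiveStat(pid int32) *stat {
    fmt.Println("computing stat for", pid) // runs once per process
    return &stat{samples: 1}
}

func (p *proc) Stat() *stat {
    p.statOnce.Do(func() {
        p.stat = expensiveStat(p.pid)
    })
    return p.stat
}

func main() {
    p := &proc{pid: 42}
    fmt.Println(p.Stat() == p.Stat()) // true: computed once, then cached
}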
12 changes: 8 additions & 4 deletions pkg/process/finders/kubernetes/registry.go
@@ -18,6 +18,7 @@
package kubernetes

import (
"reflect"
"time"

"k8s.io/apimachinery/pkg/labels"
@@ -133,14 +134,17 @@ func chooseServiceName(a, b string) string {
return b
}

func (r *Registry) OnAdd(_ interface{}) {
func (r *Registry) OnAdd(d interface{}) {
r.recomposePodServiceName()
}

func (r *Registry) OnUpdate(_, _ interface{}) {
r.recomposePodServiceName()
func (r *Registry) OnUpdate(d, u interface{}) {
same := reflect.DeepEqual(d, u)
if !same {
r.recomposePodServiceName()
}
}

func (r *Registry) OnDelete(_ interface{}) {
func (r *Registry) OnDelete(d interface{}) {
r.recomposePodServiceName()
}
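Note: OnUpdate above now short-circuits with reflect.DeepEqual when the old and new objects are identical, which is the common case for periodic informer resyncs, so the pod/service-name recomposition only runs on real changes. A minimal sketch of that guard (the pod type and recompose callback are illustrative):

package main

import (
    "fmt"
    "reflect"
)

type pod struct {
    Name   string
    Labels map[string]string
}

// onUpdate skips the expensive recompute when the update is a no-op,
// which is common for periodic informer resyncs.
func onUpdate(oldObj, newObj interface{}, recompose func()) {
    if reflect.DeepEqual(oldObj, newObj) {
        return
    }
    recompose()
}

func main() {
    a := pod{Name: "p", Labels: map[string]string{"app": "demo"}}
    b := a // identical copy: recompose is skipped
    onUpdate(a, b, func() { fmt.Println("recomposed") })

    b.Name = "p2" // real change: recompose runs
    onUpdate(a, b, func() { fmt.Println("recomposed") })
}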
8 changes: 4 additions & 4 deletions pkg/profiling/task/network/analyze/layer7/events.go
@@ -38,15 +38,15 @@ func (l *Listener) startSocketData(ctx context.Context, bpfLoader *bpf.Loader) {
// socket buffer data
l.socketDataQueue.RegisterReceiver(bpfLoader.SocketDataUploadQueue, l.protocolPerCPUBuffer, 1, func() interface{} {
return &analyzeBase.SocketDataUploadEvent{}
}, func(data interface{}) string {
return data.(*analyzeBase.SocketDataUploadEvent).GenerateConnectionID()
}, func(data interface{}) int {
return int(data.(*analyzeBase.SocketDataUploadEvent).ConnectionID)
})

// socket detail
l.socketDataQueue.RegisterReceiver(bpfLoader.SocketDetailDataQueue, l.protocolPerCPUBuffer, 1, func() interface{} {
return &analyzeBase.SocketDetailEvent{}
}, func(data interface{}) string {
return data.(*analyzeBase.SocketDetailEvent).GenerateConnectionID()
}, func(data interface{}) int {
return int(data.(*analyzeBase.SocketDetailEvent).ConnectionID)
})

l.socketDataQueue.Start(ctx, bpfLoader.Linker)
@@ -26,6 +26,7 @@ import (
"net/http"
"strconv"
"strings"
"sync"

"github.com/apache/skywalking-rover/pkg/logger"
"github.com/apache/skywalking-rover/pkg/tools/buffer"
@@ -38,6 +39,11 @@ var (
requestMethods = []string{
"GET", "POST", "OPTIONS", "HEAD", "PUT", "DELETE", "CONNECT", "TRACE", "PATCH",
}
pooledReader = sync.Pool{
New: func() any {
return bufio.NewReader(nil)
},
}
)

var log = logger.GetLogger("profiling", "task", "network", "layer7", "protocols", "http1", "reader")
@@ -313,3 +319,13 @@ func (c *charsetReadWrapper) Read(p []byte) (n int, err error) {
func (c *charsetReadWrapper) Close() error {
return nil
}

func newPooledReaderFromBuffer(b *buffer.Buffer) *bufio.Reader {
reader := pooledReader.Get().(*bufio.Reader)
reader.Reset(b)
return reader
}

func releasePooledReader(r *bufio.Reader) {
pooledReader.Put(r)
}
@@ -70,7 +70,10 @@ func (r *Request) Original() *http.Request {

// nolint
func (r *Reader) ReadRequest(buf *buffer.Buffer, readBody bool) (*Request, enums.ParseResult, error) {
bufReader := bufio.NewReader(buf)
bufReader := newPooledReaderFromBuffer(buf)
defer func() {
releasePooledReader(bufReader)
}()
tp := textproto.NewReader(bufReader)
req := &http.Request{}
result := &Request{original: req, reader: r}
@@ -59,7 +59,10 @@ func (r *Response) Original() *http.Response {
}

func (r *Reader) ReadResponse(req *Request, buf *buffer.Buffer, readBody bool) (*Response, enums.ParseResult, error) {
bufReader := bufio.NewReader(buf)
bufReader := newPooledReaderFromBuffer(buf)
defer func() {
releasePooledReader(bufReader)
}()
tp := textproto.NewReader(bufReader)
resp := &http.Response{}
result := &Response{original: resp, req: req, reader: r}
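Note: together with the pooled-reader helpers added to the HTTP/1 reader above, these two call sites stop allocating a fresh bufio.NewReader (with its 4 KB default internal buffer) for every parsed request and response; a sync.Pool hands out a reusable reader that is Reset onto the current buffer and returned once parsing finishes. A minimal, self-contained sketch of the same pooling pattern over plain strings.Readers (the getReader and putReader helpers are illustrative):

package main

import (
    "bufio"
    "fmt"
    "strings"
    "sync"
)

// readerPool hands out *bufio.Reader values; Reset rebinds a pooled
// reader to a new source without reallocating its internal buffer.
var readerPool = sync.Pool{
    New: func() any { return bufio.NewReader(nil) },
}

func getReader(src *strings.Reader) *bufio.Reader {
    r := readerPool.Get().(*bufio.Reader)
    r.Reset(src)
    return r
}

func putReader(r *bufio.Reader) {
    r.Reset(nil) // optional: drop the reference to the source before pooling
    readerPool.Put(r)
}

func main() {
    for _, line := range []string{"GET / HTTP/1.1\r\n", "HTTP/1.1 200 OK\r\n"} {
        br := getReader(strings.NewReader(line))
        s, _ := br.ReadString('\n')
        fmt.Printf("%q\n", strings.TrimRight(s, "\r\n"))
        putReader(br)
    }
}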
22 changes: 12 additions & 10 deletions pkg/tools/btf/ebpf.go
@@ -52,45 +52,47 @@ func LoadBPFAndAssign(loadBPF func() (*ebpf.CollectionSpec, error), objs interfa

func GetEBPFCollectionOptionsIfNeed(bpfSpec *ebpf.CollectionSpec) *ebpf.CollectionOptions {
findBTFOnce.Do(func() {
readSpec, err := getKernelBTFAddress()
readSpec, kernel, err := getKernelBTFAddress()
if err != nil {
log.Warnf("found BTF failure: %v", err)
return
}

spec = readSpec
if !kernel {
spec = readSpec
}
})

return &ebpf.CollectionOptions{Programs: ebpf.ProgramOptions{KernelTypes: spec}}
}

// getKernelBTFAddress means get the kernel BTF file path
func getKernelBTFAddress() (spec *btf.Spec, err error) {
spec, err = btf.LoadKernelSpec()
func getKernelBTFAddress() (spec *btf.Spec, fromKernel bool, err error) {
_, err = btf.LoadKernelSpec()
if err == nil {
return spec, nil
return nil, true, nil
}

distributeInfo, err := operator.GetDistributionInfo()
if err != nil {
return nil, fmt.Errorf("could not load the system distribute info: %v", err)
return nil, false, fmt.Errorf("could not load the system distribute info: %v", err)
}
uname, err := operator.GetOSUname()
if err != nil {
return nil, fmt.Errorf("could not load the uname info: %v", err)
return nil, false, fmt.Errorf("could not load the uname info: %v", err)
}

path := fmt.Sprintf("files/%s/%s/%s/%s.btf", distributeInfo.Name, distributeInfo.Version,
distributeInfo.Architecture, uname.Release)
bpfObjBuff, err := asset(path)
if err != nil {
return nil, fmt.Errorf("could not found customized BTF file: %s", path)
return nil, false, fmt.Errorf("could not found customized BTF file: %s", path)
}
spec, err = btf.LoadSpecFromReader(bytes.NewReader(bpfObjBuff))
if err != nil {
return nil, fmt.Errorf("could not load customized BTF file: %s", path)
return nil, false, fmt.Errorf("could not load customized BTF file: %s", path)
}
return spec, nil
return spec, false, nil
}

func asset(file string) ([]byte, error) {
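Note: getKernelBTFAddress above now only reports whether BTF is available from the running kernel; in that case the package-level spec stays nil and ProgramOptions.KernelTypes is left unset, so the cilium/ebpf library resolves kernel BTF itself instead of this process pinning its own parsed copy for its lifetime. A bundled per-distribution BTF file is only parsed and passed through when the kernel has none. A rough sketch of that decision, assuming the same cilium/ebpf API used in the diff (the collectionOptions helper is illustrative):

package btfopts

import (
    "github.com/cilium/ebpf"
    "github.com/cilium/ebpf/btf"
)

// collectionOptions leaves KernelTypes nil when the running kernel already
// exposes BTF, letting the library load it on demand; a custom spec is only
// supplied for kernels without native BTF.
func collectionOptions(customBTF *btf.Spec) *ebpf.CollectionOptions {
    if _, err := btf.LoadKernelSpec(); err == nil {
        return &ebpf.CollectionOptions{} // kernel BTF present: nothing to override
    }
    return &ebpf.CollectionOptions{
        Programs: ebpf.ProgramOptions{KernelTypes: customBTF},
    }
}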
16 changes: 5 additions & 11 deletions pkg/tools/btf/queue.go
@@ -20,7 +20,6 @@ package btf
import (
"context"
"fmt"
"hash/fnv"
"sync"
"time"

@@ -122,7 +121,7 @@ type mapReceiver struct {
emap *ebpf.Map
perCPUBuffer int
dataSupplier func() interface{}
router func(data interface{}) string
router func(data interface{}) int
parallels int
}

@@ -135,7 +134,7 @@ func NewEventQueue(name string, partitionCount, sizePerPartition int, contextGen
}

func (e *EventQueue) RegisterReceiver(emap *ebpf.Map, perCPUBufferSize, parallels int, dataSupplier func() interface{},
routeGenerator func(data interface{}) string) {
routeGenerator func(data interface{}) int) {
e.receivers = append(e.receivers, &mapReceiver{
emap: emap,
perCPUBuffer: perCPUBufferSize,
@@ -151,14 +150,9 @@ func (e *EventQueue) Start(ctx context.Context, linker *Linker) {
})
}

func (e *EventQueue) Push(key string, data interface{}) {
// calculate hash of key
h := fnv.New32a()
h.Write([]byte(key))
sum32 := int(h.Sum32())

func (e *EventQueue) Push(key int, data interface{}) {
// append data
e.partitions[sum32%e.count].channel <- data
e.partitions[key%e.count].channel <- data
}

func (e *EventQueue) PartitionContexts() []PartitionContext {
@@ -217,7 +211,7 @@ func (e *EventQueue) start0(ctx context.Context, linker *Linker) {
}()
}

func (e *EventQueue) routerTransformer(data interface{}, routeGenerator func(data interface{}) string) {
func (e *EventQueue) routerTransformer(data interface{}, routeGenerator func(data interface{}) int) {
key := routeGenerator(data)
e.Push(key, data)
}
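Note: with the queue change above, receivers hand Push the raw connection ID as an int and the partition is picked with a plain modulo; the removed path formatted the ID into a string and ran it through FNV-1a on every event only to turn it back into a number. Either scheme keeps all events of one connection on the same partition. A minimal sketch contrasting the two routes (the partition count and IDs are illustrative):

package main

import (
    "fmt"
    "hash/fnv"
)

const partitions = 4

// oldRoute mirrors the removed path: format the ID, hash the string, mod.
func oldRoute(connectionID uint64) int {
    key := fmt.Sprintf("%d", connectionID)
    h := fnv.New32a()
    h.Write([]byte(key))
    return int(h.Sum32()) % partitions
}

// newRoute mirrors the new path: the ID is already a number, just mod it.
func newRoute(connectionID uint64) int {
    return int(connectionID) % partitions
}

func main() {
    for _, id := range []uint64{1, 2, 3, 4, 5} {
        fmt.Printf("conn %d -> old partition %d, new partition %d\n",
            id, oldRoute(id), newRoute(id))
    }
}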
