Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Reduce missing details issue in the access log module #168

Merged
merged 4 commits into from
Dec 16, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGES.md
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ Release Notes.
* Fix missing the first socket detail event in HTTPS protocol.
* Support parallel parsing protocol data in the access log module.
* Upgrade Go library to `1.22`, eBPF library to `0.16.0`.
* Reduce missing details issue in the access log module.

#### Bug Fixes
* Fix the base image cannot run in the arm64.
Expand Down
4 changes: 3 additions & 1 deletion configs/rover_configs.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -143,8 +143,10 @@ access_log:
connection_analyze:
# The size of connection buffer on each CPU
per_cpu_buffer: ${ROVER_ACCESS_LOG_CONNECTION_ANALYZE_PER_CPU_BUFFER:200KB}
# The count of parallel connection event parse
parse_parallels: ${ROVER_ACCESS_LOG_CONNECTION_ANALYZE_PARSE_PARALLELS:1}
# The count of parallel connection analyzer
parallels: ${ROVER_ACCESS_LOG_CONNECTION_ANALYZE_PARALLELS:1}
analyze_parallels: ${ROVER_ACCESS_LOG_CONNECTION_ANALYZE_PARALLELS:1}
# The size of per paralleled analyzer queue
queue_size: ${ROVER_ACCESS_LOG_CONNECTION_ANALYZE_QUEUE_SIZE:2000}
protocol_analyze:
Expand Down
25 changes: 15 additions & 10 deletions pkg/accesslog/collector/connection.go
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,10 @@ func (c *ConnectCollector) Start(_ *module.Manager, ctx *common.AccessLogContext
if int(perCPUBufferSize) < os.Getpagesize() {
return fmt.Errorf("the cpu buffer must bigger than %dB", os.Getpagesize())
}
if ctx.Config.ConnectionAnalyze.Parallels < 1 {
if ctx.Config.ConnectionAnalyze.ParseParallels < 1 {
return fmt.Errorf("the parallels cannot be small than 1")
}
if ctx.Config.ConnectionAnalyze.AnalyzeParallels < 1 {
return fmt.Errorf("the parallels cannot be small than 1")
}
if ctx.Config.ConnectionAnalyze.QueueSize < 1 {
Expand All @@ -74,15 +77,17 @@ func (c *ConnectCollector) Start(_ *module.Manager, ctx *common.AccessLogContext
if err != nil {
connectionLogger.Warnf("cannot create the connection tracker, %v", err)
}
c.eventQueue = btf.NewEventQueue(ctx.Config.ConnectionAnalyze.Parallels, ctx.Config.ConnectionAnalyze.QueueSize, func(num int) btf.PartitionContext {
return newConnectionPartitionContext(ctx, track)
})
c.eventQueue.RegisterReceiver(ctx.BPF.SocketConnectionEventQueue, int(perCPUBufferSize), 1, func() interface{} {
return &events.SocketConnectEvent{}
}, func(data interface{}) string {
return fmt.Sprintf("%d", data.(*events.SocketConnectEvent).ConID)
})
c.eventQueue.RegisterReceiver(ctx.BPF.SocketCloseEventQueue, int(perCPUBufferSize), 1, func() interface{} {
c.eventQueue = btf.NewEventQueue(ctx.Config.ConnectionAnalyze.AnalyzeParallels,
ctx.Config.ConnectionAnalyze.QueueSize, func(num int) btf.PartitionContext {
return newConnectionPartitionContext(ctx, track)
})
c.eventQueue.RegisterReceiver(ctx.BPF.SocketConnectionEventQueue, int(perCPUBufferSize),
ctx.Config.ConnectionAnalyze.ParseParallels, func() interface{} {
return &events.SocketConnectEvent{}
}, func(data interface{}) string {
return fmt.Sprintf("%d", data.(*events.SocketConnectEvent).ConID)
})
c.eventQueue.RegisterReceiver(ctx.BPF.SocketCloseEventQueue, int(perCPUBufferSize), ctx.Config.ConnectionAnalyze.ParseParallels, func() interface{} {
return &events.SocketCloseEvent{}
}, func(data interface{}) string {
return fmt.Sprintf("%d", data.(*events.SocketCloseEvent).ConnectionID)
Expand Down
118 changes: 88 additions & 30 deletions pkg/accesslog/collector/protocols/http1.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ package protocols

import (
"container/list"
"fmt"
"io"

"github.com/apache/skywalking-rover/pkg/accesslog/common"
Expand All @@ -33,16 +34,18 @@ import (
)

var http1Log = logger.GetLogger("accesslog", "collector", "protocols", "http1")
var http1AnalyzeMaxRetryCount = 3

type HTTP1ProtocolAnalyze func(metrics *HTTP1Metrics, request *reader.Request, response *reader.Response)
type HTTP1ProtocolAnalyze func(metrics *HTTP1Metrics, request *reader.Request, response *reader.Response) error

type HTTP1Protocol struct {
ctx *common.AccessLogContext
analyze HTTP1ProtocolAnalyze
reader *reader.Reader
}

func NewHTTP1Analyzer(ctx *common.AccessLogContext, analyze HTTP1ProtocolAnalyze) *HTTP1Protocol {
protocol := &HTTP1Protocol{ctx: ctx}
protocol := &HTTP1Protocol{ctx: ctx, reader: reader.NewReader()}
if analyze == nil {
protocol.analyze = protocol.HandleHTTPData
} else {
Expand All @@ -55,29 +58,41 @@ type HTTP1Metrics struct {
ConnectionID uint64
RandomID uint64

halfRequests *list.List
halfRequests *list.List
analyzeUnFinished *list.List
}

func (p *HTTP1Protocol) GenerateConnection(connectionID, randomID uint64) ProtocolMetrics {
return &HTTP1Metrics{
ConnectionID: connectionID,
RandomID: randomID,
halfRequests: list.New(),
ConnectionID: connectionID,
RandomID: randomID,
halfRequests: list.New(),
analyzeUnFinished: list.New(),
}
}

type HTTP1AnalyzeUnFinished struct {
request *reader.Request
response *reader.Response
retryCount int
}

func (p *HTTP1Protocol) Analyze(connection *PartitionConnection, _ *AnalyzeHelper) error {
metrics := connection.Metrics(enums.ConnectionProtocolHTTP).(*HTTP1Metrics)
buf := connection.Buffer(enums.ConnectionProtocolHTTP)
http1Log.Debugf("ready to analyze HTTP/1 protocol data, connection ID: %d, random ID: %d, data len: %d",
metrics.ConnectionID, metrics.RandomID, buf.DataLength())
p.handleUnFinishedEvents(metrics)
buf.ResetForLoopReading()
for {
if !buf.PrepareForReading() {
return nil
}

messageType, err := reader.IdentityMessageType(buf)
messageType, err := p.reader.IdentityMessageType(buf)
log.Debugf("ready to reading message type, messageType: %v, buf: %p, data id: %d, "+
"connection ID: %d, random ID: %d, error: %v", messageType, buf, buf.Position().DataID(),
metrics.ConnectionID, metrics.RandomID, err)
if err != nil {
http1Log.Debugf("failed to identity message type, %v", err)
if buf.SkipCurrentElement() {
Expand All @@ -89,19 +104,25 @@ func (p *HTTP1Protocol) Analyze(connection *PartitionConnection, _ *AnalyzeHelpe
var result enums.ParseResult
switch messageType {
case reader.MessageTypeRequest:
result, _ = p.handleRequest(metrics, buf)
result, err = p.handleRequest(metrics, buf)
case reader.MessageTypeResponse:
result, _ = p.handleResponse(metrics, buf)
result, err = p.handleResponse(metrics, buf)
case reader.MessageTypeUnknown:
result = enums.ParseResultSkipPackage
}
if err != nil {
http1Log.Warnf("failed to handle HTTP/1.x protocol, connection ID: %d, random ID: %d, data id: %d, error: %v",
metrics.ConnectionID, metrics.RandomID, buf.Position().DataID(), err)
}

finishReading := false
switch result {
case enums.ParseResultSuccess:
finishReading = buf.RemoveReadElements()
finishReading = buf.RemoveReadElements(false)
case enums.ParseResultSkipPackage:
finishReading = buf.SkipCurrentElement()
log.Debugf("skip current element, data id: %d, buf: %p, connection ID: %d, random ID: %d",
buf.Position().DataID(), buf, metrics.ConnectionID, metrics.RandomID)
}

if finishReading {
Expand All @@ -116,7 +137,7 @@ func (p *HTTP1Protocol) ForProtocol() enums.ConnectionProtocol {
}

func (p *HTTP1Protocol) handleRequest(metrics *HTTP1Metrics, buf *buffer.Buffer) (enums.ParseResult, error) {
req, result, err := reader.ReadRequest(buf, true)
req, result, err := p.reader.ReadRequest(buf, true)
if err != nil {
return enums.ParseResultSkipPackage, err
}
Expand All @@ -130,12 +151,14 @@ func (p *HTTP1Protocol) handleRequest(metrics *HTTP1Metrics, buf *buffer.Buffer)
func (p *HTTP1Protocol) handleResponse(metrics *HTTP1Metrics, b *buffer.Buffer) (enums.ParseResult, error) {
firstRequest := metrics.halfRequests.Front()
if firstRequest == nil {
log.Debugf("cannot found request for response, skip response, connection ID: %d, random ID: %d",
metrics.ConnectionID, metrics.RandomID)
return enums.ParseResultSkipPackage, nil
}
request := metrics.halfRequests.Remove(firstRequest).(*reader.Request)

// parsing response
response, result, err := reader.ReadResponse(request, b, true)
response, result, err := p.reader.ReadResponse(request, b, true)
defer func() {
// if parsing response failed, then put the request back to the list
if result != enums.ParseResultSuccess {
Expand All @@ -149,37 +172,71 @@ func (p *HTTP1Protocol) handleResponse(metrics *HTTP1Metrics, b *buffer.Buffer)
}

// getting the request and response, then send to the forwarder
p.analyze(metrics, request, response)
if analyzeError := p.analyze(metrics, request, response); analyzeError != nil {
p.appendAnalyzeUnFinished(metrics, request, response)
}
return enums.ParseResultSuccess, nil
}

func (p *HTTP1Protocol) HandleHTTPData(metrics *HTTP1Metrics, request *reader.Request, response *reader.Response) {
detailEvents := make([]events.SocketDetail, 0)
detailEvents = AppendSocketDetailsFromBuffer(detailEvents, request.HeaderBuffer())
detailEvents = AppendSocketDetailsFromBuffer(detailEvents, request.BodyBuffer())
detailEvents = AppendSocketDetailsFromBuffer(detailEvents, response.HeaderBuffer())
detailEvents = AppendSocketDetailsFromBuffer(detailEvents, response.BodyBuffer())

if len(detailEvents) == 0 {
http1Log.Warnf("cannot found any detail events for HTTP/1.x protocol, connection ID: %d, random ID: %d, data id: %d-%d",
metrics.ConnectionID, metrics.RandomID,
request.MinDataID(), response.BodyBuffer().LastSocketBuffer().DataID())
return
func (p *HTTP1Protocol) appendAnalyzeUnFinished(metrics *HTTP1Metrics, request *reader.Request, response *reader.Response) {
metrics.analyzeUnFinished.PushBack(&HTTP1AnalyzeUnFinished{
request: request,
response: response,
retryCount: 0,
})
}

func (p *HTTP1Protocol) handleUnFinishedEvents(m *HTTP1Metrics) {
for element := m.analyzeUnFinished.Front(); element != nil; {
unFinished := element.Value.(*HTTP1AnalyzeUnFinished)
err := p.analyze(m, unFinished.request, unFinished.response)
if err != nil {
unFinished.retryCount++
if unFinished.retryCount < http1AnalyzeMaxRetryCount {
element = element.Next()
continue
}
http1Log.Warnf("failed to analyze HTTP1 request and response, connection ID: %d, random ID: %d, "+
"retry count: %d, error: %v", m.ConnectionID, m.RandomID, unFinished.retryCount, err)
}
next := element.Next()
m.analyzeUnFinished.Remove(element)
element = next
}
http1Log.Debugf("found fully HTTP1 request and response, contains %d detail events , connection ID: %d, random ID: %d",
len(detailEvents), metrics.ConnectionID, metrics.RandomID)
}

func (p *HTTP1Protocol) HandleHTTPData(metrics *HTTP1Metrics, request *reader.Request, response *reader.Response) error {
details := make([]events.SocketDetail, 0)
var allInclude = true
var idRange *buffer.DataIDRange
details, idRange, allInclude = AppendSocketDetailsFromBuffer(details, request.HeaderBuffer(), idRange, allInclude)
details, idRange, allInclude = AppendSocketDetailsFromBuffer(details, request.BodyBuffer(), idRange, allInclude)
details, idRange, allInclude = AppendSocketDetailsFromBuffer(details, response.HeaderBuffer(), idRange, allInclude)
details, idRange, allInclude = AppendSocketDetailsFromBuffer(details, response.BodyBuffer(), idRange, allInclude)

if !allInclude {
return fmt.Errorf("cannot found full detail events for HTTP/1.x protocol, "+
"data id: %d-%d, current details count: %d",
request.MinDataID(), response.BodyBuffer().LastSocketBuffer().DataID(), len(details))
}

http1Log.Debugf("found fully HTTP1 request and response, contains %d detail events, "+
"connection ID: %d, random ID: %d, data range: %d-%d(%t)",
len(details), metrics.ConnectionID, metrics.RandomID, idRange.From, idRange.To, idRange.IsToBufferReadFinished)
originalRequest := request.Original()
originalResponse := response.Original()
// delete details(each request or response is fine because it's will delete the original buffer)
idRange.DeleteDetails(request.HeaderBuffer())

defer func() {
p.CloseStream(originalRequest.Body)
p.CloseStream(originalResponse.Body)
}()
forwarder.SendTransferProtocolEvent(p.ctx, detailEvents, &v3.AccessLogProtocolLogs{
forwarder.SendTransferProtocolEvent(p.ctx, details, &v3.AccessLogProtocolLogs{
Protocol: &v3.AccessLogProtocolLogs_Http{
Http: &v3.AccessLogHTTPProtocol{
StartTime: forwarder.BuildOffsetTimestamp(detailEvents[0].GetStartTime()),
EndTime: forwarder.BuildOffsetTimestamp(detailEvents[len(detailEvents)-1].GetEndTime()),
StartTime: forwarder.BuildOffsetTimestamp(details[0].GetStartTime()),
EndTime: forwarder.BuildOffsetTimestamp(details[len(details)-1].GetEndTime()),
Version: v3.AccessLogHTTPProtocolVersion_HTTP1,
Request: &v3.AccessLogHTTPProtocolRequest{
Method: TransformHTTPMethod(originalRequest.Method),
Expand All @@ -198,6 +255,7 @@ func (p *HTTP1Protocol) HandleHTTPData(metrics *HTTP1Metrics, request *reader.Re
},
},
})
return nil
}

func (p *HTTP1Protocol) CloseStream(ioReader io.Closer) {
Expand Down
Loading
Loading