diff --git a/pkg/accesslog/collector/protocols/connection.go b/pkg/accesslog/collector/protocols/connection.go index 7ae93256..b6a9e244 100644 --- a/pkg/accesslog/collector/protocols/connection.go +++ b/pkg/accesslog/collector/protocols/connection.go @@ -29,8 +29,8 @@ import ( type PartitionConnection struct { connectionID, randomID uint64 - dataBuffer *buffer.Buffer - protocol map[enums.ConnectionProtocol]bool + dataBuffers map[enums.ConnectionProtocol]*buffer.Buffer + protocol map[enums.ConnectionProtocol]uint64 // protocol with minimal data id protocolAnalyzer map[enums.ConnectionProtocol]Protocol protocolMetrics map[enums.ConnectionProtocol]ProtocolMetrics closed bool @@ -48,8 +48,8 @@ func (p *PartitionConnection) IsExistProtocol(protocol enums.ConnectionProtocol) return exist } -func (p *PartitionConnection) Buffer() *buffer.Buffer { - return p.dataBuffer +func (p *PartitionConnection) Buffer(protocol enums.ConnectionProtocol) *buffer.Buffer { + return p.dataBuffers[protocol] } func (p *PartitionConnection) AppendDetail(ctx *common.AccessLogContext, detail events.SocketDetail) { @@ -58,12 +58,12 @@ func (p *PartitionConnection) AppendDetail(ctx *common.AccessLogContext, detail forwarder.SendTransferNoProtocolEvent(ctx, detail) return } - p.dataBuffer.AppendDetailEvent(detail) + p.dataBuffers[detail.GetProtocol()].AppendDetailEvent(detail) } func (p *PartitionConnection) AppendData(data buffer.SocketDataBuffer) { if p.skipAllDataAnalyze { return } - p.dataBuffer.AppendDataEvent(data) + p.dataBuffers[data.Protocol()].AppendDataEvent(data) } diff --git a/pkg/accesslog/collector/protocols/http1.go b/pkg/accesslog/collector/protocols/http1.go index 06b1b49d..92a81736 100644 --- a/pkg/accesslog/collector/protocols/http1.go +++ b/pkg/accesslog/collector/protocols/http1.go @@ -68,18 +68,19 @@ func (p *HTTP1Protocol) GenerateConnection(connectionID, randomID uint64) Protoc func (p *HTTP1Protocol) Analyze(connection *PartitionConnection, _ *AnalyzeHelper) error { metrics := connection.Metrics(enums.ConnectionProtocolHTTP).(*HTTP1Metrics) + buf := connection.Buffer(enums.ConnectionProtocolHTTP) http1Log.Debugf("ready to analyze HTTP/1 protocol data, connection ID: %d, random ID: %d, data len: %d", - metrics.ConnectionID, metrics.RandomID, connection.Buffer().DataLength()) - connection.Buffer().ResetForLoopReading() + metrics.ConnectionID, metrics.RandomID, buf.DataLength()) + buf.ResetForLoopReading() for { - if !connection.Buffer().PrepareForReading() { + if !buf.PrepareForReading() { return nil } - messageType, err := reader.IdentityMessageType(connection.Buffer()) + messageType, err := reader.IdentityMessageType(buf) if err != nil { http1Log.Debugf("failed to identity message type, %v", err) - if connection.Buffer().SkipCurrentElement() { + if buf.SkipCurrentElement() { break } continue @@ -88,9 +89,9 @@ func (p *HTTP1Protocol) Analyze(connection *PartitionConnection, _ *AnalyzeHelpe var result enums.ParseResult switch messageType { case reader.MessageTypeRequest: - result, _ = p.handleRequest(metrics, connection.Buffer()) + result, _ = p.handleRequest(metrics, buf) case reader.MessageTypeResponse: - result, _ = p.handleResponse(metrics, connection.Buffer()) + result, _ = p.handleResponse(metrics, buf) case reader.MessageTypeUnknown: result = enums.ParseResultSkipPackage } @@ -98,9 +99,9 @@ func (p *HTTP1Protocol) Analyze(connection *PartitionConnection, _ *AnalyzeHelpe finishReading := false switch result { case enums.ParseResultSuccess: - finishReading = connection.Buffer().RemoveReadElements() + finishReading = buf.RemoveReadElements() case enums.ParseResultSkipPackage: - finishReading = connection.Buffer().SkipCurrentElement() + finishReading = buf.SkipCurrentElement() } if finishReading { diff --git a/pkg/accesslog/collector/protocols/http2.go b/pkg/accesslog/collector/protocols/http2.go index 486533c8..b18226a6 100644 --- a/pkg/accesslog/collector/protocols/http2.go +++ b/pkg/accesslog/collector/protocols/http2.go @@ -89,19 +89,20 @@ func (r *HTTP2Protocol) GenerateConnection(connectionID, randomID uint64) Protoc func (r *HTTP2Protocol) Analyze(connection *PartitionConnection, helper *AnalyzeHelper) error { http2Metrics := connection.Metrics(enums.ConnectionProtocolHTTP2).(*HTTP2Metrics) + buf := connection.Buffer(enums.ConnectionProtocolHTTP2) http2Log.Debugf("ready to analyze HTTP/2 protocol data, connection ID: %d, random ID: %d", http2Metrics.connectionID, http2Metrics.randomID) - connection.Buffer().ResetForLoopReading() + buf.ResetForLoopReading() for { - if !connection.Buffer().PrepareForReading() { + if !buf.PrepareForReading() { return nil } - startPosition := connection.Buffer().Position() - header, err := http2.ReadFrameHeader(connection.Buffer()) + startPosition := buf.Position() + header, err := http2.ReadFrameHeader(buf) if err != nil { http2Log.Debugf("failed to read frame header, %v", err) - if connection.Buffer().SkipCurrentElement() { + if buf.SkipCurrentElement() { break } continue @@ -112,12 +113,12 @@ func (r *HTTP2Protocol) Analyze(connection *PartitionConnection, helper *Analyze var result enums.ParseResult switch header.Type { case http2.FrameHeaders: - result, protocolBreak, _ = r.handleHeader(&header, startPosition, http2Metrics, connection.Buffer()) + result, protocolBreak, _ = r.handleHeader(&header, startPosition, http2Metrics, buf) case http2.FrameData: - result, protocolBreak, _ = r.handleData(&header, startPosition, http2Metrics, connection.Buffer()) + result, protocolBreak, _ = r.handleData(&header, startPosition, http2Metrics, buf) default: tmp := make([]byte, header.Length) - if err := connection.Buffer().ReadUntilBufferFull(tmp); err != nil { + if err := buf.ReadUntilBufferFull(tmp); err != nil { if errors.Is(err, buffer.ErrNotComplete) { result = enums.ParseResultSkipPackage } else { @@ -139,9 +140,9 @@ func (r *HTTP2Protocol) Analyze(connection *PartitionConnection, helper *Analyze finishReading := false switch result { case enums.ParseResultSuccess: - finishReading = connection.Buffer().RemoveReadElements() + finishReading = buf.RemoveReadElements() case enums.ParseResultSkipPackage: - finishReading = connection.Buffer().SkipCurrentElement() + finishReading = buf.SkipCurrentElement() } if finishReading { diff --git a/pkg/accesslog/collector/protocols/queue.go b/pkg/accesslog/collector/protocols/queue.go index e505d3e9..ad665841 100644 --- a/pkg/accesslog/collector/protocols/queue.go +++ b/pkg/accesslog/collector/protocols/queue.go @@ -22,6 +22,7 @@ import ( "errors" "fmt" "os" + "sort" "sync" "time" @@ -119,25 +120,30 @@ type PartitionContext struct { analyzeLocker sync.Mutex } -func newPartitionConnection(protocolMgr *ProtocolManager, conID, randomID uint64, protocol enums.ConnectionProtocol) *PartitionConnection { +func newPartitionConnection(protocolMgr *ProtocolManager, conID, randomID uint64, + protocol enums.ConnectionProtocol, currentDataID uint64) *PartitionConnection { connection := &PartitionConnection{ connectionID: conID, randomID: randomID, - dataBuffer: buffer.NewBuffer(), - protocol: make(map[enums.ConnectionProtocol]bool), + dataBuffers: make(map[enums.ConnectionProtocol]*buffer.Buffer), + protocol: make(map[enums.ConnectionProtocol]uint64), protocolAnalyzer: make(map[enums.ConnectionProtocol]Protocol), protocolMetrics: make(map[enums.ConnectionProtocol]ProtocolMetrics), } - connection.appendProtocolIfNeed(protocolMgr, conID, randomID, protocol) + connection.appendProtocolIfNeed(protocolMgr, conID, randomID, protocol, currentDataID) return connection } -func (p *PartitionConnection) appendProtocolIfNeed(protocolMgr *ProtocolManager, conID, randomID uint64, protocol enums.ConnectionProtocol) { - if _, exist := p.protocol[protocol]; !exist { +func (p *PartitionConnection) appendProtocolIfNeed(protocolMgr *ProtocolManager, conID, randomID uint64, + protocol enums.ConnectionProtocol, currentDataID uint64) { + if minDataID, exist := p.protocol[protocol]; !exist { analyzer := protocolMgr.GetProtocol(protocol) - p.protocol[protocol] = true + p.protocol[protocol] = currentDataID + p.dataBuffers[protocol] = buffer.NewBuffer() p.protocolAnalyzer[protocol] = analyzer p.protocolMetrics[protocol] = analyzer.GenerateConnection(conID, randomID) + } else if currentDataID < minDataID { + p.protocol[protocol] = currentDataID } } @@ -212,26 +218,27 @@ func (p *PartitionContext) Consume(data interface{}) { forwarder.SendTransferNoProtocolEvent(p.context, event) return } - connection := p.getConnectionContext(event.GetConnectionID(), event.GetRandomID(), event.GetProtocol()) + connection := p.getConnectionContext(event.GetConnectionID(), event.GetRandomID(), event.GetProtocol(), event.DataID()) connection.AppendDetail(p.context, event) case *events.SocketDataUploadEvent: pid, _ := events.ParseConnectionID(event.ConnectionID) log.Debugf("receive the socket data event, connection ID: %d, random ID: %d, pid: %d, data id: %d, sequence: %d, protocol: %d", - event.ConnectionID, event.RandomID, pid, event.DataID0, event.Sequence0, event.Protocol) - connection := p.getConnectionContext(event.ConnectionID, event.RandomID, event.Protocol) + event.ConnectionID, event.RandomID, pid, event.DataID0, event.Sequence0, event.Protocol0) + connection := p.getConnectionContext(event.ConnectionID, event.RandomID, event.Protocol0, event.DataID0) connection.AppendData(event) } } -func (p *PartitionContext) getConnectionContext(connectionID, randomID uint64, protocol enums.ConnectionProtocol) *PartitionConnection { +func (p *PartitionContext) getConnectionContext(connectionID, randomID uint64, + protocol enums.ConnectionProtocol, currentDataID uint64) *PartitionConnection { conKey := p.buildConnectionKey(connectionID, randomID) conn, exist := p.connections.Get(conKey) if exist { connection := conn.(*PartitionConnection) - connection.appendProtocolIfNeed(p.protocolMgr, connectionID, randomID, protocol) + connection.appendProtocolIfNeed(p.protocolMgr, connectionID, randomID, protocol, currentDataID) return connection } - result := newPartitionConnection(p.protocolMgr, connectionID, randomID, protocol) + result := newPartitionConnection(p.protocolMgr, connectionID, randomID, protocol, currentDataID) p.connections.Set(conKey, result) return result } @@ -254,7 +261,10 @@ func (p *PartitionContext) processEvents() { p.processConnectionEvents(info) // if the connection already closed and not contains any buffer data, then delete the connection - bufLen := info.dataBuffer.DataLength() + var bufLen = 0 + for _, buf := range info.dataBuffers { + bufLen += buf.DataLength() + } if bufLen > 0 { return } @@ -309,9 +319,11 @@ func (p *PartitionContext) processExpireEvents() { } func (p *PartitionContext) processConnectionExpireEvents(connection *PartitionConnection) { - if c := connection.dataBuffer.DeleteExpireEvents(maxBufferExpireDuration); c > 0 { - log.Debugf("total removed %d expired socket data events from connection ID: %d, random ID: %d", c, - connection.connectionID, connection.randomID) + for _, buf := range connection.dataBuffers { + if c := buf.DeleteExpireEvents(maxBufferExpireDuration); c > 0 { + log.Debugf("total removed %d expired socket data events from connection ID: %d, random ID: %d", c, + connection.connectionID, connection.randomID) + } } } @@ -320,8 +332,17 @@ func (p *PartitionContext) processConnectionEvents(connection *PartitionConnecti return } helper := &AnalyzeHelper{} - for protocol, analyzer := range connection.protocolAnalyzer { - if err := analyzer.Analyze(connection, helper); err != nil { + + // since the socket data/detail are getting unsorted, so rover need to using the minimal data id to analyze to ensure the order + sortedProtocols := make([]enums.ConnectionProtocol, 0, len(connection.protocol)) + for protocol := range connection.protocol { + sortedProtocols = append(sortedProtocols, protocol) + } + sort.Slice(sortedProtocols, func(i, j int) bool { + return connection.protocol[sortedProtocols[i]] < connection.protocol[sortedProtocols[j]] + }) + for _, protocol := range sortedProtocols { + if err := connection.protocolAnalyzer[protocol].Analyze(connection, helper); err != nil { log.Warnf("failed to analyze the %s protocol data: %v", enums.ConnectionProtocolString(protocol), err) } } @@ -330,6 +351,8 @@ func (p *PartitionContext) processConnectionEvents(connection *PartitionConnecti // notify the connection manager to skip analyze all data(just sending the detail) connection.skipAllDataAnalyze = true p.context.ConnectionMgr.SkipAllDataAnalyze(connection.connectionID, connection.randomID) - connection.dataBuffer.Clean() + for _, buf := range connection.dataBuffers { + buf.Clean() + } } } diff --git a/pkg/accesslog/events/data.go b/pkg/accesslog/events/data.go index 747ff9fc..ed480596 100644 --- a/pkg/accesslog/events/data.go +++ b/pkg/accesslog/events/data.go @@ -24,7 +24,7 @@ import ( ) type SocketDataUploadEvent struct { - Protocol enums.ConnectionProtocol + Protocol0 enums.ConnectionProtocol HaveReduce uint8 Direction0 enums.SocketDataDirection Finished uint8 @@ -39,6 +39,10 @@ type SocketDataUploadEvent struct { Buffer [2048]byte } +func (s *SocketDataUploadEvent) Protocol() enums.ConnectionProtocol { + return s.Protocol0 +} + func (s *SocketDataUploadEvent) GenerateConnectionID() string { return fmt.Sprintf("%d_%d", s.ConnectionID, s.RandomID) } diff --git a/pkg/profiling/task/network/analyze/events/data.go b/pkg/profiling/task/network/analyze/events/data.go index 9ca242d0..804e96b5 100644 --- a/pkg/profiling/task/network/analyze/events/data.go +++ b/pkg/profiling/task/network/analyze/events/data.go @@ -24,7 +24,7 @@ import ( ) type SocketDataUploadEvent struct { - Protocol enums.ConnectionProtocol + Protocol0 enums.ConnectionProtocol HaveReduce uint8 Direction0 enums.SocketDataDirection Finished uint8 @@ -39,6 +39,10 @@ type SocketDataUploadEvent struct { Buffer [2048]byte } +func (s *SocketDataUploadEvent) Protocol() enums.ConnectionProtocol { + return s.Protocol0 +} + func (s *SocketDataUploadEvent) GenerateConnectionID() string { return fmt.Sprintf("%d_%d", s.ConnectionID, s.RandomID) } diff --git a/pkg/profiling/task/network/analyze/layer7/protocols/protocols.go b/pkg/profiling/task/network/analyze/layer7/protocols/protocols.go index 60bcd710..0b8c887c 100644 --- a/pkg/profiling/task/network/analyze/layer7/protocols/protocols.go +++ b/pkg/profiling/task/network/analyze/layer7/protocols/protocols.go @@ -71,10 +71,10 @@ func (a *Analyzer) Start(ctx context.Context) { } func (a *Analyzer) ReceiveSocketDataEvent(event *events.SocketDataUploadEvent) { - analyzer := a.protocols[event.Protocol] + analyzer := a.protocols[event.Protocol()] if analyzer == nil { log.Warnf("could not found any protocol to handle socket data, connection id: %s, protocol: %s(%d)", - event.GenerateConnectionID(), enums.ConnectionProtocolString(event.Protocol), event.Protocol) + event.GenerateConnectionID(), enums.ConnectionProtocolString(event.Protocol()), event.Protocol()) return } analyzer.ReceiveSocketData(a.ctx, event) diff --git a/pkg/tools/buffer/buffer.go b/pkg/tools/buffer/buffer.go index 1c71f7fe..0bfc5d9c 100644 --- a/pkg/tools/buffer/buffer.go +++ b/pkg/tools/buffer/buffer.go @@ -33,6 +33,8 @@ var ( ) type SocketDataBuffer interface { + // Protocol of the buffer + Protocol() enums.ConnectionProtocol // GenerateConnectionID for identity the buffer belong which connection GenerateConnectionID() string // BufferData of the buffer @@ -88,6 +90,10 @@ type SocketDataEventLimited struct { Size int } +func (s *SocketDataEventLimited) Protocol() enums.ConnectionProtocol { + return s.SocketDataBuffer.Protocol() +} + func (s *SocketDataEventLimited) BufferData() []byte { return s.SocketDataBuffer.BufferData()[s.From:s.Size] } diff --git a/pkg/tools/ssl/gotls.go b/pkg/tools/ssl/gotls.go index 2b2b3d7c..573854e0 100644 --- a/pkg/tools/ssl/gotls.go +++ b/pkg/tools/ssl/gotls.go @@ -206,7 +206,7 @@ func (r *Register) generateGOTLSSymbolOffsets(register *Register, elfFile *elf.F sym := register.SearchSymbol(func(a, b string) bool { return a == b - }, "go.itab.*net.TCPConn,net.Conn") + }, "go.itab.*net.TCPConn,net.Conn", "go:itab.*net.TCPConn,net.Conn") if sym == nil { log.Warnf("could not found the tcp connection symbol: go.itab.*net.TCPConn,net.Conn") return nil, nil