Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(p2p): cover Exchange with traces #150

Merged
merged 11 commits into from
Feb 1, 2024
63 changes: 58 additions & 5 deletions p2p/exchange.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,12 +14,20 @@ import (
"github.com/libp2p/go-libp2p/core/peer"
"github.com/libp2p/go-libp2p/core/protocol"
"github.com/libp2p/go-libp2p/p2p/net/conngater"
"go.opentelemetry.io/otel"
"go.opentelemetry.io/otel/attribute"
"go.opentelemetry.io/otel/codes"
"go.opentelemetry.io/otel/trace"

"github.com/celestiaorg/go-header"
p2p_pb "github.com/celestiaorg/go-header/p2p/pb"
)

var log = logging.Logger("header/p2p")
var (
log = logging.Logger("header/p2p")

tracerClient = otel.Tracer("header/p2p-client")
)

// minHeadResponses is the minimum number of headers of the same height
// received from peers to determine the network head. If all trusted peers
Expand Down Expand Up @@ -113,6 +121,8 @@ func (ex *Exchange[H]) Stop(ctx context.Context) error {
// and return the highest one.
func (ex *Exchange[H]) Head(ctx context.Context, opts ...header.HeadOption[H]) (H, error) {
log.Debug("requesting head")
ctx, span := tracerClient.Start(ctx, "head")
Wondertan marked this conversation as resolved.
Show resolved Hide resolved
defer span.End()

reqCtx := ctx
startTime := time.Now()
Expand Down Expand Up @@ -157,8 +167,15 @@ func (ex *Exchange[H]) Head(ctx context.Context, opts ...header.HeadOption[H]) (
)
for _, from := range peers {
go func(from peer.ID) {
_, newSpan := span.TracerProvider().Tracer("requesting peer").Start(
Wondertan marked this conversation as resolved.
Show resolved Hide resolved
ctx, "",
trace.WithAttributes(attribute.String("peerID", from.String())),
)
defer newSpan.End()

headers, err := ex.request(reqCtx, from, headerReq)
Wondertan marked this conversation as resolved.
Show resolved Hide resolved
if err != nil {
newSpan.SetStatus(codes.Error, err.Error())
log.Errorw("head request to peer failed", "peer", from, "err", err)
headerRespCh <- zero
return
Expand All @@ -171,6 +188,7 @@ func (ex *Exchange[H]) Head(ctx context.Context, opts ...header.HeadOption[H]) (
if errors.As(err, &verErr) && verErr.SoftFailure {
log.Debugw("received head from tracked peer that soft-failed verification",
"tracked peer", from, "err", err)
newSpan.SetStatus(codes.Error, err.Error())
headerRespCh <- headers[0]
return
}
Expand All @@ -180,10 +198,12 @@ func (ex *Exchange[H]) Head(ctx context.Context, opts ...header.HeadOption[H]) (
}
logF("verifying head received from tracked peer", "tracked peer", from,
"height", headers[0].Height(), "err", err)
newSpan.SetStatus(codes.Error, err.Error())
headerRespCh <- zero
return
}
}
newSpan.SetStatus(codes.Ok, "")
// request ensures that the result slice will have at least one Header
headerRespCh <- headers[0]
}(from)
Expand All @@ -206,22 +226,25 @@ func (ex *Exchange[H]) Head(ctx context.Context, opts ...header.HeadOption[H]) (
if errors.Is(ctx.Err(), context.DeadlineExceeded) {
status = headStatusTimeout
}

span.SetStatus(codes.Error, fmt.Sprintf("head request %s", status))
ex.metrics.head(ctx, time.Since(startTime), len(headers), headType, status)
return zero, ctx.Err()
case <-ex.ctx.Done():
ex.metrics.head(ctx, time.Since(startTime), len(headers), headType, headStatusCanceled)
span.SetStatus(codes.Error, "exchange client stopped")
return zero, ex.ctx.Err()
}
}

head, err := bestHead[H](headers)
if err != nil {
ex.metrics.head(ctx, time.Since(startTime), len(headers), headType, headStatusNoHeaders)
span.SetStatus(codes.Error, headStatusNoHeaders)
return zero, err
}

ex.metrics.head(ctx, time.Since(startTime), len(headers), headType, headStatusOk)
span.SetStatus(codes.Ok, "")
return head, nil
}

Expand All @@ -230,10 +253,17 @@ func (ex *Exchange[H]) Head(ctx context.Context, opts ...header.HeadOption[H]) (
// thereafter.
func (ex *Exchange[H]) GetByHeight(ctx context.Context, height uint64) (H, error) {
Wondertan marked this conversation as resolved.
Show resolved Hide resolved
log.Debugw("requesting header", "height", height)
ctx, span := tracerClient.Start(ctx, "get-by-height",
trace.WithAttributes(
attribute.Int64("height", int64(height)),
))
defer span.End()
var zero H
// sanity check height
if height == 0 {
return zero, fmt.Errorf("specified request height must be greater than 0")
err := fmt.Errorf("specified request height must be greater than 0")
span.SetStatus(codes.Error, err.Error())
return zero, err
}
// create request
req := &p2p_pb.HeaderRequest{
Expand All @@ -242,8 +272,10 @@ func (ex *Exchange[H]) GetByHeight(ctx context.Context, height uint64) (H, error
}
headers, err := ex.performRequest(ctx, req)
if err != nil {
span.SetStatus(codes.Error, err.Error())
return zero, err
}
span.SetStatus(codes.Ok, "")
return headers[0], nil
}

Expand All @@ -254,19 +286,36 @@ func (ex *Exchange[H]) GetRangeByHeight(
from H,
to uint64,
) ([]H, error) {
ctx, span := tracerClient.Start(ctx, "get-range-by-height",
trace.WithAttributes(
attribute.Int64("from", int64(from.Height())),
attribute.Int64("to", int64(to)),
))
defer span.End()
session := newSession[H](
ex.ctx, ex.host, ex.peerTracker, ex.protocolID, ex.Params.RangeRequestTimeout, ex.metrics, withValidation(from),
)
defer session.close()
// we request the next header height that we don't have: `fromHead`+1
amount := to - (from.Height() + 1)
return session.getRangeByHeight(ctx, from.Height()+1, amount, ex.Params.MaxHeadersPerRangeRequest)
result, err := session.getRangeByHeight(ctx, from.Height()+1, amount, ex.Params.MaxHeadersPerRangeRequest)
if err != nil {
span.SetStatus(codes.Error, err.Error())
return nil, err
}
span.SetStatus(codes.Ok, "")
return result, nil
}

// Get performs a request for the Header by the given hash corresponding
// to the RawHeader. Note that the Header must be verified thereafter.
func (ex *Exchange[H]) Get(ctx context.Context, hash header.Hash) (H, error) {
log.Debugw("requesting header", "hash", hash.String())
ctx, span := tracerClient.Start(ctx, "get-by-hash",
trace.WithAttributes(
attribute.String("hash", hash.String()),
))
defer span.End()
var zero H
// create request
req := &p2p_pb.HeaderRequest{
Expand All @@ -275,12 +324,16 @@ func (ex *Exchange[H]) Get(ctx context.Context, hash header.Hash) (H, error) {
}
headers, err := ex.performRequest(ctx, req)
if err != nil {
span.SetStatus(codes.Error, err.Error())
return zero, err
}

if !bytes.Equal(headers[0].Hash(), hash) {
return zero, fmt.Errorf("incorrect hash in header: expected %x, got %x", hash, headers[0].Hash())
err = fmt.Errorf("incorrect hash in header: expected %x, got %x", hash, headers[0].Hash())
span.SetStatus(codes.Error, err.Error())
return zero, err
}
span.SetStatus(codes.Ok, "")
return headers[0], nil
}

Expand Down
8 changes: 4 additions & 4 deletions p2p/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ import (
)

var (
tracer = otel.Tracer("header/server")
tracerServ = otel.Tracer("header/server")
)

// ExchangeServer represents the server-side component for
Expand Down Expand Up @@ -173,7 +173,7 @@ func (serv *ExchangeServer[H]) handleRequestByHash(hash []byte) ([]H, error) {
log.Debugw("server: handling header request", "hash", header.Hash(hash).String())
ctx, cancel := context.WithTimeout(serv.ctx, serv.Params.RangeRequestTimeout)
defer cancel()
ctx, span := tracer.Start(ctx, "request-by-hash", trace.WithAttributes(
ctx, span := tracerServ.Start(ctx, "request-by-hash", trace.WithAttributes(
attribute.String("hash", header.Hash(hash).String()),
))
defer span.End()
Expand Down Expand Up @@ -204,7 +204,7 @@ func (serv *ExchangeServer[H]) handleRequest(from, to uint64) ([]H, error) {
}

startTime := time.Now()
ctx, span := tracer.Start(serv.ctx, "request-range", trace.WithAttributes(
ctx, span := tracerServ.Start(serv.ctx, "request-range", trace.WithAttributes(
attribute.Int64("from", int64(from)),
attribute.Int64("to", int64(to))))
defer span.End()
Expand Down Expand Up @@ -273,7 +273,7 @@ func (serv *ExchangeServer[H]) handleHeadRequest() ([]H, error) {
log.Debug("server: handling head request")
ctx, cancel := context.WithTimeout(serv.ctx, serv.Params.RangeRequestTimeout)
defer cancel()
ctx, span := tracer.Start(ctx, "request-head")
ctx, span := tracerServ.Start(ctx, "request-head")
defer span.End()

head, err := serv.store.Head(ctx)
Expand Down
46 changes: 39 additions & 7 deletions p2p/session.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,11 +9,19 @@ import (

"github.com/libp2p/go-libp2p/core/host"
"github.com/libp2p/go-libp2p/core/protocol"
"go.opentelemetry.io/otel"
"go.opentelemetry.io/otel/attribute"
"go.opentelemetry.io/otel/codes"
"go.opentelemetry.io/otel/trace"

"github.com/celestiaorg/go-header"
p2p_pb "github.com/celestiaorg/go-header/p2p/pb"
)

var (
tracerSession = otel.Tracer("header/p2p-session")
)

// errEmptyResponse means that the server side closed the connection without sending
// at least one response.
var errEmptyResponse = errors.New("empty response")
Expand Down Expand Up @@ -77,9 +85,15 @@ func newSession[H header.Header[H]](
func (s *session[H]) getRangeByHeight(
ctx context.Context,
from, amount, headersPerPeer uint64,
) ([]H, error) {
) (_ []H, err error) {
log.Debugw("requesting headers", "from", from, "to", from+amount-1) // -1 need to exclude to+1 height

ctx, span := tracerSession.Start(ctx, "get-range-by-height", trace.WithAttributes(
attribute.Int64("from", int64(from)),
attribute.Int64("to", int64(from+amount-1)),
))
defer span.End()

requests := prepareRequests(from, amount, headersPerPeer)
result := make(chan []H, len(requests))
s.reqCh = make(chan *p2p_pb.HeaderRequest, len(requests))
Expand All @@ -94,8 +108,11 @@ LOOP:
for {
select {
case <-s.ctx.Done():
return nil, errors.New("header/p2p: exchange is closed")
err = errors.New("header/p2p: exchange is closed")
span.SetStatus(codes.Error, err.Error())
return nil, err
case <-ctx.Done():
span.SetStatus(codes.Error, ctx.Err().Error())
return nil, ctx.Err()
case res := <-result:
headers = append(headers, res...)
Expand All @@ -113,6 +130,7 @@ LOOP:
"from", headers[0].Height(),
"to", headers[len(headers)-1].Height(),
)
span.SetStatus(codes.Ok, "")
return headers, nil
}

Expand Down Expand Up @@ -152,19 +170,28 @@ func (s *session[H]) doRequest(
req *p2p_pb.HeaderRequest,
headers chan []H,
) {
ctx, span := tracerSession.Start(ctx, "request-headers-from-peer", trace.WithAttributes(
attribute.String("peerID", stat.peerID.String()),
attribute.Int64("from", int64(req.GetOrigin())),
attribute.Int64("amount", int64(req.Amount)),
))
defer span.End()

ctx, cancel := context.WithTimeout(ctx, s.requestTimeout)
defer cancel()

r, size, duration, err := sendMessage(ctx, s.host, stat.peerID, s.protocolID, req)
s.metrics.response(ctx, size, duration, err)
if err != nil {
span.SetStatus(codes.Error, err.Error())
// we should not punish peer at this point and should try to parse responses, despite that error
// was received.
log.Debugw("requesting headers from peer failed", "peer", stat.peerID, "err", err)
}

h, err := s.processResponses(r)
if err != nil {
span.SetStatus(codes.Error, err.Error())
logFn := log.Errorw

switch err {
Expand Down Expand Up @@ -195,21 +222,26 @@ func (s *session[H]) doRequest(
"requestedAmount", req.Amount,
)

remainingHeaders := req.Amount - uint64(len(h))

span.SetStatus(codes.Ok, "")

// update peer stats
stat.updateStats(size, duration)

responseLn := uint64(len(h))
// ensure that we received the correct amount of headers.
if responseLn < req.Amount {
from := h[responseLn-1].Height()
amount := req.Amount - responseLn
if remainingHeaders > 0 {
span.AddEvent("remaining headers", trace.WithAttributes(
attribute.Int64("amount", int64(remainingHeaders))),
)

from := h[uint64(len(h))-1].Height()
select {
case <-s.ctx.Done():
return
// create a new request with the remaining headers.
// prepareRequests will return a slice with 1 element at this point
case s.reqCh <- prepareRequests(from+1, amount, req.Amount)[0]:
case s.reqCh <- prepareRequests(from+1, remainingHeaders, req.Amount)[0]:
log.Debugw("sending additional request to get remaining headers")
}
}
Expand Down
Loading