Skip to content

Commit

Permalink
Merge pull request #2146 from iotaledger/refactor/metrics
Browse files Browse the repository at this point in the history
Refactor metrics
  • Loading branch information
fijter authored Mar 21, 2023
2 parents 7b28f5f + 01095d9 commit 4f540cb
Show file tree
Hide file tree
Showing 45 changed files with 1,326 additions and 1,247 deletions.
28 changes: 2 additions & 26 deletions core/chains/component.go
Original file line number Diff line number Diff line change
Expand Up @@ -65,28 +65,6 @@ func initConfigPars(c *dig.Container) error {
}

func provide(c *dig.Container) error {
type metricsDeps struct {
dig.In

NodeConnection chain.NodeConnection
}

type metricsResult struct {
dig.Out

ChainMetrics *metrics.ChainMetrics
BlockWALMetrics *metrics.BlockWALMetrics
}

if err := c.Provide(func(deps metricsDeps) metricsResult {
return metricsResult{
ChainMetrics: metrics.NewChainMetrics(deps.NodeConnection.GetMetrics()),
BlockWALMetrics: metrics.NewBlockWALMetrics(),
}
}); err != nil {
CoreComponent.LogPanic(err)
}

type chainsDeps struct {
dig.In

Expand All @@ -100,8 +78,7 @@ func provide(c *dig.Container) error {
NodeIdentityProvider registry.NodeIdentityProvider
ConsensusStateRegistry cmtLog.ConsensusStateRegistry
ChainListener *publisher.Publisher
ChainMetrics *metrics.ChainMetrics
BlockWALMetrics *metrics.BlockWALMetrics
ChainMetricsProvider *metrics.ChainMetricsProvider
}

type chainsResult struct {
Expand Down Expand Up @@ -130,8 +107,7 @@ func provide(c *dig.Container) error {
deps.ConsensusStateRegistry,
deps.ChainListener,
shutdown.NewCoordinator("chains", CoreComponent.Logger().Named("Shutdown")),
deps.ChainMetrics,
deps.BlockWALMetrics,
deps.ChainMetricsProvider,
),
}
}); err != nil {
Expand Down
11 changes: 2 additions & 9 deletions core/nodeconn/component.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,6 @@ import (
"github.com/iotaledger/inx-app/pkg/nodebridge"
"github.com/iotaledger/wasp/packages/chain"
"github.com/iotaledger/wasp/packages/daemon"
"github.com/iotaledger/wasp/packages/metrics/nodeconnmetrics"
"github.com/iotaledger/wasp/packages/nodeconn"
)

Expand Down Expand Up @@ -59,24 +58,18 @@ func provide(c *dig.Container) error {
CoreComponent.LogPanic(err)
}

if err := c.Provide(nodeconnmetrics.New); err != nil {
CoreComponent.LogPanic(err)
}

type nodeConnectionDeps struct {
dig.In

NodeBridge *nodebridge.NodeBridge
NodeConnectionMetrics nodeconnmetrics.NodeConnectionMetrics
ShutdownHandler *shutdown.ShutdownHandler
NodeBridge *nodebridge.NodeBridge
ShutdownHandler *shutdown.ShutdownHandler
}

if err := c.Provide(func(deps nodeConnectionDeps) chain.NodeConnection {
nodeConnection, err := nodeconn.New(
CoreComponent.Daemon().ContextStopped(),
CoreComponent.Logger().Named("nc"),
deps.NodeBridge,
deps.NodeConnectionMetrics,
deps.ShutdownHandler,
)
if err != nil {
Expand Down
2 changes: 0 additions & 2 deletions packages/chain/chain.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,6 @@ import (
"github.com/iotaledger/hive.go/logger"
iotago "github.com/iotaledger/iota.go/v3"
"github.com/iotaledger/wasp/packages/isc"
"github.com/iotaledger/wasp/packages/metrics/nodeconnmetrics"
"github.com/iotaledger/wasp/packages/parameters"
"github.com/iotaledger/wasp/packages/peering"
"github.com/iotaledger/wasp/packages/state"
Expand All @@ -22,7 +21,6 @@ type NodeConnection interface {
ChainNodeConn
Run(ctx context.Context) error
WaitUntilInitiallySynced(context.Context) error
GetMetrics() nodeconnmetrics.NodeConnectionMetrics
GetBech32HRP() iotago.NetworkPrefix
GetL1Params() *parameters.L1Params
GetL1ProtocolParams() *iotago.ProtocolParameters
Expand Down
4 changes: 2 additions & 2 deletions packages/chain/mempool/mempool.go
Original file line number Diff line number Diff line change
Expand Up @@ -146,7 +146,7 @@ type mempoolImpl struct {
netPeerPubs map[gpa.NodeID]*cryptolib.PublicKey
net peering.NetworkProvider
log *logger.Logger
metrics metrics.IMempoolMetrics
metrics metrics.IChainMempoolMetrics
listener ChainListener
}

Expand Down Expand Up @@ -192,7 +192,7 @@ func New(
nodeIdentity *cryptolib.KeyPair,
net peering.NetworkProvider,
log *logger.Logger,
metrics metrics.IMempoolMetrics,
metrics metrics.IChainMempoolMetrics,
listener ChainListener,
) Mempool {
netPeeringID := peering.HashPeeringIDFromBytes(chainID.Bytes(), []byte("Mempool")) // ChainID × Mempool
Expand Down
4 changes: 2 additions & 2 deletions packages/chain/mempool/mempool_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -497,6 +497,7 @@ type MockMempoolMetrics struct {
processedRequestCounter int
}

func (m *MockMempoolMetrics) IncBlocksPerChain() {}
func (m *MockMempoolMetrics) IncRequestsReceived(req isc.Request) {
if req.IsOffLedger() {
m.offLedgerRequestCounter++
Expand All @@ -509,10 +510,9 @@ func (m *MockMempoolMetrics) IncRequestsProcessed() {
m.processedRequestCounter++
}

func (m *MockMempoolMetrics) IncRequestsAckMessages() {}
func (m *MockMempoolMetrics) SetRequestProcessingTime(_ time.Duration) {}

func (m *MockMempoolMetrics) IncBlocksPerChain() {}

////////////////////////////////////////////////////////////////////////////////

func getRequestsOnLedger(t *testing.T, chainAddress iotago.Address, amount int, f ...func(int, *isc.RequestParameters)) []isc.OnLedgerRequest {
Expand Down
25 changes: 15 additions & 10 deletions packages/chain/node.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,6 @@ import (
"github.com/iotaledger/wasp/packages/gpa"
"github.com/iotaledger/wasp/packages/isc"
"github.com/iotaledger/wasp/packages/metrics"
"github.com/iotaledger/wasp/packages/metrics/nodeconnmetrics"
"github.com/iotaledger/wasp/packages/origin"
"github.com/iotaledger/wasp/packages/peering"
"github.com/iotaledger/wasp/packages/registry"
Expand Down Expand Up @@ -78,8 +77,8 @@ type Chain interface {
// can query the nodes for blocks, etc. NOTE: servers = access⁻¹
ServersUpdated(serverNodes []*cryptolib.PublicKey)
// Metrics and the current descriptive state.
GetChainMetrics() metrics.IChainMetrics
GetConsensusPipeMetrics() ConsensusPipeMetrics // TODO: Review this.
GetNodeConnectionMetrics() nodeconnmetrics.NodeConnectionMetrics
GetConsensusWorkflowStatus() ConsensusWorkflowStatus
}

Expand Down Expand Up @@ -133,6 +132,8 @@ type ChainNodeConn interface {
recvRequestCB RequestOutputHandler,
recvAliasOutput AliasOutputHandler,
recvMilestone MilestoneHandler,
onChainConnect func(),
onChainDisconnect func(),
)
}

Expand Down Expand Up @@ -181,6 +182,7 @@ type chainNodeImpl struct {
netPeerPubs map[gpa.NodeID]*cryptolib.PublicKey
net peering.NetworkProvider
shutdownCoordinator *shutdown.Coordinator
chainMetrics metrics.IChainMetrics
log *logger.Logger
}

Expand Down Expand Up @@ -241,6 +243,7 @@ var _ Chain = &chainNodeImpl{}
//nolint:funlen
func New(
ctx context.Context,
log *logger.Logger,
chainID isc.ChainID,
chainStore state.Store,
nodeConn NodeConnection,
Expand All @@ -252,9 +255,10 @@ func New(
listener ChainListener,
accessNodesFromNode []*cryptolib.PublicKey,
net peering.NetworkProvider,
chainMetric metrics.IChainMetric,
chainMetrics metrics.IChainMetrics,
shutdownCoordinator *shutdown.Coordinator,
log *logger.Logger,
onChainConnect func(),
onChainDisconnect func(),
) (Chain, error) {
log.Debugf("Starting the chain, chainID=%v", chainID)
if listener == nil {
Expand Down Expand Up @@ -301,6 +305,7 @@ func New(
netPeerPubs: map[gpa.NodeID]*cryptolib.PublicKey{},
net: net,
shutdownCoordinator: shutdownCoordinator,
chainMetrics: chainMetrics,
log: log,
}
cni.tryRecoverStoreFromWAL(chainStore, blockWAL)
Expand Down Expand Up @@ -342,7 +347,7 @@ func New(
nodeIdentity,
net,
cni.log.Named("MP"),
chainMetric,
chainMetrics,
cni.listener,
)
cni.chainMgr = gpa.NewAckHandler(cni.me, chainMgr.AsGPA(), redeliveryPeriod)
Expand Down Expand Up @@ -394,7 +399,7 @@ func New(
log.Debugf("recvMilestoneCB[%p], %v", cni, timestamp)
recvMilestonePipeInCh <- timestamp
}
nodeConn.AttachChain(ctx, chainID, recvRequestCB, recvAliasOutputCB, recvMilestoneCB)
nodeConn.AttachChain(ctx, chainID, recvRequestCB, recvAliasOutputCB, recvMilestoneCB, onChainConnect, onChainDisconnect)
//
// Run the main thread.

Expand Down Expand Up @@ -1053,12 +1058,12 @@ func (cni *chainNodeImpl) GetCandidateNodes() []*governance.AccessNodeInfo {
return governance.NewStateAccess(state).GetCandidateNodes()
}

func (cni *chainNodeImpl) GetConsensusPipeMetrics() ConsensusPipeMetrics {
return &consensusPipeMetricsImpl{}
func (cni *chainNodeImpl) GetChainMetrics() metrics.IChainMetrics {
return cni.chainMetrics
}

func (cni *chainNodeImpl) GetNodeConnectionMetrics() nodeconnmetrics.NodeConnectionMetrics {
return cni.nodeConn.GetMetrics()
func (cni *chainNodeImpl) GetConsensusPipeMetrics() ConsensusPipeMetrics {
return &consensusPipeMetricsImpl{}
}

func (cni *chainNodeImpl) GetConsensusWorkflowStatus() ConsensusWorkflowStatus {
Expand Down
13 changes: 6 additions & 7 deletions packages/chain/node_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,6 @@ import (
"github.com/iotaledger/wasp/packages/isc"
"github.com/iotaledger/wasp/packages/kv/dict"
"github.com/iotaledger/wasp/packages/metrics"
"github.com/iotaledger/wasp/packages/metrics/nodeconnmetrics"
"github.com/iotaledger/wasp/packages/parameters"
"github.com/iotaledger/wasp/packages/peering"
"github.com/iotaledger/wasp/packages/registry"
Expand Down Expand Up @@ -325,6 +324,8 @@ func (tnc *testNodeConn) AttachChain(
recvRequestCB chain.RequestOutputHandler,
recvAliasOutput chain.AliasOutputHandler,
recvMilestone chain.MilestoneHandler,
onChainConnect func(),
onChainDisconnect func(),
) {
if !tnc.chainID.Empty() {
tnc.t.Error("duplicate attach")
Expand Down Expand Up @@ -360,10 +361,6 @@ func (tnc *testNodeConn) GetL1ProtocolParams() *iotago.ProtocolParameters {
return testparameters.GetL1ProtocolParamsForTesting()
}

func (tnc *testNodeConn) GetMetrics() nodeconnmetrics.NodeConnectionMetrics {
panic("should be unused in test")
}

////////////////////////////////////////////////////////////////////////////////
// testEnv

Expand Down Expand Up @@ -436,6 +433,7 @@ func newEnv(t *testing.T, n, f int, reliable bool) *testEnv {
log := te.log.Named(fmt.Sprintf("N#%v", i))
te.nodes[i], err = chain.New(
te.ctx,
log,
te.chainID,
state.NewStore(mapdb.NewMapDB()),
te.nodeConns[i],
Expand All @@ -447,9 +445,10 @@ func newEnv(t *testing.T, n, f int, reliable bool) *testEnv {
chain.NewEmptyChainListener(),
[]*cryptolib.PublicKey{}, // Access nodes.
te.networkProviders[i],
metrics.NewEmptyChainMetric(),
metrics.NewEmptyChainMetrics(),
shutdown.NewCoordinator("test", log),
log,
nil,
nil,
)
require.NoError(t, err)
te.nodes[i].ServersUpdated(te.peerPubKeys)
Expand Down
22 changes: 11 additions & 11 deletions packages/chain/statemanager/smGPA/smGPAUtils/block_wal.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,22 +20,22 @@ import (
type blockWAL struct {
*logger.WrappedLogger

dir string
blockWALMetrics metrics.IBlockWALMetric
dir string
metrics metrics.IChainBlockWALMetrics
}

const constFileSuffix = ".blk"

func NewBlockWAL(log *logger.Logger, baseDir string, chainID isc.ChainID, blockWALMetrics metrics.IBlockWALMetric) (BlockWAL, error) {
func NewBlockWAL(log *logger.Logger, baseDir string, chainID isc.ChainID, metrics metrics.IChainBlockWALMetrics) (BlockWAL, error) {
dir := filepath.Join(baseDir, chainID.String())
if err := ioutils.CreateDirectory(dir, 0o777); err != nil {
return nil, fmt.Errorf("BlockWAL cannot create folder %v: %w", dir, err)
}

result := &blockWAL{
WrappedLogger: logger.NewWrappedLogger(log),
dir: dir,
blockWALMetrics: blockWALMetrics,
WrappedLogger: logger.NewWrappedLogger(log),
dir: dir,
metrics: metrics,
}
result.LogDebugf("BlockWAL created in folder %v", dir)
return result, nil
Expand All @@ -49,21 +49,21 @@ func (bwT *blockWAL) Write(block state.Block) error {
bwT.LogDebugf("Writing block %s to wal; file name - %s", commitment, fileName)
f, err := os.OpenFile(filePath, os.O_CREATE|os.O_TRUNC|os.O_WRONLY, 0o666)
if err != nil {
bwT.blockWALMetrics.IncFailedWrites()
bwT.metrics.IncFailedWrites()
return fmt.Errorf("openning file %s for writing failed: %w", fileName, err)
}
defer f.Close()
blockBytes := block.Bytes()
n, err := f.Write(blockBytes)
if err != nil {
bwT.blockWALMetrics.IncFailedReads()
bwT.metrics.IncFailedReads()
return fmt.Errorf("writing block data to file %s failed: %w", fileName, err)
}
if len(blockBytes) != n {
bwT.blockWALMetrics.IncFailedReads()
bwT.metrics.IncFailedReads()
return fmt.Errorf("only %v of total %v bytes of block were written to file %s", n, len(blockBytes), fileName)
}
bwT.blockWALMetrics.IncSegments()
bwT.metrics.IncSegments()
return nil
}

Expand All @@ -77,7 +77,7 @@ func (bwT *blockWAL) Read(blockHash state.BlockHash) (state.Block, error) {
filePath := filepath.Join(bwT.dir, fileName)
block, err := blockFromFilePath(filePath)
if err != nil {
bwT.blockWALMetrics.IncFailedReads()
bwT.metrics.IncFailedReads()
return nil, err
}
return block, nil
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ func (bwtsmT *blockWALTestSM) Init(t *rapid.T) {
bwtsmT.factory = NewBlockFactory(t)
bwtsmT.lastBlockCommitment = origin.L1Commitment(nil, 0)
bwtsmT.log = testlogger.NewLogger(t)
bwtsmT.bw, err = NewBlockWAL(bwtsmT.log, constTestFolder, bwtsmT.factory.GetChainID(), metrics.NewEmptyBlockWALMetric())
bwtsmT.bw, err = NewBlockWAL(bwtsmT.log, constTestFolder, bwtsmT.factory.GetChainID(), metrics.NewEmptyChainBlockWALMetrics())
require.NoError(t, err)
bwtsmT.blocks = make(map[state.BlockHash]state.Block)
bwtsmT.blocksMoved = make([]state.BlockHash, 0)
Expand Down Expand Up @@ -165,7 +165,7 @@ func (bwtsmT *blockWALTestSM) ReadDamagedBlock(t *rapid.T) {

func (bwtsmT *blockWALTestSM) Restart(t *rapid.T) {
var err error
bwtsmT.bw, err = NewBlockWAL(bwtsmT.log, constTestFolder, bwtsmT.factory.GetChainID(), metrics.NewEmptyBlockWALMetric())
bwtsmT.bw, err = NewBlockWAL(bwtsmT.log, constTestFolder, bwtsmT.factory.GetChainID(), metrics.NewEmptyChainBlockWALMetrics())
require.NoError(t, err)
t.Log("Block WAL restarted")
}
Expand Down
Loading

0 comments on commit 4f540cb

Please sign in to comment.