-
Notifications
You must be signed in to change notification settings - Fork 0
/
ariadne.go
165 lines (141 loc) · 4.23 KB
/
ariadne.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
// Package ariadne is a library for fetching blocks from a Cosmos-based blockchain node.
package ariadne
import (
"context"
"errors"
"fmt"
"strings"
"time"
"github.com/cosmos/cosmos-sdk/client/grpc/tmservice"
sdk "github.com/cosmos/cosmos-sdk/types"
"github.com/tendermint/spm/cosmoscmd"
tt "github.com/tendermint/tendermint/proto/tendermint/types"
"google.golang.org/grpc"
"google.golang.org/grpc/status"
"github.com/Decentr-net/decentr/app"
)
// ErrTooHighBlockRequested is returned when the requested block height is
// greater than the blockchain's current height.
var ErrTooHighBlockRequested = errors.New("too high block requested")
// Block holds the height, time and decoded transactions of a single block.
// If you need more information, open a new issue on GitHub or implement it
// yourself and send a pull request.
type Block struct {
// Height is the block height taken from the block header.
Height uint64
// Time is the block time taken from the block header.
Time time.Time
// Txs are the block's transactions decoded into sdk.Tx values.
Txs []sdk.Tx
}
//go:generate mockgen -destination=./mock/ariadne_mock.go -package=mock -source=ariadne.go
// Fetcher is the interface for fetching blocks from a blockchain node.
type Fetcher interface {
// FetchBlocks starts the fetching routine and runs handleFunc for every block,
// beginning at height from.
FetchBlocks(ctx context.Context, from uint64, handleFunc func(b Block) error, opts ...FetchBlocksOption) error
// FetchBlock fetches a single block from the blockchain.
// If height is zero, the highest (latest) block is requested.
FetchBlock(ctx context.Context, height uint64) (*Block, error)
}
// fetcher is the Fetcher implementation returned by New.
type fetcher struct {
// c is the tendermint service client used to request blocks from the node.
c tmservice.ServiceClient
// d decodes the raw transactions of a block into sdk.Tx values.
d sdk.TxDecoder
// timeout bounds the context of every FetchBlock request.
timeout time.Duration
}
// New creates a Fetcher that talks to the gRPC endpoint at node.
//
// The connection is dialed without transport security, using a context derived
// from ctx that is limited by timeout. The same timeout is stored in the
// returned fetcher and later applied to every FetchBlock request.
//
// An error is returned when the gRPC connection cannot be created.
func New(ctx context.Context, node string, timeout time.Duration) (Fetcher, error) {
	dialCtx, stop := context.WithTimeout(ctx, timeout)
	defer stop()

	conn, dialErr := grpc.DialContext(dialCtx, node, grpc.WithInsecure())
	if dialErr != nil {
		return nil, fmt.Errorf("failed to create grpc conn: %w", dialErr)
	}

	// Block service client and transaction decoder built from the application's module set.
	client := tmservice.NewServiceClient(conn)
	decoder := cosmoscmd.MakeEncodingConfig(app.ModuleBasics).TxConfig.TxDecoder()

	return fetcher{c: client, d: decoder, timeout: timeout}, nil
}
// FetchBlocks sequentially fetches blocks starting at from and passes each one
// to handleFunc.
//
// A from of zero is treated as height 1. The loop only stops when ctx is done,
// in which case ctx.Err() is returned; it never returns nil.
//
// When the requested height is not available yet (ErrTooHighBlockRequested),
// the fetch is retried after the configured retry-last-block interval without
// reporting an error. Any other fetch error is passed to the configured error
// handler and the fetch is retried after the retry interval. An error returned
// by handleFunc is passed to the error handler as well; unless errors are
// configured to be skipped, the same block is handed to handleFunc again after
// the retry interval.
//
// Every retry delay is interruptible: when ctx is done while waiting,
// FetchBlocks returns immediately instead of sleeping through the interval.
func (f fetcher) FetchBlocks(ctx context.Context, from uint64, handleFunc func(b Block) error, opts ...FetchBlocksOption) error {
	cfg := defaultFetchBlockOptions
	for _, opt := range opts {
		opt(&cfg)
	}

	height := uint64(1)
	if from > 0 {
		height = from
	}

	// wait pauses for d and returns early with ctx.Err() if ctx is done first.
	wait := func(d time.Duration) error {
		timer := time.NewTimer(d)
		defer timer.Stop()

		select {
		case <-ctx.Done():
			return ctx.Err()
		case <-timer.C:
			return nil
		}
	}

	var (
		b   *Block
		err error
	)

	for {
		if ctxErr := ctx.Err(); ctxErr != nil {
			return ctxErr
		}

		// b stays non-nil while a handler retry for the current block is pending,
		// so the block is not fetched again in that case.
		if b == nil {
			if b, err = f.FetchBlock(ctx, height); err != nil {
				if errors.Is(err, ErrTooHighBlockRequested) {
					// The chain has not reached this height yet: not an error, just wait.
					if waitErr := wait(cfg.retryLastBlockInterval); waitErr != nil {
						return waitErr
					}
					continue
				}

				cfg.errHandler(height, fmt.Errorf("failed to get block: %w", err))
				if waitErr := wait(cfg.retryInterval); waitErr != nil {
					return waitErr
				}
				continue
			}
		}

		if handleErr := handleFunc(*b); handleErr != nil {
			cfg.errHandler(b.Height, handleErr)
			if !cfg.skipError {
				if waitErr := wait(cfg.retryInterval); waitErr != nil {
					return waitErr
				}
				continue
			}
		}

		b = nil
		height++
	}
}
// FetchBlock fetches a single block from the blockchain and decodes its
// transactions.
//
// If height is zero, the latest block is requested; otherwise the block at the
// given height is requested. The request context is derived from ctx and
// limited by the fetcher's timeout.
//
// ErrTooHighBlockRequested is returned when the node reports that the requested
// height is bigger than the chain length. Any other request failure, a response
// without a block, or a transaction that cannot be decoded is returned as a
// wrapped/descriptive error.
func (f fetcher) FetchBlock(ctx context.Context, height uint64) (*Block, error) {
	ctx, cancel := context.WithTimeout(ctx, f.timeout)
	defer cancel()

	var block *tt.Block

	if height == 0 {
		res, err := f.c.GetLatestBlock(ctx, &tmservice.GetLatestBlockRequest{})
		if err != nil {
			return nil, fmt.Errorf("failed to get latest block: %w", err)
		}
		block = res.Block
	} else {
		res, err := f.c.GetBlockByHeight(ctx, &tmservice.GetBlockByHeightRequest{Height: int64(height)})
		if err != nil {
			// The node signals "height not reached yet" only through the status message.
			if st, ok := status.FromError(err); ok &&
				strings.Contains(st.Message(), "requested block height is bigger then the chain length") {
				return nil, ErrTooHighBlockRequested
			}
			return nil, fmt.Errorf("failed to get block: %w", err)
		}
		block = res.Block
	}

	// Block is a pointer in the response; guard against dereferencing a missing one.
	if block == nil {
		return nil, fmt.Errorf("failed to get block: empty block in response for height %d", height)
	}

	txs := make([]sdk.Tx, len(block.Data.Txs))
	for i, raw := range block.Data.Txs {
		tx, err := f.d(raw)
		if err != nil {
			return nil, fmt.Errorf("failed to decode tx: %w", err)
		}
		txs[i] = tx
	}

	return &Block{
		Height: uint64(block.Header.Height),
		Time:   block.Header.Time,
		Txs:    txs,
	}, nil
}
// Messages gathers the messages of every transaction in the block into one
// slice, keeping transaction order.
func (b Block) Messages() []sdk.Msg {
	collected := make([]sdk.Msg, 0, len(b.Txs))
	for i := range b.Txs {
		txMsgs := b.Txs[i].GetMsgs()
		collected = append(collected, txMsgs...)
	}
	return collected
}