-
Notifications
You must be signed in to change notification settings - Fork 0
/
ledgermetadatareader.go
89 lines (78 loc) · 2.68 KB
/
ledgermetadatareader.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
package main
import (
"context"
"fmt"
"github.com/pkg/errors"
"github.com/stellar/go/historyarchive"
"github.com/stellar/go/ingest/cdp"
"github.com/stellar/go/ingest/ledgerbackend"
"github.com/stellar/go/support/datastore"
"github.com/stellar/go/support/storage"
"github.com/stellar/go/xdr"
)
type LedgerMetadataReader struct {
processors []Processor
historyArchiveURLs []string
dataStoreConfig datastore.DataStoreConfig
startLedger uint32
endLedger uint32
}
func NewLedgerMetadataReader(config *datastore.DataStoreConfig,
historyArchiveUrls []string,
processors []Processor, startLedger uint32, endLedger uint32) (*LedgerMetadataReader, error) {
if config == nil {
return nil, errors.New("missing configuration")
}
return &LedgerMetadataReader{
processors: processors,
dataStoreConfig: *config,
historyArchiveURLs: historyArchiveUrls,
startLedger: startLedger,
endLedger: endLedger,
}, nil
}
func (a *LedgerMetadataReader) Run(ctx context.Context) error {
historyArchive, err := historyarchive.NewArchivePool(a.historyArchiveURLs, historyarchive.ArchiveOptions{
ConnectOptions: storage.ConnectOptions{
UserAgent: "cdp-hackathon-validators",
Context: ctx,
},
})
if err != nil {
return errors.Wrap(err, "error creating history archive client")
}
latestNetworkLedger, err := historyArchive.GetLatestLedgerSequence()
if err != nil {
return errors.Wrap(err, "error getting latest ledger")
}
// Determine the actual ledger range to process
var ledgerRange ledgerbackend.Range
// If no start ledger specified, start from the latest ledger
if a.startLedger == 0 {
a.startLedger = latestNetworkLedger
}
// If no end ledger specified, or it's greater than the latest ledger,
// use an unbounded range from the start ledger
if a.endLedger == 0 || a.endLedger > latestNetworkLedger {
fmt.Printf("Starting at ledger %v ...\n", latestNetworkLedger)
ledgerRange = ledgerbackend.UnboundedRange(a.startLedger)
} else {
fmt.Printf("Processing ledgers from %d to %d\n", a.startLedger, a.endLedger)
ledgerRange = ledgerbackend.BoundedRange(a.startLedger, a.endLedger)
}
pubConfig := cdp.PublisherConfig{
DataStoreConfig: a.dataStoreConfig,
BufferedStorageConfig: cdp.DefaultBufferedStorageBackendConfig(a.dataStoreConfig.Schema.LedgersPerFile),
}
pubConfig.BufferedStorageConfig.RetryLimit = 20
pubConfig.BufferedStorageConfig.RetryWait = 3
return cdp.ApplyLedgerMetadata(ledgerRange, pubConfig, ctx,
func(lcm xdr.LedgerCloseMeta) error {
for _, processor := range a.processors {
if err = processor.Process(ctx, Message{Payload: lcm}); err != nil {
return err
}
}
return nil
})
}