Skip to content

Commit

Permalink
Merge pull request #5384 from spacemeshos/fetch-active-set-v1.2
Browse files Browse the repository at this point in the history
Extend proposal builder to allow the use of a fallback active set for v1.2.x
  • Loading branch information
fasmat authored Dec 21, 2023
2 parents c0c3aff + c22ac92 commit 5dca9df
Show file tree
Hide file tree
Showing 12 changed files with 202 additions and 69 deletions.
14 changes: 11 additions & 3 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -2,14 +2,22 @@

See [RELEASE](./RELEASE.md) for workflow instructions.

## Release v1.2.13

### Improvements

* [#5384](https://github.com/spacemeshos/go-spacemesh/pull/5384) to improve network stability and performance allow the
active set to be set in advance for an epoch.

## Release v1.2.12

### Improvements

* [#5373](https://github.com/spacemeshos/go-spacemesh/pull/5373) automatic scaling of post verifying workers to a lower value (1 by default) when POST proving starts.
The workers are scaled up when POST proving finishes.
* [#5373](https://github.com/spacemeshos/go-spacemesh/pull/5373) automatic scaling of post verifying workers to a lower
value (1 by default) when POST proving starts. The workers are scaled up when POST proving finishes.

* [#5382](https://github.com/spacemeshos/go-spacemesh/pull/5382) avoid processing same (gossiped/fetched) ATX many times in parallel
* [#5382](https://github.com/spacemeshos/go-spacemesh/pull/5382) avoid processing same (gossiped/fetched) ATX many times
in parallel

## Release v1.2.11

Expand Down
8 changes: 3 additions & 5 deletions bootstrap/updater.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@ const (
suffixLen = 2
SuffixBeacon = "bc"
SuffixActiveSet = "as"
SuffixBoostrap = "bs"
SuffixBootstrap = "bs"

httpTimeout = 5 * time.Second
notifyTimeout = time.Second
Expand Down Expand Up @@ -210,7 +210,7 @@ func (u *Updater) addUpdate(epoch types.EpochID, suffix string) {
u.mu.Lock()
defer u.mu.Unlock()
switch suffix {
case SuffixActiveSet, SuffixBeacon, SuffixBoostrap:
case SuffixActiveSet, SuffixBeacon, SuffixBootstrap:
default:
return
}
Expand Down Expand Up @@ -242,7 +242,7 @@ func (u *Updater) DoIt(ctx context.Context) error {
}
}()
for _, epoch := range requiredEpochs(current) {
verified, cached, err := u.checkEpochUpdate(ctx, epoch, SuffixBoostrap)
verified, cached, err := u.checkEpochUpdate(ctx, epoch, SuffixBootstrap)
if err != nil {
return err
}
Expand Down Expand Up @@ -320,8 +320,6 @@ func (u *Updater) get(ctx context.Context, uri string) (*VerifiedUpdate, []byte,
return nil, nil, fmt.Errorf("scheme not supported %v", resource.Scheme)
}

ctx, cancel := context.WithTimeout(ctx, httpTimeout)
defer cancel()
t0 := time.Now()
data, err := query(ctx, u.client, resource)
if err != nil {
Expand Down
10 changes: 5 additions & 5 deletions bootstrap/updater_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -135,7 +135,7 @@ func TestLoad(t *testing.T) {
{
desc: "recovery required",
persisted: map[types.EpochID][]string{
current - 2: {bootstrap.SuffixBoostrap, update1},
current - 2: {bootstrap.SuffixBootstrap, update1},
current - 1: {bootstrap.SuffixActiveSet, update2},
current: {bootstrap.SuffixBeacon, update3},
current + 1: {bootstrap.SuffixActiveSet, update4},
Expand Down Expand Up @@ -200,7 +200,7 @@ func TestLoadedNotDownloadedAgain(t *testing.T) {
current + 1: update4,
}
for epoch, update := range persisted {
persisted := filepath.Join(cfg.DataDir, bootstrap.DirName, strconv.Itoa(int(epoch)), bootstrap.UpdateName(epoch, bootstrap.SuffixBoostrap))
persisted := filepath.Join(cfg.DataDir, bootstrap.DirName, strconv.Itoa(int(epoch)), bootstrap.UpdateName(epoch, bootstrap.SuffixBootstrap))
require.NoError(t, fs.MkdirAll(filepath.Dir(persisted), 0o700))
require.NoError(t, afero.WriteFile(fs, persisted, []byte(update), 0o400))
}
Expand Down Expand Up @@ -300,7 +300,7 @@ func TestDoIt(t *testing.T) {
{
desc: "in order",
updates: map[string]string{
"/" + bootstrap.UpdateName(1, bootstrap.SuffixBoostrap): update1,
"/" + bootstrap.UpdateName(1, bootstrap.SuffixBootstrap): update1,
"/" + bootstrap.UpdateName(2, bootstrap.SuffixActiveSet): update2,
"/" + bootstrap.UpdateName(3, bootstrap.SuffixBeacon): update3,
"/" + bootstrap.UpdateName(4, bootstrap.SuffixActiveSet): update4,
Expand All @@ -311,7 +311,7 @@ func TestDoIt(t *testing.T) {
{
desc: "bootstrap trumps others",
updates: map[string]string{
"/" + bootstrap.UpdateName(3, bootstrap.SuffixBoostrap): update1,
"/" + bootstrap.UpdateName(3, bootstrap.SuffixBootstrap): update1,
"/" + bootstrap.UpdateName(3, bootstrap.SuffixActiveSet): update2,
"/" + bootstrap.UpdateName(3, bootstrap.SuffixBeacon): update3,
"/" + bootstrap.UpdateName(4, bootstrap.SuffixActiveSet): update4,
Expand Down Expand Up @@ -472,7 +472,7 @@ func TestNoNewUpdate(t *testing.T) {
numQ := 0
ts := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
require.Equal(t, http.MethodGet, r.Method)
if r.URL.String() != "/"+bootstrap.UpdateName(3, bootstrap.SuffixBoostrap) {
if r.URL.String() != "/"+bootstrap.UpdateName(3, bootstrap.SuffixBootstrap) {
w.WriteHeader(http.StatusNotFound)
return
}
Expand Down
2 changes: 1 addition & 1 deletion cmd/bootstrapper/bootstrapper.go
Original file line number Diff line number Diff line change
Expand Up @@ -76,7 +76,7 @@ var cmd = &cobra.Command{
Short: "generate bootstrapping data",
RunE: func(cmd *cobra.Command, args []string) error {
if len(args) == 0 {
return fmt.Errorf("epoch not specfiied")
return fmt.Errorf("epoch not specified")
}
var targetEpochs []types.EpochID
epochs := strings.Split(args[0], ",")
Expand Down
13 changes: 6 additions & 7 deletions cmd/bootstrapper/generator.go
Original file line number Diff line number Diff line change
Expand Up @@ -91,7 +91,7 @@ func (g *Generator) Generate(ctx context.Context, targetEpoch types.EpochID, gen
err error
)
if genBeacon && genActiveSet {
suffix = bootstrap.SuffixBoostrap
suffix = bootstrap.SuffixBootstrap
} else if genBeacon {
suffix = bootstrap.SuffixBeacon
} else if genActiveSet {
Expand Down Expand Up @@ -177,7 +177,7 @@ func queryBitcoin(ctx context.Context, client *http.Client, targetUrl string) (*
defer resp.Body.Close()
data, err := io.ReadAll(resp.Body)
if err != nil {
return nil, fmt.Errorf("bootstrap read resonse: %w", err)
return nil, fmt.Errorf("bootstrap read response: %w", err)
}
var br BitcoinResponse
err = json.Unmarshal(data, &br)
Expand All @@ -200,7 +200,7 @@ func getActiveSet(ctx context.Context, endpoint string, epoch types.EpochID) ([]
if err != nil {
return nil, fmt.Errorf("epoch stream %v: %w", endpoint, err)
}
activeSet := make([]types.ATXID, 0, 10_000)
activeSet := make([]types.ATXID, 0, 300_000)
for {
resp, err := stream.Recv()
if errors.Is(err, io.EOF) {
Expand All @@ -217,7 +217,7 @@ func getActiveSet(ctx context.Context, endpoint string, epoch types.EpochID) ([]
func (g *Generator) GenUpdate(epoch types.EpochID, beacon types.Beacon, activeSet []types.ATXID, suffix string) (string, error) {
as := make([]string, 0, len(activeSet))
for _, atx := range activeSet {
as = append(as, hex.EncodeToString(atx.Hash32().Bytes())) // no leading 0x
as = append(as, hex.EncodeToString(atx.Bytes())) // no leading 0x
}
var update bootstrap.Update
update.Version = SchemaVersion
Expand All @@ -233,19 +233,18 @@ func (g *Generator) GenUpdate(epoch types.EpochID, beacon types.Beacon, activeSe
}
data, err := json.Marshal(update)
if err != nil {
return "", fmt.Errorf("marshal data %v: %w", string(data), err)
return "", fmt.Errorf("marshal data: %w", err)
}
// make sure the data is valid
if err = bootstrap.ValidateSchema(data); err != nil {
return "", fmt.Errorf("invalid data %v: %w", string(data), err)
return "", fmt.Errorf("invalid data: %w", err)
}
filename := PersistedFilename(epoch, suffix)
err = afero.WriteFile(g.fs, filename, data, 0o600)
if err != nil {
return "", fmt.Errorf("persist epoch update %v: %w", filename, err)
}
g.logger.With().Info("generated update",
log.String("update", string(data)),
log.String("filename", filename),
)
return filename, nil
Expand Down
16 changes: 9 additions & 7 deletions cmd/bootstrapper/generator_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -155,14 +155,16 @@ func launchServer(tb testing.TB, cdb *datastore.CachedDB) func() {
}
}

func verifyUpdate(t *testing.T, data []byte, epoch types.EpochID, expBeacon string, expAsSize int) {
require.NoError(t, bootstrap.ValidateSchema(data))
func verifyUpdate(tb testing.TB, data []byte, epoch types.EpochID, expBeacon string, expAsSize int) {
tb.Helper()
require.NoError(tb, bootstrap.ValidateSchema(data))

var update bootstrap.Update
require.NoError(t, json.Unmarshal(data, &update))
require.Equal(t, SchemaVersion, update.Version)
require.EqualValues(t, epoch, update.Data.Epoch.ID)
require.Equal(t, expBeacon, update.Data.Epoch.Beacon)
require.Len(t, update.Data.Epoch.ActiveSet, expAsSize)
require.NoError(tb, json.Unmarshal(data, &update))
require.Equal(tb, SchemaVersion, update.Version)
require.Equal(tb, epoch.Uint32(), update.Data.Epoch.ID)
require.Equal(tb, expBeacon, update.Data.Epoch.Beacon)
require.Len(tb, update.Data.Epoch.ActiveSet, expAsSize)
}

func TestGenerator_Generate(t *testing.T) {
Expand Down
2 changes: 1 addition & 1 deletion cmd/bootstrapper/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -150,7 +150,7 @@ func (s *Server) GenBootstrap(ctx context.Context, epoch types.EpochID) error {
if err != nil {
return err
}
suffix := bootstrap.SuffixBoostrap
suffix := bootstrap.SuffixBootstrap
_, err = s.gen.GenUpdate(epoch, epochBeacon(epoch), actives, suffix)
return err
}
Expand Down
4 changes: 2 additions & 2 deletions cmd/bootstrapper/server_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -127,14 +127,14 @@ func TestServer(t *testing.T) {

for _, epoch := range epochs {
createAtxs(t, db, epoch-1, types.RandomActiveSet(activeSetSize))
fname := PersistedFilename(epoch, bootstrap.SuffixBoostrap)
fname := PersistedFilename(epoch, bootstrap.SuffixBootstrap)
require.Eventually(t, func() bool {
_, err := fs.Stat(fname)
return err == nil
}, 5*time.Second, 100*time.Millisecond)
require.Empty(t, ch)

data := query(t, ctx, bootstrap.UpdateName(epoch, bootstrap.SuffixBoostrap))
data := query(t, ctx, bootstrap.UpdateName(epoch, bootstrap.SuffixBootstrap))
verifyUpdate(t, data, epoch, hex.EncodeToString(epochBeacon(epoch).Bytes()), activeSetSize)
require.NoError(t, fs.Remove(fname))
}
Expand Down
108 changes: 86 additions & 22 deletions miner/proposal_builder.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import (
"errors"
"fmt"
"sort"
"sync"
"time"

"golang.org/x/exp/maps"
Expand Down Expand Up @@ -70,6 +71,11 @@ type ProposalBuilder struct {

signer *signing.EdSigner
session *session

fallback struct {
mu sync.Mutex
data map[types.EpochID][]types.ATXID
}
}

// session per every signing key for the whole epoch.
Expand Down Expand Up @@ -215,6 +221,12 @@ func New(
tortoise: trtl,
syncer: syncer,
conState: conState,
fallback: struct {
mu sync.Mutex
data map[types.EpochID][]types.ATXID
}{
data: map[types.EpochID][]types.ATXID{},
},
}
for _, opt := range opts {
opt(pb)
Expand Down Expand Up @@ -317,6 +329,20 @@ func (pb *ProposalBuilder) decideMeshHash(ctx context.Context, current types.Lay
return mesh
}

func (pb *ProposalBuilder) UpdateActiveSet(epoch types.EpochID, activeSet []types.ATXID) {
pb.logger.With().Info("received activeset update",
epoch,
log.Int("size", len(activeSet)),
)
pb.fallback.mu.Lock()
defer pb.fallback.mu.Unlock()
if _, ok := pb.fallback.data[epoch]; ok {
pb.logger.With().Debug("fallback active set already exists", epoch)
return
}
pb.fallback.data[epoch] = activeSet
}

func (pb *ProposalBuilder) initSessionData(ctx context.Context, lid types.LayerID) error {
if pb.session == nil || pb.session.epoch != lid.GetEpoch() {
pb.session = &session{epoch: lid.GetEpoch()}
Expand Down Expand Up @@ -362,29 +388,48 @@ func (pb *ProposalBuilder) initSessionData(ctx context.Context, lid types.LayerI
return fmt.Errorf("get refballot %w", err)
}
if errors.Is(err, sql.ErrNotFound) {
weight, set, err := generateActiveSet(
pb.logger,
pb.cdb,
pb.signer.VRFSigner(),
pb.session.epoch,
pb.clock.LayerToTime(pb.session.epoch.FirstLayer()),
pb.cfg.GoodAtxPercent,
pb.cfg.networkDelay,
pb.session.atx,
pb.session.atxWeight,
)
if err != nil {
return err
if weight, set, err := pb.fallbackActiveSet(pb.session.epoch); err == nil {
pb.logger.With().Info("using fallback active set",
pb.session.epoch,
log.Int("size", len(set)),
)
sort.Slice(set, func(i, j int) bool {
return bytes.Compare(set[i].Bytes(), set[j].Bytes()) < 0
})
pb.session.active.set = set
pb.session.active.weight = weight
pb.session.eligibilities.slots = proposals.MustGetNumEligibleSlots(
pb.session.atxWeight,
pb.cfg.minActiveSetWeight,
weight,
pb.cfg.layerSize,
pb.cfg.layersPerEpoch,
)
} else {
weight, set, err := generateActiveSet(
pb.logger,
pb.cdb,
pb.signer.VRFSigner(),
pb.session.epoch,
pb.clock.LayerToTime(pb.session.epoch.FirstLayer()),
pb.cfg.GoodAtxPercent,
pb.cfg.networkDelay,
pb.session.atx,
pb.session.atxWeight,
)
if err != nil {
return err
}
pb.session.active.set = set
pb.session.active.weight = weight
pb.session.eligibilities.slots = proposals.MustGetNumEligibleSlots(
pb.session.atxWeight,
pb.cfg.minActiveSetWeight,
weight,
pb.cfg.layerSize,
pb.cfg.layersPerEpoch,
)
}
pb.session.active.set = set
pb.session.active.weight = weight
pb.session.eligibilities.slots = proposals.MustGetNumEligibleSlots(
pb.session.atxWeight,
pb.cfg.minActiveSetWeight,
weight,
pb.cfg.layerSize,
pb.cfg.layersPerEpoch,
)
} else {
if ballot.EpochData == nil {
return fmt.Errorf("atx %d created invalid first ballot", pb.session.atx)
Expand Down Expand Up @@ -605,6 +650,25 @@ func activesFromFirstBlock(
return totalWeight, set, nil
}

func (pb *ProposalBuilder) fallbackActiveSet(targetEpoch types.EpochID) (uint64, []types.ATXID, error) {
pb.fallback.mu.Lock()
defer pb.fallback.mu.Unlock()
set, ok := pb.fallback.data[targetEpoch]
if !ok {
return 0, nil, fmt.Errorf("no fallback active set for epoch %d", targetEpoch)
}

var totalWeight uint64
for _, id := range set {
atx, err := pb.cdb.GetAtxHeader(id)
if err != nil {
return 0, nil, err
}
totalWeight += atx.GetWeight()
}
return totalWeight, set, nil
}

func generateActiveSet(
logger log.Log,
cdb *datastore.CachedDB,
Expand Down
Loading

0 comments on commit 5dca9df

Please sign in to comment.