Skip to content

Commit

Permalink
Merge pull request #55 from onemorebsmith/disconnect_cleanup
Browse files Browse the repository at this point in the history
v1.1.5
  • Loading branch information
onemorebsmith authored Oct 7, 2022
2 parents 6b153bf + 397ee8d commit d1aedd7
Show file tree
Hide file tree
Showing 11 changed files with 84 additions and 40 deletions.
1 change: 1 addition & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ kaspad.db
debug
debug.test
__debug_bin
.DS_Store

cmd/kaspabridge/kaspabridge
cmd/kaspabridge/bridge.
Expand Down
2 changes: 1 addition & 1 deletion make_release.sh
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ CMD_PATH="../cmd/kaspabridge"
rm -rf release
mkdir -p release
cd release
VERSION=1.1.4
VERSION=1.1.5
ARCHIVE="ks_bridge-${VERSION}"
OUTFILE="ks_bridge"
OUTDIR="ks_bridge"
Expand Down
1 change: 1 addition & 0 deletions src/gostratum/default_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,7 @@ func HandleAuthorize(ctx *StratumContext, event JsonRpcEvent) error {
}
ctx.WalletAddr = address
ctx.WorkerName = workerName
ctx.Logger = ctx.Logger.With(zap.String("worker", ctx.WorkerName), zap.String("addr", ctx.WalletAddr))

if err := ctx.Reply(NewResponse(event, true, nil)); err != nil {
return errors.Wrap(err, "failed to send response to authorize")
Expand Down
5 changes: 1 addition & 4 deletions src/gostratum/stratum_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,10 +13,7 @@ import (
)

func spawnClientListener(ctx *StratumContext, connection net.Conn, s *StratumListener) error {
defer func() {
connection.Close()
s.disconnectChannel <- ctx
}()
defer ctx.Disconnect()

for {
err := readFromConnection(connection, func(line string) error {
Expand Down
49 changes: 41 additions & 8 deletions src/gostratum/stratum_context.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import (
"encoding/json"
"fmt"
"net"
"sync/atomic"
"time"

"github.com/pkg/errors"
Expand All @@ -23,6 +24,7 @@ type StratumContext struct {
disconnecting bool
onDisconnect chan *StratumContext
State any // gross, but go generics aren't mature enough this can be typed 😭
writeLock int32
}

var ErrorDisconnected = fmt.Errorf("disconnecting")
Expand All @@ -45,9 +47,7 @@ func (sc *StratumContext) Reply(response JsonRpcResponse) error {
return errors.Wrap(err, "failed encoding jsonrpc response")
}
encoded = append(encoded, '\n')
_, err = sc.connection.Write(encoded)
sc.checkDisconnect(err)
return err
return sc.writeWithBackoff(encoded)
}

func (sc *StratumContext) Send(event JsonRpcEvent) error {
Expand All @@ -59,9 +59,42 @@ func (sc *StratumContext) Send(event JsonRpcEvent) error {
return errors.Wrap(err, "failed encoding jsonrpc event")
}
encoded = append(encoded, '\n')
_, err = sc.connection.Write(encoded)
sc.checkDisconnect(err)
return err
return sc.writeWithBackoff(encoded)
}

var errWriteBlocked = fmt.Errorf("error writing to socket, previous write pending")

func (sc *StratumContext) write(data []byte) error {
if atomic.CompareAndSwapInt32(&sc.writeLock, 0, 1) {
defer atomic.StoreInt32(&sc.writeLock, 0)
deadline := time.Now().Add(5 * time.Second)
if err := sc.connection.SetWriteDeadline(deadline); err != nil {
return errors.Wrap(err, "failed setting write deadline for connection")
}
_, err := sc.connection.Write(data)
sc.checkDisconnect(err)
return err
}
return errWriteBlocked
}

func (sc *StratumContext) writeWithBackoff(data []byte) error {
for i := 0; i < 3; i++ {
err := sc.write(data)
if err == nil {
return nil
} else if err == errWriteBlocked {
time.Sleep(5 * time.Millisecond)
continue
} else {
return err
}
}
// this should virtually never happen on a 'healthy' connection. Writes
// to the socket are actually just writing to the outgoing buffer for the
// connection in the OS, if this blocks it's because the receiver has not
// read from the buffer for such a length of time that the tx buffer is full
return fmt.Errorf("failed writing to socket after 3 attempts")
}

func (sc *StratumContext) ReplyStaleShare(id any) error {
Expand Down Expand Up @@ -97,6 +130,7 @@ func (sc *StratumContext) ReplyLowDiffShare(id any) error {

func (sc *StratumContext) Disconnect() {
if !sc.disconnecting {
sc.Logger.Info("disconnecting")
sc.disconnecting = true
if sc.connection != nil {
sc.connection.Close()
Expand All @@ -107,8 +141,7 @@ func (sc *StratumContext) Disconnect() {

func (sc *StratumContext) checkDisconnect(err error) {
if err != nil { // actual error
sc.disconnecting = true
sc.onDisconnect <- sc
go sc.Disconnect() // potentially blocking, so async it
}
}

Expand Down
19 changes: 13 additions & 6 deletions src/kaspastratum/client_handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package kaspastratum
import (
"fmt"
"regexp"
"strings"
"sync"
"sync/atomic"
"time"
Expand Down Expand Up @@ -35,10 +36,9 @@ func newClientListener(logger *zap.SugaredLogger, shareHandler *shareHandler) *c
}

func (c *clientListener) OnConnect(ctx *gostratum.StratumContext) {
c.clientLock.Lock()
c.clientCounter++
idx := atomic.AddInt32(&c.clientCounter, 1)
ctx.Id = idx
c.clientLock.Lock()
c.clients[idx] = ctx
c.clientLock.Unlock()
ctx.Logger = ctx.Logger.With(zap.Int("client_id", int(ctx.Id)))
Expand All @@ -52,7 +52,9 @@ func (c *clientListener) OnConnect(ctx *gostratum.StratumContext) {
func (c *clientListener) OnDisconnect(ctx *gostratum.StratumContext) {
ctx.Done()
c.clientLock.Lock()
c.logger.Info("removing client ", ctx.Id)
delete(c.clients, ctx.Id)
c.logger.Info("removed client ", ctx.Id)
c.clientLock.Unlock()
RecordDisconnect(ctx)
}
Expand All @@ -77,8 +79,14 @@ func (c *clientListener) NewBlockAvailable(kapi *KaspaApi) {
}
template, err := kapi.GetBlockTemplate(client)
if err != nil {
RecordWorkerError(client.WalletAddr, ErrFailedBlockFetch)
client.Logger.Error(fmt.Sprintf("failed fetching new block template from kaspa: %s", err))
if strings.Contains(err.Error(), "Could not decode address") {
RecordWorkerError(client.WalletAddr, ErrInvalidAddressFmt)
client.Logger.Error(fmt.Sprintf("failed fetching new block template from kaspa, malformed address: %s", err))
client.Disconnect() // unrecoverable
} else {
RecordWorkerError(client.WalletAddr, ErrFailedBlockFetch)
client.Logger.Error(fmt.Sprintf("failed fetching new block template from kaspa: %s", err))
}
return
}
state.bigDiff = CalculateTarget(uint64(template.Block.Header.Bits))
Expand Down Expand Up @@ -125,8 +133,7 @@ func (c *clientListener) NewBlockAvailable(kapi *KaspaApi) {
return
}
RecordWorkerError(client.WalletAddr, ErrFailedSendWork)
client.Logger.Error(errors.Wrap(err, "failed sending work packet").Error(),
zap.Any("context", client))
client.Logger.Error(errors.Wrapf(err, "failed sending work packet %d", jobId).Error())
}

RecordNewJob(client)
Expand Down
15 changes: 8 additions & 7 deletions src/kaspastratum/errors.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,11 +3,12 @@ package kaspastratum
type ErrorShortCodeT string

const (
ErrNoMinerAddress ErrorShortCodeT = "err_no_miner_address"
ErrFailedBlockFetch ErrorShortCodeT = "err_failed_block_fetch"
ErrMissingJob ErrorShortCodeT = "err_missing_job"
ErrBadDataFromMiner ErrorShortCodeT = "err_bad_data_from_miner"
ErrFailedSendWork ErrorShortCodeT = "err_failed_sending_work"
ErrFailedSetDiff ErrorShortCodeT = "err_diff_set_failed"
ErrDisconnected ErrorShortCodeT = "err_worker_disconnected"
ErrNoMinerAddress ErrorShortCodeT = "err_no_miner_address"
ErrFailedBlockFetch ErrorShortCodeT = "err_failed_block_fetch"
ErrInvalidAddressFmt ErrorShortCodeT = "err_malformed_wallet_address"
ErrMissingJob ErrorShortCodeT = "err_missing_job"
ErrBadDataFromMiner ErrorShortCodeT = "err_bad_data_from_miner"
ErrFailedSendWork ErrorShortCodeT = "err_failed_sending_work"
ErrFailedSetDiff ErrorShortCodeT = "err_diff_set_failed"
ErrDisconnected ErrorShortCodeT = "err_worker_disconnected"
)
5 changes: 3 additions & 2 deletions src/kaspastratum/prom.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ var blockCounter = promauto.NewCounterVec(prometheus.CounterOpts{
var blockGauge = promauto.NewGaugeVec(prometheus.GaugeOpts{
Name: "ks_mined_blocks_gauge",
Help: "Gauge containing 1 unique instance per block mined",
}, append(workerLabels, "nonce", "bluescore"))
}, append(workerLabels, "nonce", "bluescore", "hash"))

var disconnectCounter = promauto.NewCounterVec(prometheus.CounterOpts{
Name: "ks_worker_disconnect_counter",
Expand Down Expand Up @@ -109,11 +109,12 @@ func RecordWeakShare(worker *gostratum.StratumContext) {
invalidCounter.With(labels).Inc()
}

func RecordBlockFound(worker *gostratum.StratumContext, nonce, bluescore uint64) {
func RecordBlockFound(worker *gostratum.StratumContext, nonce, bluescore uint64, hash string) {
blockCounter.With(commonLabels(worker)).Inc()
labels := commonLabels(worker)
labels["nonce"] = fmt.Sprintf("%d", nonce)
labels["bluescore"] = fmt.Sprintf("%d", bluescore)
labels["hash"] = fmt.Sprintf("%d", bluescore)
blockGauge.With(labels).Set(1)
}

Expand Down
2 changes: 1 addition & 1 deletion src/kaspastratum/prom_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ func TestPromValid(t *testing.T) {
RecordDupeShare(&ctx)
RecordInvalidShare(&ctx)
RecordWeakShare(&ctx)
RecordBlockFound(&ctx, 10000, 12345)
RecordBlockFound(&ctx, 10000, 12345, "abcdefg")
RecordDisconnect(&ctx)
RecordNewJob(&ctx)
RecordNetworkStats(1234, 5678, 910)
Expand Down
14 changes: 8 additions & 6 deletions src/kaspastratum/share_handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ import (

"github.com/kaspanet/kaspad/app/appmessage"
"github.com/kaspanet/kaspad/domain/consensus/model/externalapi"
"github.com/kaspanet/kaspad/domain/consensus/utils/consensushashing"
"github.com/kaspanet/kaspad/domain/consensus/utils/pow"
"github.com/kaspanet/kaspad/infrastructure/network/rpcclient"
"github.com/onemorebsmith/kaspastratum/src/gostratum"
Expand Down Expand Up @@ -207,13 +208,14 @@ func (sh *shareHandler) submit(ctx *gostratum.StratumContext,
block *externalapi.DomainBlock, nonce uint64, eventId any) error {
mutable := block.Header.ToMutable()
mutable.SetNonce(nonce)
_, err := sh.kaspa.SubmitBlock(&externalapi.DomainBlock{
block = &externalapi.DomainBlock{
Header: mutable.ToImmutable(),
Transactions: block.Transactions,
})
}
_, err := sh.kaspa.SubmitBlock(block)
blockhash := consensushashing.BlockHash(block)
// print after the submit to get it submitted faster
ctx.Logger.Info("submitted block to kaspad", ctx.String())
ctx.Logger.Info(fmt.Sprintf("Submitted nonce: %d", nonce))
ctx.Logger.Info(fmt.Sprintf("Submitted block %s", blockhash))

if err != nil {
// :'(
Expand All @@ -234,12 +236,12 @@ func (sh *shareHandler) submit(ctx *gostratum.StratumContext,
}

// :)
ctx.Logger.Info("block accepted")
ctx.Logger.Info(fmt.Sprintf("block accepted %s", blockhash))
stats := sh.getCreateStats(ctx)
stats.LastShare = time.Now()
atomic.AddInt64(&stats.SharesFound, 1)
atomic.AddInt64(&sh.overall.SharesFound, 1)
RecordBlockFound(ctx, block.Header.Nonce(), block.Header.BlueScore())
RecordBlockFound(ctx, block.Header.Nonce(), block.Header.BlueScore(), blockhash.String())
return ctx.Reply(gostratum.JsonRpcResponse{
Result: true,
})
Expand Down
11 changes: 6 additions & 5 deletions src/kaspastratum/stratum_server.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ import (
"go.uber.org/zap/zapcore"
)

const version = "v1.1"
const version = "v1.1.5"

type BridgeConfig struct {
StratumPort string `yaml:"stratum_port"`
Expand Down Expand Up @@ -73,7 +73,10 @@ func ListenAndServe(cfg BridgeConfig) error {
// override the submit handler with an actual useful handler
handlers[string(gostratum.StratumMethodSubmit)] =
func(ctx *gostratum.StratumContext, event gostratum.JsonRpcEvent) error {
return shareHandler.HandleSubmit(ctx, event)
if err := shareHandler.HandleSubmit(ctx, event); err != nil {
ctx.Logger.Error(err) // sink error
}
return nil
}

stratumConfig := gostratum.StratumListenerConfig{
Expand All @@ -94,7 +97,5 @@ func ListenAndServe(cfg BridgeConfig) error {
go shareHandler.startStatsThread()
}

server := gostratum.NewListener(stratumConfig)
server.Listen(context.Background())
return nil
return gostratum.NewListener(stratumConfig).Listen(context.Background())
}

0 comments on commit d1aedd7

Please sign in to comment.