Skip to content

Commit

Permalink
feat: add retry stream client
Browse files Browse the repository at this point in the history
  • Loading branch information
bakkelim committed Sep 19, 2023
1 parent 4278189 commit edc50db
Show file tree
Hide file tree
Showing 5 changed files with 474 additions and 9 deletions.
20 changes: 17 additions & 3 deletions examples/ingest/cmd/root.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ import (
"context"
"fmt"
"os"
"time"

"github.com/grpc-ecosystem/go-grpc-middleware/v2/interceptors/retry"
homedir "github.com/mitchellh/go-homedir"
Expand All @@ -32,9 +33,10 @@ import (
)

var (
cfgFile string
client *ingest.Client
jsonp = protojson.MarshalOptions{
cfgFile string
client *ingest.Client
retryClient *ingest.RetryClient
jsonp = protojson.MarshalOptions{
Multiline: true,
EmitUnpopulated: true,
}
Expand Down Expand Up @@ -110,6 +112,18 @@ func initConfig() {
if err != nil {
er(fmt.Sprintf("failed to create IndyKite Ingest Client: %v", err))
}
retryClient, err = ingest.NewRetryClient(context.Background(),
&ingest.RetryPolicy{
MaxAttempts: 4,
InitialBackoff: "1s", // proto duration format according to spec: https://protobuf.dev/programming-guides/proto3/#json
BackoffMultiplier: 2,
CtxTimeout: 120 * time.Second,
},
grpc.WithCredentialsLoader(apicfg.DefaultEnvironmentLoader),
)
if err != nil {
er(fmt.Sprintf("failed to create IndyKite Ingest RetryClient: %v", err))
}
}

func er(msg interface{}) {
Expand Down
128 changes: 128 additions & 0 deletions examples/ingest/cmd/stream_retry.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,128 @@
// Copyright (c) 2023 IndyKite
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package cmd

import (
"context"
"log"
"time"

"github.com/spf13/cobra"

ingestpb "github.com/indykite/indykite-sdk-go/gen/indykite/ingest/v1beta2"
objects "github.com/indykite/indykite-sdk-go/gen/indykite/objects/v1beta1"
)

// streamRetryCmd represents the command for streaming records with retry on disconnect
var streamRetryCmd = &cobra.Command{
Use: "stream_retry",
Short: "Stream multiple records to the IndyKite Ingest API with retry on disconnect",
Long: `Stream multiple records to the IndyKite Ingest API with retry on disconnect`,
Args: cobra.ExactArgs(0),
Run: func(cmd *cobra.Command, args []string) {
record1 := &ingestpb.Record{
Id: "1",
Operation: &ingestpb.Record_Upsert{
Upsert: &ingestpb.UpsertData{
Data: &ingestpb.UpsertData_Relation{
Relation: &ingestpb.Relation{
Match: &ingestpb.RelationMatch{
SourceMatch: &ingestpb.NodeMatch{
ExternalId: "0000",
Type: "Employee",
},
TargetMatch: &ingestpb.NodeMatch{
ExternalId: "0001",
Type: "Truck",
},
Type: "SERVICES",
},
Properties: []*ingestpb.Property{
{
Key: "since",
Value: &objects.Value{
Value: &objects.Value_StringValue{
StringValue: "production",
},
},
},
},
},
},
},
},
}

record2 := &ingestpb.Record{
Id: "2",
Operation: &ingestpb.Record_Upsert{
Upsert: &ingestpb.UpsertData{
Data: &ingestpb.UpsertData_Node{
Node: &ingestpb.Node{
Type: &ingestpb.Node_Resource{
Resource: &ingestpb.Resource{
ExternalId: "0001",
Type: "Truck",
Properties: []*ingestpb.Property{
{
Key: "vin",
Value: &objects.Value{
Value: &objects.Value_IntegerValue{
IntegerValue: 1234,
},
},
},
},
},
},
},
},
},
},
}

records := []*ingestpb.Record{
record1, record2,
}

ctx, cancel := context.WithTimeout(context.Background(), 100*time.Second)
defer cancel()

err := retryClient.OpenStreamClient(ctx)
if err != nil {
log.Fatalf("failed to open ingest stream %v", err)
}

for _, record := range records {
err := retryClient.SendRecord(record)
if err != nil {
log.Fatalf("failed to send record %v", err)
}
resp, err := retryClient.ReceiveResponse()
if err != nil {
log.Fatalf("failed to receive response %v", err)
}
log.Println(jsonp.Format(resp))
}
err = client.Close()
if err != nil {
log.Fatalf("failed to close ingest stream %v", err)
}
},
}

func init() {
rootCmd.AddCommand(streamRetryCmd)
}
157 changes: 157 additions & 0 deletions ingest/ingest_retry_client.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,157 @@
// Copyright (c) 2023 IndyKite
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package ingest

import (
"context"
"fmt"
"log"
"time"

"google.golang.org/grpc"
"google.golang.org/grpc/codes"

"github.com/indykite/indykite-sdk-go/errors"
ingestpb "github.com/indykite/indykite-sdk-go/gen/indykite/ingest/v1beta2"
api "github.com/indykite/indykite-sdk-go/grpc"
)

type RetryPolicy struct {
InitialBackoff string
CtxTimeout time.Duration
MaxAttempts int
BackoffMultiplier int
}

type RetryClient struct {
*Client
retryPolicy *RetryPolicy
isUnableToReconnect bool
}

// NewRetryClient creates a new test Ingest API gRPC Client with retry functionality.
func NewRetryClient(ctx context.Context, retryPolicy *RetryPolicy, opts ...api.ClientOption) (*RetryClient, error) {
retryClientOpts := defaultRetryClientOptions()
client, err := NewClient(ctx, append(retryClientOpts, opts...)...)
if err != nil {
return nil, err
}
c := &RetryClient{
Client: client,
retryPolicy: retryPolicy,
}

if _, err = time.ParseDuration(c.retryPolicy.InitialBackoff); err != nil {
return nil, errors.New(codes.FailedPrecondition, fmt.Sprintf("invalid backoff duration: %v", err))
}
return c, nil
}

// NewTestRetryClient creates a new Ingest API gRPC Client with retry functionality.
func NewTestRetryClient(client ingestpb.IngestAPIClient, retryPolicy *RetryPolicy) (*RetryClient, error) {
c, err := NewTestClient(client)
if err != nil {
return nil, err
}
if err != nil {
return nil, err
}
rc := &RetryClient{
Client: c,
retryPolicy: retryPolicy,
}

if _, err = time.ParseDuration(rc.retryPolicy.InitialBackoff); err != nil {
return nil, errors.New(codes.FailedPrecondition, fmt.Sprintf("invalid backoff duration: %v", err))
}
return rc, nil
}

// SendRecord sends a record on the opened stream and retries if it fails.
func (c *RetryClient) SendRecord(record *ingestpb.Record) error {
if c.stream == nil {
return errors.New(codes.FailedPrecondition, "a stream must be opened first")
}

err := c.Client.SendRecord(record)
if err == nil {
return nil
}
return c.sendRecordWithRetry(record)
}

// ReceiveResponse returns the next response available on the stream. If an error is returned (stream is closed),
// the method will wait for a server reconnect or fail.
func (c *RetryClient) ReceiveResponse() (*ingestpb.StreamRecordsResponse, error) {
if c.stream == nil {
return nil, errors.New(codes.FailedPrecondition, "a stream must be opened first")
}

resp, err := c.Client.ReceiveResponse()
if err == nil {
return resp, nil
}
return c.receiveResponseWhenReconnected()
}

// receiveResponseWhenReconnected tries to read the next response available on the stream.
// Will return error if unable to reconnect.
func (c *RetryClient) receiveResponseWhenReconnected() (*ingestpb.StreamRecordsResponse, error) {
for {
if c.isUnableToReconnect {
return nil, errors.New(codes.Unavailable, "unable to reconnect to server")
}
time.Sleep(1 * time.Second)
resp, err := c.Client.ReceiveResponse()
if err == nil {
return resp, nil
}
}
}

// sendRecordWithRetry creates a new stream and tries sends a record. Will retry based on retry policy.
func (c *RetryClient) sendRecordWithRetry(record *ingestpb.Record) error {
backoffTime, _ := time.ParseDuration(c.retryPolicy.InitialBackoff)
var err error
for i := 1; i <= c.retryPolicy.MaxAttempts; i++ {
log.Printf("attempting to reconnect (%d/%d) in %s...", i, c.retryPolicy.MaxAttempts, backoffTime.String())
time.Sleep(backoffTime)
backoffTime *= time.Duration(c.retryPolicy.BackoffMultiplier)
ctx, cancelCtx := context.WithTimeout(context.Background(), c.retryPolicy.CtxTimeout)
defer cancelCtx()
err = c.OpenStreamClient(ctx)
if err != nil {
continue
}
time.Sleep(500 * time.Millisecond)
err = c.Client.SendRecord(record)
if err == nil {
log.Printf("restablished connection to server")
return nil
}
}
log.Printf("unable to reconnect, closing client")
c.isUnableToReconnect = true
if closeErr := c.Close(); closeErr != nil {
err = closeErr
}
return err
}

func defaultRetryClientOptions() []api.ClientOption {
return []api.ClientOption{
api.WithGRPCDialOption(grpc.WithDisableRetry()),
}
}
Loading

0 comments on commit edc50db

Please sign in to comment.