Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

DO NOT MERGE - testing candidate fixes to Python SDK #33452

15 changes: 15 additions & 0 deletions sdks/go/cmd/prism/prism.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ import (
"fmt"
"log"
"log/slog"
"net"
"os"
"strings"
"time"
Expand All @@ -37,6 +38,7 @@ import (
var (
jobPort = flag.Int("job_port", 8073, "specify the job management service port")
webPort = flag.Int("web_port", 8074, "specify the web ui port")
workerPoolPort = flag.Int("worker_pool_port", 8075, "specify the worker pool port")
jobManagerEndpoint = flag.String("jm_override", "", "set to only stand up a web ui that refers to a seperate JobManagement endpoint")
serveHTTP = flag.Bool("serve_http", true, "enable or disable the web ui")
idleShutdownTimeout = flag.Duration("idle_shutdown_timeout", -1, "duration that prism will wait for a new job before shutting itself down. Negative durations disable auto shutdown. Defaults to never shutting down.")
Expand Down Expand Up @@ -100,6 +102,7 @@ func main() {
Port: *jobPort,
IdleShutdownTimeout: *idleShutdownTimeout,
CancelFn: cancel,
WorkerPoolEndpoint: fmt.Sprintf("localhost:%d", *workerPoolPort),
},
*jobManagerEndpoint)
if err != nil {
Expand All @@ -110,6 +113,18 @@ func main() {
log.Fatalf("error creating web server: %v", err)
}
}
g := prism.CreateWorkerPoolServer(ctx)
addr := fmt.Sprintf(":%d", *workerPoolPort)
lis, err := net.Listen("tcp", addr)
if err != nil {
log.Fatalf("error creating worker pool server: %v", err)
}
slog.Info("Serving Worker Pool", "endpoint", fmt.Sprintf("localhost:%d", *workerPoolPort))
go g.Serve(lis)
go func() {
<-ctx.Done()
g.GracefulStop()
}()
// Block main thread forever to keep main from exiting.
<-ctx.Done()
}
Expand Down
19 changes: 19 additions & 0 deletions sdks/go/pkg/beam/runners/prism/internal/engine/engine_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ import (
"context"
"fmt"
"math/rand"
"net"
"os"
"strings"
"testing"
Expand All @@ -31,6 +32,7 @@ import (
"github.com/apache/beam/sdks/v2/go/pkg/beam/runners/prism/internal"
"github.com/apache/beam/sdks/v2/go/pkg/beam/runners/prism/internal/engine"
"github.com/apache/beam/sdks/v2/go/pkg/beam/runners/prism/internal/jobservices"
"github.com/apache/beam/sdks/v2/go/pkg/beam/runners/prism/internal/worker"
"github.com/apache/beam/sdks/v2/go/pkg/beam/runners/universal"
"github.com/apache/beam/sdks/v2/go/pkg/beam/testing/ptest"
"github.com/apache/beam/sdks/v2/go/test/integration/primitives"
Expand Down Expand Up @@ -80,6 +82,23 @@ func executeWithT(ctx context.Context, t testing.TB, p *beam.Pipeline) (beam.Pip
s1 := rand.NewSource(time.Now().UnixNano())
r1 := rand.New(s1)
*jobopts.JobName = fmt.Sprintf("%v-%v", strings.ToLower(t.Name()), r1.Intn(1000))
lis, err := net.Listen("tcp", ":0")
if err != nil {
t.Fatal(err)
}
_, port, _ := net.SplitHostPort(lis.Addr().String())
addr := "localhost:" + port
g := worker.NewMultiplexW()
t.Cleanup(g.Stop)
go g.Serve(lis)
s := jobservices.NewServer(0, internal.RunPipeline)
s.WorkerPoolEndpoint = addr
*jobopts.Endpoint = s.Endpoint()
go s.Serve()
t.Cleanup(func() {
*jobopts.Endpoint = ""
s.Stop()
})
return execute(ctx, p)
}

Expand Down
32 changes: 24 additions & 8 deletions sdks/go/pkg/beam/runners/prism/internal/execute.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,9 @@ import (
"github.com/apache/beam/sdks/v2/go/pkg/beam/runners/prism/internal/worker"
"golang.org/x/exp/maps"
"golang.org/x/sync/errgroup"
"google.golang.org/grpc"
"google.golang.org/grpc/credentials/insecure"
healthpb "google.golang.org/grpc/health/grpc_health_v1"
"google.golang.org/protobuf/proto"
)

Expand Down Expand Up @@ -88,28 +91,41 @@ func RunPipeline(j *jobservices.Job) {

// makeWorker creates a worker for that environment.
func makeWorker(env string, j *jobservices.Job) (*worker.W, error) {
wk := worker.New(j.String()+"_"+env, env)
wk := worker.Pool.NewWorker(j.String()+"_"+env, env)

wk.EnvPb = j.Pipeline.GetComponents().GetEnvironments()[env]
wk.PipelineOptions = j.PipelineOptions()
wk.JobKey = j.JobKey()
wk.ArtifactEndpoint = j.ArtifactEndpoint()

go wk.Serve()
wk.WorkerPoolEndpoint = j.WorkerPoolEndpoint

if err := runEnvironment(j.RootCtx, j, env, wk); err != nil {
return nil, fmt.Errorf("failed to start environment %v for job %v: %w", env, j, err)
}
// Check for connection succeeding after we've created the environment successfully.
timeout := 1 * time.Minute
time.AfterFunc(timeout, func() {
if wk.Connected() || wk.Stopped() {
return
ctx, cancel := context.WithTimeout(context.Background(), timeout)
defer cancel()
go func() {
<-ctx.Done()
if errors.Is(ctx.Err(), context.DeadlineExceeded) {
err := fmt.Errorf("prism %v didn't get control connection to %v after %v", wk, j.WorkerPoolEndpoint, timeout)
j.Failed(err)
j.CancelFn(err)
}
err := fmt.Errorf("prism %v didn't get control connection to %v after %v", wk, wk.Endpoint(), timeout)
}()
conn, err := grpc.NewClient(j.WorkerPoolEndpoint, grpc.WithTransportCredentials(insecure.NewCredentials()))
if err != nil {
j.Failed(err)
j.CancelFn(err)
})
}
health := healthpb.NewHealthClient(conn)
_, err = health.Check(ctx, &healthpb.HealthCheckRequest{})
if err != nil {
j.Failed(err)
j.CancelFn(err)
}

return wk, nil
}

Expand Down
12 changes: 12 additions & 0 deletions sdks/go/pkg/beam/runners/prism/internal/execute_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ import (
"context"
"fmt"
"math/rand"
"net"
"os"
"strings"
"testing"
Expand All @@ -30,6 +31,7 @@ import (
"github.com/apache/beam/sdks/v2/go/pkg/beam/register"
"github.com/apache/beam/sdks/v2/go/pkg/beam/runners/prism/internal"
"github.com/apache/beam/sdks/v2/go/pkg/beam/runners/prism/internal/jobservices"
"github.com/apache/beam/sdks/v2/go/pkg/beam/runners/prism/internal/worker"
"github.com/apache/beam/sdks/v2/go/pkg/beam/runners/universal"
"github.com/apache/beam/sdks/v2/go/pkg/beam/testing/passert"
"github.com/apache/beam/sdks/v2/go/pkg/beam/testing/ptest"
Expand All @@ -43,8 +45,18 @@ func TestMain(m *testing.M) {

func initRunner(t testing.TB) {
t.Helper()
lis, err := net.Listen("tcp", ":0")
if err != nil {
t.Fatal(err)
}
_, port, _ := net.SplitHostPort(lis.Addr().String())
addr := "localhost:" + port
g := worker.NewMultiplexW()
t.Cleanup(g.Stop)
go g.Serve(lis)
if *jobopts.Endpoint == "" {
s := jobservices.NewServer(0, internal.RunPipeline)
s.WorkerPoolEndpoint = addr
*jobopts.Endpoint = s.Endpoint()
go s.Serve()
t.Cleanup(func() {
Expand Down
3 changes: 2 additions & 1 deletion sdks/go/pkg/beam/runners/prism/internal/jobservices/job.go
Original file line number Diff line number Diff line change
Expand Up @@ -92,7 +92,8 @@ type Job struct {
// Logger for this job.
Logger *slog.Logger

metrics metricsStore
metrics metricsStore
WorkerPoolEndpoint string
}

func (j *Job) ArtifactEndpoint() string {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -92,8 +92,9 @@ func (s *Server) Prepare(ctx context.Context, req *jobpb.PrepareJobRequest) (_ *
cancelFn(err)
terminalOnceWrap()
},
Logger: s.logger, // TODO substitute with a configured logger.
artifactEndpoint: s.Endpoint(),
Logger: s.logger, // TODO substitute with a configured logger.
artifactEndpoint: s.Endpoint(),
WorkerPoolEndpoint: s.WorkerPoolEndpoint,
}
// Stop the idle timer when a new job appears.
if idleTimer := s.idleTimer.Load(); idleTimer != nil {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,8 @@ type Server struct {
execute func(*Job)

// Artifact hack
artifacts map[string][]byte
artifacts map[string][]byte
WorkerPoolEndpoint string
}

// NewServer acquires the indicated port.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ import (
)

func TestBundle_ProcessOn(t *testing.T) {
wk := New("test", "testEnv")
wk := Pool.NewWorker("test", "testEnv")
b := &B{
InstID: "testInst",
PBDID: "testPBDID",
Expand Down
166 changes: 166 additions & 0 deletions sdks/go/pkg/beam/runners/prism/internal/worker/pool.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,166 @@
// Licensed to the Apache Software Foundation (ASF) under one or more
// contributor license agreements. See the NOTICE file distributed with
// this work for additional information regarding copyright ownership.
// The ASF licenses this file to You under the Apache License, Version 2.0
// (the "License"); you may not use this file except in compliance with
// the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package worker

import (
"context"
"fmt"
"log/slog"
"math"
"sync"

fnpb "github.com/apache/beam/sdks/v2/go/pkg/beam/model/fnexecution_v1"
"github.com/apache/beam/sdks/v2/go/pkg/beam/util/grpcx"
"google.golang.org/grpc"
healthpb "google.golang.org/grpc/health/grpc_health_v1"
)

var (
mu sync.Mutex

// Pool stores *W.
Pool = make(MapW)
)

// MapW manages the creation and querying of *W.
type MapW map[string]*W

func (m MapW) workerFromMetadataCtx(ctx context.Context) (*W, error) {
id, err := grpcx.ReadWorkerID(ctx)
if err != nil {
return nil, err
}
if id == "" {
return nil, fmt.Errorf("worker id in ctx metadata is an empty string")
}
mu.Lock()
defer mu.Unlock()
if w, ok := m[id]; ok {
return w, nil
}
return nil, fmt.Errorf("worker id: '%s' read from ctx but not registered in worker pool", id)
}

// NewWorker instantiates and registers new *W instances.
func (m MapW) NewWorker(id, env string) *W {
wk := &W{
ID: id,
Env: env,

InstReqs: make(chan *fnpb.InstructionRequest, 10),
DataReqs: make(chan *fnpb.Elements, 10),
StoppedChan: make(chan struct{}),

activeInstructions: make(map[string]controlResponder),
Descriptors: make(map[string]*fnpb.ProcessBundleDescriptor),
}
mu.Lock()
defer mu.Unlock()
m[wk.ID] = wk
return wk
}

// NewMultiplexW instantiates a grpc.Server for multiplexing worker FnAPI requests.
func NewMultiplexW(opts ...grpc.ServerOption) *grpc.Server {
opts = append(opts, grpc.MaxSendMsgSize(math.MaxInt32))

g := grpc.NewServer(opts...)
wk := &MultiplexW{
logger: slog.Default(),
}

fnpb.RegisterBeamFnControlServer(g, wk)
fnpb.RegisterBeamFnDataServer(g, wk)
fnpb.RegisterBeamFnLoggingServer(g, wk)
fnpb.RegisterBeamFnStateServer(g, wk)
fnpb.RegisterProvisionServiceServer(g, wk)
healthpb.RegisterHealthServer(g, wk)

return g
}

// MultiplexW multiplexes FnAPI gRPC requests to *W stored in the Pool.
type MultiplexW struct {
fnpb.UnimplementedBeamFnControlServer
fnpb.UnimplementedBeamFnDataServer
fnpb.UnimplementedBeamFnStateServer
fnpb.UnimplementedBeamFnLoggingServer
fnpb.UnimplementedProvisionServiceServer
healthpb.UnimplementedHealthServer

logger *slog.Logger
}

func (mw *MultiplexW) Check(_ context.Context, _ *healthpb.HealthCheckRequest) (*healthpb.HealthCheckResponse, error) {
return &healthpb.HealthCheckResponse{Status: healthpb.HealthCheckResponse_SERVING}, nil
}

func (mw *MultiplexW) GetProvisionInfo(ctx context.Context, req *fnpb.GetProvisionInfoRequest) (*fnpb.GetProvisionInfoResponse, error) {
w, err := Pool.workerFromMetadataCtx(ctx)
if err != nil {
return nil, err
}
return w.GetProvisionInfo(ctx, req)
}

func (mw *MultiplexW) Logging(stream fnpb.BeamFnLogging_LoggingServer) error {
w, err := Pool.workerFromMetadataCtx(stream.Context())
if err != nil {
return err
}
return w.Logging(stream)
}

func (mw *MultiplexW) GetProcessBundleDescriptor(ctx context.Context, req *fnpb.GetProcessBundleDescriptorRequest) (*fnpb.ProcessBundleDescriptor, error) {
w, err := Pool.workerFromMetadataCtx(ctx)
if err != nil {
return nil, err
}
return w.GetProcessBundleDescriptor(ctx, req)
}

func (mw *MultiplexW) Control(ctrl fnpb.BeamFnControl_ControlServer) error {
w, err := Pool.workerFromMetadataCtx(ctrl.Context())
if err != nil {
return err
}
return w.Control(ctrl)
}

func (mw *MultiplexW) Data(data fnpb.BeamFnData_DataServer) error {
w, err := Pool.workerFromMetadataCtx(data.Context())
if err != nil {
return err
}
return w.Data(data)
}

func (mw *MultiplexW) State(state fnpb.BeamFnState_StateServer) error {
w, err := Pool.workerFromMetadataCtx(state.Context())
if err != nil {
return err
}
return w.State(state)
}

func (mw *MultiplexW) MonitoringMetadata(ctx context.Context, unknownIDs []string) *fnpb.MonitoringInfosMetadataResponse {
w, err := Pool.workerFromMetadataCtx(ctx)
if err != nil {
mw.logger.Error(err.Error())
return nil
}
return w.MonitoringMetadata(ctx, unknownIDs)
}
Loading
Loading