Skip to content

Commit

Permalink
The initial work on supporting multiple concurrent exporters was
Browse files Browse the repository at this point in the history
unfinished in the sense that exporter responses (which are abstracted as
a map of key/value pairs) are combined into a single map at the end of
the export: https://github.com/moby/buildkit/blob/55a7483b0564a7ad5b2ce5e62512789dce327bca/solver/llbsolver/solver.go#L808-L809

In order to provide the correct exporter response, each response
(currently at least each container image exporter) needs to be dedicated
to the exporter instance.

To achieve this, assign and propagate exporter IDs from the client and
have corresponding exporters annotate their responses with the
respective ID so the client can order outputs per exporter instance.

Fixes moby#5556.
  • Loading branch information
a-palchikov committed Dec 11, 2024
1 parent 7d7a919 commit c481eaf
Show file tree
Hide file tree
Showing 10 changed files with 90 additions and 57 deletions.
63 changes: 47 additions & 16 deletions client/solve.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import (
"io"
"maps"
"os"
"strconv"
"strings"
"time"

Expand Down Expand Up @@ -130,7 +131,8 @@ func (c *Client) solve(ctx context.Context, def *llb.Definition, runGateway runG
return nil, err
}

storesToUpdate := []string{}
// maps image exporter id -> store path
storesToUpdate := make(map[string]ociStore)

if !opt.SessionPreInitialized {
if len(syncedDirs) > 0 {
Expand Down Expand Up @@ -195,7 +197,7 @@ func (c *Client) solve(ctx context.Context, def *llb.Definition, runGateway runG
return nil, err
}
contentStores["export"] = cs
storesToUpdate = append(storesToUpdate, ex.OutputDir)
storesToUpdate[strconv.Itoa(exID)] = ociStore{path: ex.OutputDir}
default:
syncTargets = append(syncTargets, filesync.WithFSSyncDir(exID, ex.OutputDir))
}
Expand Down Expand Up @@ -260,6 +262,8 @@ func (c *Client) solve(ctx context.Context, def *llb.Definition, runGateway runG
exportDeprecated = exp.Type
exportAttrDeprecated = exp.Attrs
}
// FIXME(dima): make this a dedicated attribute on the Exporter
exp.Attrs[exptypes.ClientKeyID] = strconv.Itoa(i)
exports = append(exports, &controlapi.Exporter{
Type: exp.Type,
Attrs: exp.Attrs,
Expand Down Expand Up @@ -350,29 +354,56 @@ func (c *Client) solve(ctx context.Context, def *llb.Definition, runGateway runG
}
}
}
if manifestDescDt := res.ExporterResponse[exptypes.ExporterImageDescriptorKey]; manifestDescDt != "" {
manifestDescDt, err := base64.StdEncoding.DecodeString(manifestDescDt)

if len(storesToUpdate) == 0 {
return res, nil
}
for id, store := range storesToUpdate {
manifestDesc, err := getManifestDescriptor(id, res.ExporterResponse)
if err != nil {
return nil, err
}
var manifestDesc ocispecs.Descriptor
if err = json.Unmarshal([]byte(manifestDescDt), &manifestDesc); err != nil {
return nil, err
if manifestDesc == nil {
continue
}
for _, storePath := range storesToUpdate {
tag := "latest"
if t, ok := res.ExporterResponse["image.name"]; ok {
tag = t
}
idx := ociindex.NewStoreIndex(storePath)
if err := idx.Put(tag, manifestDesc); err != nil {
return nil, err
}
tag := "latest"
if t, ok := res.ExporterResponse["image.name"]; ok {
tag = t
}
idx := ociindex.NewStoreIndex(store.path)
if err := idx.Put(tag, *manifestDesc); err != nil {
return nil, err
}
}
return res, nil
}

func getManifestDescriptor(exporterID string, resp map[string]string) (*ocispecs.Descriptor, error) {
if manifestDescDt := resp[exptypes.FormatImageDescriptorKey(exporterID)]; manifestDescDt != "" {
return unmarshalManifestDescriptor(manifestDescDt)
}
if manifestDescDt := resp[exptypes.ExporterImageDescriptorKey]; manifestDescDt != "" {
return unmarshalManifestDescriptor(manifestDescDt)
}
return nil, nil
}

func unmarshalManifestDescriptor(manifestDesc string) (*ocispecs.Descriptor, error) {
manifestDescDt, err := base64.StdEncoding.DecodeString(manifestDesc)
if err != nil {
return nil, err
}
var desc ocispecs.Descriptor
if err = json.Unmarshal([]byte(manifestDescDt), &desc); err != nil {
return nil, err
}
return &desc, nil
}

type ociStore struct {
path string
}

func prepareSyncedFiles(def *llb.Definition, localMounts map[string]fsutil.FS) (filesync.StaticDirSource, error) {
resetUIDAndGID := func(p string, st *fstypes.Stat) fsutil.MapResult {
st.Uid = 0
Expand Down
4 changes: 2 additions & 2 deletions control/control.go
Original file line number Diff line number Diff line change
Expand Up @@ -400,13 +400,13 @@ func (c *Controller) Solve(ctx context.Context, req *controlapi.SolveRequest) (*
}

var expis []exporter.ExporterInstance
for i, ex := range req.Exporters {
for _, ex := range req.Exporters {
exp, err := w.Exporter(ex.Type, c.opt.SessionManager)
if err != nil {
return nil, err
}
bklog.G(ctx).Debugf("resolve exporter %s with %v", ex.Type, ex.Attrs)
expi, err := exp.Resolve(ctx, i, ex.Attrs)
expi, err := exp.Resolve(ctx, ex.Attrs)
if err != nil {
return nil, err
}
Expand Down
13 changes: 5 additions & 8 deletions exporter/containerimage/export.go
Original file line number Diff line number Diff line change
Expand Up @@ -66,10 +66,9 @@ func New(opt Opt) (exporter.Exporter, error) {
return im, nil
}

func (e *imageExporter) Resolve(ctx context.Context, id int, opt map[string]string) (exporter.ExporterInstance, error) {
func (e *imageExporter) Resolve(ctx context.Context, opt map[string]string) (exporter.ExporterInstance, error) {
i := &imageExporterInstance{
imageExporter: e,
id: id,
attrs: opt,
opts: ImageCommitOpts{
RefCfg: cacheconfig.RefConfig{
Expand All @@ -87,6 +86,8 @@ func (e *imageExporter) Resolve(ctx context.Context, id int, opt map[string]stri

for k, v := range opt {
switch exptypes.ImageExporterOptKey(k) {
case exptypes.ClientKeyID:
i.id = v
case exptypes.OptKeyPush:
if v == "" {
i.push = true
Expand Down Expand Up @@ -171,7 +172,7 @@ func (e *imageExporter) Resolve(ctx context.Context, id int, opt map[string]stri

type imageExporterInstance struct {
*imageExporter
id int
id string
attrs map[string]string

opts ImageCommitOpts
Expand All @@ -186,10 +187,6 @@ type imageExporterInstance struct {
meta map[string][]byte
}

func (e *imageExporterInstance) ID() int {
return e.id
}

func (e *imageExporterInstance) Name() string {
return "exporting to image"
}
Expand Down Expand Up @@ -357,7 +354,7 @@ func (e *imageExporterInstance) Export(ctx context.Context, src *exporter.Source
if err != nil {
return nil, nil, err
}
resp[exptypes.ExporterImageDescriptorKey] = base64.StdEncoding.EncodeToString(dtdesc)
resp[exptypes.FormatImageDescriptorKey(e.id)] = base64.StdEncoding.EncodeToString(dtdesc)

return resp, nil, nil
}
Expand Down
15 changes: 14 additions & 1 deletion exporter/containerimage/exptypes/keys.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,10 @@
package exptypes

import commonexptypes "github.com/moby/buildkit/exporter/exptypes"
import (
"fmt"

commonexptypes "github.com/moby/buildkit/exporter/exptypes"
)

type ImageExporterOptKey string

Expand Down Expand Up @@ -77,3 +81,12 @@ var (
// Value: bool <true|false>
OptKeyRewriteTimestamp ImageExporterOptKey = "rewrite-timestamp"
)

const (
// ClientKeyID optionally identifies the exporter
ClientKeyID = "__clientid"
)

func FormatImageDescriptorKey(id string) string {
return fmt.Sprint(ExporterImageDescriptorKey, "-", id)
}
3 changes: 1 addition & 2 deletions exporter/exporter.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,11 +15,10 @@ type Source = result.Result[cache.ImmutableRef]
type Attestation = result.Attestation[cache.ImmutableRef]

type Exporter interface {
Resolve(context.Context, int, map[string]string) (ExporterInstance, error)
Resolve(context.Context, map[string]string) (ExporterInstance, error)
}

type ExporterInstance interface {
ID() int
Name() string
Config() *Config
Type() string
Expand Down
2 changes: 1 addition & 1 deletion exporter/exptypes/keys.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ const (
type ExporterOptKey string

// Options keys supported by all exporters.
var (
const (
// Clamp produced timestamps. For more information see the
// SOURCE_DATE_EPOCH specification.
// Value: int (number of seconds since Unix epoch)
Expand Down
13 changes: 6 additions & 7 deletions exporter/local/export.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,9 +36,8 @@ func New(opt Opt) (exporter.Exporter, error) {
return le, nil
}

func (e *localExporter) Resolve(ctx context.Context, id int, opt map[string]string) (exporter.ExporterInstance, error) {
func (e *localExporter) Resolve(ctx context.Context, opt map[string]string) (exporter.ExporterInstance, error) {
i := &localExporterInstance{
id: id,
attrs: opt,
localExporter: e,
}
Expand All @@ -47,21 +46,21 @@ func (e *localExporter) Resolve(ctx context.Context, id int, opt map[string]stri
return nil, err
}

if id, ok := opt[exptypes.ClientKeyID]; ok {
i.id = id
}

return i, nil
}

type localExporterInstance struct {
*localExporter
id int
id string
attrs map[string]string

opts CreateFSOpts
}

func (e *localExporterInstance) ID() int {
return e.id
}

func (e *localExporterInstance) Name() string {
return "exporting to client directory"
}
Expand Down
13 changes: 5 additions & 8 deletions exporter/oci/export.go
Original file line number Diff line number Diff line change
Expand Up @@ -60,10 +60,9 @@ func New(opt Opt) (exporter.Exporter, error) {
return im, nil
}

func (e *imageExporter) Resolve(ctx context.Context, id int, opt map[string]string) (exporter.ExporterInstance, error) {
func (e *imageExporter) Resolve(ctx context.Context, opt map[string]string) (exporter.ExporterInstance, error) {
i := &imageExporterInstance{
imageExporter: e,
id: id,
attrs: opt,
tar: true,
opts: containerimage.ImageCommitOpts{
Expand All @@ -81,6 +80,8 @@ func (e *imageExporter) Resolve(ctx context.Context, id int, opt map[string]stri

for k, v := range opt {
switch k {
case exptypes.ClientKeyID:
i.id = v
case keyTar:
if v == "" {
i.tar = true
Expand All @@ -103,18 +104,14 @@ func (e *imageExporter) Resolve(ctx context.Context, id int, opt map[string]stri

type imageExporterInstance struct {
*imageExporter
id int
id string
attrs map[string]string

opts containerimage.ImageCommitOpts
tar bool
meta map[string][]byte
}

func (e *imageExporterInstance) ID() int {
return e.id
}

func (e *imageExporterInstance) Name() string {
return fmt.Sprintf("exporting to %s image format", e.opt.Variant)
}
Expand Down Expand Up @@ -192,7 +189,7 @@ func (e *imageExporterInstance) Export(ctx context.Context, src *exporter.Source
if err != nil {
return nil, nil, err
}
resp[exptypes.ExporterImageDescriptorKey] = base64.StdEncoding.EncodeToString(dtdesc)
resp[exptypes.FormatImageDescriptorKey(e.id)] = base64.StdEncoding.EncodeToString(dtdesc)

if n, ok := src.Metadata["image.name"]; e.opts.ImageName == "*" && ok {
e.opts.ImageName = string(n)
Expand Down
13 changes: 5 additions & 8 deletions exporter/tar/export.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,33 +34,30 @@ func New(opt Opt) (exporter.Exporter, error) {
return le, nil
}

func (e *localExporter) Resolve(ctx context.Context, id int, opt map[string]string) (exporter.ExporterInstance, error) {
func (e *localExporter) Resolve(ctx context.Context, opt map[string]string) (exporter.ExporterInstance, error) {
li := &localExporterInstance{
localExporter: e,
id: id,
attrs: opt,
}
_, err := li.opts.Load(opt)
if err != nil {
return nil, err
}
_ = opt
if id, ok := opt[exptypes.ClientKeyID]; ok {
li.id = id
}

return li, nil
}

type localExporterInstance struct {
*localExporter
id int
id string
attrs map[string]string

opts local.CreateFSOpts
}

func (e *localExporterInstance) ID() int {
return e.id
}

func (e *localExporterInstance) Name() string {
return "exporting to client tarball"
}
Expand Down
8 changes: 4 additions & 4 deletions session/filesync/filesync.go
Original file line number Diff line number Diff line change
Expand Up @@ -340,7 +340,7 @@ func (sp *fsSyncAttachable) DiffCopy(stream FileSend_DiffCopyServer) (err error)
return writeTargetFile(stream, wc)
}

func CopyToCaller(ctx context.Context, fs fsutil.FS, id int, c session.Caller, progress func(int, bool)) error {
func CopyToCaller(ctx context.Context, fs fsutil.FS, id string, c session.Caller, progress func(int, bool)) error {
method := session.MethodURL(FileSend_ServiceDesc.ServiceName, "diffcopy")
if !c.Supports(method) {
return errors.Errorf("method %s not supported by the client", method)
Expand All @@ -355,7 +355,7 @@ func CopyToCaller(ctx context.Context, fs fsutil.FS, id int, c session.Caller, p
if existingVal, ok := opts[keyExporterID]; ok {
bklog.G(ctx).Warnf("overwriting grpc metadata key %q from value %+v to %+v", keyExporterID, existingVal, id)
}
opts[keyExporterID] = []string{fmt.Sprint(id)}
opts[keyExporterID] = []string{id}
ctx = metadata.NewOutgoingContext(ctx, opts)

cc, err := client.DiffCopy(ctx)
Expand All @@ -366,7 +366,7 @@ func CopyToCaller(ctx context.Context, fs fsutil.FS, id int, c session.Caller, p
return sendDiffCopy(cc, fs, progress)
}

func CopyFileWriter(ctx context.Context, md map[string]string, id int, c session.Caller) (io.WriteCloser, error) {
func CopyFileWriter(ctx context.Context, md map[string]string, id string, c session.Caller) (io.WriteCloser, error) {
method := session.MethodURL(FileSend_ServiceDesc.ServiceName, "diffcopy")
if !c.Supports(method) {
return nil, errors.Errorf("method %s not supported by the client", method)
Expand All @@ -388,7 +388,7 @@ func CopyFileWriter(ctx context.Context, md map[string]string, id int, c session
if existingVal, ok := opts[keyExporterID]; ok {
bklog.G(ctx).Warnf("overwriting grpc metadata key %q from value %+v to %+v", keyExporterID, existingVal, id)
}
opts[keyExporterID] = []string{fmt.Sprint(id)}
opts[keyExporterID] = []string{id}
ctx = metadata.NewOutgoingContext(ctx, opts)

cc, err := client.DiffCopy(ctx)
Expand Down

0 comments on commit c481eaf

Please sign in to comment.