Skip to content

Commit

Permalink
Implement "builds" concurrency (and more correct output while at it t…
Browse files Browse the repository at this point in the history
…hat doesn't require postprocessing)

This takes the current subset calculation from ~6m down to ~10s
  • Loading branch information
tianon committed Dec 7, 2023
1 parent c2bf4c7 commit 24775b8
Showing 1 changed file with 156 additions and 101 deletions.
257 changes: 156 additions & 101 deletions builds.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,13 +9,16 @@ import (
"os"
"os/exec"
"strings"
"sync"

c8derrdefs "github.com/containerd/containerd/errdefs"
"github.com/docker-library/bashbrew/registry"
ocispec "github.com/opencontainers/image-spec/specs-go/v1"
"github.com/sirupsen/logrus" // this is used by containerd libraries, so we need to set the default log level for it
)

var concurrency = 50

type MetaSource struct {
SourceID string `json:"sourceId"`
AllTags []string `json:"allTags"`
Expand Down Expand Up @@ -55,40 +58,47 @@ type MetaBuild struct {
}

var (
// keys are image/tag names
cacheResolve = map[string]*registry.ResolvedObject{}
// keys are image/index digests
cacheResolveArches = map[string]map[string][]registry.ResolvedObject{}
// keys are image/tag names, values are functions that return either cacheResolveType or error
cacheResolve = sync.Map{}
)

type cacheResolveType struct {
r *registry.ResolvedObject
arches map[string][]registry.ResolvedObject
}

func resolveRemoteArch(ctx context.Context, img string, arch string) (*RemoteResolvedFull, error) {
var (
r *registry.ResolvedObject
rArches map[string][]registry.ResolvedObject
err error
ok bool
)
if r, ok = cacheResolve[img]; !ok {
r, err = registry.Resolve(context.TODO(), img)
cacheFunc, _ := cacheResolve.LoadOrStore(img, sync.OnceValues(func() (*cacheResolveType, error) {
var (
ret = cacheResolveType{}
err error
)

ret.r, err = registry.Resolve(ctx, img)
if c8derrdefs.IsNotFound(err) {
r = nil
return nil, nil
} else if err != nil {
return nil, err
}
cacheResolve[img] = r
}
if r == nil {
return nil, nil
}

if rArches, ok = cacheResolveArches[string(r.Desc.Digest)]; !ok {
// TODO more efficient lookup of single architecture? (probably doesn't matter much)
rArches, err = r.Architectures(context.TODO())
// TODO more efficient lookup of single architecture? (probably doesn't matter much, and then we have to have two independent caches)
ret.arches, err = ret.r.Architectures(ctx)
if err != nil {
return nil, err
}
cacheResolveArches[string(r.Desc.Digest)] = rArches

return &ret, nil
}))
cache, err := cacheFunc.(func() (*cacheResolveType, error))()
if err != nil {
return nil, err
}
if cache == nil {
return nil, nil
}
r := cache.r
rArches := cache.arches

if _, ok := rArches[arch]; !ok {
// TODO this should probably be just like a 404, right? (it's effectively a 404, even if it's not literally a 404)
return nil, fmt.Errorf("%s missing %s arch", img, arch)
Expand Down Expand Up @@ -127,111 +137,156 @@ func main() {
// containerd uses logrus, but it defaults to "info" (which is a bit leaky where we use containerd)
logrus.SetLevel(logrus.WarnLevel)

// Go does not have ordered maps *and* is complicated to read an object, make a tiny modification, write it back out (without modelling the entire schema), so we'll let a single invocation of jq solve both problems (munging the documents in the way we expect *and* giving us an in-order stream)
jq := exec.Command("jq", "-c", ".[] | (.arches | to_entries[]) as $arch | .arches = { ($arch.key): $arch.value }", sourcesJsonFile)
jq.Stderr = os.Stderr

stdout, err := jq.StdoutPipe()
if err != nil {
panic(err)
type out struct {
buildId string
json []byte
}
if err := jq.Start(); err != nil {
panic(err)
}

var lastSourceID string // used for making cleaner progress output (coalescing multiple concurrent sources)
outs := make(chan chan out, concurrency) // we want the end result to be "in order", so we have a channel of channels of outputs so each output can be generated async (and write to the "inner" channel) and the outer channel stays in the input order

sourceArchResolved := map[string]*RemoteResolvedFull{}
go func() {
// Go does not have ordered maps *and* is complicated to read an object, make a tiny modification, write it back out (without modelling the entire schema), so we'll let a single invocation of jq solve both problems (munging the documents in the way we expect *and* giving us an in-order stream)
jq := exec.Command("jq", "-c", ".[] | (.arches | to_entries[]) as $arch | .arches = { ($arch.key): $arch.value }", sourcesJsonFile)
jq.Stderr = os.Stderr

decoder := json.NewDecoder(stdout)
NextSource:
for decoder.More() {
var build MetaBuild
build.Build.Parents = map[string]string{}
build.Build.ResolvedParents = map[string]RemoteResolvedFull{}

if err := decoder.Decode(&build.Source); err == io.EOF {
break
} else if err != nil {
stdout, err := jq.StdoutPipe()
if err != nil {
panic(err)
}

var source MetaSource
if err := json.Unmarshal(build.Source, &source); err != nil {
if err := jq.Start(); err != nil {
panic(err)
}

build.Build.SourceID = source.SourceID
sourceArchResolved := map[string](func() *RemoteResolvedFull){}
sourceArchResolvedMutex := sync.RWMutex{}

if len(source.Arches) != 1 {
panic("unexpected arches length: " + string(build.Source))
}
for build.Build.Arch = range source.Arches {
// I really hate Go.
// (just doing a lookup of the only key in my map into a variable)
}
decoder := json.NewDecoder(stdout)
for decoder.More() {
var build MetaBuild
build.Build.Parents = map[string]string{} // thanks Go (nil slice becomes null)
build.Build.ResolvedParents = map[string]RemoteResolvedFull{} // thanks Go (nil slice becomes null)

if lastSourceID != source.SourceID {
fmt.Fprintf(os.Stderr, "%s (%s):\n", source.SourceID, source.AllTags[0])
lastSourceID = source.SourceID
}
fmt.Fprintf(os.Stderr, " -> %s: ", build.Build.Arch)
if err := decoder.Decode(&build.Source); err == io.EOF {
break
} else if err != nil {
panic(err)
}

var source MetaSource
if err := json.Unmarshal(build.Source, &source); err != nil {
panic(err)
}

build.Build.SourceID = source.SourceID

for from, parent := range source.Arches[build.Build.Arch].Parents {
if from == "scratch" {
continue
if len(source.Arches) != 1 {
panic("unexpected arches length: " + string(build.Source))
}
var resolved *RemoteResolvedFull
if parent.SourceID != nil {
resolved = sourceArchResolved[*parent.SourceID+"-"+build.Build.Arch]
} else {
lookup := from
if parent.Pin != nil {
lookup += "@" + *parent.Pin
for build.Build.Arch = range source.Arches {
// I really hate Go.
// (just doing a lookup of the only key in my map into a variable)
}

outChan := make(chan out, 1)
outs <- outChan

sourceArchResolvedFunc := sync.OnceValue(func() *RemoteResolvedFull {
for from, parent := range source.Arches[build.Build.Arch].Parents {
if from == "scratch" {
continue
}
var resolved *RemoteResolvedFull
if parent.SourceID != nil {
sourceArchResolvedMutex.RLock()
resolvedFunc, ok := sourceArchResolved[*parent.SourceID+"-"+build.Build.Arch]
if !ok {
panic("parent of " + source.SourceID + " on " + build.Build.Arch + " should be " + *parent.SourceID + " but that sourceId is unknown to us!")
}
sourceArchResolvedMutex.RUnlock()
resolved = resolvedFunc()
} else {
lookup := from
if parent.Pin != nil {
lookup += "@" + *parent.Pin
}

resolved, err = resolveRemoteArch(context.TODO(), lookup, build.Build.Arch)
if err != nil {
panic(err)
}
}
if resolved == nil {
fmt.Fprintf(os.Stderr, "%s (%s) -> not yet! [%s]\n", source.SourceID, source.AllTags[0], build.Build.Arch)
close(outChan)
return nil
}
build.Build.ResolvedParents[from] = *resolved
build.Build.Parents[from] = string(resolved.Manifest.Desc.Digest)
}

resolved, err = resolveRemoteArch(context.TODO(), lookup, build.Build.Arch)
// buildId calculation
buildIDJSON, err := json.Marshal(&build.Build.BuildIDParts)
if err != nil {
panic(err)
}
}
if resolved == nil {
fmt.Fprintf(os.Stderr, "not yet!\n")
continue NextSource
}
build.Build.ResolvedParents[from] = *resolved
build.Build.Parents[from] = string(resolved.Manifest.Desc.Digest)
}
buildIDJSON = append(buildIDJSON, byte('\n')) // previous calculation of buildId included a newline in the JSON, so this preserves compatibility
// TODO if we ever have a bigger "buildId break" event (like adding major base images that force the whole tree to rebuild), we should probably ditch this newline

// buildId calculation
buildIDJSON, err := json.Marshal(&build.Build.BuildIDParts)
if err != nil {
panic(err)
}
buildIDJSON = append(buildIDJSON, byte('\n')) // previous calculation of buildId included a newline in the JSON, so this preserves compatibility
build.BuildID = fmt.Sprintf("%x", sha256.Sum256(buildIDJSON))
fmt.Fprintf(os.Stderr, "%s (%s) -> %s [%s]\n", source.SourceID, source.AllTags[0], build.BuildID, build.Build.Arch)

build.BuildID = fmt.Sprintf("%x", sha256.Sum256(buildIDJSON))
fmt.Fprintf(os.Stderr, "%s\n", build.BuildID)
build.Build.Img = strings.ReplaceAll(strings.ReplaceAll(stagingTemplate, "BUILD", build.BuildID), "ARCH", build.Build.Arch) // "oisupport/staging-amd64:xxxx"

build.Build.Img = strings.ReplaceAll(strings.ReplaceAll(stagingTemplate, "BUILD", build.BuildID), "ARCH", build.Build.Arch) // "oisupport/staging-amd64:xxxx"
build.Build.Resolved, err = resolveRemoteArch(context.TODO(), build.Build.Img, build.Build.Arch)
if err != nil {
panic(err)
}

build.Build.Resolved, err = resolveRemoteArch(context.TODO(), build.Build.Img, build.Build.Arch)
if err != nil {
json, err := json.Marshal(&build)
if err != nil {
panic(err)
}
outChan <- out{
buildId: build.BuildID,
json: json,
}

return build.Build.Resolved
})
sourceArchResolvedMutex.Lock()
sourceArchResolved[source.SourceID+"-"+build.Build.Arch] = sourceArchResolvedFunc
sourceArchResolvedMutex.Unlock()
go sourceArchResolvedFunc()
}

if err := stdout.Close(); err != nil {
panic(err)
}
if err := jq.Wait(); err != nil {
panic(err)
}
sourceArchResolved[source.SourceID+"-"+build.Build.Arch] = build.Build.Resolved

json, err := json.Marshal(&build)
close(outs)
}()

fmt.Print("{")
first := true
for outChan := range outs {
out, ok := <-outChan
if !ok {
continue
}
if !first {
fmt.Print(",")
} else {
first = false
}
fmt.Println()
buildId, err := json.Marshal(out.buildId)
if err != nil {
panic(err)
}
fmt.Println(string(json))
}

if err := stdout.Close(); err != nil {
panic(err)
}
if err := jq.Wait(); err != nil {
panic(err)
fmt.Printf("\t%s: %s", string(buildId), string(out.json))
}
fmt.Println()
fmt.Println("}")
}

0 comments on commit 24775b8

Please sign in to comment.