Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[WIP) volume snapshotter support #3

Draft
wants to merge 4 commits into
base: master
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
22 changes: 20 additions & 2 deletions cache/manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ import (
"github.com/moby/buildkit/identity"
"github.com/moby/buildkit/session"
"github.com/moby/buildkit/snapshot"
"github.com/moby/buildkit/solver/pb"
"github.com/moby/buildkit/util/bklog"
"github.com/moby/buildkit/util/flightcontrol"
"github.com/moby/buildkit/util/progress"
Expand Down Expand Up @@ -49,6 +50,11 @@ type ManagerOpt struct {
Differ diff.Comparer
MetadataStore *metadata.Store
MountPoolRoot string

// dagger-specific, see manager_dagger.go
VolumeSnapshotter CtdVolumeSnapshotter
VolumeSourceContentHasher func(context.Context, ImmutableRef, session.Group) (digest.Digest, error)
SeenVolumes *sync.Map
}

type Accessor interface {
Expand All @@ -62,6 +68,9 @@ type Accessor interface {
IdentityMapping() *idtools.IdentityMapping
Merge(ctx context.Context, parents []ImmutableRef, pg progress.Controller, opts ...RefOption) (ImmutableRef, error)
Diff(ctx context.Context, lower, upper ImmutableRef, pg progress.Controller, opts ...RefOption) (ImmutableRef, error)

// Dagger-specific, see manager_dagger.go
GetOrInitVolume(context.Context, string, ImmutableRef, pb.CacheSharingOpt, session.Group) (MutableRef, error)
}

type Controller interface {
Expand Down Expand Up @@ -97,6 +106,11 @@ type cacheManager struct {

muPrune sync.Mutex // make sure parallel prune is not allowed so there will not be inconsistent results
unlazyG flightcontrol.Group[struct{}]

// dagger-specific, see manager_dagger.go
volumeSnapshotter VolumeSnapshotter
volumeSourceContentHasher func(context.Context, ImmutableRef, session.Group) (digest.Digest, error)
seenVolumes *sync.Map
}

func NewManager(opt ManagerOpt) (Manager, error) {
Expand All @@ -110,6 +124,10 @@ func NewManager(opt ManagerOpt) (Manager, error) {
Differ: opt.Differ,
MetadataStore: opt.MetadataStore,
records: make(map[string]*cacheRecord),

volumeSnapshotter: newVolumeSnapshotter(context.TODO(), opt.VolumeSnapshotter, opt.LeaseManager),
volumeSourceContentHasher: opt.VolumeSourceContentHasher,
seenVolumes: opt.SeenVolumes,
}

if err := cm.init(context.TODO()); err != nil {
Expand Down Expand Up @@ -444,7 +462,7 @@ func (cm *cacheManager) getRecord(ctx context.Context, id string, opts ...RefOpt
return rec, nil
} else if IsNotFound(err) {
// The equal mutable for this ref is not found, check to see if our snapshot exists
if _, statErr := cm.Snapshotter.Stat(ctx, md.getSnapshotID()); statErr != nil {
if _, statErr := cm.snapshotterFor(md).Stat(ctx, md.getSnapshotID()); statErr != nil {
// this ref's snapshot also doesn't exist, just remove this record
cm.MetadataStore.Clear(id)
return nil, errors.Wrap(errNotFound, id)
Expand Down Expand Up @@ -484,7 +502,7 @@ func (cm *cacheManager) getRecord(ctx context.Context, id string, opts ...RefOpt

if rec.mutable {
// If the record is mutable, then the snapshot must exist
if _, err := cm.Snapshotter.Stat(ctx, rec.ID()); err != nil {
if _, err := cm.snapshotterFor(md).Stat(ctx, rec.ID()); err != nil {
if !cerrdefs.IsNotFound(err) {
return nil, errors.Wrap(err, "failed to check mutable ref snapshot")
}
Expand Down
300 changes: 300 additions & 0 deletions cache/manager_dagger.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,300 @@
package cache

import (
"context"
"errors"
"fmt"
"strings"
"sync"
"time"

"github.com/containerd/containerd/leases"
ctdsnapshots "github.com/containerd/containerd/snapshots"
"github.com/containerd/continuity/fs"
cerrdefs "github.com/containerd/errdefs"
"github.com/moby/buildkit/client"
"github.com/moby/buildkit/identity"
"github.com/moby/buildkit/session"
"github.com/moby/buildkit/snapshot"
"github.com/moby/buildkit/snapshot/containerd"
"github.com/moby/buildkit/solver/pb"
"github.com/moby/buildkit/util/bklog"
"github.com/moby/buildkit/util/leaseutil"
"github.com/opencontainers/go-digest"
)

type AcquireSnapshotter interface {
Acquire(ctx context.Context, key string, sharingMode pb.CacheSharingOpt) (func() error, error)
}

type CtdVolumeSnapshotter interface {
ctdsnapshots.Snapshotter
Name() string
AcquireSnapshotter
}

type VolumeSnapshotter interface {
snapshot.MergeSnapshotter
AcquireSnapshotter
}

func newVolumeSnapshotter(ctx context.Context, ctdSnapshoter CtdVolumeSnapshotter, leaseManager leases.Manager) VolumeSnapshotter {
return volumeSnapshotterAdapter{
MergeSnapshotter: snapshot.NewMergeSnapshotter(ctx, containerd.NewSnapshotter(
ctdSnapshoter.Name(),
ctdSnapshoter,
"buildkit",
nil, // no idmapping
), leaseManager),
base: ctdSnapshoter,
}
}

type volumeSnapshotterAdapter struct {
snapshot.MergeSnapshotter
base CtdVolumeSnapshotter
}

var _ VolumeSnapshotter = (*volumeSnapshotterAdapter)(nil)

func (sn volumeSnapshotterAdapter) Acquire(ctx context.Context, key string, sharingMode pb.CacheSharingOpt) (func() error, error) {
return sn.base.Acquire(ctx, key, sharingMode)
}

func (cm *cacheManager) GetOrInitVolume(ctx context.Context, key string, source ImmutableRef, sharingMode pb.CacheSharingOpt, sess session.Group) (_ MutableRef, rerr error) {
// figure out the unique definition-based ID of the volume.
idParts := []string{key}

sourceChecksum, err := cm.volumeSourceContentHasher(ctx, source, sess)
if err != nil {
return nil, fmt.Errorf("failed to calculate sourceChecksum: %w", err)
}
idParts = append(idParts, sourceChecksum.String())

id := digest.FromString(strings.Join(idParts, "\x00")).Encoded()

var parent *immutableRef
if source != nil {
if _, ok := source.(*immutableRef); ok {
parent = source.Clone().(*immutableRef)
} else {
p, err := cm.Get(ctx, source.ID(), nil, NoUpdateLastUsed)
if err != nil {
return nil, err
}
parent = p.(*immutableRef)
}
if err := parent.Finalize(ctx); err != nil {
return nil, err
}
if err := parent.Extract(ctx, sess); err != nil {
return nil, err
}
}
defer func() {
if parent != nil {
parent.Release(context.WithoutCancel(ctx))
}
}()

rec, err := func() (_ *cacheRecord, rerr error) {
cm.mu.Lock()
defer cm.mu.Unlock()

rec, err := cm.getRecord(ctx, id)
switch {
case err == nil:
return rec, nil

case errors.Is(err, errNotFound):
md, _ := cm.getMetadata(id)

rec = &cacheRecord{
mu: &sync.Mutex{},
mutable: true,
cm: cm,
refs: make(map[ref]struct{}),
cacheMetadata: md,
}

opts := []RefOption{
WithRecordType(client.UsageRecordTypeCacheMount),
WithDescription(fmt.Sprintf("cache mount %s (%s)", key, id)), // TODO: rest of metadata?
CachePolicyRetain,
withSnapshotID(id),
}
if err := initializeMetadata(rec.cacheMetadata, rec.parentRefs, opts...); err != nil {
return nil, err
}
// this is needed because for some reason snapshotID is an imageRefOption
if err := setImageRefMetadata(rec.cacheMetadata, opts...); err != nil {
return nil, fmt.Errorf("failed to append image ref metadata to ref %s: %w", id, err)
}

cm.records[id] = rec
return rec, nil

default:
return nil, fmt.Errorf("failed to get volume cache record: %w", err)
}
}()
if err != nil {
return nil, err
}

// TODO: race condition here if someone grabs the record somehow before the lock below

rec.mu.Lock()

_, err = cm.volumeSnapshotter.Stat(ctx, id)
exists := err == nil

if !exists {
l, err := cm.LeaseManager.Create(ctx, func(l *leases.Lease) error {
l.ID = id
l.Labels = map[string]string{
"containerd.io/gc.flat": time.Now().UTC().Format(time.RFC3339Nano),
}
return nil
})
if err != nil && !cerrdefs.IsAlreadyExists(err) {
rec.mu.Unlock()
return nil, fmt.Errorf("failed to create lease: %w", err)
}
if cerrdefs.IsAlreadyExists(err) {
l = leases.Lease{ID: id}
}
defer func() {
if rerr != nil {
ctx := context.WithoutCancel(ctx)
if err := cm.LeaseManager.Delete(ctx, leases.Lease{
ID: id,
}); err != nil {
bklog.G(ctx).Errorf("failed to remove lease: %+v", err)
}
}
}()
if err := cm.LeaseManager.AddResource(ctx, l, leases.Resource{
ID: id,
Type: "snapshots/" + cm.volumeSnapshotter.Name(),
}); err != nil && !cerrdefs.IsAlreadyExists(err) {
rec.mu.Unlock()
return nil, fmt.Errorf("failed to add snapshot %s resource to lease: %w", id, err)
}

var sourceSnapshotID string
if parent != nil {
sourceSnapshotID = sourceChecksum.Encoded()
_, err := cm.volumeSnapshotter.Stat(ctx, sourceSnapshotID)
sourceExists := err == nil

if !sourceExists {
if err := cm.LeaseManager.AddResource(ctx, l, leases.Resource{
ID: sourceSnapshotID,
Type: "snapshots/" + cm.volumeSnapshotter.Name(),
}); err != nil && !cerrdefs.IsAlreadyExists(err) {
return nil, fmt.Errorf("failed to add source snapshot resource to lease: %w", err)
}

tmpActiveSnapshotID := identity.NewID()
if _, err := cm.LeaseManager.Create(ctx, func(l *leases.Lease) error {
l.ID = tmpActiveSnapshotID
l.Labels = map[string]string{
"containerd.io/gc.flat": time.Now().UTC().Format(time.RFC3339Nano),
}
return nil
}, leaseutil.MakeTemporary); err != nil && !cerrdefs.IsAlreadyExists(err) {
return nil, fmt.Errorf("failed to create lease for tmp active source snapshot: %w", err)
}
defer func() {
ctx := context.WithoutCancel(ctx)
if err := cm.LeaseManager.Delete(ctx, leases.Lease{
ID: tmpActiveSnapshotID,
}); err != nil {
bklog.G(ctx).Errorf("failed to remove lease: %+v", err)
}
}()
if err := cm.LeaseManager.AddResource(ctx, leases.Lease{
ID: tmpActiveSnapshotID,
}, leases.Resource{
ID: tmpActiveSnapshotID,
Type: "snapshots/" + cm.volumeSnapshotter.Name(),
}); err != nil && !cerrdefs.IsAlreadyExists(err) {
return nil, fmt.Errorf("failed to add source snapshot resource to lease: %w", err)
}

if err := cm.volumeSnapshotter.Prepare(ctx, tmpActiveSnapshotID, ""); err != nil && !cerrdefs.IsAlreadyExists(err) {
return nil, fmt.Errorf("failed to prepare source snapshot: %w", err)
}
newMntable, err := cm.volumeSnapshotter.Mounts(ctx, tmpActiveSnapshotID)
if err != nil {
return nil, fmt.Errorf("failed to get source mounts: %w", err)
}
newMnter := snapshot.LocalMounter(newMntable)
newMntpoint, err := newMnter.Mount()
if err != nil {
return nil, fmt.Errorf("failed to mount new source snapshot: %w", err)
}

oldMntable, err := source.Mount(ctx, true, sess)
if err != nil {
newMnter.Unmount()
return nil, fmt.Errorf("failed to get old source mounts: %w", err)
}
oldMnter := snapshot.LocalMounter(oldMntable)
oldMntpoint, err := oldMnter.Mount()
if err != nil {
newMnter.Unmount()
return nil, fmt.Errorf("failed to mount old source snapshot: %w", err)
}

if err := fs.CopyDir(newMntpoint, oldMntpoint, fs.WithAllowXAttrErrors()); err != nil {
newMnter.Unmount()
oldMnter.Unmount()
return nil, fmt.Errorf("failed to copy source snapshot: %w", err)
}

newMnter.Unmount()
oldMnter.Unmount()

if err := cm.volumeSnapshotter.Commit(ctx, sourceSnapshotID, tmpActiveSnapshotID); err != nil {
return nil, fmt.Errorf("failed to commit source snapshot: %w", err)
}
}
}

if err := cm.volumeSnapshotter.Prepare(ctx, id, sourceSnapshotID); err != nil && !cerrdefs.IsAlreadyExists(err) {
rec.mu.Unlock()
return nil, fmt.Errorf("failed to prepare volume: %w", err)
}
}
rec.mu.Unlock()

releaseFunc, err := cm.volumeSnapshotter.Acquire(ctx, id, sharingMode)
if err != nil {
return nil, fmt.Errorf("failed to acquire volume: %w", err)
}
defer func() {
if rerr != nil {
rerr = errors.Join(rerr, releaseFunc())
}
}()

rec.mu.Lock()
defer rec.mu.Unlock()

// TODO: note about how we are creating multiple mutable refs on a cacheRecord but it is safe to do so it turns out
ref := rec.mref(true, DescHandlers{})
ref.releaseFunc = releaseFunc

cm.seenVolumes.Store(id, struct{}{})

return ref, nil
}

func (cm *cacheManager) snapshotterFor(md *cacheMetadata) snapshot.MergeSnapshotter {
if md.GetRecordType() == client.UsageRecordTypeCacheMount {
return cm.volumeSnapshotter
}
return cm.Snapshotter
}
Loading