diff --git a/Makefile b/Makefile index e6bbd42b06..4999ee5639 100644 --- a/Makefile +++ b/Makefile @@ -212,7 +212,7 @@ whitespace: @infra/scripts/whitespace-check.sh # exit 1 if golint complains about anything other than comments -golintf = $(GOLINT) $(1) | sh -c "! grep -v 'lib/apiservers/portlayer/restapi/operations'" | sh -c "! grep -v 'lib/config/dynamic/admiral/client'" | sh -c "! grep -v 'should have comment'" | sh -c "! grep -v 'comment on exported'" | sh -c "! grep -v 'by other packages, and that stutters'" | sh -c "! grep -v 'error strings should not be capitalized'" +golintf = $(GOLINT) $(1) | sh -c "! grep -v 'lib/apiservers/portlayer/restapi/operations'" | sh -c "! grep -v 'lib/apiservers/service/restapi/operations/not_yet_implemented'" | sh -c "! grep -v 'lib/config/dynamic/admiral/client'" | sh -c "! grep -v 'should have comment'" | sh -c "! grep -v 'comment on exported'" | sh -c "! grep -v 'by other packages, and that stutters'" | sh -c "! grep -v 'error strings should not be capitalized'" golint: $(GOLINT) @echo checking go lint... diff --git a/cmd/port-layer-server/main.go b/cmd/port-layer-server/main.go index a3d3455414..2af54d2759 100644 --- a/cmd/port-layer-server/main.go +++ b/cmd/port-layer-server/main.go @@ -23,6 +23,7 @@ import ( "github.com/go-openapi/loads" "github.com/jessevdk/go-flags" + "github.com/vmware/govmomi/vim25/debug" "github.com/vmware/vic/lib/apiservers/portlayer/restapi" "github.com/vmware/vic/lib/apiservers/portlayer/restapi/operations" "github.com/vmware/vic/lib/config" @@ -98,6 +99,22 @@ func main() { extraconfig.SetLogLevel(log.DebugLevel) } + if vchConfig.Diagnostics.DebugLevel > 4 { + // TODO: pull this from an env var or similar + debugDir := "/var/log/vic/govmomi" + + err := os.MkdirAll(debugDir, 0700) + if err != nil { + log.Fatalf("Unable to create govmomi debug directory: %s", err) + } + + p := debug.FileProvider{ + Path: debugDir, + } + + debug.SetProvider(&p) + } + if vchConfig.Diagnostics.SysLogConfig != nil { logcfg.Syslog = &viclog.SyslogConfig{ Network: vchConfig.Diagnostics.SysLogConfig.Network, diff --git a/infra/scripts/bash-helpers.sh b/infra/scripts/bash-helpers.sh index c3f86fdf6d..25c629b203 100644 --- a/infra/scripts/bash-helpers.sh +++ b/infra/scripts/bash-helpers.sh @@ -30,7 +30,7 @@ fi # 1: release numnber, e.g. 1.4.1 or 1.4.1-rc2, or build id, e.g. 
19653 vic-set-version () { version="$1" - + if [ "$1" == "" ]; then unset VIC_VERSION return @@ -63,7 +63,7 @@ init-profile () { unset-vic () { unset TARGET_URL MAPPED_NETWORKS NETWORKS IMAGE_STORE DATASTORE COMPUTE VOLUME_STORES IPADDR TLS THUMBPRINT OPS_CREDS VIC_NAME PRESERVE_VOLUMES - unset GOVC_URL GOVC_INSECURE GOVC_DATACENTER GOVC_USERNAME GOVC_PASSWORD GOVC_DATASTORE GOVC_CERTIFICATE + unset GOVC_URL GOVC_INSECURE GOVC_DATACENTER GOVC_USERNAME GOVC_PASSWORD GOVC_DATASTORE GOVC_CERTIFICATE unset vsphere datacenter user password opsuser opspass opsgrant timeout compute datastore dns publicNet publicIP publicGW bridgeNet bridgeRange unset clientNet clientIP clientGW managementNet managementIP managementGW tls volumestores preserveVolumestores containernet @@ -79,7 +79,7 @@ vic-create () { base=$(pwd) ( cd "$(vic-path)"/bin || return - "$(vic-path)/bin/${VIC_VERSION}/vic-machine-$OS" create --target="$TARGET_URL" "${OPS_CREDS[@]}" --image-store="$IMAGE_STORE" --compute-resource="$COMPUTE" "${TLS[@]}" ${TLS_OPTS} --name="${VIC_NAME:-${USER}test}" "${MAPPED_NETWORKS[@]}" "${VOLUME_STORES[@]}" "${NETWORKS[@]}" ${IPADDR} ${TIMEOUT} --thumbprint="$THUMBPRINT" --ai="${VIC_VERSION}/appliance.iso" --bi="${VIC_VERSION}/bootstrap.iso" "$@" + "$(vic-path)/bin/${VIC_VERSION}/vic-machine-$OS" create --target="$TARGET_URL" "${OPS_CREDS[@]}" --image-store="$IMAGE_STORE" --compute-resource="$COMPUTE" "${TLS[@]}" ${TLS_OPTS} --name="${VIC_NAME:-${USER}test}" "${MAPPED_NETWORKS[@]}" "${VOLUME_STORES[@]}" "${NETWORKS[@]}" ${IPADDR} --timeout=${TIMEOUT} --thumbprint="$THUMBPRINT" --ai="${VIC_VERSION}/appliance.iso" --bi="${VIC_VERSION}/bootstrap.iso" "$@" ) vic-select diff --git a/lib/apiservers/engine/backends/container.go b/lib/apiservers/engine/backends/container.go index 196dbc6f4e..e3f2f8e833 100644 --- a/lib/apiservers/engine/backends/container.go +++ b/lib/apiservers/engine/backends/container.go @@ -709,7 +709,7 @@ func (c *ContainerBackend) ContainerCreate(config types.ContainerCreateConfig) ( // Reserve the container name to prevent duplicates during a parallel operation. if config.Name != "" { - err := cache.ContainerCache().ReserveName(container, config.Name) + err = cache.ContainerCache().ReserveName(container, config.Name) if err != nil { return containertypes.ContainerCreateCreatedBody{}, derr.NewRequestConflictError(err) } @@ -1856,7 +1856,7 @@ func clientFriendlyContainerName(name string) string { func createInternalVicContainer(image *metadata.ImageConfig) (*viccontainer.VicContainer, error) { // provide basic container config via the image container := viccontainer.NewVicContainer() - container.LayerID = image.V1Image.ID // store childmost layer ID to map to the proper vmdk + container.LayerID = image.VMDK // store childmost layer ID to map to the proper vmdk container.ImageID = image.ImageID container.Config = image.Config //Set defaults. Overrides will get copied below. diff --git a/lib/apiservers/engine/backends/image.go b/lib/apiservers/engine/backends/image.go index c7c58eaaf1..8c98d6814f 100644 --- a/lib/apiservers/engine/backends/image.go +++ b/lib/apiservers/engine/backends/image.go @@ -129,7 +129,7 @@ func (i *ImageBackend) ImageDelete(imageRef string, force, prune bool) ([]types. 
keepNodes[idx] = imgURL.String() } - params := storage.NewDeleteImageParamsWithContext(op).WithStoreName(storeName).WithID(img.ID).WithKeepNodes(keepNodes) + params := storage.NewDeleteImageParamsWithContext(op).WithStoreName(storeName).WithID(img.VMDK).WithKeepNodes(keepNodes) // TODO: This will fail if any containerVMs are referencing the vmdk - vanilla docker // allows the removal of an image (via force flag) even if a container is referencing it // should vic? diff --git a/lib/apiservers/engine/proxy/storage_proxy.go b/lib/apiservers/engine/proxy/storage_proxy.go index 20848b77be..a1a5fee42a 100644 --- a/lib/apiservers/engine/proxy/storage_proxy.go +++ b/lib/apiservers/engine/proxy/storage_proxy.go @@ -21,6 +21,7 @@ import ( "regexp" "strconv" "strings" + "sync" log "github.com/Sirupsen/logrus" "github.com/google/uuid" @@ -95,7 +96,7 @@ var SupportedVolDrivers = map[string]struct{}{ "local": {}, } -//Validation pattern for Volume Names +// Validation pattern for Volume Names var volumeNameRegex = regexp.MustCompile("^[a-zA-Z0-9][a-zA-Z0-9_.-]*$") func NewStorageProxy(client *client.PortLayer) VicStorageProxy { @@ -248,6 +249,7 @@ func (s *StorageProxy) Remove(ctx context.Context, name string) error { // - imageID is the current label by which we address this image and is recorded as metadata // - repoName is the repository for the image and is recorded as metadata // returns: +// // modified handle func (s *StorageProxy) AddImageToContainer(ctx context.Context, handle, deltaID, layerID, imageID string, config types.ContainerCreateConfig) (string, error) { op := trace.FromContext(ctx, "AddImageToContainer: %s", deltaID) @@ -299,6 +301,7 @@ func (s *StorageProxy) AddImageToContainer(ctx context.Context, handle, deltaID, // If an error is returned, the returned handle should not be used. // // returns: +// // modified handle func (s *StorageProxy) AddVolumesToContainer(ctx context.Context, handle string, config types.ContainerCreateConfig) (string, error) { op := trace.FromContext(ctx, "AddVolumesToContainer: %s", handle) @@ -340,33 +343,52 @@ func (s *StorageProxy) AddVolumesToContainer(ctx context.Context, handle string, } } - // Create and join volumes. + // Create volumes in parallel + results := make(chan error, len(volList)) + wg := sync.WaitGroup{} + wg.Add(len(volList)) for _, fields := range volList { - // We only set these here for volumes made on a docker create - volumeData := make(map[string]string) - volumeData[DriverArgFlagKey] = fields.Flags - volumeData[DriverArgContainerKey] = config.Name - volumeData[DriverArgImageKey] = config.Config.Image - - // NOTE: calling volumeCreate regardless of whether the volume is already - // present can be avoided by adding an extra optional param to VolumeJoin, - // which would then call volumeCreate if the volume does not exist. - _, err := s.volumeCreate(op, fields.ID, "vsphere", volumeData, nil) - if err != nil { + go func(id, flags string) { + defer wg.Done() + + // We only set these here for volumes made on a docker create + volumeData := make(map[string]string) + volumeData[DriverArgFlagKey] = flags + volumeData[DriverArgContainerKey] = config.Name + volumeData[DriverArgImageKey] = config.Config.Image + + // NOTE: calling volumeCreate regardless of whether the volume is already + // present can be avoided by adding an extra optional param to VolumeJoin, + // which would then call volumeCreate if the volume does not exist. 
+ _, err := s.volumeCreate(op, id, "vsphere", volumeData, nil) + if err == nil { + log.Infof("volumeCreate succeeded. Volume mount section ID: %s", id) + return + } + switch err := err.(type) { case *storage.CreateVolumeConflict: // Implicitly ignore the error where a volume with the same name // already exists. We can just join the said volume to the container. - log.Infof("a volume with the name %s already exists", fields.ID) + log.Infof("a volume with the name %s already exists", id) case *storage.CreateVolumeNotFound: - return handle, errors.VolumeCreateNotFoundError(volumeStore(volumeData)) + results <- errors.VolumeCreateNotFoundError(volumeStore(volumeData)) default: - return handle, errors.InternalServerError(err.Error()) + results <- errors.InternalServerError(err.Error()) } - } else { - log.Infof("volumeCreate succeeded. Volume mount section ID: %s", fields.ID) - } + }(fields.ID, fields.Flags) + } + wg.Wait() + if len(results) > 0 { + // TODO: should we attempt to do anything with errors other than the first? + // given we're not logging errors at this point prior to the parallelization I don't + // think it's needed. + return handle, <-results + } + + // Attach volumes. + for _, fields := range volList { flags := make(map[string]string) //NOTE: for now we are passing the flags directly through. This is NOT SAFE and only a stop gap. flags[constants.Mode] = fields.Flags diff --git a/lib/imagec/docker.go b/lib/imagec/docker.go index 4641bb4c80..5466e11cdc 100644 --- a/lib/imagec/docker.go +++ b/lib/imagec/docker.go @@ -44,8 +44,14 @@ import ( const ( // DigestSHA256EmptyTar is the canonical sha256 digest of empty tar file - - // (1024 NULL bytes) + // (1024 NULL bytes) - aka, the "empty" layer diffID. DigestSHA256EmptyTar = string(dlayer.DigestSHA256EmptyTar) + + // DigestSHA256EmptyBlobSum is the canonical sha256 digest of a gzipped empty tar file - + // (1024 NULL bytes, gzipped) - aka, the "empty" blobsum. This is used to identify empty + // layers using the schema v1 manifest without having to download, unzip and sha256 the + // 1024-byte empty layer tar. 
+ DigestSHA256EmptyBlobSum = "sha256:a3ed95caeb02ffe68cdd9fd84406680ae93d633cb16422d00e8a7c22955b46d4" ) // FSLayer is a container struct for BlobSums defined in an image manifest diff --git a/lib/imagec/download.go b/lib/imagec/download.go index 0861205233..96960c0eb2 100644 --- a/lib/imagec/download.go +++ b/lib/imagec/download.go @@ -33,6 +33,11 @@ const ( maxConcurrentDownloads = 3 ) +var ( + vmdkMap map[string]string + leafLayer *ImageWithMeta +) + type downloadTransfer struct { xfer.Transfer @@ -128,12 +133,19 @@ func (ldm *LayerDownloader) DownloadLayers(ctx context.Context, ic *ImageC) erro // Grab the imageLayers layers := ic.ImageLayers + vmdkMap = getVMDKMap(layers) + // iterate backwards through layers to download for i := len(layers) - 1; i >= 0; i-- { layer := layers[i] - id := layer.ID + if layer.EmptyLayer { + // skip attempting to download empty layers + continue + } + + id := layer.ID layerConfig, err := LayerCache().Get(id) if err != nil { @@ -155,6 +167,10 @@ func (ldm *LayerDownloader) DownloadLayers(ctx context.Context, ic *ImageC) erro layer.Downloading = true LayerCache().Add(layer) + // the child-most non-empty layer should be used as the leaf layer, and will + // be the parent of the container R/W layer VMDK + leafLayer = layer + continue default: return err @@ -288,20 +304,27 @@ func (ldm *LayerDownloader) makeDownloadFunc(op trace.Operation, layer *ImageWit } } + // set this layer's parent to the appropriate (non-empty) VMDK + layer.DiskParent = vmdkMap[layer.ID] + // is this the leaf layer? - imageLayer := layer.ID == layers[0].ID + isLeaf := layer.ID == leafLayer.ID if !ic.Standalone { // if this is the leaf layer, we are done and can now create the image config - if imageLayer { + if isLeaf { imageConfig, err := ic.CreateImageConfig(derivedOp, layers) if err != nil { d.err = err return } + + // set the image VMDK so container creation uses the correct disk as the R/W's parent + imageConfig.VMDK = layer.ID + // cache and persist the image cache.ImageCache().Add(&imageConfig) - if err := cache.ImageCache().Save(); err != nil { + if err = cache.ImageCache().Save(); err != nil { d.err = fmt.Errorf("error saving image cache: %s", err) return } @@ -321,7 +344,7 @@ func (ldm *LayerDownloader) makeDownloadFunc(op trace.Operation, layer *ImageWit defer ldm.m.Unlock() // Write blob to the storage layer - if err := ic.WriteImageBlob(derivedOp, layer, progressOutput, imageLayer); err != nil { + if err := ic.WriteImageBlob(derivedOp, layer, progressOutput, isLeaf); err != nil { d.err = err return } @@ -392,3 +415,22 @@ func (ldm *LayerDownloader) makeDownloadFuncFromDownload(op trace.Operation, lay } } + +func getVMDKMap(layers []*ImageWithMeta) map[string]string { + result := make(map[string]string) + diskParent := "scratch" + + for i := len(layers) - 1; i > 0; i-- { + layer := layers[i] + if layer.EmptyLayer { + // skip empty layers + continue + } + // set layer's parent to last known non-empty layer in the chain + result[layer.ID] = diskParent + // this layer isn't empty, so it will be the next parent layer in the chain + diskParent = layer.ID + } + + return result +} diff --git a/lib/imagec/imagec.go b/lib/imagec/imagec.go index 39eff9f6b3..b4aece8d41 100644 --- a/lib/imagec/imagec.go +++ b/lib/imagec/imagec.go @@ -126,6 +126,13 @@ type ImageWithMeta struct { Meta string Size int64 + // DiskParent is the ID of the most recent non-empty parent layer that will serve + // as this layer's parent in the disk chain + DiskParent string + + // EmptyLayer is true if this layer 
contains no filesystem data, false otherwise + EmptyLayer bool + Downloading bool } @@ -246,6 +253,12 @@ func (ic *ImageC) LayersToDownload(ctx context.Context) ([]*ImageWithMeta, error return nil, fmt.Errorf("Failed to unmarshall image history: %s", err) } + log.Infof(">>>>> Layer %s empty? %t - blobsum: %s", v1.ID, layer.BlobSum == DigestSHA256EmptyBlobSum, layer.BlobSum) + emptyLayer := false + if layer.BlobSum == DigestSHA256EmptyBlobSum { + emptyLayer = true + } + // if parent is empty set it to scratch parent := constants.ScratchLayerID if v1.Parent != "" { @@ -259,8 +272,9 @@ func (ic *ImageC) LayersToDownload(ctx context.Context) ([]*ImageWithMeta, error Parent: parent, Store: ic.Storename, }, - Meta: history.V1Compatibility, - Layer: layer, + Meta: history.V1Compatibility, + Layer: layer, + EmptyLayer: emptyLayer, } // populate manifest layer with existing cached data @@ -409,7 +423,7 @@ func (ic *ImageC) CreateImageConfig(ctx context.Context, images []*ImageWithMeta } // is this an empty layer? - if layer.DiffID == dockerLayer.DigestSHA256EmptyTar.String() { + if layer.Layer.BlobSum == DigestSHA256EmptyBlobSum { h.EmptyLayer = true } else { // if not empty, add diffID to rootFS diff --git a/lib/imagec/storage.go b/lib/imagec/storage.go index b5e7e6e37c..51efbc5acd 100644 --- a/lib/imagec/storage.go +++ b/lib/imagec/storage.go @@ -97,7 +97,7 @@ func WriteImage(op trace.Operation, host string, image *ImageWithMeta, data io.R storage.NewWriteImageParamsWithContext(op). WithOpID(&opID). WithImageID(image.ID). - WithParentID(image.Parent). + WithParentID(image.DiskParent). WithStoreName(image.Store). WithMetadatakey(key). WithMetadataval(blob). @@ -111,5 +111,4 @@ func WriteImage(op trace.Operation, host string, image *ImageWithMeta, data io.R log.Printf("Created an image %#v", r.Payload) return nil - } diff --git a/lib/install/management/appliance.go b/lib/install/management/appliance.go index bec409b888..45899ce73f 100644 --- a/lib/install/management/appliance.go +++ b/lib/install/management/appliance.go @@ -383,7 +383,16 @@ func (d *Dispatcher) createApplianceSpec(conf *config.VirtualContainerHostConfig MemoryMB: memory, // Encode the config both here and after the VMs created so that it can be identified as a VCH appliance as soon as // creation is complete. - ExtraConfig: append(vmomi.OptionValueFromMap(cfg, true), &types.OptionValue{Key: "answer.msg.serial.file.open", Value: "Append"}), + ExtraConfig: append( + vmomi.OptionValueFromMap(cfg, true), + // if we hit out of space errors then ensure we don't block other operations - observed during testing parallel anonymous volume create + // tried Cancel initially but that results in the following in vmware.log for the VM. Hopefully retry is more useful for us + // 2022-12-03T00:10:30.208Z In(05) vmx - MsgQuestion: msg.hbacommon.outofspace reply=1 + // 2022-12-03T00:10:30.208Z Cr(01) vmx - PANIC: Exiting because of failed disk operation. 
+ &types.OptionValue{Key: "answer.msg.hbacommon.outofspace", Value: "Retry"}, + // needed to avoid the question that occur when opening a file backed serial port - this will happen all the time given our logging mechanism + &types.OptionValue{Key: "answer.msg.serial.file.open", Value: "Append"}, + ), }, } diff --git a/lib/metadata/image_config.go b/lib/metadata/image_config.go index 8030bf3f0e..59eef1c3ec 100644 --- a/lib/metadata/image_config.go +++ b/lib/metadata/image_config.go @@ -18,7 +18,8 @@ import ( docker "github.com/docker/docker/image" ) -// ImageConfig contains configuration data describing images and their layers +// ImageConfig defines the docker format for representing an image. When marshaled to JSON, the sha256 sum +// of the resulting bytes is the image ID. type ImageConfig struct { docker.V1Image @@ -30,4 +31,11 @@ type ImageConfig struct { DiffIDs map[string]string `json:"diff_ids,omitempty"` History []docker.History `json:"history,omitempty"` Reference string `json:"registry"` + + // VMDK is the ID of the VMDK to be used as the R/W layer's parent disk when + // creating a container from the image. This is the type that is stored in our + // image cache. + // + // This field is ignored when marshalling into JSON to preserve the image ID. + VMDK string `json:"vmdk,omitempty"` } diff --git a/lib/portlayer/storage/volume/cache/cache.go b/lib/portlayer/storage/volume/cache/cache.go index 0ea9e112fe..3e1a1feea8 100644 --- a/lib/portlayer/storage/volume/cache/cache.go +++ b/lib/portlayer/storage/volume/cache/cache.go @@ -31,6 +31,10 @@ import ( "github.com/vmware/vic/pkg/trace" ) +// pendingID is used as the ID inside a Volume before the ID is known. This is used to allow concurrent +// volume creation +const pendingID string = "pending" + // VolumeLookupCache caches Volume references to volumes in the system. type VolumeLookupCache struct { @@ -122,25 +126,52 @@ func (v *VolumeLookupCache) VolumeStoresList(op trace.Operation) ([]string, erro func (v *VolumeLookupCache) VolumeCreate(op trace.Operation, ID string, store *url.URL, capacityKB uint64, info map[string][]byte) (*volume.Volume, error) { v.vlcLock.Lock() - defer v.vlcLock.Unlock() // check if it exists _, ok := v.vlc[ID] if ok { + v.vlcLock.Unlock() + // TODO: make this block until success/fail is known return nil, os.ErrExist } + // if we exit this function without having replaced the placeholder Volume in the cache + // there was an error and we should delete the placeholder so a subsequent attempt at the + // same ID can proceed. + defer func() { + v.vlcLock.Lock() + + vol, ok := v.vlc[ID] + if ok { + if vol.ID == pendingID { + delete(v.vlc, ID) + } + } + v.vlcLock.Unlock() + }() + + // TODO: construct a proper async cache + // this is done because this path was blocking any concurrent volume create + v.vlc[ID] = volume.Volume{ + ID: pendingID, + } + + v.vlcLock.Unlock() + vs, err := v.volumeStore(store) if err != nil { return nil, err } vol, err := vs.VolumeCreate(op, ID, store, capacityKB, info) - if err != nil { + if err != nil || vol == nil { return nil, err } - // Add it to the cache. 
- v.vlc[vol.ID] = *vol + + // Replace the pending entry with the actual entry + v.vlcLock.Lock() + v.vlc[ID] = *vol + v.vlcLock.Unlock() return vol, nil } @@ -194,6 +225,10 @@ func (v *VolumeLookupCache) VolumesList(op trace.Operation) ([]*volume.Volume, e // look in the cache, return the list l := make([]*volume.Volume, 0, len(v.vlc)) for _, vol := range v.vlc { + if vol.ID == "pending" { + continue + } + // this is idiotic var e volume.Volume e = vol diff --git a/lib/portlayer/storage/volume/vsphere/store.go b/lib/portlayer/storage/volume/vsphere/store.go index c27b01045e..6da3f930aa 100644 --- a/lib/portlayer/storage/volume/vsphere/store.go +++ b/lib/portlayer/storage/volume/vsphere/store.go @@ -118,7 +118,15 @@ func (v *VolumeStore) VolumeCreate(op trace.Operation, ID string, store *url.URL if err != nil { return nil, err } + + // TODO: handle the error that can come back from detach - if we hit this path nothing is going to attempt to detach this disk. + // That will mean: + // * the disk cannot be attached to a container (assuming it was prepped correctly) + // * we've leaked one of the limited number of disks that can be attached to the VCH at a time + // v.Detach has been modified to use async batching, so likely requires more structural revision, eg. a disk manager thread + // responsible for detach that this specific functional path can confidently delegate to instead of trying to handle inline. defer v.Detach(op, vmdisk.VirtualDiskConfig) + vol, err := volume.NewVolume(store, ID, info, vmdisk, executor.CopyNew) if err != nil { return nil, err @@ -149,7 +157,7 @@ func (v *VolumeStore) VolumeCreate(op trace.Operation, ID string, store *url.URL return nil, err } - op.Infof("volumestore: %s (%s)", ID, vol.SelfLink) + op.Infof("VolumeStore: %s (%s)", ID, vol.SelfLink) return vol, nil } diff --git a/lib/spec/spec.go b/lib/spec/spec.go index 9b901dd7a1..a50869d094 100644 --- a/lib/spec/spec.go +++ b/lib/spec/spec.go @@ -93,6 +93,8 @@ func NewVirtualMachineConfigSpec(ctx context.Context, session *session.Session, &types.OptionValue{Key: "disk.EnableUUID", Value: "true"}, // needed to avoid the questions that occur when attaching multiple disks with the same uuid (bugzilla 1362918) &types.OptionValue{Key: "answer.msg.disk.duplicateUUID", Value: "Yes"}, + // if we hit out of space errors then ensure we don't block other operations - observed during testing parallel anonymous volume create + &types.OptionValue{Key: "answer.msg.hbacommon.outofspace", Value: "Cancel"}, // needed to avoid the question that occur when opening a file backed serial port &types.OptionValue{Key: "answer.msg.serial.file.open", Value: "Append"}, diff --git a/pkg/vsphere/disk/batcher.go b/pkg/vsphere/disk/batcher.go new file mode 100644 index 0000000000..c3cf455e6f --- /dev/null +++ b/pkg/vsphere/disk/batcher.go @@ -0,0 +1,85 @@ +// Copyright 2016-2017 VMware, Inc. All Rights Reserved. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. 
+ +package disk + +import ( + "context" + "time" + + "github.com/vmware/vic/pkg/trace" +) + +const ( + // BatchLatency is the duration to wait for other work before acting on lazy operations + BatchLatency = 10 * time.Second + // MaxBatchSize is the number of items to batch and is set sufficient to completely cycle attached disks + MaxBatchSize = MaxAttachedDisks * 2 +) + +type batchMember struct { + op trace.Operation + err chan error + data interface{} +} + +// lazyDeviceChange adds a lazy deferral mechanism for device change operations (specifically disk at this time). +// This is due to the fact that reconfigure operations are unintentionally serializing parallel operations and +// causing performance impacts (concurrent volume create as a primary example which impacts concurrent container start if +// there are anonymous volumes - worse if there are multiple volumes per container) +func lazyDeviceChange(ctx context.Context, batch chan batchMember, operation func(operation trace.Operation, data []interface{}) error) { + op := trace.FromContext(ctx, "Lazy batching of disk operations") + + for { + data := make([]interface{}, 0, MaxBatchSize) // batching queue for arguments + errors := make([]chan error, 0, MaxBatchSize) // batching queue for error returns + + // block and wait for first request + select { + case req, ok := <-batch: + if !ok { + return // channel closed, quit + } + if req.err == nil { + continue + } + data = append(data, req.data) + errors = append(errors, req.err) + req.op.Debugf("Dispatching queued operation") + case <-op.Done(): // when parent context is cancelled, quit + return + } + + // fetch batched requests + // TODO: I want to add some optional latency in here so that the attach/detach pair from + // pull make use of it, but for now it's purely opportunistic for non-serial operations. + for len(batch) > 0 { + req := <-batch + if req.err != nil { + data = append(data, req.data) + errors = append(errors, req.err) + req.op.Debugf("Dispatching queued operation") + } + } + + // process requests + err := operation(op, data) + + // signal batched operations and throw back result + for _, member := range errors { + member <- err + close(member) + } + } +} diff --git a/pkg/vsphere/disk/disk_manager.go b/pkg/vsphere/disk/disk_manager.go index d1b6f4b8b9..5843a18dad 100644 --- a/pkg/vsphere/disk/disk_manager.go +++ b/pkg/vsphere/disk/disk_manager.go @@ -35,9 +35,10 @@ import ( ) const ( - // You can assign the device to (1:z ), where 1 is SCSI controller 1 and z is a virtual device node from 0 to 15. + // You can assign the device to (1:z ), where 1 is SCSI controller 1 and z is a virtual device node from 0 to 14. // https://pubs.vmware.com/vsphere-65/index.jsp#com.vmware.vsphere.vm_admin.doc/GUID-5872D173-A076-42FE-8D0B-9DB0EB0E7362.html - MaxAttachedDisks = 16 + // From vSphere 6.7 the pvscsi limit is increased to 64 so we should make this number dynamic based on backend version + MaxAttachedDisks = 15 ) // Manager manages disks for the vm it runs on. 
The expectation is this is run @@ -69,8 +70,12 @@ type Manager struct { // map of URIs to VirtualDisk structs so that we can return the same instance to the caller, required for ref counting Disks map[uint64]*VirtualDisk + // used for locking the disk cache disksLock sync.Mutex + + // device batching queue + batchQueue chan batchMember } // NewDiskManager creates a new Manager instance associated with the caller VM @@ -89,7 +94,10 @@ func NewDiskManager(op trace.Operation, session *session.Session, v *view.Contai return nil, err } - return &Manager{ + // start the batching code + // TODO: can probably make this a lazy trigger from the queue function so that we will + // restart it implicitly if it dies + manager := &Manager{ maxAttached: make(chan bool, MaxAttachedDisks), vm: vm, vdMngr: object.NewVirtualDiskManager(vm.Vim25()), @@ -97,7 +105,12 @@ func NewDiskManager(op trace.Operation, session *session.Session, v *view.Contai controller: controller, byPathFormat: byPathFormat, Disks: make(map[uint64]*VirtualDisk), - }, nil + batchQueue: make(chan batchMember, MaxBatchSize), + } + + go lazyDeviceChange(trace.NewOperation(op, "lazy disk dispatcher"), manager.batchQueue, manager.dequeueBatch) + + return manager, nil } // toSpec converts the given config to VirtualDisk spec @@ -181,7 +194,7 @@ func (m *Manager) CreateAndAttach(op trace.Operation, config *VirtualDiskConfig) // if it is then it's indicative of an error because it wasn't found in the cache, but this lets us recover _, ferr := findDiskByFilename(op, m.vm, d.DatastoreURI.String(), d.IsPersistent()) if os.IsNotExist(ferr) { - if err := m.attach(op, config); err != nil { + if err := m.attach(op, config, nil); err != nil { return nil, errors.Trace(err) } } else { @@ -205,7 +218,7 @@ func (m *Manager) CreateAndAttach(op trace.Operation, config *VirtualDiskConfig) op.Debugf("findDiskByFilename(%s) failed with %s", d.DatastoreURI, errors.ErrorStack(findErr)) } - if detachErr := m.detach(op, disk); detachErr != nil { + if detachErr := m.detach(op, disk, nil); detachErr != nil { op.Debugf("detach(%s) failed with %s", d.DatastoreURI, errors.ErrorStack(detachErr)) } @@ -325,8 +338,67 @@ func (m *Manager) DiskParent(op trace.Operation, config *VirtualDiskConfig) (*ob // return nil // } -// Attach attempts to attach a virtual disk -func (m *Manager) attach(op trace.Operation, config *VirtualDiskConfig) error { +// queueBatch adds a disk operation into the batching queue. 
+// TODO: need to test what occurs if attach/detach for the SAME disk are batched together +// Note that the error handler needs to be careful with locking as this function does not handle it +func (m *Manager) queueBatch(op trace.Operation, change types.BaseVirtualDeviceConfigSpec, errHandler func(err error)) error { + chg := batchMember{ + op: op, + err: make(chan error), + data: change, + } + + op.Debugf("Queuing disk change operation (%s:%+v)", change.GetVirtualDeviceConfigSpec().Operation, *(change.GetVirtualDeviceConfigSpec().Device.GetVirtualDevice().Backing.(types.BaseVirtualDeviceFileBackingInfo)).GetVirtualDeviceFileBackingInfo()) + m.batchQueue <- chg + + if errHandler == nil { + // this will block until the batch is performed + return <-chg.err + } + + // this will run the error processing in the background + go func() { + errHandler(<-chg.err) + }() + + return nil +} + +func (m *Manager) dequeueBatch(op trace.Operation, data []interface{}) error { + // convert to device change array + changes := make([]types.BaseVirtualDeviceConfigSpec, 0, len(data)) + + for i := range data { + changeSpec := data[i].(types.BaseVirtualDeviceConfigSpec) + dev := changeSpec.GetVirtualDeviceConfigSpec().Device.GetVirtualDevice() + // VC requires that the keys be unique within a config spec + if changeSpec.GetVirtualDeviceConfigSpec().Operation == types.VirtualDeviceConfigSpecOperationAdd && dev.Key == -1 { + dev.Key = int32(-1 - i) + } + op.Debugf("Appending change spec: %s:%+v", changeSpec.GetVirtualDeviceConfigSpec().Operation, *(changeSpec.GetVirtualDeviceConfigSpec().Device.GetVirtualDevice().Backing.(types.BaseVirtualDeviceFileBackingInfo)).GetVirtualDeviceFileBackingInfo()) + changes = append(changes, changeSpec) + } + + machineSpec := types.VirtualMachineConfigSpec{} + machineSpec.DeviceChange = changes + + _, err := m.vm.WaitForResult(op, func(ctx context.Context) (tasks.Task, error) { + t, er := m.vm.Reconfigure(ctx, machineSpec) + + if t != nil { + op.Debugf("Batched disk reconfigure (%d batched operations) task=%s", len(machineSpec.DeviceChange), t.Reference()) + } + + return t, er + }) + + return err +} + +// attach attempts to attach the specified disk to the VM. +// if there is an error handling function, then this removal is queued and dispatched async. +// if the handling function is nil this blocks until operation is complete +func (m *Manager) attach(op trace.Operation, config *VirtualDiskConfig, errHandler func(error)) error { defer trace.End(trace.Begin("")) disk := m.toSpec(config) @@ -342,70 +414,94 @@ func (m *Manager) attach(op trace.Operation, config *VirtualDiskConfig) error { machineSpec := types.VirtualMachineConfigSpec{} machineSpec.DeviceChange = append(machineSpec.DeviceChange, changeSpec...) 
- // ensure we abide by max attached disks limits + // ensure we abide by max attached disks limits at all times + // we undo this in error handling if we fail the attach m.maxAttached <- true - m.mu.Lock() - defer m.mu.Unlock() - // make sure the op is still valid as the above line could block for a long time select { case <-op.Done(): + // if the op has been cancelled then we need to undo our allocation of an available disk slot + select { + case <-m.maxAttached: + default: + } + return op.Err() default: } - _, err = m.vm.WaitForResult(op, func(ctx context.Context) (tasks.Task, error) { - t, er := m.vm.Reconfigure(ctx, machineSpec) + handler := func(err error) { + if err != nil { + select { + case <-m.maxAttached: + default: + } - if t != nil { - op.Debugf("Attach reconfigure task=%s", t.Reference()) + op.Errorf("vmdk storage driver failed to attach disk: %s", errors.ErrorStack(err)) } - return t, er - }) - - if err != nil { - select { - case <-m.maxAttached: - default: + if errHandler != nil { + errHandler(err) } + } - op.Errorf("vmdk storage driver failed to attach disk: %s", errors.ErrorStack(err)) - return errors.Trace(err) + var wrapper func(error) + if errHandler != nil { + wrapper = handler } - return nil + // batch the operation + // run the error handling in the background when the batch completes if a handler is provided + err = m.queueBatch(op, changeSpec[0], wrapper) + if errHandler == nil { + handler(err) + } + + return err } // Detach attempts to detach a virtual disk func (m *Manager) Detach(op trace.Operation, config *VirtualDiskConfig) error { defer trace.End(trace.Begin("")) - // we have to hold the cache lock until we're done deleting the cache entry - // or until we know we're not going to delete the entry m.disksLock.Lock() - defer m.disksLock.Unlock() d, err := NewVirtualDisk(op, config, m.Disks) if err != nil { + m.disksLock.Unlock() return errors.Trace(err) } d.l.Lock() defer d.l.Unlock() + // if there is a second operation trying to detach the same disk then + // one of them will likely return due to reference count being greater than zero, but + // even if not then we check here whether the disk is still attached no we have the disk lock. 
+ // This is leveraging the DevicePath as that is cleared on successful detach + if d.DevicePath == "" { + op.Debugf("detach returning early as no action required for %s", d.DatastoreURI) + m.disksLock.Unlock() + return nil + } + count := d.attachedRefs.Decrement() op.Debugf("decremented attach count for %s: %d", d.DatastoreURI, count) if count > 0 { + m.disksLock.Unlock() return nil } if err := d.canBeDetached(); err != nil { + m.disksLock.Unlock() op.Errorf("disk needs to be detached but is still in use: %s", err) return errors.Trace(err) } + // unlocking the cache here allows for parallel detach operations to occur, enabling batching + m.disksLock.Unlock() + op.Infof("Detaching disk %s", d.DevicePath) disk, err := findDiskByFilename(op, m.vm, d.DatastoreURI.String(), d.IsPersistent()) @@ -413,14 +509,21 @@ func (m *Manager) Detach(op trace.Operation, config *VirtualDiskConfig) error { return errors.Trace(err) } - if err = m.detach(op, disk); err != nil { - op.Errorf("detach for %s failed with %s", d.DevicePath, errors.ErrorStack(err)) - return errors.Trace(err) - } + // run the result handler in the batch error handler + resultHandler := func(err error) { + if err != nil { + op.Errorf("detach for %s failed with %s", d.DevicePath, errors.ErrorStack(err)) + return + } - // this deletes the disk from the disk cache - d.setDetached(op, m.Disks) + // this deletes the disk from the disk cache + m.disksLock.Lock() + d.setDetached(op, m.Disks) + m.disksLock.Unlock() + } + // execution is async with a result handler, so no error processing here + m.detach(op, disk, resultHandler) return nil } @@ -433,47 +536,51 @@ func (m *Manager) DetachAll(op trace.Operation) error { } for _, disk := range disks { - if err2 := m.detach(op, disk); err != nil { - op.Errorf("error detaching disk: %s", err2.Error()) - // return the last error on the return of this function - err = err2 - // if we failed here that means we have a disk attached, ensure we abide by max attached disks limits - m.maxAttached <- true - } + // it's packed into m.detach to not drain the reference counting channel on error + // but providing an error handler here allows this to happen in parallel + m.detach(op, disk, func(err error) { + op.Errorf("vmdk storage driver failed to detach disk: %s", errors.ErrorStack(err)) + }) } return err } -func (m *Manager) detach(op trace.Operation, disk *types.VirtualDisk) error { - config := []types.BaseVirtualDeviceConfigSpec{ - &types.VirtualDeviceConfigSpec{ - Device: disk, - Operation: types.VirtualDeviceConfigSpecOperationRemove, - }, +// detach removes the specified disk from the VM. +// if there is an error handling function, then this removal is queued and dispatched async. 
+// if the handling function is nil this blocks until operation is complete +func (m *Manager) detach(op trace.Operation, disk *types.VirtualDisk, errHandler func(error)) error { + config := &types.VirtualDeviceConfigSpec{ + Device: disk, + Operation: types.VirtualDeviceConfigSpecOperationRemove, } - spec := types.VirtualMachineConfigSpec{} - spec.DeviceChange = config - - m.mu.Lock() - defer m.mu.Unlock() - - _, err := m.vm.WaitForResult(op, func(ctx context.Context) (tasks.Task, error) { - t, er := m.vm.Reconfigure(ctx, spec) + handler := func(err error) { + if err != nil { + // can only enter here if the errHandler is nil, when this blocks until the op is completed + op.Errorf("vmdk storage driver failed to detach disk: %s", errors.ErrorStack(err)) + } else { + select { + case <-m.maxAttached: + default: + } + } - if t != nil { - op.Debugf("Detach reconfigure task=%s", t.Reference()) + if errHandler != nil { + errHandler(err) } + } - return t, er - }) + var wrapper func(error) + if errHandler != nil { + wrapper = handler + } - if err == nil { - select { - case <-m.maxAttached: - default: - } + // batch the operation + // run the error handling in the background when the batch completes if a handler is provided + err := m.queueBatch(op, config, wrapper) + if errHandler == nil { + handler(err) } return err diff --git a/pkg/vsphere/disk/disk_manager_test.go b/pkg/vsphere/disk/disk_manager_test.go index 342fd03071..0cee79d2a1 100644 --- a/pkg/vsphere/disk/disk_manager_test.go +++ b/pkg/vsphere/disk/disk_manager_test.go @@ -332,7 +332,7 @@ func TestRefCounting(t *testing.T) { config = NewPersistentDisk(child).WithParent(scratch) // attempt attach - assert.NoError(t, vdm.attach(op, config), "Error attempting to attach %s", config) + assert.NoError(t, vdm.attach(op, config, nil), "Error attempting to attach %s", config) devicePath, err := vdm.devicePathByURI(op, child, config.IsPersistent()) if !assert.NoError(t, err) { diff --git a/pkg/vsphere/vm/vm.go b/pkg/vsphere/vm/vm.go index 250f1e444e..43ca649243 100644 --- a/pkg/vsphere/vm/vm.go +++ b/pkg/vsphere/vm/vm.go @@ -26,6 +26,8 @@ import ( "sync/atomic" "time" + "golang.org/x/sync/singleflight" + "github.com/vmware/govmomi/object" "github.com/vmware/govmomi/property" "github.com/vmware/govmomi/vim25/methods" @@ -46,6 +48,9 @@ const ( UpdateStatus = "UpdateInProgress" ) +// deduplication of calls to update device list +var deviceGroup singleflight.Group + type InvalidState struct { r types.ManagedObjectReference } @@ -684,7 +689,7 @@ func (vm *VirtualMachine) IsInvalidState(ctx context.Context) bool { func (vm *VirtualMachine) WaitForResult(ctx context.Context, f func(context.Context) (tasks.Task, error)) (*types.TaskInfo, error) { op := trace.FromContext(ctx, "WaitForResult") - info, err := tasks.WaitForResult(op, f) + info, err := tasks.WaitForResultAndRetryIf(op, f, tasks.IsTransientError) if err == nil || !vm.needsFix(op, err) { return info, err } @@ -695,7 +700,13 @@ func (vm *VirtualMachine) WaitForResult(ctx context.Context, f func(context.Cont return info, err } op.Debug("Fixed") - return tasks.WaitForResult(op, f) + + taskInfo, err := tasks.WaitForResult(op, f) + + // TODO: only really needed after a reconfigure operation + deviceGroup.Forget(vm.Reference().String()) + + return taskInfo, err } func (vm *VirtualMachine) Properties(ctx context.Context, r types.ManagedObjectReference, ps []string, o *mo.VirtualMachine) error { @@ -1039,6 +1050,25 @@ func (vm *VirtualMachine) InCluster(op trace.Operation) bool { return cls 
} +func (vm *VirtualMachine) Device(op trace.Operation) (object.VirtualDeviceList, error) { + devices := func() (interface{}, error) { + return vm.VirtualMachine.Device(op) + } + + // TODO: do we need to call Forget during Reconfigure? + devlist, err, shared := deviceGroup.Do(vm.Reference().String(), devices) + if err != nil || devlist == nil { + return nil, err + } + + if shared { + // TODO: consider whether we need to duplicate this list for each caller + return devlist.(object.VirtualDeviceList), nil + } + + return devlist.(object.VirtualDeviceList), nil +} + // IsAlreadyPoweredOffError is an accessor method because of the number of times package name and // variable name tend to collide for VMs. func (vm *VirtualMachine) IsAlreadyPoweredOffError(err error) bool { diff --git a/tests/check-org-membership.sh b/tests/check-org-membership.sh index a948d6ec14..fda7c9e508 100755 --- a/tests/check-org-membership.sh +++ b/tests/check-org-membership.sh @@ -24,7 +24,11 @@ if [ "$SKIP_CHECK_MEMBERSHIP" == "true" ]; then exit 0 fi -if [ ! $(curl --silent -H "Authorization: token $GITHUB_AUTOMATION_API_KEY" "https://api.github.com/orgs/vmware/members/${DRONE_COMMIT_AUTHOR}") ]; then +# assuming that the logic here is both a successful curl AND no output. +# testing with a bad auth token shows a 0 return code for curl, but json blob with error +# good auth token is just empty response +result=$(curl --silent -H "Authorization: token $GITHUB_AUTOMATION_API_KEY" "https://api.github.com/orgs/vmware/members/${DRONE_COMMIT_AUTHOR}") +if [ "$?" -eq 0 -a -z "$result" ]; then echo "checked origin membership successfully" else echo "failed to check origin membership"
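
For context on the deviceGroup change in pkg/vsphere/vm/vm.go: golang.org/x/sync/singleflight collapses concurrent calls that share a key into a single in-flight execution and hands the same result to every waiter, which is why duplicate device-list lookups stop hammering vSphere. The following is a minimal standalone sketch of that library pattern, not VIC code; fetchDeviceList and the "vm-42" key are hypothetical stand-ins for the real property retrieval and VM reference.

package main

import (
	"fmt"
	"sync"
	"time"

	"golang.org/x/sync/singleflight"
)

var group singleflight.Group

// fetchDeviceList is a hypothetical stand-in for the expensive device/property
// retrieval that a call like vm.Device ultimately performs against vSphere.
func fetchDeviceList(vmRef string) (interface{}, error) {
	time.Sleep(100 * time.Millisecond) // simulate a slow round trip
	return []string{"scsi0:0", "scsi0:1"}, nil
}

func main() {
	var wg sync.WaitGroup
	for i := 0; i < 5; i++ {
		wg.Add(1)
		go func(n int) {
			defer wg.Done()
			// concurrent callers with the same key share one in-flight call;
			// shared reports whether the result was given to multiple callers
			v, err, shared := group.Do("vm-42", func() (interface{}, error) {
				return fetchDeviceList("vm-42")
			})
			fmt.Println(n, v, err, shared)
		}(i)
	}
	wg.Wait()

	// Forget drops the key so the next Do executes the function instead of
	// joining an earlier call; the diff invokes this after a reconfigure so
	// callers do not receive a device list gathered before the change.
	group.Forget("vm-42")
}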
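
Similarly, the VolumeLookupCache change follows a common shape for allowing concurrent creates without holding the cache lock across the slow backend call: reserve the ID with a sentinel entry under the lock, drop the lock for the real work, then either replace the sentinel with the result or delete it on failure so a retry can proceed. A generic sketch of that shape is below, under the assumption of a simple string-valued cache; the names (cache, create, backend) are illustrative and not the portlayer API.

package main

import (
	"errors"
	"fmt"
	"sync"
)

var errExist = errors.New("already exists")

const pending = "pending" // sentinel stored while creation is in flight

type cache struct {
	mu      sync.Mutex
	entries map[string]string
}

// create reserves id with a sentinel, performs the slow backend call without
// holding the lock, and removes the sentinel if the call fails.
func (c *cache) create(id string, backend func(string) (string, error)) (string, error) {
	c.mu.Lock()
	if _, ok := c.entries[id]; ok {
		c.mu.Unlock()
		return "", errExist
	}
	c.entries[id] = pending
	c.mu.Unlock()

	// if the sentinel was never replaced, clean it up so a later attempt
	// at the same ID is not blocked forever
	defer func() {
		c.mu.Lock()
		if c.entries[id] == pending {
			delete(c.entries, id)
		}
		c.mu.Unlock()
	}()

	v, err := backend(id) // slow call, e.g. disk creation, runs unlocked
	if err != nil {
		return "", err
	}

	c.mu.Lock()
	c.entries[id] = v
	c.mu.Unlock()
	return v, nil
}

func main() {
	c := &cache{entries: map[string]string{}}
	v, err := c.create("vol1", func(id string) (string, error) { return "disk-" + id, nil })
	fmt.Println(v, err)
}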