diff --git a/lib/portlayer/storage/volume_cache.go b/lib/portlayer/storage/volume_cache.go
index 95037eb190..83b26403e8 100644
--- a/lib/portlayer/storage/volume_cache.go
+++ b/lib/portlayer/storage/volume_cache.go
@@ -120,25 +120,37 @@ func (v *VolumeLookupCache) VolumeStoresList(op trace.Operation) ([]string, erro
 func (v *VolumeLookupCache) VolumeCreate(op trace.Operation, ID string, store *url.URL, capacityKB uint64, info map[string][]byte) (*Volume, error) {
     v.vlcLock.Lock()
-    defer v.vlcLock.Unlock()
 
     // check if it exists
     _, ok := v.vlc[ID]
     if ok {
+        v.vlcLock.Unlock()
+        // TODO: make this block until success/fail is known
        return nil, os.ErrExist
     }
 
+    // TODO: construct a proper async cache
+    // this is done because this path was blocking any concurrent volume create
+    v.vlc[ID] = Volume{
+        ID: "pending",
+    }
+
+    v.vlcLock.Unlock()
+
     vs, err := v.volumeStore(store)
     if err != nil {
+        // roll back the pending placeholder or this ID can never be created again
+        v.vlcLock.Lock()
+        delete(v.vlc, ID)
+        v.vlcLock.Unlock()
        return nil, err
     }
 
     vol, err := vs.VolumeCreate(op, ID, store, capacityKB, info)
-    if err != nil {
+    if err != nil || vol == nil {
+        v.vlcLock.Lock()
+        delete(v.vlc, ID)
+        v.vlcLock.Unlock()
        return nil, err
     }
-    // Add it to the cache.
-    v.vlc[vol.ID] = *vol
+
+    // Add it to the cache
+    v.vlcLock.Lock()
+    v.vlc[ID] = *vol
+    v.vlcLock.Unlock()
 
     return vol, nil
 }
@@ -192,6 +204,10 @@ func (v *VolumeLookupCache) VolumesList(op trace.Operation) ([]*Volume, error) {
     // look in the cache, return the list
     l := make([]*Volume, 0, len(v.vlc))
     for _, vol := range v.vlc {
+        if vol.ID == "pending" {
+            continue
+        }
+
        // this is idiotic
        var e Volume
        e = vol
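The VolumeCreate change above is a hand-rolled reservation (single-flight) pattern: reserve the ID under the lock, run the slow store operation unlocked, then commit or roll back the reservation. A minimal standalone sketch of the same pattern, with hypothetical names (`cache`, `build`) rather than the portlayer types:

```go
package main

import (
	"errors"
	"fmt"
	"sync"
)

var errExist = errors.New("already exists")

type cache struct {
	mu      sync.Mutex
	entries map[string]*string // a nil value marks an in-flight create
}

func (c *cache) create(id string, build func() (string, error)) (string, error) {
	c.mu.Lock()
	if _, ok := c.entries[id]; ok {
		c.mu.Unlock()
		return "", errExist // a real or pending entry already holds the ID
	}
	c.entries[id] = nil // reserve the ID, then drop the lock so other IDs can proceed
	c.mu.Unlock()

	v, err := build() // the slow operation runs outside the lock

	c.mu.Lock()
	defer c.mu.Unlock()
	if err != nil {
		delete(c.entries, id) // roll back the reservation on failure
		return "", err
	}
	c.entries[id] = &v
	return v, nil
}

func main() {
	c := &cache{entries: map[string]*string{}}
	v, err := c.create("vol1", func() (string, error) { return "created", nil })
	fmt.Println(v, err)
}
```

The rollback on the error paths is the part that is easy to miss; without it the placeholder blocks the ID forever, which is why the hunk above also deletes the pending entry when the store calls fail.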
diff --git a/pkg/vsphere/disk/batcher.go b/pkg/vsphere/disk/batcher.go
new file mode 100644
index 0000000000..50668dbb72
--- /dev/null
+++ b/pkg/vsphere/disk/batcher.go
@@ -0,0 +1,82 @@
+// Copyright 2016-2017 VMware, Inc. All Rights Reserved.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//    http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package disk
+
+import (
+    "context"
+    "time"
+
+    "github.com/vmware/vic/pkg/trace"
+)
+
+const (
+    // BatchLatency is the duration to wait for other work before acting on lazy operations
+    BatchLatency = 10 * time.Second
+    // MaxBatchSize is the maximum number of items to batch, sized to allow a complete cycle of the attached disks
+    MaxBatchSize = MaxAttachedDisks * 2
+)
+
+type batchMember struct {
+    err  chan error
+    data interface{}
+}
+
+// lazyDeviceChange adds a lazy deferral mechanism for device change operations (specifically disk at this time).
+// This exists because reconfigure operations were unintentionally serializing parallel operations and causing
+// performance impacts (concurrent volume create is the primary example, which impacts concurrent container start
+// if there are anonymous volumes - worse if there are multiple volumes per container)
+func lazyDeviceChange(ctx context.Context, batch chan batchMember, operation func(operation trace.Operation, data []interface{}) error) {
+    op := trace.FromContext(ctx, "Lazy batching of disk operations")
+
+    for {
+        data := make([]interface{}, 0, MaxBatchSize)  // batching queue for arguments
+        errors := make([]chan error, 0, MaxBatchSize) // batching queue for error returns
+
+        // block and wait for the first request
+        select {
+        case req, ok := <-batch:
+            if !ok {
+                return // channel closed, quit
+            }
+            if req.err == nil {
+                continue
+            }
+            data = append(data, req.data)
+            errors = append(errors, req.err)
+        case <-op.Done(): // when parent context is cancelled, quit
+            return
+        }
+
+        // fetch any requests already queued behind the first
+        // TODO: I want to add some optional latency in here so that the attach/detach pair from
+        // pull makes use of it, but for now it's purely opportunistic for non-serial operations.
+        for len(batch) > 0 {
+            req := <-batch
+            if req.err != nil {
+                data = append(data, req.data)
+                errors = append(errors, req.err)
+            }
+        }
+
+        // process the accumulated requests as a single operation
+        err := operation(op, data)
+
+        // hand the shared result back to every batched caller
+        for _, member := range errors {
+            member <- err
+            close(member)
+        }
+    }
+}
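lazyDeviceChange is a coalescing dispatcher: block until one request arrives, opportunistically drain whatever else is already queued, run a single operation over the whole batch, then fan the shared result back to every waiter. A self-contained sketch of the same loop, with vic's trace.Operation swapped for a plain context.Context (illustrative only, not the vic API):

```go
package main

import (
	"context"
	"fmt"
)

type request struct {
	err  chan error
	data interface{}
}

func dispatcher(ctx context.Context, batch chan request, process func([]interface{}) error) {
	for {
		var data []interface{}
		var errs []chan error

		// block and wait for the first request
		select {
		case req, ok := <-batch:
			if !ok {
				return // channel closed, quit
			}
			data = append(data, req.data)
			errs = append(errs, req.err)
		case <-ctx.Done():
			return
		}

		// opportunistically drain anything queued behind the first request
		for len(batch) > 0 {
			req := <-batch
			data = append(data, req.data)
			errs = append(errs, req.err)
		}

		// one call covers the whole batch
		err := process(data)

		// every waiter sees the shared result
		for _, c := range errs {
			c <- err
			close(c)
		}
	}
}

func main() {
	batch := make(chan request, 8)
	go dispatcher(context.Background(), batch, func(d []interface{}) error {
		fmt.Printf("processed %d item(s) in one call\n", len(d))
		return nil
	})

	req := request{err: make(chan error), data: "disk-change"}
	batch <- req
	fmt.Println("result:", <-req.err)
}
```

One consequence of the shared result is that a single bad device change fails every request in its batch; callers cannot tell whose change was rejected. That seems acceptable here since a failed reconfigure leaves all batched changes unapplied.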
diff --git a/pkg/vsphere/disk/disk_manager.go b/pkg/vsphere/disk/disk_manager.go
index d1b6f4b8b9..f62d002779 100644
--- a/pkg/vsphere/disk/disk_manager.go
+++ b/pkg/vsphere/disk/disk_manager.go
@@ -69,8 +69,12 @@ type Manager struct {
     // map of URIs to VirtualDisk structs so that we can return the same instance to the caller, required for ref counting
     Disks map[uint64]*VirtualDisk
+    // used for locking the disk cache
     disksLock sync.Mutex
+
+    // device batching queue
+    batchQueue chan batchMember
 }
 
 // NewDiskManager creates a new Manager instance associated with the caller VM
@@ -89,7 +93,10 @@ func NewDiskManager(op trace.Operation, session *session.Session, v *view.Contai
        return nil, err
     }
 
-    return &Manager{
+    // start the batching dispatcher
+    // TODO: can probably make this a lazy trigger from the queue function so that we will
+    // restart it implicitly if it dies
+    manager := &Manager{
        maxAttached: make(chan bool, MaxAttachedDisks),
        vm:          vm,
        vdMngr:      object.NewVirtualDiskManager(vm.Vim25()),
@@ -97,7 +104,12 @@ func NewDiskManager(op trace.Operation, session *session.Session, v *view.Contai
        controller:   controller,
        byPathFormat: byPathFormat,
        Disks:        make(map[uint64]*VirtualDisk),
-    }, nil
+        batchQueue:   make(chan batchMember, MaxBatchSize),
+    }
+
+    go lazyDeviceChange(trace.NewOperation(op, "lazy disk dispatcher"), manager.batchQueue, manager.dequeueBatch)
+
+    return manager, nil
 }
 
 // toSpec converts the given config to VirtualDisk spec
@@ -325,6 +337,56 @@ func (m *Manager) DiskParent(op trace.Operation, config *VirtualDiskConfig) (*ob
 //    return nil
 // }
 
+// queueBatch adds a disk operation to the batching queue.
+// TODO: need to test what occurs if attach/detach for the SAME disk are batched together
+// Note that the error handler must be careful with locking as this function does not handle it
+func (m *Manager) queueBatch(op trace.Operation, change types.BaseVirtualDeviceConfigSpec, errHandler func(err error)) error {
+    chg := batchMember{
+        err:  make(chan error),
+        data: change,
+    }
+
+    m.batchQueue <- chg
+    op.Debug("Queued disk change operation")
+
+    if errHandler == nil {
+        // this will block until the batch is performed
+        return <-chg.err
+    }
+
+    // this will run the error processing in the background
+    go func() {
+        errHandler(<-chg.err)
+    }()
+
+    return nil
+}
+
+// dequeueBatch collapses a batch of queued changes into a single reconfigure task
+func (m *Manager) dequeueBatch(op trace.Operation, data []interface{}) error {
+    // convert to a device change array
+    changes := make([]types.BaseVirtualDeviceConfigSpec, 0, len(data))
+
+    for i := range data {
+        changeSpec := data[i].(types.BaseVirtualDeviceConfigSpec)
+        changes = append(changes, changeSpec)
+    }
+
+    machineSpec := types.VirtualMachineConfigSpec{}
+    machineSpec.DeviceChange = changes
+
+    _, err := m.vm.WaitForResult(op, func(ctx context.Context) (tasks.Task, error) {
+        t, er := m.vm.Reconfigure(ctx, machineSpec)
+
+        if t != nil {
+            op.Debugf("Batched disk reconfigure (%d batched operations) task=%s", len(machineSpec.DeviceChange), t.Reference())
+        }
+
+        return t, er
+    })
+
+    return err
+}
+
 // Attach attempts to attach a virtual disk
 func (m *Manager) attach(op trace.Operation, config *VirtualDiskConfig) error {
     defer trace.End(trace.Begin(""))
@@ -345,8 +407,11 @@ func (m *Manager) attach(op trace.Operation, config *VirtualDiskConfig) error {
     // ensure we abide by max attached disks limits
     m.maxAttached <- true
 
-    m.mu.Lock()
-    defer m.mu.Unlock()
+    // hickeng: I don't think this locking is needed - at least I cannot tell what for after having
+    // rewritten this for batching. It appears to be explicitly for serializing the reconfigure,
+    // I suspect to avoid TaskInProgress issues.
+    // m.mu.Lock()
+    // defer m.mu.Unlock()
 
     // make sure the op is still valid as the above line could block for a long time
     select {
     case <-op.Done():
     default:
     }
 
-    _, err = m.vm.WaitForResult(op, func(ctx context.Context) (tasks.Task, error) {
-        t, er := m.vm.Reconfigure(ctx, machineSpec)
-
-        if t != nil {
-            op.Debugf("Attach reconfigure task=%s", t.Reference())
-        }
-
-        return t, er
-    })
-
+    // queue the change; a nil error handler means this call blocks until the batch is processed
+    err = m.queueBatch(op, changeSpec[0], nil)
     if err != nil {
        select {
        case <-m.maxAttached:
        }
 
        op.Errorf("vmdk storage driver failed to attach disk: %s", errors.ErrorStack(err))
        return errors.Trace(err)
     }
 
     return nil
 }
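queueBatch gives callers two modes: pass nil to wait inline for the batch result (what attach and detach do), or pass a handler to have the result consumed in the background. The contract in miniature, reusing the request type and dispatcher from the sketch above (hypothetical, not the vic API):

```go
// enqueue mirrors queueBatch's contract: a nil errHandler makes the call
// synchronous; otherwise the result is consumed by the handler on a separate
// goroutine and the caller immediately sees nil.
func enqueue(queue chan request, data interface{}, errHandler func(error)) error {
	req := request{err: make(chan error), data: data}
	queue <- req // blocks only if the queue itself is full

	if errHandler == nil {
		// wait for the batch containing this item to complete
		return <-req.err
	}

	go func() { errHandler(<-req.err) }()
	return nil
}
```

Note that the synchronous mode still benefits from batching: many goroutines can be blocked on their own err channels while the dispatcher serves them all with one reconfigure.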
@@ -382,30 +438,44 @@ func (m *Manager) Detach(op trace.Operation, config *VirtualDiskConfig) error {
     defer trace.End(trace.Begin(""))
 
-    // we have to hold the cache lock until we're done deleting the cache entry
-    // or until we know we're not going to delete the entry
     m.disksLock.Lock()
-    defer m.disksLock.Unlock()
 
     d, err := NewVirtualDisk(op, config, m.Disks)
     if err != nil {
+        m.disksLock.Unlock()
        return errors.Trace(err)
     }
 
     d.l.Lock()
     defer d.l.Unlock()
 
+    // if a second operation is trying to detach the same disk then one of them will likely
+    // return early because the reference count is still greater than zero; even if not, we
+    // check here whether the disk is still attached now that we hold the disk lock.
+    // This leverages DevicePath, which is cleared on successful detach.
+    if d.DevicePath == "" {
+        op.Debugf("detach returning early as no action required for %s", d.DatastoreURI)
+        m.disksLock.Unlock()
+        return nil
+    }
+
     count := d.attachedRefs.Decrement()
     op.Debugf("decremented attach count for %s: %d", d.DatastoreURI, count)
     if count > 0 {
+        m.disksLock.Unlock()
        return nil
     }
 
     if err := d.canBeDetached(); err != nil {
+        m.disksLock.Unlock()
        op.Errorf("disk needs to be detached but is still in use: %s", err)
        return errors.Trace(err)
     }
 
+    // unlocking the cache here allows parallel detach operations to occur, enabling batching
+    m.disksLock.Unlock()
+
     op.Infof("Detaching disk %s", d.DevicePath)
 
     disk, err := findDiskByFilename(op, m.vm, d.DatastoreURI.String(), d.IsPersistent())
@@ -419,7 +489,9 @@ func (m *Manager) Detach(op trace.Operation, config *VirtualDiskConfig) error {
     }
 
     // this deletes the disk from the disk cache
+    m.disksLock.Lock()
     d.setDetached(op, m.Disks)
+    m.disksLock.Unlock()
 
     return nil
 }
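The reworked Detach narrows the cache-lock window so the slow reconfigure runs unlocked and concurrent detaches can land in the same batch. Reduced to its shape (simplified, with hypothetical names standing in for vic's disksLock, DevicePath, and attachedRefs; imports sync and sync/atomic assumed):

```go
// detachGuard shows the ordering Detach performs: already-detached marker
// first, then the reference count, then release the lock across the slow
// path so parallel detaches can reach the batcher together.
func detachGuard(mu *sync.Mutex, devicePath *string, refs *int32, slowDetach func() error) error {
	mu.Lock()
	if *devicePath == "" {
		mu.Unlock()
		return nil // another caller already detached this disk
	}
	if atomic.AddInt32(refs, -1) > 0 {
		mu.Unlock()
		return nil // the disk is still referenced elsewhere
	}
	mu.Unlock() // drop the lock across the reconfigure to allow batching

	if err := slowDetach(); err != nil {
		return err
	}

	mu.Lock()
	*devicePath = "" // record the detach under the lock, as Detach does via setDetached
	mu.Unlock()
	return nil
}
```

The window between the unlock and the re-lock is the trade-off: a concurrent attach of the same disk during that window races the in-flight detach, which is presumably what the TODO about batching attach/detach for the SAME disk is probing at.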
@@ -433,50 +505,39 @@ func (m *Manager) DetachAll(op trace.Operation) error {
     }
 
     for _, disk := range disks {
-        if err2 := m.detach(op, disk); err != nil {
-            op.Errorf("error detaching disk: %s", err2.Error())
-            // return the last error on the return of this function
-            err = err2
-            // if we failed here that means we have a disk attached, ensure we abide by max attached disks limits
-            m.maxAttached <- true
-        }
+        // the maxAttached channel handling is packed into m.detach, which no longer drains
+        // the channel on error, so only recording the last error is needed here
+        if err2 := m.detach(op, disk); err2 != nil {
+            err = err2
+        }
     }
     return err
 }
 
 func (m *Manager) detach(op trace.Operation, disk *types.VirtualDisk) error {
-    config := []types.BaseVirtualDeviceConfigSpec{
-        &types.VirtualDeviceConfigSpec{
-            Device:    disk,
-            Operation: types.VirtualDeviceConfigSpecOperationRemove,
-        },
-    }
+    config := &types.VirtualDeviceConfigSpec{
+        Device:    disk,
+        Operation: types.VirtualDeviceConfigSpecOperationRemove,
+    }
 
-    spec := types.VirtualMachineConfigSpec{}
-    spec.DeviceChange = config
-
-    m.mu.Lock()
-    defer m.mu.Unlock()
+    // hickeng: I don't think this locking is needed - at least I cannot tell what for after having
+    // rewritten this for batching. It appears to be explicitly for serializing the reconfigure,
+    // I suspect to avoid TaskInProgress issues.
+    // m.mu.Lock()
+    // defer m.mu.Unlock()
 
-    _, err := m.vm.WaitForResult(op, func(ctx context.Context) (tasks.Task, error) {
-        t, er := m.vm.Reconfigure(ctx, spec)
-
-        if t != nil {
-            op.Debugf("Detach reconfigure task=%s", t.Reference())
-        }
-
-        return t, er
-    })
+    // queue the change; a nil error handler means this call blocks until the batch is processed
+    err := m.queueBatch(op, config, nil)
+    if err != nil {
+        op.Errorf("vmdk storage driver failed to detach disk: %s", errors.ErrorStack(err))
+        return err
+    }
 
-    if err == nil {
-        select {
-        case <-m.maxAttached:
-        default:
-        }
+    select {
+    case <-m.maxAttached:
+    default:
     }
 
-    return err
+    return nil
 }
 
 func (m *Manager) devicePathByURI(op trace.Operation, datastoreURI *object.DatastorePath, persistent bool) (string, error) {
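Finally, the batching behaviour lends itself to a quick concurrency check. A hypothetical test against the dispatcher and enqueue sketches above (the real code would need to drive Manager.queueBatch against a vSphere simulator instead; imports context, sync, sync/atomic, and testing assumed):

```go
func TestBatchCoalesces(t *testing.T) {
	var calls int32
	batch := make(chan request, 16)
	go dispatcher(context.Background(), batch, func(d []interface{}) error {
		atomic.AddInt32(&calls, 1) // count processing calls, not items
		return nil
	})

	var wg sync.WaitGroup
	for i := 0; i < 16; i++ {
		wg.Add(1)
		go func(n int) {
			defer wg.Done()
			if err := enqueue(batch, n, nil); err != nil {
				t.Errorf("enqueue %d: %s", n, err)
			}
		}(i)
	}
	wg.Wait()

	// timing dependent, so log rather than assert: under contention the
	// request count should comfortably exceed the processing-call count
	t.Logf("16 requests coalesced into %d processing call(s)", atomic.LoadInt32(&calls))
}
```

With the previous code each of those 16 requests would have been its own Reconfigure task; with the batcher the expectation is a handful at most.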