WIP: Batch disk operations
This adds disk batching to the endpointVM so operations such as volume
filesystem creation can occur in parallel.
It also adjusts the volume cache to remove a lock that serializes all
creates. This is NOT complete as of this commit - it still requires
singleflight or similar wrapping the volume create path, plus consideration
of how the inspect/delete operations should behave when they encounter a
partially created volume.
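
As a rough illustration of the singleflight wrapping mentioned above (not part
of this commit), a minimal sketch using golang.org/x/sync/singleflight; the
createOnce helper and createVolume callback are hypothetical stand-ins for the
real create path:

package storage

import (
	"golang.org/x/sync/singleflight"
)

// group deduplicates concurrent creates for the same volume ID.
var group singleflight.Group

// createOnce collapses concurrent creates of the same ID into one call;
// the other callers block and share the first call's result.
// createVolume is a hypothetical stand-in for the real create path.
func createOnce(ID string, createVolume func() (*Volume, error)) (*Volume, error) {
	v, err, _ := group.Do(ID, func() (interface{}, error) {
		return createVolume()
	})
	if err != nil {
		return nil, err
	}
	return v.(*Volume), nil
}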

This should also allow an operation to specify whether it can tolerate some
latency. The intent is that the disk detach operations issued during pull
can be deferred and batched with the attach operation for the subsequent
child disk, which should halve the number of reconfigure operations
associated with an image pull.
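
For the latency-tolerance idea, a sketch of what the collection side might
look like, assuming a hypothetical lazy flag on the batch member and reusing
the BatchLatency constant introduced in pkg/vsphere/disk/batcher.go below; all
names here are illustrative, not part of this commit:

package disk

import "time"

// lazyMember is a hypothetical batch member carrying a latency hint.
type lazyMember struct {
	err  chan error
	data interface{}
	lazy bool // true if the caller can tolerate up to BatchLatency of deferral
}

// collectBatch drains queued requests, waiting up to BatchLatency for more
// work as long as everything gathered so far is latency tolerant.
func collectBatch(batch chan lazyMember) []lazyMember {
	var members []lazyMember
	deadline := time.After(BatchLatency)
	for {
		select {
		case req := <-batch:
			members = append(members, req)
			if !req.lazy {
				return members // eager work arrived: act immediately
			}
		case <-deadline:
			return members // latency budget spent: act on what we have
		}
	}
}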
hickeng committed May 4, 2018
1 parent b6355f0 commit 492c2ed
Showing 3 changed files with 213 additions and 54 deletions.
24 changes: 20 additions & 4 deletions lib/portlayer/storage/volume_cache.go
@@ -120,25 +120,37 @@ func (v *VolumeLookupCache) VolumeStoresList(op trace.Operation) ([]string, error) {

 func (v *VolumeLookupCache) VolumeCreate(op trace.Operation, ID string, store *url.URL, capacityKB uint64, info map[string][]byte) (*Volume, error) {
 	v.vlcLock.Lock()
-	defer v.vlcLock.Unlock()

 	// check if it exists
 	_, ok := v.vlc[ID]
 	if ok {
+		v.vlcLock.Unlock()
+		// TODO: make this block until success/fail is known
 		return nil, os.ErrExist
 	}

+	// TODO: construct a proper async cache
+	// this is done because this path was blocking any concurrent volume create
+	v.vlc[ID] = Volume{
+		ID: "pending",
+	}
+
+	v.vlcLock.Unlock()
+
 	vs, err := v.volumeStore(store)
 	if err != nil {
 		return nil, err
 	}

 	vol, err := vs.VolumeCreate(op, ID, store, capacityKB, info)
-	if err != nil {
+	if err != nil || vol == nil {
 		return nil, err
 	}
-	// Add it to the cache.
-	v.vlc[vol.ID] = *vol
+
+	// Add it to the cache
+	v.vlcLock.Lock()
+	v.vlc[ID] = *vol
+	v.vlcLock.Unlock()

 	return vol, nil
 }
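
Per the commit message, inspect and delete still need to handle a partially
created volume. One hedged possibility for the lookup side, using a
hypothetical errPending sentinel; this commit leaves the real resolution
(blocking until the create settles, or a proper async cache entry) open:

// in package storage, alongside VolumeLookupCache (imports "errors", "os")
var errPending = errors.New("volume create in progress")

// volumeGetSketch surfaces in-flight creates rather than returning the
// "pending" placeholder as if it were a real volume.
func (v *VolumeLookupCache) volumeGetSketch(ID string) (*Volume, error) {
	v.vlcLock.Lock()
	defer v.vlcLock.Unlock()

	vol, ok := v.vlc[ID]
	if !ok {
		return nil, os.ErrNotExist
	}
	if vol.ID == "pending" {
		// create still in flight; callers must wait or retry
		return nil, errPending
	}
	return &vol, nil
}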
@@ -192,6 +204,10 @@ func (v *VolumeLookupCache) VolumesList(op trace.Operation) ([]*Volume, error) {
 	// look in the cache, return the list
 	l := make([]*Volume, 0, len(v.vlc))
 	for _, vol := range v.vlc {
+		if vol.ID == "pending" {
+			continue
+		}
+
 		// this is idiotic
 		var e Volume
 		e = vol
82 changes: 82 additions & 0 deletions pkg/vsphere/disk/batcher.go
@@ -0,0 +1,82 @@
// Copyright 2016-2017 VMware, Inc. All Rights Reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package disk

import (
	"context"
	"time"

	"github.com/vmware/vic/pkg/trace"
)

const (
	// BatchLatency is the duration to wait for other work before acting on lazy operations
	BatchLatency = 10 * time.Second
	// MaxBatchSize is the number of items to batch, set sufficiently large to completely cycle the attached disks
	MaxBatchSize = MaxAttachedDisks * 2
)

type batchMember struct {
	err  chan error
	data interface{}
}

// lazyDeviceChange adds a lazy deferral mechanism for device change operations (specifically disk at this time).
// This exists because reconfigure operations unintentionally serialize parallel operations and cause
// performance impacts (concurrent volume create is the primary example; it impacts concurrent container start if
// there are anonymous volumes - worse if there are multiple volumes per container).
func lazyDeviceChange(ctx context.Context, batch chan batchMember, operation func(operation trace.Operation, data []interface{}) error) {
	op := trace.FromContext(ctx, "Lazy batching of disk operations")

	for {
		data := make([]interface{}, 0, MaxBatchSize)  // batching queue for arguments
		errors := make([]chan error, 0, MaxBatchSize) // batching queue for error returns

		// block and wait for the first request
		select {
		case req, ok := <-batch:
			if !ok {
				return // channel closed, quit
			}
			if req.err == nil {
				continue // skip malformed requests that have no reply channel
			}

			data = append(data, req.data)
			errors = append(errors, req.err)
		case <-op.Done(): // when parent context is cancelled, quit
			return
		}

		// fetch any requests already queued
		// TODO: I want to add some optional latency in here so that the attach/detach pair from
		// pull can make use of it, but for now it's purely opportunistic for non-serial operations.
		for len(batch) > 0 {
			req := <-batch
			if req.err != nil {
				data = append(data, req.data)
				errors = append(errors, req.err)
			}
		}

		// process the batched requests as a single operation
		err := operation(op, data)

		// propagate the shared result to every batched caller
		for _, member := range errors {
			member <- err
			close(member)
		}
	}
}
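
A hedged wiring sketch for the batcher above; exampleWiring, reconfigureBatch,
and someDiskSpec are illustrative stand-ins for however the endpointVM actually
wires this up (assumes trace.Operation exposes Debugf, as used elsewhere in vic):

// in package disk (imports "context" and "github.com/vmware/vic/pkg/trace")
func exampleWiring(ctx context.Context, someDiskSpec interface{}) error {
	op := trace.FromContext(ctx, "example batched reconfigure")

	batch := make(chan batchMember, MaxBatchSize)

	// reconfigureBatch is a hypothetical closure that folds all queued
	// device changes into a single reconfigure task on the endpointVM.
	reconfigureBatch := func(op trace.Operation, data []interface{}) error {
		op.Debugf("reconfiguring with %d batched device changes", len(data))
		return nil // placeholder for the real reconfigure call
	}

	go lazyDeviceChange(ctx, batch, reconfigureBatch)

	// submit one device change and wait for the shared batch result
	member := batchMember{
		err:  make(chan error, 1),
		data: someDiskSpec,
	}
	batch <- member
	return <-member.err
}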