Skip to content

Commit

Permalink
Fix vSphere anti-affinity
Browse files Browse the repository at this point in the history
Signed-off-by: Waleed Malik <[email protected]>
  • Loading branch information
ahmedwaleedmalik committed Apr 23, 2024
1 parent f339b66 commit c186e92
Show file tree
Hide file tree
Showing 6 changed files with 156 additions and 69 deletions.
32 changes: 32 additions & 0 deletions .prow/provider-vsphere.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -140,3 +140,35 @@ presubmits:
cpu: 2
limits:
memory: 7Gi

- name: pull-machine-controller-e2e-vsphere-anti-affinity
always_run: false
decorate: true
clone_uri: "ssh://[email protected]/kubermatic/machine-controller.git"
labels:
preset-hetzner: "true"
preset-e2e-ssh: "true"
preset-vsphere: "true"
preset-rhel: "true"
preset-goproxy: "true"
preset-kind-volume-mounts: "true"
preset-docker-mirror: "true"
preset-kubeconfig-ci: "true"
spec:
containers:
- image: quay.io/kubermatic/build:go-1.22-node-20-kind-0.22-3
command:
- "./hack/ci/run-e2e-tests.sh"
args:
- "TestVsphereAntiAffinityProvisioningE2E"
env:
- name: CLOUD_PROVIDER
value: vsphere
securityContext:
privileged: true
resources:
requests:
memory: 7Gi
cpu: 2
limits:
memory: 7Gi
5 changes: 0 additions & 5 deletions pkg/cloudprovider/provider/vsphere/helper.go
Original file line number Diff line number Diff line change
Expand Up @@ -103,12 +103,10 @@ func createClonedVM(ctx context.Context, log *zap.SugaredLogger, vmName string,
if err := clonedVMTask.WaitEx(ctx); err != nil {
return nil, fmt.Errorf("error when waiting for result of clone task: %w", err)
}

virtualMachine, err := session.Finder.VirtualMachine(ctx, vmName)
if err != nil {
return nil, fmt.Errorf("failed to get virtual machine object after cloning: %w", err)
}

vmDevices, err := virtualMachine.Device(ctx)
if err != nil {
return nil, fmt.Errorf("failed to list devices of template VM: %w", err)
Expand Down Expand Up @@ -138,7 +136,6 @@ func createClonedVM(ctx context.Context, log *zap.SugaredLogger, vmName string,

guestInfoUserData = "guestinfo.ignition.config.data"
guestInfoUserDataEncoding = "guestinfo.ignition.config.data.encoding"

for _, item := range mvm.Config.VAppConfig.GetVmConfigInfo().Property {
switch item.Id {
case guestInfoUserData:
Expand Down Expand Up @@ -170,7 +167,6 @@ func createClonedVM(ctx context.Context, log *zap.SugaredLogger, vmName string,
}

diskUUIDEnabled := true

var deviceSpecs []types.BaseVirtualDeviceConfigSpec
if config.DiskSizeGB != nil {
disks, err := getDisksFromVM(ctx, virtualMachine)
Expand Down Expand Up @@ -221,7 +217,6 @@ func createClonedVM(ctx context.Context, log *zap.SugaredLogger, vmName string,
if err := removeFloppyDevice(ctx, virtualMachine); err != nil {
return nil, fmt.Errorf("failed to remove floppy device: %w", err)
}

return virtualMachine, nil
}

Expand Down
18 changes: 7 additions & 11 deletions pkg/cloudprovider/provider/vsphere/provider.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,6 @@ import (
"net/url"
"os"
"strings"
"sync"

"github.com/vmware/govmomi/find"
"github.com/vmware/govmomi/object"
Expand All @@ -49,7 +48,6 @@ import (

type provider struct {
configVarResolver *providerconfig.ConfigVarResolver
mutex sync.Mutex
}

// New returns a VSphere provider.
Expand Down Expand Up @@ -384,8 +382,7 @@ func (p *provider) create(ctx context.Context, log *zap.SugaredLogger, machine *
}

if config.VMAntiAffinity {
machineSetName := machine.Name[:strings.LastIndex(machine.Name, "-")]
if err := p.createOrUpdateVMAntiAffinityRule(ctx, session, machineSetName, config); err != nil {
if err := p.createOrUpdateVMAntiAffinityRule(ctx, log, session, machine, config); err != nil {
return nil, fmt.Errorf("failed to add VM to anti affinity rule: %w", err)
}
}
Expand Down Expand Up @@ -452,6 +449,12 @@ func (p *provider) Cleanup(ctx context.Context, log *zap.SugaredLogger, machine
return false, fmt.Errorf("failed to delete tags: %w", err)
}

if config.VMAntiAffinity {
if err := p.createOrUpdateVMAntiAffinityRule(ctx, log, session, machine, config); err != nil {
return false, fmt.Errorf("failed to update VMs in anti-affinity rule: %w", err)
}
}

powerState, err := virtualMachine.PowerState(ctx)
if err != nil {
return false, fmt.Errorf("failed to get virtual machine power state: %w", err)
Expand Down Expand Up @@ -507,13 +510,6 @@ func (p *provider) Cleanup(ctx context.Context, log *zap.SugaredLogger, machine
return false, fmt.Errorf("failed to destroy vm %s: %w", virtualMachine.Name(), err)
}

if config.VMAntiAffinity {
machineSetName := machine.Name[:strings.LastIndex(machine.Name, "-")]
if err := p.createOrUpdateVMAntiAffinityRule(ctx, session, machineSetName, config); err != nil {
return false, fmt.Errorf("failed to add VM to anti affinity rule: %w", err)
}
}

if pc.OperatingSystem != providerconfigtypes.OperatingSystemFlatcar {
filemanager := datastore.NewFileManager(session.Datacenter, false)

Expand Down
98 changes: 45 additions & 53 deletions pkg/cloudprovider/provider/vsphere/rule.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,64 +20,75 @@ import (
"context"
"errors"
"fmt"
"reflect"
"strings"
"time"
"sync"

"github.com/aws/smithy-go/ptr"
"github.com/vmware/govmomi/find"
"github.com/vmware/govmomi/object"
"github.com/vmware/govmomi/vim25/mo"
"github.com/vmware/govmomi/vim25/types"
"go.uber.org/zap"

clusterv1alpha1 "github.com/kubermatic/machine-controller/pkg/apis/cluster/v1alpha1"

"k8s.io/utils/ptr"
)

var lock sync.Mutex

// createOrUpdateVMAntiAffinityRule creates or updates an anti affinity rule with the name in the given cluster.
// VMs are attached to the rule based on their folder path and name prefix in vsphere.
// A minimum of two VMs is required.
func (p *provider) createOrUpdateVMAntiAffinityRule(ctx context.Context, session *Session, name string, config *Config) error {
p.mutex.Lock()
defer p.mutex.Unlock()

func (p *provider) createOrUpdateVMAntiAffinityRule(ctx context.Context, log *zap.SugaredLogger, session *Session, machine *clusterv1alpha1.Machine, config *Config) error {
lock.Lock()
defer lock.Unlock()
cluster, err := session.Finder.ClusterComputeResource(ctx, config.Cluster)
if err != nil {
return err
}

machineSetName := machine.Name[:strings.LastIndex(machine.Name, "-")]
vmsInFolder, err := session.Finder.VirtualMachineList(ctx, strings.Join([]string{config.Folder, "*"}, "/"))
if err != nil {
if errors.Is(err, &find.NotFoundError{}) {
return removeVMAntiAffinityRule(ctx, session, config.Cluster, name)
return removeVMAntiAffinityRule(ctx, session, config.Cluster, machineSetName)
}
return err
}

var ruleVMRef []types.ManagedObjectReference
for _, vm := range vmsInFolder {
if strings.HasPrefix(vm.Name(), name) {
// Only add VMs with the same machineSetName to the rule and exclude the machine itself if it is being deleted
if strings.HasPrefix(vm.Name(), machineSetName) && !(vm.Name() == machine.Name && machine.DeletionTimestamp != nil) {
ruleVMRef = append(ruleVMRef, vm.Reference())
}
}

// minimum of two vms required
if len(ruleVMRef) < 2 {
return removeVMAntiAffinityRule(ctx, session, config.Cluster, name)
if len(ruleVMRef) == 0 {
log.Debugf("No VMs in folder %s with name prefix %s found", config.Folder, machineSetName)
return removeVMAntiAffinityRule(ctx, session, config.Cluster, machineSetName)
} else if len(ruleVMRef) < 2 {
// DRS rule must have at least two virtual machine members
log.Debugf("Not enough VMs in folder %s to create anti-affinity rule", config.Folder)
return nil
}

info, err := findClusterAntiAffinityRuleByName(ctx, cluster, name)
info, err := findClusterAntiAffinityRuleByName(ctx, cluster, machineSetName)
if err != nil {
return err
}

log.Debugf("Creating or updating anti-affinity rule for VMs %v in cluster %s", ruleVMRef, config.Cluster)
operation := types.ArrayUpdateOperationEdit

//create new rule
if info == nil {
info = &types.ClusterAntiAffinityRuleSpec{
ClusterRuleInfo: types.ClusterRuleInfo{
Enabled: ptr.Bool(true),
Mandatory: ptr.Bool(false),
Name: name,
UserCreated: ptr.Bool(true),
Enabled: ptr.To(true),
Mandatory: ptr.To(false),
Name: machineSetName,
UserCreated: ptr.To(true),
},
}
operation = types.ArrayUpdateOperationAdd
Expand All @@ -95,49 +106,22 @@ func (p *provider) createOrUpdateVMAntiAffinityRule(ctx context.Context, session
},
}

log.Debugf("Performing %q for anti-affinity rule for VMs %v in cluster %s", operation, ruleVMRef, config.Cluster)
task, err := cluster.Reconfigure(ctx, spec, true)
if err != nil {
return err
}

err = task.WaitEx(ctx)
taskResult, err := task.WaitForResultEx(ctx)
if err != nil {
return err
return fmt.Errorf("error waiting for cluster %v reconfiguration to complete", cluster.Name())
}

return waitForRule(ctx, cluster, info)
}

// waitForRule checks periodically the vsphere api for the ClusterAntiAffinityRule and returns error if the rule was not found after a timeout.
func waitForRule(ctx context.Context, cluster *object.ClusterComputeResource, rule *types.ClusterAntiAffinityRuleSpec) error {
timeout := time.NewTimer(10 * time.Second)
ticker := time.NewTicker(500 * time.Millisecond)
defer timeout.Stop()
defer ticker.Stop()

for {
select {
case <-timeout.C:

info, err := findClusterAntiAffinityRuleByName(ctx, cluster, rule.Name)
if err != nil {
return err
}

if !reflect.DeepEqual(rule, info) {
return fmt.Errorf("expected anti affinity changes not found in vsphere")
}
case <-ticker.C:
info, err := findClusterAntiAffinityRuleByName(ctx, cluster, rule.Name)
if err != nil {
return err
}

if reflect.DeepEqual(rule, info) {
return nil
}
}
if taskResult.State != types.TaskInfoStateSuccess {
return fmt.Errorf("cluster %v reconfiguration task was not successful", cluster.Name())
}
log.Debugf("Successfully created/updated anti-affinity rule for machineset %v against machine %v", machineSetName, machine.Name)

return nil
}

// removeVMAntiAffinityRule removes an anti affinity rule with the name in the given cluster.
Expand Down Expand Up @@ -172,7 +156,15 @@ func removeVMAntiAffinityRule(ctx context.Context, session *Session, clusterPath
if err != nil {
return err
}
return task.WaitEx(ctx)

taskResult, err := task.WaitForResultEx(ctx)
if err != nil {
return fmt.Errorf("error waiting for cluster %v reconfiguration to complete", cluster.Name())
}
if taskResult.State != types.TaskInfoStateSuccess {
return fmt.Errorf("cluster %v reconfiguration task was not successful", cluster.Name())
}
return nil
}

func findClusterAntiAffinityRuleByName(ctx context.Context, cluster *object.ClusterComputeResource, name string) (*types.ClusterAntiAffinityRuleSpec, error) {
Expand Down
18 changes: 18 additions & 0 deletions test/e2e/provisioning/all_e2e_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,7 @@ const (
LinodeManifest = "./testdata/machinedeployment-linode.yaml"
VMwareCloudDirectorManifest = "./testdata/machinedeployment-vmware-cloud-director.yaml"
VSPhereManifest = "./testdata/machinedeployment-vsphere.yaml"
VSPhereAntiAffinityManifest = "./testdata/machinedeployment-vsphere-anti-affinity.yaml"
VSPhereMultipleNICManifest = "./testdata/machinedeployment-vsphere-multiple-nic.yaml"
VSPhereDSCManifest = "./testdata/machinedeployment-vsphere-datastore-cluster.yaml"
VSPhereResourcePoolManifest = "./testdata/machinedeployment-vsphere-resource-pool.yaml"
Expand Down Expand Up @@ -862,6 +863,23 @@ func TestVsphereMultipleNICProvisioningE2E(t *testing.T) {
runScenarios(context.Background(), t, selector, params, VSPhereMultipleNICManifest, fmt.Sprintf("vs-%s", *testRunIdentifier))
}

// TestVsphereAntiAffinityProvisioningE2E - is the same as the TestVsphereProvisioning suit but has anti-affinity rules applied to the VMs.
func TestVsphereAntiAffinityProvisioningE2E(t *testing.T) {
t.Parallel()

params := getVSphereTestParams(t)

scenario := scenario{
name: "VSphere Anti-Affinity provisioning",
osName: "ubuntu",
containerRuntime: defaultContainerRuntime,
kubernetesVersion: defaultKubernetesVersion,
executor: verifyCreateAndDelete,
}

testScenario(context.Background(), t, scenario, *testRunIdentifier, params, VSPhereAntiAffinityManifest, false)
}

// TestVsphereDatastoreClusterProvisioning - is the same as the TestVsphereProvisioning suite but specifies a DatastoreCluster
// instead of the Datastore in the provider specs.
func TestVsphereDatastoreClusterProvisioningE2E(t *testing.T) {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,54 @@
apiVersion: "cluster.k8s.io/v1alpha1"
kind: MachineDeployment
metadata:
name: << MACHINE_NAME >>
namespace: kube-system
annotations:
k8c.io/operating-system-profile: osp-<< OS_NAME >>
spec:
replicas: 3
strategy:
type: RollingUpdate
rollingUpdate:
maxSurge: 1
maxUnavailable: 0
selector:
matchLabels:
name: << MACHINE_NAME >>
template:
metadata:
labels:
name: << MACHINE_NAME >>
spec:
providerSpec:
value:
sshPublicKeys:
- "<< YOUR_PUBLIC_KEY >>"
cloudProvider: "vsphere"
cloudProviderSpec:
templateVMName: '<< OS_Image_Template >>'
username: '<< VSPHERE_USERNAME >>'
vsphereURL: '<< VSPHERE_ADDRESS >>'
datacenter: 'Hamburg'
folder: '/Hamburg/vm/Kubermatic-ci'
password: << VSPHERE_PASSWORD >>
# example: 'https://your-vcenter:8443'. '/sdk' gets appended automatically
cluster: Kubermatic
vmAntiAffinity: true
datastore: vsan
cpus: 2
MemoryMB: 4096
diskSizeGB: << DISK_SIZE >>
allowInsecure: true
operatingSystem: "<< OS_NAME >>"
operatingSystemSpec:
distUpgradeOnBoot: false
disableAutoUpdate: true
attachSubscription: false
# 'rhelSubscriptionManagerUser' is only used for rhel os and can be set via env var `RHEL_SUBSCRIPTION_MANAGER_USER`
rhelSubscriptionManagerUser: "<< RHEL_SUBSCRIPTION_MANAGER_USER >>"
# 'rhelSubscriptionManagerPassword' is only used for rhel os and can be set via env var `RHEL_SUBSCRIPTION_MANAGER_PASSWORD`
rhelSubscriptionManagerPassword: "<< RHEL_SUBSCRIPTION_MANAGER_PASSWORD >>"
rhsmOfflineToken: "<< REDHAT_SUBSCRIPTIONS_OFFLINE_TOKEN >>"
versions:
kubelet: "<< KUBERNETES_VERSION >>"

0 comments on commit c186e92

Please sign in to comment.