Skip to content

Commit

Permalink
Merge pull request #7124 from tstromberg/less-kubeadm
Browse files Browse the repository at this point in the history
Skip kubeadm if cluster is running & properly configured
  • Loading branch information
tstromberg authored Mar 21, 2020
2 parents 8c75d16 + 474b69c commit 5a5f869
Show file tree
Hide file tree
Showing 4 changed files with 172 additions and 64 deletions.
5 changes: 5 additions & 0 deletions pkg/minikube/bootstrapper/bsutil/binaries.go
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,11 @@ func TransferBinaries(cfg config.KubernetesConfig, c command.Runner) error {
return err
}

// stop kubelet to avoid "Text File Busy" error
if _, err := c.RunCmd(exec.Command("/bin/bash", "-c", "pgrep kubelet && sudo systemctl stop kubelet")); err != nil {
glog.Warningf("unable to stop kubelet: %s", err)
}

var g errgroup.Group
for _, name := range constants.KubernetesReleaseBinaries {
name := name
Expand Down
6 changes: 3 additions & 3 deletions pkg/minikube/bootstrapper/bsutil/files.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,9 +40,9 @@ const (
// ConfigFileAssets returns configuration file assets
func ConfigFileAssets(cfg config.KubernetesConfig, kubeadm []byte, kubelet []byte, kubeletSvc []byte, defaultCNIConfig []byte) []assets.CopyableFile {
fs := []assets.CopyableFile{
assets.NewMemoryAssetTarget(kubeadm, KubeadmYamlPath, "0640"),
assets.NewMemoryAssetTarget(kubelet, KubeletSystemdConfFile, "0644"),
assets.NewMemoryAssetTarget(kubeletSvc, KubeletServiceFile, "0644"),
assets.NewMemoryAssetTarget(kubeadm, KubeadmYamlPath+".new", "0640"),
assets.NewMemoryAssetTarget(kubelet, KubeletSystemdConfFile+".new", "0644"),
assets.NewMemoryAssetTarget(kubeletSvc, KubeletServiceFile+".new", "0644"),
}
// Copy the default CNI config (k8s.conf), so that kubelet can successfully
// start a Pod in the case a user hasn't manually installed any CNI plugin
Expand Down
70 changes: 69 additions & 1 deletion pkg/minikube/bootstrapper/bsutil/kverify/kverify.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ import (

"github.com/docker/machine/libmachine/state"
"github.com/golang/glog"
core "k8s.io/api/core/v1"
meta "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/util/wait"
"k8s.io/client-go/kubernetes"
Expand Down Expand Up @@ -78,6 +79,68 @@ func apiServerPID(cr command.Runner) (int, error) {
return strconv.Atoi(s)
}

// ExpectedComponentsRunning returns whether or not all expected components are running
func ExpectedComponentsRunning(cs *kubernetes.Clientset) error {
expected := []string{
"kube-dns", // coredns
"etcd",
"kube-apiserver",
"kube-controller-manager",
"kube-proxy",
"kube-scheduler",
}

found := map[string]bool{}

pods, err := cs.CoreV1().Pods("kube-system").List(meta.ListOptions{})
if err != nil {
return err
}

for _, pod := range pods.Items {
glog.Infof("found pod: %s", podStatusMsg(pod))
if pod.Status.Phase != core.PodRunning {
continue
}
for k, v := range pod.ObjectMeta.Labels {
if k == "component" || k == "k8s-app" {
found[v] = true
}
}
}

missing := []string{}
for _, e := range expected {
if !found[e] {
missing = append(missing, e)
}
}
if len(missing) > 0 {
return fmt.Errorf("missing components: %v", strings.Join(missing, ", "))
}
return nil
}

// podStatusMsg returns a human-readable pod status, for generating debug status
func podStatusMsg(pod core.Pod) string {
var sb strings.Builder
sb.WriteString(fmt.Sprintf("%q [%s] %s", pod.ObjectMeta.GetName(), pod.ObjectMeta.GetUID(), pod.Status.Phase))
for i, c := range pod.Status.Conditions {
if c.Reason != "" {
if i == 0 {
sb.WriteString(": ")
} else {
sb.WriteString(" / ")
}
sb.WriteString(fmt.Sprintf("%s:%s", c.Type, c.Reason))
}
if c.Message != "" {
sb.WriteString(fmt.Sprintf(" (%s)", c.Message))
}
}
return sb.String()
}

// WaitForSystemPods verifies essential pods for running kurnetes is running
func WaitForSystemPods(r cruntime.Manager, bs bootstrapper.Bootstrapper, cr command.Runner, client *kubernetes.Clientset, start time.Time, timeout time.Duration) error {
glog.Info("waiting for kube-system pods to appear ...")
Expand All @@ -99,6 +162,10 @@ func WaitForSystemPods(r cruntime.Manager, bs bootstrapper.Bootstrapper, cr comm
return false, nil
}
glog.Infof("%d kube-system pods found", len(pods.Items))
for _, pod := range pods.Items {
glog.Infof(podStatusMsg(pod))
}

if len(pods.Items) < 2 {
return false, nil
}
Expand Down Expand Up @@ -159,7 +226,7 @@ func APIServerStatus(cr command.Runner, ip net.IP, port int) (state.State, error

pid, err := apiServerPID(cr)
if err != nil {
glog.Warningf("unable to get apiserver pid: %v", err)
glog.Warningf("stopped: unable to get apiserver pid: %v", err)
return state.Stopped, nil
}

Expand Down Expand Up @@ -205,6 +272,7 @@ func apiServerHealthz(ip net.IP, port int) (state.State, error) {
resp, err := client.Get(url)
// Connection refused, usually.
if err != nil {
glog.Infof("stopped: %s: %v", url, err)
return state.Stopped, nil
}
if resp.StatusCode == http.StatusUnauthorized {
Expand Down
155 changes: 95 additions & 60 deletions pkg/minikube/bootstrapper/kubeadm/kubeadm.go
Original file line number Diff line number Diff line change
Expand Up @@ -209,7 +209,8 @@ func (k *Bootstrapper) StartCluster(cfg config.ClusterConfig) error {

}

c = exec.Command("/bin/bash", "-c", fmt.Sprintf("%s init --config %s %s --ignore-preflight-errors=%s", bsutil.InvokeKubeadm(cfg.KubernetesConfig.KubernetesVersion), bsutil.KubeadmYamlPath, extraFlags, strings.Join(ignore, ",")))
conf := bsutil.KubeadmYamlPath
c = exec.Command("/bin/bash", "-c", fmt.Sprintf("sudo mv %s.new %s && %s init --config %s %s --ignore-preflight-errors=%s", conf, conf, bsutil.InvokeKubeadm(cfg.KubernetesConfig.KubernetesVersion), conf, extraFlags, strings.Join(ignore, ",")))
rr, err := k.c.RunCmd(c)
if err != nil {
return errors.Wrapf(err, "init failed. output: %q", rr.Output())
Expand All @@ -236,6 +237,20 @@ func (k *Bootstrapper) StartCluster(cfg config.ClusterConfig) error {
return nil
}

func (k *Bootstrapper) controlPlaneEndpoint(cfg config.ClusterConfig) (string, int, error) {
cp, err := config.PrimaryControlPlane(&cfg)
if err != nil {
return "", 0, err
}

if driver.IsKIC(cfg.Driver) {
ip := oci.DefaultBindIPV4
port, err := oci.ForwardedPort(cfg.Driver, cfg.Name, cp.Port)
return ip, port, err
}
return cp.IP, cp.Port, nil
}

// client sets and returns a Kubernetes client to use to speak to a kubeadm launched apiserver
func (k *Bootstrapper) client(ip string, port int) (*kubernetes.Clientset, error) {
if k.k8sClient != nil {
Expand All @@ -262,34 +277,28 @@ func (k *Bootstrapper) client(ip string, port int) (*kubernetes.Clientset, error
// WaitForNode blocks until the node appears to be healthy
func (k *Bootstrapper) WaitForNode(cfg config.ClusterConfig, n config.Node, timeout time.Duration) error {
start := time.Now()
out.T(out.Waiting, "Waiting for cluster to come online ...")

if !n.ControlPlane {
glog.Infof("%s is not a control plane, nothing to wait for", n.Name)
return nil
}

cr, err := cruntime.New(cruntime.Config{Type: cfg.KubernetesConfig.ContainerRuntime, Runner: k.c})
if err != nil {
return err
}

if n.ControlPlane {
if err := kverify.WaitForAPIServerProcess(cr, k, k.c, start, timeout); err != nil {
return err
}
if err := kverify.WaitForAPIServerProcess(cr, k, k.c, start, timeout); err != nil {
return err
}

ip := n.IP
port := n.Port
if driver.IsKIC(cfg.Driver) {
ip = oci.DefaultBindIPV4
p, err := oci.ForwardedPort(cfg.Driver, driver.MachineName(cfg, n), port)
if err != nil {
return errors.Wrapf(err, "get host-bind port %d for container %s", port, cfg.Name)
}
port = p
ip, port, err := k.controlPlaneEndpoint(cfg)
if err != nil {
return err
}

if n.ControlPlane {
if err := kverify.WaitForHealthyAPIServer(cr, k, k.c, start, ip, port, timeout); err != nil {
return err
}
if err := kverify.WaitForHealthyAPIServer(cr, k, k.c, start, ip, port, timeout); err != nil {
return err
}

c, err := k.client(ip, port)
Expand All @@ -303,6 +312,31 @@ func (k *Bootstrapper) WaitForNode(cfg config.ClusterConfig, n config.Node, time
return nil
}

// needsReset returns whether or not the cluster needs to be reconfigured
func (k *Bootstrapper) needsReset(conf string, ip string, port int, client *kubernetes.Clientset) bool {
if _, err := k.c.RunCmd(exec.Command("sudo", "diff", "-u", conf, conf+".new")); err != nil {
glog.Infof("needs reset: configs differ")
return true
}

st, err := kverify.APIServerStatus(k.c, net.ParseIP(ip), port)
if err != nil {
glog.Infof("needs reset: apiserver error: %v", err)
return true
}

if st != state.Running {
glog.Infof("needs reset: apiserver in state %s", st)
return true
}

if err := kverify.ExpectedComponentsRunning(client); err != nil {
glog.Infof("needs reset: %v", err)
return true
}
return false
}

// restartCluster restarts the Kubernetes cluster configured by kubeadm
func (k *Bootstrapper) restartCluster(cfg config.ClusterConfig) error {
glog.Infof("restartCluster start")
Expand All @@ -328,14 +362,36 @@ func (k *Bootstrapper) restartCluster(cfg config.ClusterConfig) error {
glog.Errorf("failed to create compat symlinks: %v", err)
}

ip, port, err := k.controlPlaneEndpoint(cfg)
if err != nil {
return errors.Wrap(err, "control plane")
}

client, err := k.client(ip, port)
if err != nil {
return errors.Wrap(err, "getting k8s client")
}

// If the cluster is running, check if we have any work to do.
conf := bsutil.KubeadmYamlPath
if !k.needsReset(conf, ip, port, client) {
glog.Infof("Taking a shortcut, as the cluster seems to be properly configured")
return nil
}

if _, err := k.c.RunCmd(exec.Command("sudo", "mv", conf+".new", conf)); err != nil {
return errors.Wrap(err, "mv")
}

baseCmd := fmt.Sprintf("%s %s", bsutil.InvokeKubeadm(cfg.KubernetesConfig.KubernetesVersion), phase)
cmds := []string{
fmt.Sprintf("%s phase certs all --config %s", baseCmd, bsutil.KubeadmYamlPath),
fmt.Sprintf("%s phase kubeconfig all --config %s", baseCmd, bsutil.KubeadmYamlPath),
fmt.Sprintf("%s phase %s all --config %s", baseCmd, controlPlane, bsutil.KubeadmYamlPath),
fmt.Sprintf("%s phase etcd local --config %s", baseCmd, bsutil.KubeadmYamlPath),
fmt.Sprintf("%s phase certs all --config %s", baseCmd, conf),
fmt.Sprintf("%s phase kubeconfig all --config %s", baseCmd, conf),
fmt.Sprintf("%s phase %s all --config %s", baseCmd, controlPlane, conf),
fmt.Sprintf("%s phase etcd local --config %s", baseCmd, conf),
}

glog.Infof("resetting cluster from %s", conf)
// Run commands one at a time so that it is easier to root cause failures.
for _, c := range cmds {
rr, err := k.c.RunCmd(exec.Command("/bin/bash", "-c", c))
Expand All @@ -346,38 +402,19 @@ func (k *Bootstrapper) restartCluster(cfg config.ClusterConfig) error {

cr, err := cruntime.New(cruntime.Config{Type: cfg.KubernetesConfig.ContainerRuntime, Runner: k.c})
if err != nil {
return err
return errors.Wrap(err, "runtime")
}

// We must ensure that the apiserver is healthy before proceeding
if err := kverify.WaitForAPIServerProcess(cr, k, k.c, time.Now(), kconst.DefaultControlPlaneTimeout); err != nil {
return errors.Wrap(err, "apiserver healthz")
}

cp, err := config.PrimaryControlPlane(&cfg)
if err != nil {
return errors.Wrap(err, "getting control plane")
}
ip := cp.IP
port := cp.Port
if driver.IsKIC(cfg.Driver) {
ip = oci.DefaultBindIPV4
port, err = oci.ForwardedPort(cfg.Driver, driver.MachineName(cfg, cp), port)
if err != nil {
return errors.Wrapf(err, "get host-bind port %d for container %s", port, driver.MachineName(cfg, cp))
}
}
client, err := k.client(ip, port)
if err != nil {
return errors.Wrap(err, "getting k8s client")
}

if err := kverify.WaitForSystemPods(cr, k, k.c, client, time.Now(), kconst.DefaultControlPlaneTimeout); err != nil {
return errors.Wrap(err, "system pods")
}

// Explicitly re-enable kubeadm addons (proxy, coredns) so that they will check for IP or configuration changes.
if rr, err := k.c.RunCmd(exec.Command("/bin/bash", "-c", fmt.Sprintf("%s phase addon all --config %s", baseCmd, bsutil.KubeadmYamlPath))); err != nil {
if rr, err := k.c.RunCmd(exec.Command("/bin/bash", "-c", fmt.Sprintf("%s phase addon all --config %s", baseCmd, conf))); err != nil {
return errors.Wrapf(err, fmt.Sprintf("addon phase cmd:%q", rr.Command()))
}

Expand Down Expand Up @@ -496,11 +533,6 @@ func (k *Bootstrapper) UpdateNode(cfg config.ClusterConfig, n config.Node, r cru

glog.Infof("kubelet %s config:\n%+v", kubeletCfg, cfg.KubernetesConfig)

// stop kubelet to avoid "Text File Busy" error
if err := stopKubelet(k.c); err != nil {
glog.Warningf("unable to stop kubelet: %s", err)
}

if err := bsutil.TransferBinaries(cfg.KubernetesConfig, k.c); err != nil {
return errors.Wrap(err, "downloading binaries")
}
Expand All @@ -509,25 +541,19 @@ func (k *Bootstrapper) UpdateNode(cfg config.ClusterConfig, n config.Node, r cru
if cfg.KubernetesConfig.EnableDefaultCNI {
cniFile = []byte(defaultCNIConfig)
}

// Install assets into temporary files
files := bsutil.ConfigFileAssets(cfg.KubernetesConfig, kubeadmCfg, kubeletCfg, kubeletService, cniFile)
if err := copyFiles(k.c, files); err != nil {
return err
}

if err := startKubelet(k.c); err != nil {
if err := reloadKubelet(k.c); err != nil {
return err
}
return nil
}

func stopKubelet(runner command.Runner) error {
stopCmd := exec.Command("/bin/bash", "-c", "pgrep kubelet && sudo systemctl stop kubelet")
if rr, err := runner.RunCmd(stopCmd); err != nil {
return errors.Wrapf(err, "command: %q output: %q", rr.Command(), rr.Output())
}
return nil
}

func copyFiles(runner command.Runner, files []assets.CopyableFile) error {
// Combine mkdir request into a single call to reduce load
dirs := []string{}
Expand All @@ -547,8 +573,17 @@ func copyFiles(runner command.Runner, files []assets.CopyableFile) error {
return nil
}

func startKubelet(runner command.Runner) error {
startCmd := exec.Command("/bin/bash", "-c", "sudo systemctl daemon-reload && sudo systemctl start kubelet")
func reloadKubelet(runner command.Runner) error {
svc := bsutil.KubeletServiceFile
conf := bsutil.KubeletSystemdConfFile

checkCmd := exec.Command("/bin/bash", "-c", fmt.Sprintf("pgrep kubelet && diff -u %s %s.new && diff -u %s %s.new", svc, svc, conf, conf))
if _, err := runner.RunCmd(checkCmd); err == nil {
glog.Infof("kubelet is already running with the right configs")
return nil
}

startCmd := exec.Command("/bin/bash", "-c", fmt.Sprintf("sudo mv %s.new %s && sudo mv %s.new %s && sudo systemctl daemon-reload && sudo systemctl restart kubelet", svc, svc, conf, conf))
if _, err := runner.RunCmd(startCmd); err != nil {
return errors.Wrap(err, "starting kubelet")
}
Expand Down

0 comments on commit 5a5f869

Please sign in to comment.