Skip to content

Commit

Permalink
fix: Graceful shutdown for the API server (#18642)
Browse files Browse the repository at this point in the history
Closes #18642

Implements a graceful shutdown the the API server. Without this, ArgoCD API server will eventually return 502 during rolling update. However, healthcheck would return 503 if the server is terminating.

Signed-off-by: Andrii Korotkov <[email protected]>
Co-authored-by: Leonardo Luz Almeida <[email protected]>
Co-authored-by: Michael Crenshaw <[email protected]>
  • Loading branch information
3 people committed Nov 28, 2024
1 parent 8bce61e commit a8ca06a
Show file tree
Hide file tree
Showing 5 changed files with 246 additions and 52 deletions.
3 changes: 3 additions & 0 deletions cmd/argocd-server/commands/argocd_server.go
Original file line number Diff line number Diff line change
Expand Up @@ -273,6 +273,9 @@ func NewCommand() *cobra.Command {
if closer != nil {
closer()
}
if argocd.TerminateRequested() {
break
}
}
},
Example: templates.Examples(`
Expand Down
163 changes: 131 additions & 32 deletions server/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,13 +13,16 @@ import (
"net/url"
"os"
"os/exec"
"os/signal"
"path"
"path/filepath"
"reflect"
"regexp"
go_runtime "runtime"
"strings"
gosync "sync"
"sync/atomic"
"syscall"
"time"

// nolint:staticcheck
Expand Down Expand Up @@ -187,17 +190,21 @@ type ArgoCDServer struct {
db db.ArgoDB

// stopCh is the channel which when closed, will shutdown the Argo CD server
stopCh chan struct{}
userStateStorage util_session.UserStateStorage
indexDataInit gosync.Once
indexData []byte
indexDataErr error
staticAssets http.FileSystem
apiFactory api.Factory
secretInformer cache.SharedIndexInformer
configMapInformer cache.SharedIndexInformer
serviceSet *ArgoCDServiceSet
extensionManager *extension.Manager
stopCh chan os.Signal
userStateStorage util_session.UserStateStorage
indexDataInit gosync.Once
indexData []byte
indexDataErr error
staticAssets http.FileSystem
apiFactory api.Factory
secretInformer cache.SharedIndexInformer
configMapInformer cache.SharedIndexInformer
serviceSet *ArgoCDServiceSet
extensionManager *extension.Manager
shutdown func()
terminateRequested atomic.Bool
receivedSignal atomic.Bool
available atomic.Bool
}

type ArgoCDServerOpts struct {
Expand Down Expand Up @@ -329,6 +336,9 @@ func NewServer(ctx context.Context, opts ArgoCDServerOpts, appsetOpts Applicatio
pg := extension.NewDefaultProjectGetter(projLister, dbInstance)
ug := extension.NewDefaultUserGetter(policyEnf)
em := extension.NewManager(logger, opts.Namespace, sg, ag, pg, enf, ug)
noopShutdown := func() {
log.Error("API Server Shutdown function called but server is not started yet.")
}

a := &ArgoCDServer{
ArgoCDServerOpts: opts,
Expand All @@ -352,6 +362,8 @@ func NewServer(ctx context.Context, opts ArgoCDServerOpts, appsetOpts Applicatio
secretInformer: secretInformer,
configMapInformer: configMapInformer,
extensionManager: em,
shutdown: noopShutdown,
stopCh: make(chan os.Signal, 1),
}

err = a.logInClusterWarnings()
Expand All @@ -369,6 +381,12 @@ const (
)

func (a *ArgoCDServer) healthCheck(r *http.Request) error {
if a.terminateRequested.Load() {
return errors.New("API Server is terminating and unable to serve requests.")
}
if !a.available.Load() {
return errors.New("API Server is not available. It either hasn't started or is restarting.")
}
if val, ok := r.URL.Query()["full"]; ok && len(val) > 0 && val[0] == "true" {
argoDB := db.NewDB(a.Namespace, a.settingsMgr, a.KubeClientset)
_, err := argoDB.ListClusters(r.Context())
Expand Down Expand Up @@ -601,35 +619,116 @@ func (a *ArgoCDServer) Run(ctx context.Context, listeners *Listeners) {
log.Fatal("Timed out waiting for project cache to sync")
}

a.stopCh = make(chan struct{})
<-a.stopCh
shutdownFunc := func() {
log.Info("API Server shutdown initiated. Shutting down servers...")
a.available.Store(false)
sCtx, cancel := context.WithTimeout(context.Background(), 20*time.Second)
defer cancel()
var wg gosync.WaitGroup

// Shutdown http server
wg.Add(1)
go func() {
defer wg.Done()
err := httpS.Shutdown(sCtx)
if err != nil {
log.Errorf("Error shutting down http server: %s", err)
}
}()

if httpsS != nil {
// Shutdown https server
wg.Add(1)
go func() {
defer wg.Done()
err := httpsS.Shutdown(sCtx)
if err != nil {
log.Errorf("Error shutting down https server: %s", err)
}
}()
}

// Shutdown gRPC server
wg.Add(1)
go func() {
defer wg.Done()
grpcS.GracefulStop()
}()

// Shutdown metrics server
wg.Add(1)
go func() {
defer wg.Done()
err := metricsServ.Shutdown(sCtx)
if err != nil {
log.Errorf("Error shutting down metrics server: %s", err)
}
}()

if tlsm != nil {
// Shutdown tls server
wg.Add(1)
go func() {
defer wg.Done()
tlsm.Close()
}()
}

// Shutdown tcp server
wg.Add(1)
go func() {
defer wg.Done()
tcpm.Close()
}()

c := make(chan struct{})
// This goroutine will wait for all servers to conclude the shutdown
// process
go func() {
defer close(c)
wg.Wait()
}()

select {
case <-c:
log.Info("All servers were gracefully shutdown. Exiting...")
case <-sCtx.Done():
log.Warn("Graceful shutdown timeout. Exiting...")
}
}
a.shutdown = shutdownFunc
signal.Notify(a.stopCh, os.Interrupt, syscall.SIGINT, syscall.SIGTERM)
a.available.Store(true)

select {
case signal := <-a.stopCh:
log.Infof("API Server received signal: %s", signal.String())
a.terminateRequested.Store(true)
a.receivedSignal.Store(true)
a.shutdown()
case <-ctx.Done():
log.Infof("API Server: %s", ctx.Err())
a.terminateRequested.Store(true)
a.shutdown()
}
}

func (a *ArgoCDServer) Initialized() bool {
return a.projInformer.HasSynced() && a.appInformer.HasSynced()
}

// TerminateRequested returns whether a shutdown was initiated by a signal or context cancel
// as opposed to a watch.
func (a *ArgoCDServer) TerminateRequested() bool {
return a.terminateRequested.Load()
}

// checkServeErr checks the error from a .Serve() call to decide if it was a graceful shutdown
func (a *ArgoCDServer) checkServeErr(name string, err error) {
if err != nil {
if a.stopCh == nil {
// a nil stopCh indicates a graceful shutdown
log.Infof("graceful shutdown %s: %v", name, err)
} else {
log.Fatalf("%s: %v", name, err)
}
if err != nil && !errors.Is(err, http.ErrServerClosed) {
log.Errorf("Error received from server %s: %v", name, err)
} else {
log.Infof("graceful shutdown %s", name)
}
}

// Shutdown stops the Argo CD server
func (a *ArgoCDServer) Shutdown() {
log.Info("Shut down requested")
stopCh := a.stopCh
a.stopCh = nil
if stopCh != nil {
close(stopCh)
log.Infof("Graceful shutdown of %s initiated", name)
}
}

Expand Down Expand Up @@ -734,7 +833,7 @@ func (a *ArgoCDServer) watchSettings() {
}
}
log.Info("shutting down settings watch")
a.Shutdown()
a.shutdown()
a.settingsMgr.Unsubscribe(updateCh)
close(updateCh)
}
Expand Down
69 changes: 69 additions & 0 deletions server/server_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,8 @@ import (
"os"
"path/filepath"
"strings"
gosync "sync"
"syscall"
"testing"
"time"

Expand Down Expand Up @@ -419,6 +421,73 @@ func TestCertsAreNotGeneratedInInsecureMode(t *testing.T) {
assert.Nil(t, s.settings.Certificate)
}

func TestGracefulShutdown(t *testing.T) {
port, err := test.GetFreePort()
require.NoError(t, err)
mockRepoClient := &mocks.Clientset{RepoServerServiceClient: &mocks.RepoServerServiceClient{}}
kubeclientset := fake.NewSimpleClientset(test.NewFakeConfigMap(), test.NewFakeSecret())
redis, redisCloser := test.NewInMemoryRedis()
defer redisCloser()
s := NewServer(
context.Background(),
ArgoCDServerOpts{
ListenPort: port,
Namespace: test.FakeArgoCDNamespace,
KubeClientset: kubeclientset,
AppClientset: apps.NewSimpleClientset(),
RepoClientset: mockRepoClient,
RedisClient: redis,
},
ApplicationSetOpts{},
)

projInformerCancel := test.StartInformer(s.projInformer)
defer projInformerCancel()
appInformerCancel := test.StartInformer(s.appInformer)
defer appInformerCancel()
appsetInformerCancel := test.StartInformer(s.appsetInformer)
defer appsetInformerCancel()

lns, err := s.Listen()
require.NoError(t, err)

shutdown := false
runCtx, runCancel := context.WithTimeout(context.Background(), 2*time.Second)
defer runCancel()

err = s.healthCheck(&http.Request{URL: &url.URL{Path: "/healthz", RawQuery: "full=true"}})
require.Error(t, err, "API Server is not running. It either hasn't started or is restarting.")

var wg gosync.WaitGroup
wg.Add(1)
go func(shutdown *bool) {
defer wg.Done()
s.Run(runCtx, lns)
*shutdown = true
}(&shutdown)

for {
if s.available.Load() {
err = s.healthCheck(&http.Request{URL: &url.URL{Path: "/healthz", RawQuery: "full=true"}})
require.NoError(t, err)
break
}
time.Sleep(10 * time.Millisecond)
}

s.stopCh <- syscall.SIGINT

wg.Wait()

err = s.healthCheck(&http.Request{URL: &url.URL{Path: "/healthz", RawQuery: "full=true"}})
require.Error(t, err, "API Server is terminating and unable to serve requests.")

assert.True(t, s.terminateRequested.Load())
assert.True(t, s.receivedSignal.Load())
assert.False(t, s.available.Load())
assert.True(t, shutdown)
}

func TestAuthenticate(t *testing.T) {
type testData struct {
test string
Expand Down
27 changes: 27 additions & 0 deletions test/e2e/aa_graceful_restart_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
package e2e

import (
"io"
"testing"

"github.com/stretchr/testify/require"

"github.com/argoproj/argo-cd/v2/test/e2e/fixture"
. "github.com/argoproj/argo-cd/v2/test/e2e/fixture"
)

func TestAPIServerGracefulRestart(t *testing.T) {
EnsureCleanState(t)

resp, err := DoHttpRequest("GET", "/healthz?full=true", "")
require.NoError(t, err)
responseData, err := io.ReadAll(resp.Body)
require.NoError(t, err)
require.Equal(t, "ok\n", string(responseData))
fixture.RestartAPIServer()
resp, err = DoHttpRequest("GET", "/healthz?full=true", "")
require.NoError(t, err)
responseData, err = io.ReadAll(resp.Body)
require.NoError(t, err)
require.Equal(t, "ok\n", string(responseData))
}
36 changes: 16 additions & 20 deletions test/e2e/fixture/fixture.go
Original file line number Diff line number Diff line change
Expand Up @@ -942,33 +942,29 @@ func RemoveSubmodule() {
// RestartRepoServer performs a restart of the repo server deployment and waits
// until the rollout has completed.
func RestartRepoServer() {
if IsRemote() {
log.Infof("Waiting for repo server to restart")
prefix := os.Getenv("ARGOCD_E2E_NAME_PREFIX")
workload := "argocd-repo-server"
if prefix != "" {
workload = prefix + "-repo-server"
}
FailOnErr(Run("", "kubectl", "rollout", "-n", TestNamespace(), "restart", "deployment", workload))
FailOnErr(Run("", "kubectl", "rollout", "-n", TestNamespace(), "status", "deployment", workload))
// wait longer to avoid error on s390x
time.Sleep(10 * time.Second)
log.Infof("Waiting for repo server to restart")
prefix := os.Getenv("ARGOCD_E2E_NAME_PREFIX")
workload := "argocd-repo-server"
if prefix != "" {
workload = prefix + "-repo-server"
}
FailOnErr(Run("", "kubectl", "rollout", "-n", TestNamespace(), "restart", "deployment", workload))
FailOnErr(Run("", "kubectl", "rollout", "-n", TestNamespace(), "status", "deployment", workload))
// wait longer to avoid error on s390x
time.Sleep(10 * time.Second)
}

// RestartAPIServer performs a restart of the API server deployemt and waits
// until the rollout has completed.
func RestartAPIServer() {
if IsRemote() {
log.Infof("Waiting for API server to restart")
prefix := os.Getenv("ARGOCD_E2E_NAME_PREFIX")
workload := "argocd-server"
if prefix != "" {
workload = prefix + "-server"
}
FailOnErr(Run("", "kubectl", "rollout", "-n", TestNamespace(), "restart", "deployment", workload))
FailOnErr(Run("", "kubectl", "rollout", "-n", TestNamespace(), "status", "deployment", workload))
log.Infof("Waiting for API server to restart")
prefix := os.Getenv("ARGOCD_E2E_NAME_PREFIX")
workload := "argocd-server"
if prefix != "" {
workload = prefix + "-server"
}
FailOnErr(Run("", "kubectl", "rollout", "-n", TestNamespace(), "restart", "deployment", workload))
FailOnErr(Run("", "kubectl", "rollout", "-n", TestNamespace(), "status", "deployment", workload))
}

// LocalOrRemotePath selects a path for a given application based on whether
Expand Down

0 comments on commit a8ca06a

Please sign in to comment.