From 2eccd21e98aa453379521fbcfc8bda4ed88dfcdc Mon Sep 17 00:00:00 2001 From: Kaushal M Date: Fri, 22 Feb 2019 14:08:57 +0530 Subject: [PATCH] Remove glusterd2/oldtransaction --- glusterd2/oldtransaction/context.go | 235 ------------------ glusterd2/oldtransaction/context_mock.go | 62 ----- glusterd2/oldtransaction/lock.go | 217 ---------------- glusterd2/oldtransaction/registry.go | 43 ---- glusterd2/oldtransaction/rpc-client.go | 78 ------ glusterd2/oldtransaction/rpc-service.go | 79 ------ glusterd2/oldtransaction/step.go | 180 -------------- .../oldtransaction/transaction-rpc.pb.go | 205 --------------- .../oldtransaction/transaction-rpc.proto | 16 -- glusterd2/oldtransaction/transaction.go | 194 --------------- 10 files changed, 1309 deletions(-) delete mode 100644 glusterd2/oldtransaction/context.go delete mode 100644 glusterd2/oldtransaction/context_mock.go delete mode 100644 glusterd2/oldtransaction/lock.go delete mode 100644 glusterd2/oldtransaction/registry.go delete mode 100644 glusterd2/oldtransaction/rpc-client.go delete mode 100644 glusterd2/oldtransaction/rpc-service.go delete mode 100644 glusterd2/oldtransaction/step.go delete mode 100644 glusterd2/oldtransaction/transaction-rpc.pb.go delete mode 100644 glusterd2/oldtransaction/transaction-rpc.proto delete mode 100644 glusterd2/oldtransaction/transaction.go diff --git a/glusterd2/oldtransaction/context.go b/glusterd2/oldtransaction/context.go deleted file mode 100644 index 2c2f56705..000000000 --- a/glusterd2/oldtransaction/context.go +++ /dev/null @@ -1,235 +0,0 @@ -package oldtransaction - -import ( - "context" - "encoding/json" - "errors" - "reflect" - "time" - - "github.com/gluster/glusterd2/glusterd2/store" - - "github.com/coreos/etcd/clientv3" - "github.com/pborman/uuid" - log "github.com/sirupsen/logrus" -) - -const etcdTxnTimeout = 10 - -// TxnCtx is used to carry contextual information across the lifetime of a transaction -type TxnCtx interface { - // Set attaches the given key with value to the context. It updates value if key exists already. - Set(key string, value interface{}) error - // SetNodeResult is similar to Set but prefixes the key with node UUID specified. - SetNodeResult(peerID uuid.UUID, key string, value interface{}) error - // Get gets the value for the given key. Returns an error if the key is not present - Get(key string, value interface{}) error - // GetNodeResult is similar to Get but prefixes the key with node UUID specified. - GetNodeResult(peerID uuid.UUID, key string, value interface{}) error - // GetTxnReqID gets the reqID string saved in the transaction. - GetTxnReqID() string - // Delete deletes the key and value - Delete(key string) error - // Logger returns the Logrus logger associated with the context - Logger() log.FieldLogger - - // Commit writes all locally cached keys and values into the store using - // a single etcd transaction. This is for internal use by the txn framework - // and hence isn't exported. - Commit() error - - // SyncCache synchronizes the locally cached keys and values from the store - SyncCache() error -} - -// Tctx represents structure for transaction context -type Tctx struct { - config *TxnCtxConfig // this will be marshalled and sent on wire - logger log.FieldLogger - readSet map[string][]byte // cached responses from store - readCacheDirty bool - writeSet map[string]string // to be written to store -} - -// TxnCtxConfig is marshalled and sent on wire and is used to reconstruct Tctx -// on receiver's end. -type TxnCtxConfig struct { - LogFields log.Fields - StorePrefix string -} - -func newCtx(config *TxnCtxConfig) *Tctx { - return &Tctx{ - config: config, - logger: log.StandardLogger().WithFields(config.LogFields), - readSet: make(map[string][]byte), - writeSet: make(map[string]string), - readCacheDirty: true, - } -} - -// NewCtx returns a transaction context from given config -func NewCtx(config *TxnCtxConfig) *Tctx { - return newCtx(config) -} - -// Set attaches the given key-value pair to the context. -// If the key exists, the value will be updated. -func (c *Tctx) Set(key string, value interface{}) error { - - b, err := json.Marshal(value) - if err != nil { - c.logger.WithError(err).WithField("key", key).Error("failed to marshal value") - return err - } - - storeKey := c.config.StorePrefix + key - - // Update the read cache to serve future local Get()s for this key from cache - c.readSet[storeKey] = b - - // Update write cache, the contents of which will be committed to store later - c.writeSet[storeKey] = string(b) - - return nil -} - -// SyncCache synchronizes the locally cached keys and values from the store -func (c *Tctx) SyncCache() error { - resp, err := store.Get(context.TODO(), c.config.StorePrefix, clientv3.WithPrefix()) - if err != nil { - return err - } - - for _, kv := range resp.Kvs { - c.readSet[string(kv.Key)] = kv.Value - } - return nil -} - -// Commit writes all locally cached keys and values into the store using -// a single etcd transaction. -func (c *Tctx) Commit() error { - - if len(c.writeSet) == 0 { - return nil - } - - var putOps []clientv3.Op - for key, value := range c.writeSet { - putOps = append(putOps, clientv3.OpPut(key, value)) - } - - ctx, cancel := context.WithTimeout(context.Background(), etcdTxnTimeout*time.Second) - txn, err := store.Txn(ctx). - If(). - Then(putOps...). - Else(). - Commit() - cancel() - - if err != nil || !txn.Succeeded { - msg := "etcd txn to store txn context keys failed" - if err == nil { - // if txn.Succeeded = false - err = errors.New(msg) - } - c.logger.WithError(err).WithField("keys", - reflect.ValueOf(c.writeSet).MapKeys()).Error(msg) - return err - } - - expTxn.Add("txn_ctx_store_commit", 1) - - c.writeSet = make(map[string]string) - c.readCacheDirty = true - - return nil -} - -// SetNodeResult is similar to Set but prefixes the key with the node UUID -// specified. This function can be used by nodes to store results of -// transaction steps. -func (c *Tctx) SetNodeResult(peerID uuid.UUID, key string, value interface{}) error { - storeKey := peerID.String() + "/" + key - return c.Set(storeKey, value) -} - -// Get gets the value for the given key if available. -// Returns error if not found. -func (c *Tctx) Get(key string, value interface{}) error { - - // cache all keys and values from the store on the first call to Get - if c.readCacheDirty { - if err := c.SyncCache(); err != nil { - c.logger.WithError(err).WithField("key", key).Error("failed to get key from transaction context") - return err - } - c.readCacheDirty = false - } - - // return cached key - storeKey := c.config.StorePrefix + key - if data, ok := c.readSet[storeKey]; ok { - if err := json.Unmarshal(data, value); err != nil { - c.logger.WithError(err).WithField("key", storeKey).Error("failed to unmarshall value") - } - } else { - return errors.New("key not found") - } - - return nil -} - -// GetNodeResult is similar to Get but prefixes the key with node UUID -// specified. This function can be used by the transaction initiator node to -// fetch results of transaction step run on remote nodes. -func (c *Tctx) GetNodeResult(peerID uuid.UUID, key string, value interface{}) error { - storeKey := peerID.String() + "/" + key - return c.Get(storeKey, value) -} - -// GetTxnReqID gets the reqID string saved within the txnCtxConfig. -func (c *Tctx) GetTxnReqID() string { - return c.config.LogFields["reqid"].(string) -} - -// Delete deletes the key and attached value -func (c *Tctx) Delete(key string) error { - - storeKey := c.config.StorePrefix + key - - delete(c.readSet, storeKey) - delete(c.writeSet, storeKey) - - // TODO: Optimize this by doing it as part of etcd txn in commit() - if _, err := store.Delete(context.TODO(), storeKey); err != nil { - c.logger.WithError(err).WithField("key", storeKey).Error( - "failed to delete key") - return err - } - expTxn.Add("txn_ctx_store_delete", 1) - return nil -} - -// Logger returns the Logrus logger associated with the context -func (c *Tctx) Logger() log.FieldLogger { - return c.logger -} - -// MarshalJSON implements the json.Marshaler interface -func (c *Tctx) MarshalJSON() ([]byte, error) { - return json.Marshal(c.config) -} - -// UnmarshalJSON implements the json.Unmarshaler interface -func (c *Tctx) UnmarshalJSON(d []byte) error { - - if err := json.Unmarshal(d, &c.config); err != nil { - return err - } - - *c = *(newCtx(c.config)) - - return nil -} diff --git a/glusterd2/oldtransaction/context_mock.go b/glusterd2/oldtransaction/context_mock.go deleted file mode 100644 index ef6162109..000000000 --- a/glusterd2/oldtransaction/context_mock.go +++ /dev/null @@ -1,62 +0,0 @@ -package oldtransaction - -import ( - "errors" - "github.com/pborman/uuid" - log "github.com/sirupsen/logrus" -) - -// MockTctx implements a dummy context type that can be used in tests -type MockTctx struct { - data map[string]interface{} -} - -// NewMockCtx returns a new instance of MockTctx -func NewMockCtx() *MockTctx { - return &MockTctx{ - data: make(map[string]interface{}), - } -} - -// Set attaches the given key with value to the context. It updates value if key exists already. -func (m *MockTctx) Set(key string, value interface{}) error { - m.data[key] = value - return nil -} - -// SetNodeResult is similar to Set but prefixes the key with the node UUID specified. -func (m *MockTctx) SetNodeResult(peerID uuid.UUID, key string, value interface{}) error { - storeKey := peerID.String() + "/" + key - return m.Set(storeKey, value) -} - -// Get gets the value for the given key. Returns an error if the key is not present -func (m *MockTctx) Get(key string, value interface{}) error { - _, ok := m.data[key] - if !ok { - return errors.New("key not present") - } - return nil -} - -// GetNodeResult is similar to Get but prefixes the key with node UUID specified. -func (m *MockTctx) GetNodeResult(peerID uuid.UUID, key string, value interface{}) error { - storeKey := peerID.String() + "/" + key - return m.Get(storeKey, value) -} - -// Delete deletes the key and value -func (m *MockTctx) Delete(key string) error { - delete(m.data, key) - return nil -} - -// Logger returns a dummy logger -func (m *MockTctx) Logger() log.FieldLogger { - return log.New() -} - -// Prefix returns the prefix to be used for storing values -func (m MockTctx) Prefix() string { - return "mock" -} diff --git a/glusterd2/oldtransaction/lock.go b/glusterd2/oldtransaction/lock.go deleted file mode 100644 index 0a3059775..000000000 --- a/glusterd2/oldtransaction/lock.go +++ /dev/null @@ -1,217 +0,0 @@ -package oldtransaction - -import ( - "context" - "errors" - "time" - - "github.com/gluster/glusterd2/glusterd2/gdctx" - "github.com/gluster/glusterd2/glusterd2/store" - - "github.com/coreos/etcd/clientv3/concurrency" - "github.com/pborman/uuid" - log "github.com/sirupsen/logrus" -) - -const ( - lockPrefix = "locks/" - lockObtainTimeout = 5 * time.Second - lockTTL = 10 -) - -var ( - // ErrLockTimeout is the error returned when lock could not be obtained - // and the request timed out - ErrLockTimeout = errors.New("could not obtain lock: another conflicting transaction may be in progress") - // ErrLockExists is returned when a lock already exists within the transaction - ErrLockExists = errors.New("existing lock found for given lock ID") -) - -// createLockStepFunc returns the registry IDs of StepFuncs which lock/unlock the given key. -// If existing StepFuncs are not found, new funcs are created and registered. -func createLockStepFunc(key string) (string, string, error) { - lockFuncID := key + ".Lock" - unlockFuncID := key + ".Unlock" - - _, lockFuncFound := getStepFunc(lockFuncID) - _, unlockFuncFound := getStepFunc(unlockFuncID) - - if lockFuncFound && unlockFuncFound { - return lockFuncID, unlockFuncID, nil - } - - key = lockPrefix + key - locker := concurrency.NewMutex(store.Store.Session, key) - - lockFunc := func(c TxnCtx) error { - - ctx, cancel := context.WithTimeout(context.Background(), lockObtainTimeout) - defer cancel() - - c.Logger().WithField("key", key).Debug("attempting to lock") - err := locker.Lock(ctx) - switch err { - case nil: - c.Logger().WithField("key", key).Debug("lock obtained") - case context.DeadlineExceeded: - // Propagate this all the way back to the client as a HTTP 409 response - c.Logger().WithField("key", key).Debug("timeout: failed to obtain lock") - err = ErrLockTimeout - } - - return err - } - RegisterStepFunc(lockFunc, lockFuncID) - - unlockFunc := func(c TxnCtx) error { - - c.Logger().WithField("key", key).Debug("attempting to unlock") - err := locker.Unlock(context.Background()) - if err == nil { - c.Logger().WithField("key", key).Debug("lock unlocked") - } - - return err - } - RegisterStepFunc(unlockFunc, unlockFuncID) - - return lockFuncID, unlockFuncID, nil -} - -// CreateLockSteps returns a lock and an unlock Step which lock/unlock the given key -// TODO: Remove this function -func CreateLockSteps(key string) (*Step, *Step, error) { - lockFunc, unlockFunc, err := createLockStepFunc(key) - if err != nil { - return nil, nil, err - } - - lockStep := &Step{DoFunc: lockFunc, UndoFunc: unlockFunc, Nodes: []uuid.UUID{gdctx.MyUUID}, Skip: false} - unlockStep := &Step{DoFunc: unlockFunc, UndoFunc: "", Nodes: []uuid.UUID{gdctx.MyUUID}, Skip: false} - - return lockStep, unlockStep, nil -} - -// LockUnlockFunc is signature of functions used for distributed locking -// and unlocking. -type LockUnlockFunc func(ctx context.Context) error - -// CreateLockFuncs creates and returns functions for distributed lock and -// unlock. This is similar to CreateLockSteps() but returns normal functions. -// TODO: Remove this function -func CreateLockFuncs(key string) (LockUnlockFunc, LockUnlockFunc) { - - key = lockPrefix + key - locker := concurrency.NewMutex(store.Store.Session, key) - - // TODO: There is an opportunity for refactor here to re-use code - // between CreateLockFunc and CreateLockSteps. This variant doesn't - // have registry either. - - lockFunc := func(ctx context.Context) error { - logger := gdctx.GetReqLogger(ctx) - if logger == nil { - logger = log.StandardLogger() - } - - ctx, cancel := context.WithTimeout(ctx, lockObtainTimeout) - defer cancel() - - logger.WithField("key", key).Debug("attempting to lock") - err := locker.Lock(ctx) - switch err { - case nil: - logger.WithField("key", key).Debug("lock obtained") - case context.DeadlineExceeded: - // Propagate this all the way back to the client as a HTTP 409 response - logger.WithField("key", key).Debug("timeout: failed to obtain lock") - err = ErrLockTimeout - } - - return err - } - - unlockFunc := func(ctx context.Context) error { - logger := gdctx.GetReqLogger(ctx) - if logger == nil { - logger = log.StandardLogger() - } - - logger.WithField("key", key).Debug("attempting to unlock") - if err := locker.Unlock(context.Background()); err != nil { - logger.WithField("key", key).WithError(err).Error("unlock failed") - return err - } - - logger.WithField("key", key).Debug("lock unlocked") - return nil - } - - return lockFunc, unlockFunc -} - -// Locks are the collection of cluster wide transaction lock -type Locks map[string]*concurrency.Mutex - -func (l Locks) lock(lockID string) error { - var logger = log.WithField("lockID", lockID) - - // Ensure that no prior lock exists for the given lockID in this transaction - if _, ok := l[lockID]; ok { - return ErrLockExists - } - - logger.Debug("attempting to obtain lock") - - key := lockPrefix + lockID - s, err := concurrency.NewSession(store.Store.NamespaceClient, concurrency.WithTTL(lockTTL)) - if err != nil { - return err - } - - locker := concurrency.NewMutex(s, key) - - ctx, cancel := context.WithTimeout(store.Store.Ctx(), lockObtainTimeout) - defer cancel() - - err = locker.Lock(ctx) - switch err { - case nil: - logger.Debug("lock obtained") - // Attach lock to the transaction - l[lockID] = locker - - case context.DeadlineExceeded: - logger.Debug("timeout: failed to obtain lock") - // Propagate this all the way back to the client as a HTTP 409 response - err = ErrLockTimeout - - default: - logger.WithError(err).Error("failed to obtain lock") - } - - return err -} - -// Lock obtains a cluster wide transaction lock on the given lockID/lockIDs, -// and attaches the obtained locks to the transaction -func (l Locks) Lock(lockID string, lockIDs ...string) error { - if err := l.lock(lockID); err != nil { - return err - } - for _, id := range lockIDs { - if err := l.lock(id); err != nil { - return err - } - } - return nil -} - -// UnLock releases all cluster wide obtained locks -func (l Locks) UnLock(ctx context.Context) { - for lockID, locker := range l { - if err := locker.Unlock(ctx); err == nil { - delete(l, lockID) - } - } -} diff --git a/glusterd2/oldtransaction/registry.go b/glusterd2/oldtransaction/registry.go deleted file mode 100644 index 589829d74..000000000 --- a/glusterd2/oldtransaction/registry.go +++ /dev/null @@ -1,43 +0,0 @@ -package oldtransaction - -// The StepFunc registry registers StepFunc's to be used by transaction framework - -import ( - "sync" - - log "github.com/sirupsen/logrus" -) - -var sfRegistry = struct { - sync.RWMutex - sfMap map[string]StepFunc -}{} - -func registerStepFunc(s StepFunc, name string) { - if sfRegistry.sfMap == nil { - sfRegistry.sfMap = make(map[string]StepFunc) - } - - if _, ok := sfRegistry.sfMap[name]; ok { - log.WithField("stepname", name).Warning("step with provided name exists in registry and will be overwritten") - } - - sfRegistry.sfMap[name] = s -} - -//RegisterStepFunc registers the given StepFunc in the registry -func RegisterStepFunc(s StepFunc, name string) { - sfRegistry.Lock() - defer sfRegistry.Unlock() - - registerStepFunc(s, name) -} - -//getStepFunc returns named step if found. -func getStepFunc(name string) (StepFunc, bool) { - sfRegistry.RLock() - defer sfRegistry.RUnlock() - - s, ok := sfRegistry.sfMap[name] - return s, ok -} diff --git a/glusterd2/oldtransaction/rpc-client.go b/glusterd2/oldtransaction/rpc-client.go deleted file mode 100644 index e10832c93..000000000 --- a/glusterd2/oldtransaction/rpc-client.go +++ /dev/null @@ -1,78 +0,0 @@ -package oldtransaction - -import ( - "context" - "encoding/json" - "errors" - - "github.com/gluster/glusterd2/glusterd2/peer" - "github.com/gluster/glusterd2/pkg/utils" - - "github.com/pborman/uuid" - log "github.com/sirupsen/logrus" - "go.opencensus.io/plugin/ocgrpc" - "google.golang.org/grpc" -) - -// runStepOn will run the step on the specified node -func runStepOn(origCtx context.Context, step string, node uuid.UUID, c TxnCtx) error { - // TODO: I'm creating connections on demand. This should be changed so that - // we have long term connections. - p, err := peer.GetPeerF(node.String()) - if err != nil { - c.Logger().WithError(err).WithField("peerid", node.String()).Error("peer not found") - return err - } - - logger := c.Logger().WithField("remotepeer", p.ID.String()+"("+p.Name+")") - - var conn *grpc.ClientConn - - remote, err := utils.FormRemotePeerAddress(p.PeerAddresses[0]) - if err != nil { - return err - } - - conn, err = grpc.Dial(remote, - grpc.WithStatsHandler(&ocgrpc.ClientHandler{}), - grpc.WithInsecure(), - ) - if err == nil && conn != nil { - logger.WithFields(log.Fields{ - "remote": remote, - }).Debug("connected to remote") - } - - if conn == nil { - logger.WithError(err).WithField("remote", p.PeerAddresses[0]).Error("failed to grpc.Dial remote") - return err - } - defer conn.Close() - - client := NewTxnSvcClient(conn) - - req := &TxnStepReq{ - StepFunc: step, - } - data, err := json.Marshal(c) - if err != nil { - logger.WithError(err).Error("failed to JSON marshal transaction context") - return err - } - req.Context = data - - var rsp *TxnStepResp - - rsp, err = client.RunStep(origCtx, req) - if err != nil { - logger.WithError(err).WithField("rpc", "TxnSvc.RunStep").Error("failed RPC call") - return err - } - - if rsp.Error != "" { - logger.WithError(errors.New(rsp.Error)).Error("TxnSvc.Runstep failed on peer") - return errors.New(rsp.Error) - } - - return nil -} diff --git a/glusterd2/oldtransaction/rpc-service.go b/glusterd2/oldtransaction/rpc-service.go deleted file mode 100644 index 0a93accbb..000000000 --- a/glusterd2/oldtransaction/rpc-service.go +++ /dev/null @@ -1,79 +0,0 @@ -package oldtransaction - -import ( - "context" - "encoding/json" - "errors" - - "github.com/gluster/glusterd2/glusterd2/servers/peerrpc" - - log "github.com/sirupsen/logrus" - "go.opencensus.io/trace" - "google.golang.org/grpc" -) - -type txnSvc int - -func init() { - peerrpc.Register(new(txnSvc)) -} - -// RunStep handles the incoming request. It executes the requested step and returns the results -func (p *txnSvc) RunStep(rpcCtx context.Context, req *TxnStepReq) (*TxnStepResp, error) { - - var ( - resp TxnStepResp - f StepFunc - err error - ok bool - logger log.FieldLogger - ) - - var ctx Tctx - if err = json.Unmarshal(req.Context, &ctx); err != nil { - log.WithError(err).Error("failed to Unmarshal transaction context") - goto End - } - - logger = ctx.Logger().WithField("stepfunc", req.StepFunc) - logger.Debug("RunStep request received") - - if rpcCtx != nil { - _, span := trace.StartSpan(rpcCtx, req.StepFunc) - reqID := ctx.GetTxnReqID() - span.AddAttributes( - trace.StringAttribute("reqID", reqID), - ) - defer span.End() - } - - f, ok = getStepFunc(req.StepFunc) - if !ok { - err = errors.New("step function not found in registry") - goto End - } - - logger.Debug("executing step function") - if err = f(&ctx); err != nil { - logger.WithError(err).Error("step function failed") - goto End - } - - if err = ctx.Commit(); err != nil { - logger.WithError(err).Error("failed to commit txn context to store") - } - -End: - // Ensure RPC will always send a success reply. Error is stored in - // body of response. - if err != nil { - resp.Error = err.Error() - } - - return &resp, nil -} - -// RegisterService registers txnSvc with the given grpc.Server -func (p *txnSvc) RegisterService(s *grpc.Server) { - RegisterTxnSvcServer(s, p) -} diff --git a/glusterd2/oldtransaction/step.go b/glusterd2/oldtransaction/step.go deleted file mode 100644 index 53b71a0f1..000000000 --- a/glusterd2/oldtransaction/step.go +++ /dev/null @@ -1,180 +0,0 @@ -package oldtransaction - -import ( - "context" - "errors" - "fmt" - "net/http" - - "github.com/gluster/glusterd2/glusterd2/gdctx" - "github.com/gluster/glusterd2/pkg/api" - - "github.com/pborman/uuid" - log "github.com/sirupsen/logrus" - "go.opencensus.io/trace" -) - -// StepFunc is the function that is supposed to be run during a transaction step -type StepFunc func(TxnCtx) error - -// Step is a combination of a StepFunc and a list of nodes the step is supposed to be run on -// -// DoFunc and UndoFunc are names of StepFuncs registered in the registry -// DoFunc performs does the action -// UndoFunc undoes anything done by DoFunc -type Step struct { - DoFunc string - UndoFunc string - Nodes []uuid.UUID - Skip bool - Sync bool -} - -var ( - // ErrStepFuncNotFound is returned if the stepfunc isn't found. - ErrStepFuncNotFound = errors.New("stepFunc was not found") -) - -// do runs the DoFunc on the nodes -func (s *Step) do(origCtx context.Context, ctx TxnCtx) error { - return runStepFuncOnNodes(origCtx, s.DoFunc, ctx, s.Nodes) -} - -// undo runs the UndoFunc on the nodes -func (s *Step) undo(ctx TxnCtx) error { - if s.UndoFunc != "" { - return runStepFuncOnNodes(context.TODO(), s.UndoFunc, ctx, s.Nodes) - } - return nil -} - -// stepPeerResp is response from a single peer that runs a step -type stepPeerResp struct { - PeerID uuid.UUID - Error error -} - -// stepResp contains response from multiple peers that run a step and the type -// implements the `api.ErrorResponse` interface -type stepResp struct { - Step string - Resps []stepPeerResp - errCount int -} - -func (r stepResp) Error() string { - return fmt.Sprintf("Step %s failed on %d nodes", r.Step, r.errCount) -} - -func (r stepResp) Response() api.ErrorResp { - - var apiResp api.ErrorResp - for _, resp := range r.Resps { - if resp.Error == nil { - continue - } - - apiResp.Errors = append(apiResp.Errors, api.HTTPError{ - Code: int(api.ErrTxnStepFailed), - Message: api.ErrorCodeMap[api.ErrTxnStepFailed], - Fields: map[string]string{ - "peer-id": resp.PeerID.String(), - "step": r.Step, - "error": resp.Error.Error()}, - }) - } - - return apiResp -} - -func (r stepResp) Status() int { - return http.StatusInternalServerError -} - -func runStepFuncOnNodes(origCtx context.Context, stepName string, ctx TxnCtx, nodes []uuid.UUID) error { - - respCh := make(chan stepPeerResp, len(nodes)) - defer close(respCh) - - for _, node := range nodes { - go runStepFuncOnNode(origCtx, stepName, ctx, node, respCh) - } - - // Ideally, we have to cancel the pending go-routines on first error - // response received from any of the nodes. But that's really tricky - // to do. Serializing sequentially is the easiest fix but we lose - // concurrency. Instead, we let the do() function run on all nodes. - - resp := stepResp{ - Step: stepName, - Resps: make([]stepPeerResp, len(nodes)), - } - - var peerResp stepPeerResp - for range nodes { - peerResp = <-respCh - if peerResp.Error != nil { - resp.errCount++ - ctx.Logger().WithError(peerResp.Error).WithFields(log.Fields{ - "step": stepName, "node": peerResp.PeerID, - }).Error("Step failed on node.") - } - resp.Resps = append(resp.Resps, peerResp) - } - - if resp.errCount != 0 { - return resp - } - - return nil -} - -func runStepFuncOnNode(origCtx context.Context, stepName string, ctx TxnCtx, node uuid.UUID, respCh chan<- stepPeerResp) { - - ctx.Logger().WithFields(log.Fields{ - "step": stepName, "node": node, - }).Debug("Running step on node.") - - var err error - if uuid.Equal(node, gdctx.MyUUID) { - err = traceStep(RunStepFuncLocally)(origCtx, stepName, ctx) - } else { - // remote node - err = runStepOn(origCtx, stepName, node, ctx) - } - - respCh <- stepPeerResp{node, err} -} - -type runFunc func(origCtx context.Context, stepName string, ctx TxnCtx) error - -func traceStep(f runFunc) runFunc { - return func(origCtx context.Context, stepName string, ctx TxnCtx) error { - if origCtx != nil { - _, span := trace.StartSpan(origCtx, stepName) - reqID := ctx.GetTxnReqID() - span.AddAttributes( - trace.StringAttribute("reqID", reqID), - ) - defer span.End() - } - - return f(origCtx, stepName, ctx) - } -} - -// RunStepFuncLocally runs a step func on local node -func RunStepFuncLocally(origCtx context.Context, stepName string, ctx TxnCtx) error { - stepFunc, ok := getStepFunc(stepName) - if !ok { - return ErrStepFuncNotFound - } - - if err := stepFunc(ctx); err != nil { - return err - } - - // if step function executes successfully, commit the - // results to the store - return ctx.Commit() -} diff --git a/glusterd2/oldtransaction/transaction-rpc.pb.go b/glusterd2/oldtransaction/transaction-rpc.pb.go deleted file mode 100644 index a7aae426c..000000000 --- a/glusterd2/oldtransaction/transaction-rpc.pb.go +++ /dev/null @@ -1,205 +0,0 @@ -// Code generated by protoc-gen-go. DO NOT EDIT. -// source: glusterd2/transaction/transaction-rpc.proto - -package oldtransaction - -import ( - fmt "fmt" - proto "github.com/golang/protobuf/proto" - context "golang.org/x/net/context" - grpc "google.golang.org/grpc" - math "math" -) - -// Reference imports to suppress errors if they are not otherwise used. -var _ = proto.Marshal -var _ = fmt.Errorf -var _ = math.Inf - -// This is a compile-time assertion to ensure that this generated file -// is compatible with the proto package it is being compiled against. -// A compilation error at this line likely means your copy of the -// proto package needs to be updated. -const _ = proto.ProtoPackageIsVersion2 // please upgrade the proto package - -type TxnStepReq struct { - StepFunc string `protobuf:"bytes,1,opt,name=StepFunc,proto3" json:"StepFunc,omitempty"` - Context []byte `protobuf:"bytes,2,opt,name=Context,proto3" json:"Context,omitempty"` - XXX_NoUnkeyedLiteral struct{} `json:"-"` - XXX_unrecognized []byte `json:"-"` - XXX_sizecache int32 `json:"-"` -} - -func (m *TxnStepReq) Reset() { *m = TxnStepReq{} } -func (m *TxnStepReq) String() string { return proto.CompactTextString(m) } -func (*TxnStepReq) ProtoMessage() {} -func (*TxnStepReq) Descriptor() ([]byte, []int) { - return fileDescriptor_8c3c13f7a3182c1a, []int{0} -} - -func (m *TxnStepReq) XXX_Unmarshal(b []byte) error { - return xxx_messageInfo_TxnStepReq.Unmarshal(m, b) -} -func (m *TxnStepReq) XXX_Marshal(b []byte, deterministic bool) ([]byte, error) { - return xxx_messageInfo_TxnStepReq.Marshal(b, m, deterministic) -} -func (m *TxnStepReq) XXX_Merge(src proto.Message) { - xxx_messageInfo_TxnStepReq.Merge(m, src) -} -func (m *TxnStepReq) XXX_Size() int { - return xxx_messageInfo_TxnStepReq.Size(m) -} -func (m *TxnStepReq) XXX_DiscardUnknown() { - xxx_messageInfo_TxnStepReq.DiscardUnknown(m) -} - -var xxx_messageInfo_TxnStepReq proto.InternalMessageInfo - -func (m *TxnStepReq) GetStepFunc() string { - if m != nil { - return m.StepFunc - } - return "" -} - -func (m *TxnStepReq) GetContext() []byte { - if m != nil { - return m.Context - } - return nil -} - -type TxnStepResp struct { - Error string `protobuf:"bytes,1,opt,name=Error,proto3" json:"Error,omitempty"` - XXX_NoUnkeyedLiteral struct{} `json:"-"` - XXX_unrecognized []byte `json:"-"` - XXX_sizecache int32 `json:"-"` -} - -func (m *TxnStepResp) Reset() { *m = TxnStepResp{} } -func (m *TxnStepResp) String() string { return proto.CompactTextString(m) } -func (*TxnStepResp) ProtoMessage() {} -func (*TxnStepResp) Descriptor() ([]byte, []int) { - return fileDescriptor_8c3c13f7a3182c1a, []int{1} -} - -func (m *TxnStepResp) XXX_Unmarshal(b []byte) error { - return xxx_messageInfo_TxnStepResp.Unmarshal(m, b) -} -func (m *TxnStepResp) XXX_Marshal(b []byte, deterministic bool) ([]byte, error) { - return xxx_messageInfo_TxnStepResp.Marshal(b, m, deterministic) -} -func (m *TxnStepResp) XXX_Merge(src proto.Message) { - xxx_messageInfo_TxnStepResp.Merge(m, src) -} -func (m *TxnStepResp) XXX_Size() int { - return xxx_messageInfo_TxnStepResp.Size(m) -} -func (m *TxnStepResp) XXX_DiscardUnknown() { - xxx_messageInfo_TxnStepResp.DiscardUnknown(m) -} - -var xxx_messageInfo_TxnStepResp proto.InternalMessageInfo - -func (m *TxnStepResp) GetError() string { - if m != nil { - return m.Error - } - return "" -} - -func init() { - proto.RegisterType((*TxnStepReq)(nil), "transaction.TxnStepReq") - proto.RegisterType((*TxnStepResp)(nil), "transaction.TxnStepResp") -} - -func init() { - proto.RegisterFile("glusterd2/transaction/transaction-rpc.proto", fileDescriptor_8c3c13f7a3182c1a) -} - -var fileDescriptor_8c3c13f7a3182c1a = []byte{ - // 174 bytes of a gzipped FileDescriptorProto - 0x1f, 0x8b, 0x08, 0x00, 0x00, 0x00, 0x00, 0x00, 0x02, 0xff, 0xe2, 0xd2, 0x4e, 0xcf, 0x29, 0x2d, - 0x2e, 0x49, 0x2d, 0x4a, 0x31, 0xd2, 0x2f, 0x29, 0x4a, 0xcc, 0x2b, 0x4e, 0x4c, 0x2e, 0xc9, 0xcc, - 0xcf, 0x43, 0x66, 0xeb, 0x16, 0x15, 0x24, 0xeb, 0x15, 0x14, 0xe5, 0x97, 0xe4, 0x0b, 0x71, 0x23, - 0x09, 0x2b, 0x39, 0x71, 0x71, 0x85, 0x54, 0xe4, 0x05, 0x97, 0xa4, 0x16, 0x04, 0xa5, 0x16, 0x0a, - 0x49, 0x71, 0x71, 0x80, 0x98, 0x6e, 0xa5, 0x79, 0xc9, 0x12, 0x8c, 0x0a, 0x8c, 0x1a, 0x9c, 0x41, - 0x70, 0xbe, 0x90, 0x04, 0x17, 0xbb, 0x73, 0x7e, 0x5e, 0x49, 0x6a, 0x45, 0x89, 0x04, 0x93, 0x02, - 0xa3, 0x06, 0x4f, 0x10, 0x8c, 0xab, 0xa4, 0xcc, 0xc5, 0x0d, 0x37, 0xa3, 0xb8, 0x40, 0x48, 0x84, - 0x8b, 0xd5, 0xb5, 0xa8, 0x28, 0xbf, 0x08, 0x6a, 0x02, 0x84, 0x63, 0xe4, 0xc1, 0xc5, 0x06, 0x52, - 0x54, 0x96, 0x2c, 0x64, 0xc7, 0xc5, 0x1e, 0x54, 0x0a, 0x56, 0x2e, 0x24, 0xae, 0x87, 0xe4, 0x16, - 0x3d, 0x84, 0x43, 0xa4, 0x24, 0xb0, 0x4b, 0x14, 0x17, 0x28, 0x31, 0x24, 0xb1, 0x81, 0xbd, 0x61, - 0x0c, 0x08, 0x00, 0x00, 0xff, 0xff, 0x69, 0x42, 0xfa, 0xe6, 0xf5, 0x00, 0x00, 0x00, -} - -// Reference imports to suppress errors if they are not otherwise used. -var _ context.Context -var _ grpc.ClientConn - -// This is a compile-time assertion to ensure that this generated file -// is compatible with the grpc package it is being compiled against. -const _ = grpc.SupportPackageIsVersion4 - -// TxnSvcClient is the client API for TxnSvc service. -// -// For semantics around ctx use and closing/ending streaming RPCs, please refer to https://godoc.org/google.golang.org/grpc#ClientConn.NewStream. -type TxnSvcClient interface { - RunStep(ctx context.Context, in *TxnStepReq, opts ...grpc.CallOption) (*TxnStepResp, error) -} - -type txnSvcClient struct { - cc *grpc.ClientConn -} - -func NewTxnSvcClient(cc *grpc.ClientConn) TxnSvcClient { - return &txnSvcClient{cc} -} - -func (c *txnSvcClient) RunStep(ctx context.Context, in *TxnStepReq, opts ...grpc.CallOption) (*TxnStepResp, error) { - out := new(TxnStepResp) - err := c.cc.Invoke(ctx, "/transaction.TxnSvc/RunStep", in, out, opts...) - if err != nil { - return nil, err - } - return out, nil -} - -// TxnSvcServer is the server API for TxnSvc service. -type TxnSvcServer interface { - RunStep(context.Context, *TxnStepReq) (*TxnStepResp, error) -} - -func RegisterTxnSvcServer(s *grpc.Server, srv TxnSvcServer) { - s.RegisterService(&_TxnSvc_serviceDesc, srv) -} - -func _TxnSvc_RunStep_Handler(srv interface{}, ctx context.Context, dec func(interface{}) error, interceptor grpc.UnaryServerInterceptor) (interface{}, error) { - in := new(TxnStepReq) - if err := dec(in); err != nil { - return nil, err - } - if interceptor == nil { - return srv.(TxnSvcServer).RunStep(ctx, in) - } - info := &grpc.UnaryServerInfo{ - Server: srv, - FullMethod: "/transaction.TxnSvc/RunStep", - } - handler := func(ctx context.Context, req interface{}) (interface{}, error) { - return srv.(TxnSvcServer).RunStep(ctx, req.(*TxnStepReq)) - } - return interceptor(ctx, in, info, handler) -} - -var _TxnSvc_serviceDesc = grpc.ServiceDesc{ - ServiceName: "transaction.TxnSvc", - HandlerType: (*TxnSvcServer)(nil), - Methods: []grpc.MethodDesc{ - { - MethodName: "RunStep", - Handler: _TxnSvc_RunStep_Handler, - }, - }, - Streams: []grpc.StreamDesc{}, - Metadata: "glusterd2/transaction/transaction-rpc.proto", -} diff --git a/glusterd2/oldtransaction/transaction-rpc.proto b/glusterd2/oldtransaction/transaction-rpc.proto deleted file mode 100644 index c84fafff2..000000000 --- a/glusterd2/oldtransaction/transaction-rpc.proto +++ /dev/null @@ -1,16 +0,0 @@ -syntax = "proto3"; - -package transaction; - -message TxnStepReq { - string StepFunc = 1; - bytes Context = 2; // Context JSON encoded TxnCtx -} - -message TxnStepResp { - string Error = 1; -} - -service TxnSvc { - rpc RunStep(TxnStepReq) returns(TxnStepResp) {} -} diff --git a/glusterd2/oldtransaction/transaction.go b/glusterd2/oldtransaction/transaction.go deleted file mode 100644 index d29a90086..000000000 --- a/glusterd2/oldtransaction/transaction.go +++ /dev/null @@ -1,194 +0,0 @@ -// Package transaction implements a distributed transaction handling framework -package oldtransaction - -import ( - "context" - "expvar" - "fmt" - - "github.com/gluster/glusterd2/glusterd2/gdctx" - "github.com/gluster/glusterd2/glusterd2/store" - - "github.com/coreos/etcd/clientv3" - "github.com/coreos/etcd/clientv3/concurrency" - "github.com/pborman/uuid" - log "github.com/sirupsen/logrus" - "google.golang.org/grpc" - "google.golang.org/grpc/codes" -) - -const ( - txnPrefix = "transaction/" -) - -var expTxn = expvar.NewMap("txn") - -// Txn is a set of steps -type Txn struct { - id uuid.UUID - locks Locks - reqID uuid.UUID - storePrefix string - - Ctx TxnCtx - Steps []*Step - DontCheckAlive bool - DisableRollback bool - // Nodes is the union of the all the TxnStep.Nodes and is implicitly - // set in Txn.Do(). This list is used to determine liveness of the - // nodes before running the transaction steps. - Nodes []uuid.UUID - OrigCtx context.Context -} - -// NewTxn returns an initialized Txn without any steps -func NewTxn(ctx context.Context) *Txn { - t := new(Txn) - - t.id = uuid.NewRandom() - t.reqID = gdctx.GetReqID(ctx) - t.locks = make(map[string]*concurrency.Mutex) - t.storePrefix = txnPrefix + t.id.String() + "/" - config := &TxnCtxConfig{ - LogFields: log.Fields{ - "txnid": t.id.String(), - "reqid": t.reqID.String(), - }, - StorePrefix: t.storePrefix, - } - t.Ctx = newCtx(config) - - t.OrigCtx = ctx - t.Ctx.Logger().Debug("new transaction created") - return t -} - -// NewTxnWithLocks returns an empty Txn with locks obtained on given lockIDs -func NewTxnWithLocks(ctx context.Context, lockIDs ...string) (*Txn, error) { - t := NewTxn(ctx) - - for _, id := range lockIDs { - logger := t.Ctx.Logger().WithField("lockID", id) - logger.Debug("attempting to obtain lock") - - if err := t.locks.Lock(id); err != nil { - logger.WithError(err).Error("failed to obtain lock") - t.Done() - return nil, err - } - - logger.Debug("lock obtained") - } - - return t, nil -} - -// Done releases any obtained locks and cleans up the transaction namespace -// Done must be called after a transaction ends -func (t *Txn) Done() { - // Release obtained locks - for _, locker := range t.locks { - locker.Unlock(context.Background()) - } - - // Wipe txn namespace - if _, err := store.Delete(context.TODO(), t.storePrefix, clientv3.WithPrefix()); err != nil { - t.Ctx.Logger().WithError(err).WithField("key", - t.storePrefix).Error("Failed to remove transaction namespace from store") - } - - expTxn.Add("initiated_txn_in_progress", -1) -} - -func (t *Txn) checkAlive() error { - - if len(t.Nodes) == 0 { - for _, s := range t.Steps { - t.Nodes = append(t.Nodes, s.Nodes...) - } - } - t.Nodes = nodesUnion(t.Nodes) - - for _, node := range t.Nodes { - // TODO: Using prefixed query, get all alive nodes in a single etcd query - if _, online := store.Store.IsNodeAlive(node); !online { - return fmt.Errorf("node %s is probably down", node.String()) - } - } - - return nil -} - -// Do runs the transaction on the cluster -func (t *Txn) Do() error { - if !t.DontCheckAlive { - if err := t.checkAlive(); err != nil { - return err - } - } - - t.Ctx.Logger().Debug("Starting transaction") - expTxn.Add("initiated_txn_in_progress", 1) - - // commit txn.Ctx.Set()s done in REST handlers to the store - if err := t.Ctx.Commit(); err != nil { - return err - } - - for i, s := range t.Steps { - if s.Skip { - continue - } - - if err := s.do(t.OrigCtx, t.Ctx); err != nil { - if t.DontCheckAlive && isNodeUnreachable(err) { - continue - } - expTxn.Add("initiated_txn_failure", 1) - if !t.DisableRollback { - t.Ctx.Logger().WithError(err).Error("Transaction failed, rolling back changes") - t.undo(i) - } - return err - } - } - - expTxn.Add("initiated_txn_success", 1) - return nil -} - -func isNodeUnreachable(err error) bool { - unreachable := true - if s, ok := err.(*stepResp); ok { - for _, e := range s.Resps { - if grpc.Code(e.Error) != codes.Unavailable { - unreachable = false - } - } - } - return unreachable -} - -// undo undoes a transaction and will be automatically called by Perform if any step fails. -// The Steps are undone in the reverse order, from the failed step. -func (t *Txn) undo(n int) { - for i := n; i >= 0; i-- { - if t.Steps[i].Skip { - continue - } - t.Steps[i].undo(t.Ctx) - } -} - -// nodesUnion removes duplicate nodes -func nodesUnion(nodes []uuid.UUID) []uuid.UUID { - for i := 0; i < len(nodes); i++ { - for j := i + 1; j < len(nodes); j++ { - if uuid.Equal(nodes[i], nodes[j]) { - nodes = append(nodes[:j], nodes[j+1:]...) - j-- - } - } - } - return nodes -}