diff --git a/doc/transaction.md b/doc/transaction.md
new file mode 100644
index 000000000..519957364
--- /dev/null
+++ b/doc/transaction.md
@@ -0,0 +1,413 @@
+# Transaction framework
+
+The GD2 transaction framework is used to execute/orchestrate distributed actions (transactions) over a Gluster trusted storage pool.
+It is used to perform the various actions required by the different volume management and cluster management operations supported by GD2.
+
+
+* [Transaction](#transaction)
+  * [Transaction step](#transaction-step)
+* [Transaction engine](#transaction-engine)
+  * [Creating and running a transaction](#creating-and-running-a-transaction)
+  * [Modify global data structures](#modify-global-data-structures)
+  * [Synchronized step execution](#synchronized-step-execution)
+  * [Cleaning up stale and failed transactions](#cleaning-up-stale-and-failed-transactions)
+  * [Handling peer restart during transaction](#handling-peer-restart-during-transaction)
+* [Examples](#examples)
+  * [Volume create](#volume-create)
+    * [Happy path: All peers alive throughout the transaction](#happy-path-all-peers-alive-throughout-the-transaction)
+    * [Fail path: initiator dies and comes back up within cleanup timeout](#fail-path-initiator-dies-and-comes-back-up-within-cleanup-timeout)
+    * [Fail path: initiator dies and comes back up after first cleanup timeout](#fail-path-initiator-dies-and-comes-back-up-after-first-cleanup-timeout)
+    * [Happy path: one executor dies and comes back up before cleanup/transaction timeout](#happy-path-one-executor-dies-and-comes-back-up-before-cleanuptransaction-timeout)
+    * [Fail path: one executor dies and comes back up after cleanup/transaction timeout](#fail-path-one-executor-dies-and-comes-back-up-after-cleanuptransaction-timeout)
+    * [Examples for more complex cases](#examples-for-more-complex-cases)
+* [Terms](#terms)
+  * [Data structures](#data-structures)
+    * [Global data structures](#global-data-structures)
+    * [Local data structures](#local-data-structures)
+  * [Initiator](#initiator)
+  * [Cleanup leader](#cleanup-leader)
+  * [Locks](#locks)
+    * [Cluster locks](#cluster-locks)
+    * [Local locks](#local-locks)
+  * [Stale transaction](#stale-transaction)
+  * [Transaction namespaces](#transaction-namespaces)
+    * [Pending transaction namespace](#pending-transaction-namespace)
+    * [Transaction context namespace](#transaction-context-namespace)
+
+
+## Transaction
+
+A transaction is basically a collection of steps or actions to be performed in order.
+A transaction object provides the framework with the following:
+
+1. a list of peers that will be a part of the transaction
+2. a set of transaction [steps](#transaction-step)
+
+Given this information, the GD2 transaction framework will,
+
+- verify if all the listed peers are online
+- run each step on all of the nodes, before proceeding to the next step
+- if a step fails, undo the changes done by the step and all previous steps.
+
+The base transaction is basically free-form, allowing users to create any order of steps.
+This keeps it flexible and extensible enough to create complex transactions.
+
+### Transaction step
+A step is an action to be performed, most likely a function that needs to be run.
+A step object provides the following information:
+
+1. The function to be run
+2. The list of peers the step should be run on.
+3. An undo function that reverts any changes done by the step.
+
+Each step can have its own list of peers, so that steps can be targeted to specific nodes and provide more flexibility.
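+
+To make this concrete, the sketch below shows roughly how a GD2 REST handler assembles and runs such a transaction, mirroring the volume-create handler changes in this patch. It is an illustration only: the wrapper function `createVolumeTxn` and its arguments are hypothetical, and the `DoFunc`/`UndoFunc` names must refer to step functions that are already registered with the framework.
+
+```go
+package volumecommands
+
+import (
+	"context"
+
+	"github.com/gluster/glusterd2/glusterd2/gdctx"
+	"github.com/gluster/glusterd2/glusterd2/transaction"
+	transactionv2 "github.com/gluster/glusterd2/glusterd2/transactionv2"
+
+	"github.com/pborman/uuid"
+)
+
+// createVolumeTxn is a hypothetical helper showing how a transaction is put
+// together: take the cluster lock, list the steps (with their undo functions
+// and target peers), and hand the whole thing to the framework with Do().
+func createVolumeTxn(ctx context.Context, volname string, nodes []uuid.UUID) error {
+	// Obtain the cluster-wide lock on the volume name and create the transaction.
+	txn, err := transactionv2.NewTxnWithLocks(ctx, volname)
+	if err != nil {
+		return err
+	}
+	defer txn.Done()
+
+	txn.Nodes = nodes // peers that are part of the transaction
+	txn.Steps = []transaction.Step{
+		{
+			// Runs on every peer that contributes a brick.
+			DoFunc: "vol-create.ValidateBricks",
+			Nodes:  nodes,
+			Sync:   true, // wait for all peers to finish the previous step first
+		},
+		{
+			DoFunc: "vol-create.InitBricks",
+			Nodes:  nodes,
+		},
+		{
+			// The global volinfo is updated only on the initiator, as the
+			// last, synchronized step; UndoFunc reverts the update on failure.
+			DoFunc:   "vol-create.StoreVolume",
+			UndoFunc: "vol-create.UndoStoreVolume",
+			Nodes:    []uuid.UUID{gdctx.MyUUID},
+			Sync:     true,
+		},
+	}
+
+	// Execute the steps in order across the listed peers.
+	return txn.Do()
+}
+```
+
+The framework then runs each step on the listed peers in order, and invokes the undo functions of completed steps if a later step fails.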
+
+
+## Transaction engine
+
+The transaction engine executes the given transaction across the cluster.
+The engine is designed to make use of etcd as the means of communication between peers.
+The framework has to provide two important characteristics:
+
+1. Each peer must be capable of independently and asynchronously executing a transaction that has been initiated.
+2. Each peer must be capable of independently rolling back/undoing a [stale transaction](#stale-transaction).
+
+In addition, the transaction engine should also provide,
+
+1. A method to obtain [cluster wide locks](#cluster-locks) and [local locks](#local-locks),
+   so that updates to [global](#global-data-structures) and [local](#local-data-structures) data structures can be done safely.
+2. The ability to synchronize transaction steps across the cluster when required.
+
+The transaction engine is started on each peer in the cluster,
+and keeps a watch on the [pending transaction namespace](#pending-transaction-namespace) for new transactions.
+For each new incoming transaction, the transaction engine does the following,
+
+- Check if this peer is involved in the transaction. If it is not, then do nothing.
+- Fetch the list of steps for the transaction. For each step,
+  - Check if the transaction has been marked as failure
+    - If the transaction has failed,
+      - rollback previously executed steps
+      - mark yourself as having failed the transaction
+      - end transaction
+  - [Synchronize the step](#synchronized-step-execution) if required
+  - Check if the step needs to be executed on this peer
+    - if not required, mark successful progress and continue to the next step
+  - Execute the step
+    - If the step executes successfully,
+      - mark progress in the [transaction context namespace](#transaction-context-namespace)
+      - continue to the next step
+    - If the step fails,
+      - rollback previously executed steps
+      - mark yourself as having failed the transaction
+      - end transaction
+- After all steps have been executed,
+  - mark yourself as having successfully completed the transaction
+  - start a timeout timer and wait for the transaction to be cleaned up
+    - If the transaction is cleaned up,
+      - end transaction
+    - If a timeout occurs,
+      - rollback previously executed steps
+      - mark yourself as having failed the transaction
+      - end transaction
+
+### Creating and running a transaction
+
+A transaction is initiated by an [initiator](#initiator).
+The initiator is most likely the node that receives an incoming GD2 request.
+The initiator does the following,
+
+- Based on the incoming request, the initiator creates a transaction
+  - If required, the initiator takes any required cluster locks
+  - If required, the initiator can obtain locks before filling out the transaction steps and starting the transaction
+- Add the created and filled transaction into the pending transaction namespace
+- Start a timeout timer and watch for involved peers to mark transaction completion in the transaction context
+  - If all involved peers mark successful completion,
+    - Cleanup the transaction
+    - Respond back with the result
+  - If at least one peer marks failure,
+    - Mark the transaction as having failed
+    - Respond with an error
+  - If a timeout occurs,
+    - Mark the transaction as having failed
+    - Respond with an error
+
+### Modify global data structures
+
+[Global data structures](#global-data-structures) can only be updated under [cluster locks](#cluster-locks).
+During a transaction it is required that,
+- only the initiator modifies global data structures
+- the modification is only done as the last step of a transaction
+- the transaction is synchronized before the modify step is executed
+
+Modifications, once done to global data structures, cannot be rolled back.
+
+### Synchronized step execution
+
+A synchronized step is executed only after all previous steps have been completed successfully by all involved peers.
+Step synchronization is required for steps that collate information, update global data structures or perform other similar operations.
+The initiator can mark a step as synchronized when creating the transaction.
+
+Step synchronization is performed by the engine for any synchronized step, even if the step is not going to be executed on that peer.
+The engine synchronizes as follows,
+- Check if the step needs synchronization
+  - if not required,
+    - continue with step execution
+  - if required,
+    - wait for the previous step to be marked as completed by all peers involved in the transaction
+      - if all peers mark completion,
+        - continue with step execution
+      - if any peer marks step failure,
+        - mark yourself as having failed the current step and return
+
+Step synchronization is only done for forward execution of transactions, not for rollbacks.
+
+
+### Cleaning up stale and failed transactions
+
+A [leader](#cleanup-leader) is elected among the peers in the cluster to clean up [stale transactions](#stale-transaction).
+The leader periodically scans the pending transaction namespace for failed and stale transactions,
+and cleans them up if rollback has been completed by all peers involved in the transaction.
+
+- After winning the election or after the cleanup timer expires,
+  - Fetch pending transactions
+  - For each transaction,
+    - if the transaction has failed,
+      - ensure all peers have performed rollbacks (marked the transaction as failure)
+      - cleanup the transaction from pending transactions
+      - continue to the next transaction
+    - if the transaction is stale (initiator down, or the transaction has been active for longer than the transaction timeout),
+      - check if all peers have marked the transaction as failure
+        - if all peers have marked the transaction as failure,
+          - cleanup the transaction from pending transactions
+          - continue to the next transaction
+        - if not,
+          - mark the transaction as failure to trigger peers to perform rollbacks
+          - continue to the next transaction
+  - restart the cleanup timer
+
+### Handling peer restart during transaction
+
+If a peer dies in the middle of a transaction and later restarts,
+it will attempt to resume or rollback any transactions it was involved in.
+This happens as follows,
+
+- On startup, the peer scans the pending transaction namespace for transactions involving the peer
+- For each such transaction,
+  - Check if the transaction has been marked as failure
+    - if the transaction is marked as failure or you are the transaction initiator,
+      - perform a rollback from the last completed step
+      - mark yourself as having failed the transaction
+    - if not,
+      - resume transaction execution from the last completed step
+
+Transactions cannot be safely resumed on the initiator, as any global locks it held were lost when the peer died.
+
+
+## Examples
+
+The following assumptions are made.
+
+- Cluster of size 3, with peers A, B and C.
+- Peer A is always the initiator
+- Peer B is the cleanup leader
+
+
+### Volume create
+
+Attempt to create a volume with bricks on all 3 peers.
+Transaction created is as follows,
+```
+- Transaction: Create volume - vol1
+  Initiator: A
+  Nodes: A, B, C
+  StartTime: T0
+  GlobalLocks: Vol/vol1
+  Steps:
+  - Step: Check brick path
+    Nodes: A B C
+  - Step: Create brick xattrs
+    Undo: Remove brick xattrs
+    Nodes: A B C
+  - Step: Create brickinfo and store brickinfo
+    Undo: Remove stored brickinfo
+    Nodes: A B C
+  - Step: Create and store volinfo
+    Sync: yes
+    Nodes: A
+```
+
+#### Happy path: All peers alive throughout the transaction
+
+|A (initiator)|A (engine)|B (engine)|C (engine)|B (cleanup)|
+|---|---|---|---|---|
+||Wait for new transactions|Wait for new transactions|Wait for new transactions|Start cleanup timer|
+|Receive create request|||||
+|Create transaction-1 and add to pending transactions|||||
+|Wait for nodes to succeed or fail|||||
+||New transaction-1|New transaction-1|New transaction-1||
+||Execute steps 1-3 and mark as completed|Execute steps 1-3 and mark as completed|Execute steps 1-3 and mark as completed||
+||Wait for all peers to complete step 3|Wait for all peers to complete step 3|Wait for all peers to complete step 3||
+||Execute step 4|Skip step 4|Skip step 4||
+||No more steps, mark self as succeeded transaction-1|No more steps, mark self as succeeded transaction-1|No more steps, mark self as succeeded transaction-1||
+|All peers succeeded|||||
+|Cleanup transaction-1|||||
+|Send response|||||
+
+#### Fail path: initiator dies and comes back up within cleanup timeout
+
+|A (initiator)|A (engine)|B (engine)|C (engine)|B (cleanup)|
+|---|---|---|---|---|
+||Wait for new transactions|Wait for new transactions|Wait for new transactions|Start cleanup timer|
+|Receive create request|||||
+|Create transaction-1 and add to pending transactions|||||
+|Wait for nodes to succeed or fail|||||
+||New transaction-1|New transaction-1|New transaction-1||
+|Peer dies||Execute steps 1-3 and mark as completed|Execute steps 1-3 and mark as completed||
+|||Wait for all peers to complete step 3|Wait for all peers to complete step 3||
+|Peer restarts|||||
+||Check for pending transactions||||
+||Pending transaction-1||||
+||Rollback transaction-1 (as peer was initiator)||||
+||Mark self as failed transaction-1||||
+|||||Timer expires|
+|||||Get pending stale transactions|
+|||||Pending transaction-1 found|
+|||||Mark transaction-1 as failed (not all peers have marked failed)|
+|||||Restart timer|
+|||Transaction marked as failure|Transaction marked as failure||
+|||Rollback transaction-1|Rollback transaction-1||
+|||Mark self as failed transaction-1|Mark self as failed transaction-1||
+|||||Timer expires|
+|||||Get pending stale transactions|
+|||||Pending transaction-1 found|
+|||||All peers failed, delete transaction-1|
+
+#### Fail path: initiator dies and comes back up after first cleanup timeout
+
+|A (initiator)|A (engine)|B (engine)|C (engine)|B (cleanup)|
+|---|---|---|---|---|
+||Wait for new transactions|Wait for new transactions|Wait for new transactions|Start cleanup timer|
+|Receive create request|||||
+|Create transaction-1 and add to pending transactions|||||
+|Wait for nodes to succeed or fail|||||
+||New transaction-1|New transaction-1|New transaction-1||
+|Peer dies||Execute steps 1-3 and mark as completed|Execute steps 1-3 and mark as completed||
+|||Wait for all peers to complete step 3|Wait for all peers to complete step 3||
+|||||Timer expires|
+|||||Get pending stale transactions|
+|||||Pending transaction-1 found|
+|||||Mark transaction-1 as failed (not all peers have marked failed)|
+|||||Restart timer|
+|||Transaction marked as failure|Transaction marked as failure||
+|||Rollback transaction-1|Rollback transaction-1||
+|||Mark self as failed transaction-1|Mark self as failed transaction-1||
+|Peer restarts|||||
+||Check for pending transactions||||
+||Pending transaction-1||||
+||Rollback transaction-1 (failed transaction)||||
+||Mark self as failed transaction-1||||
+|||||Timer expires|
+|||||Get pending stale transactions|
+|||||Pending transaction-1 found|
+|||||All peers failed, delete transaction-1|
+
+#### Happy path: one executor dies and comes back up before cleanup/transaction timeout
+
+|A (initiator)|A (engine)|B (engine)|C (engine)|B (cleanup)|
+|---|---|---|---|---|
+||Wait for new transactions|Wait for new transactions|Wait for new transactions|Start cleanup timer|
+|Receive create request|||||
+|Create transaction-1 and add to pending transactions|||||
+|Wait for nodes to succeed or fail|||||
+||New transaction-1|New transaction-1|New transaction-1||
+||Execute steps 1-3 and mark as completed|Execute steps 1-3 and mark as completed|Peer dies||
+||Wait for all peers to complete step 3|Wait for all peers to complete step 3|||
+||||Peer restarts||
+||||Check for pending transactions||
+||||Get transaction-1||
+||||Resume transaction-1||
+||||Complete step 3||
+||||Wait for all peers to complete step 3||
+||Execute step 4|Skip step 4|Skip step 4||
+||No more steps, mark self as succeeded transaction-1|No more steps, mark self as succeeded transaction-1|No more steps, mark self as succeeded transaction-1||
+|All peers succeeded|||||
+|Cleanup transaction-1|||||
+|Send response|||||
+
+#### Fail path: one executor dies and comes back up after cleanup/transaction timeout
+
+|A (initiator)|A (engine)|B (engine)|C (engine)|B (cleanup)|
+|---|---|---|---|---|
+||Wait for new transactions|Wait for new transactions|Wait for new transactions|Start cleanup timer|
+|Receive create request|||||
+|Create transaction-1 and add to pending transactions|||||
+|Wait for nodes to succeed or fail|||||
+||New transaction-1|New transaction-1|New transaction-1||
+||Execute steps 1-3 and mark as completed|Execute steps 1-3 and mark as completed|Peer dies||
+||Wait for all peers to complete step 3|Wait for all peers to complete step 3|||
+|Transaction-1 timer expires|||||
+|Mark transaction-1 as failed|||||
+|Send error response|||||
+||Transaction-1 marked failure|Transaction-1 marked failure|||
+||Rollback|Rollback|||
+||Mark self as failed transaction-1|Mark self as failed transaction-1|||
+||||Peer restarts||
+||||Check for pending transactions||
+||||Get transaction-1||
+||||Rollback transaction-1||
+||||Mark self as failed transaction-1||
+||||Wait for all peers to complete step 3||
+|||||Timer expires|
+|||||Get pending stale transactions|
+|||||Pending transaction-1 found|
+|||||All peers failed, delete transaction-1|
+
+#### Examples for more complex cases
+
+**TODO**
+
+## Terms
+
+### Data structures
+
+#### Global data structures
+
+Global data structures are the objects that span multiple peers in the cluster. These include volumes, snapshots and the like. Updates to these data structures require that a [cluster lock](#cluster-locks) be obtained on them.
+
+#### Local data structures
+
+Local data structures are objects that are restricted to individual peers in the cluster. These include bricks, daemon processes, etc. Updates to these data structures require that a [local lock](#local-locks) be obtained on them.
+
+### Initiator
+
+The initiator is the peer that initiates a transaction.
+The initiator prepares the list of transaction steps, adds them to the [pending transaction namespace](#pending-transaction-namespace), waits for the transaction to complete, and finally cleans up the transaction from the pending transaction namespace.
+
+### Cleanup leader
+
+The leader cleans up any [stale transactions](#stale-transaction) from the [pending transaction namespace](#pending-transaction-namespace). The leader waits until the peers involved in the stale transaction have performed a rollback, before removing the transaction. Leaders are elected using etcd election mechanisms.
+
+### Locks
+
+#### Cluster locks
+
+Locks taken to synchronize access to [global data structures](#global-data-structures). These locks will most likely be implemented as etcd locks, and are co-operative in nature.
+
+#### Local locks
+
+Locks taken to synchronize access to [local data structures](#local-data-structures). These locks will most likely be implemented as mutexes.
+
+### Stale transaction
+
+A stale transaction is a transaction whose initiator dies before the transaction completes, which results in the transaction never being cleaned up.
+
+### Transaction namespaces
+
+#### Pending transaction namespace
+
+This is an etcd namespace, into which the [initiator](#initiator) adds new transactions. All peers keep a watch on this namespace for new transactions and execute transactions they are marked as being part of.
+
+#### Transaction context namespace
+
+Each individual transaction is provided with an etcd namespace, which is used to store/retrieve/share transaction-specific contextual information while the transaction is being executed.
diff --git a/glusterd2/commands/peers/peer-rpc-svc.go b/glusterd2/commands/peers/peer-rpc-svc.go index b7211c1b3..d37f3b734 100644 --- a/glusterd2/commands/peers/peer-rpc-svc.go +++ b/glusterd2/commands/peers/peer-rpc-svc.go @@ -8,10 +8,12 @@ import ( "github.com/gluster/glusterd2/glusterd2/peer" "github.com/gluster/glusterd2/glusterd2/servers/peerrpc" "github.com/gluster/glusterd2/glusterd2/store" + "github.com/gluster/glusterd2/glusterd2/transactionv2" + "github.com/gluster/glusterd2/glusterd2/transactionv2/cleanuphandler" "github.com/gluster/glusterd2/glusterd2/volume" "github.com/gluster/glusterd2/pkg/utils" - "github.com/pborman/uuid" + "github.com/pborman/uuid" log "github.com/sirupsen/logrus" "google.golang.org/grpc" ) @@ -169,6 +171,8 @@ func ReconfigureStore(c *StoreConfig) error { // Stop events framework events.Stop() + transaction.StopTxnEngine() + cleanuphandler.StopCleanupLeader() // do not delete cluster namespace if this is not a loner node var deleteNamespace bool @@ -217,7 +221,8 @@ func ReconfigureStore(c *StoreConfig) error { // Now that new store is up, start events framework events.Start() - + transaction.StartTxnEngine() + cleanuphandler.StartCleanupLeader() return nil } diff --git a/glusterd2/commands/snapshot/snapshot-activate.go b/glusterd2/commands/snapshot/snapshot-activate.go index 5eac6fcef..6ccce5bd5 100644 --- a/glusterd2/commands/snapshot/snapshot-activate.go +++ b/glusterd2/commands/snapshot/snapshot-activate.go @@ -152,6 +152,7 @@ func snapshotActivateHandler(w http.ResponseWriter, r *http.Request) { DoFunc: "snap-activate.StoreSnapshot", UndoFunc: "snap-activate.UndoStoreSnapshot", Nodes: []uuid.UUID{gdctx.MyUUID}, + Sync: true, }, } if err = txn.Do(); err != nil { diff --git a/glusterd2/commands/snapshot/snapshot-clone.go
b/glusterd2/commands/snapshot/snapshot-clone.go index c6cb51536..418d2ba30 100644 --- a/glusterd2/commands/snapshot/snapshot-clone.go +++ b/glusterd2/commands/snapshot/snapshot-clone.go @@ -322,6 +322,7 @@ func snapshotCloneHandler(w http.ResponseWriter, r *http.Request) { { DoFunc: "snap-clone.CreateCloneVolinfo", Nodes: []uuid.UUID{gdctx.MyUUID}, + Sync: true, }, { DoFunc: "snap-clone.TakeBrickSnapshots", @@ -332,6 +333,7 @@ func snapshotCloneHandler(w http.ResponseWriter, r *http.Request) { DoFunc: "snap-clone.StoreSnapshot", UndoFunc: "snap-clone.UndoStoreSnapshotOnClone", Nodes: []uuid.UUID{gdctx.MyUUID}, + Sync: true, }, } if err = txn.Ctx.Set("snapname", &snapname); err != nil { diff --git a/glusterd2/commands/snapshot/snapshot-create.go b/glusterd2/commands/snapshot/snapshot-create.go index c432f70d6..89e2760a8 100644 --- a/glusterd2/commands/snapshot/snapshot-create.go +++ b/glusterd2/commands/snapshot/snapshot-create.go @@ -763,6 +763,7 @@ func snapshotCreateHandler(w http.ResponseWriter, r *http.Request) { { DoFunc: "snap-create.CreateSnapinfo", Nodes: []uuid.UUID{gdctx.MyUUID}, + Sync: true, }, { DoFunc: "snap-create.ActivateBarrier", @@ -774,6 +775,8 @@ func snapshotCreateHandler(w http.ResponseWriter, r *http.Request) { DoFunc: "snap-create.TakeBrickSnapshots", UndoFunc: "snap-create.UndoBrickSnapshots", Nodes: txn.Nodes, + // All bricks need to be barriered before taking a snapshot + Sync: true, }, { DoFunc: "snap-create.DeactivateBarrier", @@ -784,6 +787,7 @@ func snapshotCreateHandler(w http.ResponseWriter, r *http.Request) { DoFunc: "snap-create.StoreSnapshot", UndoFunc: "snap-create.UndoStoreSnapshotOnCreate", Nodes: []uuid.UUID{gdctx.MyUUID}, + Sync: true, }, } diff --git a/glusterd2/commands/snapshot/snapshot-deactivate.go b/glusterd2/commands/snapshot/snapshot-deactivate.go index d46129087..5fa9095f9 100644 --- a/glusterd2/commands/snapshot/snapshot-deactivate.go +++ b/glusterd2/commands/snapshot/snapshot-deactivate.go @@ -148,6 +148,7 @@ func snapshotDeactivateHandler(w http.ResponseWriter, r *http.Request) { DoFunc: "snap-deactivate.StoreSnapshot", UndoFunc: "snap-deactivate.UndoStoreSnapshot", Nodes: []uuid.UUID{gdctx.MyUUID}, + Sync: true, }, } if err = txn.Ctx.Set("oldsnapinfo", &snapinfo); err != nil { diff --git a/glusterd2/commands/snapshot/snapshot-delete.go b/glusterd2/commands/snapshot/snapshot-delete.go index c79d5837d..22007ccf8 100644 --- a/glusterd2/commands/snapshot/snapshot-delete.go +++ b/glusterd2/commands/snapshot/snapshot-delete.go @@ -161,6 +161,7 @@ func snapshotDeleteHandler(w http.ResponseWriter, r *http.Request) { { DoFunc: "snap-delete.Store", Nodes: []uuid.UUID{gdctx.MyUUID}, + Sync: true, }, } diff --git a/glusterd2/commands/snapshot/snapshot-restore.go b/glusterd2/commands/snapshot/snapshot-restore.go index c3b4483b6..7ba4a3f6b 100644 --- a/glusterd2/commands/snapshot/snapshot-restore.go +++ b/glusterd2/commands/snapshot/snapshot-restore.go @@ -364,6 +364,7 @@ func snapshotRestoreHandler(w http.ResponseWriter, r *http.Request) { DoFunc: "snap-restore.Store", UndoFunc: "snap-restore.UndoStore", Nodes: []uuid.UUID{gdctx.MyUUID}, + Sync: true, }, { DoFunc: "snap-restore.CleanBricks", diff --git a/glusterd2/commands/volumes/brick-replace.go b/glusterd2/commands/volumes/brick-replace.go index 03163d388..ee2d0e72e 100644 --- a/glusterd2/commands/volumes/brick-replace.go +++ b/glusterd2/commands/volumes/brick-replace.go @@ -161,6 +161,7 @@ LOOP: { DoFunc: "brick-replace.ReplaceVolinfo", Nodes: []uuid.UUID{gdctx.MyUUID}, + Sync: true, }, { 
DoFunc: "vol-create.InitBricks", @@ -175,6 +176,7 @@ LOOP: DoFunc: "vol-create.StoreVolume", UndoFunc: "vol-create.UndoStoreVolume", Nodes: []uuid.UUID{gdctx.MyUUID}, + Sync: true, }, { DoFunc: "vol-expand.NotifyClients", diff --git a/glusterd2/commands/volumes/common.go b/glusterd2/commands/volumes/common.go index 95d9ae303..e82c271fe 100644 --- a/glusterd2/commands/volumes/common.go +++ b/glusterd2/commands/volumes/common.go @@ -5,8 +5,11 @@ import ( "encoding/json" "errors" "fmt" + "io/ioutil" "os" + "path" "path/filepath" + "strings" "github.com/gluster/glusterd2/glusterd2/brick" "github.com/gluster/glusterd2/glusterd2/gdctx" @@ -22,6 +25,7 @@ import ( "github.com/pborman/uuid" log "github.com/sirupsen/logrus" + config "github.com/spf13/viper" "golang.org/x/sys/unix" ) @@ -302,8 +306,43 @@ func undoStoreVolume(c transaction.TxnCtx) error { return storeVolInfo(c, "oldvolinfo") } -// LoadDefaultGroupOptions loads the default group option map into the store -func LoadDefaultGroupOptions() error { +func loadDefaultGroupOptions() error { + defaultProfilesPath := path.Join(config.GetString("localstatedir"), "templates", "profiles.json") + // If directory not exists, create the directory and then generate default templates + _, err := os.Stat(defaultProfilesPath) + if os.IsNotExist(err) { + content, err := json.MarshalIndent(defaultGroupOptions, "", " ") + if err != nil { + return err + } + + err = os.MkdirAll(path.Dir(defaultProfilesPath), os.ModeDir|os.ModePerm) + if err != nil { + return err + } + return ioutil.WriteFile(defaultProfilesPath, content, 0640) + } else if err == nil { + content, err := ioutil.ReadFile(defaultProfilesPath) + if err != nil { + return err + } + var grpOpts map[string]*api.OptionGroup + err = json.Unmarshal(content, &grpOpts) + if err != nil { + return err + } + defaultGroupOptions = grpOpts + return nil + } + return err +} + +// InitDefaultGroupOptions loads the default group option map into the store +func InitDefaultGroupOptions() error { + err := loadDefaultGroupOptions() + if err != nil { + return err + } groupOptions, err := json.Marshal(defaultGroupOptions) if err != nil { return err @@ -380,3 +419,23 @@ func txnGenerateBrickVolfiles(c transaction.TxnCtx) error { } return nil } + +func containsReservedGroupProfile(opts interface{}) bool { + pfx := "profile.default." 
+ switch value := opts.(type) { + case map[string]string: + for k := range value { + if strings.HasPrefix(k, pfx) { + return true + } + } + case []string: + for _, v := range value { + if strings.HasPrefix(v, pfx) { + return true + } + } + } + + return false +} diff --git a/glusterd2/commands/volumes/grouped-options.go b/glusterd2/commands/volumes/grouped-options.go index b0d6981de..d966a23bd 100644 --- a/glusterd2/commands/volumes/grouped-options.go +++ b/glusterd2/commands/volumes/grouped-options.go @@ -6,27 +6,75 @@ import ( // defaultGroupOptions maps from a profile name to a set of options var defaultGroupOptions = map[string]*api.OptionGroup{ - "profile.gluster-block": {Name: "profile.gluster-block", Options: []api.VolumeOption{{Name: "performance/quick-read", OnValue: "off"}, - {Name: "performance/read-ahead", OnValue: "off"}, - {Name: "performance/io-cache", OnValue: "off"}, - {Name: "performance/md-cache", OnValue: "off"}, - {Name: "performance/open-behind", OnValue: "off"}, - {Name: "performance/readdir-ahead", OnValue: "off"}, - {Name: "performance/write-behind.strict-O_DIRECT", OnValue: "on"}, - {Name: "protocol/client.filter-O_DIRECT", OnValue: "disable"}, - {Name: "cluster/replicate.eager-lock", OnValue: "disable"}, - {Name: "cluster/replicate.quorum-type", OnValue: "auto"}, - {Name: "cluster/replicate.data-self-heal-algorithm", OnValue: "full"}, - {Name: "cluster/replicate.locking-scheme", OnValue: "granular"}, - {Name: "cluster/replicate.shd-max-threads", OnValue: "8"}, - {Name: "cluster/replicate.shd-wait-qlength", OnValue: "10000"}, - {Name: "features/shard", OnValue: "on"}, - {Name: "features/shard.shard-block-size", OnValue: "64MB"}, - {Name: "user.cifs", OnValue: "off"}, - {Name: "protocol/server.rpc-auth-allow-insecure", OnValue: "on"}}, - Description: "Enable this profile for optimal results, in block use case"}, - "profile.SMB-small-file": {Name: "profile.SMB-small-file", - Options: []api.VolumeOption{{Name: "features/upcall.cache-invalidation", OnValue: "on"}, + "profile.default.replicate": { + Name: "profile.default.replicate", + Options: []api.VolumeOption{ + {Name: "cluster/replicate.self-heal-daemon", OnValue: "on"}, + {Name: "performance/md-cache", OnValue: "off"}, + {Name: "performance/open-behind", OnValue: "off"}, + {Name: "performance/quick-read", OnValue: "off"}, + {Name: "performance/io-cache", OnValue: "off"}, + {Name: "performance/readdir-ahead", OnValue: "off"}, + {Name: "performance/read-ahead", OnValue: "off"}, + {Name: "performance/write-behind", OnValue: "off"}, + }, + Description: "Default volume options for Replicate or Distributed Replicate volumes", + }, + "profile.default.disperse": { + Name: "profile.default.disperse", + Options: []api.VolumeOption{ + {Name: "cluster/replicate.self-heal-daemon", OnValue: "on"}, + {Name: "performance/md-cache", OnValue: "off"}, + {Name: "performance/open-behind", OnValue: "off"}, + {Name: "performance/quick-read", OnValue: "off"}, + {Name: "performance/io-cache", OnValue: "off"}, + {Name: "performance/readdir-ahead", OnValue: "off"}, + {Name: "performance/read-ahead", OnValue: "off"}, + {Name: "performance/write-behind", OnValue: "off"}, + }, + Description: "Default volume options for Disperse or Distributed Disperse volumes", + }, + "profile.default.distribute": { + Name: "profile.default.distribute", + Options: []api.VolumeOption{ + {Name: "performance/md-cache", OnValue: "off"}, + {Name: "performance/open-behind", OnValue: "off"}, + {Name: "performance/quick-read", OnValue: "off"}, + {Name: 
"performance/io-cache", OnValue: "off"}, + {Name: "performance/readdir-ahead", OnValue: "off"}, + {Name: "performance/read-ahead", OnValue: "off"}, + {Name: "performance/write-behind", OnValue: "off"}, + }, + Description: "Default volume options for Distribute volumes", + }, + "profile.gluster-block": { + Name: "profile.gluster-block", + Options: []api.VolumeOption{ + {Name: "performance/quick-read", OnValue: "off"}, + {Name: "performance/read-ahead", OnValue: "off"}, + {Name: "performance/io-cache", OnValue: "off"}, + {Name: "performance/md-cache", OnValue: "off"}, + {Name: "performance/open-behind", OnValue: "off"}, + {Name: "performance/readdir-ahead", OnValue: "off"}, + {Name: "performance/write-behind.strict-O_DIRECT", OnValue: "on"}, + {Name: "protocol/client.filter-O_DIRECT", OnValue: "disable"}, + {Name: "cluster/replicate.eager-lock", OnValue: "disable"}, + {Name: "cluster/replicate.quorum-type", OnValue: "auto"}, + {Name: "cluster/replicate.data-self-heal-algorithm", OnValue: "full"}, + {Name: "cluster/replicate.locking-scheme", OnValue: "granular"}, + {Name: "cluster/replicate.shd-max-threads", OnValue: "8"}, + {Name: "cluster/replicate.shd-wait-qlength", OnValue: "10000"}, + {Name: "features/shard", OnValue: "on"}, + {Name: "features/shard.shard-block-size", OnValue: "64MB"}, + {Name: "user.cifs", OnValue: "off"}, + {Name: "protocol/server.rpc-auth-allow-insecure", OnValue: "on"}, + }, + Description: "Enable this profile for optimal results, in block use case", + }, + "profile.SMB-small-file": { + Name: "profile.SMB-small-file", + Options: []api.VolumeOption{ + {Name: "features/upcall.cache-invalidation", OnValue: "on"}, {Name: "features/upcall.cache-invalidation-timeout", OnValue: "600"}, {Name: "protocol/server.inode-lru-limit", OnValue: "200000"}, {Name: "performance/md-cache", OnValue: "on"}, @@ -42,10 +90,14 @@ var defaultGroupOptions = map[string]*api.OptionGroup{ {Name: "protocol/client.event-threads", OnValue: "4"}, {Name: "protocol/server.event-threads", OnValue: "4"}, {Name: "cluster/distribute.lookup-optimize", OnValue: "on"}, - {Name: "cluster/distribute.readdir-optimize", OnValue: "on"}}, - Description: "For use cases with dominant small file workload in SMB access, enable this profile"}, - "profile.FUSE-small-file": {Name: "profile.FUSE-small-file", - Options: []api.VolumeOption{{Name: "features/upcall.cache-invalidation", OnValue: "on"}, + {Name: "cluster/distribute.readdir-optimize", OnValue: "on"}, + }, + Description: "For use cases with dominant small file workload in SMB access, enable this profile", + }, + "profile.FUSE-small-file": { + Name: "profile.FUSE-small-file", + Options: []api.VolumeOption{ + {Name: "features/upcall.cache-invalidation", OnValue: "on"}, {Name: "features/upcall.cache-invalidation-timeout", OnValue: "600"}, {Name: "protocol/server.inode-lru-limit", OnValue: "200000"}, {Name: "performance/md-cache", OnValue: "on"}, @@ -60,10 +112,14 @@ var defaultGroupOptions = map[string]*api.OptionGroup{ {Name: "protocol/client.event-threads", OnValue: "4"}, {Name: "protocol/server.event-threads", OnValue: "4"}, {Name: "cluster/distribute.lookup-optimize", OnValue: "on"}, - {Name: "cluster/distribute.readdir-optimize", OnValue: "on"}}, - Description: "For use cases with dominant small file workload in native FUSE mount access, enable this profile"}, - "profile.SMB-large-file-EC": {Name: "profile.SMB-large-file-EC", - Options: []api.VolumeOption{{Name: "features/upcall.cache-invalidation", OnValue: "on"}, + {Name: "cluster/distribute.readdir-optimize", 
OnValue: "on"}, + }, + Description: "For use cases with dominant small file workload in native FUSE mount access, enable this profile", + }, + "profile.SMB-large-file-EC": { + Name: "profile.SMB-large-file-EC", + Options: []api.VolumeOption{ + {Name: "features/upcall.cache-invalidation", OnValue: "on"}, {Name: "features/upcall.cache-invalidation-timeout", OnValue: "600"}, {Name: "protocol/server.inode-lru-limit", OnValue: "200000"}, {Name: "performance/md-cache", OnValue: "on"}, @@ -80,10 +136,14 @@ var defaultGroupOptions = map[string]*api.OptionGroup{ {Name: "performance/write-behind.trickling-writes", OnValue: "off"}, {Name: "performance/write-behind.aggregate-size", OnValue: "1MB"}, {Name: "protocol/client.event-threads", OnValue: "4"}, - {Name: "protocol/server.event-threads", OnValue: "4"}}, - Description: "Enable this profile for use cases consisting of mostly large files like video surveillance, backup, video streaming and others, with SMB access to an erasure coded volume"}, - "profile.FUSE-large-file-EC": {Name: "profile.FUSE-large-file-EC", - Options: []api.VolumeOption{{Name: "features/upcall.cache-invalidation", OnValue: "on"}, + {Name: "protocol/server.event-threads", OnValue: "4"}, + }, + Description: "Enable this profile for use cases consisting of mostly large files like video surveillance, backup, video streaming and others, with SMB access to an erasure coded volume", + }, + "profile.FUSE-large-file-EC": { + Name: "profile.FUSE-large-file-EC", + Options: []api.VolumeOption{ + {Name: "features/upcall.cache-invalidation", OnValue: "on"}, {Name: "features/upcall.cache-invalidation-timeout", OnValue: "600"}, {Name: "protocol/server.inode-lru-limit", OnValue: "200000"}, {Name: "performance.md-cache", OnValue: "on"}, @@ -99,26 +159,38 @@ var defaultGroupOptions = map[string]*api.OptionGroup{ {Name: "performance/write-behind.trickling-writes", OnValue: "off"}, {Name: "performance/write-behind.aggregate-size", OnValue: "1MB"}, {Name: "protocol/client.event-threads", OnValue: "4"}, - {Name: "protocol/server.event-threads", OnValue: "4"}}, - Description: "Enable this profile for use cases consisting of mostly large files like video surveillance, backup, video streaming and others, with native FUSE mount access to an erasure coded volume"}, - "profile.nl-cache": {Name: "profile.nl-cache", - Options: []api.VolumeOption{{Name: "features/upcall.cache-invalidation", OnValue: "on"}, + {Name: "protocol/server.event-threads", OnValue: "4"}, + }, + Description: "Enable this profile for use cases consisting of mostly large files like video surveillance, backup, video streaming and others, with native FUSE mount access to an erasure coded volume", + }, + "profile.nl-cache": { + Name: "profile.nl-cache", + Options: []api.VolumeOption{ + {Name: "features/upcall.cache-invalidation", OnValue: "on"}, {Name: "features/upcall.cache-invalidation-timeout", OnValue: "600"}, {Name: "performance/nl-cache", OnValue: "on"}, {Name: "performance/nl-cache.nl-cache-timeout", OnValue: "600"}, - {Name: "protocol/server.inode-lru-limit", OnValue: "200000"}}, - Description: "Enable this for the workloads that generate lots of lookups before creating files, eg: SMB access, git tool and others"}, - "profile.metadata-cache": {Name: "profile.metadata-cache", - Options: []api.VolumeOption{{Name: "features/upcall.cache-invalidation", OnValue: "on"}, + {Name: "protocol/server.inode-lru-limit", OnValue: "200000"}, + }, + Description: "Enable this for the workloads that generate lots of lookups before creating files, eg: 
SMB access, git tool and others", + }, + "profile.metadata-cache": { + Name: "profile.metadata-cache", + Options: []api.VolumeOption{ + {Name: "features/upcall.cache-invalidation", OnValue: "on"}, {Name: "features/upcall.cache-invalidation-timeout", OnValue: "600"}, {Name: "performance/md-cache", OnValue: "on"}, {Name: "performance/quick-read.cache-invalidation", OnValue: "on"}, {Name: "performance/md-cache.cache-invalidation", OnValue: "on"}, {Name: "performance/md-cache.md-cache-timeout", OnValue: "600"}, - {Name: "protocol/server.inode-lru-limit", OnValue: "200000"}}, - Description: "This profile enables metadata(stat, xattr) caching on client side. Its recommended to enable this for most workloads other than the ones that require 100% consistency like databases"}, - "profile.virt": {Name: "profile.virt", - Options: []api.VolumeOption{{Name: "performance/quick-read", OnValue: "off"}, + {Name: "protocol/server.inode-lru-limit", OnValue: "200000"}, + }, + Description: "This profile enables metadata(stat, xattr) caching on client side. Its recommended to enable this for most workloads other than the ones that require 100% consistency like databases", + }, + "profile.virt": { + Name: "profile.virt", + Options: []api.VolumeOption{ + {Name: "performance/quick-read", OnValue: "off"}, {Name: "performance/read-ahead", OnValue: "off"}, {Name: "performance/io-cache", OnValue: "off"}, {Name: "performance/io-threads.low-prio-threads", OnValue: "32"}, @@ -131,9 +203,12 @@ var defaultGroupOptions = map[string]*api.OptionGroup{ {Name: "cluster/replicate.shd-max-threads", OnValue: "8"}, {Name: "cluster/replicate.shd-wait-qlength", OnValue: "10000"}, {Name: "features/shard", OnValue: "on"}, - {Name: "user.cifs", OnValue: "off"}}, - Description: "Enable this profile, if the Gluster Volume is used to store virtual machines"}, - "profile.db": {Name: "profile.db", + {Name: "user.cifs", OnValue: "off"}, + }, + Description: "Enable this profile, if the Gluster Volume is used to store virtual machines", + }, + "profile.db": { + Name: "profile.db", Options: []api.VolumeOption{ {Name: "performance/open-behind", OnValue: "on"}, {Name: "performance/write-behind", OnValue: "off"}, @@ -152,15 +227,22 @@ var defaultGroupOptions = map[string]*api.OptionGroup{ {Name: "performance/open-behind.read-after-open", OnValue: "on"}, {Name: "performance/write-behind.strict-O_DIRECT", OnValue: "on"}, }, - Description: "Enable profile that tunes volume for database workload"}, - "tls": {Name: "tls", + Description: "Enable profile that tunes volume for database workload", + }, + "tls": { + Name: "tls", Options: []api.VolumeOption{ {Name: "protocol/server.transport.socket.ssl-enabled", OnValue: "on"}, {Name: "protocol/client.transport.socket.ssl-enabled", OnValue: "on"}, }, - Description: "Enable TLS for the volume for both bricks and clients"}, - "profile.test": {Name: "profile.test", - Options: []api.VolumeOption{{Name: "cluster/replicate.eager-lock", OnValue: "on"}, - {Name: "gfproxy.cluster/replicate.eager-lock", OnValue: "on"}}, - Description: "Test purpose only"}, + Description: "Enable TLS for the volume for both bricks and clients", + }, + "profile.test": { + Name: "profile.test", + Options: []api.VolumeOption{ + {Name: "cluster/replicate.eager-lock", OnValue: "on"}, + {Name: "gfproxy.cluster/replicate.eager-lock", OnValue: "on"}, + }, + Description: "Test purpose only", + }, } diff --git a/glusterd2/commands/volumes/volume-create.go b/glusterd2/commands/volumes/volume-create.go index 4986146c7..db3ea29c7 100644 --- 
a/glusterd2/commands/volumes/volume-create.go +++ b/glusterd2/commands/volumes/volume-create.go @@ -11,6 +11,7 @@ import ( "github.com/gluster/glusterd2/glusterd2/gdctx" restutils "github.com/gluster/glusterd2/glusterd2/servers/rest/utils" "github.com/gluster/glusterd2/glusterd2/transaction" + transactionv2 "github.com/gluster/glusterd2/glusterd2/transactionv2" "github.com/gluster/glusterd2/glusterd2/volume" "github.com/gluster/glusterd2/pkg/api" gderrors "github.com/gluster/glusterd2/pkg/errors" @@ -118,6 +119,11 @@ func volumeCreateHandler(w http.ResponseWriter, r *http.Request) { return } + if containsReservedGroupProfile(req.Options) { + restutils.SendHTTPError(ctx, w, http.StatusBadRequest, gderrors.ErrReservedGroupProfile) + return + } + if req.Size > 0 { applyDefaults(&req) @@ -149,13 +155,27 @@ func volumeCreateHandler(w http.ResponseWriter, r *http.Request) { return } + // Include default Volume Options profile + if len(req.Subvols) > 0 { + groupProfile, exists := defaultGroupOptions["profile.default."+req.Subvols[0].Type] + if exists { + for _, opt := range groupProfile.Options { + // Apply default option only if not overridden in volume create request + _, exists = req.Options[opt.Name] + if !exists { + req.Options[opt.Name] = opt.OnValue + } + } + } + } + nodes, err := req.Nodes() if err != nil { restutils.SendHTTPError(ctx, w, http.StatusBadRequest, err) return } - txn, err := transaction.NewTxnWithLocks(ctx, req.Name) + txn, err := transactionv2.NewTxnWithLocks(ctx, req.Name) if err != nil { status, err := restutils.ErrToStatusCode(err) restutils.SendHTTPError(ctx, w, status, err) @@ -177,6 +197,8 @@ func volumeCreateHandler(w http.ResponseWriter, r *http.Request) { { DoFunc: "vol-create.ValidateBricks", Nodes: nodes, + // Need to wait for volinfo to be created first + Sync: true, }, { DoFunc: "vol-create.InitBricks", @@ -187,6 +209,7 @@ func volumeCreateHandler(w http.ResponseWriter, r *http.Request) { DoFunc: "vol-create.StoreVolume", UndoFunc: "vol-create.UndoStoreVolume", Nodes: []uuid.UUID{gdctx.MyUUID}, + Sync: true, }, } diff --git a/glusterd2/commands/volumes/volume-delete.go b/glusterd2/commands/volumes/volume-delete.go index f876b52f4..8daa54f2e 100644 --- a/glusterd2/commands/volumes/volume-delete.go +++ b/glusterd2/commands/volumes/volume-delete.go @@ -81,6 +81,7 @@ func volumeDeleteHandler(w http.ResponseWriter, r *http.Request) { { DoFunc: "vol-delete.Store", Nodes: []uuid.UUID{gdctx.MyUUID}, + Sync: true, }, } diff --git a/glusterd2/commands/volumes/volume-expand.go b/glusterd2/commands/volumes/volume-expand.go index d95f5e146..b638ca7f7 100644 --- a/glusterd2/commands/volumes/volume-expand.go +++ b/glusterd2/commands/volumes/volume-expand.go @@ -176,6 +176,8 @@ func volumeExpandHandler(w http.ResponseWriter, r *http.Request) { DoFunc: "vol-expand.ValidateBricks", Nodes: nodes, Skip: lvmResizeOp, + // Need to wait for newly selected bricks to be set by the previous step + Sync: true, }, { DoFunc: "vol-expand.InitBricks", @@ -193,11 +195,13 @@ func volumeExpandHandler(w http.ResponseWriter, r *http.Request) { UndoFunc: "vol-create.UndoStoreVolume", Nodes: []uuid.UUID{gdctx.MyUUID}, Skip: !lvmResizeOp, + Sync: true, }, { DoFunc: "vol-expand.UpdateVolinfo", Nodes: []uuid.UUID{gdctx.MyUUID}, Skip: lvmResizeOp, + Sync: true, }, { DoFunc: "vol-expand.GenerateBrickVolfiles", diff --git a/glusterd2/commands/volumes/volume-option.go b/glusterd2/commands/volumes/volume-option.go index 33e4e1989..eacfc7cf2 100644 --- 
a/glusterd2/commands/volumes/volume-option.go +++ b/glusterd2/commands/volumes/volume-option.go @@ -206,6 +206,11 @@ func volumeOptionsHandler(w http.ResponseWriter, r *http.Request) { return } + if containsReservedGroupProfile(req.Options) { + restutils.SendHTTPError(ctx, w, http.StatusBadRequest, errors.ErrReservedGroupProfile) + return + } + txn, err := transaction.NewTxnWithLocks(ctx, volname) if err != nil { status, err := restutils.ErrToStatusCode(err) @@ -241,6 +246,7 @@ func volumeOptionsHandler(w http.ResponseWriter, r *http.Request) { DoFunc: "vol-option.UpdateVolinfo", UndoFunc: "vol-option.UpdateVolinfo.Undo", Nodes: []uuid.UUID{gdctx.MyUUID}, + Sync: true, }, { DoFunc: "vol-option.XlatorActionDoSet", diff --git a/glusterd2/commands/volumes/volume-reset.go b/glusterd2/commands/volumes/volume-reset.go index 78f08a4c5..fa707dc49 100644 --- a/glusterd2/commands/volumes/volume-reset.go +++ b/glusterd2/commands/volumes/volume-reset.go @@ -2,6 +2,7 @@ package volumecommands import ( "net/http" + "strings" "github.com/gluster/glusterd2/glusterd2/gdctx" "github.com/gluster/glusterd2/glusterd2/peer" @@ -41,6 +42,11 @@ func volumeResetHandler(w http.ResponseWriter, r *http.Request) { return } + if containsReservedGroupProfile(req.Options) { + restutils.SendHTTPError(ctx, w, http.StatusBadRequest, errors.ErrReservedGroupProfile) + return + } + volname := mux.Vars(r)["volname"] volinfo, err := volume.GetVolume(volname) if err != nil { @@ -84,6 +90,29 @@ func volumeResetHandler(w http.ResponseWriter, r *http.Request) { } } + // If reset All is called or if anything reseted from the + // default group profile, remove from reset list and + // reassign the default value + var newopts []string + if len(volinfo.Subvols) > 0 { + optGrp, exists := defaultGroupOptions["profile.default."+strings.ToLower(volinfo.Subvols[0].Type.String())] + if exists { + REQLOOP: + for _, k := range req.Options { + for _, opt := range optGrp.Options { + if k == opt.Name { + // Reset the default value as mentioned in profile + volinfo.Options[k] = opt.OnValue + continue REQLOOP + } + } + // Not in default profile, continue to reset + newopts = append(newopts, k) + } + } + } + req.Options = newopts + for _, k := range req.Options { // Check if the key is set or not if _, ok := volinfo.Options[k]; ok { @@ -138,11 +167,13 @@ func volumeResetHandler(w http.ResponseWriter, r *http.Request) { DoFunc: "vol-option.UpdateVolinfo", UndoFunc: "vol-option.UpdateVolinfo.Undo", Nodes: []uuid.UUID{gdctx.MyUUID}, + Sync: true, }, { DoFunc: "vol-option.GenerateBrickVolfiles", UndoFunc: "vol-option.GenerateBrickvolfiles.Undo", Nodes: volinfo.Nodes(), + Sync: true, }, { DoFunc: "vol-option.NotifyVolfileChange", diff --git a/glusterd2/commands/volumes/volume-start.go b/glusterd2/commands/volumes/volume-start.go index 320c9fe1b..45c49cfd0 100644 --- a/glusterd2/commands/volumes/volume-start.go +++ b/glusterd2/commands/volumes/volume-start.go @@ -171,6 +171,7 @@ func volumeStartHandler(w http.ResponseWriter, r *http.Request) { DoFunc: "vol-start.UpdateVolinfo", UndoFunc: "vol-start.UpdateVolinfo.Undo", Nodes: []uuid.UUID{gdctx.MyUUID}, + Sync: true, }, { DoFunc: "vol-start.XlatorActionDoVolumeStart", diff --git a/glusterd2/commands/volumes/volume-stop.go b/glusterd2/commands/volumes/volume-stop.go index 7b827c03c..1844be527 100644 --- a/glusterd2/commands/volumes/volume-stop.go +++ b/glusterd2/commands/volumes/volume-stop.go @@ -152,6 +152,7 @@ func volumeStopHandler(w http.ResponseWriter, r *http.Request) { DoFunc: 
"vol-stop.UpdateVolinfo", UndoFunc: "vol-stop.UpdateVolinfo.Undo", Nodes: []uuid.UUID{gdctx.MyUUID}, + Sync: true, }, { DoFunc: "vol-stop.XlatorActionDoVolumeStop", diff --git a/glusterd2/main.go b/glusterd2/main.go index 8455cbffe..e2234a663 100644 --- a/glusterd2/main.go +++ b/glusterd2/main.go @@ -16,6 +16,8 @@ import ( "github.com/gluster/glusterd2/glusterd2/pmap" "github.com/gluster/glusterd2/glusterd2/servers" "github.com/gluster/glusterd2/glusterd2/store" + "github.com/gluster/glusterd2/glusterd2/transactionv2" + "github.com/gluster/glusterd2/glusterd2/transactionv2/cleanuphandler" gdutils "github.com/gluster/glusterd2/glusterd2/utils" "github.com/gluster/glusterd2/glusterd2/volgen" "github.com/gluster/glusterd2/glusterd2/xlator" @@ -105,6 +107,8 @@ func main() { log.WithError(err).Fatal("Failed to initialize store (etcd client)") } + transaction.StartTxnEngine() + cleanuphandler.StartCleanupLeader() // Start the events framework after store is up if err := events.Start(); err != nil { log.WithError(err).Fatal("Failed to start internal events framework") @@ -115,7 +119,7 @@ func main() { } // Load the default group option map into the store - if err := volumecommands.LoadDefaultGroupOptions(); err != nil { + if err := volumecommands.InitDefaultGroupOptions(); err != nil { log.WithError(err).Fatal("Failed to load the default group options") } @@ -168,6 +172,8 @@ func main() { case unix.SIGINT: log.Info("Received SIGTERM. Stopping GlusterD") gdctx.IsTerminating = true + transaction.StopTxnEngine() + cleanuphandler.StopCleanupLeader() super.Stop() events.Stop() store.Close() diff --git a/glusterd2/transaction/context.go b/glusterd2/transaction/context.go index 5e58b0894..40d4c1077 100644 --- a/glusterd2/transaction/context.go +++ b/glusterd2/transaction/context.go @@ -33,29 +33,32 @@ type TxnCtx interface { // Logger returns the Logrus logger associated with the context Logger() log.FieldLogger - // commit writes all locally cached keys and values into the store using + // Commit writes all locally cached keys and values into the store using // a single etcd transaction. This is for internal use by the txn framework // and hence isn't exported. - commit() error + Commit() error + + // SyncCache synchronizes the locally cached keys and values from the store + SyncCache() error } // Tctx represents structure for transaction context type Tctx struct { - config *txnCtxConfig // this will be marshalled and sent on wire + config *TxnCtxConfig // this will be marshalled and sent on wire logger log.FieldLogger readSet map[string][]byte // cached responses from store readCacheDirty bool writeSet map[string]string // to be written to store } -// txnCtxConfig is marshalled and sent on wire and is used to reconstruct Tctx +// TxnCtxConfig is marshalled and sent on wire and is used to reconstruct Tctx // on receiver's end. -type txnCtxConfig struct { +type TxnCtxConfig struct { LogFields log.Fields StorePrefix string } -func newCtx(config *txnCtxConfig) *Tctx { +func newCtx(config *TxnCtxConfig) *Tctx { return &Tctx{ config: config, logger: log.StandardLogger().WithFields(config.LogFields), @@ -65,6 +68,11 @@ func newCtx(config *txnCtxConfig) *Tctx { } } +// NewCtx returns a transaction context from given config +func NewCtx(config *TxnCtxConfig) *Tctx { + return newCtx(config) +} + // Set attaches the given key-value pair to the context. // If the key exists, the value will be updated. 
func (c *Tctx) Set(key string, value interface{}) error { @@ -86,9 +94,22 @@ func (c *Tctx) Set(key string, value interface{}) error { return nil } -// commit writes all locally cached keys and values into the store using +// SyncCache synchronizes the locally cached keys and values from the store +func (c *Tctx) SyncCache() error { + resp, err := store.Get(context.TODO(), c.config.StorePrefix, clientv3.WithPrefix()) + if err != nil { + return err + } + + for _, kv := range resp.Kvs { + c.readSet[string(kv.Key)] = kv.Value + } + return nil +} + +// Commit writes all locally cached keys and values into the store using // a single etcd transaction. -func (c *Tctx) commit() error { +func (c *Tctx) Commit() error { if len(c.writeSet) == 0 { return nil @@ -120,6 +141,7 @@ func (c *Tctx) commit() error { expTxn.Add("txn_ctx_store_commit", 1) + c.writeSet = make(map[string]string) c.readCacheDirty = true return nil @@ -139,15 +161,10 @@ func (c *Tctx) Get(key string, value interface{}) error { // cache all keys and values from the store on the first call to Get if c.readCacheDirty { - resp, err := store.Get(context.TODO(), c.config.StorePrefix, clientv3.WithPrefix()) - if err != nil { + if err := c.SyncCache(); err != nil { c.logger.WithError(err).WithField("key", key).Error("failed to get key from transaction context") return err } - expTxn.Add("txn_ctx_store_get", 1) - for _, kv := range resp.Kvs { - c.readSet[string(kv.Key)] = kv.Value - } c.readCacheDirty = false } diff --git a/glusterd2/transaction/lock.go b/glusterd2/transaction/lock.go index 4fe6b0c65..8f27baf09 100644 --- a/glusterd2/transaction/lock.go +++ b/glusterd2/transaction/lock.go @@ -85,8 +85,8 @@ func CreateLockSteps(key string) (*Step, *Step, error) { return nil, nil, err } - lockStep := &Step{lockFunc, unlockFunc, []uuid.UUID{gdctx.MyUUID}, false} - unlockStep := &Step{unlockFunc, "", []uuid.UUID{gdctx.MyUUID}, false} + lockStep := &Step{DoFunc: lockFunc, UndoFunc: unlockFunc, Nodes: []uuid.UUID{gdctx.MyUUID}, Skip: false} + unlockStep := &Step{DoFunc: unlockFunc, UndoFunc: "", Nodes: []uuid.UUID{gdctx.MyUUID}, Skip: false} return lockStep, unlockStep, nil } @@ -149,13 +149,17 @@ func CreateLockFuncs(key string) (LockUnlockFunc, LockUnlockFunc) { return lockFunc, unlockFunc } -func (t *Txn) lock(lockID string) error { +// Locks are the collection of cluster wide transaction lock +type Locks map[string]*concurrency.Mutex + +func (l Locks) lock(lockID string) error { + var logger = log.WithField("lockID", lockID) + // Ensure that no prior lock exists for the given lockID in this transaction - if _, ok := t.locks[lockID]; ok { + if _, ok := l[lockID]; ok { return ErrLockExists } - logger := t.Ctx.Logger().WithField("lockID", lockID) logger.Debug("attempting to obtain lock") key := lockPrefix + lockID @@ -169,11 +173,11 @@ func (t *Txn) lock(lockID string) error { case nil: logger.Debug("lock obtained") // Attach lock to the transaction - t.locks[lockID] = locker + l[lockID] = locker case context.DeadlineExceeded: - // Propagate this all the way back to the client as a HTTP 409 response logger.Debug("timeout: failed to obtain lock") + // Propagate this all the way back to the client as a HTTP 409 response err = ErrLockTimeout default: @@ -185,14 +189,23 @@ func (t *Txn) lock(lockID string) error { // Lock obtains a cluster wide transaction lock on the given lockID/lockIDs, // and attaches the obtained locks to the transaction -func (t *Txn) Lock(lockID string, lockIDs ...string) error { - if err := t.lock(lockID); err != nil { 
+func (l Locks) Lock(lockID string, lockIDs ...string) error { + if err := l.lock(lockID); err != nil { return err } for _, id := range lockIDs { - if err := t.lock(id); err != nil { + if err := l.lock(id); err != nil { return err } } return nil } + +// UnLock releases all cluster wide obtained locks +func (l Locks) UnLock(ctx context.Context) { + for lockID, locker := range l { + if err := locker.Unlock(ctx); err == nil { + delete(l, lockID) + } + } +} diff --git a/glusterd2/transaction/rpc-service.go b/glusterd2/transaction/rpc-service.go index ecc00b78c..43ca372aa 100644 --- a/glusterd2/transaction/rpc-service.go +++ b/glusterd2/transaction/rpc-service.go @@ -59,7 +59,7 @@ func (p *txnSvc) RunStep(rpcCtx context.Context, req *TxnStepReq) (*TxnStepResp, goto End } - if err = ctx.commit(); err != nil { + if err = ctx.Commit(); err != nil { logger.WithError(err).Error("failed to commit txn context to store") } diff --git a/glusterd2/transaction/step.go b/glusterd2/transaction/step.go index 784d7c02e..69a89b61c 100644 --- a/glusterd2/transaction/step.go +++ b/glusterd2/transaction/step.go @@ -27,6 +27,7 @@ type Step struct { UndoFunc string Nodes []uuid.UUID Skip bool + Sync bool } var ( @@ -136,7 +137,7 @@ func runStepFuncOnNode(origCtx context.Context, stepName string, ctx TxnCtx, nod var err error if uuid.Equal(node, gdctx.MyUUID) { - err = runStepFuncLocally(origCtx, stepName, ctx) + err = RunStepFuncLocally(origCtx, stepName, ctx) } else { // remote node err = runStepOn(origCtx, stepName, node, ctx) @@ -145,7 +146,8 @@ func runStepFuncOnNode(origCtx context.Context, stepName string, ctx TxnCtx, nod respCh <- stepPeerResp{node, err} } -func runStepFuncLocally(origCtx context.Context, stepName string, ctx TxnCtx) error { +// RunStepFuncLocally runs a step func on local node +func RunStepFuncLocally(origCtx context.Context, stepName string, ctx TxnCtx) error { var err error @@ -163,7 +165,7 @@ func runStepFuncLocally(origCtx context.Context, stepName string, ctx TxnCtx) er if err = stepFunc(ctx); err == nil { // if step function executes successfully, commit the // results to the store - err = ctx.commit() + err = ctx.Commit() } } else { err = ErrStepFuncNotFound diff --git a/glusterd2/transaction/transaction.go b/glusterd2/transaction/transaction.go index 3cb5c3bad..deca9501d 100644 --- a/glusterd2/transaction/transaction.go +++ b/glusterd2/transaction/transaction.go @@ -26,7 +26,7 @@ var expTxn = expvar.NewMap("txn") // Txn is a set of steps type Txn struct { id uuid.UUID - locks map[string]*concurrency.Mutex + locks Locks reqID uuid.UUID storePrefix string @@ -49,7 +49,7 @@ func NewTxn(ctx context.Context) *Txn { t.reqID = gdctx.GetReqID(ctx) t.locks = make(map[string]*concurrency.Mutex) t.storePrefix = txnPrefix + t.id.String() + "/" - config := &txnCtxConfig{ + config := &TxnCtxConfig{ LogFields: log.Fields{ "txnid": t.id.String(), "reqid": t.reqID.String(), @@ -68,10 +68,16 @@ func NewTxnWithLocks(ctx context.Context, lockIDs ...string) (*Txn, error) { t := NewTxn(ctx) for _, id := range lockIDs { - if err := t.Lock(id); err != nil { + logger := t.Ctx.Logger().WithField("lockID", id) + logger.Debug("attempting to obtain lock") + + if err := t.locks.Lock(id); err != nil { + logger.WithError(err).Error("failed to obtain lock") t.Done() return nil, err } + + logger.Debug("lock obtained") } return t, nil @@ -125,7 +131,7 @@ func (t *Txn) Do() error { expTxn.Add("initiated_txn_in_progress", 1) // commit txn.Ctx.Set()s done in REST handlers to the store - if err := t.Ctx.commit(); err != 
nil { + if err := t.Ctx.Commit(); err != nil { return err } diff --git a/glusterd2/transactionv2/cleanuphandler/cleanup_handler.go b/glusterd2/transactionv2/cleanuphandler/cleanup_handler.go new file mode 100644 index 000000000..ecff20ccb --- /dev/null +++ b/glusterd2/transactionv2/cleanuphandler/cleanup_handler.go @@ -0,0 +1,172 @@ +package cleanuphandler + +import ( + "context" + "fmt" + "sync" + "time" + + "github.com/gluster/glusterd2/glusterd2/gdctx" + "github.com/gluster/glusterd2/glusterd2/store" + "github.com/gluster/glusterd2/glusterd2/transactionv2" + + "github.com/coreos/etcd/clientv3" + "github.com/coreos/etcd/clientv3/concurrency" + log "github.com/sirupsen/logrus" +) + +const ( + leaderKey = "cleanup-leader" + cleanupTimerDur = time.Minute * 2 + txnMaxAge = time.Second * 20 +) + +// CleanupLeader is responsible for performing all cleaning operation +var CleanupLeader *CleanupHandler + +// CleaupHandlerOptFunc accepts a CleanupHandler and overrides its members +type CleaupHandlerOptFunc func(handler *CleanupHandler) error + +// CleanupHandler performs all cleaning operation. +// It will remove all expired txn related data from store. +// A leader is elected among the peers in the cluster to +// cleanup stale transactions. The leader periodically scans +// the pending transaction namespace for failed and stale +// transactions, and cleans them up if rollback is completed +// by all peers involved in the transaction. +type CleanupHandler struct { + sync.Mutex + isLeader bool + stopChan chan struct{} + stopOnce sync.Once + session *concurrency.Session + election *concurrency.Election + txnManager transaction.TxnManager +} + +// WithSession configures a session with given ttl +func WithSession(client *clientv3.Client, ttl int) CleaupHandlerOptFunc { + return func(handler *CleanupHandler) error { + session, err := concurrency.NewSession(client, concurrency.WithTTL(ttl)) + if err != nil { + return err + } + handler.session = session + return nil + } +} + +// WithElection creates a new election for CleanupHandler.It will use the `defaultSession` +// if no session has been configured previously. 
+func WithElection(defaultSession *concurrency.Session) CleaupHandlerOptFunc { + return func(handler *CleanupHandler) error { + session := defaultSession + if handler.session != nil { + session = handler.session + } + electionKeyPrefix := fmt.Sprintf("gluster-%s/", gdctx.MyClusterID.String()) + leaderKey + handler.election = concurrency.NewElection(session, electionKeyPrefix) + return nil + } +} + +// NewCleanupHandler returns a new CleanupHandler +func NewCleanupHandler(optFuncs ...CleaupHandlerOptFunc) (*CleanupHandler, error) { + cl := &CleanupHandler{ + stopChan: make(chan struct{}), + txnManager: transaction.NewTxnManager(store.Store.Watcher), + } + + for _, optFunc := range optFuncs { + if err := optFunc(cl); err != nil { + return nil, err + } + } + + return cl, nil +} + +// Run starts running CleanupHandler +func (c *CleanupHandler) Run() { + log.Info("cleanup handler started") + + go transaction.UntilStop(c.HandleStaleTxn, cleanupTimerDur, c.stopChan) + go transaction.UntilStop(c.CleanFailedTxn, cleanupTimerDur, c.stopChan) + + <-c.stopChan + log.Info("cleanup handler stopped") +} + +// HandleStaleTxn will mark all the expired txn as failed based maxAge of a txn +func (c *CleanupHandler) HandleStaleTxn() { + c.Lock() + isLeader := c.isLeader + c.Unlock() + + if isLeader { + c.txnManager.TxnGC(txnMaxAge) + } +} + +// CleanFailedTxn removes all failed txn if rollback is +// completed by all peers involved in the transaction +func (c *CleanupHandler) CleanFailedTxn() { + c.Lock() + isLeader := c.isLeader + c.Unlock() + + if isLeader { + c.txnManager.RemoveFailedTxns() + } +} + +// StartElecting triggers a new election campaign. +// If it succeeded then it assumes the leader role and returns +func (c *CleanupHandler) StartElecting() { + log.Info("node started to contest for leader election") + + if err := c.election.Campaign(context.Background(), gdctx.MyUUID.String()); err != nil { + log.WithError(err).Error("failed in campaign for cleanup leader election") + c.Stop() + return + } + + log.Info("node got elected as cleanup leader") + c.Lock() + defer c.Unlock() + c.isLeader = true +} + +// Stop will stop running the CleanupHandler +func (c *CleanupHandler) Stop() { + log.Info("attempting to stop cleanup handler") + c.stopOnce.Do(func() { + close(c.stopChan) + c.election.Resign(context.Background()) + }) +} + +// StartCleanupLeader starts cleanup leader +func StartCleanupLeader() { + var err error + + CleanupLeader, err = NewCleanupHandler( + WithSession(store.Store.Client, 60), + WithElection(store.Store.Session), + ) + + if err != nil { + log.WithError(err).Errorf("failed in starting cleanup handler") + return + } + + go CleanupLeader.StartElecting() + go CleanupLeader.Run() +} + +// StopCleanupLeader stops the cleanup leader +func StopCleanupLeader() { + if CleanupLeader != nil { + CleanupLeader.Stop() + } +} diff --git a/glusterd2/transactionv2/engine.go b/glusterd2/transactionv2/engine.go new file mode 100644 index 000000000..d80f68759 --- /dev/null +++ b/glusterd2/transactionv2/engine.go @@ -0,0 +1,232 @@ +package transaction + +import ( + "context" + "errors" + "sync" + "time" + + "github.com/gluster/glusterd2/glusterd2/gdctx" + "github.com/gluster/glusterd2/glusterd2/store" + + "github.com/pborman/uuid" + log "github.com/sirupsen/logrus" +) + +const ( + // PendingTxnPrefix is the etcd namespace into which all pending txn will be stored + PendingTxnPrefix = "pending-transaction/" + txnSyncTimeout = time.Second * 10 +) + +// transactionEngine is responsible for 
executing newly added txn +var transactionEngine *Engine + +// Engine executes the given transaction across the cluster. +// It makes use of etcd as the means of communication between nodes. +type Engine struct { + stop chan struct{} + stopOnce sync.Once + selfNodeID uuid.UUID + stepManager StepManager + txnManager TxnManager +} + +// NewEngine creates a TxnEngine +func NewEngine() *Engine { + return &Engine{ + stop: make(chan struct{}), + selfNodeID: gdctx.MyUUID, + stepManager: newStepManager(), + txnManager: NewTxnManager(store.Store.Watcher), + } +} + +// Run will start running the TxnEngine and wait for txn Engine to be stopped. +func (txnEng *Engine) Run() { + log.Info("running txn engine") + + go UntilStop(txnEng.HandleTransaction, 0, txnEng.stop) + go UntilStop(txnEng.HandleFailedTxn, 0, txnEng.stop) + + <-txnEng.stop + log.Info("txn engine stopped") +} + +// HandleTransaction executes newly added txn to the store. It will keep watching on +// `pending-transaction` namespace, if a new txn is added to the namespace then it will +// execute that txn. +func (txnEng *Engine) HandleTransaction() { + txnChan := txnEng.txnManager.WatchTxn(txnEng.stop) + + for { + select { + case <-txnEng.stop: + return + case txn, ok := <-txnChan: + if !ok { + return + } + txn.Ctx.Logger().Info("received a pending txn") + go txnEng.Execute(context.Background(), txn) + } + } +} + +func (txnEng *Engine) isInitiator(txn *Txn) bool { + return uuid.Equal(txn.Initiator, txnEng.selfNodeID) +} + +// Execute will run a given txn +func (txnEng *Engine) Execute(ctx context.Context, txn *Txn) { + var shouldExecute bool + for _, node := range txn.Nodes { + if uuid.Equal(node, txnEng.selfNodeID) { + txn.Ctx.Logger().WithField("peerID", txnEng.selfNodeID.String()).Info("executing txn on peer") + shouldExecute = true + break + } + } + + if !shouldExecute { + txn.Ctx.Logger().WithField("peerID", txnEng.selfNodeID.String()).Info("peer is not involved in this txn") + return + } + + if txnEng.isInitiator(txn) { + if err := WithClusterLocks(txn.Locks...)(txn); err != nil { + txn.Ctx.Logger().WithError(err).Error("error in acquiring cluster lock") + return + } + defer txn.releaseLocks() + } + + txnStatus, err := txnEng.txnManager.GetTxnStatus(txn.ID, txnEng.selfNodeID) + if err != nil { + txn.Ctx.Logger().WithError(err).Error("error in getting txn status") + return + } + + switch txnStatus.State { + case txnPending: + if err := txnEng.executePendingTxn(ctx, txn); err != nil { + status := TxnStatus{State: txnFailed, Reason: err.Error(), TxnID: txn.ID} + txnEng.txnManager.UpDateTxnStatus(status, txn.ID, txnEng.selfNodeID) + return + } + txn.Ctx.Logger().Info("txn succeeded") + txnEng.txnManager.UpDateTxnStatus(TxnStatus{State: txnSucceeded, TxnID: txn.ID}, txn.ID, txnEng.selfNodeID) + } + return +} + +func (txnEng *Engine) executePendingTxn(ctx context.Context, txn *Txn) error { + var ( + stopch = make(chan struct{}) + txnStatusChan = txnEng.txnManager.WatchTxnStatus(stopch, txn.ID, txnEng.selfNodeID) + updateOnce = &sync.Once{} + logger = txn.Ctx.Logger() + ) + defer close(stopch) + + logger.Infof("transaction started on node") + + for i, step := range txn.Steps { + logger.WithField("stepname", step.DoFunc).Debug("running step func on node") + + // a synchronized step is executed only after all pervious steps + // have been completed successfully by all involved peers. 
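+ // (The synchronization itself is done by stepManager.SyncStep below, which
+ // waits for every involved peer to record the preceding step as executed,
+ // or gives up after txnSyncTimeout.)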
+ if step.Sync { + logger.WithField("stepname", step.DoFunc).Debug("synchronizing txn step") + if err := txnEng.stepManager.SyncStep(ctx, i, txn); err != nil { + logger.WithFields(log.Fields{ + "error": err, + "stepname": step.DoFunc, + }).Error("encounter an error in synchronizing txn step") + return err + } + logger.Debug("transaction got synchronized") + } + + if err := txnEng.stepManager.RunStep(ctx, step, txn.Ctx); err != nil { + logger.WithFields(log.Fields{ + "error": err, + "stepname": step.DoFunc, + }).Error("failed in executing txn step ") + txnEng.txnManager.UpdateLastExecutedStep(i, txn.ID, txnEng.selfNodeID) + return err + } + + select { + case <-ctx.Done(): + txnEng.txnManager.UpdateLastExecutedStep(i, txn.ID, txnEng.selfNodeID) + return ctx.Err() + case status := <-txnStatusChan: + if status.State == txnFailed { + txnEng.txnManager.UpdateLastExecutedStep(i, txn.ID, txnEng.selfNodeID) + return errors.New(status.Reason) + } + default: + } + + logger.WithField("stepname", step.DoFunc).Debug("step func executed successfully on node") + txnEng.txnManager.UpdateLastExecutedStep(i, txn.ID, txnEng.selfNodeID) + updateOnce.Do(func() { + txnEng.txnManager.UpDateTxnStatus(TxnStatus{State: txnRunning, TxnID: txn.ID}, txn.ID, txnEng.selfNodeID) + }) + } + return nil +} + +// HandleFailedTxn keep on watching store for failed txn. If it receives any failed +// txn then it will rollback all executed steps of that txn. +func (txnEng *Engine) HandleFailedTxn() { + failedTxnChan := txnEng.txnManager.WatchFailedTxn(txnEng.stop, txnEng.selfNodeID) + + for { + select { + case <-txnEng.stop: + return + case failedTxn, ok := <-failedTxnChan: + if !ok { + return + } + + lastStepIndex, err := txnEng.txnManager.GetLastExecutedStep(failedTxn.ID, txnEng.selfNodeID) + if err != nil || lastStepIndex == -1 { + continue + } + failedTxn.Ctx.Logger().Debugf("received a failed txn, rolling back changes") + + for i := lastStepIndex; i >= 0; i-- { + err := txnEng.stepManager.RollBackStep(context.Background(), failedTxn.Steps[i], failedTxn.Ctx) + if err != nil { + failedTxn.Ctx.Logger().WithError(err).WithField("step", failedTxn.Steps[i]).Error("failed in rolling back step") + } + } + txnEng.txnManager.UpdateLastExecutedStep(-1, failedTxn.ID, txnEng.selfNodeID) + } + } +} + +// Stop will stop a running Txn Engine +func (txnEng *Engine) Stop() { + log.Info("stopping txn engine") + txnEng.stopOnce.Do(func() { + close(txnEng.stop) + }) +} + +// StartTxnEngine creates a new Txn Engine and starts running it +func StartTxnEngine() { + transactionEngine = NewEngine() + GlobalTxnManager = NewTxnManager(store.Store.Watcher) + go transactionEngine.Run() +} + +// StopTxnEngine stops the Txn Engine +func StopTxnEngine() { + if transactionEngine != nil { + transactionEngine.Stop() + } +} diff --git a/glusterd2/transactionv2/errors.go b/glusterd2/transactionv2/errors.go new file mode 100644 index 000000000..0b354cdfe --- /dev/null +++ b/glusterd2/transactionv2/errors.go @@ -0,0 +1,8 @@ +package transaction + +import "errors" + +var ( + errTxnTimeout = errors.New("txn timeout") + errTxnSyncTimeout = errors.New("timeout in synchronizing txn") +) diff --git a/glusterd2/transactionv2/steprunner.go b/glusterd2/transactionv2/steprunner.go new file mode 100644 index 000000000..d1f3280b3 --- /dev/null +++ b/glusterd2/transactionv2/steprunner.go @@ -0,0 +1,108 @@ +package transaction + +import ( + "context" + + "github.com/gluster/glusterd2/glusterd2/gdctx" + "github.com/gluster/glusterd2/glusterd2/store" + 
"github.com/gluster/glusterd2/glusterd2/transaction" + + "github.com/pborman/uuid" +) + +// StepManager is an interface for running a step and also rollback step on local node +type StepManager interface { + RunStep(ctx context.Context, step *transaction.Step, txnCtx transaction.TxnCtx) error + RollBackStep(ctx context.Context, step *transaction.Step, txnCtx transaction.TxnCtx) error + SyncStep(ctx context.Context, stepIndex int, txn *Txn) error +} + +type stepManager struct { + selfNodeID uuid.UUID +} + +func newStepManager() StepManager { + return &stepManager{ + selfNodeID: gdctx.MyUUID, + } +} + +func (sm *stepManager) shouldRunStep(step *transaction.Step) bool { + if step.Skip { + return false + } + + for _, id := range step.Nodes { + if uuid.Equal(sm.selfNodeID, id) { + return true + } + } + return false +} + +// runStep synchronises the locally cached keys and values from the store +// before running the step function on node +func (sm *stepManager) runStep(ctx context.Context, stepName string, txnCtx transaction.TxnCtx) error { + txnCtx.SyncCache() + return transaction.RunStepFuncLocally(ctx, stepName, txnCtx) +} + +// isPrevStepsExecutedOnNode reports that all pervious steps +// have been completed successfully on a given node +func (sm *stepManager) isPrevStepsExecutedOnNode(ctx context.Context, syncStepIndex int, nodeID uuid.UUID, txnID uuid.UUID, success chan<- struct{}) { + txnManager := NewTxnManager(store.Store.Watcher) + lastStepWatchChan := txnManager.WatchLastExecutedStep(ctx.Done(), txnID, nodeID) + for { + select { + case <-ctx.Done(): + return + case lastStep := <-lastStepWatchChan: + if lastStep == syncStepIndex-1 { + success <- struct{}{} + return + } + } + } +} + +// SyncStep synchronises a step of given txn across all nodes of cluster +func (sm *stepManager) SyncStep(ctx context.Context, syncStepIndex int, txn *Txn) error { + var ( + success = make(chan struct{}) + syncCtx, cancel = context.WithTimeout(ctx, txnSyncTimeout) + ) + defer cancel() + + for _, nodeID := range txn.Nodes { + go sm.isPrevStepsExecutedOnNode(syncCtx, syncStepIndex, nodeID, txn.ID, success) + } + + for range txn.Nodes { + select { + case <-syncCtx.Done(): + return errTxnSyncTimeout + case <-success: + } + } + return nil +} + +// RollBackStep will rollback a given step on local node +func (sm *stepManager) RollBackStep(ctx context.Context, step *transaction.Step, txnCtx transaction.TxnCtx) error { + if !sm.shouldRunStep(step) { + return nil + } + + if step.UndoFunc != "" { + return sm.runStep(ctx, step.UndoFunc, txnCtx) + } + return nil +} + +// RunStepRunStep will execute the step on local node +func (sm *stepManager) RunStep(ctx context.Context, step *transaction.Step, txnCtx transaction.TxnCtx) error { + if !sm.shouldRunStep(step) { + return nil + } + return sm.runStep(ctx, step.DoFunc, txnCtx) +} diff --git a/glusterd2/transactionv2/transaction.go b/glusterd2/transactionv2/transaction.go new file mode 100644 index 000000000..29eebb6d8 --- /dev/null +++ b/glusterd2/transactionv2/transaction.go @@ -0,0 +1,251 @@ +// Package transaction implements a distributed transaction handling framework +package transaction + +import ( + "context" + "errors" + "fmt" + "time" + + "github.com/gluster/glusterd2/glusterd2/gdctx" + "github.com/gluster/glusterd2/glusterd2/store" + "github.com/gluster/glusterd2/glusterd2/transaction" + + "github.com/coreos/etcd/clientv3" + "github.com/coreos/etcd/clientv3/concurrency" + "github.com/pborman/uuid" + log 
"github.com/sirupsen/logrus" +) + +const ( + txnPrefix = "transaction/" + txnTimeOut = time.Second * 15 +) + +// TxnOptFunc receives a Txn and overrides its members +type TxnOptFunc func(*Txn) error + +// Txn is a set of steps +type Txn struct { + locks transaction.Locks + + // Nodes is the union of the all the TxnStep.Nodes and is implicitly + // set in Txn.Do(). This list is used to determine liveness of the + // nodes before running the transaction steps. + Nodes []uuid.UUID `json:"nodes"` + StorePrefix string `json:"store_prefix"` + ID uuid.UUID `json:"id"` + Locks []string `json:"locks"` + ReqID uuid.UUID `json:"req_id"` + Ctx transaction.TxnCtx `json:"ctx"` + Steps []*transaction.Step `json:"steps"` + DontCheckAlive bool `json:"dont_check_alive"` + DisableRollback bool `json:"disable_rollback"` + Initiator uuid.UUID `json:"initiator"` + StartTime time.Time `json:"start_time"` + + success chan struct{} + error chan error + succeeded bool +} + +// NewTxn returns an initialized Txn without any steps +func NewTxn(ctx context.Context) *Txn { + t := new(Txn) + + t.ID = uuid.NewRandom() + t.ReqID = gdctx.GetReqID(ctx) + t.locks = make(map[string]*concurrency.Mutex) + t.StorePrefix = txnPrefix + t.ID.String() + "/" + config := &transaction.TxnCtxConfig{ + LogFields: log.Fields{ + "txnid": t.ID.String(), + "reqid": t.ReqID.String(), + }, + StorePrefix: t.StorePrefix, + } + t.Ctx = transaction.NewCtx(config) + t.Initiator = gdctx.MyUUID + t.Ctx.Logger().Debug("new transaction created") + return t +} + +// NewTxnWithLocks returns an empty Txn with locks obtained on given lockIDs +func NewTxnWithLocks(ctx context.Context, lockIDs ...string) (*Txn, error) { + t := NewTxn(ctx) + t.Locks = lockIDs + return t, nil +} + +// WithClusterLocks obtains a cluster wide locks on given IDs for a txn +func WithClusterLocks(lockIDs ...string) TxnOptFunc { + return func(t *Txn) error { + for _, id := range lockIDs { + logger := t.Ctx.Logger().WithField("lockID", id) + logger.Debug("attempting to obtain lock") + if err := t.locks.Lock(id); err != nil { + logger.WithError(err).Error("failed to obtain lock") + t.releaseLocks() + return err + } + logger.Debug("lock obtained") + } + return nil + } +} + +func (t *Txn) releaseLocks() { + t.locks.UnLock(context.Background()) +} + +// Done releases any obtained locks and cleans up the transaction namespace +// Done must be called after a transaction ends +func (t *Txn) Done() { + if !t.succeeded { + return + } + t.done() + t.releaseLocks() + GlobalTxnManager.RemoveTransaction(t.ID) + t.Ctx.Logger().Info("txn succeeded on all nodes, txn data cleaned up from store") +} + +func (t *Txn) done() { + if _, err := store.Delete(context.TODO(), t.StorePrefix, clientv3.WithPrefix()); err != nil { + t.Ctx.Logger().WithError(err).WithField("key", + t.StorePrefix).Error("Failed to remove transaction namespace from store") + } + +} + +func (t *Txn) checkAlive() error { + + if len(t.Nodes) == 0 { + for _, s := range t.Steps { + t.Nodes = append(t.Nodes, s.Nodes...) 
+ } + } + t.Nodes = nodesUnion(t.Nodes) + + for _, node := range t.Nodes { + // TODO: Using prefixed query, get all alive nodes in a single etcd query + if _, online := store.Store.IsNodeAlive(node); !online { + return fmt.Errorf("node %s is probably down", node.String()) + } + } + + return nil +} + +// Do runs the transaction on the cluster +func (t *Txn) Do() error { + var ( + stop = make(chan struct{}) + timer = time.NewTimer(txnTimeOut) + ) + + { + t.success = make(chan struct{}) + t.error = make(chan error) + t.StartTime = time.Now() + } + + defer timer.Stop() + + if !t.DontCheckAlive { + if err := t.checkAlive(); err != nil { + return err + } + } + + t.Ctx.Logger().Debug("Starting transaction") + + go t.waitForCompletion(stop) + defer close(stop) + + GlobalTxnManager.UpDateTxnStatus(TxnStatus{State: txnPending, TxnID: t.ID}, t.ID, t.Nodes...) + + // commit txn.Ctx.Set()s done in REST handlers to the store + if err := t.Ctx.Commit(); err != nil { + return err + } + + t.Ctx.Logger().Debug("adding txn to store") + if err := GlobalTxnManager.AddTxn(t); err != nil { + return err + } + t.Ctx.Logger().Debug("waiting for txn to be cleaned up") + + failureAction := func(err error) { + t.Ctx.Logger().WithError(err).Error("error in executing txn, marking as failure") + txnStatus := TxnStatus{State: txnFailed, TxnID: t.ID, Reason: err.Error()} + GlobalTxnManager.UpDateTxnStatus(txnStatus, t.ID, t.Nodes...) + } + + select { + case <-t.success: + t.succeeded = true + case err := <-t.error: + failureAction(err) + return err + case <-timer.C: + failureAction(errTxnTimeout) + return errTxnTimeout + } + + return nil +} + +func (t *Txn) isNodeSucceded(nodeID uuid.UUID, success chan<- struct{}, stopCh <-chan struct{}) { + txnStatusChan := GlobalTxnManager.WatchTxnStatus(stopCh, t.ID, nodeID) + + for { + select { + case <-stopCh: + return + case status := <-txnStatusChan: + log.WithFields(log.Fields{ + "nodeId": nodeID.String(), + "status": fmt.Sprintf("%+v", status), + }).Debug("state received") + + if status.State == txnSucceeded { + success <- struct{}{} + return + } else if status.State == txnFailed { + t.error <- errors.New(status.Reason) + return + } + } + } +} + +func (t *Txn) waitForCompletion(stopCh <-chan struct{}) { + var successChan = make(chan struct{}) + + for _, nodeID := range t.Nodes { + go t.isNodeSucceded(nodeID, successChan, stopCh) + } + + for range t.Nodes { + select { + case <-stopCh: + return + case <-successChan: + } + } + t.success <- struct{}{} +} + +// nodesUnion removes duplicate nodes +func nodesUnion(nodes []uuid.UUID) []uuid.UUID { + for i := 0; i < len(nodes); i++ { + for j := i + 1; j < len(nodes); j++ { + if uuid.Equal(nodes[i], nodes[j]) { + nodes = append(nodes[:j], nodes[j+1:]...) 
+ j-- + } + } + } + return nodes +} diff --git a/glusterd2/transactionv2/txnmanager.go b/glusterd2/transactionv2/txnmanager.go new file mode 100644 index 000000000..3908afa89 --- /dev/null +++ b/glusterd2/transactionv2/txnmanager.go @@ -0,0 +1,436 @@ +package transaction + +import ( + "context" + "encoding/json" + "errors" + "path" + "strconv" + "strings" + "sync" + "time" + + "github.com/gluster/glusterd2/glusterd2/store" + "github.com/gluster/glusterd2/glusterd2/transaction" + + "github.com/coreos/etcd/clientv3" + "github.com/coreos/etcd/clientv3/concurrency" + "github.com/coreos/etcd/mvcc/mvccpb" + "github.com/pborman/uuid" +) + +// GlobalTxnManager stores and manages access to transaction related data +var GlobalTxnManager TxnManager + +const ( + // TxnStatusPrefix is etcd key prefix under which status of a txn is stored for a particular node + // eg.. key for storing status:- pending-transaction///status + TxnStatusPrefix = "status" + // LastExecutedStepPrefix is etcd key prefix under which last step executed on a particular node for a txn is stored + // eg.. key for storing last executed step on a node:- pending-transaction///laststep + LastExecutedStepPrefix = "laststep" + // etcd txn timeout in seconds + etcdTxnTimeout = time.Second * 5 +) + +// TxnManager stores and manages access to transaction related data in +// `pending-transaction` namespace. +type TxnManager interface { + WatchTxn(stopCh <-chan struct{}) <-chan *Txn + GetTxns() []*Txn + AddTxn(txn *Txn) error + GetTxnByUUID(id uuid.UUID) (*Txn, error) + RemoveTransaction(txnID uuid.UUID) error + UpdateLastExecutedStep(index int, txnID uuid.UUID, nodeID uuid.UUID) error + GetLastExecutedStep(txnID uuid.UUID, nodeID uuid.UUID) (int, error) + WatchLastExecutedStep(stopCh <-chan struct{}, txnID uuid.UUID, nodeID uuid.UUID) <-chan int + WatchFailedTxn(stopCh <-chan struct{}, nodeID uuid.UUID) <-chan *Txn + WatchTxnStatus(stopCh <-chan struct{}, txnID uuid.UUID, nodeID uuid.UUID) <-chan TxnStatus + GetTxnStatus(txnID uuid.UUID, nodeID uuid.UUID) (TxnStatus, error) + UpDateTxnStatus(state TxnStatus, txnID uuid.UUID, nodeIDs ...uuid.UUID) error + TxnGC(maxAge time.Duration) + RemoveFailedTxns() +} + +type txnManager struct { + sync.Mutex + getStoreKey func(...string) string + storeWatcher clientv3.Watcher +} + +// NewTxnManager returns a TxnManager +func NewTxnManager(storeWatcher clientv3.Watcher) TxnManager { + tm := &txnManager{ + storeWatcher: storeWatcher, + } + tm.getStoreKey = func(s ...string) string { + key := path.Join(PendingTxnPrefix, path.Join(s...)) + return key + } + return tm +} + +// RemoveTransaction removes a transaction from `pending-transaction namespace` +func (tm *txnManager) RemoveTransaction(txnID uuid.UUID) error { + _, err := store.Delete(context.TODO(), tm.getStoreKey(txnID.String()), clientv3.WithPrefix()) + return err +} + +// WatchTxnStatus watches status of txn on a particular node +func (tm *txnManager) WatchTxnStatus(stopCh <-chan struct{}, txnID uuid.UUID, nodeID uuid.UUID) <-chan TxnStatus { + var ( + txnStatusChan = make(chan TxnStatus, 10) + key = tm.getStoreKey(txnID.String(), nodeID.String(), TxnStatusPrefix) + ) + + respHandler := func(response clientv3.WatchResponse) { + for _, event := range response.Events { + txnStatus := TxnStatus{} + if err := json.Unmarshal(event.Kv.Value, &txnStatus); err != nil { + continue + } + if !txnStatus.State.Valid() { + continue + } + txnStatusChan <- txnStatus + } + } + + tm.watch(stopCh, key, respHandler, 
clientv3.WithFilterDelete()) + return txnStatusChan +} + +// WatchTxn watches for newly added txn to store +func (tm *txnManager) WatchTxn(stopCh <-chan struct{}) <-chan *Txn { + var ( + txnChan = make(chan *Txn, 10) + key = tm.getStoreKey() + opts = []clientv3.OpOption{clientv3.WithPrefix(), clientv3.WithFilterDelete()} + ) + + respHandler := func(response clientv3.WatchResponse) { + for _, txn := range tm.watchRespToTxns(response) { + txnChan <- txn + } + } + + tm.watch(stopCh, key, respHandler, opts...) + + return txnChan +} + +// WatchFailedTxn watches for a failed txn on a particular node +func (tm *txnManager) WatchFailedTxn(stopCh <-chan struct{}, nodeID uuid.UUID) <-chan *Txn { + var ( + txnChan = make(chan *Txn) + key = tm.getStoreKey() + ops = []clientv3.OpOption{clientv3.WithPrefix(), clientv3.WithFilterDelete()} + ) + + go func() { + resp, err := store.Get(context.TODO(), key, ops...) + if err != nil { + return + } + for _, kv := range resp.Kvs { + if txn := tm.kvToFailedTxn(kv, nodeID); txn != nil { + txnChan <- txn + } + } + }() + + respHandler := func(resp clientv3.WatchResponse) { + for _, event := range resp.Events { + if txn := tm.kvToFailedTxn(event.Kv, nodeID); txn != nil { + txnChan <- txn + } + } + } + + tm.watch(stopCh, key, respHandler, ops...) + return txnChan +} + +func (tm *txnManager) kvToFailedTxn(kv *mvccpb.KeyValue, nodeID uuid.UUID) *Txn { + + if !strings.HasSuffix(string(kv.Key), TxnStatusPrefix) { + return nil + } + + prefix, _ := path.Split(string(kv.Key)) + nID := path.Base(prefix) + + if nodeID.String() != nID { + return nil + } + + txnStatus := &TxnStatus{} + if err := json.Unmarshal(kv.Value, txnStatus); err != nil { + return nil + } + + if txnStatus.State != txnFailed { + return nil + } + + txn, err := tm.GetTxnByUUID(txnStatus.TxnID) + if err != nil { + return nil + } + return txn +} + +func (tm *txnManager) watchRespToTxns(resp clientv3.WatchResponse) (txns []*Txn) { + for _, event := range resp.Events { + prefix, id := path.Split(string(event.Kv.Key)) + if uuid.Parse(id) == nil || !strings.HasSuffix(prefix, PendingTxnPrefix) { + continue + } + + txn := &Txn{Ctx: new(transaction.Tctx)} + if err := json.Unmarshal(event.Kv.Value, txn); err != nil { + continue + } + + txn.locks = make(map[string]*concurrency.Mutex) + txns = append(txns, txn) + } + return +} + +// AddTxn adds a txn to the store +func (tm *txnManager) AddTxn(txn *Txn) error { + data, err := json.Marshal(txn) + if err != nil { + return err + } + _, err = store.Put(context.TODO(), tm.getStoreKey(txn.ID.String()), string(data)) + return err +} + +// GetTxnByUUID returns the txn from given ID +func (tm *txnManager) GetTxnByUUID(id uuid.UUID) (*Txn, error) { + key := tm.getStoreKey(id.String()) + resp, err := store.Get(context.TODO(), key) + if err != nil { + return nil, err + } + + if len(resp.Kvs) == 0 { + return nil, errors.New(key + " key not found") + } + + kv := resp.Kvs[0] + + txn := &Txn{Ctx: new(transaction.Tctx)} + if err := json.Unmarshal(kv.Value, txn); err != nil { + return nil, err + } + txn.locks = make(map[string]*concurrency.Mutex) + return txn, nil +} + +// GetTxns returns all txns added to the store +func (tm *txnManager) GetTxns() (txns []*Txn) { + resp, err := store.Get(context.TODO(), tm.getStoreKey(), clientv3.WithPrefix()) + if err != nil { + return + } + for _, kv := range resp.Kvs { + _, id := path.Split(string(kv.Key)) + if uuid.Parse(id) == nil { + continue + } + + txn := &Txn{Ctx: new(transaction.Tctx)} + if err := json.Unmarshal(kv.Value, txn); err != nil { + 
continue + } + txn.locks = make(map[string]*concurrency.Mutex) + txns = append(txns, txn) + } + return +} + +// UpDateTxnStatus updates txn status for given nodes +func (tm *txnManager) UpDateTxnStatus(status TxnStatus, txnID uuid.UUID, nodeIDs ...uuid.UUID) error { + var ( + ctx, cancel = context.WithTimeout(context.Background(), etcdTxnTimeout) + storeMutex = concurrency.NewMutex(store.Store.Session, txnID.String()) + putOps []clientv3.Op + ) + + storeMutex.Lock(ctx) + defer cancel() + defer storeMutex.Unlock(ctx) + + data, err := json.Marshal(status) + if err != nil { + return err + } + + for _, nodeID := range nodeIDs { + key := tm.getStoreKey(txnID.String(), nodeID.String(), TxnStatusPrefix) + putOps = append(putOps, clientv3.OpPut(key, string(data))) + } + + txn, err := store.Txn(ctx).Then(putOps...).Commit() + if err != nil || !txn.Succeeded { + return errors.New("etcd txn to update txn status failed") + } + return nil +} + +// GetTxnStatus returns status of given txn on a particular node +func (tm *txnManager) GetTxnStatus(txnID uuid.UUID, nodeID uuid.UUID) (TxnStatus, error) { + var ( + ctx, cancel = context.WithCancel(context.Background()) + key = tm.getStoreKey(txnID.String(), nodeID.String(), TxnStatusPrefix) + storeMutex = concurrency.NewMutex(store.Store.Session, txnID.String()) + ) + + storeMutex.Lock(ctx) + defer cancel() + defer storeMutex.Unlock(ctx) + + resp, err := store.Get(context.TODO(), key) + if err != nil { + return TxnStatus{State: txnUnknown}, err + } + + if len(resp.Kvs) == 0 { + return TxnStatus{State: txnUnknown}, errors.New(key + " key not found") + } + + txnStatus := TxnStatus{} + kv := resp.Kvs[0] + + if err := json.Unmarshal(kv.Value, &txnStatus); err != nil { + return TxnStatus{State: txnUnknown}, err + } + + if !txnStatus.State.Valid() { + return TxnStatus{State: txnUnknown}, errors.New("invalid txn state") + } + + return txnStatus, nil +} + +// UpdateLastExecutedStep updates the last executed step on a node of a given txn ID +func (tm *txnManager) UpdateLastExecutedStep(index int, txnID uuid.UUID, nodeID uuid.UUID) error { + key := tm.getStoreKey(txnID.String(), nodeID.String(), LastExecutedStepPrefix) + _, err := store.Put(context.TODO(), key, strconv.Itoa(index)) + return err +} + +// GetLastExecutedStep fetches the last executed step on a node for a given txn ID +func (tm *txnManager) GetLastExecutedStep(txnID uuid.UUID, nodeID uuid.UUID) (int, error) { + key := tm.getStoreKey(txnID.String(), nodeID.String(), LastExecutedStepPrefix) + resp, err := store.Get(context.TODO(), key) + if err != nil { + return -1, err + } + + if resp.Count != 1 { + return -1, errors.New("more than one entry for same key") + } + + kv := resp.Kvs[0] + return strconv.Atoi(string(kv.Value)) +} + +// WatchLastExecutedStep watches for last executed step on a node for a given txn ID +func (tm *txnManager) WatchLastExecutedStep(stopCh <-chan struct{}, txnID uuid.UUID, nodeID uuid.UUID) <-chan int { + var ( + lastExecutedStepChan = make(chan int) + key = tm.getStoreKey(txnID.String(), nodeID.String(), LastExecutedStepPrefix) + opts = []clientv3.OpOption{clientv3.WithFilterDelete()} + ) + + resp, err := store.Get(context.TODO(), key) + if err == nil && resp.Count == 1 { + opts = append(opts, clientv3.WithRev(resp.Kvs[0].CreateRevision)) + } + + respHandler := func(response clientv3.WatchResponse) { + for _, event := range response.Events { + lastStep := string(event.Kv.Value) + if i, err := strconv.Atoi(lastStep); err == nil { + lastExecutedStepChan <- i + } + } + } + + 
tm.watch(stopCh, key, respHandler, opts...) + return lastExecutedStepChan +} + +// TxnGC will mark all the expired txn as failed based on given maxAge +func (tm *txnManager) TxnGC(maxAge time.Duration) { + tm.Lock() + defer tm.Unlock() + + txns := tm.GetTxns() + for _, txn := range txns { + if txn.StartTime.Unix()+int64(maxAge/time.Second) < time.Now().Unix() { + nonFailedNodes := []uuid.UUID{} + for _, nodeID := range txn.Nodes { + txnStatus, err := tm.GetTxnStatus(txn.ID, nodeID) + if err == nil && txnStatus.State != txnFailed { + nonFailedNodes = append(nonFailedNodes, nodeID) + } + } + if len(nonFailedNodes) == 0 { + continue + } + txnStatus := TxnStatus{State: txnFailed, TxnID: txn.ID, Reason: "txn expired"} + txn.Ctx.Logger().Info("txn got expired marking as failure") + tm.UpDateTxnStatus(txnStatus, txn.ID, nonFailedNodes...) + } + } +} + +// RemoveFailedTxns will remove all failed txn if rollback is completed by all peers involved in the transaction. +func (tm *txnManager) RemoveFailedTxns() { + txns := tm.GetTxns() + + for _, txn := range txns { + nodesRollbacked := 0 + + for _, nodeID := range txn.Nodes { + txnStatus, err := tm.GetTxnStatus(txn.ID, nodeID) + if err == nil && txnStatus.State == txnFailed { + lastStep, err := tm.GetLastExecutedStep(txn.ID, nodeID) + if err == nil && lastStep == -1 { + nodesRollbacked++ + } + } + } + + if nodesRollbacked == len(txn.Nodes) { + txn.Ctx.Logger().Info("txn rolled back on all nodes, cleaning from store") + txn.done() + tm.RemoveTransaction(txn.ID) + } + } +} + +func (tm *txnManager) watch(stopCh <-chan struct{}, key string, respHandler func(clientv3.WatchResponse), opts ...clientv3.OpOption) { + go func() { + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + watchRespChan := tm.storeWatcher.Watch(ctx, key, opts...) 
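+ // Dispatch watch responses to respHandler until stopCh is closed or the
+ // watch channel reports an error or cancellation.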
+ for { + select { + case <-stopCh: + return + case watchResp := <-watchRespChan: + if watchResp.Err() != nil || watchResp.Canceled { + return + } + respHandler(watchResp) + + } + } + }() +} diff --git a/glusterd2/transactionv2/types.go b/glusterd2/transactionv2/types.go new file mode 100644 index 000000000..d03febfaf --- /dev/null +++ b/glusterd2/transactionv2/types.go @@ -0,0 +1,37 @@ +package transaction + +import "github.com/pborman/uuid" + +// TxnState represents state of a txn +type TxnState string + +const ( + txnPending TxnState = "Pending" + txnRunning TxnState = "Running" + txnSucceeded TxnState = "Succeeded" + txnFailed TxnState = "Failed" + txnUnknown TxnState = "Unknown" +) + +// Valid returns whether a TxnState is valid or not +func (ts TxnState) Valid() bool { + switch ts { + case txnPending: + fallthrough + case txnRunning: + fallthrough + case txnSucceeded: + fallthrough + case txnFailed: + return true + default: + return false + } +} + +// TxnStatus represents status of a txn +type TxnStatus struct { + State TxnState `json:"txn_state"` + TxnID uuid.UUID `json:"txn_id"` + Reason string `json:"reason,omitempty"` +} diff --git a/glusterd2/transactionv2/utils.go b/glusterd2/transactionv2/utils.go new file mode 100644 index 000000000..05e149b26 --- /dev/null +++ b/glusterd2/transactionv2/utils.go @@ -0,0 +1,73 @@ +package transaction + +import ( + "fmt" + "runtime" + "time" + + log "github.com/sirupsen/logrus" +) + +// NeverStop can be used in UntilStop to make it never stop +var NeverStop <-chan struct{} = make(chan struct{}) + +// UntilStop loops until stop channel is closed, running f every d duration +func UntilStop(f func(), d time.Duration, stop <-chan struct{}) { + var ( + t *time.Timer + timeout bool + ) + + for { + select { + case <-stop: + return + default: + } + func() { + defer HandlePanic() + f() + }() + t = ResetTimer(t, d, timeout) + select { + case <-stop: + return + case <-t.C: + timeout = true + } + } +} + +// ResetTimer avoids allocating a new timer if one is already in use +func ResetTimer(t *time.Timer, dur time.Duration, timeout bool) *time.Timer { + if t == nil { + return time.NewTimer(dur) + } + if !t.Stop() && !timeout { + <-t.C + } + t.Reset(dur) + return t +} + +// HandlePanic simply recovers from a panic and logs an error. 
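+// It is deferred by the wrapper in UntilStop, so a panic in the supplied
+// function does not kill the long-running loop.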
+func HandlePanic() { + if r := recover(); r != nil { + callers := getCallers() + log.WithFields(log.Fields{ + "panic": r, + "callers": callers, + }).Error("recovered from panic") + } +} + +func getCallers() (callers string) { + for i := 0; true; i++ { + _, file, line, ok := runtime.Caller(i) + if !ok { + return + } + callers += fmt.Sprintf("%v:%v\n", file, line) + } + return +} diff --git a/glusterd2/volgen/defaults.go b/glusterd2/volgen/defaults.go index a2af1f8b9..90fd163f3 100644 --- a/glusterd2/volgen/defaults.go +++ b/glusterd2/volgen/defaults.go @@ -106,11 +106,42 @@ func init() { Type: "debug/io-stats", NameTmpl: "{{ volume.name }}", }, + { + Type: "performance/io-threads", + }, + { + Type: "performance/md-cache", + }, + { + Type: "performance/open-behind", + }, + { + Type: "performance/quick-read", + }, + { + Type: "performance/io-cache", + }, + { + Type: "performance/readdir-ahead", + }, + { + Type: "performance/read-ahead", + }, + { + Type: "performance/write-behind", + }, { Type: "features/read-only", Disabled: true, EnableByOption: true, }, + { + Type: "features/utime", + }, + { + Type: "features/shard", + Disabled: true, + }, { Type: "cluster/distribute", }, diff --git a/pkg/errors/error.go b/pkg/errors/error.go index 65abec2e0..a8f4a8ed7 100644 --- a/pkg/errors/error.go +++ b/pkg/errors/error.go @@ -69,4 +69,5 @@ var ( ErrDeviceNotFound = errors.New("device does not exist in the given peer") ErrVolumeBricksMountFailed = errors.New("failed to get mount point entries for the volume bricks") ErrBrickMountFailed = errors.New("failed to mount brick") + ErrReservedGroupProfile = errors.New("reserved group profile") ) diff --git a/plugins/bitrot/rest.go b/plugins/bitrot/rest.go index 3a6f0fca3..e7542e94c 100644 --- a/plugins/bitrot/rest.go +++ b/plugins/bitrot/rest.go @@ -78,6 +78,8 @@ func bitrotEnableHandler(w http.ResponseWriter, r *http.Request) { // Required because bitrot-stub should be enabled on brick side DoFunc: "vol-option.NotifyVolfileChange", Nodes: txn.Nodes, + // Volinfo needs to be updated before sending notifications + Sync: true, }, { DoFunc: "bitrot-enable.Commit", diff --git a/plugins/georeplication/rest.go b/plugins/georeplication/rest.go index 6b8b10729..50a9dc887 100644 --- a/plugins/georeplication/rest.go +++ b/plugins/georeplication/rest.go @@ -197,6 +197,8 @@ func georepCreateHandler(w http.ResponseWriter, r *http.Request) { { DoFunc: "vol-option.NotifyVolfileChange", Nodes: txn.Nodes, + // Volinfo needs to be updated before sending notifications + Sync: true, }, { DoFunc: "georeplication-create.Commit", @@ -786,6 +788,8 @@ func georepConfigSetHandler(w http.ResponseWriter, r *http.Request) { { DoFunc: "georeplication-configfilegen.Commit", Nodes: txn.Nodes, + // Config needs to be set before config file can be generated + Sync: true, }, } @@ -919,6 +923,8 @@ func georepConfigResetHandler(w http.ResponseWriter, r *http.Request) { { DoFunc: "georeplication-configfilegen.Commit", Nodes: txn.Nodes, + // Config needs to be set before config file can be generated + Sync: true, }, } diff --git a/plugins/rebalance/rest.go b/plugins/rebalance/rest.go index 84e946ef9..ec82a6102 100644 --- a/plugins/rebalance/rest.go +++ b/plugins/rebalance/rest.go @@ -96,6 +96,7 @@ func rebalanceStartHandler(w http.ResponseWriter, r *http.Request) { { DoFunc: "rebalance-store", Nodes: []uuid.UUID{gdctx.MyUUID}, + Sync: true, }, } @@ -186,6 +187,7 @@ func rebalanceStopHandler(w http.ResponseWriter, r *http.Request) { { DoFunc: "rebalance-store", Nodes: []uuid.UUID{gdctx.MyUUID}, + 
Sync: true, }, }
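For reference, a minimal handler-side sketch of the new API (illustrative only, not part of this patch; the step function names and the peers argument are hypothetical, while NewTxnWithLocks, Txn.Do, Txn.Done and Step.Sync are the APIs introduced above):

// Illustrative sketch, not part of this patch: how a REST handler might use
// the transactionv2 engine and the new Sync step flag. Step function names
// and the peers argument are hypothetical.
package example

import (
	"context"

	"github.com/gluster/glusterd2/glusterd2/transaction"
	txnv2 "github.com/gluster/glusterd2/glusterd2/transactionv2"

	"github.com/pborman/uuid"
)

func runExampleTxn(ctx context.Context, volname string, peers []uuid.UUID) error {
	// NewTxnWithLocks records the lock IDs; the cluster locks are acquired
	// by the engine on the initiator once the txn starts executing.
	txn, err := txnv2.NewTxnWithLocks(ctx, volname)
	if err != nil {
		return err
	}
	// Done releases locks and removes the txn data from the store, but only
	// if the txn succeeded on all peers; failed txns are left for cleanup.
	defer txn.Done()

	txn.Steps = []*transaction.Step{
		{
			DoFunc:   "example-vol.UpdateStore",      // hypothetical step func
			UndoFunc: "example-vol.UpdateStore.Undo", // hypothetical undo func
			Nodes:    peers,
		},
		{
			DoFunc: "example-vol.NotifyChange", // hypothetical step func
			Nodes:  peers,
			// Synchronized step: every peer must finish the previous step
			// before any peer runs this one.
			Sync: true,
		},
	}

	// Do stores the txn in the pending-transaction namespace and waits for
	// all involved peers to report success, a failure, or a timeout.
	return txn.Do()
}

This mirrors the pattern used in the bitrot and geo-replication handlers above, where volinfo must be committed on all peers before volfile-change notifications are sent.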