Skip to content

Commit

Permalink
changefeed for rethinkdb
Browse files Browse the repository at this point in the history
Signed-off-by: David Lawrence <[email protected]> (github: endophage)
  • Loading branch information
David Lawrence committed Aug 26, 2017
1 parent f2744d5 commit 005f17e
Show file tree
Hide file tree
Showing 10 changed files with 206 additions and 72 deletions.
4 changes: 2 additions & 2 deletions server/storage/memory.go
Original file line number Diff line number Diff line change
Expand Up @@ -85,7 +85,7 @@ func (st *MemStorage) UpdateCurrent(gun data.GUN, update MetaUpdate) error {
// the MemStorage. Behaviour is undefined otherwise
func (st *MemStorage) writeChange(gun data.GUN, version int, checksum string) {
c := Change{
ID: uint(len(st.changes) + 1),
ID: strconv.Itoa(len(st.changes) + 1),
GUN: gun.String(),
Version: version,
SHA256: checksum,
Expand Down Expand Up @@ -200,7 +200,7 @@ func (st *MemStorage) Delete(gun data.GUN) error {
}
delete(st.checksums, gun.String())
c := Change{
ID: uint(len(st.changes) + 1),
ID: strconv.Itoa(len(st.changes) + 1),
GUN: gun.String(),
Category: changeCategoryDeletion,
CreatedAt: time.Now(),
Expand Down
2 changes: 1 addition & 1 deletion server/storage/mysql_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ func init() {

// drop all tables, if they exist
gormDB.DropTable(&TUFFile{})
gormDB.DropTable(&Change{})
gormDB.DropTable(&SQLChange{})
}
cleanup1()
dbStore := SetupSQLDB(t, "mysql", dburl)
Expand Down
2 changes: 1 addition & 1 deletion server/storage/postgresql_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@ func init() {

// drop all tables, if they exist
gormDB.DropTable(&TUFFile{})
gormDB.DropTable(&Change{})
gormDB.DropTable(&SQLChange{})
}
cleanup1()
dbStore := SetupSQLDB(t, notary.PostgresBackend, dburl)
Expand Down
8 changes: 8 additions & 0 deletions server/storage/rethink_realdb_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@ func rethinkDBSetup(t *testing.T) (RethinkDB, func()) {
cleanup()
require.NoError(t, rethinkdb.SetupDB(session, dbName, []rethinkdb.Table{
TUFFilesRethinkTable,
ChangeRethinkTable,
}))
return NewRethinkDBStorage(dbName, "", "", session), cleanup
}
Expand Down Expand Up @@ -169,3 +170,10 @@ func TestRethinkTUFMetaStoreGetCurrent(t *testing.T) {

testTUFMetaStoreGetCurrent(t, dbStore)
}

func TestRethinkDBGetChanges(t *testing.T) {
dbStore, cleanup := rethinkDBSetup(t)
defer cleanup()

testGetChanges(t, dbStore)
}
177 changes: 147 additions & 30 deletions server/storage/rethinkdb.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,6 @@ import (
"crypto/sha256"
"encoding/hex"
"encoding/json"
"errors"
"fmt"
"sort"
"time"
Expand All @@ -15,6 +14,8 @@ import (
"gopkg.in/dancannon/gorethink.v3"
)

var blackoutTime = 60

// RDBTUFFile is a TUF file record
type RDBTUFFile struct {
rethinkdb.Timing
Expand All @@ -29,7 +30,22 @@ type RDBTUFFile struct {

// TableName returns the table name for the record type
func (r RDBTUFFile) TableName() string {
return "tuf_files"
return TUFFileTableName
}

// Change defines the the fields required for an object in the changefeed
type Change struct {
ID string `gorethink:"id,omitempty"`
CreatedAt time.Time `gorethink:"created_at"`
GUN string `gorethink:"gun"`
Version int `gorethink:"version"`
SHA256 string `gorethink:"sha256"`
Category string `gorethink:"category"`
}

// TableName sets a specific table name for Changefeed
func (rdb Change) TableName() string {
return ChangefeedTableName
}

// gorethink can't handle an UnmarshalJSON function (see https://github.com/gorethink/gorethink/issues/201),
Expand Down Expand Up @@ -65,6 +81,14 @@ func rdbTUFFileFromJSON(data []byte) (interface{}, error) {
}, nil
}

func rdbChangeFromJSON(data []byte) (interface{}, error) {
res := Change{}
if err := json.Unmarshal(data, &res); err != nil {
return Change{}, err
}
return res, nil
}

// RethinkDB implements a MetaStore against the Rethink Database
type RethinkDB struct {
dbName string
Expand All @@ -87,35 +111,27 @@ func NewRethinkDBStorage(dbName, user, password string, sess *gorethink.Session)
// if it's a new role, or the version is greater than the current version
// for the role. Otherwise an error is returned.
func (rdb RethinkDB) UpdateCurrent(gun data.GUN, update MetaUpdate) error {
now := time.Now()
checksum := sha256.Sum256(update.Data)
file := RDBTUFFile{
Timing: rethinkdb.Timing{
CreatedAt: now,
UpdatedAt: now,
},
GunRoleVersion: []interface{}{gun, update.Role, update.Version},
Gun: gun.String(),
Role: update.Role.String(),
Version: update.Version,
SHA256: hex.EncodeToString(checksum[:]),
Data: update.Data,
// empty string is the zero value for tsChecksum in the RDBTUFFile struct.
// Therefore we can just call through to updateCurrentWithTSChecksum passing
// "" for the tsChecksum value.
if err := rdb.updateCurrentWithTSChecksum(gun.String(), "", update); err != nil {
return err
}
_, err := gorethink.DB(rdb.dbName).Table(file.TableName()).Insert(
file,
gorethink.InsertOpts{
Conflict: "error", // default but explicit for clarity of intent
},
).RunWrite(rdb.sess)
if err != nil && gorethink.IsConflictErr(err) {
return ErrOldVersion{}
if update.Role == data.CanonicalTimestampRole {
tsChecksumBytes := sha256.Sum256(update.Data)
return rdb.writeChange(
gun.String(),
update.Version,
hex.EncodeToString(tsChecksumBytes[:]),
changeCategoryUpdate,
)
}
return err
return nil
}

// UpdateCurrentWithTSChecksum adds new metadata version for the given GUN with an associated
// updateCurrentWithTSChecksum adds new metadata version for the given GUN with an associated
// checksum for the timestamp it belongs to, to afford us transaction-like functionality
func (rdb RethinkDB) UpdateCurrentWithTSChecksum(gun, tsChecksum string, update MetaUpdate) error {
func (rdb RethinkDB) updateCurrentWithTSChecksum(gun, tsChecksum string, update MetaUpdate) error {
now := time.Now()
checksum := sha256.Sum256(update.Data)
file := RDBTUFFile{
Expand Down Expand Up @@ -162,11 +178,15 @@ func (rdb RethinkDB) UpdateMany(gun data.GUN, updates []MetaUpdate) error {
// find the timestamp first and save its checksum
// then apply the updates in alphabetic role order with the timestamp last
// if there are any failures, we roll back in the same alphabetic order
var tsChecksum string
var (
tsChecksum string
tsVersion int
)
for _, up := range updates {
if up.Role == data.CanonicalTimestampRole {
tsChecksumBytes := sha256.Sum256(up.Data)
tsChecksum = hex.EncodeToString(tsChecksumBytes[:])
tsVersion = up.Version
break
}
}
Expand All @@ -175,7 +195,7 @@ func (rdb RethinkDB) UpdateMany(gun data.GUN, updates []MetaUpdate) error {
sort.Stable(updateSorter(updates))

for _, up := range updates {
if err := rdb.UpdateCurrentWithTSChecksum(gun.String(), tsChecksum, up); err != nil {
if err := rdb.updateCurrentWithTSChecksum(gun.String(), tsChecksum, up); err != nil {
// roll back with best-effort deletion, and then error out
rollbackErr := rdb.deleteByTSChecksum(tsChecksum)
if rollbackErr != nil {
Expand All @@ -185,6 +205,11 @@ func (rdb RethinkDB) UpdateMany(gun data.GUN, updates []MetaUpdate) error {
return err
}
}

// if the update included a timestamp, write a change object
if tsChecksum != "" {
return rdb.writeChange(gun.String(), tsVersion, tsChecksum, changeCategoryUpdate)
}
return nil
}

Expand Down Expand Up @@ -259,7 +284,7 @@ func (rdb RethinkDB) Delete(gun data.GUN) error {
if err != nil {
return fmt.Errorf("unable to delete %s from database: %s", gun.String(), err.Error())
}
return nil
return rdb.writeChange(gun.String(), 0, "", changeCategoryDeletion)
}

// deleteByTSChecksum removes all metadata by a timestamp checksum, used for rolling back a "transaction"
Expand All @@ -271,13 +296,15 @@ func (rdb RethinkDB) deleteByTSChecksum(tsChecksum string) error {
if err != nil {
return fmt.Errorf("unable to delete timestamp checksum data: %s from database: %s", tsChecksum, err.Error())
}
// DO NOT WRITE CHANGE! THIS IS USED _ONLY_ TO ROLLBACK A FAILED INSERT
return nil
}

// Bootstrap sets up the database and tables, also creating the notary server user with appropriate db permission
func (rdb RethinkDB) Bootstrap() error {
if err := rethinkdb.SetupDB(rdb.sess, rdb.dbName, []rethinkdb.Table{
TUFFilesRethinkTable,
ChangeRethinkTable,
}); err != nil {
return err
}
Expand All @@ -294,7 +321,97 @@ func (rdb RethinkDB) CheckHealth() error {
return nil
}

func (rdb RethinkDB) writeChange(gun string, version int, sha256, category string) error {
now := time.Now()
ch := Change{
CreatedAt: now,
GUN: gun,
Version: version,
SHA256: sha256,
Category: category,
}
_, err := gorethink.DB(rdb.dbName).Table(ch.TableName()).Insert(
ch,
gorethink.InsertOpts{
Conflict: "error", // default but explicit for clarity of intent
},
).RunWrite(rdb.sess)
return err
}

// GetChanges is not implemented for RethinkDB
func (rdb RethinkDB) GetChanges(changeID string, pageSize int, filterName string) ([]Change, error) {
return nil, errors.New("Not Implemented")
var (
lower, upper, bound []interface{}
idx = "rdb_created_at_id"
max = []interface{}{gorethink.Now().Sub(blackoutTime), gorethink.MaxVal}
min = []interface{}{gorethink.MinVal, gorethink.MinVal}
order gorethink.OrderByOpts
changes = make([]Change, 0, pageSize)
reversed bool
)
if filterName != "" {
idx = "rdb_gun_created_at_id"
max = append([]interface{}{filterName}, max...)
min = append([]interface{}{filterName}, min...)
}

switch changeID {
case "0", "-1":
lower = min
upper = max
default:
bound, idx = rdb.bound(changeID, filterName)
if pageSize < 0 {
lower = min
upper = bound
} else {
lower = bound
upper = max
}
}

if changeID == "-1" || pageSize < 0 {
reversed = true
order = gorethink.OrderByOpts{Index: gorethink.Desc(idx)}
} else {
order = gorethink.OrderByOpts{Index: gorethink.Asc(idx)}
}

if pageSize < 0 {
pageSize = pageSize * -1
}

res, err := gorethink.DB(rdb.dbName).
Table(Change{}.TableName()).
OrderBy(order).
Between(
lower,
upper,
gorethink.BetweenOpts{
LeftBound: "open",
RightBound: "open",
},
).Limit(pageSize).Run(rdb.sess)
if err != nil {
return nil, err
}
defer res.Close()

if reversed {
// results are currently newest to oldest, should be oldest to newest
for i, j := 0, len(changes)-1; i < j; i, j = i+1, j-1 {
changes[i], changes[j] = changes[j], changes[i]
}
}

return changes, res.All(&changes)
}

func (rdb RethinkDB) bound(changeID, filterName string) ([]interface{}, string) {
term := gorethink.DB(rdb.dbName).Table(Change{}.TableName()).Get(changeID).Field("created_at")
if filterName != "" {
return []interface{}{filterName, term, changeID}, "rdb_gun_created_at_id"
}
return []interface{}{term, changeID}, "rdb_created_at_id"
}
14 changes: 14 additions & 0 deletions server/storage/rethinkdb_models.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,4 +30,18 @@ var (
},
JSONUnmarshaller: rdbTUFFileFromJSON,
}

// ChangeRethinkTable is the table definition for changefeed objects
ChangeRethinkTable = rethinkdb.Table{
Name: Change{}.TableName(),
PrimaryKey: "id",
SecondaryIndexes: map[string][]string{
"rdb_created_at_id": {"created_at", "id"},
"rdb_gun_created_at_id": {"gun", "created_at", "id"},
},
Config: map[string]string{
"write_acks": "majority",
},
JSONUnmarshaller: rdbChangeFromJSON,
}
)
8 changes: 0 additions & 8 deletions server/storage/rethinkdb_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -117,11 +117,3 @@ func TestRDBTUFFileJSONUnmarshallingFailure(t *testing.T) {
require.Error(t, err)
}
}

func TestRethinkDBGetChanges(t *testing.T) {
s := NewRethinkDBStorage("dbname", "user", "pwd", nil)
c, err := s.GetChanges("foo", 10, "")
require.Error(t, err)
require.Nil(t, c)
require.Contains(t, err.Error(), "Not Implemented")
}
10 changes: 5 additions & 5 deletions server/storage/sql_models.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,9 +32,9 @@ func (g TUFFile) TableName() string {
return TUFFileTableName
}

// Change defines the the fields required for an object in the changefeed
type Change struct {
ID uint `gorm:"primary_key" sql:"not null" json:",string"`
// SQLChange defines the the fields required for an object in the changefeed
type SQLChange struct {
ID uint `gorm:"primary_key" sql:";not null" json:",string"`
CreatedAt time.Time
GUN string `gorm:"column:gun" sql:"type:varchar(255);not null"`
Version int `sql:"not null"`
Expand All @@ -43,7 +43,7 @@ type Change struct {
}

// TableName sets a specific table name for Changefeed
func (c Change) TableName() string {
func (c SQLChange) TableName() string {
return ChangefeedTableName
}

Expand All @@ -61,6 +61,6 @@ func CreateTUFTable(db gorm.DB) error {

// CreateChangefeedTable creates the DB table for Changefeed
func CreateChangefeedTable(db gorm.DB) error {
query := db.AutoMigrate(&Change{})
query := db.AutoMigrate(&SQLChange{})
return query.Error
}
Loading

0 comments on commit 005f17e

Please sign in to comment.