Skip to content

Commit

Permalink
#476 write order id to file for large expire message
Browse files Browse the repository at this point in the history
  • Loading branch information
ackratos committed Mar 21, 2019
1 parent 8113d9d commit 61acc7d
Show file tree
Hide file tree
Showing 10 changed files with 185 additions and 40 deletions.
14 changes: 11 additions & 3 deletions Gopkg.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion Gopkg.toml
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,7 @@

[[constraint]]
name = "github.com/Shopify/sarama"
version = "1.17.0"
version = "1.21.0"

[[constraint]]
name = "github.com/linkedin/goavro"
Expand Down
4 changes: 2 additions & 2 deletions app/app.go
Original file line number Diff line number Diff line change
Expand Up @@ -171,7 +171,7 @@ func NewBinanceChain(logger log.Logger, db dbm.DB, traceStore io.Writer, baseApp

publishers := make([]pub.MarketDataPublisher, 0, 1)
if app.publicationConfig.PublishKafka {
publishers = append(publishers, pub.NewKafkaMarketDataPublisher(app.Logger))
publishers = append(publishers, pub.NewKafkaMarketDataPublisher(app.Logger, ServerContext.Config.DBDir()))
}
if app.publicationConfig.PublishLocal {
publishers = append(publishers, pub.NewLocalMarketDataPublisher(ServerContext.Config.RootDir, app.Logger, app.publicationConfig))
Expand Down Expand Up @@ -459,7 +459,7 @@ func (app *BinanceChain) EndBlocker(ctx sdk.Context, req abci.RequestEndBlock) a
if app.publicationConfig.ShouldPublishAny() &&
pub.IsLive {
if height >= app.publicationConfig.FromHeightInclusive {
app.publish(tradesToPublish, &proposals, blockFee, ctx, height, blockTime.Unix())
app.publish(tradesToPublish, &proposals, blockFee, ctx, height, blockTime.UnixNano())
}

// clean up intermediate cached data
Expand Down
94 changes: 94 additions & 0 deletions app/pub/msgs.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package pub
import (
"encoding/json"
"fmt"
"strings"

sdk "github.com/cosmos/cosmos-sdk/types"

Expand Down Expand Up @@ -43,6 +44,30 @@ type AvroOrJsonMsg interface {
String() string
}

// EssMsg is a type when AvroOrJsonMsg failed to publish
// Not all AvroOrJsonMsg implemented Ess because:
//
// for transfer:
//
// 1. qs doesn't subscribe to its topic (risk control is relying on that)
// 2. risk control can recover from explorer indexed transfers (pull mode)
// 3. we don't have a unique representation of transfer like order-id (we didn't save txhash in message)
//
// for trade:
// the problem is same with above point 3, (trade id is only generated during publication, not persisted anywhere).
// If we keep qty, price, sid, bid for a trade, it would be too much,
// in this case we should recover from local publisher
type EssMsg interface {
AvroOrJsonMsg

// a string that carry essential msg used to make up downstream service on kafka issue
// this string would be persisted into file
EssentialMsg() string

// an empty message of original `AvroOrJsonMsg` to make downstream logic not broken
EmptyCopy() AvroOrJsonMsg
}

type ExecutionResults struct {
Height int64
Timestamp int64 // milli seconds since Epoch
Expand Down Expand Up @@ -73,6 +98,25 @@ func (msg *ExecutionResults) ToNativeMap() map[string]interface{} {
return native
}

func (msg *ExecutionResults) EssentialMsg() string {
// mainly used to recover for large breathe block expiring message, there should be no trade on breathe block
orders := msg.Orders.EssentialMsg()
proposals := msg.Proposals.EssentialMsg()
return fmt.Sprintf("height:%d\norders:%s\nproposals:%s\n", msg.Height, orders, proposals)
}

func (msg *ExecutionResults) EmptyCopy() AvroOrJsonMsg {
return &ExecutionResults{
msg.Height,
msg.Timestamp,
msg.NumOfMsgs,
trades{},
Orders{},
Proposals{},
}
}

// deliberated not implemented Ess
type trades struct {
NumOfMsgs int
Trades []*Trade
Expand Down Expand Up @@ -158,6 +202,21 @@ func (msg *Orders) ToNativeMap() map[string]interface{} {
return native
}

func (msg *Orders) EssentialMsg() string {
stat := make(map[orderPkg.ChangeType]*strings.Builder, 0) // ChangeType -> OrderIds splited by EOL
for _, order := range msg.Orders {
if _, ok := stat[order.Status]; !ok {
stat[order.Status] = &strings.Builder{}
}
fmt.Fprintf(stat[order.Status], "\n%s", order.OrderId)
}
var result strings.Builder
for changeType, str := range stat {
fmt.Fprintf(&result, "%d:%s\n", changeType, str.String())
}
return result.String()
}

type Order struct {
Symbol string
Status orderPkg.ChangeType
Expand Down Expand Up @@ -240,6 +299,21 @@ func (msg *Proposals) ToNativeMap() map[string]interface{} {
return native
}

func (msg *Proposals) EssentialMsg() string {
stat := make(map[ProposalStatus]*strings.Builder, 0) // ProposalStatus -> OrderIds splited by EOL
for _, proposal := range msg.Proposals {
if _, ok := stat[proposal.Status]; !ok {
stat[proposal.Status] = &strings.Builder{}
}
fmt.Fprintf(stat[proposal.Status], "\n%d", proposal.Id)
}
var result strings.Builder
for proposalStatus, str := range stat {
fmt.Fprintf(&result, "%d:%s\n", proposalStatus, str.String())
}
return result.String()
}

type ProposalStatus uint8

const (
Expand Down Expand Up @@ -316,6 +390,7 @@ func (msg *OrderBookDelta) ToNativeMap() map[string]interface{} {
return native
}

// deliberated not implemented Ess
type Books struct {
Height int64
Timestamp int64
Expand Down Expand Up @@ -419,6 +494,24 @@ func (msg *Accounts) ToNativeMap() map[string]interface{} {
return native
}

func (msg *Accounts) EssentialMsg() string {
builder := strings.Builder{}
fmt.Fprintf(&builder, "height:%d\n", msg.Height)
for _, acc := range msg.Accounts {
fmt.Fprintf(&builder, "%s\n", sdk.AccAddress(acc.Owner).String())
}
return builder.String()
}

func (msg *Accounts) EmptyCopy() AvroOrJsonMsg {
return &Accounts{
msg.Height,
0,
[]Account{},
}
}

// deliberated not implemented Ess
type BlockFee struct {
Height int64
Fee string
Expand Down Expand Up @@ -512,6 +605,7 @@ func (msg Transfer) ToNativeMap() map[string]interface{} {
return native
}

// deliberated not implemented Ess
type Transfers struct {
Height int64
Num int
Expand Down
74 changes: 57 additions & 17 deletions app/pub/publisher_kafka.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,9 @@ package pub

import (
"fmt"
"io/ioutil"
"os"
"path/filepath"
"strconv"
"strings"
"time"
Expand All @@ -13,11 +15,13 @@ import (
"github.com/linkedin/goavro"
"github.com/prometheus/client_golang/prometheus"

"github.com/tendermint/tendermint/libs/common"
"github.com/tendermint/tendermint/libs/log"
)

const (
KafkaBrokerSep = ";"
KafkaBrokerSep = ";"
essentialLogDir = "essential"
)

type KafkaMarketDataPublisher struct {
Expand All @@ -27,7 +31,8 @@ type KafkaMarketDataPublisher struct {
blockFeeCodec *goavro.Codec
transfersCodec *goavro.Codec

producers map[string]sarama.SyncProducer // topic -> producer
essentialLogPath string // the path (default to db dir) we write essential file to make up data on kafka error
producers map[string]sarama.SyncProducer // topic -> producer
}

func (publisher *KafkaMarketDataPublisher) newProducers() (config *sarama.Config, err error) {
Expand Down Expand Up @@ -119,7 +124,47 @@ func (publisher *KafkaMarketDataPublisher) prepareMessage(
}

func (publisher *KafkaMarketDataPublisher) publish(avroMessage AvroOrJsonMsg, tpe msgType, height, timestamp int64) {
var topic string
topic := publisher.resolveTopic(tpe)

if msg, err := publisher.marshal(avroMessage, tpe); err == nil {
kafkaMsg := publisher.prepareMessage(topic, strconv.FormatInt(height, 10), timestamp, tpe, msg)
if partition, offset, err := publisher.publishWithRetry(kafkaMsg, topic); err == nil {
Logger.Info("published", "topic", topic, "msg", avroMessage.String(), "offset", offset, "partition", partition)
} else {
Logger.Error("failed to publish, tring to log essential message", "topic", topic, "msg", avroMessage.String(), "err", err)
if essMsg, ok := avroMessage.(EssMsg); ok {
publisher.publishEssentialMsg(essMsg, topic, tpe, height, timestamp)
}
}
} else {
Logger.Error("failed to publish", "topic", topic, "msg", avroMessage.String(), "err", err)
}
}

func (publisher KafkaMarketDataPublisher) publishEssentialMsg(essMsg EssMsg, topic string, tpe msgType, height, timestamp int64) {
// First, publish an empty copy to make sure downstream service not hanging
if msg, err := publisher.marshal(essMsg.EmptyCopy(), tpe); err == nil {
kafkaMsg := publisher.prepareMessage(topic, strconv.FormatInt(height, 10), timestamp, tpe, msg)
if partition, offset, err := publisher.publishWithRetry(kafkaMsg, topic); err == nil {
Logger.Info("published empty msg", "topic", topic, "msg", essMsg.String(), "offset", offset, "partition", partition)
} else {
Logger.Error("failed to publish empty msg", "topic", topic, "msg", essMsg.String(), "err", err)
}
} else {
Logger.Error("failed to publish empty msg", "topic", topic, "msg", essMsg.String(), "err", err)
}

// Second, log essential content of message to hard disk
filePath := fmt.Sprintf("%s/%d_%s.log", publisher.essentialLogPath, height, tpe.String())
toWrite := []byte(essMsg.EssentialMsg())
if len(toWrite) != 0 {
if err := ioutil.WriteFile(filePath, toWrite, 0644); err != nil {
Logger.Error("failed to write essential log", "err", err)
}
}
}

func (publisher KafkaMarketDataPublisher) resolveTopic(tpe msgType) (topic string) {
switch tpe {
case booksTpe:
topic = Cfg.OrderBookTopic
Expand All @@ -132,17 +177,7 @@ func (publisher *KafkaMarketDataPublisher) publish(avroMessage AvroOrJsonMsg, tp
case transferType:
topic = Cfg.TransferTopic
}

if msg, err := publisher.marshal(avroMessage, tpe); err == nil {
kafkaMsg := publisher.prepareMessage(topic, strconv.FormatInt(height, 10), timestamp, tpe, msg)
if partition, offset, err := publisher.publishWithRetry(kafkaMsg, topic); err == nil {
Logger.Info("published", "topic", topic, "msg", avroMessage.String(), "offset", offset, "partition", partition)
} else {
Logger.Error("failed to publish", "topic", topic, "msg", avroMessage.String(), "err", err)
}
} else {
Logger.Error("failed to publish", "topic", topic, "msg", avroMessage.String(), "err", err)
}
return
}

func (publisher *KafkaMarketDataPublisher) Stop() {
Expand Down Expand Up @@ -194,7 +229,6 @@ func (publisher *KafkaMarketDataPublisher) publishWithRetry(

func (publisher *KafkaMarketDataPublisher) marshal(msg AvroOrJsonMsg, tpe msgType) ([]byte, error) {
native := msg.ToNativeMap()
Logger.Debug("msgDetail", "msg", native)
var codec *goavro.Codec
switch tpe {
case accountsTpe:
Expand Down Expand Up @@ -233,11 +267,12 @@ func (publisher *KafkaMarketDataPublisher) initAvroCodecs() (err error) {
}

func NewKafkaMarketDataPublisher(
logger log.Logger) (publisher *KafkaMarketDataPublisher) {
logger log.Logger, dbDir string) (publisher *KafkaMarketDataPublisher) {

sarama.Logger = saramaLogger{}
publisher = &KafkaMarketDataPublisher{
producers: make(map[string]sarama.SyncProducer),
producers: make(map[string]sarama.SyncProducer),
essentialLogPath: filepath.Join(dbDir, essentialLogDir),
}

if err := publisher.initAvroCodecs(); err != nil {
Expand All @@ -262,5 +297,10 @@ func NewKafkaMarketDataPublisher(
go pClient.UpdatePrometheusMetrics()
}

if err := common.EnsureDir(publisher.essentialLogPath, 0755); err != nil {
logger.Error("failed to create essential log path", "err", err)
}

logger.Info("created kafka publisher", "elpath", publisher.essentialLogPath)
return publisher
}
6 changes: 3 additions & 3 deletions app/pub/publisher_mock.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ type MockMarketDataPublisher struct {
BooksPublished []*Books
ExecutionResultsPublished []*ExecutionResults
BlockFeePublished []BlockFee
TransferPublished []Transfer
TransferPublished []Transfers

Lock *sync.Mutex // as mock publisher is only used in testing, its no harm to have this granularity Lock
MessagePublished uint32 // atomic integer used to determine the published messages
Expand All @@ -31,7 +31,7 @@ func (publisher *MockMarketDataPublisher) publish(msg AvroOrJsonMsg, tpe msgType
case blockFeeTpe:
publisher.BlockFeePublished = append(publisher.BlockFeePublished, msg.(BlockFee))
case transferType:
publisher.TransferPublished = append(publisher.TransferPublished, msg.(Transfer))
publisher.TransferPublished = append(publisher.TransferPublished, msg.(Transfers))
default:
panic(fmt.Errorf("does not support type %s", tpe.String()))
}
Expand All @@ -54,7 +54,7 @@ func NewMockMarketDataPublisher() (publisher *MockMarketDataPublisher) {
make([]*Books, 0),
make([]*ExecutionResults, 0),
make([]BlockFee, 0),
make([]Transfer, 0),
make([]Transfers, 0),
&sync.Mutex{},
0,
}
Expand Down
Loading

0 comments on commit 61acc7d

Please sign in to comment.