From 24f57aa2105634b4a4ec9e6469a9807a839bd20e Mon Sep 17 00:00:00 2001
From: Cong Zhao
Date: Wed, 6 Mar 2019 12:14:15 +0800
Subject: [PATCH] #476 write order id to file for large expire message

---
 Gopkg.lock                       | 14 ++++-
 Gopkg.toml                       |  2 +-
 app/app.go                       |  4 +-
 app/pub/msgs.go                  | 94 ++++++++++++++++++++++++++++++++
 app/pub/publisher_kafka.go       | 74 +++++++++++++++++++------
 app/pub/publisher_mock.go        |  6 +-
 app/pub/schema_test.go           | 10 ++--
 cmd/pressuremaker/main.go        | 13 ++++-
 cmd/pressuremaker/utils/utils.go |  2 +-
 9 files changed, 185 insertions(+), 34 deletions(-)

diff --git a/Gopkg.lock b/Gopkg.lock
index 09f5c3827..e199602c0 100644
--- a/Gopkg.lock
+++ b/Gopkg.lock
@@ -2,12 +2,20 @@
 
 [[projects]]
-  digest = "1:66b8ed452b31eb9075bc53295952487c333a9cb555de57ed61f5664c058d9050"
+  digest = "1:ed77032e4241e3b8329c9304d66452ed196e795876e14be677a546f36b94e67a"
+  name = "github.com/DataDog/zstd"
+  packages = ["."]
+  pruneopts = "UT"
+  revision = "c7161f8c63c045cbc7ca051dcc969dd0e4054de2"
+  version = "v1.3.5"
+
+[[projects]]
+  digest = "1:82a18170c9c41e36939cb5d26da1546b2cfa786aa030a978d3bf183519849230"
   name = "github.com/Shopify/sarama"
   packages = ["."]
   pruneopts = "UT"
-  revision = "35324cf48e33d8260e1c7c18854465a904ade249"
-  version = "v1.17.0"
+  revision = "4602b5a8c6e826f9e0737865818dd43b2339a092"
+  version = "v1.21.0"
 
 [[projects]]
   branch = "master"
diff --git a/Gopkg.toml b/Gopkg.toml
index c97cd2093..29beb9b80 100644
--- a/Gopkg.toml
+++ b/Gopkg.toml
@@ -54,7 +54,7 @@
 
 [[constraint]]
   name = "github.com/Shopify/sarama"
-  version = "1.17.0"
+  version = "1.21.0"
 
 [[constraint]]
   name = "github.com/linkedin/goavro"
diff --git a/app/app.go b/app/app.go
index ece45b27c..44a45fee3 100644
--- a/app/app.go
+++ b/app/app.go
@@ -170,7 +170,7 @@ func NewBinanceChain(logger log.Logger, db dbm.DB, traceStore io.Writer, baseApp
 	publishers := make([]pub.MarketDataPublisher, 0, 1)
 	if app.publicationConfig.PublishKafka {
-		publishers = append(publishers, pub.NewKafkaMarketDataPublisher(app.Logger))
+		publishers = append(publishers, pub.NewKafkaMarketDataPublisher(app.Logger, ServerContext.Config.DBDir()))
 	}
 	if app.publicationConfig.PublishLocal {
 		publishers = append(publishers, pub.NewLocalMarketDataPublisher(ServerContext.Config.RootDir, app.Logger, app.publicationConfig))
@@ -469,7 +469,7 @@ func (app *BinanceChain) EndBlocker(ctx sdk.Context, req abci.RequestEndBlock) a
 	if app.publicationConfig.ShouldPublishAny() &&
 		pub.IsLive {
 		if height >= app.publicationConfig.FromHeightInclusive {
-			app.publish(tradesToPublish, &proposals, blockFee, ctx, height, blockTime.Unix())
+			app.publish(tradesToPublish, &proposals, blockFee, ctx, height, blockTime.UnixNano())
 		}
 
 		// clean up intermediate cached data
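A note on the `EndBlocker` change above: `app.publish` now receives the block time in nanoseconds rather than seconds since the epoch, so the timestamp flowing into the publication pipeline changes unit. A minimal, self-contained sketch of the difference (plain Go; the date value is illustrative only):

```go
package main

import (
	"fmt"
	"time"
)

func main() {
	blockTime := time.Date(2019, time.March, 6, 12, 14, 15, 0, time.UTC)
	fmt.Println(blockTime.Unix())     // 1551874455            -> seconds since epoch (old behavior)
	fmt.Println(blockTime.UnixNano()) // 1551874455000000000   -> nanoseconds since epoch (new behavior)
}
```

Downstream consumers that interpret the published timestamps must be updated to match the new unit.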
diff --git a/app/pub/msgs.go b/app/pub/msgs.go
index 37539b6ce..95d8dd0b0 100644
--- a/app/pub/msgs.go
+++ b/app/pub/msgs.go
@@ -3,6 +3,7 @@ package pub
 import (
 	"encoding/json"
 	"fmt"
+	"strings"
 
 	sdk "github.com/cosmos/cosmos-sdk/types"
 
@@ -43,6 +44,30 @@ type AvroOrJsonMsg interface {
 	String() string
 }
 
+// EssMsg is the fallback interface used when an AvroOrJsonMsg fails to publish.
+// Not all AvroOrJsonMsg implementations implement Ess because:
+//
+// for transfer:
+//
+// 1. qs doesn't subscribe to its topic (risk control relies on that)
+// 2. risk control can recover from explorer-indexed transfers (pull mode)
+// 3. we don't have a unique representation of a transfer like an order-id (we didn't save the txhash in the message)
+//
+// for trade:
+// the problem is the same as point 3 above (a trade id is only generated during publication, not persisted anywhere).
+// Keeping qty, price, sid and bid for every trade would be too much data;
+// in that case we should recover from the local publisher.
+type EssMsg interface {
+	AvroOrJsonMsg
+
+	// a string that carries the essential message content, used to make up for downstream services after a kafka issue;
+	// this string is persisted to a file
+	EssentialMsg() string
+
+	// an empty copy of the original `AvroOrJsonMsg` so that downstream logic does not break
+	EmptyCopy() AvroOrJsonMsg
+}
+
 type ExecutionResults struct {
 	Height    int64
 	Timestamp int64 // milli seconds since Epoch
@@ -73,6 +98,25 @@ func (msg *ExecutionResults) ToNativeMap() map[string]interface{} {
 	return native
 }
 
+func (msg *ExecutionResults) EssentialMsg() string {
+	// mainly used to recover from a large breathe-block expire message; there should be no trades in a breathe block
+	orders := msg.Orders.EssentialMsg()
+	proposals := msg.Proposals.EssentialMsg()
+	return fmt.Sprintf("height:%d\norders:%s\nproposals:%s\n", msg.Height, orders, proposals)
+}
+
+func (msg *ExecutionResults) EmptyCopy() AvroOrJsonMsg {
+	return &ExecutionResults{
+		msg.Height,
+		msg.Timestamp,
+		msg.NumOfMsgs,
+		trades{},
+		Orders{},
+		Proposals{},
+	}
+}
+
+// deliberately does not implement Ess
 type trades struct {
 	NumOfMsgs int
 	Trades    []*Trade
@@ -158,6 +202,21 @@ func (msg *Orders) ToNativeMap() map[string]interface{} {
 	return native
 }
 
+func (msg *Orders) EssentialMsg() string {
+	stat := make(map[orderPkg.ChangeType]*strings.Builder) // ChangeType -> order ids separated by EOL
+	for _, order := range msg.Orders {
+		if _, ok := stat[order.Status]; !ok {
+			stat[order.Status] = &strings.Builder{}
+		}
+		fmt.Fprintf(stat[order.Status], "\n%s", order.OrderId)
+	}
+	var result strings.Builder
+	for changeType, str := range stat {
+		fmt.Fprintf(&result, "%d:%s\n", changeType, str.String())
+	}
+	return result.String()
+}
+
 type Order struct {
 	Symbol string
 	Status orderPkg.ChangeType
@@ -250,6 +309,21 @@ func (msg *Proposals) ToNativeMap() map[string]interface{} {
 	return native
 }
 
+func (msg *Proposals) EssentialMsg() string {
+	stat := make(map[ProposalStatus]*strings.Builder) // ProposalStatus -> proposal ids separated by EOL
+	for _, proposal := range msg.Proposals {
+		if _, ok := stat[proposal.Status]; !ok {
+			stat[proposal.Status] = &strings.Builder{}
+		}
+		fmt.Fprintf(stat[proposal.Status], "\n%d", proposal.Id)
+	}
+	var result strings.Builder
+	for proposalStatus, str := range stat {
+		fmt.Fprintf(&result, "%d:%s\n", proposalStatus, str.String())
+	}
+	return result.String()
+}
+
 type ProposalStatus uint8
 
 const (
@@ -326,6 +400,7 @@ func (msg *OrderBookDelta) ToNativeMap() map[string]interface{} {
 	return native
 }
 
+// deliberately does not implement Ess
 type Books struct {
 	Height    int64
 	Timestamp int64
@@ -429,6 +504,24 @@ func (msg *Accounts) ToNativeMap() map[string]interface{} {
 	return native
 }
 
+func (msg *Accounts) EssentialMsg() string {
+	builder := strings.Builder{}
+	fmt.Fprintf(&builder, "height:%d\n", msg.Height)
+	for _, acc := range msg.Accounts {
+		fmt.Fprintf(&builder, "%s\n", sdk.AccAddress(acc.Owner).String())
+	}
+	return builder.String()
+}
+
+func (msg *Accounts) EmptyCopy() AvroOrJsonMsg {
+	return &Accounts{
+		msg.Height,
+		0,
+		[]Account{},
+	}
+}
+
+// deliberately does not implement Ess
 type BlockFee struct {
 	Height int64
 	Fee    string
@@ -522,6 +615,7 @@ func (msg Transfer) ToNativeMap() map[string]interface{} {
 	return native
 }
 
+// deliberately does not implement Ess
 type Transfers struct {
 	Height int64
 	Num    int
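To make the essential-log format concrete: `Orders.EssentialMsg` groups order ids by their numeric change type, one id per line, each group prefixed with the status value. A runnable sketch with simplified stand-in types (the real `orderPkg.ChangeType` values and `Order` fields live in this repo; the numeric status below is an assumption for illustration only):

```go
package main

import (
	"fmt"
	"strings"
)

// Stand-ins for the repo's types, just to show the produced text format.
type ChangeType int

const Expired ChangeType = 3 // illustrative value, not the repo's actual constant

type Order struct {
	Status  ChangeType
	OrderId string
}

type Orders struct{ Orders []*Order }

// Same shape as the patch's Orders.EssentialMsg: group order ids by status.
func (msg *Orders) EssentialMsg() string {
	stat := make(map[ChangeType]*strings.Builder)
	for _, order := range msg.Orders {
		if _, ok := stat[order.Status]; !ok {
			stat[order.Status] = &strings.Builder{}
		}
		fmt.Fprintf(stat[order.Status], "\n%s", order.OrderId)
	}
	var result strings.Builder
	for changeType, str := range stat {
		fmt.Fprintf(&result, "%d:%s\n", changeType, str.String())
	}
	return result.String()
}

func main() {
	msg := &Orders{Orders: []*Order{
		{Expired, "ORDER-1"},
		{Expired, "ORDER-2"},
	}}
	fmt.Print(msg.EssentialMsg())
	// Output:
	// 3:
	// ORDER-1
	// ORDER-2
}
```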
diff --git a/app/pub/publisher_kafka.go b/app/pub/publisher_kafka.go
index 6960700fb..7187ce777 100644
--- a/app/pub/publisher_kafka.go
+++ b/app/pub/publisher_kafka.go
@@ -2,7 +2,9 @@ package pub
 
 import (
 	"fmt"
+	"io/ioutil"
 	"os"
+	"path/filepath"
 	"strconv"
 	"strings"
 	"time"
@@ -13,11 +15,13 @@ import (
 	"github.com/linkedin/goavro"
 	"github.com/prometheus/client_golang/prometheus"
 
+	"github.com/tendermint/tendermint/libs/common"
 	"github.com/tendermint/tendermint/libs/log"
 )
 
 const (
-	KafkaBrokerSep = ";"
+	KafkaBrokerSep  = ";"
+	essentialLogDir = "essential"
 )
 
 type KafkaMarketDataPublisher struct {
@@ -27,7 +31,8 @@ type KafkaMarketDataPublisher struct {
 	blockFeeCodec  *goavro.Codec
 	transfersCodec *goavro.Codec
 
-	producers map[string]sarama.SyncProducer // topic -> producer
+	essentialLogPath string                         // the path (defaults to the db dir) where we write essential files to make up for lost data on kafka errors
+	producers        map[string]sarama.SyncProducer // topic -> producer
 }
 
 func (publisher *KafkaMarketDataPublisher) newProducers() (config *sarama.Config, err error) {
@@ -119,7 +124,47 @@ func (publisher *KafkaMarketDataPublisher) prepareMessage(
 }
 
 func (publisher *KafkaMarketDataPublisher) publish(avroMessage AvroOrJsonMsg, tpe msgType, height, timestamp int64) {
-	var topic string
+	topic := publisher.resolveTopic(tpe)
+
+	if msg, err := publisher.marshal(avroMessage, tpe); err == nil {
+		kafkaMsg := publisher.prepareMessage(topic, strconv.FormatInt(height, 10), timestamp, tpe, msg)
+		if partition, offset, err := publisher.publishWithRetry(kafkaMsg, topic); err == nil {
+			Logger.Info("published", "topic", topic, "msg", avroMessage.String(), "offset", offset, "partition", partition)
+		} else {
+			Logger.Error("failed to publish, trying to log essential message", "topic", topic, "msg", avroMessage.String(), "err", err)
+			if essMsg, ok := avroMessage.(EssMsg); ok {
+				publisher.publishEssentialMsg(essMsg, topic, tpe, height, timestamp)
+			}
+		}
+	} else {
+		Logger.Error("failed to marshal", "topic", topic, "msg", avroMessage.String(), "err", err)
+	}
+}
+
+func (publisher *KafkaMarketDataPublisher) publishEssentialMsg(essMsg EssMsg, topic string, tpe msgType, height, timestamp int64) {
+	// First, publish an empty copy to make sure downstream services do not hang
+	if msg, err := publisher.marshal(essMsg.EmptyCopy(), tpe); err == nil {
+		kafkaMsg := publisher.prepareMessage(topic, strconv.FormatInt(height, 10), timestamp, tpe, msg)
+		if partition, offset, err := publisher.publishWithRetry(kafkaMsg, topic); err == nil {
+			Logger.Info("published empty msg", "topic", topic, "msg", essMsg.String(), "offset", offset, "partition", partition)
+		} else {
+			Logger.Error("failed to publish empty msg", "topic", topic, "msg", essMsg.String(), "err", err)
+		}
+	} else {
+		Logger.Error("failed to marshal empty msg", "topic", topic, "msg", essMsg.String(), "err", err)
+	}
+
+	// Second, log the essential content of the message to disk
+	filePath := fmt.Sprintf("%s/%d_%s.log", publisher.essentialLogPath, height, tpe.String())
+	toWrite := []byte(essMsg.EssentialMsg())
+	if len(toWrite) != 0 {
+		if err := ioutil.WriteFile(filePath, toWrite, 0644); err != nil {
+			Logger.Error("failed to write essential log", "err", err)
+		}
+	}
+}
+
+func (publisher *KafkaMarketDataPublisher) resolveTopic(tpe msgType) (topic string) {
 	switch tpe {
 	case booksTpe:
 		topic = Cfg.OrderBookTopic
@@ -132,17 +177,7 @@ func (publisher *KafkaMarketDataPublisher) publish(avroMessage AvroOrJsonMsg, tp
 	case transferType:
 		topic = Cfg.TransferTopic
 	}
-
-	if msg, err := publisher.marshal(avroMessage, tpe); err == nil {
-		kafkaMsg := publisher.prepareMessage(topic, strconv.FormatInt(height, 10), timestamp, tpe, msg)
-		if partition, offset, err := publisher.publishWithRetry(kafkaMsg, topic); err == nil {
-			Logger.Info("published", "topic", topic, "msg", avroMessage.String(), "offset", offset, "partition", partition)
-		} else {
-			Logger.Error("failed to publish", "topic", topic, "msg", avroMessage.String(), "err", err)
-		}
-	} else {
-		Logger.Error("failed to publish", "topic", topic, "msg", avroMessage.String(), "err", err)
-	}
+	return
 }
 
 func (publisher *KafkaMarketDataPublisher) Stop() {
@@ -194,7 +229,6 @@ func (publisher *KafkaMarketDataPublisher) publishWithRetry(
 
 func (publisher *KafkaMarketDataPublisher) marshal(msg AvroOrJsonMsg, tpe msgType) ([]byte, error) {
 	native := msg.ToNativeMap()
-	Logger.Debug("msgDetail", "msg", native)
 	var codec *goavro.Codec
 	switch tpe {
 	case accountsTpe:
@@ -233,11 +267,12 @@ func (publisher *KafkaMarketDataPublisher) initAvroCodecs() (err error) {
 }
 
 func NewKafkaMarketDataPublisher(
-	logger log.Logger) (publisher *KafkaMarketDataPublisher) {
+	logger log.Logger, dbDir string) (publisher *KafkaMarketDataPublisher) {
 
 	sarama.Logger = saramaLogger{}
 	publisher = &KafkaMarketDataPublisher{
-		producers: make(map[string]sarama.SyncProducer),
+		producers:        make(map[string]sarama.SyncProducer),
+		essentialLogPath: filepath.Join(dbDir, essentialLogDir),
 	}
 
 	if err := publisher.initAvroCodecs(); err != nil {
@@ -262,5 +297,10 @@ func NewKafkaMarketDataPublisher(
 		go pClient.UpdatePrometheusMetrics()
 	}
 
+	if err := common.EnsureDir(publisher.essentialLogPath, 0755); err != nil {
+		logger.Error("failed to create essential log path", "err", err)
+	}
+
+	logger.Info("created kafka publisher", "elpath", publisher.essentialLogPath)
 	return publisher
 }
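With the fallback above, a failed publish of an `EssMsg` leaves behind a plain-text file named `<height>_<msgType>.log` under the essential log directory, while an empty placeholder message keeps the Kafka topic moving. A hypothetical operator-side sketch (not part of this patch) that dumps those files for manual recovery; the directory layout is an assumption based on the default `DBDir`:

```go
package main

import (
	"fmt"
	"io/ioutil"
	"path/filepath"
)

func main() {
	// assumes the node's db dir is "data", so essential logs land in data/essential
	essentialLogPath := filepath.Join("data", "essential")
	files, err := ioutil.ReadDir(essentialLogPath)
	if err != nil {
		panic(err)
	}
	for _, f := range files {
		// file names follow the patch's pattern: <height>_<msgType>.log
		content, err := ioutil.ReadFile(filepath.Join(essentialLogPath, f.Name()))
		if err != nil {
			panic(err)
		}
		fmt.Printf("=== %s ===\n%s\n", f.Name(), content)
	}
}
```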
diff --git a/app/pub/publisher_mock.go b/app/pub/publisher_mock.go
index 97673b017..ad5969f0d 100644
--- a/app/pub/publisher_mock.go
+++ b/app/pub/publisher_mock.go
@@ -11,7 +11,7 @@ type MockMarketDataPublisher struct {
 	BooksPublished            []*Books
 	ExecutionResultsPublished []*ExecutionResults
 	BlockFeePublished         []BlockFee
-	TransferPublished         []Transfer
+	TransferPublished         []Transfers
 
 	Lock             *sync.Mutex // as mock publisher is only used in testing, its no harm to have this granularity Lock
 	MessagePublished uint32      // atomic integer used to determine the published messages
@@ -31,7 +31,7 @@ func (publisher *MockMarketDataPublisher) publish(msg AvroOrJsonMsg, tpe msgType
 	case blockFeeTpe:
 		publisher.BlockFeePublished = append(publisher.BlockFeePublished, msg.(BlockFee))
 	case transferType:
-		publisher.TransferPublished = append(publisher.TransferPublished, msg.(Transfer))
+		publisher.TransferPublished = append(publisher.TransferPublished, msg.(Transfers))
 	default:
 		panic(fmt.Errorf("does not support type %s", tpe.String()))
 	}
@@ -54,7 +54,7 @@ func NewMockMarketDataPublisher() (publisher *MockMarketDataPublisher) {
 		make([]*Books, 0),
 		make([]*ExecutionResults, 0),
 		make([]BlockFee, 0),
-		make([]Transfer, 0),
+		make([]Transfers, 0),
 		&sync.Mutex{},
 		0,
 	}
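The mock previously stored the per-row `Transfer` type, while the real pipeline publishes one aggregate `Transfers` message per block, so the `msg.(Transfer)` assertion would panic at runtime. A usage sketch of the corrected mock, assuming it runs inside the `pub` package's test suite (the positional field order mirrors `TestTransferMarshaling` below and is an assumption about the `Transfers` struct):

```go
func TestMockTransferPublish(t *testing.T) {
	publisher := NewMockMarketDataPublisher()
	// one aggregate Transfers message carrying a single Transfer row
	msg := Transfers{42, 1, 1000, []Transfer{{From: "bnc1", To: []Receiver{{"bnc2", []Coin{{"BNB", 100}}}}}}}
	publisher.publish(msg, transferType, 42, 1000)
	if len(publisher.TransferPublished) != 1 {
		t.Fatal("expected exactly one Transfers message to be recorded")
	}
}
```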
diff --git a/app/pub/schema_test.go b/app/pub/schema_test.go
index ae7eebb8c..740fa5a2c 100644
--- a/app/pub/schema_test.go
+++ b/app/pub/schema_test.go
@@ -18,7 +18,7 @@ func TestMain(m *testing.M) {
 }
 
 func TestExecutionResultsMarshaling(t *testing.T) {
-	publisher := NewKafkaMarketDataPublisher(Logger)
+	publisher := NewKafkaMarketDataPublisher(Logger, "")
 	trades := trades{
 		NumOfMsgs: 1,
 		Trades: []*Trade{{
@@ -58,7 +58,7 @@ func TestExecutionResultsMarshaling(t *testing.T) {
 }
 
 func TestBooksMarshaling(t *testing.T) {
-	publisher := NewKafkaMarketDataPublisher(Logger)
+	publisher := NewKafkaMarketDataPublisher(Logger, "")
 	book := OrderBookDelta{"NNB_BNB", []PriceLevel{{100, 100}}, []PriceLevel{{100, 100}}}
 	msg := Books{42, 100, 1, []OrderBookDelta{book}}
 	_, err := publisher.marshal(&msg, booksTpe)
@@ -68,7 +68,7 @@ func TestBooksMarshaling(t *testing.T) {
 }
 
 func TestAccountsMarshaling(t *testing.T) {
-	publisher := NewKafkaMarketDataPublisher(Logger)
+	publisher := NewKafkaMarketDataPublisher(Logger, "")
 	accs := []Account{{"b-1", "BNB:1000;BTC:10", []*AssetBalance{{Asset: "BNB", Free: 100}}}}
 	msg := Accounts{42, 2, accs}
 	_, err := publisher.marshal(&msg, accountsTpe)
@@ -78,7 +78,7 @@ func TestAccountsMarshaling(t *testing.T) {
 }
 
 func TestBlockFeeMarshaling(t *testing.T) {
-	publisher := NewKafkaMarketDataPublisher(Logger)
+	publisher := NewKafkaMarketDataPublisher(Logger, "")
 	msg := BlockFee{1, "BNB:1000;BTC:10", []string{"bnc1", "bnc2", "bnc3"}}
 	_, err := publisher.marshal(&msg, blockFeeTpe)
 	if err != nil {
@@ -87,7 +87,7 @@ func TestBlockFeeMarshaling(t *testing.T) {
 }
 
 func TestTransferMarshaling(t *testing.T) {
-	publisher := NewKafkaMarketDataPublisher(Logger)
+	publisher := NewKafkaMarketDataPublisher(Logger, "")
 	msg := Transfers{42, 20, 1000, []Transfer{{From: "", To: []Receiver{Receiver{"bnc1", []Coin{{"BNB", 100}, {"BTC", 100}}}, Receiver{"bnc2", []Coin{{"BNB", 200}, {"BTC", 200}}}}}}}
 	_, err := publisher.marshal(&msg, transferType)
 	if err != nil {
diff --git a/cmd/pressuremaker/main.go b/cmd/pressuremaker/main.go
index 16a8d0c7e..803530736 100644
--- a/cmd/pressuremaker/main.go
+++ b/cmd/pressuremaker/main.go
@@ -3,6 +3,7 @@ package main
 import (
 	"fmt"
 	"net/http"
+	_ "net/http/pprof"
 	"os"
 	"time"
 
@@ -32,6 +33,10 @@ type PressureMakerConfig struct {
 }
 
 func main() {
+	go func() {
+		fmt.Println(http.ListenAndServe("localhost:6060", nil))
+	}()
+
 	Execute()
 }
 
@@ -77,8 +82,12 @@ var rootCmd = &cobra.Command{
 		// TODO: find an elegant way to exit
 		// The problem of shutdown is publication is async (we don't know when messages are
 		finishSignal := make(chan struct{})
-		publisher := pub.NewKafkaMarketDataPublisher(context.Logger)
-
+		pub.Logger = context.Logger.With("module", "pub")
+		pub.Cfg = &cfg.PublicationConfig
+		pub.ToPublishCh = make(chan pub.BlockInfoToPublish, cfg.PublicationConfig.PublicationChannelSize)
+		publisher := pub.NewKafkaMarketDataPublisher(pub.Logger, "")
+		go pub.Publish(publisher, nil, pub.Logger, pub.Cfg, pub.ToPublishCh)
+		pub.IsLive = true
 		srv := &http.Server{
 			Addr: cfg.PrometheusAddr,
 			Handler: promhttp.InstrumentMetricHandler(
diff --git a/cmd/pressuremaker/utils/utils.go b/cmd/pressuremaker/utils/utils.go
index c690e2c88..ad5971928 100644
--- a/cmd/pressuremaker/utils/utils.go
+++ b/cmd/pressuremaker/utils/utils.go
@@ -131,7 +131,7 @@ func (mg *MessageGenerator) ExpireMessages(height int, timeNow time.Time) (trade
 	orderChanges = make(orderPkg.OrderChanges, 0, 100000)
 	accounts = make(map[string]pub.Account)
 
-	for i := 0; i < 100000; i++ {
+	for i := 0; i < 1000000; i++ {
 		o := makeOrderInfo(mg.buyerAddrs[0], 1, int64(height), 1000000000, 1000000000, 500000000, timePub)
 		mg.OrderChangeMap[fmt.Sprintf("%d", i)] = &o
 		orderChanges = append(orderChanges, orderPkg.OrderChange{fmt.Sprintf("%d", i), orderPkg.Expired, nil})
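The pressure maker's new pprof listener follows the standard pattern: the blank import registers the /debug/pprof handlers on `http.DefaultServeMux`, and a background goroutine serves them on localhost:6060. A standalone sketch of the same wiring; once it is running, a CPU profile can be pulled with `go tool pprof http://localhost:6060/debug/pprof/profile` and a heap snapshot with `go tool pprof http://localhost:6060/debug/pprof/heap`:

```go
package main

import (
	"fmt"
	"net/http"
	_ "net/http/pprof" // registers /debug/pprof handlers on the default mux
)

func main() {
	go func() {
		// report the error if the listener ever stops; matches the patch's fire-and-forget style
		fmt.Println(http.ListenAndServe("localhost:6060", nil))
	}()
	select {} // block forever so the profiling endpoint stays up
}
```

This makes it possible to profile the generator while it drives the one-million-order expire load from `ExpireMessages` above.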