Skip to content

Commit

Permalink
#476 write order id to file for large expire message
Browse files Browse the repository at this point in the history
  • Loading branch information
ackratos committed Mar 11, 2019
1 parent 7165641 commit 73ebb22
Show file tree
Hide file tree
Showing 10 changed files with 127 additions and 30 deletions.
20 changes: 14 additions & 6 deletions Gopkg.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion Gopkg.toml
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,7 @@

[[constraint]]
name = "github.com/Shopify/sarama"
version = "1.17.0"
version = "1.21.0"

[[constraint]]
name = "github.com/linkedin/goavro"
Expand Down
2 changes: 1 addition & 1 deletion app/app.go
Original file line number Diff line number Diff line change
Expand Up @@ -171,7 +171,7 @@ func NewBinanceChain(logger log.Logger, db dbm.DB, traceStore io.Writer, baseApp

publishers := make([]pub.MarketDataPublisher, 0, 1)
if app.publicationConfig.PublishKafka {
publishers = append(publishers, pub.NewKafkaMarketDataPublisher(app.Logger))
publishers = append(publishers, pub.NewKafkaMarketDataPublisher(app.Logger, ServerContext.Config.DBDir()))
}
if app.publicationConfig.PublishLocal {
publishers = append(publishers, pub.NewLocalMarketDataPublisher(ServerContext.Config.RootDir, app.Logger, app.publicationConfig))
Expand Down
68 changes: 68 additions & 0 deletions app/pub/msgs.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package pub
import (
"encoding/json"
"fmt"
"strings"

sdk "github.com/cosmos/cosmos-sdk/types"

Expand Down Expand Up @@ -40,6 +41,7 @@ func (this msgType) String() string {

type AvroOrJsonMsg interface {
ToNativeMap() map[string]interface{}
EssentialMsg() string // a string that carry essential msg used to make up downstream service on kafka issue
String() string
}

Expand All @@ -56,6 +58,13 @@ func (msg *ExecutionResults) String() string {
return fmt.Sprintf("ExecutionResult at height: %d, numOfMsgs: %d", msg.Height, msg.NumOfMsgs)
}

func (msg *ExecutionResults) EssentialMsg() string {
// mainly used to recover for large breathe block expiring message, there should be no trade on breathe block
orders := msg.Orders.EssentialMsg()
proposals := msg.Proposals.EssentialMsg()
return fmt.Sprintf("height:%d\norders:%s\nproposals:%s\n", msg.Height, orders, proposals)
}

func (msg *ExecutionResults) ToNativeMap() map[string]interface{} {
var native = make(map[string]interface{})
native["height"] = msg.Height
Expand Down Expand Up @@ -93,6 +102,11 @@ func (msg *trades) ToNativeMap() map[string]interface{} {
return native
}

func (msg *trades) EssentialMsg() string {
// deliberated not implemented
return ""
}

type Trade struct {
Id string
Symbol string
Expand Down Expand Up @@ -158,6 +172,21 @@ func (msg *Orders) ToNativeMap() map[string]interface{} {
return native
}

func (msg *Orders) EssentialMsg() string {
stat := make(map[orderPkg.ChangeType]*strings.Builder, 0) // ChangeType -> OrderIds splited by EOL
for _, order := range msg.Orders {
if _, ok := stat[order.Status]; !ok {
stat[order.Status] = &strings.Builder{}
}
fmt.Fprintf(stat[order.Status], "\n%s", order.OrderId)
}
var result strings.Builder
for changeType, str := range stat {
fmt.Fprintf(&result, "%d:%s\n", changeType, str.String())
}
return result.String()
}

type Order struct {
Symbol string
Status orderPkg.ChangeType
Expand Down Expand Up @@ -240,6 +269,21 @@ func (msg *Proposals) ToNativeMap() map[string]interface{} {
return native
}

func (msg *Proposals) EssentialMsg() string {
stat := make(map[ProposalStatus]*strings.Builder, 0) // ProposalStatus -> OrderIds splited by EOL
for _, proposal := range msg.Proposals {
if _, ok := stat[proposal.Status]; !ok {
stat[proposal.Status] = &strings.Builder{}
}
fmt.Fprintf(stat[proposal.Status], "\n%d", proposal.Id)
}
var result strings.Builder
for proposalStatus, str := range stat {
fmt.Fprintf(&result, "%d:%s\n", proposalStatus, str.String())
}
return result.String()
}

type ProposalStatus uint8

const (
Expand Down Expand Up @@ -342,6 +386,11 @@ func (msg *Books) ToNativeMap() map[string]interface{} {
return native
}

func (msg *Books) EssentialMsg() string {
// deliberated not implemented
return ""
}

type AssetBalance struct {
Asset string
Free int64
Expand Down Expand Up @@ -419,6 +468,15 @@ func (msg *Accounts) ToNativeMap() map[string]interface{} {
return native
}

func (msg *Accounts) EssentialMsg() string {
builder := strings.Builder{}
fmt.Fprintf(&builder, "height:%d\n", msg.Height)
for _, acc := range msg.Accounts {
fmt.Fprintf(&builder, "%s\n", sdk.AccAddress(acc.Owner).String())
}
return builder.String()
}

type BlockFee struct {
Height int64
Fee string
Expand Down Expand Up @@ -456,6 +514,11 @@ func (msg BlockFee) ToNativeMap() map[string]interface{} {
return native
}

func (msg BlockFee) EssentialMsg() string {
// deliberated not implemented
return ""
}

type Coin struct {
Denom string `json:"denom"`
Amount int64 `json:"amount"`
Expand Down Expand Up @@ -535,3 +598,8 @@ func (msg Transfers) ToNativeMap() map[string]interface{} {
native["transfers"] = transfers
return native
}

func (msg Transfers) EssentialMsg() string {
// deliberated not implemented
return ""
}
28 changes: 23 additions & 5 deletions app/pub/publisher_kafka.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,9 @@ package pub

import (
"fmt"
"io/ioutil"
"os"
"path/filepath"
"strconv"
"strings"
"time"
Expand All @@ -13,11 +15,13 @@ import (
"github.com/linkedin/goavro"
"github.com/prometheus/client_golang/prometheus"

"github.com/tendermint/tendermint/libs/common"
"github.com/tendermint/tendermint/libs/log"
)

const (
KafkaBrokerSep = ";"
KafkaBrokerSep = ";"
essentialLogDir = "essential"
)

type KafkaMarketDataPublisher struct {
Expand All @@ -27,7 +31,8 @@ type KafkaMarketDataPublisher struct {
blockFeeCodec *goavro.Codec
transfersCodec *goavro.Codec

producers map[string]sarama.SyncProducer // topic -> producer
essentialLogPath string // the path (default to db dir) we write essential file to make up data on kafka error
producers map[string]sarama.SyncProducer // topic -> producer
}

func (publisher *KafkaMarketDataPublisher) newProducers() (config *sarama.Config, err error) {
Expand Down Expand Up @@ -138,7 +143,14 @@ func (publisher *KafkaMarketDataPublisher) publish(avroMessage AvroOrJsonMsg, tp
if partition, offset, err := publisher.publishWithRetry(kafkaMsg, topic); err == nil {
Logger.Info("published", "topic", topic, "msg", avroMessage.String(), "offset", offset, "partition", partition)
} else {
Logger.Error("failed to publish", "topic", topic, "msg", avroMessage.String(), "err", err)
Logger.Error("failed to publish, tring to log essential message", "topic", topic, "msg", avroMessage.String(), "err", err)
filePath := fmt.Sprintf("%s/%d_%s.log", publisher.essentialLogPath, height, tpe.String())
toWrite := []byte(avroMessage.EssentialMsg())
if len(toWrite) != 0 {
if err := ioutil.WriteFile(filePath, toWrite, 0644); err != nil {
Logger.Error("failed to write essential log", "err", err)
}
}
}
} else {
Logger.Error("failed to publish", "topic", topic, "msg", avroMessage.String(), "err", err)
Expand Down Expand Up @@ -233,11 +245,12 @@ func (publisher *KafkaMarketDataPublisher) initAvroCodecs() (err error) {
}

func NewKafkaMarketDataPublisher(
logger log.Logger) (publisher *KafkaMarketDataPublisher) {
logger log.Logger, dbDir string) (publisher *KafkaMarketDataPublisher) {

sarama.Logger = saramaLogger{}
publisher = &KafkaMarketDataPublisher{
producers: make(map[string]sarama.SyncProducer),
producers: make(map[string]sarama.SyncProducer),
essentialLogPath: filepath.Join(dbDir, essentialLogDir),
}

if err := publisher.initAvroCodecs(); err != nil {
Expand All @@ -262,5 +275,10 @@ func NewKafkaMarketDataPublisher(
go pClient.UpdatePrometheusMetrics()
}

if err := common.EnsureDir(publisher.essentialLogPath, 0755); err != nil {
logger.Error("failed to create essential log path", "err", err)
}

logger.Info("created kafka publisher", "elpath", publisher.essentialLogPath)
return publisher
}
6 changes: 3 additions & 3 deletions app/pub/publisher_mock.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ type MockMarketDataPublisher struct {
BooksPublished []*Books
ExecutionResultsPublished []*ExecutionResults
BlockFeePublished []BlockFee
TransferPublished []Transfer
TransferPublished []Transfers

Lock *sync.Mutex // as mock publisher is only used in testing, its no harm to have this granularity Lock
MessagePublished uint32 // atomic integer used to determine the published messages
Expand All @@ -31,7 +31,7 @@ func (publisher *MockMarketDataPublisher) publish(msg AvroOrJsonMsg, tpe msgType
case blockFeeTpe:
publisher.BlockFeePublished = append(publisher.BlockFeePublished, msg.(BlockFee))
case transferType:
publisher.TransferPublished = append(publisher.TransferPublished, msg.(Transfer))
publisher.TransferPublished = append(publisher.TransferPublished, msg.(Transfers))
default:
panic(fmt.Errorf("does not support type %s", tpe.String()))
}
Expand All @@ -54,7 +54,7 @@ func NewMockMarketDataPublisher() (publisher *MockMarketDataPublisher) {
make([]*Books, 0),
make([]*ExecutionResults, 0),
make([]BlockFee, 0),
make([]Transfer, 0),
make([]Transfers, 0),
&sync.Mutex{},
0,
}
Expand Down
10 changes: 5 additions & 5 deletions app/pub/schema_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ func TestMain(m *testing.M) {
}

func TestExecutionResultsMarshaling(t *testing.T) {
publisher := NewKafkaMarketDataPublisher(Logger)
publisher := NewKafkaMarketDataPublisher(Logger, "")
trades := trades{
NumOfMsgs: 1,
Trades: []*Trade{{
Expand Down Expand Up @@ -58,7 +58,7 @@ func TestExecutionResultsMarshaling(t *testing.T) {
}

func TestBooksMarshaling(t *testing.T) {
publisher := NewKafkaMarketDataPublisher(Logger)
publisher := NewKafkaMarketDataPublisher(Logger, "")
book := OrderBookDelta{"NNB_BNB", []PriceLevel{{100, 100}}, []PriceLevel{{100, 100}}}
msg := Books{42, 100, 1, []OrderBookDelta{book}}
_, err := publisher.marshal(&msg, booksTpe)
Expand All @@ -68,7 +68,7 @@ func TestBooksMarshaling(t *testing.T) {
}

func TestAccountsMarshaling(t *testing.T) {
publisher := NewKafkaMarketDataPublisher(Logger)
publisher := NewKafkaMarketDataPublisher(Logger, "")
accs := []Account{{"b-1", "BNB:1000;BTC:10", []*AssetBalance{{Asset: "BNB", Free: 100}}}}
msg := Accounts{42, 2, accs}
_, err := publisher.marshal(&msg, accountsTpe)
Expand All @@ -78,7 +78,7 @@ func TestAccountsMarshaling(t *testing.T) {
}

func TestBlockFeeMarshaling(t *testing.T) {
publisher := NewKafkaMarketDataPublisher(Logger)
publisher := NewKafkaMarketDataPublisher(Logger, "")
msg := BlockFee{1, "BNB:1000;BTC:10", []string{"bnc1", "bnc2", "bnc3"}}
_, err := publisher.marshal(&msg, blockFeeTpe)
if err != nil {
Expand All @@ -87,7 +87,7 @@ func TestBlockFeeMarshaling(t *testing.T) {
}

func TestTransferMarshaling(t *testing.T) {
publisher := NewKafkaMarketDataPublisher(Logger)
publisher := NewKafkaMarketDataPublisher(Logger, "")
msg := Transfers{42, 20, 1000, []Transfer{{From: "", To: []Receiver{Receiver{"bnc1", []Coin{{"BNB", 100}, {"BTC", 100}}}, Receiver{"bnc2", []Coin{{"BNB", 200}, {"BTC", 200}}}}}}}
_, err := publisher.marshal(&msg, transferType)
if err != nil {
Expand Down
13 changes: 11 additions & 2 deletions cmd/pressuremaker/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package main
import (
"fmt"
"net/http"
_ "net/http/pprof"
"os"
"time"

Expand Down Expand Up @@ -32,6 +33,10 @@ type PressureMakerConfig struct {
}

func main() {
go func() {
fmt.Println(http.ListenAndServe("localhost:6060", nil))
}()

Execute()
}

Expand Down Expand Up @@ -77,8 +82,12 @@ var rootCmd = &cobra.Command{
// TODO: find an elegant way to exit
// The problem of shutdown is publication is async (we don't know when messages are
finishSignal := make(chan struct{})
publisher := pub.NewKafkaMarketDataPublisher(context.Logger)

pub.Logger = context.Logger.With("module", "pub")
pub.Cfg = &cfg.PublicationConfig
pub.ToPublishCh = make(chan pub.BlockInfoToPublish, cfg.PublicationConfig.PublicationChannelSize)
publisher := pub.NewKafkaMarketDataPublisher(pub.Logger, "")
go pub.Publish(publisher, nil, pub.Logger, pub.Cfg, pub.ToPublishCh)
pub.IsLive = true
srv := &http.Server{
Addr: cfg.PrometheusAddr,
Handler: promhttp.InstrumentMetricHandler(
Expand Down
2 changes: 1 addition & 1 deletion cmd/pressuremaker/utils/utils.go
Original file line number Diff line number Diff line change
Expand Up @@ -131,7 +131,7 @@ func (mg *MessageGenerator) ExpireMessages(height int, timeNow time.Time) (trade
orderChanges = make(orderPkg.OrderChanges, 0, 100000)
accounts = make(map[string]pub.Account)

for i := 0; i < 100000; i++ {
for i := 0; i < 1000000; i++ {
o := makeOrderInfo(mg.buyerAddrs[0], 1, int64(height), 1000000000, 1000000000, 500000000, timePub)
mg.OrderChangeMap[fmt.Sprintf("%d", i)] = &o
orderChanges = append(orderChanges, orderPkg.OrderChange{fmt.Sprintf("%d", i), orderPkg.Expired})
Expand Down
Loading

0 comments on commit 73ebb22

Please sign in to comment.