Skip to content

Commit

Permalink
#476 enhancement for query service efficient processing (#563)
Browse files Browse the repository at this point in the history
  • Loading branch information
ackratos authored and forcodedancing committed May 19, 2022
1 parent 660071b commit 902f90e
Show file tree
Hide file tree
Showing 2 changed files with 21 additions and 31 deletions.
2 changes: 2 additions & 0 deletions app/app.go
Original file line number Diff line number Diff line change
Expand Up @@ -687,6 +687,8 @@ func (app *BinanceChain) publish(tradesToPublish []*pub.Trade, proposalsToPublis
len(app.DexKeeper.OrderChanges),
"numOfProposals",
proposalsToPublish.NumOfMsgs,
"numOfStakeUpdates",
stakeUpdates.NumOfMsgs,
"numOfAccounts",
len(accountsToPublish))
pub.ToRemoveOrderIdCh = make(chan string, pub.ToRemoveOrderIdChannelSize)
Expand Down
50 changes: 19 additions & 31 deletions app/pub/msgs.go
Original file line number Diff line number Diff line change
Expand Up @@ -105,19 +105,25 @@ func (msg *ExecutionResults) ToNativeMap() map[string]interface{} {
func (msg *ExecutionResults) EssentialMsg() string {
// mainly used to recover for large breathe block expiring message, there should be no trade on breathe block
orders := msg.Orders.EssentialMsg()
proposals := msg.Proposals.EssentialMsg()
return fmt.Sprintf("height:%d\norders:%s\nproposals:%s\n", msg.Height, orders, proposals)
return fmt.Sprintf("height:%d\ntime:%d\norders:\n%s\n", msg.Height, msg.Timestamp, orders)
}

func (msg *ExecutionResults) EmptyCopy() AvroOrJsonMsg {
var nonExpiredOrders []*Order
for _, order := range msg.Orders.Orders {
if order.Status != orderPkg.Expired {
nonExpiredOrders = append(nonExpiredOrders, order)
}
}

return &ExecutionResults{
msg.Height,
msg.Timestamp,
msg.NumOfMsgs,
trades{},
Orders{},
Proposals{},
StakeUpdates{},
msg.Proposals.NumOfMsgs + msg.StakeUpdates.NumOfMsgs + len(nonExpiredOrders),
trades{}, // no trades on breathe block
Orders{len(nonExpiredOrders), nonExpiredOrders},
msg.Proposals,
msg.StakeUpdates,
}
}

Expand Down Expand Up @@ -208,18 +214,15 @@ func (msg *Orders) ToNativeMap() map[string]interface{} {
}

func (msg *Orders) EssentialMsg() string {
stat := make(map[orderPkg.ChangeType]*strings.Builder, 0) // ChangeType -> OrderIds splited by EOL
expiredOrders := &strings.Builder{}
for _, order := range msg.Orders {
if _, ok := stat[order.Status]; !ok {
stat[order.Status] = &strings.Builder{}
// we only log expired orders in essential file
// and publish other types of message via kafka
if order.Status == orderPkg.Expired {
fmt.Fprintf(expiredOrders, "%s %s %s\n", order.OrderId, order.Owner, order.Fee)
}
fmt.Fprintf(stat[order.Status], "\n%s", order.OrderId)
}
var result strings.Builder
for changeType, str := range stat {
fmt.Fprintf(&result, "%d:%s\n", changeType, str.String())
}
return result.String()
return expiredOrders.String()
}

type Order struct {
Expand Down Expand Up @@ -314,21 +317,6 @@ func (msg *Proposals) ToNativeMap() map[string]interface{} {
return native
}

func (msg *Proposals) EssentialMsg() string {
stat := make(map[ProposalStatus]*strings.Builder, 0) // ProposalStatus -> OrderIds splited by EOL
for _, proposal := range msg.Proposals {
if _, ok := stat[proposal.Status]; !ok {
stat[proposal.Status] = &strings.Builder{}
}
fmt.Fprintf(stat[proposal.Status], "\n%d", proposal.Id)
}
var result strings.Builder
for proposalStatus, str := range stat {
fmt.Fprintf(&result, "%d:%s\n", proposalStatus, str.String())
}
return result.String()
}

type ProposalStatus uint8

const (
Expand Down

0 comments on commit 902f90e

Please sign in to comment.