Browse Source

fixes from Bucky's review 2

pull/788/head
Anton Kaliaev 7 years ago
parent
commit
fcdd30b2d3
No known key found for this signature in database GPG Key ID: 7B6881D965918214
4 changed files with 13 additions and 17 deletions
  1. +2
    -2
      rpc/client/httpclient.go
  2. +4
    -4
      rpc/core/mempool.go
  3. +1
    -1
      rpc/core/pipe.go
  4. +6
    -10
      types/event_bus.go

+ 2
- 2
rpc/client/httpclient.go View File

@ -311,13 +311,13 @@ func (w *WSEvents) eventListener() {
continue
}
result := new(ctypes.ResultEvent)
err = json.Unmarshal(*resp.Result, result)
err := json.Unmarshal(*resp.Result, result)
if err != nil {
// ignore silently (eg. subscribe, unsubscribe and maybe other events)
// TODO: ?
continue
}
if ch := getSubscription(result.Query); ch != nil {
if ch := w.getSubscription(result.Query); ch != nil {
ch <- result.Data
}
case <-w.quit:


+ 4
- 4
rpc/core/mempool.go View File

@ -158,8 +158,8 @@ func BroadcastTxCommit(tx types.Tx) (*ctypes.ResultBroadcastTxCommit, error) {
err := eventBus.Subscribe(ctx, "mempool", q, deliverTxResCh)
if err != nil {
err = errors.Wrap(err, "failed to subscribe to tx")
logger.Error("Error broadcasting transaction", "err", err)
return nil, fmt.Errorf("Error broadcasting transaction: %v", err)
logger.Error("Error on broadcastTxCommit", "err", err)
return nil, fmt.Errorf("Error on broadcastTxCommit: %v", err)
}
defer eventBus.Unsubscribe(context.Background(), "mempool", q)
@ -169,8 +169,8 @@ func BroadcastTxCommit(tx types.Tx) (*ctypes.ResultBroadcastTxCommit, error) {
checkTxResCh <- res
})
if err != nil {
logger.Error("Error broadcasting transaction", "err", err)
return nil, fmt.Errorf("Error broadcasting transaction: %v", err)
logger.Error("Error on broadcastTxCommit", "err", err)
return nil, fmt.Errorf("Error on broadcastTxCommit: %v", err)
}
checkTxRes := <-checkTxResCh
checkTxR := checkTxRes.GetCheckTx()


+ 1
- 1
rpc/core/pipe.go View File

@ -50,7 +50,7 @@ var (
addrBook *p2p.AddrBook
txIndexer txindex.TxIndexer
consensusReactor *consensus.ConsensusReactor
eventBus *types.EventBus
eventBus *types.EventBus // thread safe
logger log.Logger
)


+ 6
- 10
types/event_bus.go View File

@ -59,11 +59,9 @@ func (b *EventBus) UnsubscribeAll(ctx context.Context, subscriber string) error
}
func (b *EventBus) Publish(eventType string, eventData TMEventData) error {
if b.pubsub != nil {
// no explicit deadline for publishing events
ctx := context.Background()
b.pubsub.PublishWithTags(ctx, eventData, map[string]interface{}{EventTypeKey: eventType})
}
// no explicit deadline for publishing events
ctx := context.Background()
b.pubsub.PublishWithTags(ctx, eventData, map[string]interface{}{EventTypeKey: eventType})
return nil
}
@ -82,11 +80,9 @@ func (b *EventBus) PublishEventVote(vote EventDataVote) error {
}
func (b *EventBus) PublishEventTx(tx EventDataTx) error {
if b.pubsub != nil {
// no explicit deadline for publishing events
ctx := context.Background()
b.pubsub.PublishWithTags(ctx, TMEventData{tx}, map[string]interface{}{EventTypeKey: EventTx, TxHashKey: fmt.Sprintf("%X", tx.Tx.Hash())})
}
// no explicit deadline for publishing events
ctx := context.Background()
b.pubsub.PublishWithTags(ctx, TMEventData{tx}, map[string]interface{}{EventTypeKey: EventTx, TxHashKey: fmt.Sprintf("%X", tx.Tx.Hash())})
return nil
}


Loading…
Cancel
Save