From fcdd30b2d345bf1250fad46b615219879d37df7c Mon Sep 17 00:00:00 2001 From: Anton Kaliaev Date: Thu, 2 Nov 2017 13:12:40 -0500 Subject: [PATCH] fixes from Bucky's review 2 --- rpc/client/httpclient.go | 4 ++-- rpc/core/mempool.go | 8 ++++---- rpc/core/pipe.go | 2 +- types/event_bus.go | 16 ++++++---------- 4 files changed, 13 insertions(+), 17 deletions(-) diff --git a/rpc/client/httpclient.go b/rpc/client/httpclient.go index 962ecfd7c..66cf89165 100644 --- a/rpc/client/httpclient.go +++ b/rpc/client/httpclient.go @@ -311,13 +311,13 @@ func (w *WSEvents) eventListener() { continue } result := new(ctypes.ResultEvent) - err = json.Unmarshal(*resp.Result, result) + err := json.Unmarshal(*resp.Result, result) if err != nil { // ignore silently (eg. subscribe, unsubscribe and maybe other events) // TODO: ? continue } - if ch := getSubscription(result.Query); ch != nil { + if ch := w.getSubscription(result.Query); ch != nil { ch <- result.Data } case <-w.quit: diff --git a/rpc/core/mempool.go b/rpc/core/mempool.go index 649f701b4..46204ebfa 100644 --- a/rpc/core/mempool.go +++ b/rpc/core/mempool.go @@ -158,8 +158,8 @@ func BroadcastTxCommit(tx types.Tx) (*ctypes.ResultBroadcastTxCommit, error) { err := eventBus.Subscribe(ctx, "mempool", q, deliverTxResCh) if err != nil { err = errors.Wrap(err, "failed to subscribe to tx") - logger.Error("Error broadcasting transaction", "err", err) - return nil, fmt.Errorf("Error broadcasting transaction: %v", err) + logger.Error("Error on broadcastTxCommit", "err", err) + return nil, fmt.Errorf("Error on broadcastTxCommit: %v", err) } defer eventBus.Unsubscribe(context.Background(), "mempool", q) @@ -169,8 +169,8 @@ func BroadcastTxCommit(tx types.Tx) (*ctypes.ResultBroadcastTxCommit, error) { checkTxResCh <- res }) if err != nil { - logger.Error("Error broadcasting transaction", "err", err) - return nil, fmt.Errorf("Error broadcasting transaction: %v", err) + logger.Error("Error on broadcastTxCommit", "err", err) + return nil, fmt.Errorf("Error on broadcastTxCommit: %v", err) } checkTxRes := <-checkTxResCh checkTxR := checkTxRes.GetCheckTx() diff --git a/rpc/core/pipe.go b/rpc/core/pipe.go index bee59e1c0..cbe6cc426 100644 --- a/rpc/core/pipe.go +++ b/rpc/core/pipe.go @@ -50,7 +50,7 @@ var ( addrBook *p2p.AddrBook txIndexer txindex.TxIndexer consensusReactor *consensus.ConsensusReactor - eventBus *types.EventBus + eventBus *types.EventBus // thread safe logger log.Logger ) diff --git a/types/event_bus.go b/types/event_bus.go index 3b6b37a05..85ef14485 100644 --- a/types/event_bus.go +++ b/types/event_bus.go @@ -59,11 +59,9 @@ func (b *EventBus) UnsubscribeAll(ctx context.Context, subscriber string) error } func (b *EventBus) Publish(eventType string, eventData TMEventData) error { - if b.pubsub != nil { - // no explicit deadline for publishing events - ctx := context.Background() - b.pubsub.PublishWithTags(ctx, eventData, map[string]interface{}{EventTypeKey: eventType}) - } + // no explicit deadline for publishing events + ctx := context.Background() + b.pubsub.PublishWithTags(ctx, eventData, map[string]interface{}{EventTypeKey: eventType}) return nil } @@ -82,11 +80,9 @@ func (b *EventBus) PublishEventVote(vote EventDataVote) error { } func (b *EventBus) PublishEventTx(tx EventDataTx) error { - if b.pubsub != nil { - // no explicit deadline for publishing events - ctx := context.Background() - b.pubsub.PublishWithTags(ctx, TMEventData{tx}, map[string]interface{}{EventTypeKey: EventTx, TxHashKey: fmt.Sprintf("%X", tx.Tx.Hash())}) - } + // no explicit deadline for publishing events + ctx := context.Background() + b.pubsub.PublishWithTags(ctx, TMEventData{tx}, map[string]interface{}{EventTypeKey: EventTx, TxHashKey: fmt.Sprintf("%X", tx.Tx.Hash())}) return nil }