From 61d76a273fb1025fd9480994c9dc91db4217bd11 Mon Sep 17 00:00:00 2001 From: Anton Kaliaev Date: Mon, 30 Oct 2017 11:12:01 -0500 Subject: [PATCH] fixes from Bucky's and Emmanuel's reviews --- CHANGELOG.md | 9 ++++++++- consensus/reactor.go | 21 +++++++++++++++++---- consensus/state.go | 8 +------- consensus/wal.go | 8 ++++++++ rpc/core/events.go | 9 ++++++--- rpc/lib/server/handlers.go | 7 ++++++- rpc/lib/types/types.go | 2 +- types/event_bus.go | 34 +++++++++++++++++----------------- types/event_bus_test.go | 2 +- types/events.go | 32 +++++++++++--------------------- 10 files changed, 76 insertions(+), 56 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 055c7fb66..b90a52704 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -7,7 +7,6 @@ BREAKING CHANGES: - Better support for injecting randomness - Pass evidence/voteInfo through ABCI - Upgrade consensus for more real-time use of evidence -- New events system using tmlibs/pubsub FEATURES: - Peer reputation management @@ -29,6 +28,14 @@ BUG FIXES: - Graceful handling/recovery for apps that have non-determinism or fail to halt - Graceful handling/recovery for violations of safety, or liveness +## 0.12.1 (TBA) + +FEATURES: +- new unsubscribe_all WebSocket RPC endpoint + +IMPROVEMENTS: +- New events system using tmlibs/pubsub + ## 0.12.0 (October 27, 2017) BREAKING CHANGES: diff --git a/consensus/reactor.go b/consensus/reactor.go index 1568e37a6..f18c3c39d 100644 --- a/consensus/reactor.go +++ b/consensus/reactor.go @@ -57,7 +57,7 @@ func (conR *ConsensusReactor) OnStart() error { conR.Logger.Info("ConsensusReactor ", "fastSync", conR.FastSync()) conR.BaseReactor.OnStart() - err := conR.broadcastNewRoundStepsAndVotes() + err := conR.broadcastRoutine() if err != nil { return err } @@ -325,9 +325,10 @@ func (conR *ConsensusReactor) FastSync() bool { //-------------------------------------- -// broadcastNewRoundStepsAndVotes subscribes for new round steps and votes -// using the event bus and broadcasts events to peers upon receiving them. -func (conR *ConsensusReactor) broadcastNewRoundStepsAndVotes() error { +// broadcastRoutine subscribes for new round steps, votes and proposal +// heartbeats using the event bus and broadcasts events to peers upon receiving +// them. +func (conR *ConsensusReactor) broadcastRoutine() error { const subscriber = "consensus-reactor" ctx := context.Background() @@ -345,6 +346,13 @@ func (conR *ConsensusReactor) broadcastNewRoundStepsAndVotes() error { return errors.Wrapf(err, "failed to subscribe %s to %s", subscriber, types.EventQueryVote) } + // proposal heartbeats + heartbeatsCh := make(chan interface{}) + err = conR.eventBus.Subscribe(ctx, subscriber, types.EventQueryProposalHeartbeat, heartbeatsCh) + if err != nil { + return errors.Wrapf(err, "failed to subscribe %s to %s", subscriber, types.EventQueryProposalHeartbeat) + } + go func() { for { select { @@ -358,6 +366,11 @@ func (conR *ConsensusReactor) broadcastNewRoundStepsAndVotes() error { edv := data.(types.TMEventData).Unwrap().(types.EventDataVote) conR.broadcastHasVoteMessage(edv.Vote) } + case data, ok := <-heartbeatsCh: + if ok { + edph := data.(types.TMEventData).Unwrap().(types.EventDataProposalHeartbeat) + conR.broadcastProposalHeartbeatMessage(edph) + } case <-conR.Quit: conR.eventBus.UnsubscribeAll(ctx, subscriber) return diff --git a/consensus/state.go b/consensus/state.go index 15a036930..2f273dc3e 100644 --- a/consensus/state.go +++ b/consensus/state.go @@ -4,7 +4,6 @@ import ( "bytes" "errors" "fmt" - "path/filepath" "reflect" "runtime/debug" "sync" @@ -280,14 +279,9 @@ func (cs *ConsensusState) Wait() { // OpenWAL opens a file to log all consensus messages and timeouts for deterministic accountability func (cs *ConsensusState) OpenWAL(walFile string) (WAL, error) { - err := cmn.EnsureDir(filepath.Dir(walFile), 0700) - if err != nil { - cs.Logger.Error("Error ensuring ConsensusState wal dir", "err", err.Error()) - return nil, err - } - wal, err := NewWAL(walFile, cs.config.WalLight) if err != nil { + cs.Logger.Error("Failed to open WAL for consensus state", "wal", walFile, "err", err) return nil, err } wal.SetLogger(cs.Logger.With("wal", walFile)) diff --git a/consensus/wal.go b/consensus/wal.go index 5ae02e4e7..3f85f7da6 100644 --- a/consensus/wal.go +++ b/consensus/wal.go @@ -6,8 +6,11 @@ import ( "fmt" "hash/crc32" "io" + "path/filepath" "time" + "github.com/pkg/errors" + wire "github.com/tendermint/go-wire" "github.com/tendermint/tendermint/types" auto "github.com/tendermint/tmlibs/autofile" @@ -70,6 +73,11 @@ type baseWAL struct { } func NewWAL(walFile string, light bool) (*baseWAL, error) { + err := cmn.EnsureDir(filepath.Dir(walFile), 0700) + if err != nil { + return nil, errors.Wrap(err, "failed to ensure WAL directory is in place") + } + group, err := auto.OpenGroup(walFile) if err != nil { return nil, err diff --git a/rpc/core/events.go b/rpc/core/events.go index e9d544413..af224a6b5 100644 --- a/rpc/core/events.go +++ b/rpc/core/events.go @@ -41,13 +41,18 @@ import ( // func Subscribe(wsCtx rpctypes.WSRPCContext, query string) (*ctypes.ResultSubscribe, error) { addr := wsCtx.GetRemoteAddr() - logger.Info("Subscribe to query", "remote", addr, "query", query) + q, err := tmquery.New(query) if err != nil { return nil, errors.Wrap(err, "failed to parse a query") } + err = wsCtx.AddSubscription(query, q) + if err != nil { + return nil, errors.Wrap(err, "failed to add subscription") + } + ctx, cancel := context.WithTimeout(context.Background(), 10*time.Millisecond) defer cancel() ch := make(chan interface{}) @@ -56,8 +61,6 @@ func Subscribe(wsCtx rpctypes.WSRPCContext, query string) (*ctypes.ResultSubscri return nil, errors.Wrap(err, "failed to subscribe") } - wsCtx.AddSubscription(query, q) - go func() { for event := range ch { tmResult := &ctypes.ResultEvent{query, event.(tmtypes.TMEventData)} diff --git a/rpc/lib/server/handlers.go b/rpc/lib/server/handlers.go index 283be1823..ddb7f9624 100644 --- a/rpc/lib/server/handlers.go +++ b/rpc/lib/server/handlers.go @@ -478,8 +478,13 @@ func (wsc *wsConnection) TryWriteRPCResponse(resp types.RPCResponse) bool { } } -func (wsc *wsConnection) AddSubscription(query string, data interface{}) { +func (wsc *wsConnection) AddSubscription(query string, data interface{}) error { + if _, ok := wsc.subscriptions[query]; ok { + return errors.New("Already subscribed") + } + wsc.subscriptions[query] = data + return nil } func (wsc *wsConnection) DeleteSubscription(query string) (interface{}, bool) { diff --git a/rpc/lib/types/types.go b/rpc/lib/types/types.go index 5a3c9171c..5bf95cc6d 100644 --- a/rpc/lib/types/types.go +++ b/rpc/lib/types/types.go @@ -137,7 +137,7 @@ type WSRPCConnection interface { WriteRPCResponse(resp RPCResponse) TryWriteRPCResponse(resp RPCResponse) bool - AddSubscription(string, interface{}) + AddSubscription(string, interface{}) error DeleteSubscription(string) (interface{}, bool) DeleteAllSubscriptions() } diff --git a/types/event_bus.go b/types/event_bus.go index 762f1af61..3b6b37a05 100644 --- a/types/event_bus.go +++ b/types/event_bus.go @@ -58,7 +58,7 @@ func (b *EventBus) UnsubscribeAll(ctx context.Context, subscriber string) error return b.pubsub.UnsubscribeAll(ctx, subscriber) } -func (b *EventBus) publish(eventType string, eventData TMEventData) error { +func (b *EventBus) Publish(eventType string, eventData TMEventData) error { if b.pubsub != nil { // no explicit deadline for publishing events ctx := context.Background() @@ -70,15 +70,15 @@ func (b *EventBus) publish(eventType string, eventData TMEventData) error { //--- block, tx, and vote events func (b *EventBus) PublishEventNewBlock(block EventDataNewBlock) error { - return b.publish(EventNewBlock, TMEventData{block}) + return b.Publish(EventNewBlock, TMEventData{block}) } func (b *EventBus) PublishEventNewBlockHeader(header EventDataNewBlockHeader) error { - return b.publish(EventNewBlockHeader, TMEventData{header}) + return b.Publish(EventNewBlockHeader, TMEventData{header}) } func (b *EventBus) PublishEventVote(vote EventDataVote) error { - return b.publish(EventVote, TMEventData{vote}) + return b.Publish(EventVote, TMEventData{vote}) } func (b *EventBus) PublishEventTx(tx EventDataTx) error { @@ -90,44 +90,44 @@ func (b *EventBus) PublishEventTx(tx EventDataTx) error { return nil } +func (b *EventBus) PublishEventProposalHeartbeat(ph EventDataProposalHeartbeat) error { + return b.Publish(EventProposalHeartbeat, TMEventData{ph}) +} + //--- EventDataRoundState events func (b *EventBus) PublishEventNewRoundStep(rs EventDataRoundState) error { - return b.publish(EventNewRoundStep, TMEventData{rs}) + return b.Publish(EventNewRoundStep, TMEventData{rs}) } func (b *EventBus) PublishEventTimeoutPropose(rs EventDataRoundState) error { - return b.publish(EventTimeoutPropose, TMEventData{rs}) + return b.Publish(EventTimeoutPropose, TMEventData{rs}) } func (b *EventBus) PublishEventTimeoutWait(rs EventDataRoundState) error { - return b.publish(EventTimeoutWait, TMEventData{rs}) + return b.Publish(EventTimeoutWait, TMEventData{rs}) } func (b *EventBus) PublishEventNewRound(rs EventDataRoundState) error { - return b.publish(EventNewRound, TMEventData{rs}) + return b.Publish(EventNewRound, TMEventData{rs}) } func (b *EventBus) PublishEventCompleteProposal(rs EventDataRoundState) error { - return b.publish(EventCompleteProposal, TMEventData{rs}) + return b.Publish(EventCompleteProposal, TMEventData{rs}) } func (b *EventBus) PublishEventPolka(rs EventDataRoundState) error { - return b.publish(EventPolka, TMEventData{rs}) + return b.Publish(EventPolka, TMEventData{rs}) } func (b *EventBus) PublishEventUnlock(rs EventDataRoundState) error { - return b.publish(EventUnlock, TMEventData{rs}) + return b.Publish(EventUnlock, TMEventData{rs}) } func (b *EventBus) PublishEventRelock(rs EventDataRoundState) error { - return b.publish(EventRelock, TMEventData{rs}) + return b.Publish(EventRelock, TMEventData{rs}) } func (b *EventBus) PublishEventLock(rs EventDataRoundState) error { - return b.publish(EventLock, TMEventData{rs}) -} - -func (b *EventBus) PublishEventProposalHeartbeat(ph EventDataProposalHeartbeat) error { - return b.publish(EventProposalHeartbeat, TMEventData{ph}) + return b.Publish(EventLock, TMEventData{rs}) } diff --git a/types/event_bus_test.go b/types/event_bus_test.go index 4c10fc219..aa97092f6 100644 --- a/types/event_bus_test.go +++ b/types/event_bus_test.go @@ -73,7 +73,7 @@ func benchmarkEventBus(numClients int, randQueries bool, randEvents bool, b *tes eventType = randEvent() } - eventBus.publish(eventType, TMEventData{"Gamora"}) + eventBus.Publish(eventType, TMEventData{"Gamora"}) } } diff --git a/types/events.go b/types/events.go index 57851af4a..ef9b5f98b 100644 --- a/types/events.go +++ b/types/events.go @@ -37,12 +37,11 @@ const ( /////////////////////////////////////////////////////////////////////////////// var ( - EventDataNameNewBlock = "new_block" - EventDataNameNewBlockHeader = "new_block_header" - EventDataNameTx = "tx" - EventDataNameRoundState = "round_state" - EventDataNameVote = "vote" - + EventDataNameNewBlock = "new_block" + EventDataNameNewBlockHeader = "new_block_header" + EventDataNameTx = "tx" + EventDataNameRoundState = "round_state" + EventDataNameVote = "vote" EventDataNameProposalHeartbeat = "proposer_heartbeat" ) @@ -80,14 +79,12 @@ func (tmr TMEventData) Empty() bool { } const ( - EventDataTypeNewBlock = byte(0x01) - EventDataTypeFork = byte(0x02) - EventDataTypeTx = byte(0x03) - EventDataTypeNewBlockHeader = byte(0x04) - - EventDataTypeRoundState = byte(0x11) - EventDataTypeVote = byte(0x12) - + EventDataTypeNewBlock = byte(0x01) + EventDataTypeFork = byte(0x02) + EventDataTypeTx = byte(0x03) + EventDataTypeNewBlockHeader = byte(0x04) + EventDataTypeRoundState = byte(0x11) + EventDataTypeVote = byte(0x12) EventDataTypeProposalHeartbeat = byte(0x20) ) @@ -139,13 +136,6 @@ type EventDataVote struct { Vote *Vote } -func (_ EventDataNewBlock) AssertIsTMEventData() {} -func (_ EventDataNewBlockHeader) AssertIsTMEventData() {} -func (_ EventDataTx) AssertIsTMEventData() {} -func (_ EventDataRoundState) AssertIsTMEventData() {} -func (_ EventDataVote) AssertIsTMEventData() {} -func (_ EventDataProposalHeartbeat) AssertIsTMEventData() {} - /////////////////////////////////////////////////////////////////////////////// // PUBSUB ///////////////////////////////////////////////////////////////////////////////