Browse Source

fixes from Bucky's and Emmanuel's reviews

pull/788/head
Anton Kaliaev 7 years ago
parent
commit
61d76a273f
No known key found for this signature in database GPG Key ID: 7B6881D965918214
10 changed files with 76 additions and 56 deletions
  1. +8
    -1
      CHANGELOG.md
  2. +17
    -4
      consensus/reactor.go
  3. +1
    -7
      consensus/state.go
  4. +8
    -0
      consensus/wal.go
  5. +6
    -3
      rpc/core/events.go
  6. +6
    -1
      rpc/lib/server/handlers.go
  7. +1
    -1
      rpc/lib/types/types.go
  8. +17
    -17
      types/event_bus.go
  9. +1
    -1
      types/event_bus_test.go
  10. +11
    -21
      types/events.go

+ 8
- 1
CHANGELOG.md View File

@ -7,7 +7,6 @@ BREAKING CHANGES:
- Better support for injecting randomness - Better support for injecting randomness
- Pass evidence/voteInfo through ABCI - Pass evidence/voteInfo through ABCI
- Upgrade consensus for more real-time use of evidence - Upgrade consensus for more real-time use of evidence
- New events system using tmlibs/pubsub
FEATURES: FEATURES:
- Peer reputation management - Peer reputation management
@ -29,6 +28,14 @@ BUG FIXES:
- Graceful handling/recovery for apps that have non-determinism or fail to halt - Graceful handling/recovery for apps that have non-determinism or fail to halt
- Graceful handling/recovery for violations of safety, or liveness - Graceful handling/recovery for violations of safety, or liveness
## 0.12.1 (TBA)
FEATURES:
- new unsubscribe_all WebSocket RPC endpoint
IMPROVEMENTS:
- New events system using tmlibs/pubsub
## 0.12.0 (October 27, 2017) ## 0.12.0 (October 27, 2017)
BREAKING CHANGES: BREAKING CHANGES:


+ 17
- 4
consensus/reactor.go View File

@ -57,7 +57,7 @@ func (conR *ConsensusReactor) OnStart() error {
conR.Logger.Info("ConsensusReactor ", "fastSync", conR.FastSync()) conR.Logger.Info("ConsensusReactor ", "fastSync", conR.FastSync())
conR.BaseReactor.OnStart() conR.BaseReactor.OnStart()
err := conR.broadcastNewRoundStepsAndVotes()
err := conR.broadcastRoutine()
if err != nil { if err != nil {
return err return err
} }
@ -325,9 +325,10 @@ func (conR *ConsensusReactor) FastSync() bool {
//-------------------------------------- //--------------------------------------
// broadcastNewRoundStepsAndVotes subscribes for new round steps and votes
// using the event bus and broadcasts events to peers upon receiving them.
func (conR *ConsensusReactor) broadcastNewRoundStepsAndVotes() error {
// broadcastRoutine subscribes for new round steps, votes and proposal
// heartbeats using the event bus and broadcasts events to peers upon receiving
// them.
func (conR *ConsensusReactor) broadcastRoutine() error {
const subscriber = "consensus-reactor" const subscriber = "consensus-reactor"
ctx := context.Background() ctx := context.Background()
@ -345,6 +346,13 @@ func (conR *ConsensusReactor) broadcastNewRoundStepsAndVotes() error {
return errors.Wrapf(err, "failed to subscribe %s to %s", subscriber, types.EventQueryVote) return errors.Wrapf(err, "failed to subscribe %s to %s", subscriber, types.EventQueryVote)
} }
// proposal heartbeats
heartbeatsCh := make(chan interface{})
err = conR.eventBus.Subscribe(ctx, subscriber, types.EventQueryProposalHeartbeat, heartbeatsCh)
if err != nil {
return errors.Wrapf(err, "failed to subscribe %s to %s", subscriber, types.EventQueryProposalHeartbeat)
}
go func() { go func() {
for { for {
select { select {
@ -358,6 +366,11 @@ func (conR *ConsensusReactor) broadcastNewRoundStepsAndVotes() error {
edv := data.(types.TMEventData).Unwrap().(types.EventDataVote) edv := data.(types.TMEventData).Unwrap().(types.EventDataVote)
conR.broadcastHasVoteMessage(edv.Vote) conR.broadcastHasVoteMessage(edv.Vote)
} }
case data, ok := <-heartbeatsCh:
if ok {
edph := data.(types.TMEventData).Unwrap().(types.EventDataProposalHeartbeat)
conR.broadcastProposalHeartbeatMessage(edph)
}
case <-conR.Quit: case <-conR.Quit:
conR.eventBus.UnsubscribeAll(ctx, subscriber) conR.eventBus.UnsubscribeAll(ctx, subscriber)
return return


+ 1
- 7
consensus/state.go View File

@ -4,7 +4,6 @@ import (
"bytes" "bytes"
"errors" "errors"
"fmt" "fmt"
"path/filepath"
"reflect" "reflect"
"runtime/debug" "runtime/debug"
"sync" "sync"
@ -280,14 +279,9 @@ func (cs *ConsensusState) Wait() {
// OpenWAL opens a file to log all consensus messages and timeouts for deterministic accountability // OpenWAL opens a file to log all consensus messages and timeouts for deterministic accountability
func (cs *ConsensusState) OpenWAL(walFile string) (WAL, error) { func (cs *ConsensusState) OpenWAL(walFile string) (WAL, error) {
err := cmn.EnsureDir(filepath.Dir(walFile), 0700)
if err != nil {
cs.Logger.Error("Error ensuring ConsensusState wal dir", "err", err.Error())
return nil, err
}
wal, err := NewWAL(walFile, cs.config.WalLight) wal, err := NewWAL(walFile, cs.config.WalLight)
if err != nil { if err != nil {
cs.Logger.Error("Failed to open WAL for consensus state", "wal", walFile, "err", err)
return nil, err return nil, err
} }
wal.SetLogger(cs.Logger.With("wal", walFile)) wal.SetLogger(cs.Logger.With("wal", walFile))


+ 8
- 0
consensus/wal.go View File

@ -6,8 +6,11 @@ import (
"fmt" "fmt"
"hash/crc32" "hash/crc32"
"io" "io"
"path/filepath"
"time" "time"
"github.com/pkg/errors"
wire "github.com/tendermint/go-wire" wire "github.com/tendermint/go-wire"
"github.com/tendermint/tendermint/types" "github.com/tendermint/tendermint/types"
auto "github.com/tendermint/tmlibs/autofile" auto "github.com/tendermint/tmlibs/autofile"
@ -70,6 +73,11 @@ type baseWAL struct {
} }
func NewWAL(walFile string, light bool) (*baseWAL, error) { func NewWAL(walFile string, light bool) (*baseWAL, error) {
err := cmn.EnsureDir(filepath.Dir(walFile), 0700)
if err != nil {
return nil, errors.Wrap(err, "failed to ensure WAL directory is in place")
}
group, err := auto.OpenGroup(walFile) group, err := auto.OpenGroup(walFile)
if err != nil { if err != nil {
return nil, err return nil, err


+ 6
- 3
rpc/core/events.go View File

@ -41,13 +41,18 @@ import (
// <aside class="notice">WebSocket only</aside> // <aside class="notice">WebSocket only</aside>
func Subscribe(wsCtx rpctypes.WSRPCContext, query string) (*ctypes.ResultSubscribe, error) { func Subscribe(wsCtx rpctypes.WSRPCContext, query string) (*ctypes.ResultSubscribe, error) {
addr := wsCtx.GetRemoteAddr() addr := wsCtx.GetRemoteAddr()
logger.Info("Subscribe to query", "remote", addr, "query", query) logger.Info("Subscribe to query", "remote", addr, "query", query)
q, err := tmquery.New(query) q, err := tmquery.New(query)
if err != nil { if err != nil {
return nil, errors.Wrap(err, "failed to parse a query") return nil, errors.Wrap(err, "failed to parse a query")
} }
err = wsCtx.AddSubscription(query, q)
if err != nil {
return nil, errors.Wrap(err, "failed to add subscription")
}
ctx, cancel := context.WithTimeout(context.Background(), 10*time.Millisecond) ctx, cancel := context.WithTimeout(context.Background(), 10*time.Millisecond)
defer cancel() defer cancel()
ch := make(chan interface{}) ch := make(chan interface{})
@ -56,8 +61,6 @@ func Subscribe(wsCtx rpctypes.WSRPCContext, query string) (*ctypes.ResultSubscri
return nil, errors.Wrap(err, "failed to subscribe") return nil, errors.Wrap(err, "failed to subscribe")
} }
wsCtx.AddSubscription(query, q)
go func() { go func() {
for event := range ch { for event := range ch {
tmResult := &ctypes.ResultEvent{query, event.(tmtypes.TMEventData)} tmResult := &ctypes.ResultEvent{query, event.(tmtypes.TMEventData)}


+ 6
- 1
rpc/lib/server/handlers.go View File

@ -478,8 +478,13 @@ func (wsc *wsConnection) TryWriteRPCResponse(resp types.RPCResponse) bool {
} }
} }
func (wsc *wsConnection) AddSubscription(query string, data interface{}) {
func (wsc *wsConnection) AddSubscription(query string, data interface{}) error {
if _, ok := wsc.subscriptions[query]; ok {
return errors.New("Already subscribed")
}
wsc.subscriptions[query] = data wsc.subscriptions[query] = data
return nil
} }
func (wsc *wsConnection) DeleteSubscription(query string) (interface{}, bool) { func (wsc *wsConnection) DeleteSubscription(query string) (interface{}, bool) {


+ 1
- 1
rpc/lib/types/types.go View File

@ -137,7 +137,7 @@ type WSRPCConnection interface {
WriteRPCResponse(resp RPCResponse) WriteRPCResponse(resp RPCResponse)
TryWriteRPCResponse(resp RPCResponse) bool TryWriteRPCResponse(resp RPCResponse) bool
AddSubscription(string, interface{})
AddSubscription(string, interface{}) error
DeleteSubscription(string) (interface{}, bool) DeleteSubscription(string) (interface{}, bool)
DeleteAllSubscriptions() DeleteAllSubscriptions()
} }


+ 17
- 17
types/event_bus.go View File

@ -58,7 +58,7 @@ func (b *EventBus) UnsubscribeAll(ctx context.Context, subscriber string) error
return b.pubsub.UnsubscribeAll(ctx, subscriber) return b.pubsub.UnsubscribeAll(ctx, subscriber)
} }
func (b *EventBus) publish(eventType string, eventData TMEventData) error {
func (b *EventBus) Publish(eventType string, eventData TMEventData) error {
if b.pubsub != nil { if b.pubsub != nil {
// no explicit deadline for publishing events // no explicit deadline for publishing events
ctx := context.Background() ctx := context.Background()
@ -70,15 +70,15 @@ func (b *EventBus) publish(eventType string, eventData TMEventData) error {
//--- block, tx, and vote events //--- block, tx, and vote events
func (b *EventBus) PublishEventNewBlock(block EventDataNewBlock) error { func (b *EventBus) PublishEventNewBlock(block EventDataNewBlock) error {
return b.publish(EventNewBlock, TMEventData{block})
return b.Publish(EventNewBlock, TMEventData{block})
} }
func (b *EventBus) PublishEventNewBlockHeader(header EventDataNewBlockHeader) error { func (b *EventBus) PublishEventNewBlockHeader(header EventDataNewBlockHeader) error {
return b.publish(EventNewBlockHeader, TMEventData{header})
return b.Publish(EventNewBlockHeader, TMEventData{header})
} }
func (b *EventBus) PublishEventVote(vote EventDataVote) error { func (b *EventBus) PublishEventVote(vote EventDataVote) error {
return b.publish(EventVote, TMEventData{vote})
return b.Publish(EventVote, TMEventData{vote})
} }
func (b *EventBus) PublishEventTx(tx EventDataTx) error { func (b *EventBus) PublishEventTx(tx EventDataTx) error {
@ -90,44 +90,44 @@ func (b *EventBus) PublishEventTx(tx EventDataTx) error {
return nil return nil
} }
func (b *EventBus) PublishEventProposalHeartbeat(ph EventDataProposalHeartbeat) error {
return b.Publish(EventProposalHeartbeat, TMEventData{ph})
}
//--- EventDataRoundState events //--- EventDataRoundState events
func (b *EventBus) PublishEventNewRoundStep(rs EventDataRoundState) error { func (b *EventBus) PublishEventNewRoundStep(rs EventDataRoundState) error {
return b.publish(EventNewRoundStep, TMEventData{rs})
return b.Publish(EventNewRoundStep, TMEventData{rs})
} }
func (b *EventBus) PublishEventTimeoutPropose(rs EventDataRoundState) error { func (b *EventBus) PublishEventTimeoutPropose(rs EventDataRoundState) error {
return b.publish(EventTimeoutPropose, TMEventData{rs})
return b.Publish(EventTimeoutPropose, TMEventData{rs})
} }
func (b *EventBus) PublishEventTimeoutWait(rs EventDataRoundState) error { func (b *EventBus) PublishEventTimeoutWait(rs EventDataRoundState) error {
return b.publish(EventTimeoutWait, TMEventData{rs})
return b.Publish(EventTimeoutWait, TMEventData{rs})
} }
func (b *EventBus) PublishEventNewRound(rs EventDataRoundState) error { func (b *EventBus) PublishEventNewRound(rs EventDataRoundState) error {
return b.publish(EventNewRound, TMEventData{rs})
return b.Publish(EventNewRound, TMEventData{rs})
} }
func (b *EventBus) PublishEventCompleteProposal(rs EventDataRoundState) error { func (b *EventBus) PublishEventCompleteProposal(rs EventDataRoundState) error {
return b.publish(EventCompleteProposal, TMEventData{rs})
return b.Publish(EventCompleteProposal, TMEventData{rs})
} }
func (b *EventBus) PublishEventPolka(rs EventDataRoundState) error { func (b *EventBus) PublishEventPolka(rs EventDataRoundState) error {
return b.publish(EventPolka, TMEventData{rs})
return b.Publish(EventPolka, TMEventData{rs})
} }
func (b *EventBus) PublishEventUnlock(rs EventDataRoundState) error { func (b *EventBus) PublishEventUnlock(rs EventDataRoundState) error {
return b.publish(EventUnlock, TMEventData{rs})
return b.Publish(EventUnlock, TMEventData{rs})
} }
func (b *EventBus) PublishEventRelock(rs EventDataRoundState) error { func (b *EventBus) PublishEventRelock(rs EventDataRoundState) error {
return b.publish(EventRelock, TMEventData{rs})
return b.Publish(EventRelock, TMEventData{rs})
} }
func (b *EventBus) PublishEventLock(rs EventDataRoundState) error { func (b *EventBus) PublishEventLock(rs EventDataRoundState) error {
return b.publish(EventLock, TMEventData{rs})
}
func (b *EventBus) PublishEventProposalHeartbeat(ph EventDataProposalHeartbeat) error {
return b.publish(EventProposalHeartbeat, TMEventData{ph})
return b.Publish(EventLock, TMEventData{rs})
} }

+ 1
- 1
types/event_bus_test.go View File

@ -73,7 +73,7 @@ func benchmarkEventBus(numClients int, randQueries bool, randEvents bool, b *tes
eventType = randEvent() eventType = randEvent()
} }
eventBus.publish(eventType, TMEventData{"Gamora"})
eventBus.Publish(eventType, TMEventData{"Gamora"})
} }
} }


+ 11
- 21
types/events.go View File

@ -37,12 +37,11 @@ const (
/////////////////////////////////////////////////////////////////////////////// ///////////////////////////////////////////////////////////////////////////////
var ( var (
EventDataNameNewBlock = "new_block"
EventDataNameNewBlockHeader = "new_block_header"
EventDataNameTx = "tx"
EventDataNameRoundState = "round_state"
EventDataNameVote = "vote"
EventDataNameNewBlock = "new_block"
EventDataNameNewBlockHeader = "new_block_header"
EventDataNameTx = "tx"
EventDataNameRoundState = "round_state"
EventDataNameVote = "vote"
EventDataNameProposalHeartbeat = "proposer_heartbeat" EventDataNameProposalHeartbeat = "proposer_heartbeat"
) )
@ -80,14 +79,12 @@ func (tmr TMEventData) Empty() bool {
} }
const ( const (
EventDataTypeNewBlock = byte(0x01)
EventDataTypeFork = byte(0x02)
EventDataTypeTx = byte(0x03)
EventDataTypeNewBlockHeader = byte(0x04)
EventDataTypeRoundState = byte(0x11)
EventDataTypeVote = byte(0x12)
EventDataTypeNewBlock = byte(0x01)
EventDataTypeFork = byte(0x02)
EventDataTypeTx = byte(0x03)
EventDataTypeNewBlockHeader = byte(0x04)
EventDataTypeRoundState = byte(0x11)
EventDataTypeVote = byte(0x12)
EventDataTypeProposalHeartbeat = byte(0x20) EventDataTypeProposalHeartbeat = byte(0x20)
) )
@ -139,13 +136,6 @@ type EventDataVote struct {
Vote *Vote Vote *Vote
} }
func (_ EventDataNewBlock) AssertIsTMEventData() {}
func (_ EventDataNewBlockHeader) AssertIsTMEventData() {}
func (_ EventDataTx) AssertIsTMEventData() {}
func (_ EventDataRoundState) AssertIsTMEventData() {}
func (_ EventDataVote) AssertIsTMEventData() {}
func (_ EventDataProposalHeartbeat) AssertIsTMEventData() {}
/////////////////////////////////////////////////////////////////////////////// ///////////////////////////////////////////////////////////////////////////////
// PUBSUB // PUBSUB
/////////////////////////////////////////////////////////////////////////////// ///////////////////////////////////////////////////////////////////////////////


Loading…
Cancel
Save