Browse Source

Merge pull request #291 from tendermint/type-safe-fire-event

Type safe fire event
pull/298/head
Ethan Buchman 8 years ago
committed by GitHub
parent
commit
7a424e6b12
18 changed files with 210 additions and 76 deletions
  1. +2
    -3
      blockchain/reactor.go
  2. +3
    -3
      consensus/common.go
  3. +1
    -2
      consensus/common_test.go
  4. +4
    -5
      consensus/reactor.go
  5. +2
    -3
      consensus/replay_test.go
  6. +19
    -20
      consensus/state.go
  7. +1
    -2
      consensus/state_test.go
  8. +4
    -4
      glide.lock
  9. +2
    -3
      mempool/reactor.go
  10. +5
    -6
      node/node.go
  11. +2
    -3
      rpc/core/events.go
  12. +6
    -8
      rpc/core/mempool.go
  13. +2
    -3
      rpc/core/pipe.go
  14. +34
    -0
      rpc/test/client_test.go
  15. +5
    -0
      scripts/glide/status.sh
  16. +1
    -1
      scripts/glide/update.sh
  17. +14
    -5
      state/execution.go
  18. +103
    -5
      types/events.go

+ 2
- 3
blockchain/reactor.go View File

@ -8,7 +8,6 @@ import (
"time" "time"
. "github.com/tendermint/go-common" . "github.com/tendermint/go-common"
"github.com/tendermint/go-events"
"github.com/tendermint/go-p2p" "github.com/tendermint/go-p2p"
"github.com/tendermint/go-wire" "github.com/tendermint/go-wire"
"github.com/tendermint/tendermint/proxy" "github.com/tendermint/tendermint/proxy"
@ -52,7 +51,7 @@ type BlockchainReactor struct {
timeoutsCh chan string timeoutsCh chan string
lastBlock *types.Block lastBlock *types.Block
evsw *events.EventSwitch
evsw types.EventSwitch
} }
func NewBlockchainReactor(state *sm.State, proxyAppConn proxy.AppConnConsensus, store *BlockStore, fastSync bool) *BlockchainReactor { func NewBlockchainReactor(state *sm.State, proxyAppConn proxy.AppConnConsensus, store *BlockStore, fastSync bool) *BlockchainReactor {
@ -268,7 +267,7 @@ func (bcR *BlockchainReactor) BroadcastStatusRequest() error {
} }
// implements events.Eventable // implements events.Eventable
func (bcR *BlockchainReactor) SetEventSwitch(evsw *events.EventSwitch) {
func (bcR *BlockchainReactor) SetEventSwitch(evsw types.EventSwitch) {
bcR.evsw = evsw bcR.evsw = evsw
} }


+ 3
- 3
consensus/common.go View File

@ -1,14 +1,14 @@
package consensus package consensus
import ( import (
"github.com/tendermint/go-events"
"github.com/tendermint/tendermint/types"
) )
// NOTE: this is blocking // NOTE: this is blocking
func subscribeToEvent(evsw *events.EventSwitch, receiver, eventID string, chanCap int) chan interface{} {
func subscribeToEvent(evsw types.EventSwitch, receiver, eventID string, chanCap int) chan interface{} {
// listen for new round // listen for new round
ch := make(chan interface{}, chanCap) ch := make(chan interface{}, chanCap)
evsw.AddListenerForEvent(receiver, eventID, func(data events.EventData) {
types.AddListenerForEvent(evsw, receiver, eventID, func(data types.TMEventData) {
ch <- data ch <- data
}) })
return ch return ch


+ 1
- 2
consensus/common_test.go View File

@ -10,7 +10,6 @@ import (
cfg "github.com/tendermint/go-config" cfg "github.com/tendermint/go-config"
dbm "github.com/tendermint/go-db" dbm "github.com/tendermint/go-db"
"github.com/tendermint/go-events"
bc "github.com/tendermint/tendermint/blockchain" bc "github.com/tendermint/tendermint/blockchain"
mempl "github.com/tendermint/tendermint/mempool" mempl "github.com/tendermint/tendermint/mempool"
sm "github.com/tendermint/tendermint/state" sm "github.com/tendermint/tendermint/state"
@ -338,7 +337,7 @@ func newConsensusState(state *sm.State, pv *types.PrivValidator, app tmsp.Applic
cs := NewConsensusState(config, state, proxyAppConnCon, blockStore, mempool) cs := NewConsensusState(config, state, proxyAppConnCon, blockStore, mempool)
cs.SetPrivValidator(pv) cs.SetPrivValidator(pv)
evsw := events.NewEventSwitch()
evsw := types.NewEventSwitch()
cs.SetEventSwitch(evsw) cs.SetEventSwitch(evsw)
evsw.Start() evsw.Start()
return cs return cs


+ 4
- 5
consensus/reactor.go View File

@ -9,7 +9,6 @@ import (
"time" "time"
. "github.com/tendermint/go-common" . "github.com/tendermint/go-common"
"github.com/tendermint/go-events"
"github.com/tendermint/go-p2p" "github.com/tendermint/go-p2p"
"github.com/tendermint/go-wire" "github.com/tendermint/go-wire"
bc "github.com/tendermint/tendermint/blockchain" bc "github.com/tendermint/tendermint/blockchain"
@ -34,7 +33,7 @@ type ConsensusReactor struct {
blockStore *bc.BlockStore blockStore *bc.BlockStore
conS *ConsensusState conS *ConsensusState
fastSync bool fastSync bool
evsw *events.EventSwitch
evsw types.EventSwitch
} }
func NewConsensusReactor(consensusState *ConsensusState, blockStore *bc.BlockStore, fastSync bool) *ConsensusReactor { func NewConsensusReactor(consensusState *ConsensusState, blockStore *bc.BlockStore, fastSync bool) *ConsensusReactor {
@ -225,7 +224,7 @@ func (conR *ConsensusReactor) SetPrivValidator(priv *types.PrivValidator) {
} }
// implements events.Eventable // implements events.Eventable
func (conR *ConsensusReactor) SetEventSwitch(evsw *events.EventSwitch) {
func (conR *ConsensusReactor) SetEventSwitch(evsw types.EventSwitch) {
conR.evsw = evsw conR.evsw = evsw
conR.conS.SetEventSwitch(evsw) conR.conS.SetEventSwitch(evsw)
} }
@ -236,12 +235,12 @@ func (conR *ConsensusReactor) SetEventSwitch(evsw *events.EventSwitch) {
// broadcasting the result to peers // broadcasting the result to peers
func (conR *ConsensusReactor) registerEventCallbacks() { func (conR *ConsensusReactor) registerEventCallbacks() {
conR.evsw.AddListenerForEvent("conR", types.EventStringNewRoundStep(), func(data events.EventData) {
types.AddListenerForEvent(conR.evsw, "conR", types.EventStringNewRoundStep(), func(data types.TMEventData) {
rs := data.(types.EventDataRoundState).RoundState.(*RoundState) rs := data.(types.EventDataRoundState).RoundState.(*RoundState)
conR.broadcastNewRoundStep(rs) conR.broadcastNewRoundStep(rs)
}) })
conR.evsw.AddListenerForEvent("conR", types.EventStringVote(), func(data events.EventData) {
types.AddListenerForEvent(conR.evsw, "conR", types.EventStringVote(), func(data types.TMEventData) {
edv := data.(types.EventDataVote) edv := data.(types.EventDataVote)
conR.broadcastHasVoteMessage(edv.Vote, edv.Index) conR.broadcastHasVoteMessage(edv.Vote, edv.Index)
}) })


+ 2
- 3
consensus/replay_test.go View File

@ -9,7 +9,6 @@ import (
"time" "time"
. "github.com/tendermint/go-common" . "github.com/tendermint/go-common"
"github.com/tendermint/go-events"
"github.com/tendermint/go-wire" "github.com/tendermint/go-wire"
"github.com/tendermint/tendermint/types" "github.com/tendermint/tendermint/types"
) )
@ -170,7 +169,7 @@ func TestReplayCrashBeforeWritePropose(t *testing.T) {
func TestReplayCrashBeforeWritePrevote(t *testing.T) { func TestReplayCrashBeforeWritePrevote(t *testing.T) {
cs, newBlockCh, voteMsg, f := setupReplayTest(5, false) // prevote cs, newBlockCh, voteMsg, f := setupReplayTest(5, false) // prevote
cs.evsw.AddListenerForEvent("tester", types.EventStringCompleteProposal(), func(data events.EventData) {
types.AddListenerForEvent(cs.evsw, "tester", types.EventStringCompleteProposal(), func(data types.TMEventData) {
// Set LastSig // Set LastSig
var err error var err error
var msg ConsensusLogMessage var msg ConsensusLogMessage
@ -187,7 +186,7 @@ func TestReplayCrashBeforeWritePrevote(t *testing.T) {
func TestReplayCrashBeforeWritePrecommit(t *testing.T) { func TestReplayCrashBeforeWritePrecommit(t *testing.T) {
cs, newBlockCh, voteMsg, f := setupReplayTest(7, false) // precommit cs, newBlockCh, voteMsg, f := setupReplayTest(7, false) // precommit
cs.evsw.AddListenerForEvent("tester", types.EventStringPolka(), func(data events.EventData) {
types.AddListenerForEvent(cs.evsw, "tester", types.EventStringPolka(), func(data types.TMEventData) {
// Set LastSig // Set LastSig
var err error var err error
var msg ConsensusLogMessage var msg ConsensusLogMessage


+ 19
- 20
consensus/state.go View File

@ -10,7 +10,6 @@ import (
. "github.com/tendermint/go-common" . "github.com/tendermint/go-common"
cfg "github.com/tendermint/go-config" cfg "github.com/tendermint/go-config"
"github.com/tendermint/go-events"
"github.com/tendermint/go-wire" "github.com/tendermint/go-wire"
bc "github.com/tendermint/tendermint/blockchain" bc "github.com/tendermint/tendermint/blockchain"
mempl "github.com/tendermint/tendermint/mempool" mempl "github.com/tendermint/tendermint/mempool"
@ -231,7 +230,7 @@ type ConsensusState struct {
tockChan chan timeoutInfo // timeouts are relayed on tockChan to the receiveRoutine tockChan chan timeoutInfo // timeouts are relayed on tockChan to the receiveRoutine
timeoutParams *TimeoutParams // parameters and functions for timeout intervals timeoutParams *TimeoutParams // parameters and functions for timeout intervals
evsw *events.EventSwitch
evsw types.EventSwitch
wal *WAL wal *WAL
replayMode bool // so we don't log signing errors during replay replayMode bool // so we don't log signing errors during replay
@ -264,7 +263,7 @@ func NewConsensusState(config cfg.Config, state *sm.State, proxyAppConn proxy.Ap
// Public interface // Public interface
// implements events.Eventable // implements events.Eventable
func (cs *ConsensusState) SetEventSwitch(evsw *events.EventSwitch) {
func (cs *ConsensusState) SetEventSwitch(evsw types.EventSwitch) {
cs.evsw = evsw cs.evsw = evsw
} }
@ -545,7 +544,7 @@ func (cs *ConsensusState) newStep() {
cs.nSteps += 1 cs.nSteps += 1
// newStep is called by updateToStep in NewConsensusState before the evsw is set! // newStep is called by updateToStep in NewConsensusState before the evsw is set!
if cs.evsw != nil { if cs.evsw != nil {
cs.evsw.FireEvent(types.EventStringNewRoundStep(), rs)
types.FireEventNewRoundStep(cs.evsw, rs)
} }
} }
@ -719,13 +718,13 @@ func (cs *ConsensusState) handleTimeout(ti timeoutInfo, rs RoundState) {
// XXX: should we fire timeout here? // XXX: should we fire timeout here?
cs.enterNewRound(ti.Height, 0) cs.enterNewRound(ti.Height, 0)
case RoundStepPropose: case RoundStepPropose:
cs.evsw.FireEvent(types.EventStringTimeoutPropose(), cs.RoundStateEvent())
types.FireEventTimeoutPropose(cs.evsw, cs.RoundStateEvent())
cs.enterPrevote(ti.Height, ti.Round) cs.enterPrevote(ti.Height, ti.Round)
case RoundStepPrevoteWait: case RoundStepPrevoteWait:
cs.evsw.FireEvent(types.EventStringTimeoutWait(), cs.RoundStateEvent())
types.FireEventTimeoutWait(cs.evsw, cs.RoundStateEvent())
cs.enterPrecommit(ti.Height, ti.Round) cs.enterPrecommit(ti.Height, ti.Round)
case RoundStepPrecommitWait: case RoundStepPrecommitWait:
cs.evsw.FireEvent(types.EventStringTimeoutWait(), cs.RoundStateEvent())
types.FireEventTimeoutWait(cs.evsw, cs.RoundStateEvent())
cs.enterNewRound(ti.Height, ti.Round+1) cs.enterNewRound(ti.Height, ti.Round+1)
default: default:
panic(Fmt("Invalid timeout step: %v", ti.Step)) panic(Fmt("Invalid timeout step: %v", ti.Step))
@ -777,7 +776,7 @@ func (cs *ConsensusState) enterNewRound(height int, round int) {
} }
cs.Votes.SetRound(round + 1) // also track next round (round+1) to allow round-skipping cs.Votes.SetRound(round + 1) // also track next round (round+1) to allow round-skipping
cs.evsw.FireEvent(types.EventStringNewRound(), cs.RoundStateEvent())
types.FireEventNewRound(cs.evsw, cs.RoundStateEvent())
// Immediately go to enterPropose. // Immediately go to enterPropose.
cs.enterPropose(height, round) cs.enterPropose(height, round)
@ -942,7 +941,7 @@ func (cs *ConsensusState) enterPrevote(height int, round int) {
// fire event for how we got here // fire event for how we got here
if cs.isProposalComplete() { if cs.isProposalComplete() {
cs.evsw.FireEvent(types.EventStringCompleteProposal(), cs.RoundStateEvent())
types.FireEventCompleteProposal(cs.evsw, cs.RoundStateEvent())
} else { } else {
// we received +2/3 prevotes for a future round // we received +2/3 prevotes for a future round
// TODO: catchup event? // TODO: catchup event?
@ -1047,7 +1046,7 @@ func (cs *ConsensusState) enterPrecommit(height int, round int) {
} }
// At this point +2/3 prevoted for a particular block or nil // At this point +2/3 prevoted for a particular block or nil
cs.evsw.FireEvent(types.EventStringPolka(), cs.RoundStateEvent())
types.FireEventPolka(cs.evsw, cs.RoundStateEvent())
// the latest POLRound should be this round // the latest POLRound should be this round
if cs.Votes.POLRound() < round { if cs.Votes.POLRound() < round {
@ -1063,7 +1062,7 @@ func (cs *ConsensusState) enterPrecommit(height int, round int) {
cs.LockedRound = 0 cs.LockedRound = 0
cs.LockedBlock = nil cs.LockedBlock = nil
cs.LockedBlockParts = nil cs.LockedBlockParts = nil
cs.evsw.FireEvent(types.EventStringUnlock(), cs.RoundStateEvent())
types.FireEventUnlock(cs.evsw, cs.RoundStateEvent())
} }
cs.signAddVote(types.VoteTypePrecommit, nil, types.PartSetHeader{}) cs.signAddVote(types.VoteTypePrecommit, nil, types.PartSetHeader{})
return return
@ -1075,7 +1074,7 @@ func (cs *ConsensusState) enterPrecommit(height int, round int) {
if cs.LockedBlock.HashesTo(hash) { if cs.LockedBlock.HashesTo(hash) {
log.Notice("enterPrecommit: +2/3 prevoted locked block. Relocking") log.Notice("enterPrecommit: +2/3 prevoted locked block. Relocking")
cs.LockedRound = round cs.LockedRound = round
cs.evsw.FireEvent(types.EventStringRelock(), cs.RoundStateEvent())
types.FireEventRelock(cs.evsw, cs.RoundStateEvent())
cs.signAddVote(types.VoteTypePrecommit, hash, partsHeader) cs.signAddVote(types.VoteTypePrecommit, hash, partsHeader)
return return
} }
@ -1090,7 +1089,7 @@ func (cs *ConsensusState) enterPrecommit(height int, round int) {
cs.LockedRound = round cs.LockedRound = round
cs.LockedBlock = cs.ProposalBlock cs.LockedBlock = cs.ProposalBlock
cs.LockedBlockParts = cs.ProposalBlockParts cs.LockedBlockParts = cs.ProposalBlockParts
cs.evsw.FireEvent(types.EventStringLock(), cs.RoundStateEvent())
types.FireEventLock(cs.evsw, cs.RoundStateEvent())
cs.signAddVote(types.VoteTypePrecommit, hash, partsHeader) cs.signAddVote(types.VoteTypePrecommit, hash, partsHeader)
return return
} }
@ -1106,7 +1105,7 @@ func (cs *ConsensusState) enterPrecommit(height int, round int) {
cs.ProposalBlock = nil cs.ProposalBlock = nil
cs.ProposalBlockParts = types.NewPartSetFromHeader(partsHeader) cs.ProposalBlockParts = types.NewPartSetFromHeader(partsHeader)
} }
cs.evsw.FireEvent(types.EventStringUnlock(), cs.RoundStateEvent())
types.FireEventUnlock(cs.evsw, cs.RoundStateEvent())
cs.signAddVote(types.VoteTypePrecommit, nil, types.PartSetHeader{}) cs.signAddVote(types.VoteTypePrecommit, nil, types.PartSetHeader{})
return return
} }
@ -1226,14 +1225,14 @@ func (cs *ConsensusState) finalizeCommit(height int) {
// Fire off event for new block. // Fire off event for new block.
// TODO: Handle app failure. See #177 // TODO: Handle app failure. See #177
cs.evsw.FireEvent(types.EventStringNewBlock(), types.EventDataNewBlock{block})
cs.evsw.FireEvent(types.EventStringNewBlockHeader(), types.EventDataNewBlockHeader{block.Header})
types.FireEventNewBlock(cs.evsw, types.EventDataNewBlock{block})
types.FireEventNewBlockHeader(cs.evsw, types.EventDataNewBlockHeader{block.Header})
// Create a copy of the state for staging // Create a copy of the state for staging
stateCopy := cs.state.Copy() stateCopy := cs.state.Copy()
// event cache for txs // event cache for txs
eventCache := events.NewEventCache(cs.evsw)
eventCache := types.NewEventCache(cs.evsw)
// Run the block on the State: // Run the block on the State:
// + update validator sets // + update validator sets
@ -1423,7 +1422,7 @@ func (cs *ConsensusState) addVote(valIndex int, vote *types.Vote, peerKey string
added, address, err = cs.LastCommit.AddByIndex(valIndex, vote) added, address, err = cs.LastCommit.AddByIndex(valIndex, vote)
if added { if added {
log.Info(Fmt("Added to lastPrecommits: %v", cs.LastCommit.StringShort())) log.Info(Fmt("Added to lastPrecommits: %v", cs.LastCommit.StringShort()))
cs.evsw.FireEvent(types.EventStringVote(), types.EventDataVote{valIndex, address, vote})
types.FireEventVote(cs.evsw, types.EventDataVote{valIndex, address, vote})
} }
return return
@ -1434,7 +1433,7 @@ func (cs *ConsensusState) addVote(valIndex int, vote *types.Vote, peerKey string
height := cs.Height height := cs.Height
added, address, err = cs.Votes.AddByIndex(valIndex, vote, peerKey) added, address, err = cs.Votes.AddByIndex(valIndex, vote, peerKey)
if added { if added {
cs.evsw.FireEvent(types.EventStringVote(), types.EventDataVote{valIndex, address, vote})
types.FireEventVote(cs.evsw, types.EventDataVote{valIndex, address, vote})
switch vote.Type { switch vote.Type {
case types.VoteTypePrevote: case types.VoteTypePrevote:
@ -1452,7 +1451,7 @@ func (cs *ConsensusState) addVote(valIndex int, vote *types.Vote, peerKey string
cs.LockedRound = 0 cs.LockedRound = 0
cs.LockedBlock = nil cs.LockedBlock = nil
cs.LockedBlockParts = nil cs.LockedBlockParts = nil
cs.evsw.FireEvent(types.EventStringUnlock(), cs.RoundStateEvent())
types.FireEventUnlock(cs.evsw, cs.RoundStateEvent())
} }
} }
if cs.Round <= vote.Round && prevotes.HasTwoThirdsAny() { if cs.Round <= vote.Round && prevotes.HasTwoThirdsAny() {


+ 1
- 2
consensus/state_test.go View File

@ -6,9 +6,8 @@ import (
"testing" "testing"
"time" "time"
"github.com/tendermint/tendermint/config/tendermint_test"
//"github.com/tendermint/go-events"
. "github.com/tendermint/go-common" . "github.com/tendermint/go-common"
"github.com/tendermint/tendermint/config/tendermint_test"
"github.com/tendermint/tendermint/types" "github.com/tendermint/tendermint/types"
) )


+ 4
- 4
glide.lock View File

@ -64,17 +64,17 @@ imports:
- name: github.com/tendermint/go-db - name: github.com/tendermint/go-db
version: 31fdd21c7eaeed53e0ea7ca597fb1e960e2988a5 version: 31fdd21c7eaeed53e0ea7ca597fb1e960e2988a5
- name: github.com/tendermint/go-events - name: github.com/tendermint/go-events
version: 48fa21511b259278b871a37b6951da2d5bef698d
version: 1652dc8b3f7780079aa98c3ce20a83ee90b9758b
- name: github.com/tendermint/go-logger - name: github.com/tendermint/go-logger
version: cefb3a45c0bf3c493a04e9bcd9b1540528be59f2 version: cefb3a45c0bf3c493a04e9bcd9b1540528be59f2
- name: github.com/tendermint/go-merkle - name: github.com/tendermint/go-merkle
version: 05042c6ab9cad51d12e4cecf717ae68e3b1409a8 version: 05042c6ab9cad51d12e4cecf717ae68e3b1409a8
- name: github.com/tendermint/go-p2p - name: github.com/tendermint/go-p2p
version: f508f3f20b5bb36f03d3bc83647b7a92425139d1
version: 1eb390680d33299ba0e3334490eca587efd18414
subpackages: subpackages:
- upnp - upnp
- name: github.com/tendermint/go-rpc - name: github.com/tendermint/go-rpc
version: 479510be0e80dd9e5d6b1f941adad168df0af85f
version: 855255d73eecd25097288be70f3fb208a5817d80
subpackages: subpackages:
- client - client
- server - server
@ -86,7 +86,7 @@ imports:
subpackages: subpackages:
- term - term
- name: github.com/tendermint/tmsp - name: github.com/tendermint/tmsp
version: ead192adbbbf85ac581cf775b18ae70d59f86457
version: 5d3eb0328a615ba55b580ce871033e605aa8b97d
subpackages: subpackages:
- client - client
- example/counter - example/counter


+ 2
- 3
mempool/reactor.go View File

@ -9,7 +9,6 @@ import (
"github.com/tendermint/go-clist" "github.com/tendermint/go-clist"
. "github.com/tendermint/go-common" . "github.com/tendermint/go-common"
cfg "github.com/tendermint/go-config" cfg "github.com/tendermint/go-config"
"github.com/tendermint/go-events"
"github.com/tendermint/go-p2p" "github.com/tendermint/go-p2p"
"github.com/tendermint/go-wire" "github.com/tendermint/go-wire"
"github.com/tendermint/tendermint/types" "github.com/tendermint/tendermint/types"
@ -28,7 +27,7 @@ type MempoolReactor struct {
p2p.BaseReactor p2p.BaseReactor
config cfg.Config config cfg.Config
Mempool *Mempool Mempool *Mempool
evsw *events.EventSwitch
evsw types.EventSwitch
} }
func NewMempoolReactor(config cfg.Config, mempool *Mempool) *MempoolReactor { func NewMempoolReactor(config cfg.Config, mempool *Mempool) *MempoolReactor {
@ -143,7 +142,7 @@ func (memR *MempoolReactor) broadcastTxRoutine(peer Peer) {
} }
// implements events.Eventable // implements events.Eventable
func (memR *MempoolReactor) SetEventSwitch(evsw *events.EventSwitch) {
func (memR *MempoolReactor) SetEventSwitch(evsw types.EventSwitch) {
memR.evsw = evsw memR.evsw = evsw
} }


+ 5
- 6
node/node.go View File

@ -12,7 +12,6 @@ import (
cfg "github.com/tendermint/go-config" cfg "github.com/tendermint/go-config"
"github.com/tendermint/go-crypto" "github.com/tendermint/go-crypto"
dbm "github.com/tendermint/go-db" dbm "github.com/tendermint/go-db"
"github.com/tendermint/go-events"
"github.com/tendermint/go-p2p" "github.com/tendermint/go-p2p"
"github.com/tendermint/go-rpc" "github.com/tendermint/go-rpc"
"github.com/tendermint/go-rpc/server" "github.com/tendermint/go-rpc/server"
@ -32,7 +31,7 @@ import _ "net/http/pprof"
type Node struct { type Node struct {
config cfg.Config config cfg.Config
sw *p2p.Switch sw *p2p.Switch
evsw *events.EventSwitch
evsw types.EventSwitch
blockStore *bc.BlockStore blockStore *bc.BlockStore
bcReactor *bc.BlockchainReactor bcReactor *bc.BlockchainReactor
mempoolReactor *mempl.MempoolReactor mempoolReactor *mempl.MempoolReactor
@ -80,7 +79,7 @@ func NewNode(config cfg.Config, privValidator *types.PrivValidator, clientCreato
privKey := crypto.GenPrivKeyEd25519() privKey := crypto.GenPrivKeyEd25519()
// Make event switch // Make event switch
eventSwitch := events.NewEventSwitch()
eventSwitch := types.NewEventSwitch()
_, err := eventSwitch.Start() _, err := eventSwitch.Start()
if err != nil { if err != nil {
Exit(Fmt("Failed to start switch: %v", err)) Exit(Fmt("Failed to start switch: %v", err))
@ -187,7 +186,7 @@ func (n *Node) Stop() {
} }
// Add the event switch to reactors, mempool, etc. // Add the event switch to reactors, mempool, etc.
func SetEventSwitch(evsw *events.EventSwitch, eventables ...events.Eventable) {
func SetEventSwitch(evsw types.EventSwitch, eventables ...types.Eventable) {
for _, e := range eventables { for _, e := range eventables {
e.SetEventSwitch(evsw) e.SetEventSwitch(evsw)
} }
@ -252,7 +251,7 @@ func (n *Node) MempoolReactor() *mempl.MempoolReactor {
return n.mempoolReactor return n.mempoolReactor
} }
func (n *Node) EventSwitch() *events.EventSwitch {
func (n *Node) EventSwitch() types.EventSwitch {
return n.evsw return n.evsw
} }
@ -401,7 +400,7 @@ func newConsensusState(config cfg.Config) *consensus.ConsensusState {
config.Set("chain_id", state.ChainID) config.Set("chain_id", state.ChainID)
// Make event switch // Make event switch
eventSwitch := events.NewEventSwitch()
eventSwitch := types.NewEventSwitch()
_, err := eventSwitch.Start() _, err := eventSwitch.Start()
if err != nil { if err != nil {
Exit(Fmt("Failed to start event switch: %v", err)) Exit(Fmt("Failed to start event switch: %v", err))


+ 2
- 3
rpc/core/events.go View File

@ -1,7 +1,6 @@
package core package core
import ( import (
"github.com/tendermint/go-events"
"github.com/tendermint/go-rpc/types" "github.com/tendermint/go-rpc/types"
ctypes "github.com/tendermint/tendermint/rpc/core/types" ctypes "github.com/tendermint/tendermint/rpc/core/types"
"github.com/tendermint/tendermint/types" "github.com/tendermint/tendermint/types"
@ -9,10 +8,10 @@ import (
func Subscribe(wsCtx rpctypes.WSRPCContext, event string) (*ctypes.ResultSubscribe, error) { func Subscribe(wsCtx rpctypes.WSRPCContext, event string) (*ctypes.ResultSubscribe, error) {
log.Notice("Subscribe to event", "remote", wsCtx.GetRemoteAddr(), "event", event) log.Notice("Subscribe to event", "remote", wsCtx.GetRemoteAddr(), "event", event)
wsCtx.GetEventSwitch().AddListenerForEvent(wsCtx.GetRemoteAddr(), event, func(msg events.EventData) {
types.AddListenerForEvent(wsCtx.GetEventSwitch(), wsCtx.GetRemoteAddr(), event, func(msg types.TMEventData) {
// NOTE: EventSwitch callbacks must be nonblocking // NOTE: EventSwitch callbacks must be nonblocking
// NOTE: RPCResponses of subscribed events have id suffix "#event" // NOTE: RPCResponses of subscribed events have id suffix "#event"
tmResult := ctypes.TMResult(&ctypes.ResultEvent{event, types.TMEventData(msg)})
tmResult := ctypes.TMResult(&ctypes.ResultEvent{event, msg})
wsCtx.TryWriteRPCResponse(rpctypes.NewRPCResponse(wsCtx.Request.ID+"#event", &tmResult, "")) wsCtx.TryWriteRPCResponse(rpctypes.NewRPCResponse(wsCtx.Request.ID+"#event", &tmResult, ""))
}) })
return &ctypes.ResultSubscribe{}, nil return &ctypes.ResultSubscribe{}, nil


+ 6
- 8
rpc/core/mempool.go View File

@ -4,7 +4,6 @@ import (
"fmt" "fmt"
"time" "time"
"github.com/tendermint/go-events"
ctypes "github.com/tendermint/tendermint/rpc/core/types" ctypes "github.com/tendermint/tendermint/rpc/core/types"
"github.com/tendermint/tendermint/types" "github.com/tendermint/tendermint/types"
tmsp "github.com/tendermint/tmsp/types" tmsp "github.com/tendermint/tmsp/types"
@ -52,9 +51,9 @@ func BroadcastTxSync(tx types.Tx) (*ctypes.ResultBroadcastTx, error) {
func BroadcastTxCommit(tx types.Tx) (*ctypes.ResultBroadcastTx, error) { func BroadcastTxCommit(tx types.Tx) (*ctypes.ResultBroadcastTx, error) {
// subscribe to tx being committed in block // subscribe to tx being committed in block
appendTxResCh := make(chan *tmsp.Response, 1)
eventSwitch.AddListenerForEvent("rpc", types.EventStringTx(tx), func(data events.EventData) {
appendTxResCh <- data.(*tmsp.Response)
appendTxResCh := make(chan types.EventDataTx, 1)
types.AddListenerForEvent(eventSwitch, "rpc", types.EventStringTx(tx), func(data types.TMEventData) {
appendTxResCh <- data.(types.EventDataTx)
}) })
// broadcast the tx and register checktx callback // broadcast the tx and register checktx callback
@ -84,11 +83,10 @@ func BroadcastTxCommit(tx types.Tx) (*ctypes.ResultBroadcastTx, error) {
// The tx was included in a block. // The tx was included in a block.
// NOTE we don't return an error regardless of the AppendTx code; // NOTE we don't return an error regardless of the AppendTx code;
// clients must check this to see if they need to send a new tx! // clients must check this to see if they need to send a new tx!
r := appendTxRes.GetAppendTx()
return &ctypes.ResultBroadcastTx{ return &ctypes.ResultBroadcastTx{
Code: r.Code,
Data: r.Data,
Log: r.Log,
Code: appendTxRes.Code,
Data: appendTxRes.Result,
Log: appendTxRes.Log,
}, nil }, nil
case <-timer.C: case <-timer.C:
r := checkTxR r := checkTxR


+ 2
- 3
rpc/core/pipe.go View File

@ -4,7 +4,6 @@ import (
cfg "github.com/tendermint/go-config" cfg "github.com/tendermint/go-config"
"github.com/tendermint/go-p2p" "github.com/tendermint/go-p2p"
"github.com/tendermint/go-events"
bc "github.com/tendermint/tendermint/blockchain" bc "github.com/tendermint/tendermint/blockchain"
"github.com/tendermint/tendermint/consensus" "github.com/tendermint/tendermint/consensus"
mempl "github.com/tendermint/tendermint/mempool" mempl "github.com/tendermint/tendermint/mempool"
@ -12,7 +11,7 @@ import (
"github.com/tendermint/tendermint/types" "github.com/tendermint/tendermint/types"
) )
var eventSwitch *events.EventSwitch
var eventSwitch types.EventSwitch
var blockStore *bc.BlockStore var blockStore *bc.BlockStore
var consensusState *consensus.ConsensusState var consensusState *consensus.ConsensusState
var consensusReactor *consensus.ConsensusReactor var consensusReactor *consensus.ConsensusReactor
@ -28,7 +27,7 @@ func SetConfig(c cfg.Config) {
config = c config = c
} }
func SetEventSwitch(evsw *events.EventSwitch) {
func SetEventSwitch(evsw types.EventSwitch) {
eventSwitch = evsw eventSwitch = evsw
} }


+ 34
- 0
rpc/test/client_test.go View File

@ -257,6 +257,40 @@ func TestWSBlockchainGrowth(t *testing.T) {
} }
} }
func TestWSTxEvent(t *testing.T) {
wsc := newWSClient(t)
tx := randBytes()
// listen for the tx I am about to submit
eid := types.EventStringTx(types.Tx(tx))
subscribe(t, wsc, eid)
defer func() {
unsubscribe(t, wsc, eid)
wsc.Stop()
}()
// send an tx
tmResult := new(ctypes.TMResult)
_, err := clientJSON.Call("broadcast_tx_sync", []interface{}{tx}, tmResult)
if err != nil {
t.Fatal("Error submitting event")
}
waitForEvent(t, wsc, eid, true, func() {}, func(eid string, b interface{}) error {
evt, ok := b.(types.EventDataTx)
if !ok {
t.Fatal("Got wrong event type", b)
}
if bytes.Compare([]byte(evt.Tx), tx) != 0 {
t.Error("Event returned different tx")
}
if evt.Code != tmsp.CodeType_OK {
t.Error("Event returned tx error code", evt.Code)
}
return nil
})
}
/* TODO: this with dummy app.. /* TODO: this with dummy app..
func TestWSDoubleFire(t *testing.T) { func TestWSDoubleFire(t *testing.T) {
if testing.Short() { if testing.Short() {


+ 5
- 0
scripts/glide/status.sh View File

@ -31,6 +31,11 @@ for lib in "${LIBS[@]}"; do
echo "Vendored: $VENDORED" echo "Vendored: $VENDORED"
echo "Master: $MASTER" echo "Master: $MASTER"
fi fi
elif [[ "$VENDORED" != "$HEAD" ]]; then
echo ""
echo "Vendored version of $lib matches origin/master but differs from HEAD"
echo "Vendored: $VENDORED"
echo "Head: $HEAD"
fi fi
done done


+ 1
- 1
scripts/glide/update.sh View File

@ -16,4 +16,4 @@ cd $GOPATH/src/github.com/tendermint/$LIB
NEW_COMMIT=$(git rev-parse HEAD) NEW_COMMIT=$(git rev-parse HEAD)
cd $PWD cd $PWD
sed -i "s/$OLD_COMMIT/$NEW_COMMIT/g" $GLIDE
sed -i "" "s/$OLD_COMMIT/$NEW_COMMIT/g" $GLIDE

+ 14
- 5
state/execution.go View File

@ -5,7 +5,6 @@ import (
"fmt" "fmt"
. "github.com/tendermint/go-common" . "github.com/tendermint/go-common"
"github.com/tendermint/go-events"
"github.com/tendermint/tendermint/proxy" "github.com/tendermint/tendermint/proxy"
"github.com/tendermint/tendermint/types" "github.com/tendermint/tendermint/types"
tmsp "github.com/tendermint/tmsp/types" tmsp "github.com/tendermint/tmsp/types"
@ -18,7 +17,7 @@ func (s *State) ValidateBlock(block *types.Block) error {
// Execute the block to mutate State. // Execute the block to mutate State.
// Validates block and then executes Data.Txs in the block. // Validates block and then executes Data.Txs in the block.
func (s *State) ExecBlock(eventCache events.Fireable, proxyAppConn proxy.AppConnConsensus, block *types.Block, blockPartsHeader types.PartSetHeader) error {
func (s *State) ExecBlock(eventCache types.Fireable, proxyAppConn proxy.AppConnConsensus, block *types.Block, blockPartsHeader types.PartSetHeader) error {
// Validate the block. // Validate the block.
err := s.validateBlock(block) err := s.validateBlock(block)
@ -55,7 +54,7 @@ func (s *State) ExecBlock(eventCache events.Fireable, proxyAppConn proxy.AppConn
// Executes block's transactions on proxyAppConn. // Executes block's transactions on proxyAppConn.
// TODO: Generate a bitmap or otherwise store tx validity in state. // TODO: Generate a bitmap or otherwise store tx validity in state.
func (s *State) execBlockOnProxyApp(eventCache events.Fireable, proxyAppConn proxy.AppConnConsensus, block *types.Block) error {
func (s *State) execBlockOnProxyApp(eventCache types.Fireable, proxyAppConn proxy.AppConnConsensus, block *types.Block) error {
var validTxs, invalidTxs = 0, 0 var validTxs, invalidTxs = 0, 0
@ -67,15 +66,25 @@ func (s *State) execBlockOnProxyApp(eventCache events.Fireable, proxyAppConn pro
// TODO: make use of this info // TODO: make use of this info
// Blocks may include invalid txs. // Blocks may include invalid txs.
// reqAppendTx := req.(tmsp.RequestAppendTx) // reqAppendTx := req.(tmsp.RequestAppendTx)
if r.AppendTx.Code == tmsp.CodeType_OK {
txError := ""
apTx := r.AppendTx
if apTx.Code == tmsp.CodeType_OK {
validTxs += 1 validTxs += 1
} else { } else {
log.Debug("Invalid tx", "code", r.AppendTx.Code, "log", r.AppendTx.Log) log.Debug("Invalid tx", "code", r.AppendTx.Code, "log", r.AppendTx.Log)
invalidTxs += 1 invalidTxs += 1
txError = apTx.Code.String()
} }
// NOTE: if we count we can access the tx from the block instead of // NOTE: if we count we can access the tx from the block instead of
// pulling it from the req // pulling it from the req
eventCache.FireEvent(types.EventStringTx(req.GetAppendTx().Tx), res)
event := types.EventDataTx{
Tx: req.GetAppendTx().Tx,
Result: apTx.Data,
Code: apTx.Code,
Log: apTx.Log,
Error: txError,
}
types.FireEventTx(eventCache, event)
} }
} }
proxyAppConn.SetResponseCallback(proxyCb) proxyAppConn.SetResponseCallback(proxyCb)


+ 103
- 5
types/events.go View File

@ -5,6 +5,7 @@ import (
. "github.com/tendermint/go-common" . "github.com/tendermint/go-common"
"github.com/tendermint/go-events" "github.com/tendermint/go-events"
"github.com/tendermint/go-wire" "github.com/tendermint/go-wire"
tmsp "github.com/tendermint/tmsp/types"
) )
// Functions to generate eventId strings // Functions to generate eventId strings
@ -35,7 +36,7 @@ func EventStringVote() string { return "Vote" }
// implements events.EventData // implements events.EventData
type TMEventData interface { type TMEventData interface {
events.EventData events.EventData
// AssertIsTMEventData()
AssertIsTMEventData()
} }
const ( const (
@ -72,10 +73,11 @@ type EventDataNewBlockHeader struct {
// All txs fire EventDataTx // All txs fire EventDataTx
type EventDataTx struct { type EventDataTx struct {
Tx Tx `json:"tx"`
Result []byte `json:"result"`
Log string `json:"log"`
Error string `json:"error"`
Tx Tx `json:"tx"`
Result []byte `json:"result"`
Log string `json:"log"`
Code tmsp.CodeType `json:"code"`
Error string `json:"error"`
} }
// NOTE: This goes into the replay WAL // NOTE: This goes into the replay WAL
@ -99,3 +101,99 @@ func (_ EventDataNewBlockHeader) AssertIsTMEventData() {}
func (_ EventDataTx) AssertIsTMEventData() {} func (_ EventDataTx) AssertIsTMEventData() {}
func (_ EventDataRoundState) AssertIsTMEventData() {} func (_ EventDataRoundState) AssertIsTMEventData() {}
func (_ EventDataVote) AssertIsTMEventData() {} func (_ EventDataVote) AssertIsTMEventData() {}
//----------------------------------------
// Wrappers for type safety
type Fireable interface {
events.Fireable
}
type Eventable interface {
SetEventSwitch(EventSwitch)
}
type EventSwitch interface {
events.EventSwitch
}
type EventCache interface {
Fireable
Flush()
}
func NewEventSwitch() EventSwitch {
return events.NewEventSwitch()
}
func NewEventCache(evsw EventSwitch) EventCache {
return events.NewEventCache(evsw)
}
// All events should be based on this FireEvent to ensure they are TMEventData
func fireEvent(fireable events.Fireable, event string, data TMEventData) {
fireable.FireEvent(event, data)
}
func AddListenerForEvent(evsw EventSwitch, id, event string, cb func(data TMEventData)) {
evsw.AddListenerForEvent(id, event, func(data events.EventData) {
cb(data.(TMEventData))
})
}
//--- block, tx, and vote events
func FireEventNewBlock(fireable events.Fireable, block EventDataNewBlock) {
fireEvent(fireable, EventStringNewBlock(), block)
}
func FireEventNewBlockHeader(fireable events.Fireable, header EventDataNewBlockHeader) {
fireEvent(fireable, EventStringNewBlockHeader(), header)
}
func FireEventVote(fireable events.Fireable, vote EventDataVote) {
fireEvent(fireable, EventStringVote(), vote)
}
func FireEventTx(fireable events.Fireable, tx EventDataTx) {
fireEvent(fireable, EventStringTx(tx.Tx), tx)
}
//--- EventDataRoundState events
func FireEventNewRoundStep(fireable events.Fireable, rs EventDataRoundState) {
fireEvent(fireable, EventStringNewRoundStep(), rs)
}
func FireEventTimeoutPropose(fireable events.Fireable, rs EventDataRoundState) {
fireEvent(fireable, EventStringTimeoutPropose(), rs)
}
func FireEventTimeoutWait(fireable events.Fireable, rs EventDataRoundState) {
fireEvent(fireable, EventStringTimeoutWait(), rs)
}
func FireEventNewRound(fireable events.Fireable, rs EventDataRoundState) {
fireEvent(fireable, EventStringNewRound(), rs)
}
func FireEventCompleteProposal(fireable events.Fireable, rs EventDataRoundState) {
fireEvent(fireable, EventStringCompleteProposal(), rs)
}
func FireEventPolka(fireable events.Fireable, rs EventDataRoundState) {
fireEvent(fireable, EventStringPolka(), rs)
}
func FireEventUnlock(fireable events.Fireable, rs EventDataRoundState) {
fireEvent(fireable, EventStringUnlock(), rs)
}
func FireEventRelock(fireable events.Fireable, rs EventDataRoundState) {
fireEvent(fireable, EventStringRelock(), rs)
}
func FireEventLock(fireable events.Fireable, rs EventDataRoundState) {
fireEvent(fireable, EventStringLock(), rs)
}

Loading…
Cancel
Save