Browse Source

TMEventDataInner

pull/467/head
Ethan Buchman 8 years ago
parent
commit
07e59e63f9
12 changed files with 98 additions and 64 deletions
  1. +1
    -1
      consensus/common_test.go
  2. +1
    -1
      consensus/mempool_test.go
  3. +4
    -4
      consensus/reactor.go
  4. +3
    -3
      consensus/reactor_test.go
  5. +15
    -15
      consensus/state_test.go
  6. +2
    -2
      rpc/client/event_test.go
  7. +3
    -3
      rpc/client/helpers.go
  8. +1
    -1
      rpc/core/mempool.go
  9. +4
    -6
      rpc/lib/rpc_test.go
  10. +4
    -3
      rpc/test/client_test.go
  11. +2
    -0
      rpc/test/helpers.go
  12. +58
    -25
      types/events.go

+ 1
- 1
consensus/common_test.go View File

@ -206,7 +206,7 @@ func subscribeToVoter(cs *ConsensusState, addr []byte) chan interface{} {
go func() {
for {
v := <-voteCh0
vote := v.(types.EventDataVote)
vote := v.(types.TMEventData).Unwrap().(types.EventDataVote)
// we only fire for our own votes
if bytes.Equal(addr, vote.Vote.ValidatorAddress) {
voteCh <- v


+ 1
- 1
consensus/mempool_test.go View File

@ -44,7 +44,7 @@ func TestTxConcurrentWithCommit(t *testing.T) {
for nTxs := 0; nTxs < NTxs; {
select {
case b := <-newBlockCh:
nTxs += b.(types.EventDataNewBlock).Block.Header.NumTxs
nTxs += b.(types.TMEventData).Unwrap().(types.EventDataNewBlock).Block.Header.NumTxs
case <-ticker.C:
panic("Timed out waiting to commit blocks with transactions")
}


+ 4
- 4
consensus/reactor.go View File

@ -8,11 +8,11 @@ import (
"sync"
"time"
. "github.com/tendermint/tmlibs/common"
"github.com/tendermint/tendermint/p2p"
"github.com/tendermint/go-wire"
"github.com/tendermint/tendermint/p2p"
sm "github.com/tendermint/tendermint/state"
"github.com/tendermint/tendermint/types"
. "github.com/tendermint/tmlibs/common"
)
const (
@ -299,12 +299,12 @@ func (conR *ConsensusReactor) SetEventSwitch(evsw types.EventSwitch) {
func (conR *ConsensusReactor) registerEventCallbacks() {
types.AddListenerForEvent(conR.evsw, "conR", types.EventStringNewRoundStep(), func(data types.TMEventData) {
rs := data.(types.EventDataRoundState).RoundState.(*RoundState)
rs := data.Unwrap().(types.EventDataRoundState).RoundState.(*RoundState)
conR.broadcastNewRoundStep(rs)
})
types.AddListenerForEvent(conR.evsw, "conR", types.EventStringVote(), func(data types.TMEventData) {
edv := data.(types.EventDataVote)
edv := data.Unwrap().(types.EventDataVote)
conR.broadcastHasVoteMessage(edv.Vote)
})
}


+ 3
- 3
consensus/reactor_test.go View File

@ -8,10 +8,10 @@ import (
"github.com/tendermint/tendermint/config/tendermint_test"
"github.com/tendermint/tmlibs/events"
"github.com/tendermint/abci/example/dummy"
"github.com/tendermint/tendermint/p2p"
"github.com/tendermint/tendermint/types"
"github.com/tendermint/abci/example/dummy"
"github.com/tendermint/tmlibs/events"
)
func init() {
@ -252,7 +252,7 @@ func TestReactorWithTimeoutCommit(t *testing.T) {
func waitForAndValidateBlock(t *testing.T, n int, activeVals map[string]struct{}, eventChans []chan interface{}, css []*ConsensusState, txs ...[]byte) {
timeoutWaitGroup(t, n, func(wg *sync.WaitGroup, j int) {
newBlockI := <-eventChans[j]
newBlock := newBlockI.(types.EventDataNewBlock).Block
newBlock := newBlockI.(types.TMEventData).Unwrap().(types.EventDataNewBlock).Block
log.Warn("Got block", "height", newBlock.Height, "validator", j)
err := validateBlock(newBlock, activeVals)
if err != nil {


+ 15
- 15
consensus/state_test.go View File

@ -6,9 +6,9 @@ import (
"testing"
"time"
. "github.com/tendermint/tmlibs/common"
"github.com/tendermint/tendermint/config/tendermint_test"
"github.com/tendermint/tendermint/types"
. "github.com/tendermint/tmlibs/common"
)
func init() {
@ -248,7 +248,7 @@ func TestFullRound1(t *testing.T) {
// grab proposal
re := <-propCh
propBlockHash := re.(types.EventDataRoundState).RoundState.(*RoundState).ProposalBlock.Hash()
propBlockHash := re.(types.TMEventData).Unwrap().(types.EventDataRoundState).RoundState.(*RoundState).ProposalBlock.Hash()
<-voteCh // wait for prevote
// NOTE: voteChan cap of 0 ensures we can complete this
@ -345,7 +345,7 @@ func TestLockNoPOL(t *testing.T) {
cs1.startRoutines(0)
re := <-proposalCh
rs := re.(types.EventDataRoundState).RoundState.(*RoundState)
rs := re.(types.TMEventData).Unwrap().(types.EventDataRoundState).RoundState.(*RoundState)
theBlockHash := rs.ProposalBlock.Hash()
<-voteCh // prevote
@ -385,7 +385,7 @@ func TestLockNoPOL(t *testing.T) {
// now we're on a new round and not the proposer, so wait for timeout
re = <-timeoutProposeCh
rs = re.(types.EventDataRoundState).RoundState.(*RoundState)
rs = re.(types.TMEventData).Unwrap().(types.EventDataRoundState).RoundState.(*RoundState)
if rs.ProposalBlock != nil {
panic("Expected proposal block to be nil")
@ -429,7 +429,7 @@ func TestLockNoPOL(t *testing.T) {
incrementRound(vs2)
re = <-proposalCh
rs = re.(types.EventDataRoundState).RoundState.(*RoundState)
rs = re.(types.TMEventData).Unwrap().(types.EventDataRoundState).RoundState.(*RoundState)
// now we're on a new round and are the proposer
if !bytes.Equal(rs.ProposalBlock.Hash(), rs.LockedBlock.Hash()) {
@ -518,7 +518,7 @@ func TestLockPOLRelock(t *testing.T) {
<-newRoundCh
re := <-proposalCh
rs := re.(types.EventDataRoundState).RoundState.(*RoundState)
rs := re.(types.TMEventData).Unwrap().(types.EventDataRoundState).RoundState.(*RoundState)
theBlockHash := rs.ProposalBlock.Hash()
<-voteCh // prevote
@ -589,9 +589,9 @@ func TestLockPOLRelock(t *testing.T) {
_, _ = <-voteCh, <-voteCh
be := <-newBlockCh
b := be.(types.EventDataNewBlockHeader)
b := be.(types.TMEventData).Unwrap().(types.EventDataNewBlockHeader)
re = <-newRoundCh
rs = re.(types.EventDataRoundState).RoundState.(*RoundState)
rs = re.(types.TMEventData).Unwrap().(types.EventDataRoundState).RoundState.(*RoundState)
if rs.Height != 2 {
panic("Expected height to increment")
}
@ -627,7 +627,7 @@ func TestLockPOLUnlock(t *testing.T) {
startTestRound(cs1, cs1.Height, 0)
<-newRoundCh
re := <-proposalCh
rs := re.(types.EventDataRoundState).RoundState.(*RoundState)
rs := re.(types.TMEventData).Unwrap().(types.EventDataRoundState).RoundState.(*RoundState)
theBlockHash := rs.ProposalBlock.Hash()
<-voteCh // prevote
@ -653,7 +653,7 @@ func TestLockPOLUnlock(t *testing.T) {
// timeout to new round
re = <-timeoutWaitCh
rs = re.(types.EventDataRoundState).RoundState.(*RoundState)
rs = re.(types.TMEventData).Unwrap().(types.EventDataRoundState).RoundState.(*RoundState)
lockedBlockHash := rs.LockedBlock.Hash()
//XXX: this isnt gauranteed to get there before the timeoutPropose ...
@ -713,7 +713,7 @@ func TestLockPOLSafety1(t *testing.T) {
startTestRound(cs1, cs1.Height, 0)
<-newRoundCh
re := <-proposalCh
rs := re.(types.EventDataRoundState).RoundState.(*RoundState)
rs := re.(types.TMEventData).Unwrap().(types.EventDataRoundState).RoundState.(*RoundState)
propBlock := rs.ProposalBlock
<-voteCh // prevote
@ -761,7 +761,7 @@ func TestLockPOLSafety1(t *testing.T) {
re = <-proposalCh
}
rs = re.(types.EventDataRoundState).RoundState.(*RoundState)
rs = re.(types.TMEventData).Unwrap().(types.EventDataRoundState).RoundState.(*RoundState)
if rs.LockedBlock != nil {
panic("we should not be locked!")
@ -1009,7 +1009,7 @@ func TestHalt1(t *testing.T) {
startTestRound(cs1, cs1.Height, 0)
<-newRoundCh
re := <-proposalCh
rs := re.(types.EventDataRoundState).RoundState.(*RoundState)
rs := re.(types.TMEventData).Unwrap().(types.EventDataRoundState).RoundState.(*RoundState)
propBlock := rs.ProposalBlock
propBlockParts := propBlock.MakePartSet(partSize)
@ -1032,7 +1032,7 @@ func TestHalt1(t *testing.T) {
// timeout to new round
<-timeoutWaitCh
re = <-newRoundCh
rs = re.(types.EventDataRoundState).RoundState.(*RoundState)
rs = re.(types.TMEventData).Unwrap().(types.EventDataRoundState).RoundState.(*RoundState)
log.Notice("### ONTO ROUND 1")
/*Round2
@ -1050,7 +1050,7 @@ func TestHalt1(t *testing.T) {
// receiving that precommit should take us straight to commit
<-newBlockCh
re = <-newRoundCh
rs = re.(types.EventDataRoundState).RoundState.(*RoundState)
rs = re.(types.TMEventData).Unwrap().(types.EventDataRoundState).RoundState.(*RoundState)
if rs.Height != 2 {
panic("expected height to increment")


+ 2
- 2
rpc/client/event_test.go View File

@ -25,7 +25,7 @@ func TestHeaderEvents(t *testing.T) {
evtTyp := types.EventStringNewBlockHeader()
evt, err := client.WaitForOneEvent(c, evtTyp, 1*time.Second)
require.Nil(err, "%d: %+v", i, err)
_, ok := evt.(types.EventDataNewBlockHeader)
_, ok := evt.Unwrap().(types.EventDataNewBlockHeader)
require.True(ok, "%d: %#v", i, evt)
// TODO: more checks...
}
@ -56,7 +56,7 @@ func TestTxEvents(t *testing.T) {
evt, err := client.WaitForOneEvent(c, evtTyp, 1*time.Second)
require.Nil(err, "%d: %+v", i, err)
// and make sure it has the proper info
txe, ok := evt.(types.EventDataTx)
txe, ok := evt.Unwrap().(types.EventDataTx)
require.True(ok, "%d: %#v", i, evt)
// make sure this is the proper tx
require.EqualValues(tx, txe.Tx)


+ 3
- 3
rpc/client/helpers.go View File

@ -4,9 +4,9 @@ import (
"time"
"github.com/pkg/errors"
"github.com/tendermint/tendermint/types"
cmn "github.com/tendermint/tmlibs/common"
events "github.com/tendermint/tmlibs/events"
"github.com/tendermint/tendermint/types"
)
// Waiter is informed of current height, decided whether to quit early
@ -77,12 +77,12 @@ func WaitForOneEvent(evsw types.EventSwitch,
select {
case <-quit:
return nil, errors.New("timed out waiting for event")
return types.TMEventData{}, errors.New("timed out waiting for event")
case evt := <-evts:
tmevt, ok := evt.(types.TMEventData)
if ok {
return tmevt, nil
}
return nil, errors.Errorf("Got unexpected event type: %#v", evt)
return types.TMEventData{}, errors.Errorf("Got unexpected event type: %#v", evt)
}
}

+ 1
- 1
rpc/core/mempool.go View File

@ -50,7 +50,7 @@ func BroadcastTxCommit(tx types.Tx) (*ctypes.ResultBroadcastTxCommit, error) {
// subscribe to tx being committed in block
deliverTxResCh := make(chan types.EventDataTx, 1)
types.AddListenerForEvent(eventSwitch, "rpc", types.EventStringTx(tx), func(data types.TMEventData) {
deliverTxResCh <- data.(types.EventDataTx)
deliverTxResCh <- data.Unwrap().(types.EventDataTx)
})
// broadcast the tx and register checktx callback


+ 4
- 6
rpc/lib/rpc_test.go View File

@ -208,12 +208,10 @@ func testWithHTTPClient(t *testing.T, cl client.HTTPClient) {
require.Nil(t, err)
assert.Equal(t, got3, val3)
/*
val4 := rand.Intn(10000)
got4, err := echoIntViaHTTP(cl, val4)
require.Nil(t, err)
assert.Equal(t, got4, val4)
*/
val4 := rand.Intn(10000)
got4, err := echoIntViaHTTP(cl, val4)
require.Nil(t, err)
assert.Equal(t, got4, val4)
}
func echoViaWS(cl *client.WSClient, val string) (string, error) {


+ 4
- 3
rpc/test/client_test.go View File

@ -12,6 +12,7 @@ import (
"github.com/stretchr/testify/require"
abci "github.com/tendermint/abci/types"
"github.com/tendermint/go-wire/data"
"github.com/tendermint/tendermint/rpc/core"
ctypes "github.com/tendermint/tendermint/rpc/core/types"
rpc "github.com/tendermint/tendermint/rpc/lib/client"
@ -116,7 +117,7 @@ func testABCIQuery(t *testing.T, client rpc.HTTPClient) {
time.Sleep(time.Millisecond * 500)
tmResult := new(ctypes.TMResult)
_, err := client.Call("abci_query",
map[string]interface{}{"path": "", "data": k, "prove": false}, tmResult)
map[string]interface{}{"path": "", "data": data.Bytes(k), "prove": false}, tmResult)
require.Nil(t, err)
resQuery := tmResult.Unwrap().(*ctypes.ResultABCIQuery)
@ -286,7 +287,7 @@ func TestWSBlockchainGrowth(t *testing.T) {
var initBlockN int
for i := 0; i < 3; i++ {
waitForEvent(t, wsc, eid, true, func() {}, func(eid string, eventData interface{}) error {
block := eventData.(types.EventDataNewBlock).Block
block := eventData.(types.TMEventData).Unwrap().(types.EventDataNewBlock).Block
if i == 0 {
initBlockN = block.Header.Height
} else {
@ -320,7 +321,7 @@ func TestWSTxEvent(t *testing.T) {
require.Nil(err)
waitForEvent(t, wsc, eid, true, func() {}, func(eid string, b interface{}) error {
evt, ok := b.(types.EventDataTx)
evt, ok := b.(types.TMEventData).Unwrap().(types.EventDataTx)
require.True(ok, "Got wrong event type: %#v", b)
require.Equal(tx, []byte(evt.Tx), "Returned different tx")
require.Equal(abci.CodeType_OK, evt.Code)


+ 2
- 0
rpc/test/helpers.go View File

@ -131,9 +131,11 @@ func waitForEvent(t *testing.T, wsc *client.WSClient, eventid string, dieOnTimeo
for {
select {
case r := <-wsc.ResultsCh:
fmt.Println("GOT IT", string(r))
result := new(ctypes.TMResult)
err = json.Unmarshal(r, result)
if err != nil {
fmt.Println("POOP", err)
errCh <- err
break LOOP
}


+ 58
- 25
types/events.go View File

@ -3,7 +3,6 @@ package types
import (
// for registering TMEventData as events.EventData
abci "github.com/tendermint/abci/types"
"github.com/tendermint/go-wire"
"github.com/tendermint/go-wire/data"
cmn "github.com/tendermint/tmlibs/common"
"github.com/tendermint/tmlibs/events"
@ -34,10 +33,47 @@ func EventStringVote() string { return "Vote" }
//----------------------------------------
var (
EventDataNameNewBlock = "new_block"
EventDataNameNewBlockHeader = "new_block_header"
EventDataNameTx = "tx"
EventDataNameRoundState = "round_state"
EventDataNameVote = "vote"
)
//----------------------------------------
// implements events.EventData
type TMEventData interface {
type TMEventDataInner interface {
events.EventData
AssertIsTMEventData()
}
type TMEventData struct {
TMEventDataInner `json:"unwrap"`
}
func (tmr TMEventData) MarshalJSON() ([]byte, error) {
return tmEventDataMapper.ToJSON(tmr.TMEventDataInner)
}
func (tmr *TMEventData) UnmarshalJSON(data []byte) (err error) {
parsed, err := tmEventDataMapper.FromJSON(data)
if err == nil && parsed != nil {
tmr.TMEventDataInner = parsed.(TMEventDataInner)
}
return
}
func (tmr TMEventData) Unwrap() TMEventDataInner {
tmrI := tmr.TMEventDataInner
for wrap, ok := tmrI.(TMEventData); ok; wrap, ok = tmrI.(TMEventData) {
tmrI = wrap.TMEventDataInner
}
return tmrI
}
func (tmr TMEventData) Empty() bool {
return tmr.TMEventDataInner == nil
}
const (
@ -50,15 +86,12 @@ const (
EventDataTypeVote = byte(0x12)
)
var _ = wire.RegisterInterface(
struct{ TMEventData }{},
wire.ConcreteType{EventDataNewBlock{}, EventDataTypeNewBlock},
wire.ConcreteType{EventDataNewBlockHeader{}, EventDataTypeNewBlockHeader},
// wire.ConcreteType{EventDataFork{}, EventDataTypeFork },
wire.ConcreteType{EventDataTx{}, EventDataTypeTx},
wire.ConcreteType{EventDataRoundState{}, EventDataTypeRoundState},
wire.ConcreteType{EventDataVote{}, EventDataTypeVote},
)
var tmEventDataMapper = data.NewMapper(TMEventData{}).
RegisterImplementation(EventDataNewBlock{}, EventDataNameNewBlock, EventDataTypeNewBlock).
RegisterImplementation(EventDataNewBlockHeader{}, EventDataNameNewBlockHeader, EventDataTypeNewBlockHeader).
RegisterImplementation(EventDataTx{}, EventDataNameTx, EventDataTypeTx).
RegisterImplementation(EventDataRoundState{}, EventDataNameRoundState, EventDataTypeRoundState).
RegisterImplementation(EventDataVote{}, EventDataNameVote, EventDataTypeVote)
// Most event messages are basic types (a block, a transaction)
// but some (an input to a call tx or a receive) are more exotic
@ -147,55 +180,55 @@ func AddListenerForEvent(evsw EventSwitch, id, event string, cb func(data TMEven
//--- block, tx, and vote events
func FireEventNewBlock(fireable events.Fireable, block EventDataNewBlock) {
fireEvent(fireable, EventStringNewBlock(), block)
fireEvent(fireable, EventStringNewBlock(), TMEventData{block})
}
func FireEventNewBlockHeader(fireable events.Fireable, header EventDataNewBlockHeader) {
fireEvent(fireable, EventStringNewBlockHeader(), header)
fireEvent(fireable, EventStringNewBlockHeader(), TMEventData{header})
}
func FireEventVote(fireable events.Fireable, vote EventDataVote) {
fireEvent(fireable, EventStringVote(), vote)
fireEvent(fireable, EventStringVote(), TMEventData{vote})
}
func FireEventTx(fireable events.Fireable, tx EventDataTx) {
fireEvent(fireable, EventStringTx(tx.Tx), tx)
fireEvent(fireable, EventStringTx(tx.Tx), TMEventData{tx})
}
//--- EventDataRoundState events
func FireEventNewRoundStep(fireable events.Fireable, rs EventDataRoundState) {
fireEvent(fireable, EventStringNewRoundStep(), rs)
fireEvent(fireable, EventStringNewRoundStep(), TMEventData{rs})
}
func FireEventTimeoutPropose(fireable events.Fireable, rs EventDataRoundState) {
fireEvent(fireable, EventStringTimeoutPropose(), rs)
fireEvent(fireable, EventStringTimeoutPropose(), TMEventData{rs})
}
func FireEventTimeoutWait(fireable events.Fireable, rs EventDataRoundState) {
fireEvent(fireable, EventStringTimeoutWait(), rs)
fireEvent(fireable, EventStringTimeoutWait(), TMEventData{rs})
}
func FireEventNewRound(fireable events.Fireable, rs EventDataRoundState) {
fireEvent(fireable, EventStringNewRound(), rs)
fireEvent(fireable, EventStringNewRound(), TMEventData{rs})
}
func FireEventCompleteProposal(fireable events.Fireable, rs EventDataRoundState) {
fireEvent(fireable, EventStringCompleteProposal(), rs)
fireEvent(fireable, EventStringCompleteProposal(), TMEventData{rs})
}
func FireEventPolka(fireable events.Fireable, rs EventDataRoundState) {
fireEvent(fireable, EventStringPolka(), rs)
fireEvent(fireable, EventStringPolka(), TMEventData{rs})
}
func FireEventUnlock(fireable events.Fireable, rs EventDataRoundState) {
fireEvent(fireable, EventStringUnlock(), rs)
fireEvent(fireable, EventStringUnlock(), TMEventData{rs})
}
func FireEventRelock(fireable events.Fireable, rs EventDataRoundState) {
fireEvent(fireable, EventStringRelock(), rs)
fireEvent(fireable, EventStringRelock(), TMEventData{rs})
}
func FireEventLock(fireable events.Fireable, rs EventDataRoundState) {
fireEvent(fireable, EventStringLock(), rs)
fireEvent(fireable, EventStringLock(), TMEventData{rs})
}

Loading…
Cancel
Save