Browse Source

Merge

pull/1347/head
Jae Kwon 6 years ago
parent
commit
e4492afbad
24 changed files with 97 additions and 120 deletions
  1. +2
    -5
      Gopkg.lock
  2. +10
    -5
      benchmarks/codec_test.go
  3. +1
    -1
      blockchain/store.go
  4. +3
    -5
      cmd/tendermint/commands/show_validator.go
  5. +12
    -0
      cmd/tendermint/commands/wire.go
  6. +1
    -1
      consensus/common_test.go
  7. +1
    -1
      consensus/mempool_test.go
  8. +3
    -3
      consensus/reactor.go
  9. +3
    -3
      consensus/reactor_test.go
  10. +1
    -1
      consensus/state.go
  11. +14
    -14
      consensus/state_test.go
  12. +1
    -0
      consensus/wal_generator.go
  13. +1
    -1
      consensus/wal_test.go
  14. +1
    -1
      lite/proxy/wrapper.go
  15. +7
    -7
      node/node.go
  16. +4
    -4
      rpc/client/event_test.go
  17. +2
    -2
      rpc/client/helpers.go
  18. +1
    -1
      rpc/core/events.go
  19. +1
    -1
      rpc/core/mempool.go
  20. +2
    -2
      rpc/test/helpers.go
  21. +1
    -1
      state/txindex/indexer_service.go
  22. +14
    -14
      types/event_bus.go
  23. +1
    -1
      types/event_bus_test.go
  24. +10
    -46
      types/events.go

+ 2
- 5
Gopkg.lock View File

@ -265,10 +265,7 @@
[[projects]]
name = "github.com/tendermint/go-wire"
packages = [
".",
"data"
]
packages = ["."]
revision = "fa721242b042ecd4c6ed1a934ee740db4f74e45c"
version = "v0.7.3"
@ -386,6 +383,6 @@
[solve-meta]
analyzer-name = "dep"
analyzer-version = 1
inputs-digest = "8cd32c0a5faec4d8cbca3d9ea5d97b0e4e90bfc2a9f0e6b71a4846cb6030be94"
inputs-digest = "0dacd2eb1550ca01e0c64f77b721eda1a381dde1d246a56bfe5a2746b78b7bad"
solver-name = "gps-cdcl"
solver-version = 1

+ 10
- 5
benchmarks/codec_test.go View File

@ -5,7 +5,6 @@ import (
"time"
"github.com/tendermint/go-crypto"
"github.com/tendermint/go-wire"
proto "github.com/tendermint/tendermint/benchmarks/proto"
"github.com/tendermint/tendermint/p2p"
@ -33,7 +32,10 @@ func BenchmarkEncodeStatusWire(b *testing.B) {
counter := 0
for i := 0; i < b.N; i++ {
jsonBytes := wire.JSONBytes(status)
jsonBytes, err := cdc.MarshalJSON(status)
if err != nil {
panic(err)
}
counter += len(jsonBytes)
}
@ -54,7 +56,10 @@ func BenchmarkEncodeNodeInfoWire(b *testing.B) {
counter := 0
for i := 0; i < b.N; i++ {
jsonBytes := wire.JSONBytes(nodeInfo)
jsonBytes, err := cdc.MarshalJSON(nodeInfo)
if err != nil {
panic(err)
}
counter += len(jsonBytes)
}
}
@ -74,7 +79,7 @@ func BenchmarkEncodeNodeInfoBinary(b *testing.B) {
counter := 0
for i := 0; i < b.N; i++ {
jsonBytes := wire.BinaryBytes(nodeInfo)
jsonBytes := cdc.MustMarshalBinaryBare(nodeInfo)
counter += len(jsonBytes)
}
@ -82,7 +87,7 @@ func BenchmarkEncodeNodeInfoBinary(b *testing.B) {
func BenchmarkEncodeNodeInfoProto(b *testing.B) {
b.StopTimer()
pubKey := crypto.GenPrivKeyEd25519().PubKey().Unwrap().(crypto.PubKeyEd25519)
pubKey := crypto.GenPrivKeyEd25519().PubKey().(crypto.PubKeyEd25519)
pubKey2 := &proto.PubKey{Ed25519: &proto.PubKeyEd25519{Bytes: pubKey[:]}}
nodeInfo := proto.NodeInfo{
PubKey: pubKey2,


+ 1
- 1
blockchain/store.go View File

@ -69,7 +69,7 @@ func (bs *BlockStore) LoadBlock(height int64) *types.Block {
part := bs.LoadBlockPart(height, i)
buf = append(buf, part.Bytes...)
}
err = cdc.UnmarshalBinaryBare(buf, block)
err = cdc.UnmarshalBinary(buf, block)
if err != nil {
// NOTE: The existence of meta should imply the existence of the
// block. So, make sure meta is only saved after blocks are saved.


+ 3
- 5
cmd/tendermint/commands/show_validator.go View File

@ -2,11 +2,9 @@ package commands
import (
"fmt"
"github.com/spf13/cobra"
"github.com/tendermint/go-wire/data"
"github.com/tendermint/tendermint/types"
privval "github.com/tendermint/tendermint/types/priv_validator"
)
// ShowValidatorCmd adds capabilities for showing the validator info.
@ -17,7 +15,7 @@ var ShowValidatorCmd = &cobra.Command{
}
func showValidator(cmd *cobra.Command, args []string) {
privValidator := types.LoadOrGenPrivValidatorFS(config.PrivValidatorFile())
pubKeyJSONBytes, _ := data.ToJSON(privValidator.PubKey)
privValidator := privval.LoadOrGenFilePV(config.PrivValidatorFile())
pubKeyJSONBytes, _ := cdc.MarshalJSON(privValidator.GetPubKey())
fmt.Println(string(pubKeyJSONBytes))
}

+ 12
- 0
cmd/tendermint/commands/wire.go View File

@ -0,0 +1,12 @@
package commands
import (
"github.com/tendermint/go-amino"
"github.com/tendermint/go-crypto"
)
var cdc = amino.NewCodec()
func init() {
crypto.RegisterAmino(cdc)
}

+ 1
- 1
consensus/common_test.go View File

@ -223,7 +223,7 @@ func subscribeToVoter(cs *ConsensusState, addr []byte) chan interface{} {
voteCh := make(chan interface{})
go func() {
for v := range voteCh0 {
vote := v.(types.TMEventData).Unwrap().(types.EventDataVote)
vote := v.(types.EventDataVote)
// we only fire for our own votes
if bytes.Equal(addr, vote.Vote.ValidatorAddress) {
voteCh <- v


+ 1
- 1
consensus/mempool_test.go View File

@ -108,7 +108,7 @@ func TestMempoolTxConcurrentWithCommit(t *testing.T) {
ticker := time.NewTicker(time.Second * 30)
select {
case b := <-newBlockCh:
evt := b.(types.TMEventData).Unwrap().(types.EventDataNewBlock)
evt := b.(types.EventDataNewBlock)
nTxs += int(evt.Block.Header.NumTxs)
case <-ticker.C:
panic("Timed out waiting to commit blocks with transactions")


+ 3
- 3
consensus/reactor.go View File

@ -372,17 +372,17 @@ func (conR *ConsensusReactor) startBroadcastRoutine() error {
select {
case data, ok := <-stepsCh:
if ok { // a receive from a closed channel returns the zero value immediately
edrs := data.(types.TMEventData).Unwrap().(types.EventDataRoundState)
edrs := data.(types.EventDataRoundState)
conR.broadcastNewRoundStep(edrs.RoundState.(*cstypes.RoundState))
}
case data, ok := <-votesCh:
if ok {
edv := data.(types.TMEventData).Unwrap().(types.EventDataVote)
edv := data.(types.EventDataVote)
conR.broadcastHasVoteMessage(edv.Vote)
}
case data, ok := <-heartbeatsCh:
if ok {
edph := data.(types.TMEventData).Unwrap().(types.EventDataProposalHeartbeat)
edph := data.(types.EventDataProposalHeartbeat)
conR.broadcastProposalHeartbeatMessage(edph)
}
case <-conR.Quit():


+ 3
- 3
consensus/reactor_test.go View File

@ -301,7 +301,7 @@ func waitForAndValidateBlock(t *testing.T, n int, activeVals map[string]struct{}
if !ok {
return
}
newBlock := newBlockI.(types.TMEventData).Unwrap().(types.EventDataNewBlock).Block
newBlock := newBlockI.(types.EventDataNewBlock).Block
css[j].Logger.Debug("waitForAndValidateBlock: Got block", "height", newBlock.Height)
err := validateBlock(newBlock, activeVals)
assert.Nil(t, err)
@ -322,7 +322,7 @@ func waitForAndValidateBlockWithTx(t *testing.T, n int, activeVals map[string]st
if !ok {
return
}
newBlock := newBlockI.(types.TMEventData).Unwrap().(types.EventDataNewBlock).Block
newBlock := newBlockI.(types.EventDataNewBlock).Block
css[j].Logger.Debug("waitForAndValidateBlockWithTx: Got block", "height", newBlock.Height)
err := validateBlock(newBlock, activeVals)
assert.Nil(t, err)
@ -354,7 +354,7 @@ func waitForBlockWithUpdatedValsAndValidateIt(t *testing.T, n int, updatedVals m
if !ok {
return
}
newBlock = newBlockI.(types.TMEventData).Unwrap().(types.EventDataNewBlock).Block
newBlock = newBlockI.(types.EventDataNewBlock).Block
if newBlock.LastCommit.Size() == len(updatedVals) {
css[j].Logger.Debug("waitForBlockWithUpdatedValsAndValidateIt: Got block", "height", newBlock.Height)
break LOOP


+ 1
- 1
consensus/state.go View File

@ -778,7 +778,7 @@ func (cs *ConsensusState) enterPropose(height int64, round int) {
// if not a validator, we're done
if !cs.Validators.HasAddress(cs.privValidator.GetAddress()) {
cs.Logger.Debug("This node is not a validator")
cs.Logger.Debug("This node is not a validator 2", cs.privValidator.GetAddress(), cs.Validators)
return
}
cs.Logger.Debug("This node is a validator")


+ 14
- 14
consensus/state_test.go View File

@ -261,7 +261,7 @@ func TestStateFullRound1(t *testing.T) {
// grab proposal
re := <-propCh
propBlockHash := re.(types.TMEventData).Unwrap().(types.EventDataRoundState).RoundState.(*cstypes.RoundState).ProposalBlock.Hash()
propBlockHash := re.(types.EventDataRoundState).RoundState.(*cstypes.RoundState).ProposalBlock.Hash()
<-voteCh // wait for prevote
validatePrevote(t, cs, round, vss[0], propBlockHash)
@ -356,7 +356,7 @@ func TestStateLockNoPOL(t *testing.T) {
cs1.startRoutines(0)
re := <-proposalCh
rs := re.(types.TMEventData).Unwrap().(types.EventDataRoundState).RoundState.(*cstypes.RoundState)
rs := re.(types.EventDataRoundState).RoundState.(*cstypes.RoundState)
theBlockHash := rs.ProposalBlock.Hash()
<-voteCh // prevote
@ -396,7 +396,7 @@ func TestStateLockNoPOL(t *testing.T) {
// now we're on a new round and not the proposer, so wait for timeout
re = <-timeoutProposeCh
rs = re.(types.TMEventData).Unwrap().(types.EventDataRoundState).RoundState.(*cstypes.RoundState)
rs = re.(types.EventDataRoundState).RoundState.(*cstypes.RoundState)
if rs.ProposalBlock != nil {
panic("Expected proposal block to be nil")
@ -440,7 +440,7 @@ func TestStateLockNoPOL(t *testing.T) {
incrementRound(vs2)
re = <-proposalCh
rs = re.(types.TMEventData).Unwrap().(types.EventDataRoundState).RoundState.(*cstypes.RoundState)
rs = re.(types.EventDataRoundState).RoundState.(*cstypes.RoundState)
// now we're on a new round and are the proposer
if !bytes.Equal(rs.ProposalBlock.Hash(), rs.LockedBlock.Hash()) {
@ -529,7 +529,7 @@ func TestStateLockPOLRelock(t *testing.T) {
<-newRoundCh
re := <-proposalCh
rs := re.(types.TMEventData).Unwrap().(types.EventDataRoundState).RoundState.(*cstypes.RoundState)
rs := re.(types.EventDataRoundState).RoundState.(*cstypes.RoundState)
theBlockHash := rs.ProposalBlock.Hash()
<-voteCh // prevote
@ -605,9 +605,9 @@ func TestStateLockPOLRelock(t *testing.T) {
discardFromChan(voteCh, 2)
be := <-newBlockCh
b := be.(types.TMEventData).Unwrap().(types.EventDataNewBlockHeader)
b := be.(types.EventDataNewBlockHeader)
re = <-newRoundCh
rs = re.(types.TMEventData).Unwrap().(types.EventDataRoundState).RoundState.(*cstypes.RoundState)
rs = re.(types.EventDataRoundState).RoundState.(*cstypes.RoundState)
if rs.Height != 2 {
panic("Expected height to increment")
}
@ -643,7 +643,7 @@ func TestStateLockPOLUnlock(t *testing.T) {
startTestRound(cs1, cs1.Height, 0)
<-newRoundCh
re := <-proposalCh
rs := re.(types.TMEventData).Unwrap().(types.EventDataRoundState).RoundState.(*cstypes.RoundState)
rs := re.(types.EventDataRoundState).RoundState.(*cstypes.RoundState)
theBlockHash := rs.ProposalBlock.Hash()
<-voteCh // prevote
@ -669,7 +669,7 @@ func TestStateLockPOLUnlock(t *testing.T) {
// timeout to new round
re = <-timeoutWaitCh
rs = re.(types.TMEventData).Unwrap().(types.EventDataRoundState).RoundState.(*cstypes.RoundState)
rs = re.(types.EventDataRoundState).RoundState.(*cstypes.RoundState)
lockedBlockHash := rs.LockedBlock.Hash()
//XXX: this isnt guaranteed to get there before the timeoutPropose ...
@ -731,7 +731,7 @@ func TestStateLockPOLSafety1(t *testing.T) {
startTestRound(cs1, cs1.Height, 0)
<-newRoundCh
re := <-proposalCh
rs := re.(types.TMEventData).Unwrap().(types.EventDataRoundState).RoundState.(*cstypes.RoundState)
rs := re.(types.EventDataRoundState).RoundState.(*cstypes.RoundState)
propBlock := rs.ProposalBlock
<-voteCh // prevote
@ -781,7 +781,7 @@ func TestStateLockPOLSafety1(t *testing.T) {
re = <-proposalCh
}
rs = re.(types.TMEventData).Unwrap().(types.EventDataRoundState).RoundState.(*cstypes.RoundState)
rs = re.(types.EventDataRoundState).RoundState.(*cstypes.RoundState)
if rs.LockedBlock != nil {
panic("we should not be locked!")
@ -1033,7 +1033,7 @@ func TestStateHalt1(t *testing.T) {
startTestRound(cs1, cs1.Height, 0)
<-newRoundCh
re := <-proposalCh
rs := re.(types.TMEventData).Unwrap().(types.EventDataRoundState).RoundState.(*cstypes.RoundState)
rs := re.(types.EventDataRoundState).RoundState.(*cstypes.RoundState)
propBlock := rs.ProposalBlock
propBlockParts := propBlock.MakePartSet(partSize)
@ -1056,7 +1056,7 @@ func TestStateHalt1(t *testing.T) {
// timeout to new round
<-timeoutWaitCh
re = <-newRoundCh
rs = re.(types.TMEventData).Unwrap().(types.EventDataRoundState).RoundState.(*cstypes.RoundState)
rs = re.(types.EventDataRoundState).RoundState.(*cstypes.RoundState)
t.Log("### ONTO ROUND 1")
/*Round2
@ -1074,7 +1074,7 @@ func TestStateHalt1(t *testing.T) {
// receiving that precommit should take us straight to commit
<-newBlockCh
re = <-newRoundCh
rs = re.(types.TMEventData).Unwrap().(types.EventDataRoundState).RoundState.(*cstypes.RoundState)
rs = re.(types.EventDataRoundState).RoundState.(*cstypes.RoundState)
if rs.Height != 2 {
panic("expected height to increment")


+ 1
- 0
consensus/wal_generator.go View File

@ -72,6 +72,7 @@ func WALWithNBlocks(numBlocks int) (data []byte, err error) {
consensusState := NewConsensusState(config.Consensus, state.Copy(), blockExec, blockStore, mempool, evpool)
consensusState.SetLogger(logger)
consensusState.SetEventBus(eventBus)
fmt.Println(">>privval", privValidator)
if privValidator != nil {
consensusState.SetPrivValidator(privValidator)
}


+ 1
- 1
consensus/wal_test.go View File

@ -35,7 +35,7 @@ func TestWALEncoderDecoder(t *testing.T) {
decoded, err := dec.Decode()
require.NoError(t, err)
assert.True(t, msg.Time.Equal(decoded.Time))
assert.Equal(t, msg.Time.UTC(), decoded.Time)
assert.Equal(t, msg.Msg, decoded.Msg)
}
}


+ 1
- 1
lite/proxy/wrapper.go View File

@ -146,7 +146,7 @@ func (w Wrapper) Commit(height *int64) (*ctypes.ResultCommit, error) {
// }
// // check to validate it if possible, and drop if not valid
// switch t := tm.Unwrap().(type) {
// switch t := tm.(type) {
// case types.EventDataNewBlockHeader:
// err := verifyHeader(s.client, t.Header)
// if err != nil {


+ 7
- 7
node/node.go View File

@ -10,8 +10,8 @@ import (
"strings"
abci "github.com/tendermint/abci/types"
amino "github.com/tendermint/go-amino"
crypto "github.com/tendermint/go-crypto"
wire "github.com/tendermint/go-wire"
cmn "github.com/tendermint/tmlibs/common"
dbm "github.com/tendermint/tmlibs/db"
"github.com/tendermint/tmlibs/log"
@ -34,7 +34,7 @@ import (
"github.com/tendermint/tendermint/state/txindex/kv"
"github.com/tendermint/tendermint/state/txindex/null"
"github.com/tendermint/tendermint/types"
priv_val "github.com/tendermint/tendermint/types/priv_validator"
privval "github.com/tendermint/tendermint/types/priv_validator"
"github.com/tendermint/tendermint/version"
_ "net/http/pprof"
@ -79,7 +79,7 @@ type NodeProvider func(*cfg.Config, log.Logger) (*Node, error)
// It implements NodeProvider.
func DefaultNewNode(config *cfg.Config, logger log.Logger) (*Node, error) {
return NewNode(config,
types.LoadOrGenPrivValidatorFS(config.PrivValidatorFile()),
privval.LoadOrGenFilePV(config.PrivValidatorFile()),
proxy.DefaultClientCreator(config.ProxyApp, config.ABCI, config.DBDir()),
DefaultGenesisDocProviderFunc(config),
DefaultDBProvider,
@ -180,8 +180,8 @@ func NewNode(config *cfg.Config,
// TODO: persist this key so external signer
// can actually authenticate us
privKey = crypto.GenPrivKeyEd25519()
pvsc = priv_val.NewSocketClient(
logger.With("module", "priv_val"),
pvsc = privval.NewSocketPV(
logger.With("module", "privval"),
config.PrivValidatorListenAddr,
privKey,
)
@ -445,7 +445,7 @@ func (n *Node) OnStop() {
n.eventBus.Stop()
n.indexerService.Stop()
if pvsc, ok := n.privValidator.(*priv_val.SocketClient); ok {
if pvsc, ok := n.privValidator.(*privval.SocketPV); ok {
if err := pvsc.Stop(); err != nil {
n.Logger.Error("Error stopping priv validator socket client", "err", err)
}
@ -591,7 +591,7 @@ func (n *Node) makeNodeInfo(pubKey crypto.PubKey) p2p.NodeInfo {
},
Moniker: n.config.Moniker,
Other: []string{
cmn.Fmt("wire_version=%v", wire.Version),
cmn.Fmt("amino_version=%v", amino.Version),
cmn.Fmt("p2p_version=%v", p2p.Version),
cmn.Fmt("consensus_version=%v", cs.Version),
cmn.Fmt("rpc_version=%v/%v", rpc.Version, rpccore.Version),


+ 4
- 4
rpc/client/event_test.go View File

@ -36,7 +36,7 @@ func TestHeaderEvents(t *testing.T) {
evtTyp := types.EventNewBlockHeader
evt, err := client.WaitForOneEvent(c, evtTyp, waitForEventTimeout)
require.Nil(err, "%d: %+v", i, err)
_, ok := evt.Unwrap().(types.EventDataNewBlockHeader)
_, ok := evt.(types.EventDataNewBlockHeader)
require.True(ok, "%d: %#v", i, evt)
// TODO: more checks...
}
@ -59,7 +59,7 @@ func TestBlockEvents(t *testing.T) {
evtTyp := types.EventNewBlock
evt, err := client.WaitForOneEvent(c, evtTyp, waitForEventTimeout)
require.Nil(err, "%d: %+v", j, err)
blockEvent, ok := evt.Unwrap().(types.EventDataNewBlock)
blockEvent, ok := evt.(types.EventDataNewBlock)
require.True(ok, "%d: %#v", j, evt)
block := blockEvent.Block
@ -97,7 +97,7 @@ func TestTxEventsSentWithBroadcastTxAsync(t *testing.T) {
evt, err := client.WaitForOneEvent(c, evtTyp, waitForEventTimeout)
require.Nil(err, "%d: %+v", i, err)
// and make sure it has the proper info
txe, ok := evt.Unwrap().(types.EventDataTx)
txe, ok := evt.(types.EventDataTx)
require.True(ok, "%d: %#v", i, evt)
// make sure this is the proper tx
require.EqualValues(tx, txe.Tx)
@ -129,7 +129,7 @@ func TestTxEventsSentWithBroadcastTxSync(t *testing.T) {
evt, err := client.WaitForOneEvent(c, evtTyp, waitForEventTimeout)
require.Nil(err, "%d: %+v", i, err)
// and make sure it has the proper info
txe, ok := evt.Unwrap().(types.EventDataTx)
txe, ok := evt.(types.EventDataTx)
require.True(ok, "%d: %#v", i, evt)
// make sure this is the proper tx
require.EqualValues(tx, txe.Tx)


+ 2
- 2
rpc/client/helpers.go View File

@ -65,7 +65,7 @@ func WaitForOneEvent(c EventsClient, evtTyp string, timeout time.Duration) (type
query := types.QueryForEvent(evtTyp)
err := c.Subscribe(ctx, subscriber, query, evts)
if err != nil {
return types.TMEventData{}, errors.Wrap(err, "failed to subscribe")
return nil, errors.Wrap(err, "failed to subscribe")
}
// make sure to unregister after the test is over
@ -75,6 +75,6 @@ func WaitForOneEvent(c EventsClient, evtTyp string, timeout time.Duration) (type
case evt := <-evts:
return evt.(types.TMEventData), nil
case <-ctx.Done():
return types.TMEventData{}, errors.New("timed out waiting for event")
return nil, errors.New("timed out waiting for event")
}
}

+ 1
- 1
rpc/core/events.go View File

@ -61,7 +61,7 @@ import (
//
// go func() {
// for e := range txs {
// fmt.Println("got ", e.(types.TMEventData).Unwrap().(types.EventDataTx))
// fmt.Println("got ", e.(types.EventDataTx))
// }
// }()
// ```


+ 1
- 1
rpc/core/mempool.go View File

@ -189,7 +189,7 @@ func BroadcastTxCommit(tx types.Tx) (*ctypes.ResultBroadcastTxCommit, error) {
timer := time.NewTimer(60 * 2 * time.Second)
select {
case deliverTxResMsg := <-deliverTxResCh:
deliverTxRes := deliverTxResMsg.(types.TMEventData).Unwrap().(types.EventDataTx)
deliverTxRes := deliverTxResMsg.(types.EventDataTx)
// The tx was included in a block.
deliverTxR := deliverTxRes.Result
logger.Info("DeliverTx passed ", "tx", cmn.HexBytes(tx), "response", deliverTxR)


+ 2
- 2
rpc/test/helpers.go View File

@ -18,7 +18,7 @@ import (
ctypes "github.com/tendermint/tendermint/rpc/core/types"
core_grpc "github.com/tendermint/tendermint/rpc/grpc"
rpcclient "github.com/tendermint/tendermint/rpc/lib/client"
"github.com/tendermint/tendermint/types"
privval "github.com/tendermint/tendermint/types/priv_validator"
)
var globalConfig *cfg.Config
@ -113,7 +113,7 @@ func NewTendermint(app abci.Application) *nm.Node {
logger := log.NewTMLogger(log.NewSyncWriter(os.Stdout))
logger = log.NewFilter(logger, log.AllowError())
privValidatorFile := config.PrivValidatorFile()
privValidator := types.LoadOrGenPrivValidatorFS(privValidatorFile)
privValidator := privval.LoadOrGenFilePV(privValidatorFile)
papp := proxy.NewLocalClientCreator(app)
node, err := nm.NewNode(config, privValidator, papp,
nm.DefaultGenesisDocProviderFunc(config),


+ 1
- 1
state/txindex/indexer_service.go View File

@ -34,7 +34,7 @@ func (is *IndexerService) OnStart() error {
go func() {
for event := range ch {
// TODO: may be not perfomant to write one event at a time
txResult := event.(types.TMEventData).Unwrap().(types.EventDataTx).TxResult
txResult := event.(types.EventDataTx).TxResult
is.idr.Index(&txResult)
}
}()


+ 14
- 14
types/event_bus.go View File

@ -74,15 +74,15 @@ func (b *EventBus) Publish(eventType string, eventData TMEventData) error {
//--- block, tx, and vote events
func (b *EventBus) PublishEventNewBlock(event EventDataNewBlock) error {
return b.Publish(EventNewBlock, TMEventData{event})
return b.Publish(EventNewBlock, event)
}
func (b *EventBus) PublishEventNewBlockHeader(event EventDataNewBlockHeader) error {
return b.Publish(EventNewBlockHeader, TMEventData{event})
return b.Publish(EventNewBlockHeader, event)
}
func (b *EventBus) PublishEventVote(event EventDataVote) error {
return b.Publish(EventVote, TMEventData{event})
return b.Publish(EventVote, event)
}
// PublishEventTx publishes tx event with tags from Result. Note it will add
@ -114,50 +114,50 @@ func (b *EventBus) PublishEventTx(event EventDataTx) error {
logIfTagExists(TxHeightKey, tags, b.Logger)
tags[TxHeightKey] = event.Height
b.pubsub.PublishWithTags(ctx, TMEventData{event}, tags)
b.pubsub.PublishWithTags(ctx, event, tags)
return nil
}
func (b *EventBus) PublishEventProposalHeartbeat(event EventDataProposalHeartbeat) error {
return b.Publish(EventProposalHeartbeat, TMEventData{event})
return b.Publish(EventProposalHeartbeat, event)
}
//--- EventDataRoundState events
func (b *EventBus) PublishEventNewRoundStep(event EventDataRoundState) error {
return b.Publish(EventNewRoundStep, TMEventData{event})
return b.Publish(EventNewRoundStep, event)
}
func (b *EventBus) PublishEventTimeoutPropose(event EventDataRoundState) error {
return b.Publish(EventTimeoutPropose, TMEventData{event})
return b.Publish(EventTimeoutPropose, event)
}
func (b *EventBus) PublishEventTimeoutWait(event EventDataRoundState) error {
return b.Publish(EventTimeoutWait, TMEventData{event})
return b.Publish(EventTimeoutWait, event)
}
func (b *EventBus) PublishEventNewRound(event EventDataRoundState) error {
return b.Publish(EventNewRound, TMEventData{event})
return b.Publish(EventNewRound, event)
}
func (b *EventBus) PublishEventCompleteProposal(event EventDataRoundState) error {
return b.Publish(EventCompleteProposal, TMEventData{event})
return b.Publish(EventCompleteProposal, event)
}
func (b *EventBus) PublishEventPolka(event EventDataRoundState) error {
return b.Publish(EventPolka, TMEventData{event})
return b.Publish(EventPolka, event)
}
func (b *EventBus) PublishEventUnlock(event EventDataRoundState) error {
return b.Publish(EventUnlock, TMEventData{event})
return b.Publish(EventUnlock, event)
}
func (b *EventBus) PublishEventRelock(event EventDataRoundState) error {
return b.Publish(EventRelock, TMEventData{event})
return b.Publish(EventRelock, event)
}
func (b *EventBus) PublishEventLock(event EventDataRoundState) error {
return b.Publish(EventLock, TMEventData{event})
return b.Publish(EventLock, event)
}
func logIfTagExists(tag string, tags map[string]interface{}, logger log.Logger) {


+ 1
- 1
types/event_bus_test.go View File

@ -73,7 +73,7 @@ func benchmarkEventBus(numClients int, randQueries bool, randEvents bool, b *tes
eventType = randEvent()
}
eventBus.Publish(eventType, TMEventData{"Gamora"})
eventBus.Publish(eventType, "Gamora")
}
}


+ 10
- 46
types/events.go View File

@ -3,7 +3,7 @@ package types
import (
"fmt"
"github.com/tendermint/go-wire/data"
"github.com/tendermint/go-amino"
tmpubsub "github.com/tendermint/tmlibs/pubsub"
tmquery "github.com/tendermint/tmlibs/pubsub/query"
)
@ -45,56 +45,20 @@ var (
)
// implements events.EventData
type TMEventDataInner interface {
type TMEventData interface {
// empty interface
}
type TMEventData struct {
TMEventDataInner `json:"unwrap"`
func RegisterEventDatas(cdc *amino.Codec) {
cdc.RegisterInterface((*TMEventData)(nil), nil)
cdc.RegisterConcrete(EventDataNewBlock{}, "tendermint/EventDataNameNewBlock", nil)
cdc.RegisterConcrete(EventDataNewBlockHeader{}, "tendermint/EventDataNameNewBlockHeader", nil)
cdc.RegisterConcrete(EventDataTx{}, "tendermint/EventDataNameTx", nil)
cdc.RegisterConcrete(EventDataRoundState{}, "tendermint/EventDataNameRoundState", nil)
cdc.RegisterConcrete(EventDataVote{}, "tendermint/EventDataNameVote", nil)
cdc.RegisterConcrete(EventDataProposalHeartbeat{}, "tendermint/EventDataNameProposalHeartbeat", nil)
}
func (tmr TMEventData) MarshalJSON() ([]byte, error) {
return tmEventDataMapper.ToJSON(tmr.TMEventDataInner)
}
func (tmr *TMEventData) UnmarshalJSON(data []byte) (err error) {
parsed, err := tmEventDataMapper.FromJSON(data)
if err == nil && parsed != nil {
tmr.TMEventDataInner = parsed.(TMEventDataInner)
}
return
}
func (tmr TMEventData) Unwrap() TMEventDataInner {
tmrI := tmr.TMEventDataInner
for wrap, ok := tmrI.(TMEventData); ok; wrap, ok = tmrI.(TMEventData) {
tmrI = wrap.TMEventDataInner
}
return tmrI
}
func (tmr TMEventData) Empty() bool {
return tmr.TMEventDataInner == nil
}
const (
EventDataTypeNewBlock = byte(0x01)
EventDataTypeFork = byte(0x02)
EventDataTypeTx = byte(0x03)
EventDataTypeNewBlockHeader = byte(0x04)
EventDataTypeRoundState = byte(0x11)
EventDataTypeVote = byte(0x12)
EventDataTypeProposalHeartbeat = byte(0x20)
)
var tmEventDataMapper = data.NewMapper(TMEventData{}).
RegisterImplementation(EventDataNewBlock{}, EventDataNameNewBlock, EventDataTypeNewBlock).
RegisterImplementation(EventDataNewBlockHeader{}, EventDataNameNewBlockHeader, EventDataTypeNewBlockHeader).
RegisterImplementation(EventDataTx{}, EventDataNameTx, EventDataTypeTx).
RegisterImplementation(EventDataRoundState{}, EventDataNameRoundState, EventDataTypeRoundState).
RegisterImplementation(EventDataVote{}, EventDataNameVote, EventDataTypeVote).
RegisterImplementation(EventDataProposalHeartbeat{}, EventDataNameProposalHeartbeat, EventDataTypeProposalHeartbeat)
// Most event messages are basic types (a block, a transaction)
// but some (an input to a call tx or a receive) are more exotic


Loading…
Cancel
Save