diff --git a/CHANGELOG.md b/CHANGELOG.md index e8cb63ba5..c506a2294 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,5 +1,48 @@ # Changelog +## v0.26.4 + +*November 27th, 2018* + +Special thanks to external contributors on this release: +ackratos, goolAdapter, james-ray, joe-bowman, kostko, +nagarajmanjunath, tomtau + + +Friendly reminder, we have a [bug bounty +program](https://hackerone.com/tendermint). + +### FEATURES: + +- [rpc] [\#2747](https://github.com/tendermint/tendermint/issues/2747) Enable subscription to tags emitted from `BeginBlock`/`EndBlock` (@kostko) +- [types] [\#2747](https://github.com/tendermint/tendermint/issues/2747) Add `ResultBeginBlock` and `ResultEndBlock` fields to `EventDataNewBlock` + and `EventDataNewBlockHeader` to support subscriptions (@kostko) +- [types] [\#2918](https://github.com/tendermint/tendermint/issues/2918) Add Marshal, MarshalTo, Unmarshal methods to various structs + to support Protobuf compatibility (@nagarajmanjunath) + +### IMPROVEMENTS: + +- [config] [\#2877](https://github.com/tendermint/tendermint/issues/2877) Add `blocktime_iota` to the config.toml (@ackratos) + - NOTE: this should be a ConsensusParam, not part of the config, and will be + removed from the config at a later date + ([\#2920](https://github.com/tendermint/tendermint/issues/2920). +- [mempool] [\#2882](https://github.com/tendermint/tendermint/issues/2882) Add txs from Update to cache +- [mempool] [\#2891](https://github.com/tendermint/tendermint/issues/2891) Remove local int64 counter from being stored in every tx +- [node] [\#2866](https://github.com/tendermint/tendermint/issues/2866) Add ability to instantiate IPCVal (@joe-bowman) + +### BUG FIXES: + +- [blockchain] [\#2731](https://github.com/tendermint/tendermint/issues/2731) Retry both blocks if either is bad to avoid getting stuck during fast sync (@goolAdapter) +- [consensus] [\#2893](https://github.com/tendermint/tendermint/issues/2893) Use genDoc.Validators instead of state.NextValidators on replay when appHeight==0 (@james-ray) +- [log] [\#2868](https://github.com/tendermint/tendermint/issues/2868) Fix `module=main` setting overriding all others + - NOTE: this changes the default logging behaviour to be much less verbose. + Set `log_level="info"` to restore the previous behaviour. +- [rpc] [\#2808](https://github.com/tendermint/tendermint/issues/2808) Fix `accum` field in `/validators` by calling `IncrementAccum` if necessary +- [rpc] [\#2811](https://github.com/tendermint/tendermint/issues/2811) Allow integer IDs in JSON-RPC requests (@tomtau) +- [txindex/kv] [\#2759](https://github.com/tendermint/tendermint/issues/2759) Fix tx.height range queries +- [txindex/kv] [\#2775](https://github.com/tendermint/tendermint/issues/2775) Order tx results by index if height is the same +- [txindex/kv] [\#2908](https://github.com/tendermint/tendermint/issues/2908) Don't return false positives when searching for a prefix of a tag value + ## v0.26.3 *November 17th, 2018* diff --git a/CHANGELOG_PENDING.md b/CHANGELOG_PENDING.md index fd340d4da..2a2626a4f 100644 --- a/CHANGELOG_PENDING.md +++ b/CHANGELOG_PENDING.md @@ -1,6 +1,6 @@ # Pending -## v0.26.4 +## v0.27.0 *TBD* diff --git a/blockchain/pool.go b/blockchain/pool.go index c7864a646..e6be36012 100644 --- a/blockchain/pool.go +++ b/blockchain/pool.go @@ -168,9 +168,12 @@ func (pool *BlockPool) IsCaughtUp() bool { return false } - // some conditions to determine if we're caught up - receivedBlockOrTimedOut := (pool.height > 0 || time.Since(pool.startTime) > 5*time.Second) - ourChainIsLongestAmongPeers := pool.maxPeerHeight == 0 || pool.height >= pool.maxPeerHeight + // Some conditions to determine if we're caught up. + // Ensures we've either received a block or waited some amount of time, + // and that we're synced to the highest known height. Note we use maxPeerHeight - 1 + // because to sync block H requires block H+1 to verify the LastCommit. + receivedBlockOrTimedOut := pool.height > 0 || time.Since(pool.startTime) > 5*time.Second + ourChainIsLongestAmongPeers := pool.maxPeerHeight == 0 || pool.height >= (pool.maxPeerHeight-1) isCaughtUp := receivedBlockOrTimedOut && ourChainIsLongestAmongPeers return isCaughtUp } @@ -252,7 +255,8 @@ func (pool *BlockPool) AddBlock(peerID p2p.ID, block *types.Block, blockSize int peer.decrPending(blockSize) } } else { - // Bad peer? + pool.Logger.Info("invalid peer", "peer", peerID, "blockHeight", block.Height) + pool.sendError(errors.New("invalid peer"), peerID) } } @@ -292,7 +296,7 @@ func (pool *BlockPool) RemovePeer(peerID p2p.ID) { func (pool *BlockPool) removePeer(peerID p2p.ID) { for _, requester := range pool.requesters { if requester.getPeerID() == peerID { - requester.redo() + requester.redo(peerID) } } delete(pool.peers, peerID) @@ -326,8 +330,11 @@ func (pool *BlockPool) makeNextRequester() { defer pool.mtx.Unlock() nextHeight := pool.height + pool.requestersLen() + if nextHeight > pool.maxPeerHeight { + return + } + request := newBPRequester(pool, nextHeight) - // request.SetLogger(pool.Logger.With("height", nextHeight)) pool.requesters[nextHeight] = request atomic.AddInt32(&pool.numPending, 1) @@ -453,7 +460,7 @@ type bpRequester struct { pool *BlockPool height int64 gotBlockCh chan struct{} - redoCh chan struct{} + redoCh chan p2p.ID //redo may send multitime, add peerId to identify repeat mtx sync.Mutex peerID p2p.ID @@ -465,7 +472,7 @@ func newBPRequester(pool *BlockPool, height int64) *bpRequester { pool: pool, height: height, gotBlockCh: make(chan struct{}, 1), - redoCh: make(chan struct{}, 1), + redoCh: make(chan p2p.ID, 1), peerID: "", block: nil, @@ -524,9 +531,9 @@ func (bpr *bpRequester) reset() { // Tells bpRequester to pick another peer and try again. // NOTE: Nonblocking, and does nothing if another redo // was already requested. -func (bpr *bpRequester) redo() { +func (bpr *bpRequester) redo(peerId p2p.ID) { select { - case bpr.redoCh <- struct{}{}: + case bpr.redoCh <- peerId: default: } } @@ -565,9 +572,13 @@ OUTER_LOOP: return case <-bpr.Quit(): return - case <-bpr.redoCh: - bpr.reset() - continue OUTER_LOOP + case peerID := <-bpr.redoCh: + if peerID == bpr.peerID { + bpr.reset() + continue OUTER_LOOP + } else { + continue WAIT_LOOP + } case <-bpr.gotBlockCh: // We got a block! // Continue the for-loop and wait til Quit. diff --git a/blockchain/pool_test.go b/blockchain/pool_test.go index 01187bcfe..75a03f631 100644 --- a/blockchain/pool_test.go +++ b/blockchain/pool_test.go @@ -16,16 +16,52 @@ func init() { } type testPeer struct { - id p2p.ID - height int64 + id p2p.ID + height int64 + inputChan chan inputData //make sure each peer's data is sequential } -func makePeers(numPeers int, minHeight, maxHeight int64) map[p2p.ID]testPeer { - peers := make(map[p2p.ID]testPeer, numPeers) +type inputData struct { + t *testing.T + pool *BlockPool + request BlockRequest +} + +func (p testPeer) runInputRoutine() { + go func() { + for input := range p.inputChan { + p.simulateInput(input) + } + }() +} + +// Request desired, pretend like we got the block immediately. +func (p testPeer) simulateInput(input inputData) { + block := &types.Block{Header: types.Header{Height: input.request.Height}} + input.pool.AddBlock(input.request.PeerID, block, 123) + input.t.Logf("Added block from peer %v (height: %v)", input.request.PeerID, input.request.Height) +} + +type testPeers map[p2p.ID]testPeer + +func (ps testPeers) start() { + for _, v := range ps { + v.runInputRoutine() + } +} + +func (ps testPeers) stop() { + for _, v := range ps { + close(v.inputChan) + } +} + +func makePeers(numPeers int, minHeight, maxHeight int64) testPeers { + peers := make(testPeers, numPeers) for i := 0; i < numPeers; i++ { peerID := p2p.ID(cmn.RandStr(12)) height := minHeight + cmn.RandInt63n(maxHeight-minHeight) - peers[peerID] = testPeer{peerID, height} + peers[peerID] = testPeer{peerID, height, make(chan inputData, 10)} } return peers } @@ -45,6 +81,9 @@ func TestBasic(t *testing.T) { defer pool.Stop() + peers.start() + defer peers.stop() + // Introduce each peer. go func() { for _, peer := range peers { @@ -77,12 +116,8 @@ func TestBasic(t *testing.T) { if request.Height == 300 { return // Done! } - // Request desired, pretend like we got the block immediately. - go func() { - block := &types.Block{Header: types.Header{Height: request.Height}} - pool.AddBlock(request.PeerID, block, 123) - t.Logf("Added block from peer %v (height: %v)", request.PeerID, request.Height) - }() + + peers[request.PeerID].inputChan <- inputData{t, pool, request} } } } diff --git a/blockchain/reactor.go b/blockchain/reactor.go index 59318dcc5..e62a9e4fe 100644 --- a/blockchain/reactor.go +++ b/blockchain/reactor.go @@ -264,8 +264,12 @@ FOR_LOOP: bcR.Logger.Info("Time to switch to consensus reactor!", "height", height) bcR.pool.Stop() - conR := bcR.Switch.Reactor("CONSENSUS").(consensusReactor) - conR.SwitchToConsensus(state, blocksSynced) + conR, ok := bcR.Switch.Reactor("CONSENSUS").(consensusReactor) + if ok { + conR.SwitchToConsensus(state, blocksSynced) + } else { + // should only happen during testing + } break FOR_LOOP } @@ -314,6 +318,13 @@ FOR_LOOP: // still need to clean up the rest. bcR.Switch.StopPeerForError(peer, fmt.Errorf("BlockchainReactor validation error: %v", err)) } + peerID2 := bcR.pool.RedoRequest(second.Height) + peer2 := bcR.Switch.Peers().Get(peerID2) + if peer2 != nil && peer2 != peer { + // NOTE: we've already removed the peer's request, but we + // still need to clean up the rest. + bcR.Switch.StopPeerForError(peer2, fmt.Errorf("BlockchainReactor validation error: %v", err)) + } continue FOR_LOOP } else { bcR.pool.PopRequest() diff --git a/blockchain/reactor_test.go b/blockchain/reactor_test.go index 9b26f919a..ac499efa6 100644 --- a/blockchain/reactor_test.go +++ b/blockchain/reactor_test.go @@ -1,72 +1,151 @@ package blockchain import ( - "net" + "sort" "testing" + "time" + "github.com/stretchr/testify/assert" + + abci "github.com/tendermint/tendermint/abci/types" + cfg "github.com/tendermint/tendermint/config" cmn "github.com/tendermint/tendermint/libs/common" dbm "github.com/tendermint/tendermint/libs/db" "github.com/tendermint/tendermint/libs/log" - - cfg "github.com/tendermint/tendermint/config" "github.com/tendermint/tendermint/p2p" "github.com/tendermint/tendermint/proxy" sm "github.com/tendermint/tendermint/state" "github.com/tendermint/tendermint/types" + tmtime "github.com/tendermint/tendermint/types/time" ) -func makeStateAndBlockStore(logger log.Logger) (sm.State, *BlockStore) { - config := cfg.ResetTestRoot("blockchain_reactor_test") - // blockDB := dbm.NewDebugDB("blockDB", dbm.NewMemDB()) - // stateDB := dbm.NewDebugDB("stateDB", dbm.NewMemDB()) +var config *cfg.Config + +func randGenesisDoc(numValidators int, randPower bool, minPower int64) (*types.GenesisDoc, []types.PrivValidator) { + validators := make([]types.GenesisValidator, numValidators) + privValidators := make([]types.PrivValidator, numValidators) + for i := 0; i < numValidators; i++ { + val, privVal := types.RandValidator(randPower, minPower) + validators[i] = types.GenesisValidator{ + PubKey: val.PubKey, + Power: val.VotingPower, + } + privValidators[i] = privVal + } + sort.Sort(types.PrivValidatorsByAddress(privValidators)) + + return &types.GenesisDoc{ + GenesisTime: tmtime.Now(), + ChainID: config.ChainID(), + Validators: validators, + }, privValidators +} + +func makeVote(header *types.Header, blockID types.BlockID, valset *types.ValidatorSet, privVal types.PrivValidator) *types.Vote { + addr := privVal.GetAddress() + idx, _ := valset.GetByAddress(addr) + vote := &types.Vote{ + ValidatorAddress: addr, + ValidatorIndex: idx, + Height: header.Height, + Round: 1, + Timestamp: tmtime.Now(), + Type: types.PrecommitType, + BlockID: blockID, + } + + privVal.SignVote(header.ChainID, vote) + + return vote +} + +type BlockchainReactorPair struct { + reactor *BlockchainReactor + app proxy.AppConns +} + +func newBlockchainReactor(logger log.Logger, genDoc *types.GenesisDoc, privVals []types.PrivValidator, maxBlockHeight int64) BlockchainReactorPair { + if len(privVals) != 1 { + panic("only support one validator") + } + + app := &testApp{} + cc := proxy.NewLocalClientCreator(app) + proxyApp := proxy.NewAppConns(cc) + err := proxyApp.Start() + if err != nil { + panic(cmn.ErrorWrap(err, "error start app")) + } + blockDB := dbm.NewMemDB() stateDB := dbm.NewMemDB() blockStore := NewBlockStore(blockDB) - state, err := sm.LoadStateFromDBOrGenesisFile(stateDB, config.GenesisFile()) + + state, err := sm.LoadStateFromDBOrGenesisDoc(stateDB, genDoc) if err != nil { panic(cmn.ErrorWrap(err, "error constructing state from genesis file")) } - return state, blockStore -} - -func newBlockchainReactor(logger log.Logger, maxBlockHeight int64) *BlockchainReactor { - state, blockStore := makeStateAndBlockStore(logger) - // Make the blockchainReactor itself + // Make the BlockchainReactor itself. + // NOTE we have to create and commit the blocks first because + // pool.height is determined from the store. fastSync := true - var nilApp proxy.AppConnConsensus - blockExec := sm.NewBlockExecutor(dbm.NewMemDB(), log.TestingLogger(), nilApp, + blockExec := sm.NewBlockExecutor(dbm.NewMemDB(), log.TestingLogger(), proxyApp.Consensus(), sm.MockMempool{}, sm.MockEvidencePool{}) - bcReactor := NewBlockchainReactor(state.Copy(), blockExec, blockStore, fastSync) - bcReactor.SetLogger(logger.With("module", "blockchain")) + // let's add some blocks in + for blockHeight := int64(1); blockHeight <= maxBlockHeight; blockHeight++ { + lastCommit := &types.Commit{} + if blockHeight > 1 { + lastBlockMeta := blockStore.LoadBlockMeta(blockHeight - 1) + lastBlock := blockStore.LoadBlock(blockHeight - 1) - // Next: we need to set a switch in order for peers to be added in - bcReactor.Switch = p2p.NewSwitch(cfg.DefaultP2PConfig(), nil) + vote := makeVote(&lastBlock.Header, lastBlockMeta.BlockID, state.Validators, privVals[0]) + lastCommit = &types.Commit{Precommits: []*types.Vote{vote}, BlockID: lastBlockMeta.BlockID} + } - // Lastly: let's add some blocks in - for blockHeight := int64(1); blockHeight <= maxBlockHeight; blockHeight++ { - firstBlock := makeBlock(blockHeight, state) - secondBlock := makeBlock(blockHeight+1, state) - firstParts := firstBlock.MakePartSet(types.BlockPartSizeBytes) - blockStore.SaveBlock(firstBlock, firstParts, secondBlock.LastCommit) + thisBlock := makeBlock(blockHeight, state, lastCommit) + + thisParts := thisBlock.MakePartSet(types.BlockPartSizeBytes) + blockID := types.BlockID{thisBlock.Hash(), thisParts.Header()} + + state, err = blockExec.ApplyBlock(state, blockID, thisBlock) + if err != nil { + panic(cmn.ErrorWrap(err, "error apply block")) + } + + blockStore.SaveBlock(thisBlock, thisParts, lastCommit) } - return bcReactor + bcReactor := NewBlockchainReactor(state.Copy(), blockExec, blockStore, fastSync) + bcReactor.SetLogger(logger.With("module", "blockchain")) + + return BlockchainReactorPair{bcReactor, proxyApp} } func TestNoBlockResponse(t *testing.T) { - maxBlockHeight := int64(20) + config = cfg.ResetTestRoot("blockchain_reactor_test") + genDoc, privVals := randGenesisDoc(1, false, 30) + + maxBlockHeight := int64(65) + + reactorPairs := make([]BlockchainReactorPair, 2) + + reactorPairs[0] = newBlockchainReactor(log.TestingLogger(), genDoc, privVals, maxBlockHeight) + reactorPairs[1] = newBlockchainReactor(log.TestingLogger(), genDoc, privVals, 0) - bcr := newBlockchainReactor(log.TestingLogger(), maxBlockHeight) - bcr.Start() - defer bcr.Stop() + p2p.MakeConnectedSwitches(config.P2P, 2, func(i int, s *p2p.Switch) *p2p.Switch { + s.AddReactor("BLOCKCHAIN", reactorPairs[i].reactor) + return s - // Add some peers in - peer := newbcrTestPeer(p2p.ID(cmn.RandStr(12))) - bcr.AddPeer(peer) + }, p2p.Connect2Switches) - chID := byte(0x01) + defer func() { + for _, r := range reactorPairs { + r.reactor.Stop() + r.app.Stop() + } + }() tests := []struct { height int64 @@ -78,72 +157,100 @@ func TestNoBlockResponse(t *testing.T) { {100, false}, } - // receive a request message from peer, - // wait for our response to be received on the peer - for _, tt := range tests { - reqBlockMsg := &bcBlockRequestMessage{tt.height} - reqBlockBytes := cdc.MustMarshalBinaryBare(reqBlockMsg) - bcr.Receive(chID, peer, reqBlockBytes) - msg := peer.lastBlockchainMessage() + for { + if reactorPairs[1].reactor.pool.IsCaughtUp() { + break + } + + time.Sleep(10 * time.Millisecond) + } + + assert.Equal(t, maxBlockHeight, reactorPairs[0].reactor.store.Height()) + for _, tt := range tests { + block := reactorPairs[1].reactor.store.LoadBlock(tt.height) if tt.existent { - if blockMsg, ok := msg.(*bcBlockResponseMessage); !ok { - t.Fatalf("Expected to receive a block response for height %d", tt.height) - } else if blockMsg.Block.Height != tt.height { - t.Fatalf("Expected response to be for height %d, got %d", tt.height, blockMsg.Block.Height) - } + assert.True(t, block != nil) } else { - if noBlockMsg, ok := msg.(*bcNoBlockResponseMessage); !ok { - t.Fatalf("Expected to receive a no block response for height %d", tt.height) - } else if noBlockMsg.Height != tt.height { - t.Fatalf("Expected response to be for height %d, got %d", tt.height, noBlockMsg.Height) - } + assert.True(t, block == nil) } } } -/* // NOTE: This is too hard to test without // an easy way to add test peer to switch // or without significant refactoring of the module. // Alternatively we could actually dial a TCP conn but // that seems extreme. func TestBadBlockStopsPeer(t *testing.T) { - maxBlockHeight := int64(20) + config = cfg.ResetTestRoot("blockchain_reactor_test") + genDoc, privVals := randGenesisDoc(1, false, 30) + + maxBlockHeight := int64(148) + + otherChain := newBlockchainReactor(log.TestingLogger(), genDoc, privVals, maxBlockHeight) + defer func() { + otherChain.reactor.Stop() + otherChain.app.Stop() + }() + + reactorPairs := make([]BlockchainReactorPair, 4) + + reactorPairs[0] = newBlockchainReactor(log.TestingLogger(), genDoc, privVals, maxBlockHeight) + reactorPairs[1] = newBlockchainReactor(log.TestingLogger(), genDoc, privVals, 0) + reactorPairs[2] = newBlockchainReactor(log.TestingLogger(), genDoc, privVals, 0) + reactorPairs[3] = newBlockchainReactor(log.TestingLogger(), genDoc, privVals, 0) + + switches := p2p.MakeConnectedSwitches(config.P2P, 4, func(i int, s *p2p.Switch) *p2p.Switch { + s.AddReactor("BLOCKCHAIN", reactorPairs[i].reactor) + return s + + }, p2p.Connect2Switches) + + defer func() { + for _, r := range reactorPairs { + r.reactor.Stop() + r.app.Stop() + } + }() + + for { + if reactorPairs[3].reactor.pool.IsCaughtUp() { + break + } + + time.Sleep(1 * time.Second) + } + + //at this time, reactors[0-3] is the newest + assert.Equal(t, 3, reactorPairs[1].reactor.Switch.Peers().Size()) - bcr := newBlockchainReactor(log.TestingLogger(), maxBlockHeight) - bcr.Start() - defer bcr.Stop() + //mark reactorPairs[3] is an invalid peer + reactorPairs[3].reactor.store = otherChain.reactor.store - // Add some peers in - peer := newbcrTestPeer(p2p.ID(cmn.RandStr(12))) + lastReactorPair := newBlockchainReactor(log.TestingLogger(), genDoc, privVals, 0) + reactorPairs = append(reactorPairs, lastReactorPair) - // XXX: This doesn't add the peer to anything, - // so it's hard to check that it's later removed - bcr.AddPeer(peer) - assert.True(t, bcr.Switch.Peers().Size() > 0) + switches = append(switches, p2p.MakeConnectedSwitches(config.P2P, 1, func(i int, s *p2p.Switch) *p2p.Switch { + s.AddReactor("BLOCKCHAIN", reactorPairs[len(reactorPairs)-1].reactor) + return s - // send a bad block from the peer - // default blocks already dont have commits, so should fail - block := bcr.store.LoadBlock(3) - msg := &bcBlockResponseMessage{Block: block} - peer.Send(BlockchainChannel, struct{ BlockchainMessage }{msg}) + }, p2p.Connect2Switches)...) + + for i := 0; i < len(reactorPairs)-1; i++ { + p2p.Connect2Switches(switches, i, len(reactorPairs)-1) + } - ticker := time.NewTicker(time.Millisecond * 10) - timer := time.NewTimer(time.Second * 2) -LOOP: for { - select { - case <-ticker.C: - if bcr.Switch.Peers().Size() == 0 { - break LOOP - } - case <-timer.C: - t.Fatal("Timed out waiting to disconnect peer") + if lastReactorPair.reactor.pool.IsCaughtUp() || lastReactorPair.reactor.Switch.Peers().Size() == 0 { + break } + + time.Sleep(1 * time.Second) } + + assert.True(t, lastReactorPair.reactor.Switch.Peers().Size() < len(reactorPairs)-1) } -*/ //---------------------------------------------- // utility funcs @@ -155,56 +262,41 @@ func makeTxs(height int64) (txs []types.Tx) { return txs } -func makeBlock(height int64, state sm.State) *types.Block { - block, _ := state.MakeBlock(height, makeTxs(height), new(types.Commit), nil, state.Validators.GetProposer().Address) +func makeBlock(height int64, state sm.State, lastCommit *types.Commit) *types.Block { + block, _ := state.MakeBlock(height, makeTxs(height), lastCommit, nil, state.Validators.GetProposer().Address) return block } -// The Test peer -type bcrTestPeer struct { - cmn.BaseService - id p2p.ID - ch chan interface{} +type testApp struct { + abci.BaseApplication } -var _ p2p.Peer = (*bcrTestPeer)(nil) +var _ abci.Application = (*testApp)(nil) -func newbcrTestPeer(id p2p.ID) *bcrTestPeer { - bcr := &bcrTestPeer{ - id: id, - ch: make(chan interface{}, 2), - } - bcr.BaseService = *cmn.NewBaseService(nil, "bcrTestPeer", bcr) - return bcr +func (app *testApp) Info(req abci.RequestInfo) (resInfo abci.ResponseInfo) { + return abci.ResponseInfo{} } -func (tp *bcrTestPeer) lastBlockchainMessage() interface{} { return <-tp.ch } +func (app *testApp) BeginBlock(req abci.RequestBeginBlock) abci.ResponseBeginBlock { + return abci.ResponseBeginBlock{} +} -func (tp *bcrTestPeer) TrySend(chID byte, msgBytes []byte) bool { - var msg BlockchainMessage - err := cdc.UnmarshalBinaryBare(msgBytes, &msg) - if err != nil { - panic(cmn.ErrorWrap(err, "Error while trying to parse a BlockchainMessage")) - } - if _, ok := msg.(*bcStatusResponseMessage); ok { - // Discard status response messages since they skew our results - // We only want to deal with: - // + bcBlockResponseMessage - // + bcNoBlockResponseMessage - } else { - tp.ch <- msg - } - return true -} - -func (tp *bcrTestPeer) FlushStop() {} -func (tp *bcrTestPeer) Send(chID byte, msgBytes []byte) bool { return tp.TrySend(chID, msgBytes) } -func (tp *bcrTestPeer) NodeInfo() p2p.NodeInfo { return p2p.DefaultNodeInfo{} } -func (tp *bcrTestPeer) Status() p2p.ConnectionStatus { return p2p.ConnectionStatus{} } -func (tp *bcrTestPeer) ID() p2p.ID { return tp.id } -func (tp *bcrTestPeer) IsOutbound() bool { return false } -func (tp *bcrTestPeer) IsPersistent() bool { return true } -func (tp *bcrTestPeer) Get(s string) interface{} { return s } -func (tp *bcrTestPeer) Set(string, interface{}) {} -func (tp *bcrTestPeer) RemoteIP() net.IP { return []byte{127, 0, 0, 1} } -func (tp *bcrTestPeer) OriginalAddr() *p2p.NetAddress { return nil } +func (app *testApp) EndBlock(req abci.RequestEndBlock) abci.ResponseEndBlock { + return abci.ResponseEndBlock{} +} + +func (app *testApp) DeliverTx(tx []byte) abci.ResponseDeliverTx { + return abci.ResponseDeliverTx{Tags: []cmn.KVPair{}} +} + +func (app *testApp) CheckTx(tx []byte) abci.ResponseCheckTx { + return abci.ResponseCheckTx{} +} + +func (app *testApp) Commit() abci.ResponseCommit { + return abci.ResponseCommit{} +} + +func (app *testApp) Query(reqQuery abci.RequestQuery) (resQuery abci.ResponseQuery) { + return +} diff --git a/blockchain/store_test.go b/blockchain/store_test.go index 9c8fdb23c..a52039fa4 100644 --- a/blockchain/store_test.go +++ b/blockchain/store_test.go @@ -9,13 +9,30 @@ import ( "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" + cfg "github.com/tendermint/tendermint/config" + cmn "github.com/tendermint/tendermint/libs/common" "github.com/tendermint/tendermint/libs/db" + dbm "github.com/tendermint/tendermint/libs/db" "github.com/tendermint/tendermint/libs/log" + sm "github.com/tendermint/tendermint/state" "github.com/tendermint/tendermint/types" tmtime "github.com/tendermint/tendermint/types/time" ) +func makeStateAndBlockStore(logger log.Logger) (sm.State, *BlockStore) { + config := cfg.ResetTestRoot("blockchain_reactor_test") + // blockDB := dbm.NewDebugDB("blockDB", dbm.NewMemDB()) + // stateDB := dbm.NewDebugDB("stateDB", dbm.NewMemDB()) + blockDB := dbm.NewMemDB() + stateDB := dbm.NewMemDB() + state, err := sm.LoadStateFromDBOrGenesisFile(stateDB, config.GenesisFile()) + if err != nil { + panic(cmn.ErrorWrap(err, "error constructing state from genesis file")) + } + return state, NewBlockStore(blockDB) +} + func TestLoadBlockStoreStateJSON(t *testing.T) { db := db.NewMemDB() @@ -65,7 +82,7 @@ func freshBlockStore() (*BlockStore, db.DB) { var ( state, _ = makeStateAndBlockStore(log.NewTMLogger(new(bytes.Buffer))) - block = makeBlock(1, state) + block = makeBlock(1, state, new(types.Commit)) partSet = block.MakePartSet(2) part1 = partSet.GetPart(0) part2 = partSet.GetPart(1) @@ -88,7 +105,7 @@ func TestBlockStoreSaveLoadBlock(t *testing.T) { } // save a block - block := makeBlock(bs.Height()+1, state) + block := makeBlock(bs.Height()+1, state, new(types.Commit)) validPartSet := block.MakePartSet(2) seenCommit := &types.Commit{Precommits: []*types.Vote{{Height: 10, Timestamp: tmtime.Now()}}} @@ -331,7 +348,7 @@ func TestLoadBlockMeta(t *testing.T) { func TestBlockFetchAtHeight(t *testing.T) { state, bs := makeStateAndBlockStore(log.NewTMLogger(new(bytes.Buffer))) require.Equal(t, bs.Height(), int64(0), "initially the height should be zero") - block := makeBlock(bs.Height()+1, state) + block := makeBlock(bs.Height()+1, state, new(types.Commit)) partSet := block.MakePartSet(2) seenCommit := &types.Commit{Precommits: []*types.Vote{{Height: 10, diff --git a/config/toml.go b/config/toml.go index 6f0578e44..21e017b45 100644 --- a/config/toml.go +++ b/config/toml.go @@ -260,6 +260,9 @@ create_empty_blocks_interval = "{{ .Consensus.CreateEmptyBlocksInterval }}" peer_gossip_sleep_duration = "{{ .Consensus.PeerGossipSleepDuration }}" peer_query_maj23_sleep_duration = "{{ .Consensus.PeerQueryMaj23SleepDuration }}" +# Block time parameters. Corresponds to the minimum time increment between consecutive blocks. +blocktime_iota = "{{ .Consensus.BlockTimeIota }}" + ##### transactions indexer configuration options ##### [tx_index] diff --git a/consensus/mempool_test.go b/consensus/mempool_test.go index 6d36d1e74..49ba74fe5 100644 --- a/consensus/mempool_test.go +++ b/consensus/mempool_test.go @@ -72,18 +72,18 @@ func TestMempoolProgressInHigherRound(t *testing.T) { startTestRound(cs, height, round) ensureNewRound(newRoundCh, height, round) // first round at first height - ensureNewEventOnChannel(newBlockCh) // first block gets committed + ensureNewEventOnChannel(newBlockCh) // first block gets committed height = height + 1 // moving to the next height round = 0 ensureNewRound(newRoundCh, height, round) // first round at next height - deliverTxsRange(cs, 0, 1) // we deliver txs, but dont set a proposal so we get the next round + deliverTxsRange(cs, 0, 1) // we deliver txs, but dont set a proposal so we get the next round ensureNewTimeout(timeoutCh, height, round, cs.config.TimeoutPropose.Nanoseconds()) - round = round + 1 // moving to the next round + round = round + 1 // moving to the next round ensureNewRound(newRoundCh, height, round) // wait for the next round - ensureNewEventOnChannel(newBlockCh) // now we can commit the block + ensureNewEventOnChannel(newBlockCh) // now we can commit the block } func deliverTxsRange(cs *ConsensusState, start, end int) { diff --git a/consensus/replay.go b/consensus/replay.go index abc43eb57..c9a779e34 100644 --- a/consensus/replay.go +++ b/consensus/replay.go @@ -276,7 +276,12 @@ func (h *Handshaker) ReplayBlocks( // If appBlockHeight == 0 it means that we are at genesis and hence should send InitChain. if appBlockHeight == 0 { - nextVals := types.TM2PB.ValidatorUpdates(state.NextValidators) // state.Validators would work too. + validators := make([]*types.Validator, len(h.genDoc.Validators)) + for i, val := range h.genDoc.Validators { + validators[i] = types.NewValidator(val.PubKey, val.Power) + } + validatorSet := types.NewValidatorSet(validators) + nextVals := types.TM2PB.ValidatorUpdates(validatorSet) csParams := types.TM2PB.ConsensusParams(h.genDoc.ConsensusParams) req := abci.RequestInitChain{ Time: h.genDoc.GenesisTime, diff --git a/consensus/replay_test.go b/consensus/replay_test.go index 70c4ba332..c261426c1 100644 --- a/consensus/replay_test.go +++ b/consensus/replay_test.go @@ -315,28 +315,21 @@ func testHandshakeReplay(t *testing.T, nBlocks int, mode uint) { config := ResetConfig("proxy_test_") walBody, err := WALWithNBlocks(NUM_BLOCKS) - if err != nil { - t.Fatal(err) - } + require.NoError(t, err) walFile := tempWALWithData(walBody) config.Consensus.SetWalFile(walFile) privVal := privval.LoadFilePV(config.PrivValidatorFile()) wal, err := NewWAL(walFile) - if err != nil { - t.Fatal(err) - } + require.NoError(t, err) wal.SetLogger(log.TestingLogger()) - if err := wal.Start(); err != nil { - t.Fatal(err) - } + err = wal.Start() + require.NoError(t, err) defer wal.Stop() chain, commits, err := makeBlockchainFromWAL(wal) - if err != nil { - t.Fatalf(err.Error()) - } + require.NoError(t, err) stateDB, state, store := stateAndStore(config, privVal.GetPubKey(), kvstore.ProtocolVersion) store.chain = chain diff --git a/consensus/state.go b/consensus/state.go index 0f7b56bc5..4b7aec2af 100644 --- a/consensus/state.go +++ b/consensus/state.go @@ -324,10 +324,11 @@ func (cs *ConsensusState) startRoutines(maxSteps int) { go cs.receiveRoutine(maxSteps) } -// OnStop implements cmn.Service. It stops all routines and waits for the WAL to finish. +// OnStop implements cmn.Service. func (cs *ConsensusState) OnStop() { cs.evsw.Stop() cs.timeoutTicker.Stop() + // WAL is stopped in receiveRoutine. } // Wait waits for the the main routine to return. diff --git a/consensus/state_test.go b/consensus/state_test.go index 19dde0532..941a99cda 100644 --- a/consensus/state_test.go +++ b/consensus/state_test.go @@ -1043,7 +1043,7 @@ func TestNoHearbeatWhenNotValidator(t *testing.T) { cs.Stop() // if a faulty implementation sends an event, we should wait here a little bit to make sure we don't miss it by prematurely leaving the test method - time.Sleep((proposalHeartbeatIntervalSeconds + 1) * time.Second) + time.Sleep((proposalHeartbeatIntervalSeconds + 1) * time.Second) } // regression for #2518 diff --git a/consensus/types/peer_round_state.go b/consensus/types/peer_round_state.go index e42395bc3..16e292940 100644 --- a/consensus/types/peer_round_state.go +++ b/consensus/types/peer_round_state.go @@ -55,3 +55,31 @@ func (prs PeerRoundState) StringIndented(indent string) string { indent, prs.CatchupCommit, prs.CatchupCommitRound, indent) } + +//----------------------------------------------------------- +// These methods are for Protobuf Compatibility + +// Size returns the size of the amino encoding, in bytes. +func (ps *PeerRoundState) Size() int { + bs, _ := ps.Marshal() + return len(bs) +} + +// Marshal returns the amino encoding. +func (ps *PeerRoundState) Marshal() ([]byte, error) { + return cdc.MarshalBinaryBare(ps) +} + +// MarshalTo calls Marshal and copies to the given buffer. +func (ps *PeerRoundState) MarshalTo(data []byte) (int, error) { + bs, err := ps.Marshal() + if err != nil { + return -1, err + } + return copy(data, bs), nil +} + +// Unmarshal deserializes from amino encoded form. +func (ps *PeerRoundState) Unmarshal(bs []byte) error { + return cdc.UnmarshalBinaryBare(bs, ps) +} diff --git a/consensus/types/round_state.go b/consensus/types/round_state.go index 6359a6555..418f73a8e 100644 --- a/consensus/types/round_state.go +++ b/consensus/types/round_state.go @@ -201,3 +201,31 @@ func (rs *RoundState) StringShort() string { return fmt.Sprintf(`RoundState{H:%v R:%v S:%v ST:%v}`, rs.Height, rs.Round, rs.Step, rs.StartTime) } + +//----------------------------------------------------------- +// These methods are for Protobuf Compatibility + +// Size returns the size of the amino encoding, in bytes. +func (rs *RoundStateSimple) Size() int { + bs, _ := rs.Marshal() + return len(bs) +} + +// Marshal returns the amino encoding. +func (rs *RoundStateSimple) Marshal() ([]byte, error) { + return cdc.MarshalBinaryBare(rs) +} + +// MarshalTo calls Marshal and copies to the given buffer. +func (rs *RoundStateSimple) MarshalTo(data []byte) (int, error) { + bs, err := rs.Marshal() + if err != nil { + return -1, err + } + return copy(data, bs), nil +} + +// Unmarshal deserializes from amino encoded form. +func (rs *RoundStateSimple) Unmarshal(bs []byte) error { + return cdc.UnmarshalBinaryBare(bs, rs) +} diff --git a/consensus/wal_test.go b/consensus/wal_test.go index c45f6acee..c056f2017 100644 --- a/consensus/wal_test.go +++ b/consensus/wal_test.go @@ -7,13 +7,13 @@ import ( "io/ioutil" "os" "path/filepath" - // "sync" "testing" "time" "github.com/tendermint/tendermint/consensus/types" "github.com/tendermint/tendermint/libs/autofile" + "github.com/tendermint/tendermint/libs/log" tmtypes "github.com/tendermint/tendermint/types" tmtime "github.com/tendermint/tendermint/types/time" @@ -23,29 +23,27 @@ import ( func TestWALTruncate(t *testing.T) { walDir, err := ioutil.TempDir("", "wal") - if err != nil { - panic(fmt.Errorf("failed to create temp WAL file: %v", err)) - } + require.NoError(t, err) defer os.RemoveAll(walDir) walFile := filepath.Join(walDir, "wal") //this magic number 4K can truncate the content when RotateFile. defaultHeadSizeLimit(10M) is hard to simulate. //this magic number 1 * time.Millisecond make RotateFile check frequently. defaultGroupCheckDuration(5s) is hard to simulate. - wal, err := NewWAL(walFile, autofile.GroupHeadSizeLimit(4096), autofile.GroupCheckDuration(1*time.Millisecond)) - if err != nil { - t.Fatal(err) - } - - wal.Start() + wal, err := NewWAL(walFile, + autofile.GroupHeadSizeLimit(4096), + autofile.GroupCheckDuration(1*time.Millisecond), + ) + require.NoError(t, err) + wal.SetLogger(log.TestingLogger()) + err = wal.Start() + require.NoError(t, err) defer wal.Stop() //60 block's size nearly 70K, greater than group's headBuf size(4096 * 10), when headBuf is full, truncate content will Flush to the file. //at this time, RotateFile is called, truncate content exist in each file. err = WALGenerateNBlocks(wal.Group(), 60) - if err != nil { - t.Fatal(err) - } + require.NoError(t, err) time.Sleep(1 * time.Millisecond) //wait groupCheckDuration, make sure RotateFile run @@ -99,9 +97,8 @@ func TestWALSearchForEndHeight(t *testing.T) { walFile := tempWALWithData(walBody) wal, err := NewWAL(walFile) - if err != nil { - t.Fatal(err) - } + require.NoError(t, err) + wal.SetLogger(log.TestingLogger()) h := int64(3) gr, found, err := wal.SearchForEndHeight(h, &WALSearchOptions{}) diff --git a/docs/architecture/adr-008-priv-validator.md b/docs/architecture/adr-008-priv-validator.md index 94e882af4..a8499465c 100644 --- a/docs/architecture/adr-008-priv-validator.md +++ b/docs/architecture/adr-008-priv-validator.md @@ -5,14 +5,17 @@ implementations: - FilePV uses an unencrypted private key in a "priv_validator.json" file - no configuration required (just `tendermint init`). -- SocketPV uses a socket to send signing requests to another process - user is - responsible for starting that process themselves. +- TCPVal and IPCVal use TCP and Unix sockets respectively to send signing requests + to another process - the user is responsible for starting that process themselves. -The SocketPV address can be provided via flags at the command line - doing so -will cause Tendermint to ignore any "priv_validator.json" file and to listen on -the given address for incoming connections from an external priv_validator -process. It will halt any operation until at least one external process -succesfully connected. +Both TCPVal and IPCVal addresses can be provided via flags at the command line +or in the configuration file; TCPVal addresses must be of the form +`tcp://:` and IPCVal addresses `unix:///path/to/file.sock` - +doing so will cause Tendermint to ignore any private validator files. + +TCPVal will listen on the given address for incoming connections from an external +private validator process. It will halt any operation until at least one external +process successfully connected. The external priv_validator process will dial the address to connect to Tendermint, and then Tendermint will send requests on the ensuing connection to @@ -21,6 +24,9 @@ but the Tendermint process makes all requests. In a later stage we're going to support multiple validators for fault tolerance. To prevent double signing they need to be synced, which is deferred to an external solution (see #1185). +Conversely, IPCVal will make an outbound connection to an existing socket opened +by the external validator process. + In addition, Tendermint will provide implementations that can be run in that external process. These include: diff --git a/docs/introduction/install.md b/docs/introduction/install.md index f3498514c..c3395effc 100644 --- a/docs/introduction/install.md +++ b/docs/introduction/install.md @@ -95,9 +95,9 @@ wget https://github.com/google/leveldb/archive/v1.20.tar.gz && \ tar -zxvf v1.20.tar.gz && \ cd leveldb-1.20/ && \ make && \ - cp -r out-static/lib* out-shared/lib* /usr/local/lib/ && \ + sudo cp -r out-static/lib* out-shared/lib* /usr/local/lib/ && \ cd include/ && \ - cp -r leveldb /usr/local/include/ && \ + sudo cp -r leveldb /usr/local/include/ && \ sudo ldconfig && \ rm -f v1.20.tar.gz ``` @@ -109,8 +109,8 @@ Set database backend to cleveldb: db_backend = "cleveldb" ``` -To build Tendermint, run +To install Tendermint, run ``` -CGO_LDFLAGS="-lsnappy" go build -ldflags "-X github.com/tendermint/tendermint/version.GitCommit=`git rev-parse --short=8 HEAD`" -tags "tendermint gcc" -o build/tendermint ./cmd/tendermint/ +CGO_LDFLAGS="-lsnappy" go install -ldflags "-X github.com/tendermint/tendermint/version.GitCommit=`git rev-parse --short=8 HEAD`" -tags "tendermint gcc" -o build/tendermint ./cmd/tendermint/ ``` diff --git a/docs/spec/abci/abci.md b/docs/spec/abci/abci.md index f057002ef..b9dc744de 100644 --- a/docs/spec/abci/abci.md +++ b/docs/spec/abci/abci.md @@ -45,7 +45,9 @@ include a `Tags` field in their `Response*`. Each tag is key-value pair denoting something about what happened during the methods execution. Tags can be used to index transactions and blocks according to what happened -during their execution. +during their execution. Note that the set of tags returned for a block from +`BeginBlock` and `EndBlock` are merged. In case both methods return the same +tag, only the value defined in `EndBlock` is used. Keys and values in tags must be UTF-8 encoded strings (e.g. "account.owner": "Bob", "balance": "100.0", diff --git a/docs/spec/blockchain/encoding.md b/docs/spec/blockchain/encoding.md index f5120cdd4..cb506739f 100644 --- a/docs/spec/blockchain/encoding.md +++ b/docs/spec/blockchain/encoding.md @@ -59,22 +59,14 @@ You can simply use below table and concatenate Prefix || Length (of raw bytes) | | PubKeySecp256k1 | tendermint/PubKeySecp256k1 | 0xEB5AE987 | 0x21 | | | PrivKeyEd25519 | tendermint/PrivKeyEd25519 | 0xA3288910 | 0x40 | | | PrivKeySecp256k1 | tendermint/PrivKeySecp256k1 | 0xE1B0F79B | 0x20 | | -| SignatureEd25519 | tendermint/SignatureEd25519 | 0x2031EA53 | 0x40 | | -| SignatureSecp256k1 | tendermint/SignatureSecp256k1 | 0x7FC4A495 | variable | +| PubKeyMultisigThreshold | tendermint/PubKeyMultisigThreshold | 0x22C1F7E2 | variable | | -| +### Example -### Examples - -1. For example, the 33-byte (or 0x21-byte in hex) Secp256k1 pubkey +For example, the 33-byte (or 0x21-byte in hex) Secp256k1 pubkey `020BD40F225A57ED383B440CF073BC5539D0341F5767D2BF2D78406D00475A2EE9` would be encoded as - `EB5AE98221020BD40F225A57ED383B440CF073BC5539D0341F5767D2BF2D78406D00475A2EE9` - -2. For example, the variable size Secp256k1 signature (in this particular example 70 or 0x46 bytes) - `304402201CD4B8C764D2FD8AF23ECFE6666CA8A53886D47754D951295D2D311E1FEA33BF02201E0F906BB1CF2C30EAACFFB032A7129358AFF96B9F79B06ACFFB18AC90C2ADD7` - would be encoded as - `16E1FEEA46304402201CD4B8C764D2FD8AF23ECFE6666CA8A53886D47754D951295D2D311E1FEA33BF02201E0F906BB1CF2C30EAACFFB032A7129358AFF96B9F79B06ACFFB18AC90C2ADD7` + `EB5AE98721020BD40F225A57ED383B440CF073BC5539D0341F5767D2BF2D78406D00475A2EE9` ### Addresses diff --git a/docs/spec/reactors/block_sync/reactor.md b/docs/spec/reactors/block_sync/reactor.md index 045bbd400..91fd79b0b 100644 --- a/docs/spec/reactors/block_sync/reactor.md +++ b/docs/spec/reactors/block_sync/reactor.md @@ -65,24 +65,24 @@ type Requester { mtx Mutex block Block height int64 - 
peerID p2p.ID - redoChannel chan struct{} + 
 peerID p2p.ID + redoChannel chan p2p.ID //redo may send multi-time; peerId is used to identify repeat } ``` -Pool is core data structure that stores last executed block (`height`), assignment of requests to peers (`requesters`), current height for each peer and number of pending requests for each peer (`peers`), maximum peer height, etc. +Pool is a core data structure that stores last executed block (`height`), assignment of requests to peers (`requesters`), current height for each peer and number of pending requests for each peer (`peers`), maximum peer height, etc. ```go type Pool { - mtx Mutex - requesters map[int64]*Requester - height int64 - peers map[p2p.ID]*Peer - maxPeerHeight int64 - numPending int32 - store BlockStore - requestsChannel chan<- BlockRequest - errorsChannel chan<- peerError + mtx Mutex + requesters map[int64]*Requester + height int64 + peers map[p2p.ID]*Peer + maxPeerHeight int64 + numPending int32 + store BlockStore + requestsChannel chan<- BlockRequest + errorsChannel chan<- peerError } ``` @@ -90,11 +90,11 @@ Peer data structure stores for each peer current `height` and number of pending ```go type Peer struct { - id p2p.ID - height int64 - numPending int32 - timeout *time.Timer - didTimeout bool + id p2p.ID + height int64 + numPending int32 + timeout *time.Timer + didTimeout bool } ``` @@ -169,11 +169,11 @@ Requester task is responsible for fetching a single block at position `height`. ```go fetchBlock(height, pool): - while true do + while true do { peerID = nil block = nil peer = pickAvailablePeer(height) - peerId = peer.id + peerID = peer.id enqueue BlockRequest(height, peerID) to pool.requestsChannel redo = false @@ -181,12 +181,15 @@ fetchBlock(height, pool): select { upon receiving Quit message do return - upon receiving message on redoChannel do - mtx.Lock() - pool.numPending++ - redo = true - mtx.UnLock() + upon receiving redo message with id on redoChannel do + if peerID == id { + mtx.Lock() + pool.numPending++ + redo = true + mtx.UnLock() + } } + } pickAvailablePeer(height): selectedPeer = nil @@ -244,7 +247,7 @@ createRequesters(pool): main(pool): create trySyncTicker with interval trySyncIntervalMS create statusUpdateTicker with interval statusUpdateIntervalSeconds - create switchToConsensusTicker with interbal switchToConsensusIntervalSeconds + create switchToConsensusTicker with interval switchToConsensusIntervalSeconds while true do select { diff --git a/docs/tendermint-core/configuration.md b/docs/tendermint-core/configuration.md index 13894a308..7d1a562ec 100644 --- a/docs/tendermint-core/configuration.md +++ b/docs/tendermint-core/configuration.md @@ -203,6 +203,9 @@ create_empty_blocks_interval = "0s" peer_gossip_sleep_duration = "100ms" peer_query_maj23_sleep_duration = "2000ms" +# Block time parameters. Corresponds to the minimum time increment between consecutive blocks. +blocktime_iota = "1000ms" + ##### transactions indexer configuration options ##### [tx_index] diff --git a/evidence/reactor.go b/evidence/reactor.go index 48092fdff..6bb45e689 100644 --- a/evidence/reactor.go +++ b/evidence/reactor.go @@ -163,7 +163,7 @@ func (evR EvidenceReactor) checkSendEvidenceMessage(peer p2p.Peer, ev types.Evid // make sure the peer is up to date evHeight := ev.Height() peerState, ok := peer.Get(types.PeerStateKey).(PeerState) - if !ok { + if !ok { // Peer does not have a state yet. We set it in the consensus reactor, but // when we add peer in Switch, the order we call reactors#AddPeer is // different every time due to us using a map. Sometimes other reactors diff --git a/libs/autofile/autofile_test.go b/libs/autofile/autofile_test.go index 9903f1e68..d9c90309e 100644 --- a/libs/autofile/autofile_test.go +++ b/libs/autofile/autofile_test.go @@ -119,4 +119,4 @@ func TestAutoFileSize(t *testing.T) { // Cleanup _ = os.Remove(f.Name()) -} \ No newline at end of file +} diff --git a/libs/cli/flags/log_level_test.go b/libs/cli/flags/log_level_test.go index 1503ec281..c4c1707b5 100644 --- a/libs/cli/flags/log_level_test.go +++ b/libs/cli/flags/log_level_test.go @@ -51,7 +51,7 @@ func TestParseLogLevel(t *testing.T) { buf.Reset() - logger.With("module", "wire").Debug("Kingpin") + logger.With("module", "mempool").With("module", "wire").Debug("Kingpin") if have := strings.TrimSpace(buf.String()); c.expectedLogLines[0] != have { t.Errorf("\nwant '%s'\nhave '%s'\nlevel '%s'", c.expectedLogLines[0], have, c.lvl) } diff --git a/libs/log/filter.go b/libs/log/filter.go index 768c09b85..b71447ed7 100644 --- a/libs/log/filter.go +++ b/libs/log/filter.go @@ -11,9 +11,10 @@ const ( ) type filter struct { - next Logger - allowed level // XOR'd levels for default case - allowedKeyvals map[keyval]level // When key-value match, use this level + next Logger + allowed level // XOR'd levels for default case + initiallyAllowed level // XOR'd levels for initial case + allowedKeyvals map[keyval]level // When key-value match, use this level } type keyval struct { @@ -33,6 +34,7 @@ func NewFilter(next Logger, options ...Option) Logger { for _, option := range options { option(l) } + l.initiallyAllowed = l.allowed return l } @@ -76,14 +78,45 @@ func (l *filter) Error(msg string, keyvals ...interface{}) { // logger = log.NewFilter(logger, log.AllowError(), log.AllowInfoWith("module", "crypto"), log.AllowNoneWith("user", "Sam")) // logger.With("user", "Sam").With("module", "crypto").Info("Hello") # produces "I... Hello module=crypto user=Sam" func (l *filter) With(keyvals ...interface{}) Logger { + keyInAllowedKeyvals := false + for i := len(keyvals) - 2; i >= 0; i -= 2 { for kv, allowed := range l.allowedKeyvals { - if keyvals[i] == kv.key && keyvals[i+1] == kv.value { - return &filter{next: l.next.With(keyvals...), allowed: allowed, allowedKeyvals: l.allowedKeyvals} + if keyvals[i] == kv.key { + keyInAllowedKeyvals = true + // Example: + // logger = log.NewFilter(logger, log.AllowError(), log.AllowInfoWith("module", "crypto")) + // logger.With("module", "crypto") + if keyvals[i+1] == kv.value { + return &filter{ + next: l.next.With(keyvals...), + allowed: allowed, // set the desired level + allowedKeyvals: l.allowedKeyvals, + initiallyAllowed: l.initiallyAllowed, + } + } } } } - return &filter{next: l.next.With(keyvals...), allowed: l.allowed, allowedKeyvals: l.allowedKeyvals} + + // Example: + // logger = log.NewFilter(logger, log.AllowError(), log.AllowInfoWith("module", "crypto")) + // logger.With("module", "main") + if keyInAllowedKeyvals { + return &filter{ + next: l.next.With(keyvals...), + allowed: l.initiallyAllowed, // return back to initially allowed + allowedKeyvals: l.allowedKeyvals, + initiallyAllowed: l.initiallyAllowed, + } + } + + return &filter{ + next: l.next.With(keyvals...), + allowed: l.allowed, // simply continue with the current level + allowedKeyvals: l.allowedKeyvals, + initiallyAllowed: l.initiallyAllowed, + } } //-------------------------------------------------------------------------------- diff --git a/mempool/mempool.go b/mempool/mempool.go index 0bdb47140..8f70ec6c8 100644 --- a/mempool/mempool.go +++ b/mempool/mempool.go @@ -131,7 +131,6 @@ type Mempool struct { proxyMtx sync.Mutex proxyAppConn proxy.AppConnMempool txs *clist.CList // concurrent linked-list of good txs - counter int64 // simple incrementing counter height int64 // the last block Update()'d to rechecking int32 // for re-checking filtered txs on Update() recheckCursor *clist.CElement // next expected response @@ -167,7 +166,6 @@ func NewMempool( config: config, proxyAppConn: proxyAppConn, txs: clist.New(), - counter: 0, height: height, rechecking: 0, recheckCursor: nil, @@ -365,9 +363,7 @@ func (mem *Mempool) resCbNormal(req *abci.Request, res *abci.Response) { postCheckErr = mem.postCheck(tx, r.CheckTx) } if (r.CheckTx.Code == abci.CodeTypeOK) && postCheckErr == nil { - mem.counter++ memTx := &mempoolTx{ - counter: mem.counter, height: mem.height, gasWanted: r.CheckTx.GasWanted, tx: tx, @@ -378,7 +374,6 @@ func (mem *Mempool) resCbNormal(req *abci.Request, res *abci.Response) { "res", r, "height", memTx.height, "total", mem.Size(), - "counter", memTx.counter, ) mem.metrics.TxSizeBytes.Observe(float64(len(tx))) mem.notifyTxsAvailable() @@ -534,12 +529,6 @@ func (mem *Mempool) Update( preCheck PreCheckFunc, postCheck PostCheckFunc, ) error { - // First, create a lookup map of txns in new txs. - txsMap := make(map[string]struct{}, len(txs)) - for _, tx := range txs { - txsMap[string(tx)] = struct{}{} - } - // Set height mem.height = height mem.notifiedTxsAvailable = false @@ -551,12 +540,18 @@ func (mem *Mempool) Update( mem.postCheck = postCheck } - // Remove transactions that are already in txs. - goodTxs := mem.filterTxs(txsMap) + // Add committed transactions to cache (if missing). + for _, tx := range txs { + _ = mem.cache.Push(tx) + } + + // Remove committed transactions. + txsLeft := mem.removeTxs(txs) + // Recheck mempool txs if any txs were committed in the block - if mem.config.Recheck && len(goodTxs) > 0 { - mem.logger.Info("Recheck txs", "numtxs", len(goodTxs), "height", height) - mem.recheckTxs(goodTxs) + if mem.config.Recheck && len(txsLeft) > 0 { + mem.logger.Info("Recheck txs", "numtxs", len(txsLeft), "height", height) + mem.recheckTxs(txsLeft) // At this point, mem.txs are being rechecked. // mem.recheckCursor re-scans mem.txs and possibly removes some txs. // Before mem.Reap(), we should wait for mem.recheckCursor to be nil. @@ -568,12 +563,18 @@ func (mem *Mempool) Update( return nil } -func (mem *Mempool) filterTxs(blockTxsMap map[string]struct{}) []types.Tx { - goodTxs := make([]types.Tx, 0, mem.txs.Len()) +func (mem *Mempool) removeTxs(txs types.Txs) []types.Tx { + // Build a map for faster lookups. + txsMap := make(map[string]struct{}, len(txs)) + for _, tx := range txs { + txsMap[string(tx)] = struct{}{} + } + + txsLeft := make([]types.Tx, 0, mem.txs.Len()) for e := mem.txs.Front(); e != nil; e = e.Next() { memTx := e.Value.(*mempoolTx) - // Remove the tx if it's alredy in a block. - if _, ok := blockTxsMap[string(memTx.tx)]; ok { + // Remove the tx if it's already in a block. + if _, ok := txsMap[string(memTx.tx)]; ok { // remove from clist mem.txs.Remove(e) e.DetachPrev() @@ -581,15 +582,14 @@ func (mem *Mempool) filterTxs(blockTxsMap map[string]struct{}) []types.Tx { // NOTE: we don't remove committed txs from the cache. continue } - // Good tx! - goodTxs = append(goodTxs, memTx.tx) + txsLeft = append(txsLeft, memTx.tx) } - return goodTxs + return txsLeft } -// NOTE: pass in goodTxs because mem.txs can mutate concurrently. -func (mem *Mempool) recheckTxs(goodTxs []types.Tx) { - if len(goodTxs) == 0 { +// NOTE: pass in txs because mem.txs can mutate concurrently. +func (mem *Mempool) recheckTxs(txs []types.Tx) { + if len(txs) == 0 { return } atomic.StoreInt32(&mem.rechecking, 1) @@ -598,7 +598,7 @@ func (mem *Mempool) recheckTxs(goodTxs []types.Tx) { // Push txs to proxyAppConn // NOTE: resCb() may be called concurrently. - for _, tx := range goodTxs { + for _, tx := range txs { mem.proxyAppConn.CheckTxAsync(tx) } mem.proxyAppConn.FlushAsync() @@ -608,7 +608,6 @@ func (mem *Mempool) recheckTxs(goodTxs []types.Tx) { // mempoolTx is a transaction that successfully ran type mempoolTx struct { - counter int64 // a simple incrementing counter height int64 // height that this tx had been validated in gasWanted int64 // amount of gas this tx states it will require tx types.Tx // diff --git a/mempool/mempool_test.go b/mempool/mempool_test.go index d7ab82737..15bfaa25b 100644 --- a/mempool/mempool_test.go +++ b/mempool/mempool_test.go @@ -163,6 +163,17 @@ func TestMempoolFilters(t *testing.T) { } } +func TestMempoolUpdateAddsTxsToCache(t *testing.T) { + app := kvstore.NewKVStoreApplication() + cc := proxy.NewLocalClientCreator(app) + mempool := newMempoolWithApp(cc) + mempool.Update(1, []types.Tx{[]byte{0x01}}, nil, nil) + err := mempool.CheckTx([]byte{0x01}, nil) + if assert.Error(t, err) { + assert.Equal(t, ErrTxInCache, err) + } +} + func TestTxsAvailable(t *testing.T) { app := kvstore.NewKVStoreApplication() cc := proxy.NewLocalClientCreator(app) diff --git a/node/node.go b/node/node.go index bfd8d02e2..8e41dfd11 100644 --- a/node/node.go +++ b/node/node.go @@ -3,7 +3,6 @@ package node import ( "bytes" "context" - "errors" "fmt" "net" "net/http" @@ -11,11 +10,12 @@ import ( "strings" "time" + "github.com/pkg/errors" "github.com/prometheus/client_golang/prometheus" "github.com/prometheus/client_golang/prometheus/promhttp" "github.com/rs/cors" - "github.com/tendermint/go-amino" + amino "github.com/tendermint/go-amino" abci "github.com/tendermint/tendermint/abci/types" bc "github.com/tendermint/tendermint/blockchain" cfg "github.com/tendermint/tendermint/config" @@ -220,25 +220,14 @@ func NewNode(config *cfg.Config, ) } - // If an address is provided, listen on the socket for a - // connection from an external signing process. if config.PrivValidatorListenAddr != "" { - var ( - // TODO: persist this key so external signer - // can actually authenticate us - privKey = ed25519.GenPrivKey() - pvsc = privval.NewTCPVal( - logger.With("module", "privval"), - config.PrivValidatorListenAddr, - privKey, - ) - ) - - if err := pvsc.Start(); err != nil { - return nil, fmt.Errorf("Error starting private validator client: %v", err) + // If an address is provided, listen on the socket for a connection from an + // external signing process. + // FIXME: we should start services inside OnStart + privValidator, err = createAndStartPrivValidatorSocketClient(config.PrivValidatorListenAddr, logger) + if err != nil { + return nil, errors.Wrap(err, "Error with private validator socket client") } - - privValidator = pvsc } // Decide whether to fast-sync or not @@ -600,10 +589,8 @@ func (n *Node) OnStop() { } } - if pvsc, ok := n.privValidator.(*privval.TCPVal); ok { - if err := pvsc.Stop(); err != nil { - n.Logger.Error("Error stopping priv validator socket client", "err", err) - } + if pvsc, ok := n.privValidator.(cmn.Service); ok { + pvsc.Stop() } if n.prometheusSrv != nil { @@ -857,6 +844,36 @@ func saveGenesisDoc(db dbm.DB, genDoc *types.GenesisDoc) { db.SetSync(genesisDocKey, bytes) } +func createAndStartPrivValidatorSocketClient( + listenAddr string, + logger log.Logger, +) (types.PrivValidator, error) { + var pvsc types.PrivValidator + + protocol, address := cmn.ProtocolAndAddress(listenAddr) + switch protocol { + case "unix": + pvsc = privval.NewIPCVal(logger.With("module", "privval"), address) + case "tcp": + // TODO: persist this key so external signer + // can actually authenticate us + pvsc = privval.NewTCPVal(logger.With("module", "privval"), listenAddr, ed25519.GenPrivKey()) + default: + return nil, fmt.Errorf( + "Wrong listen address: expected either 'tcp' or 'unix' protocols, got %s", + protocol, + ) + } + + if pvsc, ok := pvsc.(cmn.Service); ok { + if err := pvsc.Start(); err != nil { + return nil, errors.Wrap(err, "failed to start") + } + } + + return pvsc, nil +} + // splitAndTrimEmpty slices s into all subslices separated by sep and returns a // slice of the string s with all leading and trailing Unicode code points // contained in cutset removed. If sep is empty, SplitAndTrim splits after each diff --git a/node/node_test.go b/node/node_test.go index 3a33e6bbb..e675eb9a8 100644 --- a/node/node_test.go +++ b/node/node_test.go @@ -3,23 +3,26 @@ package node import ( "context" "fmt" + "net" "os" "syscall" "testing" "time" "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" "github.com/tendermint/tendermint/abci/example/kvstore" + cfg "github.com/tendermint/tendermint/config" + "github.com/tendermint/tendermint/crypto/ed25519" + cmn "github.com/tendermint/tendermint/libs/common" "github.com/tendermint/tendermint/libs/log" "github.com/tendermint/tendermint/p2p" + "github.com/tendermint/tendermint/privval" sm "github.com/tendermint/tendermint/state" - "github.com/tendermint/tendermint/version" - - cfg "github.com/tendermint/tendermint/config" "github.com/tendermint/tendermint/types" - tmtime "github.com/tendermint/tendermint/types/time" + "github.com/tendermint/tendermint/version" ) func TestNodeStartStop(t *testing.T) { @@ -27,17 +30,16 @@ func TestNodeStartStop(t *testing.T) { // create & start node n, err := DefaultNewNode(config, log.TestingLogger()) - assert.NoError(t, err, "expected no err on DefaultNewNode") - err1 := n.Start() - if err1 != nil { - t.Error(err1) - } + require.NoError(t, err) + err = n.Start() + require.NoError(t, err) + t.Logf("Started node %v", n.sw.NodeInfo()) // wait for the node to produce a block blockCh := make(chan interface{}) err = n.EventBus().Subscribe(context.Background(), "node_test", types.EventQueryNewBlock, blockCh) - assert.NoError(t, err) + require.NoError(t, err) select { case <-blockCh: case <-time.After(10 * time.Second): @@ -89,7 +91,7 @@ func TestNodeDelayedStop(t *testing.T) { // create & start node n, err := DefaultNewNode(config, log.TestingLogger()) n.GenesisDoc().GenesisTime = now.Add(5 * time.Second) - assert.NoError(t, err) + require.NoError(t, err) n.Start() startTime := tmtime.Now() @@ -101,7 +103,7 @@ func TestNodeSetAppVersion(t *testing.T) { // create & start node n, err := DefaultNewNode(config, log.TestingLogger()) - assert.NoError(t, err, "expected no err on DefaultNewNode") + require.NoError(t, err) // default config uses the kvstore app var appVersion version.Protocol = kvstore.ProtocolVersion @@ -113,3 +115,80 @@ func TestNodeSetAppVersion(t *testing.T) { // check version is set in node info assert.Equal(t, n.nodeInfo.(p2p.DefaultNodeInfo).ProtocolVersion.App, appVersion) } + +func TestNodeSetPrivValTCP(t *testing.T) { + addr := "tcp://" + testFreeAddr(t) + + config := cfg.ResetTestRoot("node_priv_val_tcp_test") + config.BaseConfig.PrivValidatorListenAddr = addr + + rs := privval.NewRemoteSigner( + log.TestingLogger(), + config.ChainID(), + addr, + types.NewMockPV(), + ed25519.GenPrivKey(), + ) + privval.RemoteSignerConnDeadline(5 * time.Millisecond)(rs) + go func() { + err := rs.Start() + if err != nil { + panic(err) + } + }() + defer rs.Stop() + + n, err := DefaultNewNode(config, log.TestingLogger()) + require.NoError(t, err) + assert.IsType(t, &privval.TCPVal{}, n.PrivValidator()) +} + +// address without a protocol must result in error +func TestPrivValidatorListenAddrNoProtocol(t *testing.T) { + addrNoPrefix := testFreeAddr(t) + + config := cfg.ResetTestRoot("node_priv_val_tcp_test") + config.BaseConfig.PrivValidatorListenAddr = addrNoPrefix + + _, err := DefaultNewNode(config, log.TestingLogger()) + assert.Error(t, err) +} + +func TestNodeSetPrivValIPC(t *testing.T) { + tmpfile := "/tmp/kms." + cmn.RandStr(6) + ".sock" + defer os.Remove(tmpfile) // clean up + + config := cfg.ResetTestRoot("node_priv_val_tcp_test") + config.BaseConfig.PrivValidatorListenAddr = "unix://" + tmpfile + + rs := privval.NewIPCRemoteSigner( + log.TestingLogger(), + config.ChainID(), + tmpfile, + types.NewMockPV(), + ) + privval.IPCRemoteSignerConnDeadline(3 * time.Second)(rs) + + done := make(chan struct{}) + go func() { + defer close(done) + n, err := DefaultNewNode(config, log.TestingLogger()) + require.NoError(t, err) + assert.IsType(t, &privval.IPCVal{}, n.PrivValidator()) + }() + + err := rs.Start() + require.NoError(t, err) + defer rs.Stop() + + <-done +} + +// testFreeAddr claims a free port so we don't block on listener being ready. +func testFreeAddr(t *testing.T) string { + ln, err := net.Listen("tcp", "127.0.0.1:0") + require.NoError(t, err) + defer ln.Close() + + return fmt.Sprintf("127.0.0.1:%d", ln.Addr().(*net.TCPAddr).Port) +} diff --git a/p2p/node_info.go b/p2p/node_info.go index c36d98d9c..99daf7c43 100644 --- a/p2p/node_info.go +++ b/p2p/node_info.go @@ -230,3 +230,31 @@ func (info DefaultNodeInfo) NetAddress() *NetAddress { } return netAddr } + +//----------------------------------------------------------- +// These methods are for Protobuf Compatibility + +// Size returns the size of the amino encoding, in bytes. +func (info *DefaultNodeInfo) Size() int { + bs, _ := info.Marshal() + return len(bs) +} + +// Marshal returns the amino encoding. +func (info *DefaultNodeInfo) Marshal() ([]byte, error) { + return cdc.MarshalBinaryBare(info) +} + +// MarshalTo calls Marshal and copies to the given buffer. +func (info *DefaultNodeInfo) MarshalTo(data []byte) (int, error) { + bs, err := info.Marshal() + if err != nil { + return -1, err + } + return copy(data, bs), nil +} + +// Unmarshal deserializes from amino encoded form. +func (info *DefaultNodeInfo) Unmarshal(bs []byte) error { + return cdc.UnmarshalBinaryBare(bs, info) +} diff --git a/privval/ipc_server.go b/privval/ipc_server.go index d3907cbdb..ba9574771 100644 --- a/privval/ipc_server.go +++ b/privval/ipc_server.go @@ -69,6 +69,7 @@ func (rs *IPCRemoteSigner) OnStart() error { for { conn, err := rs.listener.AcceptUnix() if err != nil { + rs.Logger.Error("AcceptUnix", "err", err) return } go rs.handleConnection(conn) diff --git a/rpc/client/rpc_test.go b/rpc/client/rpc_test.go index 217971fda..b07b74a39 100644 --- a/rpc/client/rpc_test.go +++ b/rpc/client/rpc_test.go @@ -370,20 +370,27 @@ func TestTxSearch(t *testing.T) { } // query by height - result, err = c.TxSearch(fmt.Sprintf("tx.height >= %d", txHeight), true, 1, 30) + result, err = c.TxSearch(fmt.Sprintf("tx.height=%d", txHeight), true, 1, 30) require.Nil(t, err, "%+v", err) require.Len(t, result.Txs, 1) - // we query for non existing tx + // query for non existing tx result, err = c.TxSearch(fmt.Sprintf("tx.hash='%X'", anotherTxHash), false, 1, 30) require.Nil(t, err, "%+v", err) require.Len(t, result.Txs, 0) - // we query using a tag (see kvstore application) + // query using a tag (see kvstore application) result, err = c.TxSearch("app.creator='Cosmoshi Netowoko'", false, 1, 30) require.Nil(t, err, "%+v", err) if len(result.Txs) == 0 { t.Fatal("expected a lot of transactions") } + + // query using a tag (see kvstore application) and height + result, err = c.TxSearch("app.creator='Cosmoshi Netowoko' AND tx.height<10000", true, 1, 30) + require.Nil(t, err, "%+v", err) + if len(result.Txs) == 0 { + t.Fatal("expected a lot of transactions") + } } } diff --git a/rpc/core/events.go b/rpc/core/events.go index 6f679e33d..e7456f351 100644 --- a/rpc/core/events.go +++ b/rpc/core/events.go @@ -2,6 +2,7 @@ package core import ( "context" + "fmt" "github.com/pkg/errors" @@ -104,7 +105,7 @@ func Subscribe(wsCtx rpctypes.WSRPCContext, query string) (*ctypes.ResultSubscri go func() { for event := range ch { tmResult := &ctypes.ResultEvent{query, event.(tmtypes.TMEventData)} - wsCtx.TryWriteRPCResponse(rpctypes.NewRPCSuccessResponse(wsCtx.Codec(), wsCtx.Request.ID+"#event", tmResult)) + wsCtx.TryWriteRPCResponse(rpctypes.NewRPCSuccessResponse(wsCtx.Codec(), rpctypes.JSONRPCStringID(fmt.Sprintf("%v#event", wsCtx.Request.ID)), tmResult)) } }() diff --git a/rpc/lib/client/http_client.go b/rpc/lib/client/http_client.go index bd440289b..21be5fe0c 100644 --- a/rpc/lib/client/http_client.go +++ b/rpc/lib/client/http_client.go @@ -99,7 +99,7 @@ func NewJSONRPCClient(remote string) *JSONRPCClient { } func (c *JSONRPCClient) Call(method string, params map[string]interface{}, result interface{}) (interface{}, error) { - request, err := types.MapToRequest(c.cdc, "jsonrpc-client", method, params) + request, err := types.MapToRequest(c.cdc, types.JSONRPCStringID("jsonrpc-client"), method, params) if err != nil { return nil, err } diff --git a/rpc/lib/client/ws_client.go b/rpc/lib/client/ws_client.go index 6da996e2c..b183118d9 100644 --- a/rpc/lib/client/ws_client.go +++ b/rpc/lib/client/ws_client.go @@ -214,7 +214,7 @@ func (c *WSClient) Send(ctx context.Context, request types.RPCRequest) error { // Call the given method. See Send description. func (c *WSClient) Call(ctx context.Context, method string, params map[string]interface{}) error { - request, err := types.MapToRequest(c.cdc, "ws-client", method, params) + request, err := types.MapToRequest(c.cdc, types.JSONRPCStringID("ws-client"), method, params) if err != nil { return err } @@ -224,7 +224,7 @@ func (c *WSClient) Call(ctx context.Context, method string, params map[string]in // CallWithArrayParams the given method with params in a form of array. See // Send description. func (c *WSClient) CallWithArrayParams(ctx context.Context, method string, params []interface{}) error { - request, err := types.ArrayToRequest(c.cdc, "ws-client", method, params) + request, err := types.ArrayToRequest(c.cdc, types.JSONRPCStringID("ws-client"), method, params) if err != nil { return err } diff --git a/rpc/lib/server/handlers.go b/rpc/lib/server/handlers.go index 3ec5f81e3..edab88fe5 100644 --- a/rpc/lib/server/handlers.go +++ b/rpc/lib/server/handlers.go @@ -103,7 +103,7 @@ func makeJSONRPCHandler(funcMap map[string]*RPCFunc, cdc *amino.Codec, logger lo return func(w http.ResponseWriter, r *http.Request) { b, err := ioutil.ReadAll(r.Body) if err != nil { - WriteRPCResponseHTTP(w, types.RPCInvalidRequestError("", errors.Wrap(err, "Error reading request body"))) + WriteRPCResponseHTTP(w, types.RPCInvalidRequestError(types.JSONRPCStringID(""), errors.Wrap(err, "Error reading request body"))) return } // if its an empty request (like from a browser), @@ -116,12 +116,12 @@ func makeJSONRPCHandler(funcMap map[string]*RPCFunc, cdc *amino.Codec, logger lo var request types.RPCRequest err = json.Unmarshal(b, &request) if err != nil { - WriteRPCResponseHTTP(w, types.RPCParseError("", errors.Wrap(err, "Error unmarshalling request"))) + WriteRPCResponseHTTP(w, types.RPCParseError(types.JSONRPCStringID(""), errors.Wrap(err, "Error unmarshalling request"))) return } // A Notification is a Request object without an "id" member. // The Server MUST NOT reply to a Notification, including those that are within a batch request. - if request.ID == "" { + if request.ID == types.JSONRPCStringID("") { logger.Debug("HTTPJSONRPC received a notification, skipping... (please send a non-empty ID if you want to call a method)") return } @@ -255,7 +255,7 @@ func makeHTTPHandler(rpcFunc *RPCFunc, cdc *amino.Codec, logger log.Logger) func // Exception for websocket endpoints if rpcFunc.ws { return func(w http.ResponseWriter, r *http.Request) { - WriteRPCResponseHTTP(w, types.RPCMethodNotFoundError("")) + WriteRPCResponseHTTP(w, types.RPCMethodNotFoundError(types.JSONRPCStringID(""))) } } // All other endpoints @@ -263,17 +263,17 @@ func makeHTTPHandler(rpcFunc *RPCFunc, cdc *amino.Codec, logger log.Logger) func logger.Debug("HTTP HANDLER", "req", r) args, err := httpParamsToArgs(rpcFunc, cdc, r) if err != nil { - WriteRPCResponseHTTP(w, types.RPCInvalidParamsError("", errors.Wrap(err, "Error converting http params to arguments"))) + WriteRPCResponseHTTP(w, types.RPCInvalidParamsError(types.JSONRPCStringID(""), errors.Wrap(err, "Error converting http params to arguments"))) return } returns := rpcFunc.f.Call(args) logger.Info("HTTPRestRPC", "method", r.URL.Path, "args", args, "returns", returns) result, err := unreflectResult(returns) if err != nil { - WriteRPCResponseHTTP(w, types.RPCInternalError("", err)) + WriteRPCResponseHTTP(w, types.RPCInternalError(types.JSONRPCStringID(""), err)) return } - WriteRPCResponseHTTP(w, types.NewRPCSuccessResponse(cdc, "", result)) + WriteRPCResponseHTTP(w, types.NewRPCSuccessResponse(cdc, types.JSONRPCStringID(""), result)) } } @@ -580,7 +580,7 @@ func (wsc *wsConnection) readRoutine() { err = fmt.Errorf("WSJSONRPC: %v", r) } wsc.Logger.Error("Panic in WSJSONRPC handler", "err", err, "stack", string(debug.Stack())) - wsc.WriteRPCResponse(types.RPCInternalError("unknown", err)) + wsc.WriteRPCResponse(types.RPCInternalError(types.JSONRPCStringID("unknown"), err)) go wsc.readRoutine() } else { wsc.baseConn.Close() // nolint: errcheck @@ -615,13 +615,13 @@ func (wsc *wsConnection) readRoutine() { var request types.RPCRequest err = json.Unmarshal(in, &request) if err != nil { - wsc.WriteRPCResponse(types.RPCParseError("", errors.Wrap(err, "Error unmarshaling request"))) + wsc.WriteRPCResponse(types.RPCParseError(types.JSONRPCStringID(""), errors.Wrap(err, "Error unmarshaling request"))) continue } // A Notification is a Request object without an "id" member. // The Server MUST NOT reply to a Notification, including those that are within a batch request. - if request.ID == "" { + if request.ID == types.JSONRPCStringID("") { wsc.Logger.Debug("WSJSONRPC received a notification, skipping... (please send a non-empty ID if you want to call a method)") continue } diff --git a/rpc/lib/server/handlers_test.go b/rpc/lib/server/handlers_test.go index 6004959ae..b1d3c7888 100644 --- a/rpc/lib/server/handlers_test.go +++ b/rpc/lib/server/handlers_test.go @@ -47,21 +47,22 @@ func statusOK(code int) bool { return code >= 200 && code <= 299 } func TestRPCParams(t *testing.T) { mux := testMux() tests := []struct { - payload string - wantErr string + payload string + wantErr string + expectedId interface{} }{ // bad - {`{"jsonrpc": "2.0", "id": "0"}`, "Method not found"}, - {`{"jsonrpc": "2.0", "method": "y", "id": "0"}`, "Method not found"}, - {`{"method": "c", "id": "0", "params": a}`, "invalid character"}, - {`{"method": "c", "id": "0", "params": ["a"]}`, "got 1"}, - {`{"method": "c", "id": "0", "params": ["a", "b"]}`, "invalid character"}, - {`{"method": "c", "id": "0", "params": [1, 1]}`, "of type string"}, + {`{"jsonrpc": "2.0", "id": "0"}`, "Method not found", types.JSONRPCStringID("0")}, + {`{"jsonrpc": "2.0", "method": "y", "id": "0"}`, "Method not found", types.JSONRPCStringID("0")}, + {`{"method": "c", "id": "0", "params": a}`, "invalid character", types.JSONRPCStringID("")}, // id not captured in JSON parsing failures + {`{"method": "c", "id": "0", "params": ["a"]}`, "got 1", types.JSONRPCStringID("0")}, + {`{"method": "c", "id": "0", "params": ["a", "b"]}`, "invalid character", types.JSONRPCStringID("0")}, + {`{"method": "c", "id": "0", "params": [1, 1]}`, "of type string", types.JSONRPCStringID("0")}, // good - {`{"jsonrpc": "2.0", "method": "c", "id": "0", "params": null}`, ""}, - {`{"method": "c", "id": "0", "params": {}}`, ""}, - {`{"method": "c", "id": "0", "params": ["a", "10"]}`, ""}, + {`{"jsonrpc": "2.0", "method": "c", "id": "0", "params": null}`, "", types.JSONRPCStringID("0")}, + {`{"method": "c", "id": "0", "params": {}}`, "", types.JSONRPCStringID("0")}, + {`{"method": "c", "id": "0", "params": ["a", "10"]}`, "", types.JSONRPCStringID("0")}, } for i, tt := range tests { @@ -80,7 +81,7 @@ func TestRPCParams(t *testing.T) { recv := new(types.RPCResponse) assert.Nil(t, json.Unmarshal(blob, recv), "#%d: expecting successful parsing of an RPCResponse:\nblob: %s", i, blob) assert.NotEqual(t, recv, new(types.RPCResponse), "#%d: not expecting a blank RPCResponse", i) - + assert.Equal(t, tt.expectedId, recv.ID, "#%d: expected ID not matched in RPCResponse", i) if tt.wantErr == "" { assert.Nil(t, recv.Error, "#%d: not expecting an error", i) } else { @@ -91,9 +92,56 @@ func TestRPCParams(t *testing.T) { } } +func TestJSONRPCID(t *testing.T) { + mux := testMux() + tests := []struct { + payload string + wantErr bool + expectedId interface{} + }{ + // good id + {`{"jsonrpc": "2.0", "method": "c", "id": "0", "params": ["a", "10"]}`, false, types.JSONRPCStringID("0")}, + {`{"jsonrpc": "2.0", "method": "c", "id": "abc", "params": ["a", "10"]}`, false, types.JSONRPCStringID("abc")}, + {`{"jsonrpc": "2.0", "method": "c", "id": 0, "params": ["a", "10"]}`, false, types.JSONRPCIntID(0)}, + {`{"jsonrpc": "2.0", "method": "c", "id": 1, "params": ["a", "10"]}`, false, types.JSONRPCIntID(1)}, + {`{"jsonrpc": "2.0", "method": "c", "id": 1.3, "params": ["a", "10"]}`, false, types.JSONRPCIntID(1)}, + {`{"jsonrpc": "2.0", "method": "c", "id": -1, "params": ["a", "10"]}`, false, types.JSONRPCIntID(-1)}, + {`{"jsonrpc": "2.0", "method": "c", "id": null, "params": ["a", "10"]}`, false, nil}, + + // bad id + {`{"jsonrpc": "2.0", "method": "c", "id": {}, "params": ["a", "10"]}`, true, nil}, + {`{"jsonrpc": "2.0", "method": "c", "id": [], "params": ["a", "10"]}`, true, nil}, + } + + for i, tt := range tests { + req, _ := http.NewRequest("POST", "http://localhost/", strings.NewReader(tt.payload)) + rec := httptest.NewRecorder() + mux.ServeHTTP(rec, req) + res := rec.Result() + // Always expecting back a JSONRPCResponse + assert.True(t, statusOK(res.StatusCode), "#%d: should always return 2XX", i) + blob, err := ioutil.ReadAll(res.Body) + if err != nil { + t.Errorf("#%d: err reading body: %v", i, err) + continue + } + + recv := new(types.RPCResponse) + err = json.Unmarshal(blob, recv) + assert.Nil(t, err, "#%d: expecting successful parsing of an RPCResponse:\nblob: %s", i, blob) + if !tt.wantErr { + assert.NotEqual(t, recv, new(types.RPCResponse), "#%d: not expecting a blank RPCResponse", i) + assert.Equal(t, tt.expectedId, recv.ID, "#%d: expected ID not matched in RPCResponse", i) + assert.Nil(t, recv.Error, "#%d: not expecting an error", i) + } else { + assert.True(t, recv.Error.Code < 0, "#%d: not expecting a positive JSONRPC code", i) + } + } +} + func TestRPCNotification(t *testing.T) { mux := testMux() - body := strings.NewReader(`{"jsonrpc": "2.0"}`) + body := strings.NewReader(`{"jsonrpc": "2.0", "id": ""}`) req, _ := http.NewRequest("POST", "http://localhost/", body) rec := httptest.NewRecorder() mux.ServeHTTP(rec, req) @@ -134,7 +182,7 @@ func TestWebsocketManagerHandler(t *testing.T) { } // check basic functionality works - req, err := types.MapToRequest(amino.NewCodec(), "TestWebsocketManager", "c", map[string]interface{}{"s": "a", "i": 10}) + req, err := types.MapToRequest(amino.NewCodec(), types.JSONRPCStringID("TestWebsocketManager"), "c", map[string]interface{}{"s": "a", "i": 10}) require.NoError(t, err) err = c.WriteJSON(req) require.NoError(t, err) diff --git a/rpc/lib/server/http_server.go b/rpc/lib/server/http_server.go index 1fd422a9b..9db69b6ff 100644 --- a/rpc/lib/server/http_server.go +++ b/rpc/lib/server/http_server.go @@ -132,7 +132,7 @@ func RecoverAndLogHandler(handler http.Handler, logger log.Logger) http.Handler "Panic in RPC HTTP handler", "err", e, "stack", string(debug.Stack()), ) - WriteRPCResponseHTTPError(rww, http.StatusInternalServerError, types.RPCInternalError("", e.(error))) + WriteRPCResponseHTTPError(rww, http.StatusInternalServerError, types.RPCInternalError(types.JSONRPCStringID(""), e.(error))) } } diff --git a/rpc/lib/types/types.go b/rpc/lib/types/types.go index fe9a92531..e0753a03b 100644 --- a/rpc/lib/types/types.go +++ b/rpc/lib/types/types.go @@ -4,6 +4,7 @@ import ( "context" "encoding/json" "fmt" + "reflect" "strings" "github.com/pkg/errors" @@ -13,17 +14,75 @@ import ( tmpubsub "github.com/tendermint/tendermint/libs/pubsub" ) +// a wrapper to emulate a sum type: jsonrpcid = string | int +// TODO: refactor when Go 2.0 arrives https://github.com/golang/go/issues/19412 +type jsonrpcid interface { + isJSONRPCID() +} + +// JSONRPCStringID a wrapper for JSON-RPC string IDs +type JSONRPCStringID string + +func (JSONRPCStringID) isJSONRPCID() {} + +// JSONRPCIntID a wrapper for JSON-RPC integer IDs +type JSONRPCIntID int + +func (JSONRPCIntID) isJSONRPCID() {} + +func idFromInterface(idInterface interface{}) (jsonrpcid, error) { + switch id := idInterface.(type) { + case string: + return JSONRPCStringID(id), nil + case float64: + // json.Unmarshal uses float64 for all numbers + // (https://golang.org/pkg/encoding/json/#Unmarshal), + // but the JSONRPC2.0 spec says the id SHOULD NOT contain + // decimals - so we truncate the decimals here. + return JSONRPCIntID(int(id)), nil + default: + typ := reflect.TypeOf(id) + return nil, fmt.Errorf("JSON-RPC ID (%v) is of unknown type (%v)", id, typ) + } +} + //---------------------------------------- // REQUEST type RPCRequest struct { JSONRPC string `json:"jsonrpc"` - ID string `json:"id"` + ID jsonrpcid `json:"id"` Method string `json:"method"` Params json.RawMessage `json:"params"` // must be map[string]interface{} or []interface{} } -func NewRPCRequest(id string, method string, params json.RawMessage) RPCRequest { +// UnmarshalJSON custom JSON unmarshalling due to jsonrpcid being string or int +func (request *RPCRequest) UnmarshalJSON(data []byte) error { + unsafeReq := &struct { + JSONRPC string `json:"jsonrpc"` + ID interface{} `json:"id"` + Method string `json:"method"` + Params json.RawMessage `json:"params"` // must be map[string]interface{} or []interface{} + }{} + err := json.Unmarshal(data, &unsafeReq) + if err != nil { + return err + } + request.JSONRPC = unsafeReq.JSONRPC + request.Method = unsafeReq.Method + request.Params = unsafeReq.Params + if unsafeReq.ID == nil { + return nil + } + id, err := idFromInterface(unsafeReq.ID) + if err != nil { + return err + } + request.ID = id + return nil +} + +func NewRPCRequest(id jsonrpcid, method string, params json.RawMessage) RPCRequest { return RPCRequest{ JSONRPC: "2.0", ID: id, @@ -36,7 +95,7 @@ func (req RPCRequest) String() string { return fmt.Sprintf("[%s %s]", req.ID, req.Method) } -func MapToRequest(cdc *amino.Codec, id string, method string, params map[string]interface{}) (RPCRequest, error) { +func MapToRequest(cdc *amino.Codec, id jsonrpcid, method string, params map[string]interface{}) (RPCRequest, error) { var params_ = make(map[string]json.RawMessage, len(params)) for name, value := range params { valueJSON, err := cdc.MarshalJSON(value) @@ -53,7 +112,7 @@ func MapToRequest(cdc *amino.Codec, id string, method string, params map[string] return request, nil } -func ArrayToRequest(cdc *amino.Codec, id string, method string, params []interface{}) (RPCRequest, error) { +func ArrayToRequest(cdc *amino.Codec, id jsonrpcid, method string, params []interface{}) (RPCRequest, error) { var params_ = make([]json.RawMessage, len(params)) for i, value := range params { valueJSON, err := cdc.MarshalJSON(value) @@ -89,12 +148,38 @@ func (err RPCError) Error() string { type RPCResponse struct { JSONRPC string `json:"jsonrpc"` - ID string `json:"id"` + ID jsonrpcid `json:"id"` Result json.RawMessage `json:"result,omitempty"` Error *RPCError `json:"error,omitempty"` } -func NewRPCSuccessResponse(cdc *amino.Codec, id string, res interface{}) RPCResponse { +// UnmarshalJSON custom JSON unmarshalling due to jsonrpcid being string or int +func (response *RPCResponse) UnmarshalJSON(data []byte) error { + unsafeResp := &struct { + JSONRPC string `json:"jsonrpc"` + ID interface{} `json:"id"` + Result json.RawMessage `json:"result,omitempty"` + Error *RPCError `json:"error,omitempty"` + }{} + err := json.Unmarshal(data, &unsafeResp) + if err != nil { + return err + } + response.JSONRPC = unsafeResp.JSONRPC + response.Error = unsafeResp.Error + response.Result = unsafeResp.Result + if unsafeResp.ID == nil { + return nil + } + id, err := idFromInterface(unsafeResp.ID) + if err != nil { + return err + } + response.ID = id + return nil +} + +func NewRPCSuccessResponse(cdc *amino.Codec, id jsonrpcid, res interface{}) RPCResponse { var rawMsg json.RawMessage if res != nil { @@ -109,7 +194,7 @@ func NewRPCSuccessResponse(cdc *amino.Codec, id string, res interface{}) RPCResp return RPCResponse{JSONRPC: "2.0", ID: id, Result: rawMsg} } -func NewRPCErrorResponse(id string, code int, msg string, data string) RPCResponse { +func NewRPCErrorResponse(id jsonrpcid, code int, msg string, data string) RPCResponse { return RPCResponse{ JSONRPC: "2.0", ID: id, @@ -124,27 +209,27 @@ func (resp RPCResponse) String() string { return fmt.Sprintf("[%s %s]", resp.ID, resp.Error) } -func RPCParseError(id string, err error) RPCResponse { +func RPCParseError(id jsonrpcid, err error) RPCResponse { return NewRPCErrorResponse(id, -32700, "Parse error. Invalid JSON", err.Error()) } -func RPCInvalidRequestError(id string, err error) RPCResponse { +func RPCInvalidRequestError(id jsonrpcid, err error) RPCResponse { return NewRPCErrorResponse(id, -32600, "Invalid Request", err.Error()) } -func RPCMethodNotFoundError(id string) RPCResponse { +func RPCMethodNotFoundError(id jsonrpcid) RPCResponse { return NewRPCErrorResponse(id, -32601, "Method not found", "") } -func RPCInvalidParamsError(id string, err error) RPCResponse { +func RPCInvalidParamsError(id jsonrpcid, err error) RPCResponse { return NewRPCErrorResponse(id, -32602, "Invalid params", err.Error()) } -func RPCInternalError(id string, err error) RPCResponse { +func RPCInternalError(id jsonrpcid, err error) RPCResponse { return NewRPCErrorResponse(id, -32603, "Internal error", err.Error()) } -func RPCServerError(id string, err error) RPCResponse { +func RPCServerError(id jsonrpcid, err error) RPCResponse { return NewRPCErrorResponse(id, -32000, "Server error", err.Error()) } diff --git a/rpc/lib/types/types_test.go b/rpc/lib/types/types_test.go index 9dd1b7a18..3e8851326 100644 --- a/rpc/lib/types/types_test.go +++ b/rpc/lib/types/types_test.go @@ -15,24 +15,57 @@ type SampleResult struct { Value string } +type responseTest struct { + id jsonrpcid + expected string +} + +var responseTests = []responseTest{ + {JSONRPCStringID("1"), `"1"`}, + {JSONRPCStringID("alphabet"), `"alphabet"`}, + {JSONRPCStringID(""), `""`}, + {JSONRPCStringID("àáâ"), `"àáâ"`}, + {JSONRPCIntID(-1), "-1"}, + {JSONRPCIntID(0), "0"}, + {JSONRPCIntID(1), "1"}, + {JSONRPCIntID(100), "100"}, +} + func TestResponses(t *testing.T) { assert := assert.New(t) cdc := amino.NewCodec() + for _, tt := range responseTests { + jsonid := tt.id + a := NewRPCSuccessResponse(cdc, jsonid, &SampleResult{"hello"}) + b, _ := json.Marshal(a) + s := fmt.Sprintf(`{"jsonrpc":"2.0","id":%v,"result":{"Value":"hello"}}`, tt.expected) + assert.Equal(string(s), string(b)) - a := NewRPCSuccessResponse(cdc, "1", &SampleResult{"hello"}) - b, _ := json.Marshal(a) - s := `{"jsonrpc":"2.0","id":"1","result":{"Value":"hello"}}` - assert.Equal(string(s), string(b)) + d := RPCParseError(jsonid, errors.New("Hello world")) + e, _ := json.Marshal(d) + f := fmt.Sprintf(`{"jsonrpc":"2.0","id":%v,"error":{"code":-32700,"message":"Parse error. Invalid JSON","data":"Hello world"}}`, tt.expected) + assert.Equal(string(f), string(e)) - d := RPCParseError("1", errors.New("Hello world")) - e, _ := json.Marshal(d) - f := `{"jsonrpc":"2.0","id":"1","error":{"code":-32700,"message":"Parse error. Invalid JSON","data":"Hello world"}}` - assert.Equal(string(f), string(e)) + g := RPCMethodNotFoundError(jsonid) + h, _ := json.Marshal(g) + i := fmt.Sprintf(`{"jsonrpc":"2.0","id":%v,"error":{"code":-32601,"message":"Method not found"}}`, tt.expected) + assert.Equal(string(h), string(i)) + } +} - g := RPCMethodNotFoundError("2") - h, _ := json.Marshal(g) - i := `{"jsonrpc":"2.0","id":"2","error":{"code":-32601,"message":"Method not found"}}` - assert.Equal(string(h), string(i)) +func TestUnmarshallResponses(t *testing.T) { + assert := assert.New(t) + cdc := amino.NewCodec() + for _, tt := range responseTests { + response := &RPCResponse{} + err := json.Unmarshal([]byte(fmt.Sprintf(`{"jsonrpc":"2.0","id":%v,"result":{"Value":"hello"}}`, tt.expected)), response) + assert.Nil(err) + a := NewRPCSuccessResponse(cdc, tt.id, &SampleResult{"hello"}) + assert.Equal(*response, a) + } + response := &RPCResponse{} + err := json.Unmarshal([]byte(`{"jsonrpc":"2.0","id":true,"result":{"Value":"hello"}}`), response) + assert.NotNil(err) } func TestRPCError(t *testing.T) { diff --git a/state/execution.go b/state/execution.go index 9aa714ebd..b7c38f418 100644 --- a/state/execution.go +++ b/state/execution.go @@ -226,8 +226,9 @@ func execBlockOnProxyApp( commitInfo, byzVals := getBeginBlockValidatorInfo(block, lastValSet, stateDB) - // Begin block. - _, err := proxyAppConn.BeginBlockSync(abci.RequestBeginBlock{ + // Begin block + var err error + abciResponses.BeginBlock, err = proxyAppConn.BeginBlockSync(abci.RequestBeginBlock{ Hash: block.Hash(), Header: types.TM2PB.Header(&block.Header), LastCommitInfo: commitInfo, @@ -417,8 +418,16 @@ func updateState( // Fire TxEvent for every tx. // NOTE: if Tendermint crashes before commit, some or all of these events may be published again. func fireEvents(logger log.Logger, eventBus types.BlockEventPublisher, block *types.Block, abciResponses *ABCIResponses) { - eventBus.PublishEventNewBlock(types.EventDataNewBlock{block}) - eventBus.PublishEventNewBlockHeader(types.EventDataNewBlockHeader{block.Header}) + eventBus.PublishEventNewBlock(types.EventDataNewBlock{ + Block: block, + ResultBeginBlock: *abciResponses.BeginBlock, + ResultEndBlock: *abciResponses.EndBlock, + }) + eventBus.PublishEventNewBlockHeader(types.EventDataNewBlockHeader{ + Header: block.Header, + ResultBeginBlock: *abciResponses.BeginBlock, + ResultEndBlock: *abciResponses.EndBlock, + }) for i, tx := range block.Data.Txs { eventBus.PublishEventTx(types.EventDataTx{types.TxResult{ diff --git a/state/state.go b/state/state.go index 0dbd718da..451d65442 100644 --- a/state/state.go +++ b/state/state.go @@ -64,7 +64,8 @@ type State struct { // Validators are persisted to the database separately every time they change, // so we can query for historical validator sets. // Note that if s.LastBlockHeight causes a valset change, - // we set s.LastHeightValidatorsChanged = s.LastBlockHeight + 1 + // we set s.LastHeightValidatorsChanged = s.LastBlockHeight + 1 + 1 + // Extra +1 due to nextValSet delay. NextValidators *types.ValidatorSet Validators *types.ValidatorSet LastValidators *types.ValidatorSet diff --git a/state/state_test.go b/state/state_test.go index 17293f6fe..50346025e 100644 --- a/state/state_test.go +++ b/state/state_test.go @@ -3,9 +3,10 @@ package state import ( "bytes" "fmt" - "github.com/tendermint/tendermint/libs/log" "testing" + "github.com/tendermint/tendermint/libs/log" + "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" abci "github.com/tendermint/tendermint/abci/types" @@ -260,6 +261,27 @@ func TestOneValidatorChangesSaveLoad(t *testing.T) { } } +func TestStoreLoadValidatorsIncrementsAccum(t *testing.T) { + const valSetSize = 2 + tearDown, stateDB, state := setupTestCase(t) + state.Validators = genValSet(valSetSize) + state.NextValidators = state.Validators.CopyIncrementAccum(1) + SaveState(stateDB, state) + defer tearDown(t) + + nextHeight := state.LastBlockHeight + 1 + + v0, err := LoadValidators(stateDB, nextHeight) + assert.Nil(t, err) + acc0 := v0.Validators[0].Accum + + v1, err := LoadValidators(stateDB, nextHeight+1) + assert.Nil(t, err) + acc1 := v1.Validators[0].Accum + + assert.NotEqual(t, acc1, acc0, "expected Accum value to change between heights") +} + // TestValidatorChangesSaveLoad tests saving and loading a validator set with // changes. func TestManyValidatorChangesSaveLoad(t *testing.T) { diff --git a/state/store.go b/state/store.go index 086dcdf5a..eb850fa7f 100644 --- a/state/store.go +++ b/state/store.go @@ -89,7 +89,9 @@ func saveState(db dbm.DB, state State, key []byte) { nextHeight := state.LastBlockHeight + 1 // If first block, save validators for block 1. if nextHeight == 1 { - lastHeightVoteChanged := int64(1) // Due to Tendermint validator set changes being delayed 1 block. + // This extra logic due to Tendermint validator set changes being delayed 1 block. + // It may get overwritten due to InitChain validator updates. + lastHeightVoteChanged := int64(1) saveValidatorsInfo(db, nextHeight, lastHeightVoteChanged, state.Validators) } // Save next validators. @@ -105,8 +107,9 @@ func saveState(db dbm.DB, state State, key []byte) { // of the various ABCI calls during block processing. // It is persisted to disk for each height before calling Commit. type ABCIResponses struct { - DeliverTx []*abci.ResponseDeliverTx - EndBlock *abci.ResponseEndBlock + DeliverTx []*abci.ResponseDeliverTx + EndBlock *abci.ResponseEndBlock + BeginBlock *abci.ResponseBeginBlock } // NewABCIResponses returns a new ABCIResponses @@ -191,12 +194,14 @@ func LoadValidators(db dbm.DB, height int64) (*types.ValidatorSet, error) { ), ) } + valInfo2.ValidatorSet.IncrementAccum(int(height - valInfo.LastHeightChanged)) // mutate valInfo = valInfo2 } return valInfo.ValidatorSet, nil } +// CONTRACT: Returned ValidatorsInfo can be mutated. func loadValidatorsInfo(db dbm.DB, height int64) *ValidatorsInfo { buf := db.Get(calcValidatorsKey(height)) if len(buf) == 0 { @@ -215,18 +220,22 @@ func loadValidatorsInfo(db dbm.DB, height int64) *ValidatorsInfo { return v } -// saveValidatorsInfo persists the validator set for the next block to disk. +// saveValidatorsInfo persists the validator set. +// `height` is the effective height for which the validator is responsible for signing. // It should be called from s.Save(), right before the state itself is persisted. // If the validator set did not change after processing the latest block, // only the last height for which the validators changed is persisted. -func saveValidatorsInfo(db dbm.DB, nextHeight, changeHeight int64, valSet *types.ValidatorSet) { +func saveValidatorsInfo(db dbm.DB, height, lastHeightChanged int64, valSet *types.ValidatorSet) { + if lastHeightChanged > height { + panic("LastHeightChanged cannot be greater than ValidatorsInfo height") + } valInfo := &ValidatorsInfo{ - LastHeightChanged: changeHeight, + LastHeightChanged: lastHeightChanged, } - if changeHeight == nextHeight { + if lastHeightChanged == height { valInfo.ValidatorSet = valSet } - db.Set(calcValidatorsKey(nextHeight), valInfo.Bytes()) + db.Set(calcValidatorsKey(height), valInfo.Bytes()) } //----------------------------------------------------------------------------- diff --git a/state/txindex/kv/kv.go b/state/txindex/kv/kv.go index 363ab1193..a5913d5b7 100644 --- a/state/txindex/kv/kv.go +++ b/state/txindex/kv/kv.go @@ -207,8 +207,11 @@ func (txi *TxIndex) Search(q *query.Query) ([]*types.TxResult, error) { i++ } - // sort by height by default + // sort by height & index by default sort.Slice(results, func(i, j int) bool { + if results[i].Height == results[j].Height { + return results[i].Index < results[j].Index + } return results[i].Height < results[j].Height }) @@ -225,9 +228,10 @@ func lookForHash(conditions []query.Condition) (hash []byte, err error, ok bool) return } +// lookForHeight returns a height if there is an "height=X" condition. func lookForHeight(conditions []query.Condition) (height int64) { for _, c := range conditions { - if c.Tag == types.TxHeightKey { + if c.Tag == types.TxHeightKey && c.Op == query.OpEqual { return c.Operand.(int64) } } @@ -408,9 +412,9 @@ LOOP: func startKey(c query.Condition, height int64) []byte { var key string if height > 0 { - key = fmt.Sprintf("%s/%v/%d", c.Tag, c.Operand, height) + key = fmt.Sprintf("%s/%v/%d/", c.Tag, c.Operand, height) } else { - key = fmt.Sprintf("%s/%v", c.Tag, c.Operand) + key = fmt.Sprintf("%s/%v/", c.Tag, c.Operand) } return []byte(key) } diff --git a/state/txindex/kv/kv_test.go b/state/txindex/kv/kv_test.go index 78a76168d..7cf16dc52 100644 --- a/state/txindex/kv/kv_test.go +++ b/state/txindex/kv/kv_test.go @@ -73,6 +73,8 @@ func TestTxSearch(t *testing.T) { {"account.number = 1 AND account.owner = 'Ivan'", 1}, // search by exact match (two tags) {"account.number = 1 AND account.owner = 'Vlad'", 0}, + // search using a prefix of the stored value + {"account.owner = 'Iv'", 0}, // search by range {"account.number >= 1 AND account.number <= 5", 1}, // search by range (lower bound) @@ -133,6 +135,7 @@ func TestTxSearchMultipleTxs(t *testing.T) { }) txResult.Tx = types.Tx("Bob's account") txResult.Height = 2 + txResult.Index = 1 err := indexer.Index(txResult) require.NoError(t, err) @@ -142,14 +145,26 @@ func TestTxSearchMultipleTxs(t *testing.T) { }) txResult2.Tx = types.Tx("Alice's account") txResult2.Height = 1 + txResult2.Index = 2 + err = indexer.Index(txResult2) require.NoError(t, err) + // indexed third (to test the order of transactions) + txResult3 := txResultWithTags([]cmn.KVPair{ + {Key: []byte("account.number"), Value: []byte("3")}, + }) + txResult3.Tx = types.Tx("Jack's account") + txResult3.Height = 1 + txResult3.Index = 1 + err = indexer.Index(txResult3) + require.NoError(t, err) + results, err := indexer.Search(query.MustParse("account.number >= 1")) assert.NoError(t, err) - require.Len(t, results, 2) - assert.Equal(t, []*types.TxResult{txResult2, txResult}, results) + require.Len(t, results, 3) + assert.Equal(t, []*types.TxResult{txResult3, txResult2, txResult}, results) } func TestIndexAllTags(t *testing.T) { diff --git a/tools/tm-bench/transacter.go b/tools/tm-bench/transacter.go index 36cc761e5..c20aa5b5b 100644 --- a/tools/tm-bench/transacter.go +++ b/tools/tm-bench/transacter.go @@ -191,7 +191,7 @@ func (t *transacter) sendLoop(connIndex int) { c.SetWriteDeadline(now.Add(sendTimeout)) err = c.WriteJSON(rpctypes.RPCRequest{ JSONRPC: "2.0", - ID: "tm-bench", + ID: rpctypes.JSONRPCStringID("tm-bench"), Method: t.BroadcastTxMethod, Params: rawParamsJSON, }) diff --git a/tools/tm-monitor/monitor/node_test.go b/tools/tm-monitor/monitor/node_test.go index 10c2a13f1..0048e48fa 100644 --- a/tools/tm-monitor/monitor/node_test.go +++ b/tools/tm-monitor/monitor/node_test.go @@ -34,7 +34,7 @@ func TestNodeNewBlockReceived(t *testing.T) { n.SendBlocksTo(blockCh) blockHeader := tmtypes.Header{Height: 5} - emMock.Call("eventCallback", &em.EventMetric{}, tmtypes.EventDataNewBlockHeader{blockHeader}) + emMock.Call("eventCallback", &em.EventMetric{}, tmtypes.EventDataNewBlockHeader{Header: blockHeader}) assert.Equal(t, int64(5), n.Height) assert.Equal(t, blockHeader, <-blockCh) diff --git a/types/block.go b/types/block.go index b2cddb5e3..15b88d81d 100644 --- a/types/block.go +++ b/types/block.go @@ -275,6 +275,28 @@ func (b *Block) StringShort() string { return fmt.Sprintf("Block#%v", b.Hash()) } +//----------------------------------------------------------- +// These methods are for Protobuf Compatibility + +// Marshal returns the amino encoding. +func (b *Block) Marshal() ([]byte, error) { + return cdc.MarshalBinaryBare(b) +} + +// MarshalTo calls Marshal and copies to the given buffer. +func (b *Block) MarshalTo(data []byte) (int, error) { + bs, err := b.Marshal() + if err != nil { + return -1, err + } + return copy(data, bs), nil +} + +// Unmarshal deserializes from amino encoded form. +func (b *Block) Unmarshal(bs []byte) error { + return cdc.UnmarshalBinaryBare(bs, b) +} + //----------------------------------------------------------------------------- // MaxDataBytes returns the maximum size of block's data. diff --git a/types/block_meta.go b/types/block_meta.go index d8926af0b..8297446ab 100644 --- a/types/block_meta.go +++ b/types/block_meta.go @@ -13,3 +13,31 @@ func NewBlockMeta(block *Block, blockParts *PartSet) *BlockMeta { Header: block.Header, } } + +//----------------------------------------------------------- +// These methods are for Protobuf Compatibility + +// Size returns the size of the amino encoding, in bytes. +func (bm *BlockMeta) Size() int { + bs, _ := bm.Marshal() + return len(bs) +} + +// Marshal returns the amino encoding. +func (bm *BlockMeta) Marshal() ([]byte, error) { + return cdc.MarshalBinaryBare(bm) +} + +// MarshalTo calls Marshal and copies to the given buffer. +func (bm *BlockMeta) MarshalTo(data []byte) (int, error) { + bs, err := bm.Marshal() + if err != nil { + return -1, err + } + return copy(data, bs), nil +} + +// Unmarshal deserializes from amino encoded form. +func (bm *BlockMeta) Unmarshal(bs []byte) error { + return cdc.UnmarshalBinaryBare(bs, bm) +} diff --git a/types/event_bus.go b/types/event_bus.go index fbe5ac478..d941e9aa9 100644 --- a/types/event_bus.go +++ b/types/event_bus.go @@ -71,12 +71,48 @@ func (b *EventBus) Publish(eventType string, eventData TMEventData) error { return nil } +func (b *EventBus) validateAndStringifyTags(tags []cmn.KVPair, logger log.Logger) map[string]string { + result := make(map[string]string) + for _, tag := range tags { + // basic validation + if len(tag.Key) == 0 { + logger.Debug("Got tag with an empty key (skipping)", "tag", tag) + continue + } + result[string(tag.Key)] = string(tag.Value) + } + return result +} + func (b *EventBus) PublishEventNewBlock(data EventDataNewBlock) error { - return b.Publish(EventNewBlock, data) + // no explicit deadline for publishing events + ctx := context.Background() + + resultTags := append(data.ResultBeginBlock.Tags, data.ResultEndBlock.Tags...) + tags := b.validateAndStringifyTags(resultTags, b.Logger.With("block", data.Block.StringShort())) + + // add predefined tags + logIfTagExists(EventTypeKey, tags, b.Logger) + tags[EventTypeKey] = EventNewBlock + + b.pubsub.PublishWithTags(ctx, data, tmpubsub.NewTagMap(tags)) + return nil } func (b *EventBus) PublishEventNewBlockHeader(data EventDataNewBlockHeader) error { - return b.Publish(EventNewBlockHeader, data) + // no explicit deadline for publishing events + ctx := context.Background() + + resultTags := append(data.ResultBeginBlock.Tags, data.ResultEndBlock.Tags...) + // TODO: Create StringShort method for Header and use it in logger. + tags := b.validateAndStringifyTags(resultTags, b.Logger.With("header", data.Header)) + + // add predefined tags + logIfTagExists(EventTypeKey, tags, b.Logger) + tags[EventTypeKey] = EventNewBlockHeader + + b.pubsub.PublishWithTags(ctx, data, tmpubsub.NewTagMap(tags)) + return nil } func (b *EventBus) PublishEventVote(data EventDataVote) error { @@ -94,17 +130,7 @@ func (b *EventBus) PublishEventTx(data EventDataTx) error { // no explicit deadline for publishing events ctx := context.Background() - tags := make(map[string]string) - - // validate and fill tags from tx result - for _, tag := range data.Result.Tags { - // basic validation - if len(tag.Key) == 0 { - b.Logger.Info("Got tag with an empty key (skipping)", "tag", tag, "tx", data.Tx) - continue - } - tags[string(tag.Key)] = string(tag.Value) - } + tags := b.validateAndStringifyTags(data.Result.Tags, b.Logger.With("tx", data.Tx)) // add predefined tags logIfTagExists(EventTypeKey, tags, b.Logger) diff --git a/types/event_bus_test.go b/types/event_bus_test.go index 4056dacd4..0af11ebd9 100644 --- a/types/event_bus_test.go +++ b/types/event_bus_test.go @@ -58,6 +58,90 @@ func TestEventBusPublishEventTx(t *testing.T) { } } +func TestEventBusPublishEventNewBlock(t *testing.T) { + eventBus := NewEventBus() + err := eventBus.Start() + require.NoError(t, err) + defer eventBus.Stop() + + block := MakeBlock(0, []Tx{}, nil, []Evidence{}) + resultBeginBlock := abci.ResponseBeginBlock{Tags: []cmn.KVPair{{Key: []byte("baz"), Value: []byte("1")}}} + resultEndBlock := abci.ResponseEndBlock{Tags: []cmn.KVPair{{Key: []byte("foz"), Value: []byte("2")}}} + + txEventsCh := make(chan interface{}) + + // PublishEventNewBlock adds the tm.event tag, so the query below should work + query := "tm.event='NewBlock' AND baz=1 AND foz=2" + err = eventBus.Subscribe(context.Background(), "test", tmquery.MustParse(query), txEventsCh) + require.NoError(t, err) + + done := make(chan struct{}) + go func() { + for e := range txEventsCh { + edt := e.(EventDataNewBlock) + assert.Equal(t, block, edt.Block) + assert.Equal(t, resultBeginBlock, edt.ResultBeginBlock) + assert.Equal(t, resultEndBlock, edt.ResultEndBlock) + close(done) + } + }() + + err = eventBus.PublishEventNewBlock(EventDataNewBlock{ + Block: block, + ResultBeginBlock: resultBeginBlock, + ResultEndBlock: resultEndBlock, + }) + assert.NoError(t, err) + + select { + case <-done: + case <-time.After(1 * time.Second): + t.Fatal("did not receive a block after 1 sec.") + } +} + +func TestEventBusPublishEventNewBlockHeader(t *testing.T) { + eventBus := NewEventBus() + err := eventBus.Start() + require.NoError(t, err) + defer eventBus.Stop() + + block := MakeBlock(0, []Tx{}, nil, []Evidence{}) + resultBeginBlock := abci.ResponseBeginBlock{Tags: []cmn.KVPair{{Key: []byte("baz"), Value: []byte("1")}}} + resultEndBlock := abci.ResponseEndBlock{Tags: []cmn.KVPair{{Key: []byte("foz"), Value: []byte("2")}}} + + txEventsCh := make(chan interface{}) + + // PublishEventNewBlockHeader adds the tm.event tag, so the query below should work + query := "tm.event='NewBlockHeader' AND baz=1 AND foz=2" + err = eventBus.Subscribe(context.Background(), "test", tmquery.MustParse(query), txEventsCh) + require.NoError(t, err) + + done := make(chan struct{}) + go func() { + for e := range txEventsCh { + edt := e.(EventDataNewBlockHeader) + assert.Equal(t, block.Header, edt.Header) + assert.Equal(t, resultBeginBlock, edt.ResultBeginBlock) + assert.Equal(t, resultEndBlock, edt.ResultEndBlock) + close(done) + } + }() + + err = eventBus.PublishEventNewBlockHeader(EventDataNewBlockHeader{ + Header: block.Header, + ResultBeginBlock: resultBeginBlock, + ResultEndBlock: resultEndBlock, + }) + assert.NoError(t, err) + + select { + case <-done: + case <-time.After(1 * time.Second): + t.Fatal("did not receive a block header after 1 sec.") + } +} + func TestEventBusPublish(t *testing.T) { eventBus := NewEventBus() err := eventBus.Start() diff --git a/types/events.go b/types/events.go index 2f9dc76ee..b22a1c8b8 100644 --- a/types/events.go +++ b/types/events.go @@ -4,6 +4,7 @@ import ( "fmt" amino "github.com/tendermint/go-amino" + abci "github.com/tendermint/tendermint/abci/types" tmpubsub "github.com/tendermint/tendermint/libs/pubsub" tmquery "github.com/tendermint/tendermint/libs/pubsub/query" ) @@ -56,11 +57,17 @@ func RegisterEventDatas(cdc *amino.Codec) { type EventDataNewBlock struct { Block *Block `json:"block"` + + ResultBeginBlock abci.ResponseBeginBlock `json:"result_begin_block"` + ResultEndBlock abci.ResponseEndBlock `json:"result_end_block"` } // light weight event for benchmarking type EventDataNewBlockHeader struct { Header Header `json:"header"` + + ResultBeginBlock abci.ResponseBeginBlock `json:"result_begin_block"` + ResultEndBlock abci.ResponseEndBlock `json:"result_end_block"` } // All txs fire EventDataTx @@ -100,7 +107,7 @@ type EventDataCompleteProposal struct { Round int `json:"round"` Step string `json:"step"` - BlockID BlockID `json:"block_id"` + BlockID BlockID `json:"block_id"` } type EventDataVote struct { diff --git a/types/part_set.go b/types/part_set.go index af59851c9..a040258d1 100644 --- a/types/part_set.go +++ b/types/part_set.go @@ -200,6 +200,9 @@ func (ps *PartSet) Total() int { } func (ps *PartSet) AddPart(part *Part) (bool, error) { + if ps == nil { + return false, nil + } ps.mtx.Lock() defer ps.mtx.Unlock() diff --git a/types/validator_set.go b/types/validator_set.go index ab030d1be..f5e57077b 100644 --- a/types/validator_set.go +++ b/types/validator_set.go @@ -62,7 +62,11 @@ func (vals *ValidatorSet) CopyIncrementAccum(times int) *ValidatorSet { // IncrementAccum increments accum of each validator and updates the // proposer. Panics if validator set is empty. +// `times` must be positive. func (vals *ValidatorSet) IncrementAccum(times int) { + if times <= 0 { + panic("Cannot call IncrementAccum with non-positive times") + } // Add VotingPower * times to each validator and order into heap. validatorsHeap := cmn.NewHeap() diff --git a/types/validator_set_test.go b/types/validator_set_test.go index aad9d85a8..81124637f 100644 --- a/types/validator_set_test.go +++ b/types/validator_set_test.go @@ -86,6 +86,19 @@ func TestCopy(t *testing.T) { } } +// Test that IncrementAccum requires positive times. +func TestIncrementAccumPositiveTimes(t *testing.T) { + vset := NewValidatorSet([]*Validator{ + newValidator([]byte("foo"), 1000), + newValidator([]byte("bar"), 300), + newValidator([]byte("baz"), 330), + }) + + assert.Panics(t, func() { vset.IncrementAccum(-1) }) + assert.Panics(t, func() { vset.IncrementAccum(0) }) + vset.IncrementAccum(1) +} + func BenchmarkValidatorSetCopy(b *testing.B) { b.StopTimer() vset := NewValidatorSet([]*Validator{}) @@ -239,7 +252,7 @@ func TestProposerSelection3(t *testing.T) { mod := (cmn.RandInt() % 5) + 1 if cmn.RandInt()%mod > 0 { // sometimes its up to 5 - times = cmn.RandInt() % 5 + times = (cmn.RandInt() % 4) + 1 } vset.IncrementAccum(times) diff --git a/version/version.go b/version/version.go index aa52a82ec..933328a65 100644 --- a/version/version.go +++ b/version/version.go @@ -18,7 +18,7 @@ const ( // TMCoreSemVer is the current version of Tendermint Core. // It's the Semantic Version of the software. // Must be a string because scripts like dist.sh read this file. - TMCoreSemVer = "0.26.3" + TMCoreSemVer = "0.26.4" // ABCISemVer is the semantic version of the ABCI library ABCISemVer = "0.15.0"