
Merge pull request #983 from tendermint/circle-testing

Circle testing
Ethan Buchman committed 7 years ago (via GitHub)
commit 783ffdb7fd
14 changed files with 143 additions and 93 deletions
 1. config/config.go                   +1   -1
 2. consensus/mempool_test.go          +1   -1
 3. consensus/reactor.go               +4   -1
 4. consensus/reactor_test.go          +73  -34
 5. consensus/replay_test.go           +5   -4
 6. consensus/wal_generator.go         +11  -5
 7. glide.lock                         +1   -1
 8. p2p/connection.go                  +16  -2
 9. p2p/peer.go                        +4   -16
10. p2p/peer_set_test.go               +0   -1
11. p2p/pex_reactor.go                 +2   -2
12. p2p/pex_reactor_test.go            +0   -1
13. p2p/switch.go                      +1   -0
14. test/p2p/kill_all/check_peers.sh   +24  -24

config/config.go (+1, -1)

@@ -389,7 +389,7 @@ func DefaultConsensusConfig() *ConsensusConfig {
 // TestConsensusConfig returns a configuration for testing the consensus service
 func TestConsensusConfig() *ConsensusConfig {
 	config := DefaultConsensusConfig()
-	config.TimeoutPropose = 2000
+	config.TimeoutPropose = 100
 	config.TimeoutProposeDelta = 1
 	config.TimeoutPrevote = 10
 	config.TimeoutPrevoteDelta = 1
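These timeout fields are plain integers interpreted as milliseconds, so the test propose timeout drops from 2 s to 100 ms and test rounds turn over much faster. A minimal sketch of how such values are typically consumed (the helper mirrors the shape of ConsensusConfig's round-timeout methods; treat the exact name as illustrative):

package main

import (
	"fmt"
	"time"
)

// proposeTimeout mirrors how the consensus config turns its integer
// fields into a per-round duration: base milliseconds plus a linear
// delta so later rounds get a little more time.
func proposeTimeout(timeoutPropose, timeoutProposeDelta, round int) time.Duration {
	return time.Duration(timeoutPropose+timeoutProposeDelta*round) * time.Millisecond
}

func main() {
	fmt.Println(proposeTimeout(100, 1, 0)) // 100ms (new test value, round 0)
	fmt.Println(proposeTimeout(100, 1, 3)) // 103ms (round 3)
}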


consensus/mempool_test.go (+1, -1)

@@ -104,8 +104,8 @@ func TestTxConcurrentWithCommit(t *testing.T) {
 	go deliverTxsRange(cs, 0, NTxs)
 	startTestRound(cs, height, round)
 	for nTxs := 0; nTxs < NTxs; {
-		ticker := time.NewTicker(time.Second * 20)
+		ticker := time.NewTicker(time.Second * 30)
 		select {
 		case b := <-newBlockCh:
 			evt := b.(types.TMEventData).Unwrap().(types.EventDataNewBlock)
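The test races each block event against a fresh 30-second ticker, failing the wait rather than hanging CI. The same bounded-wait shape, as a standalone sketch (names hypothetical):

package main

import (
	"fmt"
	"time"
)

// waitOrTimeout receives one value from events, giving up after limit.
func waitOrTimeout(events <-chan int, limit time.Duration) (int, bool) {
	ticker := time.NewTicker(limit)
	defer ticker.Stop() // release the ticker when done
	select {
	case v := <-events:
		return v, true
	case <-ticker.C:
		return 0, false
	}
}

func main() {
	events := make(chan int, 1)
	events <- 42
	v, ok := waitOrTimeout(events, 30*time.Second)
	fmt.Println(v, ok) // 42 true
}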


consensus/reactor.go (+4, -1)

@@ -916,6 +916,7 @@ func (ps *PeerState) SetHasProposalBlockPart(height int64, round int, index int)
 func (ps *PeerState) PickSendVote(votes types.VoteSetReader) bool {
 	if vote, ok := ps.PickVoteToSend(votes); ok {
 		msg := &VoteMessage{vote}
+		ps.logger.Debug("Sending vote message", "ps", ps, "vote", vote)
 		return ps.Peer.Send(VoteChannel, struct{ ConsensusMessage }{msg})
 	}
 	return false
@@ -1194,6 +1195,8 @@ func (ps *PeerState) String() string {
 // StringIndented returns a string representation of the PeerState
 func (ps *PeerState) StringIndented(indent string) string {
+	ps.mtx.Lock()
+	defer ps.mtx.Unlock()
 	return fmt.Sprintf(`PeerState{
 %s Key %v
 %s PRS %v
@@ -1344,7 +1347,7 @@ type HasVoteMessage struct {
 // String returns a string representation.
 func (m *HasVoteMessage) String() string {
-	return fmt.Sprintf("[HasVote VI:%v V:{%v/%02d/%v} VI:%v]", m.Index, m.Height, m.Round, m.Type, m.Index)
+	return fmt.Sprintf("[HasVote VI:%v V:{%v/%02d/%v}]", m.Index, m.Height, m.Round, m.Type)
 }

 //-------------------------------------
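StringIndented now takes the peer-state mutex before formatting, since the new Debug call in PickSendVote stringifies ps while other goroutines may be mutating it. A toy version of the pattern, with the usual caveat that such a String must never be invoked while the lock is already held, or it deadlocks:

package main

import (
	"fmt"
	"sync"
)

// PeerState is a cut-down stand-in for the real consensus PeerState.
type PeerState struct {
	mtx    sync.Mutex
	height int64
	round  int
}

// String snapshots the fields under the lock so concurrent updates
// cannot be observed mid-format.
func (ps *PeerState) String() string {
	ps.mtx.Lock()
	defer ps.mtx.Unlock()
	return fmt.Sprintf("PeerState{H:%d R:%d}", ps.height, ps.round)
}

func main() {
	ps := &PeerState{height: 10, round: 0}
	fmt.Println(ps) // PeerState{H:10 R:0}
}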


consensus/reactor_test.go (+73, -34)

@@ -10,11 +10,13 @@ import (
 	"time"

 	"github.com/tendermint/abci/example/dummy"
+	"github.com/tendermint/tmlibs/log"

 	cfg "github.com/tendermint/tendermint/config"
 	"github.com/tendermint/tendermint/p2p"
 	"github.com/tendermint/tendermint/types"

+	"github.com/stretchr/testify/assert"
 	"github.com/stretchr/testify/require"
 )
@@ -53,6 +55,7 @@ func startConsensusNet(t *testing.T, css []*ConsensusState, N int) ([]*Consensus
 	// make connected switches and start all reactors
 	p2p.MakeConnectedSwitches(config.P2P, N, func(i int, s *p2p.Switch) *p2p.Switch {
 		s.AddReactor("CONSENSUS", reactors[i])
+		s.SetLogger(reactors[i].Logger.With("module", "p2p", "validator", i))
 		return s
 	}, p2p.Connect2Switches)
@@ -67,13 +70,17 @@ func startConsensusNet(t *testing.T, css []*ConsensusState, N int) ([]*Consensus
 	return reactors, eventChans, eventBuses
 }

-func stopConsensusNet(reactors []*ConsensusReactor, eventBuses []*types.EventBus) {
-	for _, r := range reactors {
+func stopConsensusNet(logger log.Logger, reactors []*ConsensusReactor, eventBuses []*types.EventBus) {
+	logger.Info("stopConsensusNet", "n", len(reactors))
+	for i, r := range reactors {
+		logger.Info("stopConsensusNet: Stopping ConsensusReactor", "i", i)
 		r.Switch.Stop()
 	}
-	for _, b := range eventBuses {
+	for i, b := range eventBuses {
+		logger.Info("stopConsensusNet: Stopping eventBus", "i", i)
 		b.Stop()
 	}
+	logger.Info("stopConsensusNet: DONE", "n", len(reactors))
 }

 // Ensure a testnet makes blocks
@@ -81,7 +88,7 @@ func TestReactor(t *testing.T) {
 	N := 4
 	css := randConsensusNet(N, "consensus_reactor_test", newMockTickerFunc(true), newCounter)
 	reactors, eventChans, eventBuses := startConsensusNet(t, css, N)
-	defer stopConsensusNet(reactors, eventBuses)
+	defer stopConsensusNet(log.TestingLogger(), reactors, eventBuses)
 	// wait till everyone makes the first new block
 	timeoutWaitGroup(t, N, func(wg *sync.WaitGroup, j int) {
 		<-eventChans[j]
@@ -97,7 +104,7 @@ func TestReactorProposalHeartbeats(t *testing.T) {
 		c.Consensus.CreateEmptyBlocks = false
 	})
 	reactors, eventChans, eventBuses := startConsensusNet(t, css, N)
-	defer stopConsensusNet(reactors, eventBuses)
+	defer stopConsensusNet(log.TestingLogger(), reactors, eventBuses)
 	heartbeatChans := make([]chan interface{}, N)
 	var err error
 	for i := 0; i < N; i++ {
@@ -126,11 +133,12 @@ func TestReactorProposalHeartbeats(t *testing.T) {
 //-------------------------------------------------------------
 // ensure we can make blocks despite cycling a validator set

-func TestVotingPowerChange(t *testing.T) {
+func TestReactorVotingPowerChange(t *testing.T) {
 	nVals := 4
+	logger := log.TestingLogger()
 	css := randConsensusNet(nVals, "consensus_voting_power_changes_test", newMockTickerFunc(true), newPersistentDummy)
 	reactors, eventChans, eventBuses := startConsensusNet(t, css, nVals)
-	defer stopConsensusNet(reactors, eventBuses)
+	defer stopConsensusNet(logger, reactors, eventBuses)

 	// map of active validators
 	activeVals := make(map[string]struct{})
@@ -145,14 +153,14 @@ func TestVotingPowerChange(t *testing.T) {
 	}, css)

 	//---------------------------------------------------------------------------
-	t.Log("---------------------------- Testing changing the voting power of one validator a few times")
+	logger.Debug("---------------------------- Testing changing the voting power of one validator a few times")

 	val1PubKey := css[0].privValidator.GetPubKey()
 	updateValidatorTx := dummy.MakeValSetChangeTx(val1PubKey.Bytes(), 25)
 	previousTotalVotingPower := css[0].GetRoundState().LastValidators.TotalVotingPower()

 	waitForAndValidateBlock(t, nVals, activeVals, eventChans, css, updateValidatorTx)
-	waitForAndValidateBlock(t, nVals, activeVals, eventChans, css)
+	waitForAndValidateBlockWithTx(t, nVals, activeVals, eventChans, css, updateValidatorTx)
 	waitForAndValidateBlock(t, nVals, activeVals, eventChans, css)
 	waitForAndValidateBlock(t, nVals, activeVals, eventChans, css)
@@ -164,7 +172,7 @@ func TestVotingPowerChange(t *testing.T) {
 	previousTotalVotingPower = css[0].GetRoundState().LastValidators.TotalVotingPower()

 	waitForAndValidateBlock(t, nVals, activeVals, eventChans, css, updateValidatorTx)
-	waitForAndValidateBlock(t, nVals, activeVals, eventChans, css)
+	waitForAndValidateBlockWithTx(t, nVals, activeVals, eventChans, css, updateValidatorTx)
 	waitForAndValidateBlock(t, nVals, activeVals, eventChans, css)
 	waitForAndValidateBlock(t, nVals, activeVals, eventChans, css)
@@ -176,7 +184,7 @@ func TestVotingPowerChange(t *testing.T) {
 	previousTotalVotingPower = css[0].GetRoundState().LastValidators.TotalVotingPower()

 	waitForAndValidateBlock(t, nVals, activeVals, eventChans, css, updateValidatorTx)
-	waitForAndValidateBlock(t, nVals, activeVals, eventChans, css)
+	waitForAndValidateBlockWithTx(t, nVals, activeVals, eventChans, css, updateValidatorTx)
 	waitForAndValidateBlock(t, nVals, activeVals, eventChans, css)
 	waitForAndValidateBlock(t, nVals, activeVals, eventChans, css)
@@ -185,13 +193,15 @@ func TestVotingPowerChange(t *testing.T) {
 	}
 }

-func TestValidatorSetChanges(t *testing.T) {
+func TestReactorValidatorSetChanges(t *testing.T) {
 	nPeers := 7
 	nVals := 4
 	css := randConsensusNetWithPeers(nVals, nPeers, "consensus_val_set_changes_test", newMockTickerFunc(true), newPersistentDummy)
+
+	logger := log.TestingLogger()
 	reactors, eventChans, eventBuses := startConsensusNet(t, css, nPeers)
-	defer stopConsensusNet(reactors, eventBuses)
+	defer stopConsensusNet(logger, reactors, eventBuses)

 	// map of active validators
 	activeVals := make(map[string]struct{})
@@ -206,7 +216,7 @@ func TestValidatorSetChanges(t *testing.T) {
 	}, css)

 	//---------------------------------------------------------------------------
-	t.Log("---------------------------- Testing adding one validator")
+	logger.Info("---------------------------- Testing adding one validator")

 	newValidatorPubKey1 := css[nVals].privValidator.GetPubKey()
 	newValidatorTx1 := dummy.MakeValSetChangeTx(newValidatorPubKey1.Bytes(), testMinPower)
@@ -218,7 +228,7 @@ func TestValidatorSetChanges(t *testing.T) {
 	// wait till everyone makes block 3.
 	// it includes the commit for block 2, which is by the original validator set
-	waitForAndValidateBlock(t, nPeers, activeVals, eventChans, css)
+	waitForAndValidateBlockWithTx(t, nPeers, activeVals, eventChans, css, newValidatorTx1)

 	// wait till everyone makes block 4.
 	// it includes the commit for block 3, which is by the original validator set
@@ -232,14 +242,14 @@ func TestValidatorSetChanges(t *testing.T) {
 	waitForBlockWithUpdatedValsAndValidateIt(t, nPeers, activeVals, eventChans, css)

 	//---------------------------------------------------------------------------
-	t.Log("---------------------------- Testing changing the voting power of one validator")
+	logger.Info("---------------------------- Testing changing the voting power of one validator")

 	updateValidatorPubKey1 := css[nVals].privValidator.GetPubKey()
 	updateValidatorTx1 := dummy.MakeValSetChangeTx(updateValidatorPubKey1.Bytes(), 25)
 	previousTotalVotingPower := css[nVals].GetRoundState().LastValidators.TotalVotingPower()

 	waitForAndValidateBlock(t, nPeers, activeVals, eventChans, css, updateValidatorTx1)
-	waitForAndValidateBlock(t, nPeers, activeVals, eventChans, css)
+	waitForAndValidateBlockWithTx(t, nPeers, activeVals, eventChans, css, updateValidatorTx1)
 	waitForAndValidateBlock(t, nPeers, activeVals, eventChans, css)
 	waitForBlockWithUpdatedValsAndValidateIt(t, nPeers, activeVals, eventChans, css)
@@ -248,7 +258,7 @@ func TestValidatorSetChanges(t *testing.T) {
 	}

 	//---------------------------------------------------------------------------
-	t.Log("---------------------------- Testing adding two validators at once")
+	logger.Info("---------------------------- Testing adding two validators at once")

 	newValidatorPubKey2 := css[nVals+1].privValidator.GetPubKey()
 	newValidatorTx2 := dummy.MakeValSetChangeTx(newValidatorPubKey2.Bytes(), testMinPower)
@@ -257,20 +267,20 @@ func TestValidatorSetChanges(t *testing.T) {
 	newValidatorTx3 := dummy.MakeValSetChangeTx(newValidatorPubKey3.Bytes(), testMinPower)

 	waitForAndValidateBlock(t, nPeers, activeVals, eventChans, css, newValidatorTx2, newValidatorTx3)
-	waitForAndValidateBlock(t, nPeers, activeVals, eventChans, css)
+	waitForAndValidateBlockWithTx(t, nPeers, activeVals, eventChans, css, newValidatorTx2, newValidatorTx3)
 	waitForAndValidateBlock(t, nPeers, activeVals, eventChans, css)
 	activeVals[string(newValidatorPubKey2.Address())] = struct{}{}
 	activeVals[string(newValidatorPubKey3.Address())] = struct{}{}
 	waitForBlockWithUpdatedValsAndValidateIt(t, nPeers, activeVals, eventChans, css)

 	//---------------------------------------------------------------------------
-	t.Log("---------------------------- Testing removing two validators at once")
+	logger.Info("---------------------------- Testing removing two validators at once")

 	removeValidatorTx2 := dummy.MakeValSetChangeTx(newValidatorPubKey2.Bytes(), 0)
 	removeValidatorTx3 := dummy.MakeValSetChangeTx(newValidatorPubKey3.Bytes(), 0)

 	waitForAndValidateBlock(t, nPeers, activeVals, eventChans, css, removeValidatorTx2, removeValidatorTx3)
-	waitForAndValidateBlock(t, nPeers, activeVals, eventChans, css)
+	waitForAndValidateBlockWithTx(t, nPeers, activeVals, eventChans, css, removeValidatorTx2, removeValidatorTx3)
 	waitForAndValidateBlock(t, nPeers, activeVals, eventChans, css)

 	delete(activeVals, string(newValidatorPubKey2.Address()))
 	delete(activeVals, string(newValidatorPubKey3.Address()))
@@ -287,7 +297,7 @@ func TestReactorWithTimeoutCommit(t *testing.T) {
 	}

 	reactors, eventChans, eventBuses := startConsensusNet(t, css, N-1)
-	defer stopConsensusNet(reactors, eventBuses)
+	defer stopConsensusNet(log.TestingLogger(), reactors, eventBuses)

 	// wait till everyone makes the first new block
 	timeoutWaitGroup(t, N-1, func(wg *sync.WaitGroup, j int) {
@@ -300,21 +310,51 @@ func waitForAndValidateBlock(t *testing.T, n int, activeVals map[string]struct{}
 	timeoutWaitGroup(t, n, func(wg *sync.WaitGroup, j int) {
 		defer wg.Done()
+		css[j].Logger.Debug("waitForAndValidateBlock")
 		newBlockI, ok := <-eventChans[j]
 		if !ok {
 			return
 		}
 		newBlock := newBlockI.(types.TMEventData).Unwrap().(types.EventDataNewBlock).Block
-		t.Logf("Got block height=%v validator=%v", newBlock.Height, j)
+		css[j].Logger.Debug("waitForAndValidateBlock: Got block", "height", newBlock.Height)
 		err := validateBlock(newBlock, activeVals)
-		if err != nil {
-			t.Fatal(err)
-		}
+		assert.Nil(t, err)
 		for _, tx := range txs {
-			if err = css[j].mempool.CheckTx(tx, nil); err != nil {
-				t.Fatal(err)
-			}
+			css[j].mempool.CheckTx(tx, nil)
+			assert.Nil(t, err)
 		}
 	}, css)
 }

+func waitForAndValidateBlockWithTx(t *testing.T, n int, activeVals map[string]struct{}, eventChans []chan interface{}, css []*ConsensusState, txs ...[]byte) {
+	timeoutWaitGroup(t, n, func(wg *sync.WaitGroup, j int) {
+		defer wg.Done()
+		ntxs := 0
+	BLOCK_TX_LOOP:
+		for {
+			css[j].Logger.Debug("waitForAndValidateBlockWithTx", "ntxs", ntxs)
+			newBlockI, ok := <-eventChans[j]
+			if !ok {
+				return
+			}
+			newBlock := newBlockI.(types.TMEventData).Unwrap().(types.EventDataNewBlock).Block
+			css[j].Logger.Debug("waitForAndValidateBlockWithTx: Got block", "height", newBlock.Height)
+			err := validateBlock(newBlock, activeVals)
+			assert.Nil(t, err)
+
+			// check that txs match the txs we're waiting for.
+			// note they could be spread over multiple blocks,
+			// but they should be in order.
+			for _, tx := range newBlock.Data.Txs {
+				assert.EqualValues(t, txs[ntxs], tx)
+				ntxs += 1
+			}
+
+			if ntxs == len(txs) {
+				break BLOCK_TX_LOOP
+			}
+		}
+	}, css)
+}
@@ -325,23 +365,22 @@ func waitForBlockWithUpdatedValsAndValidateIt(t *testing.T, n int, updatedVals m
 		var newBlock *types.Block
 	LOOP:
 		for {
+			css[j].Logger.Debug("waitForBlockWithUpdatedValsAndValidateIt")
 			newBlockI, ok := <-eventChans[j]
 			if !ok {
 				return
 			}
 			newBlock = newBlockI.(types.TMEventData).Unwrap().(types.EventDataNewBlock).Block
 			if newBlock.LastCommit.Size() == len(updatedVals) {
-				t.Logf("Block with new validators height=%v validator=%v", newBlock.Height, j)
+				css[j].Logger.Debug("waitForBlockWithUpdatedValsAndValidateIt: Got block", "height", newBlock.Height)
 				break LOOP
 			} else {
-				t.Logf("Block with no new validators height=%v validator=%v. Skipping...", newBlock.Height, j)
+				css[j].Logger.Debug("waitForBlockWithUpdatedValsAndValidateIt: Got block with no new validators. Skipping", "height", newBlock.Height)
 			}
 		}

 		err := validateBlock(newBlock, updatedVals)
-		if err != nil {
-			t.Fatal(err)
-		}
+		assert.Nil(t, err)
 	}, css)
 }
@@ -374,7 +413,7 @@ func timeoutWaitGroup(t *testing.T, n int, f func(*sync.WaitGroup, int), css []*
 	// we're running many nodes in-process, possibly in in a virtual machine,
 	// and spewing debug messages - making a block could take a while,
-	timeout := time.Second * 60
+	timeout := time.Second * 300

 	select {
 	case <-done:
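timeoutWaitGroup (partially shown above) fans a check out across all validators and fails the test if they do not all finish in time; this PR raises that budget from 60 s to 300 s for slow CI machines. The underlying WaitGroup-versus-timer race, as a minimal standalone sketch (names hypothetical):

package main

import (
	"fmt"
	"sync"
	"time"
)

// runAllWithin runs f(j) for j in [0,n) concurrently and reports whether
// every goroutine finished before the timeout elapsed.
func runAllWithin(n int, timeout time.Duration, f func(j int)) bool {
	var wg sync.WaitGroup
	wg.Add(n)
	for j := 0; j < n; j++ {
		go func(j int) {
			defer wg.Done()
			f(j)
		}(j)
	}

	// Bridge wg.Wait into a channel so it can participate in a select.
	done := make(chan struct{})
	go func() {
		wg.Wait()
		close(done)
	}()

	select {
	case <-done:
		return true
	case <-time.After(timeout):
		return false
	}
}

func main() {
	ok := runAllWithin(4, 300*time.Second, func(j int) {
		time.Sleep(10 * time.Millisecond) // stand-in for "validator j saw a block"
	})
	fmt.Println(ok) // true
}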


consensus/replay_test.go (+5, -4)

@@ -65,9 +65,7 @@ func startNewConsensusStateAndWaitForBlock(t *testing.T, lastBlockHeight int64,
 	err := cs.Start()
 	require.NoError(t, err)
-	defer func() {
-		cs.Stop()
-	}()
+	defer cs.Stop()

 	// This is just a signal that we haven't halted; its not something contained
 	// in the WAL itself. Assuming the consensus state is running, replay of any
@@ -337,6 +335,8 @@ func testHandshakeReplay(t *testing.T, nBlocks int, mode uint) {
 	if err := wal.Start(); err != nil {
 		t.Fatal(err)
 	}
+	defer wal.Stop()
+
 	chain, commits, err := makeBlockchainFromWAL(wal)
 	if err != nil {
 		t.Fatalf(err.Error())
@@ -366,6 +366,7 @@ func testHandshakeReplay(t *testing.T, nBlocks int, mode uint) {
 	if err := proxyApp.Start(); err != nil {
 		t.Fatalf("Error starting proxy app connections: %v", err)
 	}
+	defer proxyApp.Stop()

 	// get the latest app hash from the app
 	res, err := proxyApp.Query().InfoSync(abci.RequestInfo{""})
@@ -404,13 +405,13 @@ func buildAppStateFromChain(proxyApp proxy.AppConns,
 	if err := proxyApp.Start(); err != nil {
 		panic(err)
 	}
+	defer proxyApp.Stop()

 	validators := types.TM2PB.Validators(state.Validators)
 	if _, err := proxyApp.Consensus().InitChainSync(abci.RequestInitChain{validators}); err != nil {
 		panic(err)
 	}
-	defer proxyApp.Stop()

 	switch mode {
 	case 0:
 		for i := 0; i < nBlocks; i++ {
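The recurring fix in this file is to register `defer X.Stop()` immediately after a successful `X.Start()`, so every exit path, including later t.Fatal panics, tears the service down; in buildAppStateFromChain the defer also moves ahead of InitChainSync so a failed init still stops the app. A sketch of the idiom (the Service shape is a stand-in, not the real tmlibs interface):

package main

import "fmt"

// Service is a stand-in for a Start/Stop service.
type Service interface {
	Start() error
	Stop() bool
}

type proxyApp struct{ running bool }

func (p *proxyApp) Start() error { p.running = true; return nil }
func (p *proxyApp) Stop() bool   { p.running = false; return true }

func useApp(app Service) error {
	if err := app.Start(); err != nil {
		return err
	}
	// Register cleanup the moment Start succeeds: every later return
	// (or panic) path now stops the service exactly once.
	defer app.Stop()

	// ... work that may fail or panic ...
	return nil
}

func main() {
	fmt.Println(useApp(&proxyApp{})) // <nil>
}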


consensus/wal_generator.go (+11, -5)

@@ -32,7 +32,7 @@ func WALWithNBlocks(numBlocks int) (data []byte, err error) {
 	app := dummy.NewPersistentDummyApplication(filepath.Join(config.DBDir(), "wal_generator"))

-	logger := log.NewNopLogger() // log.TestingLogger().With("wal_generator", "wal_generator")
+	logger := log.TestingLogger().With("wal_generator", "wal_generator")

 	/////////////////////////////////////////////////////////////////////////////
 	// COPY PASTE FROM node.go WITH A FEW MODIFICATIONS
@@ -77,7 +77,7 @@ func WALWithNBlocks(numBlocks int) (data []byte, err error) {
 	var b bytes.Buffer
 	wr := bufio.NewWriter(&b)
 	numBlocksWritten := make(chan struct{})
-	wal := newByteBufferWAL(NewWALEncoder(wr), int64(numBlocks), numBlocksWritten)
+	wal := newByteBufferWAL(logger, NewWALEncoder(wr), int64(numBlocks), numBlocksWritten)
 	// see wal.go#103
 	wal.Save(EndHeightMessage{0})
 	consensusState.wal = wal
@@ -91,8 +91,8 @@ func WALWithNBlocks(numBlocks int) (data []byte, err error) {
 	case <-numBlocksWritten:
 		wr.Flush()
 		return b.Bytes(), nil
-	case <-time.After(time.Duration(5*numBlocks) * time.Second):
-		return b.Bytes(), fmt.Errorf("waited too long for tendermint to produce %d blocks", numBlocks)
+	case <-time.After(1 * time.Minute):
+		return b.Bytes(), fmt.Errorf("waited too long for tendermint to produce %d blocks (grep logs for `wal_generator`)", numBlocks)
 	}
 }
@@ -142,16 +142,18 @@ type byteBufferWAL struct {
 	stopped           bool
 	heightToStop      int64
 	signalWhenStopsTo chan struct{}
+	logger            log.Logger
 }

 // needed for determinism
 var fixedTime, _ = time.Parse(time.RFC3339, "2017-01-02T15:04:05Z")

-func newByteBufferWAL(enc *WALEncoder, nBlocks int64, signalStop chan struct{}) *byteBufferWAL {
+func newByteBufferWAL(logger log.Logger, enc *WALEncoder, nBlocks int64, signalStop chan struct{}) *byteBufferWAL {
 	return &byteBufferWAL{
 		enc:               enc,
 		heightToStop:      nBlocks,
 		signalWhenStopsTo: signalStop,
+		logger:            logger,
 	}
 }
@@ -160,17 +162,21 @@ func newByteBufferWAL(enc *WALEncoder, nBlocks int64, signalStop chan struct{})
 // skip writing.
 func (w *byteBufferWAL) Save(m WALMessage) {
 	if w.stopped {
+		w.logger.Debug("WAL already stopped. Not writing message", "msg", m)
 		return
 	}

 	if endMsg, ok := m.(EndHeightMessage); ok {
+		w.logger.Debug("WAL write end height message", "height", endMsg.Height, "stopHeight", w.heightToStop)
 		if endMsg.Height == w.heightToStop {
+			w.logger.Debug("Stopping WAL at height", "height", endMsg.Height)
 			w.signalWhenStopsTo <- struct{}{}
 			w.stopped = true
 			return
 		}
 	}

+	w.logger.Debug("WAL Write Message", "msg", m)
 	err := w.enc.Encode(&TimedWALMessage{fixedTime, m})
 	if err != nil {
 		panic(fmt.Sprintf("failed to encode the msg %v", m))
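byteBufferWAL stops itself when it sees the EndHeightMessage for the target height and signals the generator over a channel; WALWithNBlocks then races that signal against a flat one-minute deadline. The stop-and-signal mechanic, reduced to a sketch (names hypothetical):

package main

import (
	"fmt"
	"time"
)

// heightWAL mimics byteBufferWAL's stopping behavior: once the target
// height arrives, signal the waiter and drop all further writes.
type heightWAL struct {
	stopHeight int64
	stopped    bool
	done       chan struct{}
}

func (w *heightWAL) saveEndHeight(h int64) {
	if w.stopped {
		return // WAL already stopped; skip writing
	}
	if h == w.stopHeight {
		w.done <- struct{}{}
		w.stopped = true
	}
}

func main() {
	w := &heightWAL{stopHeight: 3, done: make(chan struct{}, 1)}
	go func() {
		for h := int64(0); h <= 5; h++ {
			w.saveEndHeight(h)
		}
	}()
	select {
	case <-w.done:
		fmt.Println("got all blocks")
	case <-time.After(time.Minute):
		fmt.Println("waited too long")
	}
}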


glide.lock (+1, -1)

@@ -129,7 +129,7 @@ imports:
   subpackages:
   - iavl
 - name: github.com/tendermint/tmlibs
-  version: e4ef2835f0081c2ece83b9c1f777cf071f956e81
+  version: a483e1ff486b577ba94e6a20f08bf52fbb7bff14
   subpackages:
   - autofile
   - cli


p2p/connection.go (+16, -2)

@@ -14,6 +14,7 @@ import (
 	tmlegacy "github.com/tendermint/go-wire/nowriter/tmlegacy"
 	cmn "github.com/tendermint/tmlibs/common"
 	flow "github.com/tendermint/tmlibs/flowrate"
+	"github.com/tendermint/tmlibs/log"
 )

 var legacy = tmlegacy.TMEncoderLegacy{}
@@ -161,6 +162,13 @@ func NewMConnectionWithConfig(conn net.Conn, chDescs []*ChannelDescriptor, onRec
 	return mconn
 }

+func (c *MConnection) SetLogger(l log.Logger) {
+	c.BaseService.SetLogger(l)
+	for _, ch := range c.channels {
+		ch.SetLogger(l)
+	}
+}
+
 // OnStart implements BaseService
 func (c *MConnection) OnStart() error {
 	if err := c.BaseService.OnStart(); err != nil {
@@ -566,6 +574,8 @@ type Channel struct {
 	recentlySent int64 // exponential moving average
 	maxMsgPacketPayloadSize int
+
+	Logger log.Logger
 }

 func newChannel(conn *MConnection, desc ChannelDescriptor) *Channel {
@@ -582,6 +592,10 @@ func newChannel(conn *MConnection, desc ChannelDescriptor) *Channel {
 	}
 }

+func (ch *Channel) SetLogger(l log.Logger) {
+	ch.Logger = l
+}
+
 // Queues message to send to this channel.
 // Goroutine-safe
 // Times out (and returns false) after defaultSendTimeout
@@ -654,7 +668,7 @@ func (ch *Channel) nextMsgPacket() msgPacket {
 // Not goroutine-safe
 func (ch *Channel) writeMsgPacketTo(w io.Writer) (n int, err error) {
 	packet := ch.nextMsgPacket()
-	// log.Debug("Write Msg Packet", "conn", ch.conn, "packet", packet)
+	ch.Logger.Debug("Write Msg Packet", "conn", ch.conn, "packet", packet)
 	writeMsgPacketTo(packet, w, &n, &err)
 	if err == nil {
 		ch.recentlySent += int64(n)
@@ -670,7 +684,7 @@ func writeMsgPacketTo(packet msgPacket, w io.Writer, n *int, err *error) {
 // Handles incoming msgPackets. Returns a msg bytes if msg is complete.
 // Not goroutine-safe
 func (ch *Channel) recvMsgPacket(packet msgPacket) ([]byte, error) {
-	// log.Debug("Read Msg Packet", "conn", ch.conn, "packet", packet)
+	ch.Logger.Debug("Read Msg Packet", "conn", ch.conn, "packet", packet)
 	if ch.desc.RecvMessageCapacity < len(ch.recving)+len(packet.Bytes) {
 		return nil, wire.ErrBinaryReadOverflow
 	}
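MConnection now fans its logger out to every Channel, which is what lets the previously commented-out per-packet Debug lines come back without nil loggers. The propagation pattern, sketched standalone (the Logger interface here is a stand-in for tmlibs' log.Logger):

package main

import "fmt"

// Logger is a stand-in for tmlibs' log.Logger.
type Logger interface {
	Debug(msg string, keyvals ...interface{})
}

type printLogger struct{}

func (printLogger) Debug(msg string, keyvals ...interface{}) {
	fmt.Println("DEBUG:", msg, keyvals)
}

type Channel struct{ Logger Logger }

func (ch *Channel) SetLogger(l Logger) { ch.Logger = l }

type MConnection struct {
	Logger   Logger
	channels []*Channel
}

// SetLogger stores the logger on the connection and pushes the same
// instance down to every channel, so child components never log via nil.
func (c *MConnection) SetLogger(l Logger) {
	c.Logger = l
	for _, ch := range c.channels {
		ch.SetLogger(l)
	}
}

func main() {
	conn := &MConnection{channels: []*Channel{{}, {}}}
	conn.SetLogger(printLogger{})
	conn.channels[0].Logger.Debug("Write Msg Packet", "channel", 0)
}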


p2p/peer.go (+4, -16)

@@ -2,7 +2,6 @@ package p2p
 import (
 	"fmt"
-	"io"
 	"net"
 	"time"
@@ -48,7 +47,6 @@ type peer struct {
 	config *PeerConfig

 	nodeInfo *NodeInfo
-	key      string
 	Data     *cmn.CMap // User data.
 }
@@ -209,8 +207,6 @@ func (p *peer) HandshakeTimeout(ourNodeInfo *NodeInfo, timeout time.Duration) er
 	peerNodeInfo.RemoteAddr = p.Addr().String()

 	p.nodeInfo = peerNodeInfo
-	p.key = peerNodeInfo.PubKey.KeyString()
 	return nil
 }
@@ -283,26 +279,18 @@ func (p *peer) CanSend(chID byte) bool {
 	return p.mconn.CanSend(chID)
 }

-// WriteTo writes the peer's public key to w.
-func (p *peer) WriteTo(w io.Writer) (int64, error) {
-	var n int
-	var err error
-	wire.WriteString(p.key, w, &n, &err)
-	return int64(n), err
-}
-
 // String representation.
 func (p *peer) String() string {
 	if p.outbound {
-		return fmt.Sprintf("Peer{%v %v out}", p.mconn, p.key[:12])
+		return fmt.Sprintf("Peer{%v %v out}", p.mconn, p.Key())
 	}
-	return fmt.Sprintf("Peer{%v %v in}", p.mconn, p.key[:12])
+	return fmt.Sprintf("Peer{%v %v in}", p.mconn, p.Key())
 }

 // Equals reports whenever 2 peers are actually represent the same node.
 func (p *peer) Equals(other Peer) bool {
-	return p.key == other.Key()
+	return p.Key() == other.Key()
 }

 // Get the data for a given key.
@@ -317,7 +305,7 @@ func (p *peer) Set(key string, data interface{}) {
 // Key returns the peer's id key.
 func (p *peer) Key() string {
-	return p.key
+	return p.nodeInfo.ListenAddr // XXX: should probably be PubKey.KeyString()
 }

 // NodeInfo returns a copy of the peer's NodeInfo.
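With the cached key field gone, peer identity flows through the Key() accessor, temporarily backed by ListenAddr (the XXX comment flags the intended switch to PubKey.KeyString()). Routing every comparison through the accessor keeps Equals correct whatever the backing becomes; a toy version:

package main

import "fmt"

type Peer interface{ Key() string }

type peer struct{ listenAddr string }

// Key derives identity on demand instead of caching it in a field;
// swapping the backing (e.g. to a pubkey string) touches one method.
func (p *peer) Key() string { return p.listenAddr }

// Equals compares both sides through the accessor, so it stays correct
// even if the identity source changes later.
func (p *peer) Equals(other Peer) bool { return p.Key() == other.Key() }

func main() {
	a := &peer{listenAddr: "10.0.0.1:46656"}
	b := &peer{listenAddr: "10.0.0.1:46656"}
	fmt.Println(a.Equals(b)) // true
}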


p2p/peer_set_test.go (+0, -1)

@@ -13,7 +13,6 @@ import (
 // Returns an empty dummy peer
 func randPeer() *peer {
 	return &peer{
-		key: cmn.RandStr(12),
 		nodeInfo: &NodeInfo{
 			RemoteAddr: cmn.Fmt("%v.%v.%v.%v:46656", rand.Int()%256, rand.Int()%256, rand.Int()%256, rand.Int()%256),
 			ListenAddr: cmn.Fmt("%v.%v.%v.%v:46656", rand.Int()%256, rand.Int()%256, rand.Int()%256, rand.Int()%256),


p2p/pex_reactor.go (+2, -2)

@@ -264,8 +264,8 @@ func (r *PEXReactor) ensurePeers() {
 		if dialling := r.Switch.IsDialing(try); dialling {
 			continue
 		}
-		// XXX: does this work ?!
-		if connected := r.Switch.Peers().Has(try.IP.String()); connected {
+		// XXX: Should probably use pubkey as peer key ...
+		if connected := r.Switch.Peers().Has(try.String()); connected {
 			continue
 		}
 		r.Logger.Info("Will dial address", "addr", try)


p2p/pex_reactor_test.go (+0, -1)

@@ -194,7 +194,6 @@ func createRoutableAddr() (addr string, netAddr *NetAddress) {
 func createRandomPeer(outbound bool) *peer {
 	addr, netAddr := createRoutableAddr()
 	p := &peer{
-		key: cmn.RandStr(12),
 		nodeInfo: &NodeInfo{
 			ListenAddr: addr,
 			RemoteAddr: netAddr.String(),


p2p/switch.go (+1, -0)

@@ -220,6 +220,7 @@ func (sw *Switch) OnStop() {
 		sw.peers.Remove(peer)
 	}
 	// Stop reactors
+	sw.Logger.Debug("Switch: Stopping reactors")
 	for _, reactor := range sw.reactors {
 		reactor.Stop()
 	}


test/p2p/kill_all/check_peers.sh (+24, -24)

@@ -4,20 +4,20 @@ set -eu
 NUM_OF_PEERS=$1

 # how many attempts for each peer to catch up by height
-MAX_ATTEMPTS_TO_CATCH_UP=20
+MAX_ATTEMPTS_TO_CATCH_UP=120

 echo "Waiting for nodes to come online"
 set +e
 for i in $(seq 1 "$NUM_OF_PEERS"); do
-  addr=$(test/p2p/ip.sh "$i"):46657
-  curl -s "$addr/status" > /dev/null
-  ERR=$?
-  while [ "$ERR" != 0 ]; do
-    sleep 1
-    curl -s "$addr/status" > /dev/null
-    ERR=$?
-  done
-  echo "... node $i is up"
+	addr=$(test/p2p/ip.sh "$i"):46657
+	curl -s "$addr/status" > /dev/null
+	ERR=$?
+	while [ "$ERR" != 0 ]; do
+		sleep 1
+		curl -s "$addr/status" > /dev/null
+		ERR=$?
+	done
+	echo "... node $i is up"
 done
 set -e
@@ -28,22 +28,22 @@ echo "1st peer is on height $h1"
 echo "Waiting until other peers reporting a height higher than the 1st one"
 for i in $(seq 2 "$NUM_OF_PEERS"); do
-  attempt=1
-  hi=0
-  while [[ $hi -le $h1 ]] ; do
-    addr=$(test/p2p/ip.sh "$i"):46657
-    hi=$(curl -s "$addr/status" | jq .result.latest_block_height)
-    echo "... peer $i is on height $hi"
-    ((attempt++))
-    if [ "$attempt" -ge $MAX_ATTEMPTS_TO_CATCH_UP ] ; then
-      echo "$attempt unsuccessful attempts were made to catch up"
-      curl -s "$addr/dump_consensus_state" | jq .result
-      exit 1
-    fi
-    sleep 1
-  done
+	attempt=1
+	hi=0
+	while [[ $hi -le $h1 ]] ; do
+		addr=$(test/p2p/ip.sh "$i"):46657
+		hi=$(curl -s "$addr/status" | jq .result.latest_block_height)
+		echo "... peer $i is on height $hi"
+		((attempt++))
+		if [ "$attempt" -ge $MAX_ATTEMPTS_TO_CATCH_UP ] ; then
+			echo "$attempt unsuccessful attempts were made to catch up"
+			curl -s "$addr/dump_consensus_state" | jq .result
+			exit 1
+		fi
+		sleep 1
+	done
 done
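Beyond raising the attempt budget from 20 to 120, the script's logic is a poll-with-retry loop: probe each node's /status endpoint until it answers, then give laggards up to 120 one-second attempts to pass the first peer's height. The same poll-with-attempt-budget loop in Go, for comparison (the URL and limits are the script's; the code itself is illustrative):

package main

import (
	"fmt"
	"net/http"
	"time"
)

// waitUntilUp polls url until it responds, sleeping 1s between tries,
// and gives up after maxAttempts, mirroring the shell loop above.
func waitUntilUp(url string, maxAttempts int) error {
	for attempt := 1; attempt <= maxAttempts; attempt++ {
		resp, err := http.Get(url)
		if err == nil {
			resp.Body.Close()
			return nil // node is up
		}
		time.Sleep(time.Second)
	}
	return fmt.Errorf("%d unsuccessful attempts were made to catch up", maxAttempts)
}

func main() {
	if err := waitUntilUp("http://127.0.0.1:46657/status", 120); err != nil {
		fmt.Println(err)
	}
}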
