diff --git a/blockchain/reactor.go b/blockchain/reactor.go index 7ebf31796..91f0adede 100644 --- a/blockchain/reactor.go +++ b/blockchain/reactor.go @@ -121,6 +121,24 @@ func (bcR *BlockchainReactor) RemovePeer(peer p2p.Peer, reason interface{}) { bcR.pool.RemovePeer(peer.Key()) } +// respondToPeer loads a block and sends it to the requesting peer, +// if we have it. Otherwise, we'll respond saying we don't have it. +// According to the Tendermint spec, if all nodes are honest, +// no node should be requesting for a block that's non-existent. +func (bcR *BlockchainReactor) respondToPeer(msg *bcBlockRequestMessage, src p2p.Peer) (queued bool) { + block := bcR.store.LoadBlock(msg.Height) + if block != nil { + msg := &bcBlockResponseMessage{Block: block} + return src.TrySend(BlockchainChannel, struct{ BlockchainMessage }{msg}) + } + + bcR.Logger.Info("Peer asking for a block we don't have", "src", src, "height", msg.Height) + + return src.TrySend(BlockchainChannel, struct{ BlockchainMessage }{ + &bcNoBlockResponseMessage{Height: msg.Height}, + }) +} + // Receive implements Reactor by handling 4 types of messages (look below). func (bcR *BlockchainReactor) Receive(chID byte, src p2p.Peer, msgBytes []byte) { _, msg, err := DecodeMessage(msgBytes, bcR.maxMsgSize()) @@ -134,16 +152,8 @@ func (bcR *BlockchainReactor) Receive(chID byte, src p2p.Peer, msgBytes []byte) // TODO: improve logic to satisfy megacheck switch msg := msg.(type) { case *bcBlockRequestMessage: - // Got a request for a block. Respond with block if we have it. - block := bcR.store.LoadBlock(msg.Height) - if block != nil { - msg := &bcBlockResponseMessage{Block: block} - queued := src.TrySend(BlockchainChannel, struct{ BlockchainMessage }{msg}) - if !queued { - // queue is full, just ignore. - } - } else { - // TODO peer is asking for things we don't have. + if queued := bcR.respondToPeer(msg, src); !queued { + // Unfortunately not queued since the queue is full. } case *bcBlockResponseMessage: // Got a block. @@ -276,10 +286,11 @@ func (bcR *BlockchainReactor) SetEventSwitch(evsw types.EventSwitch) { // Messages const ( - msgTypeBlockRequest = byte(0x10) - msgTypeBlockResponse = byte(0x11) - msgTypeStatusResponse = byte(0x20) - msgTypeStatusRequest = byte(0x21) + msgTypeBlockRequest = byte(0x10) + msgTypeBlockResponse = byte(0x11) + msgTypeNoBlockResponse = byte(0x12) + msgTypeStatusResponse = byte(0x20) + msgTypeStatusRequest = byte(0x21) ) // BlockchainMessage is a generic message for this reactor. @@ -289,6 +300,7 @@ var _ = wire.RegisterInterface( struct{ BlockchainMessage }{}, wire.ConcreteType{&bcBlockRequestMessage{}, msgTypeBlockRequest}, wire.ConcreteType{&bcBlockResponseMessage{}, msgTypeBlockResponse}, + wire.ConcreteType{&bcNoBlockResponseMessage{}, msgTypeNoBlockResponse}, wire.ConcreteType{&bcStatusResponseMessage{}, msgTypeStatusResponse}, wire.ConcreteType{&bcStatusRequestMessage{}, msgTypeStatusRequest}, ) @@ -316,6 +328,14 @@ func (m *bcBlockRequestMessage) String() string { return cmn.Fmt("[bcBlockRequestMessage %v]", m.Height) } +type bcNoBlockResponseMessage struct { + Height int +} + +func (brm *bcNoBlockResponseMessage) String() string { + return cmn.Fmt("[bcNoBlockResponseMessage %d]", brm.Height) +} + //------------------------------------- // NOTE: keep up-to-date with maxBlockchainResponseSize diff --git a/blockchain/reactor_test.go b/blockchain/reactor_test.go new file mode 100644 index 000000000..492ea7a80 --- /dev/null +++ b/blockchain/reactor_test.go @@ -0,0 +1,158 @@ +package blockchain + +import ( + "bytes" + "testing" + + wire "github.com/tendermint/go-wire" + cmn "github.com/tendermint/tmlibs/common" + "github.com/tendermint/tmlibs/db" + "github.com/tendermint/tmlibs/log" + + cfg "github.com/tendermint/tendermint/config" + "github.com/tendermint/tendermint/p2p" + sm "github.com/tendermint/tendermint/state" + "github.com/tendermint/tendermint/types" +) + +func newBlockchainReactor(logger log.Logger, maxBlockHeight int) *BlockchainReactor { + config := cfg.ResetTestRoot("node_node_test") + + blockStoreDB := db.NewDB("blockstore", config.DBBackend, config.DBDir()) + blockStore := NewBlockStore(blockStoreDB) + + stateLogger := logger.With("module", "state") + + // Get State + stateDB := db.NewDB("state", config.DBBackend, config.DBDir()) + state, _ := sm.GetState(stateDB, config.GenesisFile()) + + state.SetLogger(stateLogger) + state.Save() + + // Make the blockchainReactor itself + fastSync := true + bcReactor := NewBlockchainReactor(state.Copy(), nil, blockStore, fastSync) + + // Next: we need to set a switch in order for peers to be added in + bcReactor.Switch = p2p.NewSwitch(cfg.DefaultP2PConfig()) + bcReactor.SetLogger(logger.With("module", "blockchain")) + + // Lastly: let's add some blocks in + for blockHeight := 1; blockHeight <= maxBlockHeight; blockHeight++ { + firstBlock := makeBlock(blockHeight, state) + secondBlock := makeBlock(blockHeight+1, state) + firstParts := firstBlock.MakePartSet(state.Params().BlockGossipParams.BlockPartSizeBytes) + blockStore.SaveBlock(firstBlock, firstParts, secondBlock.LastCommit) + } + + return bcReactor +} + +func TestNoBlockMessageResponse(t *testing.T) { + logBuf := new(bytes.Buffer) + logger := log.NewTMLogger(logBuf) + maxBlockHeight := 20 + + bcr := newBlockchainReactor(logger, maxBlockHeight) + go bcr.OnStart() + defer bcr.Stop() + + // Add some peers in + peer := newbcrTestPeer(cmn.RandStr(12)) + bcr.AddPeer(peer) + + chID := byte(0x01) + + tests := []struct { + height int + existent bool + }{ + {maxBlockHeight + 2, false}, + {10, true}, + {1, true}, + {100, false}, + } + + for _, tt := range tests { + reqBlockMsg := &bcBlockRequestMessage{tt.height} + reqBlockBytes := wire.BinaryBytes(struct{ BlockchainMessage }{reqBlockMsg}) + bcr.Receive(chID, peer, reqBlockBytes) + value := peer.lastValue() + msg := value.(struct{ BlockchainMessage }).BlockchainMessage + + if tt.existent { + if blockMsg, ok := msg.(*bcBlockResponseMessage); !ok { + t.Fatalf("Expected to receive a block response for height %d", tt.height) + } else if blockMsg.Block.Height != tt.height { + t.Fatalf("Expected response to be for height %d, got %d", tt.height, blockMsg.Block.Height) + } + } else { + if noBlockMsg, ok := msg.(*bcNoBlockResponseMessage); !ok { + t.Fatalf("Expected to receive a no block response for height %d", tt.height) + } else if noBlockMsg.Height != tt.height { + t.Fatalf("Expected response to be for height %d, got %d", tt.height, noBlockMsg.Height) + } + } + } +} + +//---------------------------------------------- +// utility funcs + +func makeTxs(blockNumber int) (txs []types.Tx) { + for i := 0; i < 10; i++ { + txs = append(txs, types.Tx([]byte{byte(blockNumber), byte(i)})) + } + return txs +} + +func makeBlock(blockNumber int, state *sm.State) *types.Block { + prevHash := state.LastBlockID.Hash + prevParts := types.PartSetHeader{} + valHash := state.Validators.Hash() + prevBlockID := types.BlockID{prevHash, prevParts} + block, _ := types.MakeBlock(blockNumber, "test_chain", makeTxs(blockNumber), + new(types.Commit), prevBlockID, valHash, state.AppHash, state.Params().BlockGossipParams.BlockPartSizeBytes) + return block +} + +// The Test peer +type bcrTestPeer struct { + cmn.Service + key string + ch chan interface{} +} + +var _ p2p.Peer = (*bcrTestPeer)(nil) + +func newbcrTestPeer(key string) *bcrTestPeer { + return &bcrTestPeer{ + Service: cmn.NewBaseService(nil, "bcrTestPeer", nil), + key: key, + ch: make(chan interface{}, 2), + } +} + +func (tp *bcrTestPeer) lastValue() interface{} { return <-tp.ch } + +func (tp *bcrTestPeer) TrySend(chID byte, value interface{}) bool { + if _, ok := value.(struct{ BlockchainMessage }).BlockchainMessage.(*bcStatusResponseMessage); ok { + // Discard status response messages since they skew our results + // We only want to deal with: + // + bcBlockResponseMessage + // + bcNoBlockResponseMessage + } else { + tp.ch <- value + } + return true +} + +func (tp *bcrTestPeer) Send(chID byte, data interface{}) bool { return tp.TrySend(chID, data) } +func (tp *bcrTestPeer) NodeInfo() *p2p.NodeInfo { return nil } +func (tp *bcrTestPeer) Status() p2p.ConnectionStatus { return p2p.ConnectionStatus{} } +func (tp *bcrTestPeer) Key() string { return tp.key } +func (tp *bcrTestPeer) IsOutbound() bool { return false } +func (tp *bcrTestPeer) IsPersistent() bool { return true } +func (tp *bcrTestPeer) Get(s string) interface{} { return s } +func (tp *bcrTestPeer) Set(string, interface{}) {}