diff --git a/account/account.go b/account/account.go index fb65ecd66..15333258c 100644 --- a/account/account.go +++ b/account/account.go @@ -49,7 +49,7 @@ func AccountEncoder(o interface{}, w io.Writer, n *int64, err *error) { WriteBinary(o.(*Account), w, n, err) } -func AccountDecoder(r io.Reader, n *int64, err *error) interface{} { +func AccountDecoder(r Unreader, n *int64, err *error) interface{} { return ReadBinary(&Account{}, r, n, err) } diff --git a/account/privkey.go b/account/privkey.go index b1b49205f..c220492b6 100644 --- a/account/privkey.go +++ b/account/privkey.go @@ -2,7 +2,6 @@ package account import ( "errors" - "io" "reflect" "github.com/tendermint/go-ed25519" @@ -23,8 +22,8 @@ const ( //------------------------------------- // For binary.readReflect -func PrivKeyDecoder(r io.Reader, n *int64, err *error) interface{} { - switch t := ReadByte(r, n, err); t { +func PrivKeyDecoder(r Unreader, n *int64, err *error) interface{} { + switch t := PeekByte(r, n, err); t { case PrivKeyTypeEd25519: return ReadBinary(PrivKeyEd25519{}, r, n, err) default: diff --git a/account/pubkey.go b/account/pubkey.go index 0d65e4c69..84acb7218 100644 --- a/account/pubkey.go +++ b/account/pubkey.go @@ -2,7 +2,6 @@ package account import ( "errors" - "io" "reflect" "github.com/tendermint/go-ed25519" @@ -26,8 +25,8 @@ const ( //------------------------------------- // for binary.readReflect -func PubKeyDecoder(r io.Reader, n *int64, err *error) interface{} { - switch t := ReadByte(r, n, err); t { +func PubKeyDecoder(r Unreader, n *int64, err *error) interface{} { + switch t := PeekByte(r, n, err); t { case PubKeyTypeNil: return PubKeyNil{} case PubKeyTypeEd25519: diff --git a/account/signature.go b/account/signature.go index 691b6d68e..a6c6b83cc 100644 --- a/account/signature.go +++ b/account/signature.go @@ -3,7 +3,6 @@ package account import ( "errors" "fmt" - "io" "reflect" "github.com/tendermint/go-ed25519" @@ -23,8 +22,8 @@ const ( //------------------------------------- // for binary.readReflect -func SignatureDecoder(r io.Reader, n *int64, err *error) interface{} { - switch t := ReadByte(r, n, err); t { +func SignatureDecoder(r Unreader, n *int64, err *error) interface{} { + switch t := PeekByte(r, n, err); t { case SignatureTypeEd25519: return ReadBinary(SignatureEd25519{}, r, n, err) default: diff --git a/binary/README.md b/binary/README.md index 7824c045b..cd8d7f2c1 100644 --- a/binary/README.md +++ b/binary/README.md @@ -123,8 +123,9 @@ func GreeterEncoder(o interface{}, w io.Writer, n *int64, err *error) { } } -func GreeterDecoder(r io.Reader, n *int64, err *error) interface{} { - switch t := ReadByte(r, n, err); t { +func GreeterDecoder(r *bytes.Reader, n *int64, err *error) interface{} { + // We must peek the type byte because ReadBinary() expects it. + switch t := PeekByte(r, n, err); t { case GreeterTypeDog: return ReadBinary(Dog{}, r, n, err) case GreeterTypeCat: diff --git a/binary/binary.go b/binary/binary.go index 8e19bc973..7db7175be 100644 --- a/binary/binary.go +++ b/binary/binary.go @@ -5,7 +5,12 @@ import ( "reflect" ) -func ReadBinary(o interface{}, r io.Reader, n *int64, err *error) interface{} { +type Unreader interface { + io.Reader + UnreadByte() error +} + +func ReadBinary(o interface{}, r Unreader, n *int64, err *error) interface{} { rv, rt := reflect.ValueOf(o), reflect.TypeOf(o) if rv.Kind() == reflect.Ptr { readReflect(rv.Elem(), rt.Elem(), r, n, err) diff --git a/binary/codec.go b/binary/codec.go index a7f384a23..954a50cff 100644 --- a/binary/codec.go +++ b/binary/codec.go @@ -9,7 +9,7 @@ import ( ) type Encoder func(o interface{}, w io.Writer, n *int64, err *error) -type Decoder func(r io.Reader, n *int64, err *error) interface{} +type Decoder func(r Unreader, n *int64, err *error) interface{} type Comparator func(o1 interface{}, o2 interface{}) int type Codec struct { @@ -86,7 +86,7 @@ func BasicCodecEncoder(o interface{}, w io.Writer, n *int64, err *error) { } } -func BasicCodecDecoder(r io.Reader, n *int64, err *error) (o interface{}) { +func BasicCodecDecoder(r Unreader, n *int64, err *error) (o interface{}) { type_ := ReadByte(r, n, err) switch type_ { case typeByte: diff --git a/binary/int.go b/binary/int.go index a63c9bb63..015b83a02 100644 --- a/binary/int.go +++ b/binary/int.go @@ -18,6 +18,16 @@ func ReadByte(r io.Reader, n *int64, err *error) byte { return buf[0] } +// NOTE: may end up advancing the reader upon error. +func PeekByte(r Unreader, n *int64, err *error) byte { + byte_ := ReadByte(r, n, err) + if *err != nil { + return 0 + } + *err = r.UnreadByte() + return byte_ +} + // Int8 func WriteInt8(i int8, w io.Writer, n *int64, err *error) { diff --git a/binary/reflect.go b/binary/reflect.go index 0c1a75324..1f1b56fbe 100644 --- a/binary/reflect.go +++ b/binary/reflect.go @@ -47,34 +47,44 @@ func GetTypeInfo(rt reflect.Type) *TypeInfo { // NOTE: not goroutine safe, so only call upon program init. func RegisterType(info *TypeInfo) *TypeInfo { - // Register the type info - typeInfos[info.Type] = info - // Also register the underlying struct's info, if info.Type is a pointer. // Or, if info.Type is not a pointer, register the pointer. + var rt, ptrRt reflect.Type if info.Type.Kind() == reflect.Ptr { - rt := info.Type.Elem() - typeInfos[rt] = info + rt, ptrRt = info.Type.Elem(), info.Type } else { - ptrRt := reflect.PtrTo(info.Type) - typeInfos[ptrRt] = info + rt, ptrRt = info.Type, reflect.PtrTo(info.Type) } + // Register the type info + typeInfos[rt] = info + typeInfos[ptrRt] = info + // See if the type implements HasTypeByte - if info.Type.Implements(reflect.TypeOf((*HasTypeByte)(nil)).Elem()) { - zero := reflect.Zero(info.Type) + if rt.Implements(reflect.TypeOf((*HasTypeByte)(nil)).Elem()) { + zero := reflect.Zero(rt) typeByte := zero.Interface().(HasTypeByte).TypeByte() if info.HasTypeByte && info.TypeByte != typeByte { - panic(fmt.Sprintf("Type %v expected TypeByte of %X", info.Type, typeByte)) + panic(fmt.Sprintf("Type %v expected TypeByte of %X", rt, typeByte)) + } else { + info.HasTypeByte = true + info.TypeByte = typeByte + } + } else if ptrRt.Implements(reflect.TypeOf((*HasTypeByte)(nil)).Elem()) { + zero := reflect.Zero(ptrRt) + typeByte := zero.Interface().(HasTypeByte).TypeByte() + if info.HasTypeByte && info.TypeByte != typeByte { + panic(fmt.Sprintf("Type %v expected TypeByte of %X", ptrRt, typeByte)) + } else { + info.HasTypeByte = true + info.TypeByte = typeByte } - info.HasTypeByte = true - info.TypeByte = typeByte } return info } -func readReflect(rv reflect.Value, rt reflect.Type, r io.Reader, n *int64, err *error) { +func readReflect(rv reflect.Value, rt reflect.Type, r Unreader, n *int64, err *error) { // First, create a new struct if rv is nil pointer. if rt.Kind() == reflect.Ptr && rv.IsNil() { diff --git a/block/part_set.go b/block/part_set.go index 046b95430..ecf3e36a2 100644 --- a/block/part_set.go +++ b/block/part_set.go @@ -5,10 +5,10 @@ import ( "crypto/sha256" "errors" "fmt" - "io" "strings" "sync" + . "github.com/tendermint/tendermint/binary" . "github.com/tendermint/tendermint/common" "github.com/tendermint/tendermint/merkle" ) @@ -227,7 +227,7 @@ func (ps *PartSet) IsComplete() bool { return ps.count == ps.total } -func (ps *PartSet) GetReader() io.Reader { +func (ps *PartSet) GetReader() Unreader { if !ps.IsComplete() { panic("Cannot GetReader() on incomplete PartSet") } diff --git a/block/store.go b/block/store.go index fd4654610..aca540948 100644 --- a/block/store.go +++ b/block/store.go @@ -4,7 +4,6 @@ import ( "bytes" "encoding/json" "fmt" - "io" . "github.com/tendermint/tendermint/binary" . "github.com/tendermint/tendermint/common" @@ -41,7 +40,7 @@ func (bs *BlockStore) Height() uint { return bs.height } -func (bs *BlockStore) GetReader(key []byte) io.Reader { +func (bs *BlockStore) GetReader(key []byte) Unreader { bytez := bs.db.Get(key) if bytez == nil { return nil diff --git a/block/tx.go b/block/tx.go index f84b4faf0..6e719d5c1 100644 --- a/block/tx.go +++ b/block/tx.go @@ -52,8 +52,8 @@ const ( //------------------------------------- // for binary.readReflect -func TxDecoder(r io.Reader, n *int64, err *error) interface{} { - switch t := ReadByte(r, n, err); t { +func TxDecoder(r Unreader, n *int64, err *error) interface{} { + switch t := PeekByte(r, n, err); t { case TxTypeSend: return ReadBinary(&SendTx{}, r, n, err) case TxTypeBond: diff --git a/consensus/reactor.go b/consensus/reactor.go index 07365e5ed..c17ce4287 100644 --- a/consensus/reactor.go +++ b/consensus/reactor.go @@ -111,10 +111,13 @@ func (conR *ConsensusReactor) Receive(chId byte, peer *p2p.Peer, msgBytes []byte // Get round state rs := conR.conS.GetRoundState() ps := peer.Data.Get(peerStateKey).(*PeerState) - _, msg_ := decodeMessage(msgBytes) - var err error = nil - - log.Debug("[%X][%v] Receive: %v", chId, peer, msg_) + _, msg_, err := decodeMessage(msgBytes) + if err != nil { + log.Warning("[%X] RECEIVE %v: %v ERROR: %v", chId, peer.Connection().RemoteAddress, msg_, err) + return + } + log.Debug("[%X] RECEIVE %v: %v", chId, peer.Connection().RemoteAddress, msg_) + log.Debug("[%X] RECEIVE BYTES: %X", chId, msgBytes) switch chId { case StateCh: @@ -219,7 +222,7 @@ func (conR *ConsensusReactor) broadcastNewRoundStepRoutine() { // Get seconds since beginning of height. // Due to the condition documented, this is safe. - timeElapsed := rs.StartTime.Sub(time.Now()) + timeElapsed := time.Now().Sub(rs.StartTime) // Broadcast NewRoundStepMessage { @@ -635,26 +638,26 @@ const ( ) // TODO: check for unnecessary extra bytes at the end. -func decodeMessage(bz []byte) (msgType byte, msg interface{}) { - n, err := new(int64), new(error) +func decodeMessage(bz []byte) (msgType byte, msg interface{}, err error) { + n := new(int64) // log.Debug("decoding msg bytes: %X", bz) msgType = bz[0] r := bytes.NewReader(bz[1:]) switch msgType { // Messages for communicating state changes case msgTypeNewRoundStep: - msg = ReadBinary(&NewRoundStepMessage{}, r, n, err) + msg = ReadBinary(&NewRoundStepMessage{}, r, n, &err) case msgTypeCommitStep: - msg = ReadBinary(&CommitStepMessage{}, r, n, err) + msg = ReadBinary(&CommitStepMessage{}, r, n, &err) // Messages of data case msgTypeProposal: - msg = ReadBinary(&Proposal{}, r, n, err) + msg = ReadBinary(&Proposal{}, r, n, &err) case msgTypePart: - msg = ReadBinary(&PartMessage{}, r, n, err) + msg = ReadBinary(&PartMessage{}, r, n, &err) case msgTypeVote: - msg = ReadBinary(&VoteMessage{}, r, n, err) + msg = ReadBinary(&VoteMessage{}, r, n, &err) case msgTypeHasVote: - msg = ReadBinary(&HasVoteMessage{}, r, n, err) + msg = ReadBinary(&HasVoteMessage{}, r, n, &err) default: msg = nil } diff --git a/merkle/iavl_node.go b/merkle/iavl_node.go index a29217423..3a1ea6898 100644 --- a/merkle/iavl_node.go +++ b/merkle/iavl_node.go @@ -2,8 +2,9 @@ package merkle import ( "crypto/sha256" - . "github.com/tendermint/tendermint/binary" "io" + + . "github.com/tendermint/tendermint/binary" ) // Node @@ -29,7 +30,7 @@ func NewIAVLNode(key interface{}, value interface{}) *IAVLNode { } } -func ReadIAVLNode(t *IAVLTree, r io.Reader, n *int64, err *error) *IAVLNode { +func ReadIAVLNode(t *IAVLTree, r Unreader, n *int64, err *error) *IAVLNode { node := &IAVLNode{} // node header & key diff --git a/p2p/connection.go b/p2p/connection.go index 58ea68576..84ad5df25 100644 --- a/p2p/connection.go +++ b/p2p/connection.go @@ -182,7 +182,7 @@ func (c *MConnection) Send(chId byte, msg interface{}) bool { return false } - log.Debug("[%X] Send to %v: %v", chId, c, msg) + log.Debug("[%X] SEND %v: %v", chId, c.RemoteAddress, msg) log.Debug(" Bytes: %X", BinaryBytes(msg)) // Send message to channel. @@ -210,7 +210,7 @@ func (c *MConnection) TrySend(chId byte, msg interface{}) bool { return false } - log.Debug("[%X] TrySend to %v: %v", chId, c, msg) + log.Debug("[%X] TRYSEND %v: %v", chId, c.RemoteAddress, msg) // Send message to channel. channel, ok := c.channelsIdx[chId] @@ -407,6 +407,7 @@ FOR_LOOP: Panicf("Unknown channel %X", pkt.ChannelId) } msgBytes := channel.recvMsgPacket(pkt) + log.Warning("RECEIVE_MSG_BYTES: %X", msgBytes) if msgBytes != nil { c.onReceive(pkt.ChannelId, msgBytes) } diff --git a/p2p/peer.go b/p2p/peer.go index ca8fc2323..d1af20af9 100644 --- a/p2p/peer.go +++ b/p2p/peer.go @@ -62,8 +62,8 @@ func (p *Peer) IsStopped() bool { return atomic.LoadUint32(&p.stopped) == 1 } -func (p *Peer) RemoteAddress() *NetAddress { - return p.mconn.RemoteAddress +func (p *Peer) Connection() *MConnection { + return p.mconn } func (p *Peer) IsOutbound() bool { diff --git a/p2p/pex_reactor.go b/p2p/pex_reactor.go index e8c16728f..bc1642e98 100644 --- a/p2p/pex_reactor.go +++ b/p2p/pex_reactor.go @@ -105,7 +105,7 @@ func (pexR *PEXReactor) Receive(chId byte, src *Peer, msgBytes []byte) { // We received some peer addresses from src. // TODO: prevent abuse. // (We don't want to get spammed with bad peers) - srcAddr := src.RemoteAddress() + srcAddr := src.Connection().RemoteAddress for _, addr := range msg.(*pexAddrsMessage).Addrs { pexR.book.AddAddress(addr, srcAddr) } diff --git a/p2p/switch.go b/p2p/switch.go index d855b51b7..8af3f158c 100644 --- a/p2p/switch.go +++ b/p2p/switch.go @@ -161,7 +161,7 @@ func (sw *Switch) Broadcast(chId byte, msg interface{}) (numSuccess, numFailure return } - log.Debug("[%X] Broadcast: %v", chId, msg) + log.Debug("[%X] BROADCAST: %v", chId, msg) for _, peer := range sw.peers.List() { // XXX XXX Change. // success := peer.TrySend(chId, msg) diff --git a/state/validator.go b/state/validator.go index 689e834b5..405518a87 100644 --- a/state/validator.go +++ b/state/validator.go @@ -35,7 +35,7 @@ func ValidatorInfoEncoder(o interface{}, w io.Writer, n *int64, err *error) { WriteBinary(o.(*ValidatorInfo), w, n, err) } -func ValidatorInfoDecoder(r io.Reader, n *int64, err *error) interface{} { +func ValidatorInfoDecoder(r Unreader, n *int64, err *error) interface{} { return ReadBinary(&ValidatorInfo{}, r, n, err) } @@ -110,7 +110,7 @@ func (vc validatorCodec) Encode(o interface{}, w io.Writer, n *int64, err *error WriteBinary(o.(*Validator), w, n, err) } -func (vc validatorCodec) Decode(r io.Reader, n *int64, err *error) interface{} { +func (vc validatorCodec) Decode(r Unreader, n *int64, err *error) interface{} { return ReadBinary(&Validator{}, r, n, err) }