Browse Source

Handle errors in DecodeMessage, added logging

pull/9/head
Jae Kwon 10 years ago
parent
commit
591d84947b
5 changed files with 52 additions and 15 deletions
  1. +11
    -0
      binary/log.go
  2. +14
    -0
      binary/reflect.go
  3. +4
    -3
      consensus/reactor.go
  4. +9
    -4
      mempool/reactor.go
  5. +14
    -8
      p2p/pex_reactor.go

+ 11
- 0
binary/log.go View File

@ -0,0 +1,11 @@
package binary
import (
"github.com/op/go-logging"
)
var log = logging.MustGetLogger("binary")
func SetBinaryLogger(l *logging.Logger) {
log = l
}

+ 14
- 0
binary/reflect.go View File

@ -113,6 +113,7 @@ func readReflect(rv reflect.Value, rt reflect.Type, r Unreader, n *int64, err *e
// Read TypeByte prefix // Read TypeByte prefix
if typeInfo.HasTypeByte { if typeInfo.HasTypeByte {
typeByte := ReadByte(r, n, err) typeByte := ReadByte(r, n, err)
log.Debug("Read TypeByte: %X", typeByte)
if typeByte != typeInfo.TypeByte { if typeByte != typeInfo.TypeByte {
*err = errors.New(fmt.Sprintf("Expected TypeByte of %X but got %X", typeInfo.TypeByte, typeByte)) *err = errors.New(fmt.Sprintf("Expected TypeByte of %X but got %X", typeInfo.TypeByte, typeByte))
return return
@ -125,10 +126,12 @@ func readReflect(rv reflect.Value, rt reflect.Type, r Unreader, n *int64, err *e
if elemRt.Kind() == reflect.Uint8 { if elemRt.Kind() == reflect.Uint8 {
// Special case: Byteslices // Special case: Byteslices
byteslice := ReadByteSlice(r, n, err) byteslice := ReadByteSlice(r, n, err)
log.Debug("Read byteslice: %X", byteslice)
rv.Set(reflect.ValueOf(byteslice)) rv.Set(reflect.ValueOf(byteslice))
} else { } else {
// Read length // Read length
length := int(ReadUvarint(r, n, err)) length := int(ReadUvarint(r, n, err))
log.Debug("Read length: %v", length)
sliceRv := reflect.MakeSlice(rt, length, length) sliceRv := reflect.MakeSlice(rt, length, length)
// Read elems // Read elems
for i := 0; i < length; i++ { for i := 0; i < length; i++ {
@ -151,46 +154,57 @@ func readReflect(rv reflect.Value, rt reflect.Type, r Unreader, n *int64, err *e
case reflect.String: case reflect.String:
str := ReadString(r, n, err) str := ReadString(r, n, err)
log.Debug("Read string: %v", str)
rv.SetString(str) rv.SetString(str)
case reflect.Int64: case reflect.Int64:
num := ReadUint64(r, n, err) num := ReadUint64(r, n, err)
log.Debug("Read num: %v", num)
rv.SetInt(int64(num)) rv.SetInt(int64(num))
case reflect.Int32: case reflect.Int32:
num := ReadUint32(r, n, err) num := ReadUint32(r, n, err)
log.Debug("Read num: %v", num)
rv.SetInt(int64(num)) rv.SetInt(int64(num))
case reflect.Int16: case reflect.Int16:
num := ReadUint16(r, n, err) num := ReadUint16(r, n, err)
log.Debug("Read num: %v", num)
rv.SetInt(int64(num)) rv.SetInt(int64(num))
case reflect.Int8: case reflect.Int8:
num := ReadUint8(r, n, err) num := ReadUint8(r, n, err)
log.Debug("Read num: %v", num)
rv.SetInt(int64(num)) rv.SetInt(int64(num))
case reflect.Int: case reflect.Int:
num := ReadUvarint(r, n, err) num := ReadUvarint(r, n, err)
log.Debug("Read num: %v", num)
rv.SetInt(int64(num)) rv.SetInt(int64(num))
case reflect.Uint64: case reflect.Uint64:
num := ReadUint64(r, n, err) num := ReadUint64(r, n, err)
log.Debug("Read num: %v", num)
rv.SetUint(uint64(num)) rv.SetUint(uint64(num))
case reflect.Uint32: case reflect.Uint32:
num := ReadUint32(r, n, err) num := ReadUint32(r, n, err)
log.Debug("Read num: %v", num)
rv.SetUint(uint64(num)) rv.SetUint(uint64(num))
case reflect.Uint16: case reflect.Uint16:
num := ReadUint16(r, n, err) num := ReadUint16(r, n, err)
log.Debug("Read num: %v", num)
rv.SetUint(uint64(num)) rv.SetUint(uint64(num))
case reflect.Uint8: case reflect.Uint8:
num := ReadUint8(r, n, err) num := ReadUint8(r, n, err)
log.Debug("Read num: %v", num)
rv.SetUint(uint64(num)) rv.SetUint(uint64(num))
case reflect.Uint: case reflect.Uint:
num := ReadUvarint(r, n, err) num := ReadUvarint(r, n, err)
log.Debug("Read num: %v", num)
rv.SetUint(uint64(num)) rv.SetUint(uint64(num))
default: default:


+ 4
- 3
consensus/reactor.go View File

@ -111,9 +111,10 @@ func (conR *ConsensusReactor) Receive(chId byte, peer *p2p.Peer, msgBytes []byte
// Get round state // Get round state
rs := conR.conS.GetRoundState() rs := conR.conS.GetRoundState()
ps := peer.Data.Get(peerStateKey).(*PeerState) ps := peer.Data.Get(peerStateKey).(*PeerState)
_, msg_, err := decodeMessage(msgBytes)
_, msg_, err := DecodeMessage(msgBytes)
if err != nil { if err != nil {
log.Warning("[%X] RECEIVE %v: %v ERROR: %v", chId, peer.Connection().RemoteAddress, msg_, err) log.Warning("[%X] RECEIVE %v: %v ERROR: %v", chId, peer.Connection().RemoteAddress, msg_, err)
log.Warning("[%X] RECEIVE BYTES: %X", chId, msgBytes)
return return
} }
log.Debug("[%X] RECEIVE %v: %v", chId, peer.Connection().RemoteAddress, msg_) log.Debug("[%X] RECEIVE %v: %v", chId, peer.Connection().RemoteAddress, msg_)
@ -638,11 +639,11 @@ const (
) )
// TODO: check for unnecessary extra bytes at the end. // TODO: check for unnecessary extra bytes at the end.
func decodeMessage(bz []byte) (msgType byte, msg interface{}, err error) {
func DecodeMessage(bz []byte) (msgType byte, msg interface{}, err error) {
n := new(int64) n := new(int64)
// log.Debug("decoding msg bytes: %X", bz) // log.Debug("decoding msg bytes: %X", bz)
msgType = bz[0] msgType = bz[0]
r := bytes.NewReader(bz[1:])
r := bytes.NewReader(bz)
switch msgType { switch msgType {
// Messages for communicating state changes // Messages for communicating state changes
case msgTypeNewRoundStep: case msgTypeNewRoundStep:


+ 9
- 4
mempool/reactor.go View File

@ -68,7 +68,11 @@ func (pexR *MempoolReactor) RemovePeer(peer *p2p.Peer, reason interface{}) {
// Implements Reactor // Implements Reactor
func (memR *MempoolReactor) Receive(chId byte, src *p2p.Peer, msgBytes []byte) { func (memR *MempoolReactor) Receive(chId byte, src *p2p.Peer, msgBytes []byte) {
_, msg_ := decodeMessage(msgBytes)
_, msg_, err := DecodeMessage(msgBytes)
if err != nil {
log.Warning("Error decoding message: %v", err)
return
}
log.Info("MempoolReactor received %v", msg_) log.Info("MempoolReactor received %v", msg_)
switch msg_.(type) { switch msg_.(type) {
@ -116,12 +120,13 @@ const (
) )
// TODO: check for unnecessary extra bytes at the end. // TODO: check for unnecessary extra bytes at the end.
func decodeMessage(bz []byte) (msgType byte, msg interface{}) {
n, err := new(int64), new(error)
func DecodeMessage(bz []byte) (msgType byte, msg interface{}, err error) {
n := new(int64)
msgType = bz[0] msgType = bz[0]
r := bytes.NewReader(bz)
switch msgType { switch msgType {
case msgTypeTx: case msgTypeTx:
msg = ReadBinary(&TxMessage{}, bytes.NewReader(bz[1:]), n, err)
msg = ReadBinary(&TxMessage{}, r, n, &err)
default: default:
msg = nil msg = nil
} }


+ 14
- 8
p2p/pex_reactor.go View File

@ -88,7 +88,11 @@ func (pexR *PEXReactor) RemovePeer(peer *Peer, reason interface{}) {
func (pexR *PEXReactor) Receive(chId byte, src *Peer, msgBytes []byte) { func (pexR *PEXReactor) Receive(chId byte, src *Peer, msgBytes []byte) {
// decode message // decode message
msg := decodeMessage(msgBytes)
msg, err := DecodeMessage(msgBytes)
if err != nil {
log.Warning("Error decoding message: %v", err)
return
}
log.Info("requestRoutine received %v", msg) log.Info("requestRoutine received %v", msg)
switch msg.(type) { switch msg.(type) {
@ -201,18 +205,20 @@ const (
) )
// TODO: check for unnecessary extra bytes at the end. // TODO: check for unnecessary extra bytes at the end.
func decodeMessage(bz []byte) (msg interface{}) {
var n int64
var err error
func DecodeMessage(bz []byte) (msg interface{}, err error) {
n := new(int64)
msgType := bz[0]
r := bytes.NewReader(bz)
// log.Debug("decoding msg bytes: %X", bz) // log.Debug("decoding msg bytes: %X", bz)
switch bz[0] {
switch msgType {
case msgTypeRequest: case msgTypeRequest:
return &pexRequestMessage{}
msg = &pexRequestMessage{}
case msgTypeAddrs: case msgTypeAddrs:
return ReadBinary(&pexAddrsMessage{}, bytes.NewReader(bz[1:]), &n, &err)
msg = ReadBinary(&pexAddrsMessage{}, r, n, &err)
default: default:
return nil
msg = nil
} }
return
} }
/* /*


Loading…
Cancel
Save