diff --git a/binary/int.go b/binary/int.go index 4ff9d7cb4..ae03adc7c 100644 --- a/binary/int.go +++ b/binary/int.go @@ -160,8 +160,9 @@ func ReadUInt64(r io.Reader, n *int64, err *error) uint64 { func WriteVarInt(w io.Writer, i int64, n *int64, err *error) { buf := make([]byte, 9) - *n += int64(binary.PutVarint(buf, int64(i))) - WriteTo(w, buf, n, err) + n_ := int64(binary.PutVarint(buf, int64(i))) + *n += n_ + WriteTo(w, buf[:n_], n, err) } func ReadVarInt(r io.Reader, n *int64, err *error) int64 { @@ -175,8 +176,9 @@ func ReadVarInt(r io.Reader, n *int64, err *error) int64 { func WriteUVarInt(w io.Writer, i uint64, n *int64, err *error) { buf := make([]byte, 9) - *n += int64(binary.PutUvarint(buf, uint64(i))) - WriteTo(w, buf, n, err) + n_ := int64(binary.PutUvarint(buf, uint64(i))) + *n += n_ + WriteTo(w, buf[:n_], n, err) } func ReadUVarInt(r io.Reader, n *int64, err *error) uint64 { diff --git a/binary/util.go b/binary/util.go index e8a52117b..e80c7a0bf 100644 --- a/binary/util.go +++ b/binary/util.go @@ -7,7 +7,10 @@ import ( func BinaryBytes(b Binary) []byte { buf := bytes.NewBuffer(nil) - b.WriteTo(buf) + _, err := b.WriteTo(buf) + if err != nil { + panic(err) + } return buf.Bytes() } diff --git a/blocks/block_test.go b/blocks/block_test.go index 6763c8230..26fe5eba1 100644 --- a/blocks/block_test.go +++ b/blocks/block_test.go @@ -3,46 +3,16 @@ package blocks import ( "bytes" . "github.com/tendermint/tendermint/binary" - "math/rand" + . "github.com/tendermint/tendermint/common" "testing" - "time" ) -// Distributed pseudo-exponentially to test for various cases -func randUInt64() uint64 { - bits := rand.Uint32() % 64 - if bits == 0 { - return 0 - } - n := uint64(1 << (bits - 1)) - n += uint64(rand.Int63()) & ((1 << (bits - 1)) - 1) - return n -} - -func randUInt32() uint32 { - bits := rand.Uint32() % 32 - if bits == 0 { - return 0 - } - n := uint32(1 << (bits - 1)) - n += uint32(rand.Int31()) & ((1 << (bits - 1)) - 1) - return n -} - -func randTime() time.Time { - return time.Unix(int64(randUInt64()), 0) -} - -func randBytes(n int) []byte { - bs := make([]byte, n) - for i := 0; i < n; i++ { - bs[i] = byte(rand.Intn(256)) - } - return bs +func randSig() Signature { + return Signature{RandUInt64Exp(), RandBytes(32)} } -func randSig() Signature { - return Signature{randUInt64(), randBytes(32)} +func randBaseTx() BaseTx { + return BaseTx{0, randSig()} } func TestBlock(t *testing.T) { @@ -50,48 +20,52 @@ func TestBlock(t *testing.T) { // Account Txs sendTx := &SendTx{ - Signature: randSig(), - Fee: randUInt64(), - To: randUInt64(), - Amount: randUInt64(), + BaseTx: randBaseTx(), + Fee: RandUInt64Exp(), + To: RandUInt64Exp(), + Amount: RandUInt64Exp(), } nameTx := &NameTx{ - Signature: randSig(), - Fee: randUInt64(), - Name: string(randBytes(12)), - PubKey: randBytes(32), + BaseTx: randBaseTx(), + Fee: RandUInt64Exp(), + Name: string(RandBytes(12)), + PubKey: RandBytes(32), } // Validation Txs - bond := &Bond{ - Signature: randSig(), - Fee: randUInt64(), - UnbondTo: randUInt64(), - Amount: randUInt64(), + bondTx := &BondTx{ + BaseTx: randBaseTx(), + Fee: RandUInt64Exp(), + UnbondTo: RandUInt64Exp(), + Amount: RandUInt64Exp(), } - unbond := &Unbond{ - Signature: randSig(), - Fee: randUInt64(), - Amount: randUInt64(), + unbondTx := &UnbondTx{ + BaseTx: randBaseTx(), + Fee: RandUInt64Exp(), + Amount: RandUInt64Exp(), } - timeout := &Timeout{ - AccountId: randUInt64(), - Penalty: randUInt64(), + timeoutTx := &TimeoutTx{ + AccountId: RandUInt64Exp(), + Penalty: RandUInt64Exp(), } - dupeout := &Dupeout{ - VoteA: BlockVote{ - Height: randUInt64(), - BlockHash: randBytes(32), + dupeoutTx := &DupeoutTx{ + VoteA: Vote{ + Height: RandUInt32Exp(), + Round: RandUInt16Exp(), + Type: VoteTypeBare, + BlockHash: RandBytes(32), Signature: randSig(), }, - VoteB: BlockVote{ - Height: randUInt64(), - BlockHash: randBytes(32), + VoteB: Vote{ + Height: RandUInt32Exp(), + Round: RandUInt16Exp(), + Type: VoteTypeBare, + BlockHash: RandBytes(32), Signature: randSig(), }, } @@ -100,30 +74,34 @@ func TestBlock(t *testing.T) { block := &Block{ Header: Header{ - Network: "Tendermint", - Height: randUInt32(), - Fees: randUInt64(), - Time: randTime(), - LastBlockHash: randBytes(32), - ValidationHash: randBytes(32), - DataHash: randBytes(32), + Network: "Tendermint", + Height: RandUInt32Exp(), + Fees: RandUInt64Exp(), + Time: RandTime(), + LastBlockHash: RandBytes(32), + ValidationStateHash: RandBytes(32), + AccountStateHash: RandBytes(32), }, Validation: Validation{ Signatures: []Signature{randSig(), randSig()}, - Txs: []Txs{bond, unbond, timeout, dupeout}, }, Data: Data{ - Txs: []Tx{sendTx, nameTx}, + Txs: []Tx{sendTx, nameTx, bondTx, unbondTx, timeoutTx, dupeoutTx}, }, } // Write the block, read it in again, write it again. // Then, compare. + // TODO We should compute the hash instead, so Block -> Bytes -> Block and compare hashes. blockBytes := BinaryBytes(block) var n int64 var err error block2 := ReadBlock(bytes.NewReader(blockBytes), &n, &err) + if err != nil { + t.Errorf("Reading block failed: %v", err) + } + blockBytes2 := BinaryBytes(block2) if !bytes.Equal(blockBytes, blockBytes2) { diff --git a/blocks/codec_test.go b/blocks/codec_test.go index 8f5482754..1339f1b19 100644 --- a/blocks/codec_test.go +++ b/blocks/codec_test.go @@ -15,13 +15,13 @@ func BenchmarkTestCustom(b *testing.B) { b.StopTimer() h := &Header{ - Network: "Header", - Height: 123, - Fees: 123, - Time: time.Unix(123, 0), - LastBlockHash: []byte("prevhash"), - ValidationHash: []byte("validationhash"), - DataHash: []byte("datahash"), + Network: "Header", + Height: 123, + Fees: 123, + Time: time.Unix(123, 0), + LastBlockHash: []byte("prevhash"), + ValidationStateHash: []byte("validationhash"), + AccountStateHash: []byte("accounthash"), } buf := bytes.NewBuffer(nil) @@ -40,26 +40,26 @@ func BenchmarkTestCustom(b *testing.B) { } type HHeader struct { - Network string `json:"N"` - Height uint64 `json:"H"` - Fees uint64 `json:"F"` - Time uint64 `json:"T"` - LastBlockHash []byte `json:"PH"` - ValidationHash []byte `json:"VH"` - DataHash []byte `json:"DH"` + Network string `json:"N"` + Height uint64 `json:"H"` + Fees uint64 `json:"F"` + Time uint64 `json:"T"` + LastBlockHash []byte `json:"PH"` + ValidationStateHash []byte `json:"VH"` + AccountStateHash []byte `json:"DH"` } func BenchmarkTestJSON(b *testing.B) { b.StopTimer() h := &HHeader{ - Network: "Header", - Height: 123, - Fees: 123, - Time: 123, - LastBlockHash: []byte("prevhash"), - ValidationHash: []byte("validationhash"), - DataHash: []byte("datahash"), + Network: "Header", + Height: 123, + Fees: 123, + Time: 123, + LastBlockHash: []byte("prevhash"), + ValidationStateHash: []byte("validationhash"), + AccountStateHash: []byte("accounthash"), } h2 := &HHeader{} @@ -82,13 +82,13 @@ func BenchmarkTestGob(b *testing.B) { b.StopTimer() h := &Header{ - Network: "Header", - Height: 123, - Fees: 123, - Time: time.Unix(123, 0), - LastBlockHash: []byte("prevhash"), - ValidationHash: []byte("validationhash"), - DataHash: []byte("datahash"), + Network: "Header", + Height: 123, + Fees: 123, + Time: time.Unix(123, 0), + LastBlockHash: []byte("prevhash"), + ValidationStateHash: []byte("validationhash"), + AccountStateHash: []byte("datahash"), } h2 := &Header{} @@ -111,13 +111,13 @@ func BenchmarkTestMsgPack(b *testing.B) { b.StopTimer() h := &Header{ - Network: "Header", - Height: 123, - Fees: 123, - Time: time.Unix(123, 0), - LastBlockHash: []byte("prevhash"), - ValidationHash: []byte("validationhash"), - DataHash: []byte("datahash"), + Network: "Header", + Height: 123, + Fees: 123, + Time: time.Unix(123, 0), + LastBlockHash: []byte("prevhash"), + ValidationStateHash: []byte("validationhash"), + AccountStateHash: []byte("datahash"), } h2 := &Header{} @@ -140,13 +140,13 @@ func BenchmarkTestMsgPack2(b *testing.B) { b.StopTimer() h := &Header{ - Network: "Header", - Height: 123, - Fees: 123, - Time: time.Unix(123, 0), - LastBlockHash: []byte("prevhash"), - ValidationHash: []byte("validationhash"), - DataHash: []byte("datahash"), + Network: "Header", + Height: 123, + Fees: 123, + Time: time.Unix(123, 0), + LastBlockHash: []byte("prevhash"), + ValidationStateHash: []byte("validationhash"), + AccountStateHash: []byte("accounthash"), } h2 := &Header{} var mh codec.MsgpackHandle diff --git a/blocks/tx.go b/blocks/tx.go index 2df41b428..bd69cadc8 100644 --- a/blocks/tx.go +++ b/blocks/tx.go @@ -28,59 +28,59 @@ type Tx interface { const ( // Account transactions - TX_TYPE_SEND = byte(0x01) - TX_TYPE_NAME = byte(0x02) + txTypeSend = byte(0x01) + txTypeName = byte(0x02) // Validation transactions - TX_TYPE_BOND = byte(0x11) - TX_TYPE_UNBOND = byte(0x12) - TX_TYPE_TIMEOUT = byte(0x13) - TX_TYPE_DUPEOUT = byte(0x14) + txTypeBond = byte(0x11) + txTypeUnbond = byte(0x12) + txTypeTimeout = byte(0x13) + txTypeDupeout = byte(0x14) ) func ReadTx(r io.Reader, n *int64, err *error) Tx { switch t := ReadByte(r, n, err); t { - case TX_TYPE_SEND: + case txTypeSend: return &SendTx{ BaseTx: ReadBaseTx(r, n, err), Fee: ReadUInt64(r, n, err), To: ReadUInt64(r, n, err), Amount: ReadUInt64(r, n, err), } - case TX_TYPE_NAME: + case txTypeName: return &NameTx{ BaseTx: ReadBaseTx(r, n, err), Fee: ReadUInt64(r, n, err), Name: ReadString(r, n, err), PubKey: ReadByteSlice(r, n, err), } - case TX_TYPE_BOND: + case txTypeBond: return &BondTx{ BaseTx: ReadBaseTx(r, n, err), Fee: ReadUInt64(r, n, err), UnbondTo: ReadUInt64(r, n, err), Amount: ReadUInt64(r, n, err), } - case TX_TYPE_UNBOND: + case txTypeUnbond: return &UnbondTx{ BaseTx: ReadBaseTx(r, n, err), Fee: ReadUInt64(r, n, err), Amount: ReadUInt64(r, n, err), } - case TX_TYPE_TIMEOUT: + case txTypeTimeout: return &TimeoutTx{ BaseTx: ReadBaseTx(r, n, err), AccountId: ReadUInt64(r, n, err), Penalty: ReadUInt64(r, n, err), } - case TX_TYPE_DUPEOUT: + case txTypeDupeout: return &DupeoutTx{ BaseTx: ReadBaseTx(r, n, err), VoteA: *ReadVote(r, n, err), VoteB: *ReadVote(r, n, err), } default: - Panicf("Unknown Tx type %x", t) + *err = Errorf("Unknown Tx type %X", t) return nil } } @@ -123,7 +123,7 @@ type SendTx struct { } func (tx *SendTx) Type() byte { - return TX_TYPE_SEND + return txTypeSend } func (tx *SendTx) WriteTo(w io.Writer) (n int64, err error) { @@ -145,7 +145,7 @@ type NameTx struct { } func (tx *NameTx) Type() byte { - return TX_TYPE_NAME + return txTypeName } func (tx *NameTx) WriteTo(w io.Writer) (n int64, err error) { @@ -167,7 +167,7 @@ type BondTx struct { } func (tx *BondTx) Type() byte { - return TX_TYPE_BOND + return txTypeBond } func (tx *BondTx) WriteTo(w io.Writer) (n int64, err error) { @@ -188,7 +188,7 @@ type UnbondTx struct { } func (tx *UnbondTx) Type() byte { - return TX_TYPE_UNBOND + return txTypeUnbond } func (tx *UnbondTx) WriteTo(w io.Writer) (n int64, err error) { @@ -208,7 +208,7 @@ type TimeoutTx struct { } func (tx *TimeoutTx) Type() byte { - return TX_TYPE_TIMEOUT + return txTypeTimeout } func (tx *TimeoutTx) WriteTo(w io.Writer) (n int64, err error) { @@ -228,7 +228,7 @@ type DupeoutTx struct { } func (tx *DupeoutTx) Type() byte { - return TX_TYPE_DUPEOUT + return txTypeDupeout } func (tx *DupeoutTx) WriteTo(w io.Writer) (n int64, err error) { diff --git a/common/random.go b/common/random.go index d68ac6da5..2588db623 100644 --- a/common/random.go +++ b/common/random.go @@ -4,6 +4,7 @@ import ( crand "crypto/rand" "encoding/hex" "math/rand" + "time" ) const ( @@ -12,7 +13,7 @@ const ( func init() { // Seed math/rand with "secure" int64 - b := RandBytes(8) + b := CRandBytes(8) var seed uint64 for i := 0; i < 8; i++ { seed |= uint64(b[i]) @@ -22,7 +23,6 @@ func init() { } // Constructs an alphanumeric string of given length. -// Not crypto safe func RandStr(length int) string { chars := []byte{} MAIN_LOOP: @@ -46,8 +46,67 @@ MAIN_LOOP: return string(chars) } -// Crypto safe -func RandBytes(numBytes int) []byte { +func RandUInt16() uint16 { + return uint16(rand.Uint32() & (1<<16 - 1)) +} + +func RandUInt32() uint32 { + return rand.Uint32() +} + +func RandUInt64() uint64 { + return uint64(rand.Uint32())<<32 + uint64(rand.Uint32()) +} + +// Distributed pseudo-exponentially to test for various cases +func RandUInt16Exp() uint16 { + bits := rand.Uint32() % 16 + if bits == 0 { + return 0 + } + n := uint16(1 << (bits - 1)) + n += uint16(rand.Int31()) & ((1 << (bits - 1)) - 1) + return n +} + +// Distributed pseudo-exponentially to test for various cases +func RandUInt32Exp() uint32 { + bits := rand.Uint32() % 32 + if bits == 0 { + return 0 + } + n := uint32(1 << (bits - 1)) + n += uint32(rand.Int31()) & ((1 << (bits - 1)) - 1) + return n +} + +// Distributed pseudo-exponentially to test for various cases +func RandUInt64Exp() uint64 { + bits := rand.Uint32() % 64 + if bits == 0 { + return 0 + } + n := uint64(1 << (bits - 1)) + n += uint64(rand.Int63()) & ((1 << (bits - 1)) - 1) + return n +} + +func RandTime() time.Time { + return time.Unix(int64(RandUInt64Exp()), 0) +} + +func RandBytes(n int) []byte { + bs := make([]byte, n) + for i := 0; i < n; i++ { + bs[i] = byte(rand.Intn(256)) + } + return bs +} + +//----------------------------------------------------------------------------- +// CRand* methods are crypto safe. + +func CRandBytes(numBytes int) []byte { b := make([]byte, numBytes) _, err := crand.Read(b) if err != nil { @@ -56,8 +115,7 @@ func RandBytes(numBytes int) []byte { return b } -// Crypto safe // RandHex(24) gives 96 bits of randomness, strong enough for most purposes. -func RandHex(numDigits int) string { - return hex.EncodeToString(RandBytes(numDigits / 2)) +func CRandHex(numDigits int) string { + return hex.EncodeToString(CRandBytes(numDigits / 2)) } diff --git a/consensus/consensus.go b/consensus/consensus.go index 1205ba490..0d3ffdf3b 100644 --- a/consensus/consensus.go +++ b/consensus/consensus.go @@ -142,25 +142,16 @@ func (conR *ConsensusReactor) GetChannels() []*p2p.ChannelDescriptor { // TODO optimize return []*p2p.ChannelDescriptor{ &p2p.ChannelDescriptor{ - Id: StateCh, - SendQueueCapacity: 1, - RecvQueueCapacity: 10, - RecvBufferSize: 10240, - DefaultPriority: 5, + Id: StateCh, + Priority: 5, }, &p2p.ChannelDescriptor{ - Id: DataCh, - SendQueueCapacity: 1, - RecvQueueCapacity: 10, - RecvBufferSize: 10240, - DefaultPriority: 5, + Id: DataCh, + Priority: 5, }, &p2p.ChannelDescriptor{ - Id: VoteCh, - SendQueueCapacity: 1, - RecvQueueCapacity: 1000, - RecvBufferSize: 10240, - DefaultPriority: 5, + Id: VoteCh, + Priority: 5, }, } } diff --git a/db/db.go b/db/db.go index 84122d049..d44f4078d 100644 --- a/db/db.go +++ b/db/db.go @@ -1,6 +1,6 @@ package db -type Db interface { +type DB interface { Get([]byte) []byte Set([]byte, []byte) } diff --git a/p2p/addrbook.go b/p2p/addrbook.go index 493fb0f38..44e2538fd 100644 --- a/p2p/addrbook.go +++ b/p2p/addrbook.go @@ -112,7 +112,7 @@ func NewAddrBook(filePath string) *AddrBook { // When modifying this, don't forget to update loadFromFile() func (a *AddrBook) init() { - a.key = RandHex(24) // 24/2 * 8 = 96 bits + a.key = CRandHex(24) // 24/2 * 8 = 96 bits // New addr buckets a.addrNew = make([]map[string]*knownAddress, newBucketCount) for i := range a.addrNew { diff --git a/p2p/connection.go b/p2p/connection.go index 6f35eff42..f9aabf554 100644 --- a/p2p/connection.go +++ b/p2p/connection.go @@ -16,15 +16,17 @@ import ( ) const ( - numBatchPackets = 10 - minReadBufferSize = 1024 - minWriteBufferSize = 1024 - flushThrottleMS = 50 - idleTimeoutMinutes = 5 - updateStatsSeconds = 2 - pingTimeoutMinutes = 2 - defaultSendRate = 51200 // 5Kb/s - defaultRecvRate = 51200 // 5Kb/s + numBatchPackets = 10 + minReadBufferSize = 1024 + minWriteBufferSize = 1024 + flushThrottleMS = 50 + idleTimeoutMinutes = 5 + updateStatsSeconds = 2 + pingTimeoutMinutes = 2 + defaultSendRate = 51200 // 5Kb/s + defaultRecvRate = 51200 // 5Kb/s + defaultSendQueueCapacity = 1 + defaultRecvBufferCapacity = 4096 ) type receiveCbFunc func(chId byte, msgBytes []byte) @@ -399,11 +401,8 @@ FOR_LOOP: //----------------------------------------------------------------------------- type ChannelDescriptor struct { - Id byte - SendQueueCapacity int // One per MConnection. - RecvQueueCapacity int // Global for this channel. - RecvBufferSize int - DefaultPriority uint + Id byte + Priority uint } // TODO: lowercase. @@ -421,16 +420,16 @@ type Channel struct { } func newChannel(conn *MConnection, desc *ChannelDescriptor) *Channel { - if desc.DefaultPriority <= 0 { + if desc.Priority <= 0 { panic("Channel default priority must be a postive integer") } return &Channel{ conn: conn, desc: desc, id: desc.Id, - sendQueue: make(chan []byte, desc.SendQueueCapacity), - recving: make([]byte, 0, desc.RecvBufferSize), - priority: desc.DefaultPriority, + sendQueue: make(chan []byte, defaultSendQueueCapacity), + recving: make([]byte, 0, defaultRecvBufferCapacity), + priority: desc.Priority, } } @@ -462,7 +461,7 @@ func (ch *Channel) loadSendQueueSize() (size int) { // Goroutine-safe // Use only as a heuristic. func (ch *Channel) canSend() bool { - return ch.loadSendQueueSize() < ch.desc.SendQueueCapacity + return ch.loadSendQueueSize() < defaultSendQueueCapacity } // Returns true if any packets are pending to be sent. @@ -513,7 +512,7 @@ func (ch *Channel) recvPacket(pkt packet) []byte { ch.recving = append(ch.recving, pkt.Bytes...) if pkt.EOF == byte(0x01) { msgBytes := ch.recving - ch.recving = make([]byte, 0, ch.desc.RecvBufferSize) + ch.recving = make([]byte, 0, defaultRecvBufferCapacity) return msgBytes } return nil diff --git a/p2p/pex_reactor.go b/p2p/pex_reactor.go index da86f20bf..e0c0713ba 100644 --- a/p2p/pex_reactor.go +++ b/p2p/pex_reactor.go @@ -68,14 +68,10 @@ func (pexR *PEXReactor) SendAddrs(peer *Peer, addrs []*NetAddress) { // Implements Reactor func (pexR *PEXReactor) GetChannels() []*ChannelDescriptor { - // TODO optimize return []*ChannelDescriptor{ &ChannelDescriptor{ - Id: PexCh, - SendQueueCapacity: 1, - RecvQueueCapacity: 2, - RecvBufferSize: 1024, - DefaultPriority: 1, + Id: PexCh, + Priority: 1, }, } } diff --git a/p2p/switch.go b/p2p/switch.go index 565d2c16d..9d55c1f49 100644 --- a/p2p/switch.go +++ b/p2p/switch.go @@ -105,6 +105,10 @@ func (s *Switch) Stop() { } } +func (s *Switch) Reactors() []Reactor { + return s.reactors +} + func (s *Switch) AddPeerWithConnection(conn net.Conn, outbound bool) (*Peer, error) { if atomic.LoadUint32(&s.stopped) == 1 { return nil, ErrSwitchStopped diff --git a/p2p/switch_test.go b/p2p/switch_test.go index 612ca3126..33745a9de 100644 --- a/p2p/switch_test.go +++ b/p2p/switch_test.go @@ -2,8 +2,8 @@ package p2p import ( "bytes" - "encoding/hex" "io" + "sync" "testing" "time" @@ -17,30 +17,66 @@ func (s String) WriteTo(w io.Writer) (n int64, err error) { return } -// convenience method for creating two switches connected to each other. -func makeSwitchPair(t testing.TB, numChannels int, sendQueueCapacity int, recvBufferSize int, recvQueueCapacity int) (*Switch, *Switch, []*ChannelDescriptor) { +//----------------------------------------------------------------------------- + +type PeerMessage struct { + PeerKey string + Bytes []byte + Counter int +} - // Make numChannels channels starting at byte(0x00) - chIds := []byte{} - for i := 0; i < numChannels; i++ { - chIds = append(chIds, byte(i)) +type TestReactor struct { + mtx sync.Mutex + channels []*ChannelDescriptor + peersAdded []*Peer + peersRemoved []*Peer + logMessages bool + msgsCounter int + msgsReceived map[byte][]PeerMessage +} + +func NewTestReactor(channels []*ChannelDescriptor, logMessages bool) *TestReactor { + return &TestReactor{ + channels: channels, + logMessages: logMessages, + msgsReceived: make(map[byte][]PeerMessage), } +} + +func (tr *TestReactor) GetChannels() []*ChannelDescriptor { + return tr.channels +} + +func (tr *TestReactor) AddPeer(peer *Peer) { + tr.mtx.Lock() + defer tr.mtx.Unlock() + tr.peersAdded = append(tr.peersAdded, peer) +} - // Make some channel descriptors. - chDescs := []*ChannelDescriptor{} - for _, chId := range chIds { - chDescs = append(chDescs, &ChannelDescriptor{ - Id: chId, - SendQueueCapacity: sendQueueCapacity, - RecvBufferSize: recvBufferSize, - RecvQueueCapacity: recvQueueCapacity, - DefaultPriority: 1, - }) +func (tr *TestReactor) RemovePeer(peer *Peer, reason interface{}) { + tr.mtx.Lock() + defer tr.mtx.Unlock() + tr.peersRemoved = append(tr.peersRemoved, peer) +} + +func (tr *TestReactor) Receive(chId byte, peer *Peer, msgBytes []byte) { + if tr.logMessages { + tr.mtx.Lock() + defer tr.mtx.Unlock() + //fmt.Printf("Received: %X, %X\n", chId, msgBytes) + tr.msgsReceived[chId] = append(tr.msgsReceived[chId], PeerMessage{peer.Key, msgBytes, tr.msgsCounter}) + tr.msgsCounter++ } +} + +//----------------------------------------------------------------------------- + +// convenience method for creating two switches connected to each other. +func makeSwitchPair(t testing.TB, reactorsGenerator func() []Reactor) (*Switch, *Switch) { // Create two switches that will be interconnected. - s1 := NewSwitch(chDescs) - s2 := NewSwitch(chDescs) + s1 := NewSwitch(reactorsGenerator()) + s2 := NewSwitch(reactorsGenerator()) // Create a listener for s1 l := NewDefaultListener("tcp", ":8001") @@ -67,11 +103,23 @@ func makeSwitchPair(t testing.TB, numChannels int, sendQueueCapacity int, recvBu // Close the server, no longer needed. l.Stop() - return s1, s2, chDescs + return s1, s2 } func TestSwitches(t *testing.T) { - s1, s2, _ := makeSwitchPair(t, 10, 10, 1024, 10) + s1, s2 := makeSwitchPair(t, func() []Reactor { + // Make two reactors of two channels each + reactors := make([]Reactor, 2) + reactors[0] = NewTestReactor([]*ChannelDescriptor{ + &ChannelDescriptor{Id: byte(0x00), Priority: 10}, + &ChannelDescriptor{Id: byte(0x01), Priority: 10}, + }, true) + reactors[1] = NewTestReactor([]*ChannelDescriptor{ + &ChannelDescriptor{Id: byte(0x02), Priority: 10}, + &ChannelDescriptor{Id: byte(0x03), Priority: 10}, + }, true) + return reactors + }) defer s1.Stop() defer s2.Stop() @@ -83,61 +131,66 @@ func TestSwitches(t *testing.T) { t.Errorf("Expected exactly 1 peer in s2, got %v", s2.Peers().Size()) } - // Broadcast a message on ch0 - s1.Broadcast(byte(0x00), String("channel zero")) - // Broadcast a message on ch1 - s1.Broadcast(byte(0x01), String("channel one")) - // Broadcast a message on ch2 - s1.Broadcast(byte(0x02), String("channel two")) + ch0Msg := String("channel zero") + ch1Msg := String("channel one") + ch2Msg := String("channel two") + + s1.Broadcast(byte(0x00), ch0Msg) + s1.Broadcast(byte(0x01), ch1Msg) + s1.Broadcast(byte(0x02), ch2Msg) // Wait for things to settle... time.Sleep(100 * time.Millisecond) - // Receive message from channel 1 and check - inMsg, ok := s2.Receive(byte(0x01)) - var n int64 - var err error - if !ok { - t.Errorf("Failed to receive from channel one") + // Check message on ch0 + ch0Msgs := s2.Reactors()[0].(*TestReactor).msgsReceived[byte(0x00)] + if len(ch0Msgs) != 1 { + t.Errorf("Expected to have received 1 message in ch0") } - if ReadString(bytes.NewBuffer(inMsg.Bytes), &n, &err) != "channel one" { - t.Errorf("Unexpected received message bytes:\n%v", hex.Dump(inMsg.Bytes)) + if !bytes.Equal(ch0Msgs[0].Bytes, BinaryBytes(ch0Msg)) { + t.Errorf("Unexpected message bytes. Wanted: %X, Got: %X", BinaryBytes(ch0Msg), ch0Msgs[0].Bytes) } - // Receive message from channel 0 and check - inMsg, ok = s2.Receive(byte(0x00)) - if !ok { - t.Errorf("Failed to receive from channel zero") + // Check message on ch1 + ch1Msgs := s2.Reactors()[0].(*TestReactor).msgsReceived[byte(0x01)] + if len(ch1Msgs) != 1 { + t.Errorf("Expected to have received 1 message in ch1") + } + if !bytes.Equal(ch1Msgs[0].Bytes, BinaryBytes(ch1Msg)) { + t.Errorf("Unexpected message bytes. Wanted: %X, Got: %X", BinaryBytes(ch1Msg), ch1Msgs[0].Bytes) } - if ReadString(bytes.NewBuffer(inMsg.Bytes), &n, &err) != "channel zero" { - t.Errorf("Unexpected received message bytes:\n%v", hex.Dump(inMsg.Bytes)) + + // Check message on ch2 + ch2Msgs := s2.Reactors()[1].(*TestReactor).msgsReceived[byte(0x02)] + if len(ch2Msgs) != 1 { + t.Errorf("Expected to have received 1 message in ch2") } + if !bytes.Equal(ch2Msgs[0].Bytes, BinaryBytes(ch2Msg)) { + t.Errorf("Unexpected message bytes. Wanted: %X, Got: %X", BinaryBytes(ch2Msg), ch2Msgs[0].Bytes) + } + } func BenchmarkSwitches(b *testing.B) { b.StopTimer() - s1, s2, chDescs := makeSwitchPair(b, 10, 10, 1024, 10) + s1, s2 := makeSwitchPair(b, func() []Reactor { + // Make two reactors of two channels each + reactors := make([]Reactor, 2) + reactors[0] = NewTestReactor([]*ChannelDescriptor{ + &ChannelDescriptor{Id: byte(0x00), Priority: 10}, + &ChannelDescriptor{Id: byte(0x01), Priority: 10}, + }, false) + reactors[1] = NewTestReactor([]*ChannelDescriptor{ + &ChannelDescriptor{Id: byte(0x02), Priority: 10}, + &ChannelDescriptor{Id: byte(0x03), Priority: 10}, + }, false) + return reactors + }) defer s1.Stop() defer s2.Stop() - // Create a sink on either channel to just pop off messages. - recvRoutine := func(c *Switch, chId byte) { - for { - _, ok := c.Receive(chId) - if !ok { - break - } - } - } - - // Create routines to consume from recvQueues. - for _, chDesc := range chDescs { - go recvRoutine(s1, chDesc.Id) - go recvRoutine(s2, chDesc.Id) - } - // Allow time for goroutines to boot up time.Sleep(1000 * time.Millisecond) b.StartTimer() @@ -146,7 +199,7 @@ func BenchmarkSwitches(b *testing.B) { // Send random message from one channel to another for i := 0; i < b.N; i++ { - chId := chDescs[i%len(chDescs)].Id + chId := byte(i % 4) nS, nF := s1.Broadcast(chId, String("test data")) numSuccess += nS numFailure += nF diff --git a/state/account.go b/state/account.go index 6a94b8b8a..707a4db7d 100644 --- a/state/account.go +++ b/state/account.go @@ -8,19 +8,30 @@ import ( "io" ) +const ( + AccountBalanceStatusNominal = byte(0x00) + AccountBalanceStatusBonded = byte(0x01) +) + type Account struct { Id uint64 // Numeric id of account, incrementing. PubKey []byte } -func ReadAccount(r io.Reader, n *int64, err *error) *Account { - return &Account{ +func ReadAccount(r io.Reader, n *int64, err *error) Account { + return Account{ Id: ReadUInt64(r, n, err), PubKey: ReadByteSlice(r, n, err), } } -func (account *Account) Verify(msg []byte, sig Signature) bool { +func (account Account) WriteTo(w io.Writer) (n int64, err error) { + WriteUInt64(w, account.Id, &n, &err) + WriteByteSlice(w, account.PubKey, &n, &err) + return +} + +func (account Account) Verify(msg []byte, sig Signature) bool { if sig.SignerId != account.Id { panic("Account.Id doesn't match sig.SignerId") } @@ -38,6 +49,22 @@ func (account *Account) Verify(msg []byte, sig Signature) bool { type AccountBalance struct { Account Balance uint64 + Status byte +} + +func ReadAccountBalance(r io.Reader, n *int64, err *error) *AccountBalance { + return &AccountBalance{ + Account: ReadAccount(r, n, err), + Balance: ReadUInt64(r, n, err), + Status: ReadByte(r, n, err), + } +} + +func (accBal AccountBalance) WriteTo(w io.Writer) (n int64, err error) { + WriteBinary(w, accBal.Account, &n, &err) + WriteUInt64(w, accBal.Balance, &n, &err) + WriteByte(w, accBal.Status, &n, &err) + return } //----------------------------------------------------------------------------- @@ -50,7 +77,7 @@ type PrivAccount struct { // Generates a new account with private key. // The Account.Id is empty since it isn't in the blockchain. func GenPrivAccount() *PrivAccount { - privKey := RandBytes(32) + privKey := CRandBytes(32) pubKey := crypto.MakePubKey(privKey) return &PrivAccount{ Account: Account{ diff --git a/state/account_test.go b/state/account_test.go index 78abec04d..528097052 100644 --- a/state/account_test.go +++ b/state/account_test.go @@ -10,7 +10,7 @@ func TestSignAndValidate(t *testing.T) { privAccount := GenPrivAccount() account := &privAccount.Account - msg := RandBytes(128) + msg := CRandBytes(128) sig := privAccount.Sign(msg) t.Logf("msg: %X, sig: %X", msg, sig) diff --git a/state/state.go b/state/state.go index 06d44ee35..1db32bb13 100644 --- a/state/state.go +++ b/state/state.go @@ -8,7 +8,7 @@ import ( . "github.com/tendermint/tendermint/binary" . "github.com/tendermint/tendermint/blocks" - db_ "github.com/tendermint/tendermint/db" + . "github.com/tendermint/tendermint/db" "github.com/tendermint/tendermint/merkle" ) @@ -20,7 +20,7 @@ var ( type State struct { mtx sync.Mutex - db db_.Db + db DB height uint32 // Last known block height blockHash []byte // Last known block hash commitTime time.Time @@ -28,11 +28,35 @@ type State struct { validators *ValidatorSet } -func GenesisState(commitTime time.Time, accounts merkle.Tree, validators *ValidatorSet) *State { +func GenesisState(db DB, genesisTime time.Time, accountBalances []*AccountBalance) *State { + + accounts := merkle.NewIAVLTree(db) + validators := map[uint64]*Validator{} + + for _, account := range accountBalances { + // XXX make codec merkle tree. + //accounts.Set(account.Id, BinaryBytes(account)) + validators[account.Id] = &Validator{ + Account: account.Account, + BondHeight: 0, + VotingPower: account.Balance, + Accum: 0, + } + } + validatorSet := NewValidatorSet(validators) + + return &State{ + db: db, + height: 0, + blockHash: nil, + commitTime: genesisTime, + accounts: accounts, + validators: validatorSet, + } } -func LoadState(db db_.Db) *State { - s := &State{} +func LoadState(db DB) *State { + s := &State{db: db} buf := db.Get(stateKey) if len(buf) == 0 { return nil @@ -160,7 +184,7 @@ func (s *State) Validators() *ValidatorSet { return s.validators } -func (s *State) Account(accountId uint64) (*Account, error) { +func (s *State) AccountBalance(accountId uint64) (*AccountBalance, error) { s.mtx.Lock() defer s.mtx.Unlock() idBytes, err := BasicCodec.Write(accountId) @@ -172,6 +196,6 @@ func (s *State) Account(accountId uint64) (*Account, error) { return nil, nil } n, err := int64(0), error(nil) - account := ReadAccount(bytes.NewBuffer(accountBytes), &n, &err) - return account, err + accountBalance := ReadAccountBalance(bytes.NewBuffer(accountBytes), &n, &err) + return accountBalance, err } diff --git a/state/state_test.go b/state/state_test.go new file mode 100644 index 000000000..3a0bd5f02 --- /dev/null +++ b/state/state_test.go @@ -0,0 +1,42 @@ +package state + +import ( + . "github.com/tendermint/tendermint/common" + . "github.com/tendermint/tendermint/db" + + "testing" + "time" +) + +func randAccountBalance(id uint64, status byte) *AccountBalance { + return &AccountBalance{ + Account: Account{ + Id: id, + PubKey: CRandBytes(32), + }, + Balance: RandUInt64(), + Status: status, + } +} + +// The first numValidators accounts are validators. +func randGenesisState(numAccounts int, numValidators int) *State { + db := NewMemDB() + accountBalances := make([]*AccountBalance, numAccounts) + for i := 0; i < numAccounts; i++ { + if i < numValidators { + accountBalances[i] = randAccountBalance(uint64(i), AccountBalanceStatusNominal) + } else { + accountBalances[i] = randAccountBalance(uint64(i), AccountBalanceStatusBonded) + } + } + s0 := GenesisState(db, time.Now(), accountBalances) + return s0 +} + +func TestGenesisSaveLoad(t *testing.T) { + + s0 := randGenesisState(10, 5) + t.Log(s0) + +} diff --git a/state/validator.go b/state/validator.go index 9f493de69..8154795ef 100644 --- a/state/validator.go +++ b/state/validator.go @@ -12,7 +12,7 @@ import ( // TODO consider moving this to another common types package. type Validator struct { Account - BondHeight uint32 + BondHeight uint32 // TODO: is this needed? VotingPower uint64 Accum int64 } @@ -20,10 +20,7 @@ type Validator struct { // Used to persist the state of ConsensusStateControl. func ReadValidator(r io.Reader, n *int64, err *error) *Validator { return &Validator{ - Account: Account{ - Id: ReadUInt64(r, n, err), - PubKey: ReadByteSlice(r, n, err), - }, + Account: ReadAccount(r, n, err), BondHeight: ReadUInt32(r, n, err), VotingPower: ReadUInt64(r, n, err), Accum: ReadInt64(r, n, err), @@ -42,8 +39,7 @@ func (v *Validator) Copy() *Validator { // Used to persist the state of ConsensusStateControl. func (v *Validator) WriteTo(w io.Writer) (n int64, err error) { - WriteUInt64(w, v.Id, &n, &err) - WriteByteSlice(w, v.PubKey, &n, &err) + WriteBinary(w, v.Account, &n, &err) WriteUInt32(w, v.BondHeight, &n, &err) WriteUInt64(w, v.VotingPower, &n, &err) WriteInt64(w, v.Accum, &n, &err)