From d300a67bb1a9740d03b7e9b2265b7c6af6c82ed7 Mon Sep 17 00:00:00 2001 From: Jae Kwon Date: Sun, 10 Aug 2014 16:35:08 -0700 Subject: [PATCH] saving development state... --- README.md | 3 +- accounts/account.go | 23 - accounts/store.go | 18 + binary/byteslice.go | 4 +- binary/int.go | 36 +- binary/string.go | 4 +- binary/time.go | 4 +- blocks/account.go | 48 --- blocks/accounts.go | 63 +++ blocks/adjustment.go | 54 ++- blocks/block.go | 261 ++++++++++-- blocks/block_manager.go | 660 ----------------------------- blocks/block_part_set.go | 134 ++++++ blocks/log.go | 2 +- blocks/signature.go | 41 -- blocks/store.go | 126 ++++++ blocks/tx.go | 4 +- blocks/vote.go | 32 -- common/errors.go | 10 + common/heap.go | 87 +++- common/repeat_timer.go | 4 +- common/throttle_timer.go | 4 +- config/config.go | 17 +- consensus/README.md | 17 + consensus/consensus.go | 875 +++++++++++++++++++++++++++++++++++++++ consensus/log.go | 15 + consensus/state.go | 378 +++++++++++++++++ consensus/validator.go | 66 +++ consensus/vote.go | 207 +++++++++ db/db.go | 6 + db/level_db.go | 10 - db/mem_db.go | 3 - log.go | 8 +- main.go | 52 ++- merkle/iavl_node.go | 59 ++- merkle/iavl_tree.go | 24 +- merkle/types.go | 5 +- p2p/addrbook.go | 4 +- p2p/connection.go | 48 +-- p2p/listener.go | 4 +- p2p/log.go | 2 +- p2p/netaddress.go | 4 +- p2p/peer.go | 12 +- p2p/peer_manager.go | 38 +- p2p/switch.go | 6 +- p2p/switch_test.go | 6 +- 46 files changed, 2444 insertions(+), 1044 deletions(-) delete mode 100644 accounts/account.go create mode 100644 accounts/store.go delete mode 100644 blocks/account.go create mode 100644 blocks/accounts.go delete mode 100644 blocks/block_manager.go create mode 100644 blocks/block_part_set.go delete mode 100644 blocks/signature.go create mode 100644 blocks/store.go delete mode 100644 blocks/vote.go create mode 100644 common/errors.go create mode 100644 consensus/README.md create mode 100644 consensus/consensus.go create mode 100644 consensus/log.go create mode 100644 consensus/state.go create mode 100644 consensus/validator.go create mode 100644 consensus/vote.go create mode 100644 db/db.go diff --git a/README.md b/README.md index 9f71a04fd..fed2b59eb 100644 --- a/README.md +++ b/README.md @@ -6,7 +6,8 @@ TenderMint - proof of concept ### Status -* Block manager *now* +* Consensus *now* +* Block propagation *sidelined* * Node & testnet *complete* * PEX peer exchange *complete* * p2p/* *complete* diff --git a/accounts/account.go b/accounts/account.go deleted file mode 100644 index ee12ced32..000000000 --- a/accounts/account.go +++ /dev/null @@ -1,23 +0,0 @@ -package accounts - -import ( - . "github.com/tendermint/tendermint/binary" -) - -type Account struct { - Name String - PubKey ByteSlice -} - -func (self *Account) Verify(msg ByteSlice, sig ByteSlice) bool { - return false -} - -type MyAccount struct { - Account - PrivKey ByteSlice -} - -func (self *MyAccount) Sign(msg ByteSlice) ByteSlice { - return nil -} diff --git a/accounts/store.go b/accounts/store.go new file mode 100644 index 000000000..5c3c27251 --- /dev/null +++ b/accounts/store.go @@ -0,0 +1,18 @@ +package accounts + +import ( + . "github.com/tendermint/tendermint/blocks" +) + +type AccountStore struct { +} + +func (as *AccountStore) StageBlock(block *Block) error { + // XXX implement staging. + return nil +} + +func (as *AccountStore) CommitBlock(block *Block) error { + // XXX implement staging. + return nil +} diff --git a/binary/byteslice.go b/binary/byteslice.go index 0390b0177..7c1c495b1 100644 --- a/binary/byteslice.go +++ b/binary/byteslice.go @@ -5,7 +5,7 @@ import "bytes" type ByteSlice []byte -func (self ByteSlice) Equals(other Binary) bool { +func (self ByteSlice) Equals(other interface{}) bool { if o, ok := other.(ByteSlice); ok { return bytes.Equal(self, o) } else { @@ -13,7 +13,7 @@ func (self ByteSlice) Equals(other Binary) bool { } } -func (self ByteSlice) Less(other Binary) bool { +func (self ByteSlice) Less(other interface{}) bool { if o, ok := other.(ByteSlice); ok { return bytes.Compare(self, o) < 0 // -1 if a < b } else { diff --git a/binary/int.go b/binary/int.go index 81382419a..4e404cfa7 100644 --- a/binary/int.go +++ b/binary/int.go @@ -19,11 +19,11 @@ type UInt uint // Byte -func (self Byte) Equals(other Binary) bool { +func (self Byte) Equals(other interface{}) bool { return self == other } -func (self Byte) Less(other Binary) bool { +func (self Byte) Less(other interface{}) bool { if o, ok := other.(Byte); ok { return self < o } else { @@ -67,11 +67,11 @@ func ReadByte(r io.Reader) Byte { // Int8 -func (self Int8) Equals(other Binary) bool { +func (self Int8) Equals(other interface{}) bool { return self == other } -func (self Int8) Less(other Binary) bool { +func (self Int8) Less(other interface{}) bool { if o, ok := other.(Int8); ok { return self < o } else { @@ -115,11 +115,11 @@ func ReadInt8(r io.Reader) Int8 { // UInt8 -func (self UInt8) Equals(other Binary) bool { +func (self UInt8) Equals(other interface{}) bool { return self == other } -func (self UInt8) Less(other Binary) bool { +func (self UInt8) Less(other interface{}) bool { if o, ok := other.(UInt8); ok { return self < o } else { @@ -163,11 +163,11 @@ func ReadUInt8(r io.Reader) UInt8 { // Int16 -func (self Int16) Equals(other Binary) bool { +func (self Int16) Equals(other interface{}) bool { return self == other } -func (self Int16) Less(other Binary) bool { +func (self Int16) Less(other interface{}) bool { if o, ok := other.(Int16); ok { return self < o } else { @@ -213,11 +213,11 @@ func ReadInt16(r io.Reader) Int16 { // UInt16 -func (self UInt16) Equals(other Binary) bool { +func (self UInt16) Equals(other interface{}) bool { return self == other } -func (self UInt16) Less(other Binary) bool { +func (self UInt16) Less(other interface{}) bool { if o, ok := other.(UInt16); ok { return self < o } else { @@ -263,11 +263,11 @@ func ReadUInt16(r io.Reader) UInt16 { // Int32 -func (self Int32) Equals(other Binary) bool { +func (self Int32) Equals(other interface{}) bool { return self == other } -func (self Int32) Less(other Binary) bool { +func (self Int32) Less(other interface{}) bool { if o, ok := other.(Int32); ok { return self < o } else { @@ -313,11 +313,11 @@ func ReadInt32(r io.Reader) Int32 { // UInt32 -func (self UInt32) Equals(other Binary) bool { +func (self UInt32) Equals(other interface{}) bool { return self == other } -func (self UInt32) Less(other Binary) bool { +func (self UInt32) Less(other interface{}) bool { if o, ok := other.(UInt32); ok { return self < o } else { @@ -363,11 +363,11 @@ func ReadUInt32(r io.Reader) UInt32 { // Int64 -func (self Int64) Equals(other Binary) bool { +func (self Int64) Equals(other interface{}) bool { return self == other } -func (self Int64) Less(other Binary) bool { +func (self Int64) Less(other interface{}) bool { if o, ok := other.(Int64); ok { return self < o } else { @@ -413,11 +413,11 @@ func ReadInt64(r io.Reader) Int64 { // UInt64 -func (self UInt64) Equals(other Binary) bool { +func (self UInt64) Equals(other interface{}) bool { return self == other } -func (self UInt64) Less(other Binary) bool { +func (self UInt64) Less(other interface{}) bool { if o, ok := other.(UInt64); ok { return self < o } else { diff --git a/binary/string.go b/binary/string.go index b5ad69d02..a05d48453 100644 --- a/binary/string.go +++ b/binary/string.go @@ -6,11 +6,11 @@ type String string // String -func (self String) Equals(other Binary) bool { +func (self String) Equals(other interface{}) bool { return self == other } -func (self String) Less(other Binary) bool { +func (self String) Less(other interface{}) bool { if o, ok := other.(String); ok { return self < o } else { diff --git a/binary/time.go b/binary/time.go index f2fe1ea66..4750694cb 100644 --- a/binary/time.go +++ b/binary/time.go @@ -9,7 +9,7 @@ type Time struct { time.Time } -func (self Time) Equals(other Binary) bool { +func (self Time) Equals(other interface{}) bool { if o, ok := other.(Time); ok { return self.Equal(o.Time) } else { @@ -17,7 +17,7 @@ func (self Time) Equals(other Binary) bool { } } -func (self Time) Less(other Binary) bool { +func (self Time) Less(other interface{}) bool { if o, ok := other.(Time); ok { return self.Before(o.Time) } else { diff --git a/blocks/account.go b/blocks/account.go deleted file mode 100644 index 110e414a4..000000000 --- a/blocks/account.go +++ /dev/null @@ -1,48 +0,0 @@ -package blocks - -import ( - . "github.com/tendermint/tendermint/binary" - . "github.com/tendermint/tendermint/common" - "io" -) - -type AccountId struct { - Type Byte - Number UInt64 - PubKey ByteSlice -} - -const ( - ACCOUNT_TYPE_NUMBER = Byte(0x01) - ACCOUNT_TYPE_PUBKEY = Byte(0x02) - ACCOUNT_TYPE_BOTH = Byte(0x03) -) - -func ReadAccountId(r io.Reader) AccountId { - switch t := ReadByte(r); t { - case ACCOUNT_TYPE_NUMBER: - return AccountId{t, ReadUInt64(r), nil} - case ACCOUNT_TYPE_PUBKEY: - return AccountId{t, 0, ReadByteSlice(r)} - case ACCOUNT_TYPE_BOTH: - return AccountId{t, ReadUInt64(r), ReadByteSlice(r)} - default: - Panicf("Unknown AccountId type %x", t) - return AccountId{} - } -} - -func (self AccountId) WriteTo(w io.Writer) (n int64, err error) { - n, err = WriteTo(self.Type, w, n, err) - if self.Type == ACCOUNT_TYPE_NUMBER || self.Type == ACCOUNT_TYPE_BOTH { - n, err = WriteTo(self.Number, w, n, err) - } - if self.Type == ACCOUNT_TYPE_PUBKEY || self.Type == ACCOUNT_TYPE_BOTH { - n, err = WriteTo(self.PubKey, w, n, err) - } - return -} - -func AccountNumber(n UInt64) AccountId { - return AccountId{ACCOUNT_TYPE_NUMBER, n, nil} -} diff --git a/blocks/accounts.go b/blocks/accounts.go new file mode 100644 index 000000000..7589a8d29 --- /dev/null +++ b/blocks/accounts.go @@ -0,0 +1,63 @@ +package blocks + +import ( + . "github.com/tendermint/tendermint/binary" + "io" +) + +// NOTE: consensus/Validator embeds this, so.. +type Account struct { + Id UInt64 // Numeric id of account, incrementing. + PubKey ByteSlice +} + +func (self *Account) Verify(msg ByteSlice, sig ByteSlice) bool { + return false +} + +//----------------------------------------------------------------------------- + +type PrivAccount struct { + Account + PrivKey ByteSlice +} + +func (self *PrivAccount) Sign(msg ByteSlice) Signature { + return Signature{} +} + +//----------------------------------------------------------------------------- + +/* +Signature message wire format: + + |A...|SSS...| + + A account number, varint encoded (1+ bytes) + S signature of all prior bytes (32 bytes) + +It usually follows the message to be signed. + +*/ + +type Signature struct { + SignerId UInt64 + Bytes ByteSlice +} + +func ReadSignature(r io.Reader) Signature { + return Signature{ + SignerId: ReadUInt64(r), + Bytes: ReadByteSlice(r), + } +} + +func (sig Signature) IsZero() bool { + return len(sig.Bytes) == 0 +} + +func (sig Signature) WriteTo(w io.Writer) (n int64, err error) { + n, err = WriteTo(sig.SignerId, w, n, err) + n, err = WriteTo(sig.Bytes, w, n, err) + return +} diff --git a/blocks/adjustment.go b/blocks/adjustment.go index 84a429b42..0b86703af 100644 --- a/blocks/adjustment.go +++ b/blocks/adjustment.go @@ -15,7 +15,6 @@ import ( TODO: signing a bad checkpoint (block) */ - type Adjustment interface { Type() Byte Binary @@ -33,7 +32,7 @@ func ReadAdjustment(r io.Reader) Adjustment { case ADJ_TYPE_BOND: return &Bond{ Fee: ReadUInt64(r), - UnbondTo: ReadAccountId(r), + UnbondTo: ReadUInt64(r), Amount: ReadUInt64(r), Signature: ReadSignature(r), } @@ -45,13 +44,13 @@ func ReadAdjustment(r io.Reader) Adjustment { } case ADJ_TYPE_TIMEOUT: return &Timeout{ - Account: ReadAccountId(r), + Account: ReadUInt64(r), Penalty: ReadUInt64(r), } case ADJ_TYPE_DUPEOUT: return &Dupeout{ - VoteA: ReadVote(r), - VoteB: ReadVote(r), + VoteA: ReadBlockVote(r), + VoteB: ReadBlockVote(r), } default: Panicf("Unknown Adjustment type %x", t) @@ -59,11 +58,12 @@ func ReadAdjustment(r io.Reader) Adjustment { } } -/* Bond < Adjustment */ +//----------------------------------------------------------------------------- +/* Bond < Adjustment */ type Bond struct { Fee UInt64 - UnbondTo AccountId + UnbondTo UInt64 Amount UInt64 Signature } @@ -81,8 +81,9 @@ func (self *Bond) WriteTo(w io.Writer) (n int64, err error) { return } -/* Unbond < Adjustment */ +//----------------------------------------------------------------------------- +/* Unbond < Adjustment */ type Unbond struct { Fee UInt64 Amount UInt64 @@ -101,10 +102,11 @@ func (self *Unbond) WriteTo(w io.Writer) (n int64, err error) { return } -/* Timeout < Adjustment */ +//----------------------------------------------------------------------------- +/* Timeout < Adjustment */ type Timeout struct { - Account AccountId + Account UInt64 Penalty UInt64 } @@ -119,11 +121,37 @@ func (self *Timeout) WriteTo(w io.Writer) (n int64, err error) { return } -/* Dupeout < Adjustment */ +//----------------------------------------------------------------------------- + +/* +The full vote structure is only needed when presented as evidence. +Typically only the signature is passed around, as the hash & height are implied. +*/ +type BlockVote struct { + Height UInt64 + BlockHash ByteSlice + Signature +} + +func ReadBlockVote(r io.Reader) BlockVote { + return BlockVote{ + Height: ReadUInt64(r), + BlockHash: ReadByteSlice(r), + Signature: ReadSignature(r), + } +} +func (self BlockVote) WriteTo(w io.Writer) (n int64, err error) { + n, err = WriteTo(self.Height, w, n, err) + n, err = WriteTo(self.BlockHash, w, n, err) + n, err = WriteTo(self.Signature, w, n, err) + return +} + +/* Dupeout < Adjustment */ type Dupeout struct { - VoteA Vote - VoteB Vote + VoteA BlockVote + VoteB BlockVote } func (self *Dupeout) Type() Byte { diff --git a/blocks/block.go b/blocks/block.go index 37b5ce439..3448bd333 100644 --- a/blocks/block.go +++ b/blocks/block.go @@ -1,16 +1,35 @@ package blocks import ( + "crypto/sha256" + "fmt" + "io" + . "github.com/tendermint/tendermint/binary" + . "github.com/tendermint/tendermint/common" + "github.com/tendermint/tendermint/config" "github.com/tendermint/tendermint/merkle" - "io" ) -/* Block */ +const ( + defaultBlockPartSizeBytes = 4096 +) + +func CalcBlockURI(height uint32, hash []byte) string { + return fmt.Sprintf("%v://block/%v#%X", + config.Config.Network, + height, + hash, + ) +} + type Block struct { Header Validation Txs + + // Volatile + hash []byte } func ReadBlock(r io.Reader) *Block { @@ -21,60 +40,200 @@ func ReadBlock(r io.Reader) *Block { } } -func (self *Block) Validate() bool { - return false +func (b *Block) WriteTo(w io.Writer) (n int64, err error) { + n, err = WriteTo(&b.Header, w, n, err) + n, err = WriteTo(&b.Validation, w, n, err) + n, err = WriteTo(&b.Txs, w, n, err) + return +} + +func (b *Block) ValidateBasic() error { + // Basic validation that doesn't involve context. + // XXX + return nil +} + +func (b *Block) URI() string { + return CalcBlockURI(uint32(b.Height), b.Hash()) +} + +func (b *Block) Hash() []byte { + if b.hash != nil { + return b.hash + } else { + hashes := []Binary{ + ByteSlice(b.Header.Hash()), + ByteSlice(b.Validation.Hash()), + ByteSlice(b.Txs.Hash()), + } + // Merkle hash from sub-hashes. + return merkle.HashFromBinarySlice(hashes) + } +} + +// The returns parts must be signed afterwards. +func (b *Block) ToBlockParts() (parts []*BlockPart) { + blockBytes := BinaryBytes(b) + total := (len(blockBytes) + defaultBlockPartSizeBytes - 1) / defaultBlockPartSizeBytes + for i := 0; i < total; i++ { + start := defaultBlockPartSizeBytes * i + end := MinInt(start+defaultBlockPartSizeBytes, len(blockBytes)) + partBytes := make([]byte, end-start) + copy(partBytes, blockBytes[start:end]) // Do not ref the original byteslice. + part := &BlockPart{ + Height: b.Height, + Index: UInt16(i), + Total: UInt16(total), + Bytes: partBytes, + Signature: Signature{}, // No signature. + } + parts = append(parts, part) + } + return parts +} + +//----------------------------------------------------------------------------- + +/* +BlockPart represents a chunk of the bytes of a block. +Each block is divided into fixed length chunks (e.g. 4Kb) +for faster propagation across the gossip network. +*/ +type BlockPart struct { + Height UInt32 + Round UInt16 // Add Round? Well I need to know... + Index UInt16 + Total UInt16 + Bytes ByteSlice + Signature + + // Volatile + hash []byte +} + +func ReadBlockPart(r io.Reader) *BlockPart { + return &BlockPart{ + Height: ReadUInt32(r), + Round: ReadUInt16(r), + Index: ReadUInt16(r), + Total: ReadUInt16(r), + Bytes: ReadByteSlice(r), + Signature: ReadSignature(r), + } } -func (self *Block) WriteTo(w io.Writer) (n int64, err error) { - n, err = WriteTo(&self.Header, w, n, err) - n, err = WriteTo(&self.Validation, w, n, err) - n, err = WriteTo(&self.Txs, w, n, err) +func (bp *BlockPart) WriteTo(w io.Writer) (n int64, err error) { + n, err = WriteTo(&bp.Height, w, n, err) + n, err = WriteTo(&bp.Round, w, n, err) + n, err = WriteTo(&bp.Index, w, n, err) + n, err = WriteTo(&bp.Total, w, n, err) + n, err = WriteTo(&bp.Bytes, w, n, err) + n, err = WriteTo(&bp.Signature, w, n, err) return } -/* Block > Header */ +func (bp *BlockPart) URI() string { + return fmt.Sprintf("%v://block/%v/%v[%v/%v]#%X\n", + config.Config.Network, + bp.Height, + bp.Round, + bp.Index, + bp.Total, + bp.BlockPartHash(), + ) +} + +func (bp *BlockPart) BlockPartHash() []byte { + if bp.hash != nil { + return bp.hash + } else { + hasher := sha256.New() + hasher.Write(bp.Bytes) + bp.hash = hasher.Sum(nil) + return bp.hash + } +} + +// Signs the URI, which includes all data and metadata. +// XXX implement or change +func (bp *BlockPart) Sign(acc *PrivAccount) { + // TODO: populate Signature +} + +// XXX maybe change. +func (bp *BlockPart) ValidateWithSigner(signer *Account) error { + // TODO: Sanity check height, index, total, bytes, etc. + if !signer.Verify([]byte(bp.URI()), bp.Signature.Bytes) { + return ErrInvalidBlockPartSignature + } + return nil +} + +//----------------------------------------------------------------------------- + +/* Header is part of a Block */ type Header struct { Name String - Height UInt64 + Height UInt32 Fees UInt64 - Time UInt64 + Time Time PrevHash ByteSlice ValidationHash ByteSlice TxsHash ByteSlice + + // Volatile + hash []byte } func ReadHeader(r io.Reader) Header { return Header{ Name: ReadString(r), - Height: ReadUInt64(r), + Height: ReadUInt32(r), Fees: ReadUInt64(r), - Time: ReadUInt64(r), + Time: ReadTime(r), PrevHash: ReadByteSlice(r), ValidationHash: ReadByteSlice(r), TxsHash: ReadByteSlice(r), } } -func (self *Header) WriteTo(w io.Writer) (n int64, err error) { - n, err = WriteTo(self.Name, w, n, err) - n, err = WriteTo(self.Height, w, n, err) - n, err = WriteTo(self.Fees, w, n, err) - n, err = WriteTo(self.Time, w, n, err) - n, err = WriteTo(self.PrevHash, w, n, err) - n, err = WriteTo(self.ValidationHash, w, n, err) - n, err = WriteTo(self.TxsHash, w, n, err) +func (h *Header) WriteTo(w io.Writer) (n int64, err error) { + n, err = WriteTo(h.Name, w, n, err) + n, err = WriteTo(h.Height, w, n, err) + n, err = WriteTo(h.Fees, w, n, err) + n, err = WriteTo(h.Time, w, n, err) + n, err = WriteTo(h.PrevHash, w, n, err) + n, err = WriteTo(h.ValidationHash, w, n, err) + n, err = WriteTo(h.TxsHash, w, n, err) return } -/* Block > Validation */ +func (h *Header) Hash() []byte { + if h.hash != nil { + return h.hash + } else { + hasher := sha256.New() + _, err := h.WriteTo(hasher) + if err != nil { + panic(err) + } + h.hash = hasher.Sum(nil) + return h.hash + } +} + +/* Validation is part of a block */ type Validation struct { Signatures []Signature Adjustments []Adjustment + + // Volatile + hash []byte } func ReadValidation(r io.Reader) Validation { - numSigs := int(ReadUInt64(r)) - numAdjs := int(ReadUInt64(r)) + numSigs := int(ReadUInt32(r)) + numAdjs := int(ReadUInt32(r)) sigs := make([]Signature, 0, numSigs) for i := 0; i < numSigs; i++ { sigs = append(sigs, ReadSignature(r)) @@ -89,44 +248,66 @@ func ReadValidation(r io.Reader) Validation { } } -func (self *Validation) WriteTo(w io.Writer) (n int64, err error) { - n, err = WriteTo(UInt64(len(self.Signatures)), w, n, err) - n, err = WriteTo(UInt64(len(self.Adjustments)), w, n, err) - for _, sig := range self.Signatures { +func (v *Validation) WriteTo(w io.Writer) (n int64, err error) { + n, err = WriteTo(UInt32(len(v.Signatures)), w, n, err) + n, err = WriteTo(UInt32(len(v.Adjustments)), w, n, err) + for _, sig := range v.Signatures { n, err = WriteTo(sig, w, n, err) } - for _, adj := range self.Adjustments { + for _, adj := range v.Adjustments { n, err = WriteTo(adj, w, n, err) } return } -/* Block > Txs */ +func (v *Validation) Hash() []byte { + if v.hash != nil { + return v.hash + } else { + hasher := sha256.New() + _, err := v.WriteTo(hasher) + if err != nil { + panic(err) + } + v.hash = hasher.Sum(nil) + return v.hash + } +} + +/* Txs is part of a block */ type Txs struct { Txs []Tx + + // Volatile + hash []byte } func ReadTxs(r io.Reader) Txs { - numTxs := int(ReadUInt64(r)) + numTxs := int(ReadUInt32(r)) txs := make([]Tx, 0, numTxs) for i := 0; i < numTxs; i++ { txs = append(txs, ReadTx(r)) } - return Txs{txs} + return Txs{Txs: txs} } -func (self *Txs) WriteTo(w io.Writer) (n int64, err error) { - n, err = WriteTo(UInt64(len(self.Txs)), w, n, err) - for _, tx := range self.Txs { +func (txs *Txs) WriteTo(w io.Writer) (n int64, err error) { + n, err = WriteTo(UInt32(len(txs.Txs)), w, n, err) + for _, tx := range txs.Txs { n, err = WriteTo(tx, w, n, err) } return } -func (self *Txs) MerkleHash() ByteSlice { - bs := make([]Binary, 0, len(self.Txs)) - for i, tx := range self.Txs { - bs[i] = Binary(tx) +func (txs *Txs) Hash() []byte { + if txs.hash != nil { + return txs.hash + } else { + bs := make([]Binary, 0, len(txs.Txs)) + for i, tx := range txs.Txs { + bs[i] = Binary(tx) + } + txs.hash = merkle.HashFromBinarySlice(bs) + return txs.hash } - return merkle.HashFromBinarySlice(bs) } diff --git a/blocks/block_manager.go b/blocks/block_manager.go deleted file mode 100644 index fcad79951..000000000 --- a/blocks/block_manager.go +++ /dev/null @@ -1,660 +0,0 @@ -package blocks - -import ( - "bytes" - "encoding/json" - "fmt" - "io" - "sync" - "sync/atomic" - "time" - - . "github.com/tendermint/tendermint/binary" - . "github.com/tendermint/tendermint/common" - db_ "github.com/tendermint/tendermint/db" - "github.com/tendermint/tendermint/p2p" -) - -var dbKeyState = []byte("state") - -const ( - blocksInfoCh = byte(0x10) // For requests & cancellations - blocksDataCh = byte(0x11) // For data - - msgTypeUnknown = Byte(0x00) - msgTypeState = Byte(0x01) - msgTypeRequest = Byte(0x02) - msgTypeData = Byte(0x03) - - maxRequestsPerPeer = 2 // Maximum number of outstanding requests from peer. - maxRequestsPerData = 2 // Maximum number of outstanding requests of some data. - maxRequestAheadBlock = 5 // Maximum number of blocks to request ahead of current verified. Must be >= 1 - - defaultRequestTimeoutS = - timeoutRepeatTimerMS = 1000 // Handle timed out requests periodically -) - -/* -TODO: keep a heap of dataRequests * their corresponding timeouts. -timeout dataRequests and update the peerState, -TODO: need to keep track of progress, blocks are too large. or we need to chop into chunks. -TODO: need to validate blocks. :/ -TODO: actually save the block. -*/ - -//----------------------------------------------------------------------------- - -const ( - dataTypeBlock = byte(0x00) - // TODO: allow for more types, such as specific transactions -) - -type dataKey struct { - dataType byte - height uint64 -} - -func newDataKey(dataType byte, height uint64) dataKey { - return dataKey{dataType, height} -} - -func readDataKey(r io.Reader) dataKey { - return dataKey{ - dataType: ReadByte(r), - height: ReadUInt64(r), - } -} - -func (dk dataKey) WriteTo(w io.Writer) (n int64, err error) { - n, err = WriteTo(dk.dataType, w, n, err) - n, err = WriteTo(dk.height, w, n, err) - return -} - -func (dk dataKey) String() string { - switch dataType { - case dataTypeBlock: - return dataKeyfmt.Sprintf("B%v", height) - default: - Panicf("Unknown datatype %X", dataType) - return "" // should not happen - } -} - -//----------------------------------------------------------------------------- - -type BlockManager struct { - db *db_.LevelDB - sw *p2p.Switch - swEvents chan interface{} - state *blockManagerState - timeoutTimer *RepeatTimer - quit chan struct{} - started uint32 - stopped uint32 -} - -func NewBlockManager(sw *p2p.Switch, db *db_.LevelDB) *BlockManager { - swEvents := make(chan interface{}) - sw.AddEventListener("BlockManager.swEvents", swEvents) - bm := &BlockManager{ - db: db, - sw: sw, - swEvents: swEvents, - state: newBlockManagerState(), - timeoutTimer: NewRepeatTimer(timeoutRepeatTimerMS * time.Second), - quit: make(chan struct{}), - } - bm.loadState() - return bm -} - -func (bm *BlockManager) Start() { - if atomic.CompareAndSwapUint32(&bm.started, 0, 1) { - log.Info("Starting BlockManager") - go bm.switchEventsHandler() - go bm.blocksInfoHandler() - go bm.blocksDataHandler() - go bm.requestTimeoutHandler() - } -} - -func (bm *BlockManager) Stop() { - if atomic.CompareAndSwapUint32(&bm.stopped, 0, 1) { - log.Info("Stopping BlockManager") - close(bm.quit) - close(bm.swEvents) - } -} - -// NOTE: assumes that data is already validated. -// "request" is optional, it's the request response that supplied -// the data. -func (bm *BlockManager) StoreBlock(block *Block, origin *dataRequest) { - dataKey := newDataKey(dataTypeBlock, uint64(block.Header.Height)) - - // XXX actually save the block. - - canceled, newHeight := bm.state.didGetDataFromPeer(dataKey, origin.peer) - - // Notify peers that the request has been canceled. - for _, request := range canceled { - msg := &requestMessage{ - key: dataKey, - type_: requestTypeCanceled, - } - tm := p2p.TypedMessage{msgTypeRequest, msg} - request.peer.TrySend(blocksInfoCh, tm.Bytes()) - } - - // If we have new data that extends our contiguous range, then announce it. - if newHeight { - bm.sw.Broadcast(blocksInfoCh, bm.state.makeStateMessage()) - } -} - -func (bm *BlockManager) LoadBlock(height uint64) *Block { - panic("not yet implemented") -} - -// Handle peer new/done events -func (bm *BlockManager) switchEventsHandler() { - for { - swEvent, ok := <-bm.swEvents - if !ok { - break - } - switch swEvent.(type) { - case p2p.SwitchEventNewPeer: - event := swEvent.(p2p.SwitchEventNewPeer) - // Create peerState for event.Peer - bm.state.createEntryForPeer(event.Peer) - // Share our state with event.Peer - msg := &stateMessage{ - lastBlockHeight: UInt64(bm.state.lastBlockHeight), - } - tm := p2p.TypedMessage{msgTypeRequest, msg} - event.Peer.TrySend(blocksInfoCh, tm.Bytes()) - case p2p.SwitchEventDonePeer: - event := swEvent.(p2p.SwitchEventDonePeer) - // Delete peerState for event.Peer - bm.state.deleteEntryForPeer(event.Peer) - default: - log.Warning("Unhandled switch event type") - } - } -} - -// Handle requests/cancellations from the blocksInfo channel -func (bm *BlockManager) blocksInfoHandler() { - for { - inMsg, ok := bm.sw.Receive(blocksInfoCh) - if !ok { - break // Client has stopped - } - - msg := decodeMessage(inMsg.Bytes) - log.Info("blocksInfoHandler received %v", msg) - - switch msg.(type) { - case *stateMessage: - m := msg.(*stateMessage) - peerState := bm.getPeerState(inMsg.MConn.Peer) - if peerState == nil { - continue // peer has since been disconnected. - } - newDataTypes := peerState.applyStateMessage(m) - // Consider requesting data. - // Does the peer claim to have something we want? - FOR_LOOP: - for _, newDataType := range newDataTypes { - // Are we already requesting too much data from peer? - if !peerState.canRequestMore() { - break FOR_LOOP - } - for _, wantedKey := range bm.state.nextWantedKeysForType(newDataType) { - if !peerState.hasData(wantedKey) { - break FOR_LOOP - } - // Request wantedKey from peer. - msg := &requestMessage{ - key: dataKey, - type_: requestTypeFetch, - } - tm := p2p.TypedMessage{msgTypeRequest, msg} - sent := inMsg.MConn.Peer.TrySend(blocksInfoCh, tm.Bytes()) - if sent { - // Log the request - request := &dataRequest{ - peer: inMsg.MConn.Peer, - key: wantedKey, - time: time.Now(), - timeout: time.Now().Add(defaultRequestTimeout - } - bm.state.addDataRequest(request) - } - } - } - case *requestMessage: - m := msg.(*requestMessage) - switch m.type_ { - case requestTypeFetch: - // TODO: prevent abuse. - if !inMsg.MConn.Peer.CanSend(blocksDataCh) { - msg := &requestMessage{ - key: dataKey, - type_: requestTypeTryAgain, - } - tm := p2p.TypedMessage{msgTypeRequest, msg} - sent := inMsg.MConn.Peer.TrySend(blocksInfoCh, tm.Bytes()) - } else { - // If we don't have it, log and ignore. - block := bm.LoadBlock(m.key.height) - if block == nil { - log.Warning("Peer %v asked for nonexistant block %v", inMsg.MConn.Peer, m.key) - } - // Send the data. - msg := &dataMessage{ - key: dataKey, - bytes: BinaryBytes(block), - } - tm := p2p.TypedMessage{msgTypeData, msg} - inMsg.MConn.Peer.TrySend(blocksDataCh, tm.Bytes()) - } - case requestTypeCanceled: - // TODO: handle - // This requires modifying mconnection to keep track of item keys. - case requestTypeTryAgain: - // TODO: handle - default: - log.Warning("Invalid request: %v", m) - // Ignore. - } - default: - // should not happen - Panicf("Unknown message %v", msg) - // bm.sw.StopPeerForError(inMsg.MConn.Peer, errInvalidMessage) - } - } - - // Cleanup -} - -// Handle receiving data from the blocksData channel -func (bm *BlockManager) blocksDataHandler() { - for { - inMsg, ok := bm.sw.Receive(blocksDataCh) - if !ok { - break // Client has stopped - } - - msg := decodeMessage(inMsg.Bytes) - log.Info("blocksDataHandler received %v", msg) - - switch msg.(type) { - case *dataMessage: - // See if we want the data. - // Validate data. - // Add to db. - // Update state & broadcast as necessary. - default: - // Ignore unknown message - // bm.sw.StopPeerForError(inMsg.MConn.Peer, errInvalidMessage) - } - } - - // Cleanup -} - -// Handle timed out requests by requesting from others. -func (bm *BlockManager) requestTimeoutHandler() { - for { - _, ok := <-bm.timeoutTimer - if !ok { - break - } - // Iterate over requests by time and handle timed out requests. - } -} - -//----------------------------------------------------------------------------- - -// blockManagerState keeps track of which block parts are stored locally. -// It's also persisted via JSON in the db. -type blockManagerState struct { - mtx sync.Mutex - lastBlockHeight uint64 // Last contiguous header height - otherBlockHeights map[uint64]struct{} - requestsByKey map[dataKey][]*dataRequest - requestsByTimeout *Heap // Could be a linkedlist, but more flexible. - peerStates map[string]*peerState -} - -func newBlockManagerState() *blockManagerState { - return &blockManagerState{ - requestsByKey: make(map[dataKey][]*dataRequest), - requestsByTimeout: NewHeap(), - peerStates: make(map[string]*peerState), - } -} - -type blockManagerStateJSON struct { - LastBlockHeight uint64 // Last contiguous header height - OtherBlockHeights map[uint64]struct{} -} - -func (bms *BlockManagerState) loadState(db _db.LevelDB) { - bms.mtx.Lock() - defer bms.mtx.Unlock() - stateBytes := db.Get(dbKeyState) - if stateBytes == nil { - log.Info("New BlockManager with no state") - } else { - bmsJSON := &blockManagerStateJSON{} - err := json.Unmarshal(stateBytes, bmsJSON) - if err != nil { - Panicf("Could not unmarshal state bytes: %X", stateBytes) - } - bms.lastBlockHeight = bmsJSON.LastBlockHeight - bms.otherBlockHeights = bmsJSON.OtherBlockHeights - } -} - -func (bms *BlockManagerState) saveState(db _db.LevelDB) { - bms.mtx.Lock() - defer bms.mtx.Unlock() - bmsJSON := &blockManagerStateJSON{ - LastBlockHeight: bms.lastBlockHeight, - OtherBlockHeights: bms.otherBlockHeights, - } - stateBytes, err := json.Marshal(bmsJSON) - if err != nil { - panic("Could not marshal state bytes") - } - db.Set(dbKeyState, stateBytes) -} - -func (bms *blockManagerState) makeStateMessage() *stateMessage { - bms.mtx.Lock() - defer bms.mtx.Unlock() - return &stateMessage{ - lastBlockHeight: UInt64(bms.lastBlockHeight), - } -} - -func (bms *blockManagerState) createEntryForPeer(peer *peer) { - bms.mtx.Lock() - defer bms.mtx.Unlock() - bms.peerStates[peer.Key] = &peerState{peer: peer} -} - -func (bms *blockManagerState) deleteEntryForPeer(peer *peer) { - bms.mtx.Lock() - defer bms.mtx.Unlock() - delete(bms.peerStates, peer.Key) -} - -func (bms *blockManagerState) getPeerState(peer *Peer) { - bms.mtx.Lock() - defer bms.mtx.Unlock() - return bms.peerStates[peer.Key] -} - -func (bms *blockManagerState) addDataRequest(newRequest *dataRequest) { - ps.mtx.Lock() - bms.requestsByKey[newRequest.key] = append(bms.requestsByKey[newRequest.key], newRequest) - bms.requestsByTimeout.Push(newRequest) // XXX - peerState, ok := bms.peerStates[newRequest.peer.Key] - ps.mtx.Unlock() - if ok { - peerState.addDataRequest(newRequest) - } -} - -func (bms *blockManagerState) didGetDataFromPeer(key dataKey, peer *p2p.Peer) (canceled []*dataRequest, newHeight bool) { - bms.mtx.Lock() - defer bms.mtx.Unlock() - if key.dataType != dataTypeBlock { - Panicf("Unknown datatype %X", key.dataType) - } - // Adjust lastBlockHeight/otherBlockHeights. - height := key.height - if bms.lastBlockHeight == height-1 { - bms.lastBlockHeight = height - height++ - for _, ok := bms.otherBlockHeights[height]; ok; { - delete(bms.otherBlockHeights, height) - bms.lastBlockHeight = height - height++ - } - newHeight = true - } - // Remove dataRequests - requests := bms.requestsByKey[key] - for _, request := range requests { - peerState, ok := bms.peerStates[peer.Key] - if ok { - peerState.removeDataRequest(request) - } - if request.peer == peer { - continue - } - canceled = append(canceled, request) - } - delete(bms.requestsByKey, key) - - return canceled, newHeight -} - -// Returns at most maxRequestAheadBlock dataKeys that we don't yet have & -// aren't already requesting from maxRequestsPerData peers. -func (bms *blockManagerState) nextWantedKeysForType(dataType byte) []dataKey { - bms.mtx.Lock() - defer bms.mtx.Unlock() - var keys []dataKey - switch dataType { - case dataTypeBlock: - for h := bms.lastBlockHeight + 1; h <= bms.lastBlockHeight+maxRequestAheadBlock; h++ { - if _, ok := bms.otherBlockHeights[h]; !ok { - key := newDataKey(dataTypeBlock, h) - if len(bms.requestsByKey[key]) < maxRequestsPerData { - keys = append(keys, key) - } - } - } - return keys - default: - Panicf("Unknown datatype %X", dataType) - return - } -} - -//----------------------------------------------------------------------------- - -// dataRequest keeps track of each request for a given peice of data & peer. -type dataRequest struct { - peer *p2p.Peer - key dataKey - time time.Time - timeout time.Time -} - -//----------------------------------------------------------------------------- - -type peerState struct { - mtx sync.Mutex - peer *Peer - lastBlockHeight uint64 // Last contiguous header height - requests []*dataRequest // Active requests - // XXX we need to -} - -// Returns which dataTypes are new as declared by stateMessage. -func (ps *peerState) applyStateMessage(msg *stateMessage) []byte { - ps.mtx.Lock() - defer ps.mtx.Unlock() - var newTypes []byte - if uint64(msg.lastBlockHeight) > ps.lastBlockHeight { - newTypes = append(newTypes, dataTypeBlock) - ps.lastBlockHeight = uint64(msg.lastBlockHeight) - } else { - log.Info("Strange, peer declares a regression of %X", dataTypeBlock) - } - return newTypes -} - -func (ps *peerState) hasData(key dataKey) bool { - ps.mtx.Lock() - defer ps.mtx.Unlock() - switch key.dataType { - case dataTypeBlock: - return key.height <= ps.lastBlockHeight - default: - Panicf("Unknown datatype %X", dataType) - return false // should not happen - } -} - -func (ps *peerState) addDataRequest(newRequest *dataRequest) { - ps.mtx.Lock() - defer ps.mtx.Unlock() - for _, request := range ps.requests { - if request.key == newRequest.key { - return - } - } - ps.requests = append(ps.requests, newRequest) - return newRequest -} - -func (ps *peerState) remoteDataRequest(key dataKey) bool { - ps.mtx.Lock() - defer ps.mtx.Unlock() - filtered := []*dataRequest{} - removed := false - for _, request := range ps.requests { - if request.key == key { - removed = true - } else { - filtered = append(filtered, request) - } - } - ps.requests = filtered - return removed -} - -func (ps *peerState) canRequestMore() bool { - ps.mtx.Lock() - defer ps.mtx.Unlock() - return len(ps.requests) < maxRequestsPerPeer -} - -//----------------------------------------------------------------------------- - -/* Messages */ - -// TODO: check for unnecessary extra bytes at the end. -func decodeMessage(bz ByteSlice) (msg interface{}) { - // log.Debug("decoding msg bytes: %X", bz) - switch Byte(bz[0]) { - case msgTypeState: - return &stateMessage{} - case msgTypeRequest: - return readRequestMessage(bytes.NewReader(bz[1:])) - case msgTypeData: - return readDataMessage(bytes.NewReader(bz[1:])) - default: - return nil - } -} - -/* -A stateMessage declares what (contiguous) blocks & headers are known. -*/ -type stateMessage struct { - lastBlockHeight UInt64 // Last contiguous block height -} - -func readStateMessage(r io.Reader) *stateMessage { - return &stateMessage{ - lastBlockHeight: ReadUInt64(r), - } -} - -func (m *stateMessage) WriteTo(w io.Writer) (n int64, err error) { - n, err = WriteTo(msgTypeState, w, n, err) - n, err = WriteTo(m.lastBlockHeight, w, n, err) - return -} - -func (m *stateMessage) String() string { - return fmt.Sprintf("[State B:%v]", m.lastBlockHeight) -} - -/* -A requestMessage requests a block and/or header at a given height. -*/ -type requestMessage struct { - key dataKey - type_ Byte -} - -const ( - requestTypeFetch = Byte(0x01) - requestTypeCanceled = Byte(0x02) - requestTypeTryAgain = Byte(0x03) -) - -func readRequestMessage(r io.Reader) *requestMessage { - return &requestMessage{ - key: ReadDataKey(r), - type_: ReadByte(r), - } -} - -func (m *requestMessage) WriteTo(w io.Writer) (n int64, err error) { - n, err = WriteTo(msgTypeRequest, w, n, err) - n, err = WriteTo(m.key, w, n, err) - n, err = WriteTo(m.type_, w, n, err) - return -} - -func (m *requestMessage) String() string { - switch m.type_ { - case requestTypeByte: - return fmt.Sprintf("[Request(fetch) %v]", m.key) - case requestTypeCanceled: - return fmt.Sprintf("[Request(canceled) %v]", m.key) - case requestTypeTryAgain: - return fmt.Sprintf("[Request(tryagain) %v]", m.key) - default: - return fmt.Sprintf("[Request(invalid) %v]", m.key) - } -} - -/* -A dataMessage contains block data, maybe requested. -The data can be a Validation, Txs, or whole Block object. -*/ -type dataMessage struct { - key dataKey - bytes ByteSlice -} - -func readDataMessage(r io.Reader) *dataMessage { - return &dataMessage{ - key: readDataKey(r), - bytes: readByteSlice(r), - } -} - -func (m *dataMessage) WriteTo(w io.Writer) (n int64, err error) { - n, err = WriteTo(msgTypeData, w, n, err) - n, err = WriteTo(m.key, w, n, err) - n, err = WriteTo(m.bytes, w, n, err) - return -} - -func (m *dataMessage) String() string { - return fmt.Sprintf("[Data %v]", m.key) -} diff --git a/blocks/block_part_set.go b/blocks/block_part_set.go new file mode 100644 index 000000000..b240f10af --- /dev/null +++ b/blocks/block_part_set.go @@ -0,0 +1,134 @@ +package blocks + +import ( + "bytes" + "errors" + "sync" +) + +// Helper for keeping track of block parts. +type BlockPartSet struct { + mtx sync.Mutex + signer *Account + height uint32 + round uint16 // Not used + total uint16 + numParts uint16 + parts []*BlockPart + + _block *Block // cache +} + +var ( + ErrInvalidBlockPartSignature = errors.New("Invalid block part signature") // Peer gave us a fake part + ErrInvalidBlockPartConflict = errors.New("Invalid block part conflict") // Signer signed conflicting parts +) + +// Signer may be nil if signer is unknown beforehand. +func NewBlockPartSet(height uint32, round uint16, signer *Account) *BlockPartSet { + return &BlockPartSet{ + signer: signer, + height: height, + round: round, + } +} + +// In the case where the signer wasn't known prior to NewBlockPartSet(), +// user should call SetSigner() prior to AddBlockPart(). +func (bps *BlockPartSet) SetSigner(signer *Account) { + bps.mtx.Lock() + defer bps.mtx.Unlock() + if bps.signer != nil { + panic("BlockPartSet signer already set.") + } + bps.signer = signer +} + +func (bps *BlockPartSet) BlockParts() []*BlockPart { + bps.mtx.Lock() + defer bps.mtx.Unlock() + return bps.parts +} + +func (bps *BlockPartSet) BitArray() []byte { + bps.mtx.Lock() + defer bps.mtx.Unlock() + if bps.parts == nil { + return nil + } + bitArray := make([]byte, (len(bps.parts)+7)/8) + for i, part := range bps.parts { + if part != nil { + bitArray[i/8] |= 1 << uint(i%8) + } + } + return bitArray +} + +// If the part isn't valid, returns an error. +// err can be ErrInvalidBlockPart[Conflict|Signature] +func (bps *BlockPartSet) AddBlockPart(part *BlockPart) (added bool, err error) { + bps.mtx.Lock() + defer bps.mtx.Unlock() + + // If part is invalid, return an error. + err = part.ValidateWithSigner(bps.signer) + if err != nil { + return false, err + } + + if bps.parts == nil { + // First received part for this round. + bps.parts = make([]*BlockPart, part.Total) + bps.total = uint16(part.Total) + bps.parts[int(part.Index)] = part + bps.numParts++ + return true, nil + } else { + // Check part.Index and part.Total + if uint16(part.Index) >= bps.total { + return false, ErrInvalidBlockPartConflict + } + if uint16(part.Total) != bps.total { + return false, ErrInvalidBlockPartConflict + } + // Check for existing parts. + existing := bps.parts[part.Index] + if existing != nil { + if existing.Bytes.Equals(part.Bytes) { + // Ignore duplicate + return false, nil + } else { + return false, ErrInvalidBlockPartConflict + } + } else { + bps.parts[int(part.Index)] = part + bps.numParts++ + return true, nil + } + } + +} + +func (bps *BlockPartSet) IsComplete() bool { + bps.mtx.Lock() + defer bps.mtx.Unlock() + return bps.total > 0 && bps.total == bps.numParts +} + +func (bps *BlockPartSet) Block() *Block { + if !bps.IsComplete() { + return nil + } + bps.mtx.Lock() + defer bps.mtx.Unlock() + if bps._block == nil { + blockBytes := []byte{} + for _, part := range bps.parts { + blockBytes = append(blockBytes, part.Bytes...) + } + block := ReadBlock(bytes.NewReader(blockBytes)) + bps._block = block + } + return bps._block +} diff --git a/blocks/log.go b/blocks/log.go index 1ddb0d940..81cf6b4b6 100644 --- a/blocks/log.go +++ b/blocks/log.go @@ -10,6 +10,6 @@ func init() { logging.SetFormatter(logging.MustStringFormatter("[%{level:.1s}] %{message}")) } -func SetLogger(l *logging.Logger) { +func SetBlocksLogger(l *logging.Logger) { log = l } diff --git a/blocks/signature.go b/blocks/signature.go deleted file mode 100644 index cdd1f361d..000000000 --- a/blocks/signature.go +++ /dev/null @@ -1,41 +0,0 @@ -package blocks - -import ( - . "github.com/tendermint/tendermint/binary" - "io" -) - -/* - -Signature message wire format: - - |A...|SSS...| - - A account number, varint encoded (1+ bytes) - S signature of all prior bytes (32 bytes) - -It usually follows the message to be signed. - -*/ - -type Signature struct { - Signer AccountId - SigBytes ByteSlice -} - -func ReadSignature(r io.Reader) Signature { - return Signature{ - Signer: ReadAccountId(r), - SigBytes: ReadByteSlice(r), - } -} - -func (self Signature) WriteTo(w io.Writer) (n int64, err error) { - n, err = WriteTo(self.Signer, w, n, err) - n, err = WriteTo(self.SigBytes, w, n, err) - return -} - -func (self *Signature) Verify(msg ByteSlice) bool { - return false -} diff --git a/blocks/store.go b/blocks/store.go new file mode 100644 index 000000000..7f2ff5238 --- /dev/null +++ b/blocks/store.go @@ -0,0 +1,126 @@ +package blocks + +import ( + "bytes" + "encoding/binary" + "encoding/json" + + "github.com/syndtr/goleveldb/leveldb" + "github.com/syndtr/goleveldb/leveldb/opt" + . "github.com/tendermint/tendermint/binary" + . "github.com/tendermint/tendermint/common" +) + +var ( + blockStoreKey = []byte("blockStore") +) + +//----------------------------------------------------------------------------- + +type BlockStoreJSON struct { + Height uint32 +} + +func (bsj BlockStoreJSON) Save(db *leveldb.DB) { + bytes, err := json.Marshal(bsj) + if err != nil { + Panicf("Could not marshal state bytes: %v", err) + } + db.Put(blockStoreKey, bytes, nil) +} + +func LoadBlockStoreJSON(db *leveldb.DB) BlockStoreJSON { + bytes, err := db.Get(blockStoreKey, nil) + if err != nil { + Panicf("Could not load BlockStoreJSON from db: %v", err) + } + if bytes == nil { + return BlockStoreJSON{ + Height: 0, + } + } + bsj := BlockStoreJSON{} + err = json.Unmarshal(bytes, &bsj) + if err != nil { + Panicf("Could not unmarshal bytes: %X", bytes) + } + return bsj +} + +//----------------------------------------------------------------------------- + +/* +Simple low level store for blocks, which is actually stored as separte parts (wire format). +*/ +type BlockStore struct { + height uint32 + db *leveldb.DB +} + +func NewBlockStore(db *leveldb.DB) *BlockStore { + bsjson := LoadBlockStoreJSON(db) + return &BlockStore{ + height: bsjson.Height, + db: db, + } +} + +// Height() returns the last known contiguous block height. +func (bs *BlockStore) Height() uint32 { + return bs.height +} + +// LoadBlockPart loads a part of a block. +func (bs *BlockStore) LoadBlockPart(height uint32, index uint16) *BlockPart { + partBytes, err := bs.db.Get(calcBlockPartKey(height, index), nil) + if err != nil { + Panicf("Could not load block part: %v", err) + } + if partBytes == nil { + return nil + } + return ReadBlockPart(bytes.NewReader(partBytes)) +} + +// Convenience method for loading block parts and merging to a block. +func (bs *BlockStore) LoadBlock(height uint32) *Block { + // Get the first part. + part0 := bs.LoadBlockPart(height, 0) + if part0 == nil { + return nil + } + // XXX implement + panic("TODO: Not implemented") +} + +func (bs *BlockStore) StageBlockAndParts(block *Block, parts []*BlockPart) error { + // XXX validate + return nil +} + +// NOTE: Assumes that parts as well as the block are valid. See StageBlockParts(). +// Writes are synchronous and atomic. +func (bs *BlockStore) SaveBlockParts(height uint32, parts []*BlockPart) error { + if height != bs.height+1 { + return Errorf("BlockStore can only save contiguous blocks. Wanted %v, got %v", bs.height+1, height) + } + // Save parts + batch := new(leveldb.Batch) + for _, part := range parts { + partBytes := BinaryBytes(part) + batch.Put(calcBlockPartKey(uint32(part.Height), uint16(part.Index)), partBytes) + } + err := bs.db.Write(batch, &opt.WriteOptions{Sync: true}) + // Save new BlockStoreJSON descriptor + BlockStoreJSON{Height: height}.Save(bs.db) + return err +} + +//----------------------------------------------------------------------------- + +func calcBlockPartKey(height uint32, index uint16) []byte { + buf := [11]byte{'B'} + binary.BigEndian.PutUint32(buf[1:9], height) + binary.BigEndian.PutUint16(buf[9:11], index) + return buf[:] +} diff --git a/blocks/tx.go b/blocks/tx.go index 441a10081..d7c2adb18 100644 --- a/blocks/tx.go +++ b/blocks/tx.go @@ -35,7 +35,7 @@ func ReadTx(r io.Reader) Tx { case TX_TYPE_SEND: return &SendTx{ Fee: ReadUInt64(r), - To: ReadAccountId(r), + To: ReadUInt64(r), Amount: ReadUInt64(r), Signature: ReadSignature(r), } @@ -56,7 +56,7 @@ func ReadTx(r io.Reader) Tx { type SendTx struct { Fee UInt64 - To AccountId + To UInt64 Amount UInt64 Signature } diff --git a/blocks/vote.go b/blocks/vote.go deleted file mode 100644 index c1500f05e..000000000 --- a/blocks/vote.go +++ /dev/null @@ -1,32 +0,0 @@ -package blocks - -import ( - . "github.com/tendermint/tendermint/binary" - "io" -) - -/* -The full vote structure is only needed when presented as evidence. -Typically only the signature is passed around, as the hash & height are implied. -*/ - -type Vote struct { - Height UInt64 - BlockHash ByteSlice - Signature -} - -func ReadVote(r io.Reader) Vote { - return Vote{ - Height: ReadUInt64(r), - BlockHash: ReadByteSlice(r), - Signature: ReadSignature(r), - } -} - -func (self Vote) WriteTo(w io.Writer) (n int64, err error) { - n, err = WriteTo(self.Height, w, n, err) - n, err = WriteTo(self.BlockHash, w, n, err) - n, err = WriteTo(self.Signature, w, n, err) - return -} diff --git a/common/errors.go b/common/errors.go new file mode 100644 index 000000000..82b41575c --- /dev/null +++ b/common/errors.go @@ -0,0 +1,10 @@ +package common + +import ( + "errors" + "fmt" +) + +func Errorf(s string, args ...interface{}) error { + return errors.New(fmt.Sprintf(s, args...)) +} diff --git a/common/heap.go b/common/heap.go index b75e32bcd..25ea7bc45 100644 --- a/common/heap.go +++ b/common/heap.go @@ -2,8 +2,31 @@ package common import ( "container/heap" + "sync" ) +/* +Example usage: + +func main() { + h := NewHeap() + + h.Push(String("msg1"), 1) + h.Push(String("msg3"), 3) + h.Push(String("msg2"), 2) + + fmt.Println(h.Pop()) + fmt.Println(h.Pop()) + fmt.Println(h.Pop()) +} +*/ + +type Comparable interface { + Less(o interface{}) bool +} + +//----------------------------------------------------------------------------- + type Heap struct { pq priorityQueue } @@ -13,38 +36,72 @@ func NewHeap() *Heap { } func (h *Heap) Len() int64 { - return len(h.pq) + return int64(len(h.pq)) } -func (h *Heap) Push(value interface{}, priority int64) { +func (h *Heap) Push(value interface{}, priority Comparable) { heap.Push(&h.pq, &pqItem{value: value, priority: priority}) } +func (h *Heap) Peek() interface{} { + if len(h.pq) == 0 { + return nil + } + return h.pq[0].value +} + func (h *Heap) Pop() interface{} { item := heap.Pop(&h.pq).(*pqItem) return item.value } -/* -func main() { - h := NewHeap() +//----------------------------------------------------------------------------- - h.Push(String("msg1"), 1) - h.Push(String("msg3"), 3) - h.Push(String("msg2"), 2) +type CHeap struct { + mtx sync.Mutex + pq priorityQueue +} - fmt.Println(h.Pop()) - fmt.Println(h.Pop()) - fmt.Println(h.Pop()) +func NewCHeap() *CHeap { + return &CHeap{pq: make([]*pqItem, 0)} } -*/ + +func (h *CHeap) Len() int64 { + h.mtx.Lock() + defer h.mtx.Unlock() + return int64(len(h.pq)) +} + +func (h *CHeap) Push(value interface{}, priority Comparable) { + h.mtx.Lock() + defer h.mtx.Unlock() + heap.Push(&h.pq, &pqItem{value: value, priority: priority}) +} + +func (h *CHeap) Peek() interface{} { + h.mtx.Lock() + defer h.mtx.Unlock() + if len(h.pq) == 0 { + return nil + } + return h.pq[0].value +} + +func (h *CHeap) Pop() interface{} { + h.mtx.Lock() + defer h.mtx.Unlock() + item := heap.Pop(&h.pq).(*pqItem) + return item.value +} + +//----------------------------------------------------------------------------- /////////////////////// // From: http://golang.org/pkg/container/heap/#example__priorityQueue type pqItem struct { value interface{} - priority int64 + priority Comparable index int } @@ -53,7 +110,7 @@ type priorityQueue []*pqItem func (pq priorityQueue) Len() int { return len(pq) } func (pq priorityQueue) Less(i, j int) bool { - return pq[i].priority < pq[j].priority + return pq[i].priority.Less(pq[j].priority) } func (pq priorityQueue) Swap(i, j int) { @@ -78,7 +135,7 @@ func (pq *priorityQueue) Pop() interface{} { return item } -func (pq *priorityQueue) Update(item *pqItem, value interface{}, priority int64) { +func (pq *priorityQueue) Update(item *pqItem, value interface{}, priority Comparable) { heap.Remove(pq, item.index) item.value = value item.priority = priority diff --git a/common/repeat_timer.go b/common/repeat_timer.go index 49d26beef..5973d148a 100644 --- a/common/repeat_timer.go +++ b/common/repeat_timer.go @@ -17,11 +17,11 @@ func NewRepeatTimer(dur time.Duration) *RepeatTimer { var ch = make(chan struct{}) var quit = make(chan struct{}) var t = &RepeatTimer{Ch: ch, dur: dur, quit: quit} - t.timer = time.AfterFunc(dur, t.fireHandler) + t.timer = time.AfterFunc(dur, t.fireRoutine) return t } -func (t *RepeatTimer) fireHandler() { +func (t *RepeatTimer) fireRoutine() { select { case t.Ch <- struct{}{}: t.timer.Reset(t.dur) diff --git a/common/throttle_timer.go b/common/throttle_timer.go index 62ad60ec6..74843e28d 100644 --- a/common/throttle_timer.go +++ b/common/throttle_timer.go @@ -23,12 +23,12 @@ func NewThrottleTimer(dur time.Duration) *ThrottleTimer { var ch = make(chan struct{}) var quit = make(chan struct{}) var t = &ThrottleTimer{Ch: ch, dur: dur, quit: quit} - t.timer = time.AfterFunc(dur, t.fireHandler) + t.timer = time.AfterFunc(dur, t.fireRoutine) t.timer.Stop() return t } -func (t *ThrottleTimer) fireHandler() { +func (t *ThrottleTimer) fireRoutine() { select { case t.Ch <- struct{}{}: atomic.StoreUint32(&t.isSet, 0) diff --git a/config/config.go b/config/config.go index c1d31e044..c7a3e4982 100644 --- a/config/config.go +++ b/config/config.go @@ -67,8 +67,9 @@ func init() { /* Default configuration */ var defaultConfig = Config_{ - LAddr: "0.0.0.0:0", - Seed: "", + Network: "tendermint_testnet0", + LAddr: "0.0.0.0:0", + Seed: "", Db: DbConfig{ Type: "level", Dir: RootDir + "/data", @@ -79,10 +80,11 @@ var defaultConfig = Config_{ /* Configuration types */ type Config_ struct { - LAddr string - Seed string - Db DbConfig - Twilio TwilioConfig + Network string + LAddr string + Seed string + Db DbConfig + Twilio TwilioConfig } type TwilioConfig struct { @@ -99,6 +101,9 @@ type DbConfig struct { } func (cfg *Config_) validate() error { + if cfg.Network == "" { + cfg.Network = defaultConfig.Network + } if cfg.LAddr == "" { cfg.LAddr = defaultConfig.LAddr } diff --git a/consensus/README.md b/consensus/README.md new file mode 100644 index 000000000..da608d993 --- /dev/null +++ b/consensus/README.md @@ -0,0 +1,17 @@ +## [ZombieValidators] + +The most likely scenario may be during an upgrade. + + +We'll see some validators that fail to upgrade while most have. Then, some updated validator will propose a block that appears invalid to the outdated validators. What is the outdated validator to do? + + +The right thing to do is to stop participating, because you have no idea what is going on, and prompt the administrator to upgrade the daemon. (Now this could be a security issue if not handled properly, so in the future we should think about upgrade security best practices). Yet say you don't, and you continue to sign blocks without really participating in the consensus rounds -- maybe voting nil each time and then signing whatever is decided on. Well for one, you've lost all ability to validate any blocks. It's a problem because if there are too many of these zombies, the network might accidentally commit a bad block -- in effect, crashing the network. So, the network wants to weed the zombies out. + + +It's hard catching the zombie. It can mimick whatever other validator is doing, perhaps mimicking the first one to vote during the rounds and waiting just a little longer for the final block vote. Based on my extensive experience with zombie flicks, it appears that the best way to identify a zombie is to make it perform some action that only non-zombies would understand. That's easy! Just make each version of the protocol have a special algorithm that selects a small but sufficiently large fraction of the validator population at each block, and make them perform an action (intuitively, make them raise their hadns). Eventually, it'll become the zombie's turn to do something but it won't know what to do. Or it will act out of turn. Gotcha. + + +The algorithm could even depend on state data, such that validators are required to keep it updated, which is a hair away from full validation. I suspect that there are more complete ways to enforce validation, but such measures may not be necessary in practice. + +TODO: implement such a mechanism. diff --git a/consensus/consensus.go b/consensus/consensus.go new file mode 100644 index 000000000..6a5e2a44c --- /dev/null +++ b/consensus/consensus.go @@ -0,0 +1,875 @@ +package consensus + +import ( + "bytes" + "errors" + "fmt" + "io" + "math" + "sync" + "sync/atomic" + "time" + + . "github.com/tendermint/tendermint/accounts" + . "github.com/tendermint/tendermint/binary" + . "github.com/tendermint/tendermint/blocks" + . "github.com/tendermint/tendermint/common" + "github.com/tendermint/tendermint/p2p" +) + +const ( + ProposalCh = byte(0x20) + KnownPartsCh = byte(0x21) + VoteCh = byte(0x22) + + voteTypeNil = byte(0x00) + voteTypeBlock = byte(0x01) + + roundDuration0 = 60 * time.Second // The first round is 60 seconds long. + roundDurationDelta = 15 * time.Second // Each successive round lasts 15 seconds longer. + roundDeadlineBare = float64(1.0 / 3.0) // When the bare vote is due. + roundDeadlinePrecommit = float64(2.0 / 3.0) // When the precommit vote is due. +) + +//----------------------------------------------------------------------------- + +// convenience +func calcRoundInfo(startTime time.Time) (round uint16, roundStartTime time.Time, roundDuration time.Duration, roundElapsed time.Duration, elapsedRatio float64) { + round = calcRound(startTime) + roundStartTime = calcRoundStartTime(round, startTime) + roundDuration = calcRoundDuration(round) + roundElapsed = time.Now().Sub(roundStartTime) + elapsedRatio = float64(roundElapsed) / float64(roundDuration) + return +} + +// total duration of given round +func calcRoundDuration(round uint16) time.Duration { + return roundDuration0 + roundDurationDelta*time.Duration(round) +} + +// startTime is when round zero started. +func calcRoundStartTime(round uint16, startTime time.Time) time.Time { + return startTime.Add(roundDuration0*time.Duration(round) + + roundDurationDelta*(time.Duration((int64(round)*int64(round)-int64(round))/2))) +} + +// calcs the current round given startTime of round zero. +func calcRound(startTime time.Time) uint16 { + now := time.Now() + if now.Before(startTime) { + Panicf("Cannot calc round when startTime is in the future: %v", startTime) + } + // Start + D_0 * R + D_delta * (R^2 - R)/2 <= Now; find largest integer R. + // D_delta * R^2 + (2D_0 - D_delta) * R + 2(Start - Now) <= 0. + // AR^2 + BR + C <= 0; A = D_delta, B = (2_D0 - D_delta), C = 2(Start - Now). + // R = Floor((-B + Sqrt(B^2 - 4AC))/2A) + A := float64(roundDurationDelta) + B := 2.0*float64(roundDuration0) - float64(roundDurationDelta) + C := 2.0 * float64(startTime.Sub(now)) + R := math.Floor((-B + math.Sqrt(B*B-4.0*A*C)/(2*A))) + if math.IsNaN(R) { + panic("Could not calc round, should not happen") + } + if R > math.MaxInt16 { + Panicf("Could not calc round, round overflow: %v", R) + } + if R < 0 { + return 0 + } + return uint16(R) +} + +//----------------------------------------------------------------------------- + +type ConsensusManager struct { + sw *p2p.Switch + swEvents chan interface{} + quit chan struct{} + started uint32 + stopped uint32 + + csc *ConsensusStateControl + blockStore *BlockStore + accountStore *AccountStore + mtx sync.Mutex + peerStates map[string]*PeerState + doActionCh chan RoundAction +} + +func NewConsensusManager(sw *p2p.Switch, csc *ConsensusStateControl, blockStore *BlockStore, accountStore *AccountStore) *ConsensusManager { + swEvents := make(chan interface{}) + sw.AddEventListener("ConsensusManager.swEvents", swEvents) + csc.Update(blockStore) // Update csc with new blocks. + cm := &ConsensusManager{ + sw: sw, + swEvents: swEvents, + quit: make(chan struct{}), + csc: csc, + blockStore: blockStore, + accountStore: accountStore, + peerStates: make(map[string]*PeerState), + doActionCh: make(chan RoundAction, 1), + } + return cm +} + +func (cm *ConsensusManager) Start() { + if atomic.CompareAndSwapUint32(&cm.started, 0, 1) { + log.Info("Starting ConsensusManager") + go cm.switchEventsRoutine() + go cm.gossipProposalRoutine() + go cm.knownPartsRoutine() + go cm.gossipVoteRoutine() + go cm.proposeAndVoteRoutine() + } +} + +func (cm *ConsensusManager) Stop() { + if atomic.CompareAndSwapUint32(&cm.stopped, 0, 1) { + log.Info("Stopping ConsensusManager") + close(cm.quit) + close(cm.swEvents) + } +} + +// Handle peer new/done events +func (cm *ConsensusManager) switchEventsRoutine() { + for { + swEvent, ok := <-cm.swEvents + if !ok { + break + } + switch swEvent.(type) { + case p2p.SwitchEventNewPeer: + event := swEvent.(p2p.SwitchEventNewPeer) + // Create peerState for event.Peer + cm.mtx.Lock() + cm.peerStates[event.Peer.Key] = NewPeerState(event.Peer) + cm.mtx.Unlock() + // Share our state with event.Peer + // By sending KnownBlockPartsMessage, + // we send our height/round + startTime, and known block parts, + // which is sufficient for the peer to begin interacting with us. + event.Peer.TrySend(ProposalCh, cm.makeKnownBlockPartsMessage()) + case p2p.SwitchEventDonePeer: + event := swEvent.(p2p.SwitchEventDonePeer) + // Delete peerState for event.Peer + cm.mtx.Lock() + delete(cm.peerStates, event.Peer.Key) + cm.mtx.Unlock() + default: + log.Warning("Unhandled switch event type") + } + } +} + +// Like, how large is it and how often can we send it? +func (cm *ConsensusManager) makeKnownBlockPartsMessage() *KnownBlockPartsMessage { + rs := cm.csc.RoundState() + return &KnownBlockPartsMessage{ + Height: UInt32(rs.Height), + SecondsSinceStartTime: UInt32(time.Now().Sub(rs.StartTime).Seconds()), + BlockPartsBitArray: rs.BlockPartSet.BitArray(), + } +} + +func (cm *ConsensusManager) getPeerState(peer *p2p.Peer) *PeerState { + cm.mtx.Lock() + defer cm.mtx.Unlock() + peerState := cm.peerStates[peer.Key] + if peerState == nil { + log.Warning("Wanted peerState for %v but none exists", peer) + } + return peerState +} + +func (cm *ConsensusManager) gossipProposalRoutine() { +OUTER_LOOP: + for { + // Get round state + rs := cm.csc.RoundState() + + // Receive incoming message on ProposalCh + inMsg, ok := cm.sw.Receive(ProposalCh) + if !ok { + break OUTER_LOOP // Client has stopped + } + msg_ := decodeMessage(inMsg.Bytes) + log.Info("gossipProposalRoutine received %v", msg_) + + switch msg_.(type) { + case *BlockPartMessage: + msg := msg_.(*BlockPartMessage) + + // Add the block part if the height matches. + if uint32(msg.BlockPart.Height) == rs.Height && + uint16(msg.BlockPart.Round) == rs.Round { + + // TODO Continue if we've already voted, then no point processing the part. + + // Add and process the block part + added, err := rs.BlockPartSet.AddBlockPart(msg.BlockPart) + if err == ErrInvalidBlockPartConflict { + // TODO: Bad validator + } else if err == ErrInvalidBlockPartSignature { + // TODO: Bad peer + } else if err != nil { + Panicf("Unexpected blockPartsSet error %v", err) + } + if added { + // If peer wants this part, send peer the part + // and our new blockParts state. + kbpMsg := cm.makeKnownBlockPartsMessage() + partMsg := &BlockPartMessage{BlockPart: msg.BlockPart} + PEERS_LOOP: + for _, peer := range cm.sw.Peers().List() { + peerState := cm.getPeerState(peer) + if peerState == nil { + // Peer disconnected before we were able to process. + continue PEERS_LOOP + } + if peerState.WantsBlockPart(msg.BlockPart) { + peer.TrySend(KnownPartsCh, kbpMsg) + peer.TrySend(ProposalCh, partMsg) + } + } + + } else { + // We failed to process the block part. + // Either an error, which we handled, or duplicate part. + continue OUTER_LOOP + } + } + + default: + // Ignore unknown message + // cm.sw.StopPeerForError(inMsg.MConn.Peer, errInvalidMessage) + } + } + + // Cleanup +} + +func (cm *ConsensusManager) knownPartsRoutine() { +OUTER_LOOP: + for { + // Receive incoming message on ProposalCh + inMsg, ok := cm.sw.Receive(KnownPartsCh) + if !ok { + break OUTER_LOOP // Client has stopped + } + msg_ := decodeMessage(inMsg.Bytes) + log.Info("knownPartsRoutine received %v", msg_) + + msg, ok := msg_.(*KnownBlockPartsMessage) + if !ok { + // Ignore unknown message type + // cm.sw.StopPeerForError(inMsg.MConn.Peer, errInvalidMessage) + continue OUTER_LOOP + } + peerState := cm.getPeerState(inMsg.MConn.Peer) + if peerState == nil { + // Peer disconnected before we were able to process. + continue OUTER_LOOP + } + peerState.ApplyKnownBlockPartsMessage(msg) + } + + // Cleanup +} + +// Signs a vote document and broadcasts it. +// hash can be nil to vote "nil" +func (cm *ConsensusManager) signAndVote(vote *Vote) error { + privValidator := cm.csc.PrivValidator() + if privValidator != nil { + err := privValidator.SignVote(vote) + if err != nil { + return err + } + msg := p2p.TypedMessage{msgTypeVote, vote} + cm.sw.Broadcast(VoteCh, msg) + } + return nil +} + +func (cm *ConsensusManager) isProposalValid(rs *RoundState) bool { + if !rs.BlockPartSet.IsComplete() { + return false + } + err := cm.stageBlock(rs.BlockPartSet) + if err != nil { + return false + } + return true +} + +func (cm *ConsensusManager) constructProposal(rs *RoundState) (*Block, error) { + // XXX implement + return nil, nil +} + +// Vote for (or against) the proposal for this round. +// Call during transition from RoundStepProposal to RoundStepVote. +// We may not have received a full proposal. +func (cm *ConsensusManager) voteProposal(rs *RoundState) error { + // If we're locked, must vote that. + locked := cm.csc.LockedProposal() + if locked != nil { + block := locked.Block() + err := cm.signAndVote(&Vote{ + Height: rs.Height, + Round: rs.Round, + Type: VoteTypeBare, + Hash: block.Hash(), + }) + return err + } + // If proposal is invalid + if !cm.isProposalValid(rs) { + // Vote for nil. + err := cm.signAndVote(&Vote{ + Height: rs.Height, + Round: rs.Round, + Type: VoteTypeBare, + Hash: nil, + }) + return err + } + // Vote for block. + err := cm.signAndVote(&Vote{ + Height: rs.Height, + Round: rs.Round, + Type: VoteTypeBare, + Hash: rs.BlockPartSet.Block().Hash(), + }) + return err +} + +// Precommit proposal if we see enough votes for it. +// Call during transition from RoundStepVote to RoundStepPrecommit. +func (cm *ConsensusManager) precommitProposal(rs *RoundState) error { + // If we see a 2/3 majority for votes for a block, precommit. + + if hash, ok := rs.RoundBareVotes.TwoThirdsMajority(); ok { + if len(hash) == 0 { + // 2/3 majority voted for nil. + return nil + } else { + // 2/3 majority voted for a block. + + // If proposal is invalid or unknown, do nothing. + // See note on ZombieValidators to see why. + if !cm.isProposalValid(rs) { + return nil + } + + // Lock this proposal. + // NOTE: we're unlocking any prior locks. + cm.csc.LockProposal(rs.BlockPartSet) + + // Send precommit vote. + err := cm.signAndVote(&Vote{ + Height: rs.Height, + Round: rs.Round, + Type: VoteTypePrecommit, + Hash: hash, + }) + return err + } + } else { + // If we haven't seen enough votes, do nothing. + return nil + } +} + +// Commit or unlock. +// Call after RoundStepPrecommit, after round has expired. +func (cm *ConsensusManager) commitOrUnlockProposal(rs *RoundState) error { + if hash, ok := rs.RoundPrecommits.TwoThirdsMajority(); ok { + // If there exists a 2/3 majority of precommits. + // Validate the block and commit. + + // If the proposal is invalid or we don't have it, + // do not commit. + // TODO If we were just late to receive the block, when + // do we actually get it? Document it. + if !cm.isProposalValid(rs) { + return nil + } + // TODO: Remove? + cm.csc.LockProposal(rs.BlockPartSet) + // Vote commit. + err := cm.signAndVote(&Vote{ + Height: rs.Height, + Round: rs.Round, + Type: VoteTypePrecommit, + Hash: hash, + }) + if err != nil { + return err + } + // Commit block. + // XXX use adjusted commit time. + // If we just use time.Now() we're not converging + // time differences between nodes, so nodes end up drifting + // in time. + commitTime := time.Now() + cm.commitBlock(rs.BlockPartSet, commitTime) + return nil + } else { + // Otherwise, if a 1/3 majority if a block that isn't our locked one exists, unlock. + locked := cm.csc.LockedProposal() + if locked != nil { + for _, hashOrNil := range rs.RoundPrecommits.OneThirdMajority() { + if hashOrNil == nil { + continue + } + hash := hashOrNil.([]byte) + if !bytes.Equal(hash, locked.Block().Hash()) { + // Unlock our lock. + cm.csc.LockProposal(nil) + } + } + } + return nil + } +} + +// After stageBlock(), a call to commitBlock() with the same arguments must succeed. +func (cm *ConsensusManager) stageBlock(blockPartSet *BlockPartSet) error { + cm.mtx.Lock() + defer cm.mtx.Unlock() + + block, blockParts := blockPartSet.Block(), blockPartSet.BlockParts() + + err := block.ValidateBasic() + if err != nil { + return err + } + err = cm.blockStore.StageBlockAndParts(block, blockParts) + if err != nil { + return err + } + err = cm.csc.StageBlock(block) + if err != nil { + return err + } + err = cm.accountStore.StageBlock(block) + if err != nil { + return err + } + // NOTE: more stores may be added here for validation. + return nil +} + +// after stageBlock(), a call to commitBlock() with the same arguments must succeed. +func (cm *ConsensusManager) commitBlock(blockPartSet *BlockPartSet, commitTime time.Time) error { + cm.mtx.Lock() + defer cm.mtx.Unlock() + + block, blockParts := blockPartSet.Block(), blockPartSet.BlockParts() + + err := cm.blockStore.SaveBlockParts(uint32(block.Height), blockParts) + if err != nil { + return err + } + err = cm.csc.CommitBlock(block, commitTime) + if err != nil { + return err + } + err = cm.accountStore.CommitBlock(block) + if err != nil { + return err + } + return nil +} + +func (cm *ConsensusManager) gossipVoteRoutine() { +OUTER_LOOP: + for { + // Get round state + rs := cm.csc.RoundState() + + // Receive incoming message on VoteCh + inMsg, ok := cm.sw.Receive(VoteCh) + if !ok { + break // Client has stopped + } + msg_ := decodeMessage(inMsg.Bytes) + log.Info("gossipVoteRoutine received %v", msg_) + + switch msg_.(type) { + case *Vote: + vote := msg_.(*Vote) + + if vote.Height != rs.Height || vote.Round != rs.Round { + continue OUTER_LOOP + } + + added, err := rs.AddVote(vote) + if !added { + log.Info("Error adding vote %v", err) + } + switch err { + case ErrVoteInvalidAccount, ErrVoteInvalidSignature: + // TODO: Handle bad peer. + case ErrVoteConflictingSignature, ErrVoteInvalidHash: + // TODO: Handle bad validator. + case nil: + break + //case ErrVoteUnexpectedPhase: Shouldn't happen. + default: + Panicf("Unexpected error from .AddVote(): %v", err) + } + if !added { + continue + } + + // Gossip vote. + PEERS_LOOP: + for _, peer := range cm.sw.Peers().List() { + peerState := cm.getPeerState(peer) + if peerState == nil { + // Peer disconnected before we were able to process. + continue PEERS_LOOP + } + if peerState.WantsVote(vote) { + msg := p2p.TypedMessage{msgTypeVote, vote} + peer.TrySend(VoteCh, msg) + } + } + + default: + // Ignore unknown message + // cm.sw.StopPeerForError(inMsg.MConn.Peer, errInvalidMessage) + } + } + + // Cleanup +} + +type RoundAction struct { + Height uint32 // The block height for which consensus is reaching for. + Round uint16 // The round number at given height. + XnToStep uint8 // Transition to this step. Action depends on this value. +} + +// Source of all round state transitions and votes. +// It can be preemptively woken up via amessage to +// doActionCh. +func (cm *ConsensusManager) proposeAndVoteRoutine() { + + // Figure out when to wake up next (in the absence of other events) + setAlarm := func() { + if len(cm.doActionCh) > 0 { + return // Already going to wake up later. + } + + // Figure out which height/round/step we're at, + // then schedule an action for when it is due. + rs := cm.csc.RoundState() + _, _, roundDuration, _, elapsedRatio := calcRoundInfo(rs.StartTime) + switch rs.Step() { + case RoundStepStart: + // It's a new RoundState, immediately wake up and xn to RoundStepProposal. + cm.doActionCh <- RoundAction{rs.Height, rs.Round, RoundStepProposal} + case RoundStepProposal: + // Wake up when it's time to vote. + time.Sleep(time.Duration(roundDeadlineBare-elapsedRatio) * roundDuration) + cm.doActionCh <- RoundAction{rs.Height, rs.Round, RoundStepBareVotes} + case RoundStepBareVotes: + // Wake up when it's time to precommit. + time.Sleep(time.Duration(roundDeadlinePrecommit-elapsedRatio) * roundDuration) + cm.doActionCh <- RoundAction{rs.Height, rs.Round, RoundStepPrecommits} + case RoundStepPrecommits: + // Wake up when the round is over. + time.Sleep(time.Duration(1.0-elapsedRatio) * roundDuration) + cm.doActionCh <- RoundAction{rs.Height, rs.Round, RoundStepCommitOrUnlock} + case RoundStepCommitOrUnlock: + // This shouldn't happen. + // Before setAlarm() got called, + // logic should have created a new RoundState for the next round. + panic("Should not happen") + } + } + + for { + func() { + roundAction := <-cm.doActionCh + // Always set the alarm after any processing below. + defer setAlarm() + + // We only consider acting on given height and round. + height := roundAction.Height + round := roundAction.Round + // We only consider transitioning to given step. + step := roundAction.XnToStep + // This is the current state. + rs := cm.csc.RoundState() + if height != rs.Height || round != rs.Round { + return // Not relevant. + } + + if step == RoundStepProposal && rs.Step() == RoundStepStart { + // Propose a block if I am the proposer. + privValidator := cm.csc.PrivValidator() + if privValidator != nil && rs.Proposer.Account.Id == privValidator.Id { + block, err := cm.constructProposal(rs) + if err != nil { + log.Error("Error attempting to construct a proposal: %v", err) + } + // XXX propose the block. + log.Error("XXX use ", block) + // XXX divide block into parts + // XXX communicate parts. + // XXX put this in another function. + panic("Implement block proposal!") + } + } else if step == RoundStepBareVotes && rs.Step() <= RoundStepProposal { + err := cm.voteProposal(rs) + if err != nil { + log.Info("Error attempting to vote for proposal: %v", err) + } + } else if step == RoundStepPrecommits && rs.Step() <= RoundStepBareVotes { + err := cm.precommitProposal(rs) + if err != nil { + log.Info("Error attempting to precommit for proposal: %v", err) + } + } else if step == RoundStepCommitOrUnlock && rs.Step() <= RoundStepPrecommits { + err := cm.commitOrUnlockProposal(rs) + if err != nil { + log.Info("Error attempting to commit or update for proposal: %v", err) + } + // Round is over. This is a special case. + // Prepare a new RoundState for the next state. + cm.csc.SetupRound(rs.Round + 1) + return // setAlarm() takes care of the rest. + } else { + return // Action is not relevant. + } + + // Transition to new step. + rs.SetStep(step) + }() + } +} + +//----------------------------------------------------------------------------- + +var ( + ErrPeerStateHeightRegression = errors.New("Error peer state height regression") + ErrPeerStateInvalidStartTime = errors.New("Error peer state invalid startTime") +) + +type PeerState struct { + mtx sync.Mutex + peer *p2p.Peer + height uint32 + startTime time.Time // Derived from offset seconds. + blockPartsBitArray []byte + votesWanted map[uint64]float32 +} + +func NewPeerState(peer *p2p.Peer) *PeerState { + return &PeerState{ + peer: peer, + height: 0, + votesWanted: make(map[uint64]float32), + } +} + +func (ps *PeerState) WantsBlockPart(part *BlockPart) bool { + ps.mtx.Lock() + defer ps.mtx.Unlock() + // Only wants the part if peer's current height and round matches. + if ps.height == uint32(part.Height) { + round, _, _, _, elapsedRatio := calcRoundInfo(ps.startTime) + if round == uint16(part.Round) && elapsedRatio < roundDeadlineBare { + // Only wants the part if it doesn't already have it. + if ps.blockPartsBitArray[part.Index/8]&byte(1<<(part.Index%8)) == 0 { + return true + } + } + } + return false +} + +func (ps *PeerState) WantsVote(vote *Vote) bool { + ps.mtx.Lock() + defer ps.mtx.Unlock() + // Only wants the vote if votesWanted says so + if ps.votesWanted[uint64(vote.SignerId)] <= 0 { + // TODO: sometimes, send unsolicited votes to see if peer wants it. + return false + } + // Only wants the vote if peer's current height and round matches. + if ps.height == vote.Height { + round, _, _, _, elapsedRatio := calcRoundInfo(ps.startTime) + if round == vote.Round { + if vote.Type == VoteTypeBare && elapsedRatio > roundDeadlineBare { + return false + } + if vote.Type == VoteTypePrecommit && elapsedRatio > roundDeadlinePrecommit { + return false + } + return true + } + } + return false +} + +func (ps *PeerState) ApplyKnownBlockPartsMessage(msg *KnownBlockPartsMessage) error { + ps.mtx.Lock() + defer ps.mtx.Unlock() + // TODO: Sanity check len(BlockParts) + if uint32(msg.Height) < ps.height { + return ErrPeerStateHeightRegression + } + if uint32(msg.Height) == ps.height { + if len(ps.blockPartsBitArray) == 0 { + ps.blockPartsBitArray = msg.BlockPartsBitArray + } else if len(msg.BlockPartsBitArray) > 0 { + if len(ps.blockPartsBitArray) != len(msg.BlockPartsBitArray) { + // TODO: If the peer received a part from + // a proposer who signed a bad (or conflicting) part, + // just about anything can happen with the new blockPartsBitArray. + // In those cases it's alright to ignore the peer for the round, + // and try to induce nil votes for that round. + return nil + } else { + // TODO: Same as above. If previously known parts disappear, + // something is fishy. + // For now, just copy over known parts. + for i, byt := range msg.BlockPartsBitArray { + ps.blockPartsBitArray[i] |= byt + } + } + } + } else { + // TODO: handle peer connection latency estimation. + newStartTime := time.Now().Add(-1 * time.Duration(msg.SecondsSinceStartTime) * time.Second) + // Ensure that the new height's start time is sufficiently after the last startTime. + // TODO: there should be some time between rounds. + if !newStartTime.After(ps.startTime) { + return ErrPeerStateInvalidStartTime + } + ps.startTime = newStartTime + ps.height = uint32(msg.Height) + ps.blockPartsBitArray = msg.BlockPartsBitArray + } + return nil +} + +func (ps *PeerState) ApplyVoteRankMessage(msg *VoteRankMessage) error { + ps.mtx.Lock() + defer ps.mtx.Unlock() + // XXX IMPLEMENT + return nil +} + +//----------------------------------------------------------------------------- +// Messages + +const ( + msgTypeUnknown = Byte(0x00) + msgTypeBlockPart = Byte(0x10) + msgTypeKnownBlockParts = Byte(0x11) + msgTypeVote = Byte(0x20) + msgTypeVoteRank = Byte(0x21) +) + +// TODO: check for unnecessary extra bytes at the end. +func decodeMessage(bz ByteSlice) (msg interface{}) { + // log.Debug("decoding msg bytes: %X", bz) + switch Byte(bz[0]) { + case msgTypeBlockPart: + return readBlockPartMessage(bytes.NewReader(bz[1:])) + case msgTypeKnownBlockParts: + return readKnownBlockPartsMessage(bytes.NewReader(bz[1:])) + case msgTypeVote: + return ReadVote(bytes.NewReader(bz[1:])) + case msgTypeVoteRank: + return readVoteRankMessage(bytes.NewReader(bz[1:])) + default: + return nil + } +} + +//------------------------------------- + +type BlockPartMessage struct { + BlockPart *BlockPart +} + +func readBlockPartMessage(r io.Reader) *BlockPartMessage { + return &BlockPartMessage{ + BlockPart: ReadBlockPart(r), + } +} + +func (m *BlockPartMessage) WriteTo(w io.Writer) (n int64, err error) { + n, err = WriteTo(msgTypeBlockPart, w, n, err) + n, err = WriteTo(m.BlockPart, w, n, err) + return +} + +func (m *BlockPartMessage) String() string { + return fmt.Sprintf("[BlockPartMessage %v]", m.BlockPart) +} + +//------------------------------------- + +type KnownBlockPartsMessage struct { + Height UInt32 + SecondsSinceStartTime UInt32 + BlockPartsBitArray ByteSlice +} + +func readKnownBlockPartsMessage(r io.Reader) *KnownBlockPartsMessage { + return &KnownBlockPartsMessage{ + Height: ReadUInt32(r), + SecondsSinceStartTime: ReadUInt32(r), + BlockPartsBitArray: ReadByteSlice(r), + } +} + +func (m *KnownBlockPartsMessage) WriteTo(w io.Writer) (n int64, err error) { + n, err = WriteTo(msgTypeKnownBlockParts, w, n, err) + n, err = WriteTo(m.Height, w, n, err) + n, err = WriteTo(m.SecondsSinceStartTime, w, n, err) + n, err = WriteTo(m.BlockPartsBitArray, w, n, err) + return +} + +func (m *KnownBlockPartsMessage) String() string { + return fmt.Sprintf("[KnownBlockPartsMessage H:%v SSST:%v, BPBA:%X]", + m.Height, m.SecondsSinceStartTime, m.BlockPartsBitArray) +} + +//------------------------------------- + +// XXX use this. +type VoteRankMessage struct { + ValidatorId UInt64 + Rank UInt8 +} + +func readVoteRankMessage(r io.Reader) *VoteRankMessage { + return &VoteRankMessage{ + ValidatorId: ReadUInt64(r), + Rank: ReadUInt8(r), + } +} + +func (m *VoteRankMessage) WriteTo(w io.Writer) (n int64, err error) { + n, err = WriteTo(msgTypeVoteRank, w, n, err) + n, err = WriteTo(m.ValidatorId, w, n, err) + n, err = WriteTo(m.Rank, w, n, err) + return +} + +func (m *VoteRankMessage) String() string { + return fmt.Sprintf("[VoteRankMessage V:%v, R:%v]", m.ValidatorId, m.Rank) +} diff --git a/consensus/log.go b/consensus/log.go new file mode 100644 index 000000000..495f0bf21 --- /dev/null +++ b/consensus/log.go @@ -0,0 +1,15 @@ +package consensus + +import ( + "github.com/op/go-logging" +) + +var log = logging.MustGetLogger("consensus") + +func init() { + logging.SetFormatter(logging.MustStringFormatter("[%{level:.1s}] %{message}")) +} + +func SetConsensusLogger(l *logging.Logger) { + log = l +} diff --git a/consensus/state.go b/consensus/state.go new file mode 100644 index 000000000..14e0db885 --- /dev/null +++ b/consensus/state.go @@ -0,0 +1,378 @@ +package consensus + +import ( + "bytes" + "sync" + "time" + + . "github.com/tendermint/tendermint/accounts" + . "github.com/tendermint/tendermint/binary" + . "github.com/tendermint/tendermint/blocks" + . "github.com/tendermint/tendermint/common" + db_ "github.com/tendermint/tendermint/db" +) + +var ( + consensusStateKey = []byte("consensusState") +) + +/* +Determining the order of proposers at height h: + + A B C All validators A, B, and C + [+10, +5, +2] (+17) Voting power + + [ 0, 0, 0] Genesis? + [ 10, 5, 2] (+17) +A [ -7, 5, 2] (-17) Round 0 proposer: A + [ 3, 10, 4] (+17) +B [ 3, -7, 4] (-17) Round 1 proposer: B + [ 13, -2, 6] (+17) +A [ -4, -2, 6] (-17) Round 2 proposer: A + [ 6, 3, 8] (+17) +C [ 6, 3, -9] (-17) Round 3 proposer: C + [ 16, 8, -7] (+17) +A [ -1, 8, -7] (-17) Round 4 proposer: A + [ 9, 13, -5] (+17) +B [ 9, -4, -5] (-17) Round 5 proposer: B + [ 19, 1, -3] (+17) +A [ 2, 1, -3] (-17) Round 6 proposer: A + ........... ... + +For a node, once consensus has been reached at some round R, +the moment the node sees +2/3 in votes for a proposal is when +the consensus rounds for the *next* height h+1 begins. + +Round R+1 in the consensus rounds at height h+1 is the same as +round R in the consensus rounds at height h (the parent block). + +We omit details of dealing with membership changes. +*/ + +func getProposer(validators map[uint64]*Validator) (proposer *Validator) { + highestAccum := Int64(0) + for _, validator := range validators { + if validator.Accum > highestAccum { + highestAccum = validator.Accum + proposer = validator + } else if validator.Accum == highestAccum { + if validator.Id < proposer.Id { // Seniority + proposer = validator + } + } + } + return +} + +func incrementAccum(validators map[uint64]*Validator) { + totalDelta := UInt64(0) + for _, validator := range validators { + validator.Accum += Int64(validator.VotingPower) + totalDelta += validator.VotingPower + } + proposer := getProposer(validators) + proposer.Accum -= Int64(totalDelta) + // NOTE: sum(validators) here should be zero. + if true { + totalAccum := int64(0) + for _, validator := range validators { + totalAccum += int64(validator.Accum) + } + if totalAccum != 0 { + Panicf("Total Accum of validators did not equal 0. Got: ", totalAccum) + } + } +} + +// Creates a deep copy of validators. +// Caller can then modify the resulting validators' .Accum field without +// modifying the original *Validator's. +func copyValidators(validators map[uint64]*Validator) map[uint64]*Validator { + mapCopy := map[uint64]*Validator{} + for _, val := range validators { + mapCopy[uint64(val.Id)] = val.Copy() + } + return mapCopy +} + +//----------------------------------------------------------------------------- + +// Handles consensus state tracking across block heights. +// NOTE: When adding more fields, also reset it in Load() and CommitBlock() +type ConsensusStateControl struct { + mtx sync.Mutex + db db_.Db // Where we store the validators list & other data. + validatorsR0 map[uint64]*Validator // A copy of the validators at round 0 + privValidator *PrivValidator // PrivValidator used to participate, if present. + accountStore *AccountStore // Account storage + height uint32 // Height we are working on. + lockedProposal *BlockPartSet // A BlockPartSet of the locked proposal. + startTime time.Time // Start of round 0 for this height. + roundState *RoundState // The RoundState object for the current round. + commits *VoteSet // Commits for this height. +} + +func NewConsensusStateControl(db db_.Db, accountStore *AccountStore) *ConsensusStateControl { + csc := &ConsensusStateControl{ + db: db, + accountStore: accountStore, + } + csc.Load() + return csc +} + +// Load the current state from db. +func (csc *ConsensusStateControl) Load() { + csc.mtx.Lock() + defer csc.mtx.Unlock() + buf := csc.db.Get(consensusStateKey) + if len(buf) == 0 { + height := uint32(0) + validators := make(map[uint64]*Validator) // XXX BOOTSTRAP + startTime := time.Now() // XXX BOOTSTRAP + csc.setupHeight(height, validators, startTime) + } else { + reader := bytes.NewReader(buf) + height := ReadUInt32(reader) + validators := make(map[uint64]*Validator) + startTime := ReadTime(reader) + for reader.Len() > 0 { + validator := ReadValidator(reader) + validators[uint64(validator.Id)] = validator + } + csc.setupHeight(uint32(height), validators, startTime.Time) + } +} + +// Save the current state onto db. +// Doesn't save the round state, just initial state at round 0. +func (csc *ConsensusStateControl) Save() { + csc.mtx.Lock() + defer csc.mtx.Unlock() + var buf bytes.Buffer + UInt32(csc.height).WriteTo(&buf) + Time{csc.startTime}.WriteTo(&buf) + for _, validator := range csc.validatorsR0 { + validator.WriteTo(&buf) + } + csc.db.Set(consensusStateKey, buf.Bytes()) +} + +// Finds more blocks from blockStore and commits them. +func (csc *ConsensusStateControl) Update(blockStore *BlockStore) { + csc.mtx.Lock() + defer csc.mtx.Unlock() + for h := csc.height + 1; h <= blockStore.Height(); h++ { + block := blockStore.LoadBlock(h) + // TODO: would be better to be able to override + // the block commit time, but in the meantime, + // just use the block time as proposed by the proposer. + csc.CommitBlock(block, block.Header.Time.Time) + } +} + +func (csc *ConsensusStateControl) PrivValidator() *PrivValidator { + csc.mtx.Lock() + defer csc.mtx.Unlock() + return csc.privValidator +} + +func (csc *ConsensusStateControl) SetPrivValidator(privValidator *PrivValidator) error { + csc.mtx.Lock() + defer csc.mtx.Unlock() + if csc.privValidator != nil { + panic("ConsensusStateControl privValidator already set.") + } + csc.privValidator = privValidator + return nil +} + +// Set blockPartSet to nil to unlock. +func (csc *ConsensusStateControl) LockProposal(blockPartSet *BlockPartSet) { + csc.mtx.Lock() + defer csc.mtx.Unlock() + csc.lockedProposal = blockPartSet +} + +func (csc *ConsensusStateControl) LockedProposal() *BlockPartSet { + csc.mtx.Lock() + defer csc.mtx.Unlock() + return csc.lockedProposal +} + +func (csc *ConsensusStateControl) StageBlock(block *Block) error { + // XXX implement staging. + return nil +} + +// NOTE: assumes that block is valid. +// NOTE: the block should be saved on the BlockStore before commiting here. +// commitTime is usually set to the system clock time (time.Now()). +func (csc *ConsensusStateControl) CommitBlock(block *Block, commitTime time.Time) error { + csc.mtx.Lock() + defer csc.mtx.Unlock() + // Ensure that block is the next block needed. + if uint32(block.Height) != csc.height { + return Errorf("Cannot commit block %v to csc. Expected height %v", block, csc.height+1) + } + // Update validator. + validators := copyValidators(csc.validatorsR0) + incrementAccum(validators) + // TODO if there are new validators in the block, add them. + + // XXX: it's not commitTime we want... + csc.setupHeight(uint32(block.Height)+1, validators, commitTime) + + // Save the state. + csc.Save() + + return nil +} + +func (csc *ConsensusStateControl) RoundState() *RoundState { + csc.mtx.Lock() + defer csc.mtx.Unlock() + return csc.roundState +} + +func (csc *ConsensusStateControl) setupHeight(height uint32, validators map[uint64]*Validator, startTime time.Time) { + + if height > 0 && height != csc.height+1 { + panic("setupHeight() cannot skip heights") + } + + // Reset the state for the next height. + csc.validatorsR0 = validators + csc.height = height + csc.lockedProposal = nil + csc.startTime = startTime + csc.commits = NewVoteSet(height, 0, VoteTypeCommit, validators) + + // Setup the roundState + csc.roundState = nil + csc.setupRound(0) + +} + +// If csc.roundSTate isn't at round, set up new roundState at round. +func (csc *ConsensusStateControl) SetupRound(round uint16) { + csc.mtx.Lock() + defer csc.mtx.Unlock() + if csc.roundState != nil && csc.roundState.Round >= round { + return + } + csc.setupRound(round) +} + +// Initialize roundState for given round. +// Involves incrementing validators for each past rand. +func (csc *ConsensusStateControl) setupRound(round uint16) { + // Increment validator accums as necessary. + // We need to start with csc.validatorsR0 or csc.roundState.Validators + var validators map[uint64]*Validator = nil + var validatorsRound uint16 + if csc.roundState == nil { + // We have no roundState so we start from validatorsR0 at round 0. + validators = copyValidators(csc.validatorsR0) + validatorsRound = 0 + } else { + // We have a previous roundState so we start from that. + validators = copyValidators(csc.roundState.Validators) + validatorsRound = csc.roundState.Round + } + // Increment all the way to round. + for r := validatorsRound; r < round; r++ { + incrementAccum(validators) + } + + roundState := NewRoundState(csc.height, round, csc.startTime, validators, csc.commits) + csc.roundState = roundState +} + +//----------------------------------------------------------------------------- + +const ( + RoundStepStart = uint8(0x00) // Round started. + RoundStepProposal = uint8(0x01) // Did propose, broadcasting proposal. + RoundStepBareVotes = uint8(0x02) // Did vote bare, broadcasting bare votes. + RoundStepPrecommits = uint8(0x03) // Did precommit, broadcasting precommits. + RoundStepCommitOrUnlock = uint8(0x04) // We committed at this round -- do not progress to the next round. +) + +//----------------------------------------------------------------------------- + +// RoundState encapsulates all the state needed to engage in the consensus protocol. +type RoundState struct { + Height uint32 // Immutable + Round uint16 // Immutable + StartTime time.Time // Time in which consensus started for this height. + Expires time.Time // Time after which this round is expired. + Proposer *Validator // The proposer to propose a block for this round. + Validators map[uint64]*Validator // All validators with modified accumPower for this round. + BlockPartSet *BlockPartSet // All block parts received for this round. + RoundBareVotes *VoteSet // All votes received for this round. + RoundPrecommits *VoteSet // All precommits received for this round. + Commits *VoteSet // A shared object for all commit votes of this height. + + mtx sync.Mutex + step uint8 // mutable +} + +func NewRoundState(height uint32, round uint16, startTime time.Time, + validators map[uint64]*Validator, commits *VoteSet) *RoundState { + + proposer := getProposer(validators) + blockPartSet := NewBlockPartSet(height, round, &(proposer.Account)) + roundBareVotes := NewVoteSet(height, round, VoteTypeBare, validators) + roundPrecommits := NewVoteSet(height, round, VoteTypePrecommit, validators) + + rs := &RoundState{ + Height: height, + Round: round, + StartTime: startTime, + Expires: calcRoundStartTime(round+1, startTime), + Proposer: proposer, + Validators: validators, + BlockPartSet: blockPartSet, + RoundBareVotes: roundBareVotes, + RoundPrecommits: roundPrecommits, + Commits: commits, + + step: RoundStepStart, + } + return rs +} + +func (rs *RoundState) AddVote(vote *Vote) (bool, error) { + switch vote.Type { + case VoteTypeBare: + return rs.RoundBareVotes.AddVote(vote) + case VoteTypePrecommit: + return rs.RoundPrecommits.AddVote(vote) + case VoteTypeCommit: + return rs.Commits.AddVote(vote) + default: + panic("Unknown vote type") + } +} + +func (rs *RoundState) Expired() bool { + return time.Now().After(rs.Expires) +} + +func (rs *RoundState) Step() uint8 { + rs.mtx.Lock() + defer rs.mtx.Unlock() + return rs.step +} + +func (rs *RoundState) SetStep(step uint8) bool { + rs.mtx.Lock() + defer rs.mtx.Unlock() + if rs.step < step { + rs.step = step + return true + } else { + return false + } +} diff --git a/consensus/validator.go b/consensus/validator.go new file mode 100644 index 000000000..1463a4d5c --- /dev/null +++ b/consensus/validator.go @@ -0,0 +1,66 @@ +package consensus + +import ( + "io" + + . "github.com/tendermint/tendermint/binary" + . "github.com/tendermint/tendermint/blocks" + //. "github.com/tendermint/tendermint/common" + db_ "github.com/tendermint/tendermint/db" +) + +// Holds state for a Validator at a given height+round. +// Meant to be discarded every round of the consensus protocol. +type Validator struct { + Account + BondHeight UInt32 + VotingPower UInt64 + Accum Int64 +} + +// Used to persist the state of ConsensusStateControl. +func ReadValidator(r io.Reader) *Validator { + return &Validator{ + Account: Account{ + Id: ReadUInt64(r), + PubKey: ReadByteSlice(r), + }, + BondHeight: ReadUInt32(r), + VotingPower: ReadUInt64(r), + Accum: ReadInt64(r), + } +} + +// Creates a new copy of the validator so we can mutate accum. +func (v *Validator) Copy() *Validator { + return &Validator{ + Account: v.Account, + BondHeight: v.BondHeight, + VotingPower: v.VotingPower, + Accum: v.Accum, + } +} + +// Used to persist the state of ConsensusStateControl. +func (v *Validator) WriteTo(w io.Writer) (n int64, err error) { + n, err = WriteTo(&v.Id, w, n, err) + n, err = WriteTo(&v.PubKey, w, n, err) + n, err = WriteTo(&v.BondHeight, w, n, err) + n, err = WriteTo(&v.VotingPower, w, n, err) + n, err = WriteTo(&v.Accum, w, n, err) + return +} + +//----------------------------------------------------------------------------- + +// TODO: Ensure that double signing never happens via an external persistent check. +type PrivValidator struct { + PrivAccount + db *db_.LevelDB +} + +// Modifies the vote object in memory. +// Double signing results in an error. +func (pv *PrivValidator) SignVote(vote *Vote) error { + return nil +} diff --git a/consensus/vote.go b/consensus/vote.go new file mode 100644 index 000000000..50b8c0e9b --- /dev/null +++ b/consensus/vote.go @@ -0,0 +1,207 @@ +package consensus + +import ( + "bytes" + "errors" + "fmt" + "io" + "sync" + + . "github.com/tendermint/tendermint/binary" + . "github.com/tendermint/tendermint/blocks" + "github.com/tendermint/tendermint/config" +) + +const ( + VoteTypeBare = byte(0x00) + VoteTypePrecommit = byte(0x01) + VoteTypeCommit = byte(0x02) +) + +var ( + ErrVoteUnexpectedPhase = errors.New("Unexpected phase") + ErrVoteInvalidAccount = errors.New("Invalid round vote account") + ErrVoteInvalidSignature = errors.New("Invalid round vote signature") + ErrVoteInvalidHash = errors.New("Invalid hash") + ErrVoteConflictingSignature = errors.New("Conflicting round vote signature") +) + +// Represents a bare, precommit, or commit vote for proposals. +type Vote struct { + Height uint32 + Round uint16 + Type byte + Hash []byte // empty if vote is nil. + Signature +} + +func ReadVote(r io.Reader) *Vote { + return &Vote{ + Height: uint32(ReadUInt32(r)), + Round: uint16(ReadUInt16(r)), + Type: byte(ReadByte(r)), + Hash: ReadByteSlice(r), + Signature: ReadSignature(r), + } +} + +func (v *Vote) WriteTo(w io.Writer) (n int64, err error) { + n, err = WriteTo(UInt32(v.Height), w, n, err) + n, err = WriteTo(UInt16(v.Round), w, n, err) + n, err = WriteTo(Byte(v.Type), w, n, err) + n, err = WriteTo(ByteSlice(v.Hash), w, n, err) + n, err = WriteTo(v.Signature, w, n, err) + return +} + +// This is the byteslice that validators should sign to signify a vote +// for the given proposal at given height & round. +// If hash is nil, the vote is a nil vote. +func (v *Vote) GetDocument() []byte { + switch v.Type { + case VoteTypeBare: + if len(v.Hash) == 0 { + doc := fmt.Sprintf("%v://consensus/%v/%v/b\nnil", + config.Config.Network, v.Height, v.Round) + return []byte(doc) + } else { + doc := fmt.Sprintf("%v://consensus/%v/%v/b\n%v", + config.Config.Network, v.Height, v.Round, + CalcBlockURI(v.Height, v.Hash)) + return []byte(doc) + } + case VoteTypePrecommit: + if len(v.Hash) == 0 { + doc := fmt.Sprintf("%v://consensus/%v/%v/p\nnil", + config.Config.Network, v.Height, v.Round) + return []byte(doc) + } else { + doc := fmt.Sprintf("%v://consensus/%v/%v/p\n%v", + config.Config.Network, v.Height, v.Round, + CalcBlockURI(v.Height, v.Hash)) + return []byte(doc) + } + case VoteTypeCommit: + if len(v.Hash) == 0 { + panic("Commit hash cannot be nil") + } else { + doc := fmt.Sprintf("%v://consensus/%v/c\n%v", + config.Config.Network, v.Height, // omit round info + CalcBlockURI(v.Height, v.Hash)) + return []byte(doc) + } + default: + panic("Unknown vote type") + } +} + +//----------------------------------------------------------------------------- + +// VoteSet helps collect signatures from validators at each height+round +// for a predefined vote type. +type VoteSet struct { + mtx sync.Mutex + height uint32 + round uint16 + type_ byte + validators map[uint64]*Validator + votes map[uint64]*Vote + votesByHash map[string]uint64 + totalVotes uint64 + totalVotingPower uint64 +} + +// Constructs a new VoteSet struct used to accumulate votes for each round. +func NewVoteSet(height uint32, round uint16, type_ byte, validators map[uint64]*Validator) *VoteSet { + totalVotingPower := uint64(0) + for _, val := range validators { + totalVotingPower += uint64(val.VotingPower) + } + return &VoteSet{ + height: height, + round: round, + type_: type_, + validators: validators, + votes: make(map[uint64]*Vote, len(validators)), + votesByHash: make(map[string]uint64), + totalVotes: 0, + totalVotingPower: totalVotingPower, + } +} + +// True if added, false if not. +// Returns ErrVote[UnexpectedPhase|InvalidAccount|InvalidSignature|InvalidHash|ConflictingSignature] +func (vs *VoteSet) AddVote(vote *Vote) (bool, error) { + vs.mtx.Lock() + defer vs.mtx.Unlock() + + // Make sure the phase matches. + if vote.Height != vs.height || vote.Round != vs.round || vote.Type != vs.type_ { + return false, ErrVoteUnexpectedPhase + } + + val := vs.validators[uint64(vote.SignerId)] + // Ensure that signer is a validator. + if val == nil { + return false, ErrVoteInvalidAccount + } + // Check signature. + if !val.Verify(vote.GetDocument(), vote.Signature.Bytes) { + // Bad signature. + return false, ErrVoteInvalidSignature + } + // If vote already exists, return false. + if existingVote, ok := vs.votes[uint64(vote.SignerId)]; ok { + if bytes.Equal(existingVote.Hash, vote.Hash) { + return false, nil + } else { + return false, ErrVoteConflictingSignature + } + } + vs.votes[uint64(vote.SignerId)] = vote + vs.votesByHash[string(vote.Hash)] += uint64(val.VotingPower) + vs.totalVotes += uint64(val.VotingPower) + return true, nil +} + +// Returns either a blockhash (or nil) that received +2/3 majority. +// If there exists no such majority, returns (nil, false). +func (vs *VoteSet) TwoThirdsMajority() (hash []byte, ok bool) { + vs.mtx.Lock() + defer vs.mtx.Unlock() + twoThirdsMajority := (vs.totalVotingPower*uint64(2) + uint64(2)) / uint64(3) + if vs.totalVotes < twoThirdsMajority { + return nil, false + } + for hash, votes := range vs.votesByHash { + if votes >= twoThirdsMajority { + if hash == "" { + return nil, true + } else { + return []byte(hash), true + } + } + } + return nil, false +} + +// Returns blockhashes (or nil) that received a +1/3 majority. +// If there exists no such majority, returns nil. +func (vs *VoteSet) OneThirdMajority() (hashes []interface{}) { + vs.mtx.Lock() + defer vs.mtx.Unlock() + oneThirdMajority := (vs.totalVotingPower + uint64(2)) / uint64(3) + if vs.totalVotes < oneThirdMajority { + return nil + } + for hash, votes := range vs.votesByHash { + if votes >= oneThirdMajority { + if hash == "" { + hashes = append(hashes, nil) + } else { + hashes = append(hashes, []byte(hash)) + } + } + } + return hashes +} diff --git a/db/db.go b/db/db.go new file mode 100644 index 000000000..84122d049 --- /dev/null +++ b/db/db.go @@ -0,0 +1,6 @@ +package db + +type Db interface { + Get([]byte) []byte + Set([]byte, []byte) +} diff --git a/db/level_db.go b/db/level_db.go index db0c4d069..0366f48ee 100644 --- a/db/level_db.go +++ b/db/level_db.go @@ -28,13 +28,6 @@ func (db *LevelDB) Set(key []byte, value []byte) { } func (db *LevelDB) Get(key []byte) []byte { - - batch := new(leveldb.Batch) - batch.Put([]byte("foo"), []byte("value")) - batch.Put([]byte("bar"), []byte("another value")) - batch.Delete([]byte("baz")) - err = db.Write(batch, nil) - res, err := db.db.Get(key, nil) if err != nil { panic(err) @@ -42,9 +35,6 @@ func (db *LevelDB) Get(key []byte) []byte { return res } -func (db *LevelDB) GetRange(key []byte, start, end int) []byte { -} - func (db *LevelDB) Delete(key []byte) { err := db.db.Delete(key, nil) if err != nil { diff --git a/db/mem_db.go b/db/mem_db.go index 8cb0f4f3d..67f9f061f 100644 --- a/db/mem_db.go +++ b/db/mem_db.go @@ -21,9 +21,6 @@ func (db *MemDB) Get(key []byte) []byte { return db.db[string(key)] } -func (db *MemDB) GetRange(key []byte, start, end int) []byte { -} - func (db *MemDB) Delete(key []byte) { delete(db.db, string(key)) } diff --git a/log.go b/log.go index 50c89fe12..0ab002446 100644 --- a/log.go +++ b/log.go @@ -4,7 +4,8 @@ import ( "os" "github.com/op/go-logging" - "github.com/tendermint/tendermint/block" + "github.com/tendermint/tendermint/blocks" + "github.com/tendermint/tendermint/consensus" "github.com/tendermint/tendermint/p2p" ) @@ -27,6 +28,7 @@ func init() { Log.Error("error") */ - p2p.SetLogger(log) - block.SetLogger(log) + p2p.SetP2PLogger(log) + blocks.SetBlocksLogger(log) + consensus.SetConsensusLogger(log) } diff --git a/main.go b/main.go index 4f29d10a0..3e436b089 100644 --- a/main.go +++ b/main.go @@ -5,6 +5,7 @@ import ( "os/signal" "github.com/tendermint/tendermint/config" + "github.com/tendermint/tendermint/consensus" "github.com/tendermint/tendermint/p2p" ) @@ -17,27 +18,38 @@ type Node struct { func NewNode() *Node { // Define channels for our app - chDescs := []p2p.ChannelDescriptor{ - p2p.ChannelDescriptor{ - Name: "PEX", - SendBufferSize: 2, - RecvBufferSize: 2, + chDescs := []*p2p.ChannelDescriptor{ + // PEX + &p2p.ChannelDescriptor{ + Id: p2p.PexCh, + SendQueueCapacity: 2, + RecvQueueCapacity: 2, + RecvBufferSize: 1024, + DefaultPriority: 1, }, - p2p.ChannelDescriptor{ - Name: "block", - SendBufferSize: 10, - RecvBufferSize: 10, + // CONSENSUS + &p2p.ChannelDescriptor{ + Id: consensus.ProposalCh, + SendQueueCapacity: 2, + RecvQueueCapacity: 10, + RecvBufferSize: 10240, + DefaultPriority: 5, }, - p2p.ChannelDescriptor{ - Name: "mempool", - SendBufferSize: 100, - RecvBufferSize: 100, + &p2p.ChannelDescriptor{ + Id: consensus.KnownPartsCh, + SendQueueCapacity: 2, + RecvQueueCapacity: 10, + RecvBufferSize: 1024, + DefaultPriority: 5, }, - p2p.ChannelDescriptor{ - Name: "consensus", - SendBufferSize: 1000, - RecvBufferSize: 1000, + &p2p.ChannelDescriptor{ + Id: consensus.VoteCh, + SendQueueCapacity: 100, + RecvQueueCapacity: 1000, + RecvBufferSize: 10240, + DefaultPriority: 5, }, + // TODO: MEMPOOL } sw := p2p.NewSwitch(chDescs) book := p2p.NewAddrBook(config.RootDir + "/addrbook.json") @@ -53,7 +65,7 @@ func NewNode() *Node { func (n *Node) Start() { log.Info("Starting node") for _, l := range n.lz { - go n.inboundConnectionHandler(l) + go n.inboundConnectionRoutine(l) } n.sw.Start() n.book.Start() @@ -75,7 +87,7 @@ func (n *Node) AddListener(l p2p.Listener) { n.book.AddOurAddress(l.ExternalAddress()) } -func (n *Node) inboundConnectionHandler(l p2p.Listener) { +func (n *Node) inboundConnectionRoutine(l p2p.Listener) { for { inConn, ok := <-l.Connections() if !ok { @@ -90,7 +102,7 @@ func (n *Node) inboundConnectionHandler(l p2p.Listener) { } // NOTE: We don't yet have the external address of the // remote (if they have a listener at all). - // PeerManager's pexHandler will handle that. + // PeerManager's pexRoutine will handle that. } // cleanup diff --git a/merkle/iavl_node.go b/merkle/iavl_node.go index 31f2c4e38..3a80e7f81 100644 --- a/merkle/iavl_node.go +++ b/merkle/iavl_node.go @@ -103,7 +103,7 @@ func (self *IAVLNode) Hash() (ByteSlice, uint64) { } hasher := sha256.New() - _, hashCount, err := self.saveToCountHashes(hasher, false) + _, hashCount, err := self.saveToCountHashes(hasher) if err != nil { panic(err) } @@ -215,43 +215,42 @@ func (self *IAVLNode) remove(db Db, key Key) (newSelf *IAVLNode, newKey Key, val } func (self *IAVLNode) WriteTo(w io.Writer) (n int64, err error) { - n, _, err = self.saveToCountHashes(w, true) + n, _, err = self.saveToCountHashes(w) return } -func (self *IAVLNode) saveToCountHashes(w io.Writer, meta bool) (n int64, hashCount uint64, err error) { +func (self *IAVLNode) saveToCountHashes(w io.Writer) (n int64, hashCount uint64, err error) { var _n int64 - if meta { - // height & size - _n, err = UInt8(self.height).WriteTo(w) - if err != nil { - return - } else { - n += _n - } - _n, err = UInt64(self.size).WriteTo(w) - if err != nil { - return - } else { - n += _n - } + // height & size + _n, err = UInt8(self.height).WriteTo(w) + if err != nil { + return + } else { + n += _n + } + _n, err = UInt64(self.size).WriteTo(w) + if err != nil { + return + } else { + n += _n + } - // key - _n, err = Byte(GetBinaryType(self.key)).WriteTo(w) - if err != nil { - return - } else { - n += _n - } - _n, err = self.key.WriteTo(w) - if err != nil { - return - } else { - n += _n - } + // key + _n, err = Byte(GetBinaryType(self.key)).WriteTo(w) + if err != nil { + return + } else { + n += _n + } + _n, err = self.key.WriteTo(w) + if err != nil { + return + } else { + n += _n } + // value or children if self.height == 0 { // value _n, err = Byte(GetBinaryType(self.value)).WriteTo(w) diff --git a/merkle/iavl_tree.go b/merkle/iavl_tree.go index 7a674dca0..286f848eb 100644 --- a/merkle/iavl_tree.go +++ b/merkle/iavl_tree.go @@ -30,6 +30,19 @@ func NewIAVLTreeFromHash(db Db, hash ByteSlice) *IAVLTree { return &IAVLTree{db: db, root: root} } +func NewIAVLTreeFromKey(db Db, key string) *IAVLTree { + hash := db.Get([]byte(key)) + if hash == nil { + return nil + } + root := &IAVLNode{ + hash: hash, + flags: IAVLNODE_FLAG_PERSISTED | IAVLNODE_FLAG_PLACEHOLDER, + } + root.fill(db) + return &IAVLTree{db: db, root: root} +} + func (t *IAVLTree) Root() Node { return t.root } @@ -75,10 +88,17 @@ func (t *IAVLTree) Save() { if t.root == nil { return } - if t.root.hash == nil { - t.root.Hash() + t.root.Hash() + t.root.Save(t.db) +} + +func (t *IAVLTree) SaveKey(key string) { + if t.root == nil { + return } + hash, _ := t.root.Hash() t.root.Save(t.db) + t.db.Set([]byte(key), hash) } func (t *IAVLTree) Get(key Key) (value Value) { diff --git a/merkle/types.go b/merkle/types.go index 10fa120f7..96fd3beda 100644 --- a/merkle/types.go +++ b/merkle/types.go @@ -11,8 +11,8 @@ type Value interface { type Key interface { Binary - Equals(Binary) bool - Less(b Binary) bool + Equals(interface{}) bool + Less(b interface{}) bool } type Db interface { @@ -38,6 +38,7 @@ type Tree interface { Get(key Key) Value Hash() (ByteSlice, uint64) Save() + SaveKey(string) Set(Key, Value) bool Remove(Key) (Value, error) Copy() Tree diff --git a/p2p/addrbook.go b/p2p/addrbook.go index 592e77592..493fb0f38 100644 --- a/p2p/addrbook.go +++ b/p2p/addrbook.go @@ -130,7 +130,7 @@ func (a *AddrBook) Start() { log.Info("Starting address manager") a.loadFromFile(a.filePath) a.wg.Add(1) - go a.saveHandler() + go a.saveRoutine() } } @@ -371,7 +371,7 @@ func (a *AddrBook) loadFromFile(filePath string) { /* Private methods */ -func (a *AddrBook) saveHandler() { +func (a *AddrBook) saveRoutine() { dumpAddressTicker := time.NewTicker(dumpAddressInterval) out: for { diff --git a/p2p/connection.go b/p2p/connection.go index 8d45efd1a..609c2a724 100644 --- a/p2p/connection.go +++ b/p2p/connection.go @@ -29,7 +29,7 @@ const ( /* A MConnection wraps a network connection and handles buffering and multiplexing. -ByteSlices are sent with ".Send(channelId, bytes)". +Binary messages are sent with ".Send(channelId, msg)". Inbound ByteSlices are pushed to the designated chan<- InboundBytes. */ type MConnection struct { @@ -98,8 +98,8 @@ func NewMConnection(conn net.Conn, chDescs []*ChannelDescriptor, onError func(in func (c *MConnection) Start() { if atomic.CompareAndSwapUint32(&c.started, 0, 1) { log.Debug("Starting %v", c) - go c.sendHandler() - go c.recvHandler() + go c.sendRoutine() + go c.recvRoutine() } } @@ -112,9 +112,9 @@ func (c *MConnection) Stop() { c.chStatsTimer.Stop() c.pingTimer.Stop() // We can't close pong safely here because - // recvHandler may write to it after we've stopped. + // recvRoutine may write to it after we've stopped. // Though it doesn't need to get closed at all, - // we close it @ recvHandler. + // we close it @ recvRoutine. // close(c.pong) } } @@ -149,7 +149,7 @@ func (c *MConnection) stopForError(r interface{}) { } // Queues a message to be sent to channel. -func (c *MConnection) Send(chId byte, bytes ByteSlice) bool { +func (c *MConnection) Send(chId byte, msg Binary) bool { if atomic.LoadUint32(&c.stopped) == 1 { return false } @@ -161,9 +161,9 @@ func (c *MConnection) Send(chId byte, bytes ByteSlice) bool { return false } - channel.sendBytes(bytes) + channel.sendBytes(BinaryBytes(msg)) - // Wake up sendHandler if necessary + // Wake up sendRoutine if necessary select { case c.send <- struct{}{}: default: @@ -174,7 +174,7 @@ func (c *MConnection) Send(chId byte, bytes ByteSlice) bool { // Queues a message to be sent to channel. // Nonblocking, returns true if successful. -func (c *MConnection) TrySend(chId byte, bytes ByteSlice) bool { +func (c *MConnection) TrySend(chId byte, msg Binary) bool { if atomic.LoadUint32(&c.stopped) == 1 { return false } @@ -186,9 +186,9 @@ func (c *MConnection) TrySend(chId byte, bytes ByteSlice) bool { return false } - ok = channel.trySendBytes(bytes) + ok = channel.trySendBytes(BinaryBytes(msg)) if ok { - // Wake up sendHandler if necessary + // Wake up sendRoutine if necessary select { case c.send <- struct{}{}: default: @@ -206,13 +206,13 @@ func (c *MConnection) CanSend(chId byte) bool { channel, ok := c.channelsIdx[chId] if !ok { log.Error("Unknown channel %X", chId) - return 0 + return false } return channel.canSend() } -// sendHandler polls for packets to send from channels. -func (c *MConnection) sendHandler() { +// sendRoutine polls for packets to send from channels. +func (c *MConnection) sendRoutine() { defer c._recover() FOR_LOOP: @@ -243,7 +243,7 @@ FOR_LOOP: // Send some packets eof := c.sendSomePackets() if !eof { - // Keep sendHandler awake. + // Keep sendRoutine awake. select { case c.send <- struct{}{}: default: @@ -255,7 +255,7 @@ FOR_LOOP: break FOR_LOOP } if err != nil { - log.Info("%v failed @ sendHandler:\n%v", c, err) + log.Info("%v failed @ sendRoutine:\n%v", c, err) c.Stop() break FOR_LOOP } @@ -319,10 +319,10 @@ func (c *MConnection) sendPacket() bool { return false } -// recvHandler reads packets and reconstructs the message using the channels' "recving" buffer. +// recvRoutine reads packets and reconstructs the message using the channels' "recving" buffer. // After a whole message has been assembled, it's pushed to the Channel's recvQueue. // Blocks depending on how the connection is throttled. -func (c *MConnection) recvHandler() { +func (c *MConnection) recvRoutine() { defer c._recover() FOR_LOOP: @@ -335,7 +335,7 @@ FOR_LOOP: c.recvMonitor.Update(int(n)) if err != nil { if atomic.LoadUint32(&c.stopped) != 1 { - log.Info("%v failed @ recvHandler with err: %v", c, err) + log.Info("%v failed @ recvRoutine with err: %v", c, err) c.Stop() } break FOR_LOOP @@ -346,7 +346,7 @@ FOR_LOOP: numBytes := c.bufReader.Buffered() bytes, err := c.bufReader.Peek(MinInt(numBytes, 100)) if err != nil { - log.Debug("recvHandler packet type %X, peeked: %X", pktType, bytes) + log.Debug("recvRoutine packet type %X, peeked: %X", pktType, bytes) } } @@ -362,7 +362,7 @@ FOR_LOOP: c.recvMonitor.Update(int(n)) if err != nil { if atomic.LoadUint32(&c.stopped) != 1 { - log.Info("%v failed @ recvHandler", c) + log.Info("%v failed @ recvRoutine", c) c.Stop() } break FOR_LOOP @@ -376,7 +376,7 @@ FOR_LOOP: Panicf("Unknown message type %v", pktType) } - // TODO: shouldn't this go in the sendHandler? + // TODO: shouldn't this go in the sendRoutine? // Better to send a packet when *we* haven't sent anything for a while. c.pingTimer.Reset() } @@ -453,14 +453,14 @@ func (ch *Channel) trySendBytes(bytes ByteSlice) bool { } // Goroutine-safe -func (ch *Channel) sendQueueSize() (size int) { +func (ch *Channel) loadSendQueueSize() (size int) { return int(atomic.LoadUint32(&ch.sendQueueSize)) } // Goroutine-safe // Use only as a heuristic. func (ch *Channel) canSend() bool { - return ch.sendQueueSize() < ch.desc.SendQueueCapacity + return ch.loadSendQueueSize() < ch.desc.SendQueueCapacity } // Returns true if any packets are pending to be sent. diff --git a/p2p/listener.go b/p2p/listener.go index 168a43d16..f26242f2e 100644 --- a/p2p/listener.go +++ b/p2p/listener.go @@ -80,13 +80,13 @@ func NewDefaultListener(protocol string, lAddr string) Listener { connections: make(chan net.Conn, numBufferedConnections), } - go dl.listenHandler() + go dl.listenRoutine() return dl } // TODO: prevent abuse, esp a bunch of connections coming from the same IP range. -func (l *DefaultListener) listenHandler() { +func (l *DefaultListener) listenRoutine() { for { conn, err := l.listener.Accept() diff --git a/p2p/log.go b/p2p/log.go index 5057e1bf6..bcaa8fc57 100644 --- a/p2p/log.go +++ b/p2p/log.go @@ -10,6 +10,6 @@ func init() { logging.SetFormatter(logging.MustStringFormatter("[%{level:.1s}] %{message}")) } -func SetLogger(l *logging.Logger) { +func SetP2PLogger(l *logging.Logger) { log = l } diff --git a/p2p/netaddress.go b/p2p/netaddress.go index a208ca61d..e5517f955 100644 --- a/p2p/netaddress.go +++ b/p2p/netaddress.go @@ -71,7 +71,7 @@ func (na *NetAddress) WriteTo(w io.Writer) (n int64, err error) { return } -func (na *NetAddress) Equals(other Binary) bool { +func (na *NetAddress) Equals(other interface{}) bool { if o, ok := other.(*NetAddress); ok { return na.String() == o.String() } else { @@ -79,7 +79,7 @@ func (na *NetAddress) Equals(other Binary) bool { } } -func (na *NetAddress) Less(other Binary) bool { +func (na *NetAddress) Less(other interface{}) bool { if o, ok := other.(*NetAddress); ok { return na.String() < o.String() } else { diff --git a/p2p/peer.go b/p2p/peer.go index c44fcaded..dffd52b10 100644 --- a/p2p/peer.go +++ b/p2p/peer.go @@ -55,23 +55,23 @@ func (p *Peer) IsOutbound() bool { return p.outbound } -func (p *Peer) Send(chId byte, bytes ByteSlice) bool { +func (p *Peer) Send(chId byte, msg Binary) bool { if atomic.LoadUint32(&p.stopped) == 1 { return false } - return p.mconn.Send(chId, bytes) + return p.mconn.Send(chId, BinaryBytes(msg)) } -func (p *Peer) TrySend(chId byte, bytes ByteSlice) bool { +func (p *Peer) TrySend(chId byte, msg Binary) bool { if atomic.LoadUint32(&p.stopped) == 1 { return false } - return p.mconn.TrySend(chId, bytes) + return p.mconn.TrySend(chId, BinaryBytes(msg)) } -func (o *Peer) CanSend(chId byte) int { +func (p *Peer) CanSend(chId byte) bool { if atomic.LoadUint32(&p.stopped) == 1 { - return 0 + return false } return p.mconn.CanSend(chId) } diff --git a/p2p/peer_manager.go b/p2p/peer_manager.go index 464b6ae8a..1c2152b4f 100644 --- a/p2p/peer_manager.go +++ b/p2p/peer_manager.go @@ -15,7 +15,7 @@ import ( var pexErrInvalidMessage = errors.New("Invalid PEX message") const ( - pexCh = byte(0x00) + PexCh = byte(0x00) ensurePeersPeriodSeconds = 30 minNumOutboundPeers = 10 maxNumPeers = 50 @@ -28,10 +28,11 @@ adequate number of peers are connected to the switch. type PeerManager struct { sw *Switch swEvents chan interface{} - book *AddrBook quit chan struct{} started uint32 stopped uint32 + + book *AddrBook } func NewPeerManager(sw *Switch, book *AddrBook) *PeerManager { @@ -40,8 +41,8 @@ func NewPeerManager(sw *Switch, book *AddrBook) *PeerManager { pm := &PeerManager{ sw: sw, swEvents: swEvents, - book: book, quit: make(chan struct{}), + book: book, } return pm } @@ -49,9 +50,9 @@ func NewPeerManager(sw *Switch, book *AddrBook) *PeerManager { func (pm *PeerManager) Start() { if atomic.CompareAndSwapUint32(&pm.started, 0, 1) { log.Info("Starting PeerManager") - go pm.switchEventsHandler() - go pm.requestHandler() - go pm.ensurePeersHandler() + go pm.switchEventsRoutine() + go pm.requestRoutine() + go pm.ensurePeersRoutine() } } @@ -65,20 +66,16 @@ func (pm *PeerManager) Stop() { // Asks peer for more addresses. func (pm *PeerManager) RequestPEX(peer *Peer) { - msg := &pexRequestMessage{} - tm := TypedMessage{msgTypeRequest, msg} - peer.TrySend(pexCh, tm.Bytes()) + peer.TrySend(PexCh, &pexRequestMessage{}) } func (pm *PeerManager) SendAddrs(peer *Peer, addrs []*NetAddress) { - msg := &pexAddrsMessage{Addrs: addrs} - tm := TypedMessage{msgTypeAddrs, msg} - peer.Send(pexCh, tm.Bytes()) + peer.Send(PexCh, &pexAddrsMessage{Addrs: addrs}) } // For new outbound peers, announce our listener addresses if any, // and if .book needs more addresses, ask for them. -func (pm *PeerManager) switchEventsHandler() { +func (pm *PeerManager) switchEventsRoutine() { for { swEvent, ok := <-pm.swEvents if !ok { @@ -100,7 +97,7 @@ func (pm *PeerManager) switchEventsHandler() { } // Ensures that sufficient peers are connected. (continuous) -func (pm *PeerManager) ensurePeersHandler() { +func (pm *PeerManager) ensurePeersRoutine() { // fire once immediately. pm.ensurePeers() // fire periodically @@ -167,10 +164,10 @@ func (pm *PeerManager) ensurePeers() { } // Handles incoming PEX messages. -func (pm *PeerManager) requestHandler() { +func (pm *PeerManager) requestRoutine() { for { - inMsg, ok := pm.sw.Receive(pexCh) // {Peer, Time, Packet} + inMsg, ok := pm.sw.Receive(PexCh) // {Peer, Time, Packet} if !ok { // Client has stopped break @@ -178,7 +175,7 @@ func (pm *PeerManager) requestHandler() { // decode message msg := decodeMessage(inMsg.Bytes) - log.Info("requestHandler received %v", msg) + log.Info("requestRoutine received %v", msg) switch msg.(type) { case *pexRequestMessage: @@ -186,8 +183,7 @@ func (pm *PeerManager) requestHandler() { // TODO: prevent abuse. addrs := pm.book.GetSelection() msg := &pexAddrsMessage{Addrs: addrs} - tm := TypedMessage{msgTypeRequest, msg} - queued := inMsg.MConn.Peer.TrySend(pexCh, tm.Bytes()) + queued := inMsg.MConn.Peer.TrySend(PexCh, msg) if !queued { // ignore } @@ -239,7 +235,8 @@ type pexRequestMessage struct { } func (m *pexRequestMessage) WriteTo(w io.Writer) (n int64, err error) { - return // nothing to write. + n, err = WriteTo(msgTypeRequest, w, n, err) + return } func (m *pexRequestMessage) String() string { @@ -266,6 +263,7 @@ func readPexAddrsMessage(r io.Reader) *pexAddrsMessage { } func (m *pexAddrsMessage) WriteTo(w io.Writer) (n int64, err error) { + n, err = WriteTo(msgTypeAddrs, w, n, err) n, err = WriteTo(UInt32(len(m.Addrs)), w, n, err) for _, addr := range m.Addrs { n, err = WriteTo(addr, w, n, err) diff --git a/p2p/switch.go b/p2p/switch.go index bea91110b..9a3386892 100644 --- a/p2p/switch.go +++ b/p2p/switch.go @@ -137,11 +137,9 @@ func (s *Switch) Broadcast(chId byte, msg Binary) (numSuccess, numFailure int) { return } - msgBytes := BinaryBytes(msg) - - log.Debug("Broadcast on [%X] len: %v", chId, len(msgBytes)) + log.Debug("Broadcast on [%X]", chId, msg) for _, peer := range s.peers.List() { - success := peer.TrySend(chId, msgBytes) + success := peer.TrySend(chId, msg) log.Debug("Broadcast for peer %v success: %v", peer, success) if success { numSuccess += 1 diff --git a/p2p/switch_test.go b/p2p/switch_test.go index 2ad9cbf5b..924fc3d25 100644 --- a/p2p/switch_test.go +++ b/p2p/switch_test.go @@ -111,7 +111,7 @@ func BenchmarkSwitches(b *testing.B) { defer s2.Stop() // Create a sink on either channel to just pop off messages. - recvHandler := func(c *Switch, chId byte) { + recvRoutine := func(c *Switch, chId byte) { for { _, ok := c.Receive(chId) if !ok { @@ -122,8 +122,8 @@ func BenchmarkSwitches(b *testing.B) { // Create routines to consume from recvQueues. for _, chDesc := range chDescs { - go recvHandler(s1, chDesc.Id) - go recvHandler(s2, chDesc.Id) + go recvRoutine(s1, chDesc.Id) + go recvRoutine(s2, chDesc.Id) } // Allow time for goroutines to boot up