From e53b148acfc954d1c74150ecbc45f7ac2ba92398 Mon Sep 17 00:00:00 2001 From: Jae Kwon Date: Wed, 3 Sep 2014 20:41:57 -0700 Subject: [PATCH] refactor from Binary centric model to global method model --- accounts/store.go | 18 --- binary/binary.go | 9 ++ blocks/accounts.go | 20 +-- blocks/adjustment.go | 90 +++++------ blocks/block.go | 126 ++++++++-------- blocks/block_part_set.go | 6 +- blocks/block_test.go | 52 +++---- blocks/codec_test.go | 20 +-- blocks/store.go | 8 +- blocks/tx.go | 54 +++---- consensus/README.md | 42 +++++- consensus/consensus.go | 177 ++++++++++++---------- consensus/state.go | 283 +++++++---------------------------- consensus/validator.go | 66 -------- p2p/connection.go | 89 +++++------ p2p/listener.go | 5 +- p2p/netaddress.go | 18 +-- p2p/peer.go | 3 +- p2p/peer_manager.go | 28 ++-- p2p/switch_test.go | 15 +- state/state.go | 127 ++++++++++++++++ state/validator.go | 32 ++-- {consensus => state}/vote.go | 24 +-- 23 files changed, 622 insertions(+), 690 deletions(-) delete mode 100644 accounts/store.go delete mode 100644 consensus/validator.go create mode 100644 state/state.go rename {consensus => state}/vote.go (92%) diff --git a/accounts/store.go b/accounts/store.go deleted file mode 100644 index 5c3c27251..000000000 --- a/accounts/store.go +++ /dev/null @@ -1,18 +0,0 @@ -package accounts - -import ( - . "github.com/tendermint/tendermint/blocks" -) - -type AccountStore struct { -} - -func (as *AccountStore) StageBlock(block *Block) error { - // XXX implement staging. - return nil -} - -func (as *AccountStore) CommitBlock(block *Block) error { - // XXX implement staging. - return nil -} diff --git a/binary/binary.go b/binary/binary.go index 5bc8357ff..778a8ab62 100644 --- a/binary/binary.go +++ b/binary/binary.go @@ -6,6 +6,15 @@ type Binary interface { WriteTo(w io.Writer) (int64, error) } +func WriteBinary(w io.Writer, b Binary, n *int64, err *error) { + if *err != nil { + return + } + n_, err_ := b.WriteTo(w) + *n += int64(n_) + *err = err_ +} + func WriteTo(w io.Writer, bz []byte, n *int64, err *error) { if *err != nil { return diff --git a/blocks/accounts.go b/blocks/accounts.go index 90b33019d..5d418a956 100644 --- a/blocks/accounts.go +++ b/blocks/accounts.go @@ -8,10 +8,10 @@ import ( // NOTE: consensus/Validator embeds this, so.. type Account struct { Id uint64 // Numeric id of account, incrementing. - PubKey ByteSlice + PubKey []byte } -func (self *Account) Verify(msg ByteSlice, sig ByteSlice) bool { +func (self *Account) Verify(msg []byte, sig []byte) bool { return false } @@ -19,10 +19,10 @@ func (self *Account) Verify(msg ByteSlice, sig ByteSlice) bool { type PrivAccount struct { Account - PrivKey ByteSlice + PrivKey []byte } -func (self *PrivAccount) Sign(msg ByteSlice) Signature { +func (self *PrivAccount) Sign(msg []byte) Signature { return Signature{} } @@ -42,13 +42,13 @@ It usually follows the message to be signed. type Signature struct { SignerId uint64 - Bytes ByteSlice + Bytes []byte } -func ReadSignature(r io.Reader) Signature { +func ReadSignature(r io.Reader, n *int64, err *error) Signature { return Signature{ - SignerId: Readuint64(r), - Bytes: ReadByteSlice(r), + SignerId: ReadUInt64(r, n, err), + Bytes: ReadByteSlice(r, n, err), } } @@ -57,7 +57,7 @@ func (sig Signature) IsZero() bool { } func (sig Signature) WriteTo(w io.Writer) (n int64, err error) { - n, err = WriteTo(UInt64(sig.SignerId), w, n, err) - n, err = WriteTo(sig.Bytes, w, n, err) + WriteUInt64(w, sig.SignerId, &n, &err) + WriteByteSlice(w, sig.Bytes, &n, &err) return } diff --git a/blocks/adjustment.go b/blocks/adjustment.go index e8565105d..0cc2f19c8 100644 --- a/blocks/adjustment.go +++ b/blocks/adjustment.go @@ -16,41 +16,41 @@ import ( TODO: signing a bad checkpoint (block) */ type Adjustment interface { - Type() Byte + Type() byte Binary } const ( - ADJ_TYPE_BOND = Byte(0x01) - ADJ_TYPE_UNBOND = Byte(0x02) - ADJ_TYPE_TIMEOUT = Byte(0x03) - ADJ_TYPE_DUPEOUT = Byte(0x04) + ADJ_TYPE_BOND = byte(0x01) + ADJ_TYPE_UNBOND = byte(0x02) + ADJ_TYPE_TIMEOUT = byte(0x03) + ADJ_TYPE_DUPEOUT = byte(0x04) ) -func ReadAdjustment(r io.Reader) Adjustment { - switch t := ReadByte(r); t { +func ReadAdjustment(r io.Reader, n *int64, err *error) Adjustment { + switch t := ReadByte(r, n, err); t { case ADJ_TYPE_BOND: return &Bond{ - Fee: Readuint64(r), - UnbondTo: Readuint64(r), - Amount: Readuint64(r), - Signature: ReadSignature(r), + Fee: ReadUInt64(r, n, err), + UnbondTo: ReadUInt64(r, n, err), + Amount: ReadUInt64(r, n, err), + Signature: ReadSignature(r, n, err), } case ADJ_TYPE_UNBOND: return &Unbond{ - Fee: Readuint64(r), - Amount: Readuint64(r), - Signature: ReadSignature(r), + Fee: ReadUInt64(r, n, err), + Amount: ReadUInt64(r, n, err), + Signature: ReadSignature(r, n, err), } case ADJ_TYPE_TIMEOUT: return &Timeout{ - AccountId: Readuint64(r), - Penalty: Readuint64(r), + AccountId: ReadUInt64(r, n, err), + Penalty: ReadUInt64(r, n, err), } case ADJ_TYPE_DUPEOUT: return &Dupeout{ - VoteA: ReadBlockVote(r), - VoteB: ReadBlockVote(r), + VoteA: ReadBlockVote(r, n, err), + VoteB: ReadBlockVote(r, n, err), } default: Panicf("Unknown Adjustment type %x", t) @@ -68,16 +68,16 @@ type Bond struct { Signature } -func (self *Bond) Type() Byte { +func (self *Bond) Type() byte { return ADJ_TYPE_BOND } func (self *Bond) WriteTo(w io.Writer) (n int64, err error) { - n, err = WriteTo(self.Type(), w, n, err) - n, err = WriteTo(UInt64(self.Fee), w, n, err) - n, err = WriteTo(UInt64(self.UnbondTo), w, n, err) - n, err = WriteTo(UInt64(self.Amount), w, n, err) - n, err = WriteTo(self.Signature, w, n, err) + WriteByte(w, self.Type(), &n, &err) + WriteUInt64(w, self.Fee, &n, &err) + WriteUInt64(w, self.UnbondTo, &n, &err) + WriteUInt64(w, self.Amount, &n, &err) + WriteBinary(w, self.Signature, &n, &err) return } @@ -90,15 +90,15 @@ type Unbond struct { Signature } -func (self *Unbond) Type() Byte { +func (self *Unbond) Type() byte { return ADJ_TYPE_UNBOND } func (self *Unbond) WriteTo(w io.Writer) (n int64, err error) { - n, err = WriteTo(self.Type(), w, n, err) - n, err = WriteTo(UInt64(self.Fee), w, n, err) - n, err = WriteTo(UInt64(self.Amount), w, n, err) - n, err = WriteTo(self.Signature, w, n, err) + WriteByte(w, self.Type(), &n, &err) + WriteUInt64(w, self.Fee, &n, &err) + WriteUInt64(w, self.Amount, &n, &err) + WriteBinary(w, self.Signature, &n, &err) return } @@ -110,14 +110,14 @@ type Timeout struct { Penalty uint64 } -func (self *Timeout) Type() Byte { +func (self *Timeout) Type() byte { return ADJ_TYPE_TIMEOUT } func (self *Timeout) WriteTo(w io.Writer) (n int64, err error) { - n, err = WriteTo(self.Type(), w, n, err) - n, err = WriteTo(UInt64(self.AccountId), w, n, err) - n, err = WriteTo(UInt64(self.Penalty), w, n, err) + WriteByte(w, self.Type(), &n, &err) + WriteUInt64(w, self.AccountId, &n, &err) + WriteUInt64(w, self.Penalty, &n, &err) return } @@ -129,22 +129,22 @@ Typically only the signature is passed around, as the hash & height are implied. */ type BlockVote struct { Height uint64 - BlockHash ByteSlice + BlockHash []byte Signature } -func ReadBlockVote(r io.Reader) BlockVote { +func ReadBlockVote(r io.Reader, n *int64, err *error) BlockVote { return BlockVote{ - Height: Readuint64(r), - BlockHash: ReadByteSlice(r), - Signature: ReadSignature(r), + Height: ReadUInt64(r, n, err), + BlockHash: ReadByteSlice(r, n, err), + Signature: ReadSignature(r, n, err), } } func (self BlockVote) WriteTo(w io.Writer) (n int64, err error) { - n, err = WriteTo(UInt64(self.Height), w, n, err) - n, err = WriteTo(self.BlockHash, w, n, err) - n, err = WriteTo(self.Signature, w, n, err) + WriteUInt64(w, self.Height, &n, &err) + WriteByteSlice(w, self.BlockHash, &n, &err) + WriteBinary(w, self.Signature, &n, &err) return } @@ -154,13 +154,13 @@ type Dupeout struct { VoteB BlockVote } -func (self *Dupeout) Type() Byte { +func (self *Dupeout) Type() byte { return ADJ_TYPE_DUPEOUT } func (self *Dupeout) WriteTo(w io.Writer) (n int64, err error) { - n, err = WriteTo(self.Type(), w, n, err) - n, err = WriteTo(self.VoteA, w, n, err) - n, err = WriteTo(self.VoteB, w, n, err) + WriteByte(w, self.Type(), &n, &err) + WriteBinary(w, self.VoteA, &n, &err) + WriteBinary(w, self.VoteB, &n, &err) return } diff --git a/blocks/block.go b/blocks/block.go index 9914dfe98..9de809ef1 100644 --- a/blocks/block.go +++ b/blocks/block.go @@ -4,6 +4,7 @@ import ( "crypto/sha256" "fmt" "io" + "time" . "github.com/tendermint/tendermint/binary" . "github.com/tendermint/tendermint/common" @@ -32,18 +33,18 @@ type Block struct { hash []byte } -func ReadBlock(r io.Reader) *Block { +func ReadBlock(r io.Reader, n *int64, err *error) *Block { return &Block{ - Header: ReadHeader(r), - Validation: ReadValidation(r), - Txs: ReadTxs(r), + Header: ReadHeader(r, n, err), + Validation: ReadValidation(r, n, err), + Txs: ReadTxs(r, n, err), } } func (b *Block) WriteTo(w io.Writer) (n int64, err error) { - n, err = WriteTo(&b.Header, w, n, err) - n, err = WriteTo(&b.Validation, w, n, err) - n, err = WriteTo(&b.Txs, w, n, err) + WriteBinary(w, &b.Header, &n, &err) + WriteBinary(w, &b.Validation, &n, &err) + WriteBinary(w, &b.Txs, &n, &err) return } @@ -54,20 +55,20 @@ func (b *Block) ValidateBasic() error { } func (b *Block) URI() string { - return CalcBlockURI(uint32(b.Height), b.Hash()) + return CalcBlockURI(b.Height, b.Hash()) } func (b *Block) Hash() []byte { if b.hash != nil { return b.hash } else { - hashes := []Binary{ - ByteSlice(b.Header.Hash()), - ByteSlice(b.Validation.Hash()), - ByteSlice(b.Txs.Hash()), + hashes := [][]byte{ + b.Header.Hash(), + b.Validation.Hash(), + b.Txs.Hash(), } // Merkle hash from sub-hashes. - return merkle.HashFromBinarySlice(hashes) + return merkle.HashFromByteSlices(hashes) } } @@ -104,31 +105,31 @@ type BlockPart struct { Round uint16 // Add Round? Well I need to know... Index uint16 Total uint16 - Bytes ByteSlice + Bytes []byte Signature // Volatile hash []byte } -func ReadBlockPart(r io.Reader) *BlockPart { +func ReadBlockPart(r io.Reader, n *int64, err *error) *BlockPart { return &BlockPart{ - Height: Readuint32(r), - Round: Readuint16(r), - Index: Readuint16(r), - Total: Readuint16(r), - Bytes: ReadByteSlice(r), - Signature: ReadSignature(r), + Height: ReadUInt32(r, n, err), + Round: ReadUInt16(r, n, err), + Index: ReadUInt16(r, n, err), + Total: ReadUInt16(r, n, err), + Bytes: ReadByteSlice(r, n, err), + Signature: ReadSignature(r, n, err), } } func (bp *BlockPart) WriteTo(w io.Writer) (n int64, err error) { - n, err = WriteTo(UInt32(bp.Height), w, n, err) - n, err = WriteTo(UInt16(bp.Round), w, n, err) - n, err = WriteTo(UInt16(bp.Index), w, n, err) - n, err = WriteTo(UInt16(bp.Total), w, n, err) - n, err = WriteTo(bp.Bytes, w, n, err) - n, err = WriteTo(bp.Signature, w, n, err) + WriteUInt32(w, bp.Height, &n, &err) + WriteUInt16(w, bp.Round, &n, &err) + WriteUInt16(w, bp.Index, &n, &err) + WriteUInt16(w, bp.Total, &n, &err) + WriteByteSlice(w, bp.Bytes, &n, &err) + WriteBinary(w, bp.Signature, &n, &err) return } @@ -173,38 +174,41 @@ func (bp *BlockPart) ValidateWithSigner(signer *Account) error { /* Header is part of a Block */ type Header struct { - Name String + Name string Height uint32 Fees uint64 - Time Time - PrevHash ByteSlice - ValidationHash ByteSlice - TxsHash ByteSlice + Time time.Time + PrevHash []byte + ValidationHash []byte + TxsHash []byte // Volatile hash []byte } -func ReadHeader(r io.Reader) Header { +func ReadHeader(r io.Reader, n *int64, err *error) (h Header) { + if *err != nil { + return Header{} + } return Header{ - Name: ReadString(r), - Height: Readuint32(r), - Fees: Readuint64(r), - Time: ReadTime(r), - PrevHash: ReadByteSlice(r), - ValidationHash: ReadByteSlice(r), - TxsHash: ReadByteSlice(r), + Name: ReadString(r, n, err), + Height: ReadUInt32(r, n, err), + Fees: ReadUInt64(r, n, err), + Time: ReadTime(r, n, err), + PrevHash: ReadByteSlice(r, n, err), + ValidationHash: ReadByteSlice(r, n, err), + TxsHash: ReadByteSlice(r, n, err), } } func (h *Header) WriteTo(w io.Writer) (n int64, err error) { - n, err = WriteTo(h.Name, w, n, err) - n, err = WriteTo(UInt32(h.Height), w, n, err) - n, err = WriteTo(UInt64(h.Fees), w, n, err) - n, err = WriteTo(h.Time, w, n, err) - n, err = WriteTo(h.PrevHash, w, n, err) - n, err = WriteTo(h.ValidationHash, w, n, err) - n, err = WriteTo(h.TxsHash, w, n, err) + WriteString(w, h.Name, &n, &err) + WriteUInt32(w, h.Height, &n, &err) + WriteUInt64(w, h.Fees, &n, &err) + WriteTime(w, h.Time, &n, &err) + WriteByteSlice(w, h.PrevHash, &n, &err) + WriteByteSlice(w, h.ValidationHash, &n, &err) + WriteByteSlice(w, h.TxsHash, &n, &err) return } @@ -231,16 +235,16 @@ type Validation struct { hash []byte } -func ReadValidation(r io.Reader) Validation { - numSigs := Readuint32(r) - numAdjs := Readuint32(r) +func ReadValidation(r io.Reader, n *int64, err *error) Validation { + numSigs := ReadUInt32(r, n, err) + numAdjs := ReadUInt32(r, n, err) sigs := make([]Signature, 0, numSigs) for i := uint32(0); i < numSigs; i++ { - sigs = append(sigs, ReadSignature(r)) + sigs = append(sigs, ReadSignature(r, n, err)) } adjs := make([]Adjustment, 0, numAdjs) for i := uint32(0); i < numAdjs; i++ { - adjs = append(adjs, ReadAdjustment(r)) + adjs = append(adjs, ReadAdjustment(r, n, err)) } return Validation{ Signatures: sigs, @@ -249,13 +253,13 @@ func ReadValidation(r io.Reader) Validation { } func (v *Validation) WriteTo(w io.Writer) (n int64, err error) { - n, err = WriteTo(UInt32(len(v.Signatures)), w, n, err) - n, err = WriteTo(UInt32(len(v.Adjustments)), w, n, err) + WriteUInt32(w, uint32(len(v.Signatures)), &n, &err) + WriteUInt32(w, uint32(len(v.Adjustments)), &n, &err) for _, sig := range v.Signatures { - n, err = WriteTo(sig, w, n, err) + WriteBinary(w, sig, &n, &err) } for _, adj := range v.Adjustments { - n, err = WriteTo(adj, w, n, err) + WriteBinary(w, adj, &n, &err) } return } @@ -282,19 +286,19 @@ type Txs struct { hash []byte } -func ReadTxs(r io.Reader) Txs { - numTxs := Readuint32(r) +func ReadTxs(r io.Reader, n *int64, err *error) Txs { + numTxs := ReadUInt32(r, n, err) txs := make([]Tx, 0, numTxs) for i := uint32(0); i < numTxs; i++ { - txs = append(txs, ReadTx(r)) + txs = append(txs, ReadTx(r, n, err)) } return Txs{Txs: txs} } func (txs *Txs) WriteTo(w io.Writer) (n int64, err error) { - n, err = WriteTo(UInt32(len(txs.Txs)), w, n, err) + WriteUInt32(w, uint32(len(txs.Txs)), &n, &err) for _, tx := range txs.Txs { - n, err = WriteTo(tx, w, n, err) + WriteBinary(w, tx, &n, &err) } return } diff --git a/blocks/block_part_set.go b/blocks/block_part_set.go index b240f10af..68f3f0434 100644 --- a/blocks/block_part_set.go +++ b/blocks/block_part_set.go @@ -95,7 +95,7 @@ func (bps *BlockPartSet) AddBlockPart(part *BlockPart) (added bool, err error) { // Check for existing parts. existing := bps.parts[part.Index] if existing != nil { - if existing.Bytes.Equals(part.Bytes) { + if bytes.Equal(existing.Bytes, part.Bytes) { // Ignore duplicate return false, nil } else { @@ -127,7 +127,9 @@ func (bps *BlockPartSet) Block() *Block { for _, part := range bps.parts { blockBytes = append(blockBytes, part.Bytes...) } - block := ReadBlock(bytes.NewReader(blockBytes)) + var n int64 + var err error + block := ReadBlock(bytes.NewReader(blockBytes), &n, &err) bps._block = block } return bps._block diff --git a/blocks/block_test.go b/blocks/block_test.go index 2dcdb9e2e..71bab6ac0 100644 --- a/blocks/block_test.go +++ b/blocks/block_test.go @@ -9,7 +9,7 @@ import ( ) // Distributed pseudo-exponentially to test for various cases -func randuint64() uint64 { +func randUInt64() uint64 { bits := rand.Uint32() % 64 if bits == 0 { return 0 @@ -19,7 +19,7 @@ func randuint64() uint64 { return n } -func randuint32() uint32 { +func randUInt32() uint32 { bits := rand.Uint32() % 32 if bits == 0 { return 0 @@ -29,18 +29,18 @@ func randuint32() uint32 { return n } -func randTime() Time { - return Time{time.Unix(int64(randuint64()), 0)} +func randTime() time.Time { + return time.Unix(int64(randUInt64()), 0) } func randAccount() Account { return Account{ - Id: randuint64(), + Id: randUInt64(), PubKey: randBytes(32), } } -func randBytes(n int) ByteSlice { +func randBytes(n int) []byte { bs := make([]byte, n) for i := 0; i < n; i++ { bs[i] = byte(rand.Intn(256)) @@ -49,7 +49,7 @@ func randBytes(n int) ByteSlice { } func randSig() Signature { - return Signature{randuint64(), randBytes(32)} + return Signature{randUInt64(), randBytes(32)} } func TestBlock(t *testing.T) { @@ -58,15 +58,15 @@ func TestBlock(t *testing.T) { sendTx := &SendTx{ Signature: randSig(), - Fee: randuint64(), - To: randuint64(), - Amount: randuint64(), + Fee: randUInt64(), + To: randUInt64(), + Amount: randUInt64(), } nameTx := &NameTx{ Signature: randSig(), - Fee: randuint64(), - Name: String(randBytes(12)), + Fee: randUInt64(), + Name: string(randBytes(12)), PubKey: randBytes(32), } @@ -74,30 +74,30 @@ func TestBlock(t *testing.T) { bond := &Bond{ Signature: randSig(), - Fee: randuint64(), - UnbondTo: randuint64(), - Amount: randuint64(), + Fee: randUInt64(), + UnbondTo: randUInt64(), + Amount: randUInt64(), } unbond := &Unbond{ Signature: randSig(), - Fee: randuint64(), - Amount: randuint64(), + Fee: randUInt64(), + Amount: randUInt64(), } timeout := &Timeout{ - AccountId: randuint64(), - Penalty: randuint64(), + AccountId: randUInt64(), + Penalty: randUInt64(), } dupeout := &Dupeout{ VoteA: BlockVote{ - Height: randuint64(), + Height: randUInt64(), BlockHash: randBytes(32), Signature: randSig(), }, VoteB: BlockVote{ - Height: randuint64(), + Height: randUInt64(), BlockHash: randBytes(32), Signature: randSig(), }, @@ -108,8 +108,8 @@ func TestBlock(t *testing.T) { block := &Block{ Header: Header{ Name: "Tendermint", - Height: randuint32(), - Fees: randuint64(), + Height: randUInt32(), + Fees: randUInt64(), Time: randTime(), PrevHash: randBytes(32), ValidationHash: randBytes(32), @@ -129,10 +129,12 @@ func TestBlock(t *testing.T) { // Then, compare. blockBytes := BinaryBytes(block) - block2 := ReadBlock(bytes.NewReader(blockBytes)) + var n int64 + var err error + block2 := ReadBlock(bytes.NewReader(blockBytes), &n, &err) blockBytes2 := BinaryBytes(block2) - if !BinaryEqual(blockBytes, blockBytes2) { + if !bytes.Equal(blockBytes, blockBytes2) { t.Fatal("Write->Read of block failed.") } } diff --git a/blocks/codec_test.go b/blocks/codec_test.go index 33c69c6f6..8c1499062 100644 --- a/blocks/codec_test.go +++ b/blocks/codec_test.go @@ -5,8 +5,8 @@ import ( "encoding/gob" "encoding/json" "testing" + "time" - . "github.com/tendermint/tendermint/binary" "github.com/ugorji/go/codec" "github.com/vmihailenco/msgpack" ) @@ -18,10 +18,10 @@ func BenchmarkTestCustom(b *testing.B) { Name: "Header", Height: 123, Fees: 123, - Time: TimeFromUnix(123), - PrevHash: ByteSlice("prevhash"), - ValidationHash: ByteSlice("validationhash"), - TxsHash: ByteSlice("txshash"), + Time: time.Unix(123, 0), + PrevHash: []byte("prevhash"), + ValidationHash: []byte("validationhash"), + TxsHash: []byte("txshash"), } buf := bytes.NewBuffer(nil) @@ -30,7 +30,9 @@ func BenchmarkTestCustom(b *testing.B) { for i := 0; i < b.N; i++ { buf.Reset() h.WriteTo(buf) - h2 := ReadHeader(buf) + var n int64 + var err error + h2 := ReadHeader(buf, &n, &err) if h2.Name != "Header" { b.Fatalf("wrong name") } @@ -83,7 +85,7 @@ func BenchmarkTestGob(b *testing.B) { Name: "Header", Height: 123, Fees: 123, - Time: TimeFromUnix(123), + Time: time.Unix(123, 0), PrevHash: []byte("prevhash"), ValidationHash: []byte("validationhash"), TxsHash: []byte("txshash"), @@ -112,7 +114,7 @@ func BenchmarkTestMsgPack(b *testing.B) { Name: "Header", Height: 123, Fees: 123, - Time: TimeFromUnix(123), + Time: time.Unix(123, 0), PrevHash: []byte("prevhash"), ValidationHash: []byte("validationhash"), TxsHash: []byte("txshash"), @@ -141,7 +143,7 @@ func BenchmarkTestMsgPack2(b *testing.B) { Name: "Header", Height: 123, Fees: 123, - Time: TimeFromUnix(123), + Time: time.Unix(123, 0), PrevHash: []byte("prevhash"), ValidationHash: []byte("validationhash"), TxsHash: []byte("txshash"), diff --git a/blocks/store.go b/blocks/store.go index 7f2ff5238..0f4391b57 100644 --- a/blocks/store.go +++ b/blocks/store.go @@ -79,7 +79,8 @@ func (bs *BlockStore) LoadBlockPart(height uint32, index uint16) *BlockPart { if partBytes == nil { return nil } - return ReadBlockPart(bytes.NewReader(partBytes)) + var n int64 + return ReadBlockPart(bytes.NewReader(partBytes), &n, &err) } // Convenience method for loading block parts and merging to a block. @@ -93,11 +94,6 @@ func (bs *BlockStore) LoadBlock(height uint32) *Block { panic("TODO: Not implemented") } -func (bs *BlockStore) StageBlockAndParts(block *Block, parts []*BlockPart) error { - // XXX validate - return nil -} - // NOTE: Assumes that parts as well as the block are valid. See StageBlockParts(). // Writes are synchronous and atomic. func (bs *BlockStore) SaveBlockParts(height uint32, parts []*BlockPart) error { diff --git a/blocks/tx.go b/blocks/tx.go index f4f73a525..1ae439ec2 100644 --- a/blocks/tx.go +++ b/blocks/tx.go @@ -21,30 +21,30 @@ Tx wire format: */ type Tx interface { - Type() Byte + Type() byte Binary } const ( - TX_TYPE_SEND = Byte(0x01) - TX_TYPE_NAME = Byte(0x02) + TX_TYPE_SEND = byte(0x01) + TX_TYPE_NAME = byte(0x02) ) -func ReadTx(r io.Reader) Tx { - switch t := ReadByte(r); t { +func ReadTx(r io.Reader, n *int64, err *error) Tx { + switch t := ReadByte(r, n, err); t { case TX_TYPE_SEND: return &SendTx{ - Fee: Readuint64(r), - To: Readuint64(r), - Amount: Readuint64(r), - Signature: ReadSignature(r), + Fee: ReadUInt64(r, n, err), + To: ReadUInt64(r, n, err), + Amount: ReadUInt64(r, n, err), + Signature: ReadSignature(r, n, err), } case TX_TYPE_NAME: return &NameTx{ - Fee: Readuint64(r), - Name: ReadString(r), - PubKey: ReadByteSlice(r), - Signature: ReadSignature(r), + Fee: ReadUInt64(r, n, err), + Name: ReadString(r, n, err), + PubKey: ReadByteSlice(r, n, err), + Signature: ReadSignature(r, n, err), } default: Panicf("Unknown Tx type %x", t) @@ -61,16 +61,16 @@ type SendTx struct { Signature } -func (self *SendTx) Type() Byte { +func (self *SendTx) Type() byte { return TX_TYPE_SEND } func (self *SendTx) WriteTo(w io.Writer) (n int64, err error) { - n, err = WriteTo(self.Type(), w, n, err) - n, err = WriteTo(UInt64(self.Fee), w, n, err) - n, err = WriteTo(UInt64(self.To), w, n, err) - n, err = WriteTo(UInt64(self.Amount), w, n, err) - n, err = WriteTo(self.Signature, w, n, err) + WriteByte(w, self.Type(), &n, &err) + WriteUInt64(w, self.Fee, &n, &err) + WriteUInt64(w, self.To, &n, &err) + WriteUInt64(w, self.Amount, &n, &err) + WriteBinary(w, self.Signature, &n, &err) return } @@ -78,20 +78,20 @@ func (self *SendTx) WriteTo(w io.Writer) (n int64, err error) { type NameTx struct { Fee uint64 - Name String - PubKey ByteSlice + Name string + PubKey []byte Signature } -func (self *NameTx) Type() Byte { +func (self *NameTx) Type() byte { return TX_TYPE_NAME } func (self *NameTx) WriteTo(w io.Writer) (n int64, err error) { - n, err = WriteTo(self.Type(), w, n, err) - n, err = WriteTo(UInt64(self.Fee), w, n, err) - n, err = WriteTo(self.Name, w, n, err) - n, err = WriteTo(self.PubKey, w, n, err) - n, err = WriteTo(self.Signature, w, n, err) + WriteByte(w, self.Type(), &n, &err) + WriteUInt64(w, self.Fee, &n, &err) + WriteString(w, self.Name, &n, &err) + WriteByteSlice(w, self.PubKey, &n, &err) + WriteBinary(w, self.Signature, &n, &err) return } diff --git a/consensus/README.md b/consensus/README.md index da608d993..82df15f5a 100644 --- a/consensus/README.md +++ b/consensus/README.md @@ -1,16 +1,48 @@ -## [ZombieValidators] +## Determining the order of proposers at height h: -The most likely scenario may be during an upgrade. +``` +Determining the order of proposers at height h: + A B C All validators A, B, and C + [+10, +5, +2] (+17) Voting power -We'll see some validators that fail to upgrade while most have. Then, some updated validator will propose a block that appears invalid to the outdated validators. What is the outdated validator to do? + [ 0, 0, 0] Genesis? + [ 10, 5, 2] (+17) +A [ -7, 5, 2] (-17) Round 0 proposer: A + [ 3, 10, 4] (+17) +B [ 3, -7, 4] (-17) Round 1 proposer: B + [ 13, -2, 6] (+17) +A [ -4, -2, 6] (-17) Round 2 proposer: A + [ 6, 3, 8] (+17) +C [ 6, 3, -9] (-17) Round 3 proposer: C + [ 16, 8, -7] (+17) +A [ -1, 8, -7] (-17) Round 4 proposer: A + [ 9, 13, -5] (+17) +B [ 9, -4, -5] (-17) Round 5 proposer: B + [ 19, 1, -3] (+17) +A [ 2, 1, -3] (-17) Round 6 proposer: A + ........... ... +For a node, once consensus has been reached at some round R, +the moment the node sees +2/3 in votes for a proposal is when +the consensus rounds for the *next* height h+1 begins. -The right thing to do is to stop participating, because you have no idea what is going on, and prompt the administrator to upgrade the daemon. (Now this could be a security issue if not handled properly, so in the future we should think about upgrade security best practices). Yet say you don't, and you continue to sign blocks without really participating in the consensus rounds -- maybe voting nil each time and then signing whatever is decided on. Well for one, you've lost all ability to validate any blocks. It's a problem because if there are too many of these zombies, the network might accidentally commit a bad block -- in effect, crashing the network. So, the network wants to weed the zombies out. +Round R+1 in the consensus rounds at height h+1 is the same as +round R in the consensus rounds at height h (the parent block). +We omit details of dealing with membership changes. +``` -It's hard catching the zombie. It can mimick whatever other validator is doing, perhaps mimicking the first one to vote during the rounds and waiting just a little longer for the final block vote. Based on my extensive experience with zombie flicks, it appears that the best way to identify a zombie is to make it perform some action that only non-zombies would understand. That's easy! Just make each version of the protocol have a special algorithm that selects a small but sufficiently large fraction of the validator population at each block, and make them perform an action (intuitively, make them raise their hadns). Eventually, it'll become the zombie's turn to do something but it won't know what to do. Or it will act out of turn. Gotcha. +## Zombie Validators + +The most likely scenario may be during an upgrade. + +We'll see some validators that fail to upgrade while most have. Then, some updated validator will propose a block that appears invalid to the outdated validators. What is the outdated validator to do? + +The right thing to do is to stop participating, because you have no idea what is going on, and prompt the administrator to upgrade the daemon. (Now this could be a security issue if not handled properly, so in the future we should think about upgrade security best practices). Yet say you don't, and you continue to sign blocks without really participating in the consensus rounds -- maybe voting nil each time and then signing whatever is decided on. Well for one, you've lost all ability to validate any blocks. It's a problem because if there are too many of these zombies, the network might accidentally commit a bad block -- in effect, crashing the network. So, the network wants to weed the zombies out. + +It's hard catching the zombie. It can mimick whatever other validator is doing, perhaps mimicking the first one to vote during the rounds and waiting just a little longer for the final block vote. Based on my extensive experience with zombie flicks, it appears that the best way to identify a zombie is to make it perform some action that only non-zombies would understand. That's easy! Just make each version of the protocol have a special algorithm that selects a small but sufficiently large fraction of the validator population at each block, and make them perform an action (intuitively, make them raise their hadns). Eventually, it'll become the zombie's turn to do something but it won't know what to do. Or it will act out of turn. Gotcha. The algorithm could even depend on state data, such that validators are required to keep it updated, which is a hair away from full validation. I suspect that there are more complete ways to enforce validation, but such measures may not be necessary in practice. diff --git a/consensus/consensus.go b/consensus/consensus.go index cc4b08830..8cbc43ddb 100644 --- a/consensus/consensus.go +++ b/consensus/consensus.go @@ -10,11 +10,11 @@ import ( "sync/atomic" "time" - . "github.com/tendermint/tendermint/accounts" . "github.com/tendermint/tendermint/binary" . "github.com/tendermint/tendermint/blocks" . "github.com/tendermint/tendermint/common" "github.com/tendermint/tendermint/p2p" + . "github.com/tendermint/tendermint/state" ) const ( @@ -89,31 +89,44 @@ type ConsensusManager struct { started uint32 stopped uint32 - csc *ConsensusStateControl - blockStore *BlockStore - accountStore *AccountStore - mtx sync.Mutex - peerStates map[string]*PeerState - doActionCh chan RoundAction + cs *ConsensusState + blockStore *BlockStore + doActionCh chan RoundAction + + mtx sync.Mutex + state *State + privValidator *PrivValidator + peerStates map[string]*PeerState + stagedProposal *BlockPartSet + stagedState *State } -func NewConsensusManager(sw *p2p.Switch, csc *ConsensusStateControl, blockStore *BlockStore, accountStore *AccountStore) *ConsensusManager { +func NewConsensusManager(sw *p2p.Switch, state *State, blockStore *BlockStore) *ConsensusManager { swEvents := make(chan interface{}) sw.AddEventListener("ConsensusManager.swEvents", swEvents) - csc.Update(blockStore) // Update csc with new blocks. + cs := NewConsensusState(state) cm := &ConsensusManager{ - sw: sw, - swEvents: swEvents, - quit: make(chan struct{}), - csc: csc, - blockStore: blockStore, - accountStore: accountStore, - peerStates: make(map[string]*PeerState), - doActionCh: make(chan RoundAction, 1), + sw: sw, + swEvents: swEvents, + quit: make(chan struct{}), + + cs: cs, + blockStore: blockStore, + doActionCh: make(chan RoundAction, 1), + + state: state, + peerStates: make(map[string]*PeerState), } return cm } +// Sets our private validator account for signing votes. +func (cm *ConsensusManager) SetPrivValidator(priv *PrivValidator) { + cm.mtx.Lock() + defer cm.mtx.Unlock() + cm.privValidator = priv +} + func (cm *ConsensusManager) Start() { if atomic.CompareAndSwapUint32(&cm.started, 0, 1) { log.Info("Starting ConsensusManager") @@ -166,7 +179,7 @@ func (cm *ConsensusManager) switchEventsRoutine() { // Like, how large is it and how often can we send it? func (cm *ConsensusManager) makeKnownBlockPartsMessage() *KnownBlockPartsMessage { - rs := cm.csc.RoundState() + rs := cm.cs.RoundState() return &KnownBlockPartsMessage{ Height: rs.Height, SecondsSinceStartTime: uint32(time.Now().Sub(rs.StartTime).Seconds()), @@ -188,7 +201,7 @@ func (cm *ConsensusManager) gossipProposalRoutine() { OUTER_LOOP: for { // Get round state - rs := cm.csc.RoundState() + rs := cm.cs.RoundState() // Receive incoming message on ProposalCh inMsg, ok := cm.sw.Receive(ProposalCh) @@ -282,9 +295,8 @@ OUTER_LOOP: // Signs a vote document and broadcasts it. // hash can be nil to vote "nil" func (cm *ConsensusManager) signAndVote(vote *Vote) error { - privValidator := cm.csc.PrivValidator() - if privValidator != nil { - err := privValidator.SignVote(vote) + if cm.privValidator != nil { + err := cm.privValidator.SignVote(vote) if err != nil { return err } @@ -294,15 +306,43 @@ func (cm *ConsensusManager) signAndVote(vote *Vote) error { return nil } -func (cm *ConsensusManager) isProposalValid(rs *RoundState) bool { - if !rs.BlockPartSet.IsComplete() { - return false +func (cm *ConsensusManager) stageProposal(proposal *BlockPartSet) error { + // Already staged? + cm.mtx.Lock() + if cm.stagedProposal == proposal { + cm.mtx.Unlock() + return nil + } else { + cm.mtx.Unlock() + } + + // Basic validation + if !proposal.IsComplete() { + return errors.New("Incomplete proposal BlockPartSet") + } + block, blockParts := blockPartSet.Block(), blockPartSet.BlockParts() + err := block.ValidateBasic() + if err != nil { + return err } - err := cm.stageBlock(rs.BlockPartSet) + + // Create a copy of the state for staging + cm.mtx.Lock() + stateCopy := cm.state.Copy() // Deep copy the state before staging. + cm.mtx.Unlock() + + // Commit block onto the copied state. + err := stateCopy.CommitBlock(block, block.Header.Time) // NOTE: fake commit time. if err != nil { - return false + return err } - return true + + // Looks good! + cm.mtx.Lock() + cm.stagedProposal = proposal + cm.stagedState = state + cm.mtx.Unlock() + return nil } func (cm *ConsensusManager) constructProposal(rs *RoundState) (*Block, error) { @@ -315,7 +355,7 @@ func (cm *ConsensusManager) constructProposal(rs *RoundState) (*Block, error) { // We may not have received a full proposal. func (cm *ConsensusManager) voteProposal(rs *RoundState) error { // If we're locked, must vote that. - locked := cm.csc.LockedProposal() + locked := cm.cs.LockedProposal() if locked != nil { block := locked.Block() err := cm.signAndVote(&Vote{ @@ -326,9 +366,10 @@ func (cm *ConsensusManager) voteProposal(rs *RoundState) error { }) return err } - // If proposal is invalid - if !cm.isProposalValid(rs) { - // Vote for nil. + // Stage proposal + err := cm.stageProposal(rs.BlockPartSet) + if err != nil { + // Vote for nil, whatever the error. err := cm.signAndVote(&Vote{ Height: rs.Height, Round: rs.Round, @@ -361,13 +402,13 @@ func (cm *ConsensusManager) precommitProposal(rs *RoundState) error { // If proposal is invalid or unknown, do nothing. // See note on ZombieValidators to see why. - if !cm.isProposalValid(rs) { + if cm.stageProposal(rs.BlockPartSet) != nil { return nil } // Lock this proposal. // NOTE: we're unlocking any prior locks. - cm.csc.LockProposal(rs.BlockPartSet) + cm.cs.LockProposal(rs.BlockPartSet) // Send precommit vote. err := cm.signAndVote(&Vote{ @@ -395,11 +436,11 @@ func (cm *ConsensusManager) commitOrUnlockProposal(rs *RoundState) error { // do not commit. // TODO If we were just late to receive the block, when // do we actually get it? Document it. - if !cm.isProposalValid(rs) { + if cm.stageProposal(rs.BlockPartSet) != nil { return nil } // TODO: Remove? - cm.csc.LockProposal(rs.BlockPartSet) + cm.cs.LockProposal(rs.BlockPartSet) // Vote commit. err := cm.signAndVote(&Vote{ Height: rs.Height, @@ -416,11 +457,11 @@ func (cm *ConsensusManager) commitOrUnlockProposal(rs *RoundState) error { // time differences between nodes, so nodes end up drifting // in time. commitTime := time.Now() - cm.commitBlock(rs.BlockPartSet, commitTime) + cm.commitProposal(rs.BlockPartSet, commitTime) return nil } else { // Otherwise, if a 1/3 majority if a block that isn't our locked one exists, unlock. - locked := cm.csc.LockedProposal() + locked := cm.cs.LockedProposal() if locked != nil { for _, hashOrNil := range rs.RoundPrecommits.OneThirdMajority() { if hashOrNil == nil { @@ -429,7 +470,7 @@ func (cm *ConsensusManager) commitOrUnlockProposal(rs *RoundState) error { hash := hashOrNil.([]byte) if !bytes.Equal(hash, locked.Block().Hash()) { // Unlock our lock. - cm.csc.LockProposal(nil) + cm.cs.LockProposal(nil) } } } @@ -437,52 +478,27 @@ func (cm *ConsensusManager) commitOrUnlockProposal(rs *RoundState) error { } } -// After stageBlock(), a call to commitBlock() with the same arguments must succeed. -func (cm *ConsensusManager) stageBlock(blockPartSet *BlockPartSet) error { +func (cm *ConsensusManager) commitProposal(blockPartSet *BlockPartSet, commitTime time.Time) error { cm.mtx.Lock() defer cm.mtx.Unlock() - block, blockParts := blockPartSet.Block(), blockPartSet.BlockParts() - - err := block.ValidateBasic() - if err != nil { - return err - } - err = cm.blockStore.StageBlockAndParts(block, blockParts) - if err != nil { - return err + if cm.stagedProposal != blockPartSet { + panic("Unexpected stagedProposal.") // Shouldn't happen. } - err = cm.csc.StageBlock(block) - if err != nil { - return err - } - err = cm.accountStore.StageBlock(block) - if err != nil { - return err - } - // NOTE: more stores may be added here for validation. - return nil -} - -// after stageBlock(), a call to commitBlock() with the same arguments must succeed. -func (cm *ConsensusManager) commitBlock(blockPartSet *BlockPartSet, commitTime time.Time) error { - cm.mtx.Lock() - defer cm.mtx.Unlock() + // Save to blockStore block, blockParts := blockPartSet.Block(), blockPartSet.BlockParts() - err := cm.blockStore.SaveBlockParts(block.Height, blockParts) if err != nil { return err } - err = cm.csc.CommitBlock(block, commitTime) - if err != nil { - return err - } - err = cm.accountStore.CommitBlock(block) - if err != nil { - return err - } + + // What was staged becomes committed. + cm.state = cm.stagedState + cm.cs.Update(cm.state) + cm.stagedProposal = nil + cm.stagedState = nil + return nil } @@ -490,7 +506,7 @@ func (cm *ConsensusManager) gossipVoteRoutine() { OUTER_LOOP: for { // Get round state - rs := cm.csc.RoundState() + rs := cm.cs.RoundState() // Receive incoming message on VoteCh inMsg, ok := cm.sw.Receive(VoteCh) @@ -569,7 +585,7 @@ func (cm *ConsensusManager) proposeAndVoteRoutine() { // Figure out which height/round/step we're at, // then schedule an action for when it is due. - rs := cm.csc.RoundState() + rs := cm.cs.RoundState() _, _, roundDuration, _, elapsedRatio := calcRoundInfo(rs.StartTime) switch rs.Step() { case RoundStepStart: @@ -607,15 +623,14 @@ func (cm *ConsensusManager) proposeAndVoteRoutine() { // We only consider transitioning to given step. step := roundAction.XnToStep // This is the current state. - rs := cm.csc.RoundState() + rs := cm.cs.RoundState() if height != rs.Height || round != rs.Round { return // Not relevant. } if step == RoundStepProposal && rs.Step() == RoundStepStart { // Propose a block if I am the proposer. - privValidator := cm.csc.PrivValidator() - if privValidator != nil && rs.Proposer.Account.Id == privValidator.Id { + if cm.privValidator != nil && rs.Proposer.Account.Id == cm.privValidator.Id { block, err := cm.constructProposal(rs) if err != nil { log.Error("Error attempting to construct a proposal: %v", err) @@ -644,7 +659,7 @@ func (cm *ConsensusManager) proposeAndVoteRoutine() { } // Round is over. This is a special case. // Prepare a new RoundState for the next state. - cm.csc.SetupRound(rs.Round + 1) + cm.cs.SetupRound(rs.Round + 1) return // setAlarm() takes care of the rest. } else { return // Action is not relevant. diff --git a/consensus/state.go b/consensus/state.go index 6cb4db17f..f87464973 100644 --- a/consensus/state.go +++ b/consensus/state.go @@ -5,288 +5,113 @@ import ( "sync" "time" - . "github.com/tendermint/tendermint/accounts" . "github.com/tendermint/tendermint/binary" . "github.com/tendermint/tendermint/blocks" . "github.com/tendermint/tendermint/common" - db_ "github.com/tendermint/tendermint/db" + . "github.com/tendermint/tendermint/state" ) var ( consensusStateKey = []byte("consensusState") ) -/* -Determining the order of proposers at height h: - - A B C All validators A, B, and C - [+10, +5, +2] (+17) Voting power - - [ 0, 0, 0] Genesis? - [ 10, 5, 2] (+17) -A [ -7, 5, 2] (-17) Round 0 proposer: A - [ 3, 10, 4] (+17) -B [ 3, -7, 4] (-17) Round 1 proposer: B - [ 13, -2, 6] (+17) -A [ -4, -2, 6] (-17) Round 2 proposer: A - [ 6, 3, 8] (+17) -C [ 6, 3, -9] (-17) Round 3 proposer: C - [ 16, 8, -7] (+17) -A [ -1, 8, -7] (-17) Round 4 proposer: A - [ 9, 13, -5] (+17) -B [ 9, -4, -5] (-17) Round 5 proposer: B - [ 19, 1, -3] (+17) -A [ 2, 1, -3] (-17) Round 6 proposer: A - ........... ... - -For a node, once consensus has been reached at some round R, -the moment the node sees +2/3 in votes for a proposal is when -the consensus rounds for the *next* height h+1 begins. - -Round R+1 in the consensus rounds at height h+1 is the same as -round R in the consensus rounds at height h (the parent block). - -We omit details of dealing with membership changes. -*/ - -func getProposer(validators map[uint64]*Validator) (proposer *Validator) { - highestAccum := int64(0) - for _, validator := range validators { - if validator.Accum > highestAccum { - highestAccum = validator.Accum - proposer = validator - } else if validator.Accum == highestAccum { - if validator.Id < proposer.Id { // Seniority - proposer = validator - } - } - } - return -} - -func incrementAccum(validators map[uint64]*Validator) { - totalDelta := int64(0) - for _, validator := range validators { - validator.Accum += int64(validator.VotingPower) - totalDelta += int64(validator.VotingPower) - } - proposer := getProposer(validators) - proposer.Accum -= totalDelta - // NOTE: sum(validators) here should be zero. - if true { - totalAccum := int64(0) - for _, validator := range validators { - totalAccum += validator.Accum - } - if totalAccum != 0 { - Panicf("Total Accum of validators did not equal 0. Got: ", totalAccum) - } - } -} - -// Creates a deep copy of validators. -// Caller can then modify the resulting validators' .Accum field without -// modifying the original *Validator's. -func copyValidators(validators map[uint64]*Validator) map[uint64]*Validator { - mapCopy := map[uint64]*Validator{} - for _, val := range validators { - mapCopy[val.Id] = val.Copy() - } - return mapCopy -} - -//----------------------------------------------------------------------------- - -// Handles consensus state tracking across block heights. -// NOTE: When adding more fields, also reset it in Load() and CommitBlock() -type ConsensusStateControl struct { +// Tracks consensus state across block heights and rounds. +type ConsensusState struct { mtx sync.Mutex - db db_.Db // Where we store the validators list & other data. - validatorsR0 map[uint64]*Validator // A copy of the validators at round 0 - privValidator *PrivValidator // PrivValidator used to participate, if present. - accountStore *AccountStore // Account storage height uint32 // Height we are working on. + validatorsR0 map[uint64]*Validator // A copy of the validators at round 0 lockedProposal *BlockPartSet // A BlockPartSet of the locked proposal. startTime time.Time // Start of round 0 for this height. - roundState *RoundState // The RoundState object for the current round. commits *VoteSet // Commits for this height. + roundState *RoundState // The RoundState object for the current round. } -func NewConsensusStateControl(db db_.Db, accountStore *AccountStore) *ConsensusStateControl { - csc := &ConsensusStateControl{ - db: db, - accountStore: accountStore, - } - csc.Load() - return csc -} - -// Load the current state from db. -func (csc *ConsensusStateControl) Load() { - csc.mtx.Lock() - defer csc.mtx.Unlock() - buf := csc.db.Get(consensusStateKey) - if len(buf) == 0 { - height := uint32(0) - validators := make(map[uint64]*Validator) // XXX BOOTSTRAP - startTime := time.Now() // XXX BOOTSTRAP - csc.setupHeight(height, validators, startTime) - } else { - reader := bytes.NewReader(buf) - height := Readuint32(reader) - validators := make(map[uint64]*Validator) - startTime := ReadTime(reader) - for reader.Len() > 0 { - validator := ReadValidator(reader) - validators[validator.Id] = validator - } - csc.setupHeight(height, validators, startTime.Time) - } -} - -// Save the current state onto db. -// Doesn't save the round state, just initial state at round 0. -func (csc *ConsensusStateControl) Save() { - csc.mtx.Lock() - defer csc.mtx.Unlock() - var buf bytes.Buffer - UInt32(csc.height).WriteTo(&buf) - Time{csc.startTime}.WriteTo(&buf) - for _, validator := range csc.validatorsR0 { - validator.WriteTo(&buf) - } - csc.db.Set(consensusStateKey, buf.Bytes()) -} - -// Finds more blocks from blockStore and commits them. -func (csc *ConsensusStateControl) Update(blockStore *BlockStore) { - csc.mtx.Lock() - defer csc.mtx.Unlock() - for h := csc.height + 1; h <= blockStore.Height(); h++ { - block := blockStore.LoadBlock(h) - // TODO: would be better to be able to override - // the block commit time, but in the meantime, - // just use the block time as proposed by the proposer. - csc.CommitBlock(block, block.Header.Time.Time) - } -} - -func (csc *ConsensusStateControl) PrivValidator() *PrivValidator { - csc.mtx.Lock() - defer csc.mtx.Unlock() - return csc.privValidator -} - -func (csc *ConsensusStateControl) SetPrivValidator(privValidator *PrivValidator) error { - csc.mtx.Lock() - defer csc.mtx.Unlock() - if csc.privValidator != nil { - panic("ConsensusStateControl privValidator already set.") - } - csc.privValidator = privValidator - return nil -} - -// Set blockPartSet to nil to unlock. -func (csc *ConsensusStateControl) LockProposal(blockPartSet *BlockPartSet) { - csc.mtx.Lock() - defer csc.mtx.Unlock() - csc.lockedProposal = blockPartSet +func NewConsensusState(state *State) *ConsensusState { + cs := &ConsensusState{} + cs.Update(state) + return cs } -func (csc *ConsensusStateControl) LockedProposal() *BlockPartSet { - csc.mtx.Lock() - defer csc.mtx.Unlock() - return csc.lockedProposal +func (cs *ConsensusState) LockProposal(blockPartSet *BlockPartSet) { + cs.mtx.Lock() + defer cs.mtx.Unlock() + cs.lockedProposal = blockPartSet } -func (csc *ConsensusStateControl) StageBlock(block *Block) error { - // XXX implement staging. - return nil +func (cs *ConsensusState) UnlockProposal() { + cs.mtx.Lock() + defer cs.mtx.Unlock() + cs.lockedProposal = nil } -// NOTE: assumes that block is valid. -// NOTE: the block should be saved on the BlockStore before commiting here. -// commitTime is usually set to the system clock time (time.Now()). -func (csc *ConsensusStateControl) CommitBlock(block *Block, commitTime time.Time) error { - csc.mtx.Lock() - defer csc.mtx.Unlock() - // Ensure that block is the next block needed. - if block.Height != csc.height { - return Errorf("Cannot commit block %v to csc. Expected height %v", block, csc.height+1) - } - // Update validator. - validators := copyValidators(csc.validatorsR0) - incrementAccum(validators) - // TODO if there are new validators in the block, add them. - - // XXX: it's not commitTime we want... - csc.setupHeight(block.Height+1, validators, commitTime) - - // Save the state. - csc.Save() - - return nil +func (cs *ConsensusState) LockedProposal() *BlockPartSet { + cs.mtx.Lock() + defer cs.mtx.Unlock() + return cs.lockedProposal } -func (csc *ConsensusStateControl) RoundState() *RoundState { - csc.mtx.Lock() - defer csc.mtx.Unlock() - return csc.roundState +func (cs *ConsensusState) RoundState() *RoundState { + cs.mtx.Lock() + defer cs.mtx.Unlock() + return cs.roundState } -func (csc *ConsensusStateControl) setupHeight(height uint32, validators map[uint64]*Validator, startTime time.Time) { +func (cs *ConsensusState) Update(state *State) { + cs.mtx.Lock() + defer cs.mtx.Unlock() - if height > 0 && height != csc.height+1 { - panic("setupHeight() cannot skip heights") + // Sanity check state. + stateHeight := state.Height() + if stateHeight > 0 && stateHeight != cs.height+1 { + Panicf("Update() expected state height of %v but found %v", cs.height+1, stateHeight) } - // Reset the state for the next height. - csc.validatorsR0 = validators - csc.height = height - csc.lockedProposal = nil - csc.startTime = startTime - csc.commits = NewVoteSet(height, 0, VoteTypeCommit, validators) + // Reset fields based on state. + cs.height = stateHeight + cs.validatorsR0 = state.Validators().Copy() // NOTE: immutable. + cs.lockedProposal = nil + cs.startTime = state.commitTime // XXX is this what we want? + cs.commits = NewVoteSet(stateHeight, 0, VoteTypeCommit, cs.validatorsR0) // Setup the roundState - csc.roundState = nil - csc.setupRound(0) + cs.roundState = nil + cs.setupRound(0) } -// If csc.roundSTate isn't at round, set up new roundState at round. -func (csc *ConsensusStateControl) SetupRound(round uint16) { - csc.mtx.Lock() - defer csc.mtx.Unlock() - if csc.roundState != nil && csc.roundState.Round >= round { +// If cs.roundSTate isn't at round, set up new roundState at round. +func (cs *ConsensusState) SetupRound(round uint16) { + cs.mtx.Lock() + defer cs.mtx.Unlock() + if cs.roundState != nil && cs.roundState.Round >= round { return } - csc.setupRound(round) + cs.setupRound(round) } // Initialize roundState for given round. // Involves incrementing validators for each past rand. -func (csc *ConsensusStateControl) setupRound(round uint16) { +func (cs *ConsensusState) setupRound(round uint16) { // Increment validator accums as necessary. - // We need to start with csc.validatorsR0 or csc.roundState.Validators + // We need to start with cs.validatorsR0 or cs.roundState.Validators var validators map[uint64]*Validator = nil var validatorsRound uint16 - if csc.roundState == nil { + if cs.roundState == nil { // We have no roundState so we start from validatorsR0 at round 0. - validators = copyValidators(csc.validatorsR0) + validators = copyValidators(cs.validatorsR0) validatorsRound = 0 } else { // We have a previous roundState so we start from that. - validators = copyValidators(csc.roundState.Validators) - validatorsRound = csc.roundState.Round + validators = copyValidators(cs.roundState.Validators) + validatorsRound = cs.roundState.Round } // Increment all the way to round. for r := validatorsRound; r < round; r++ { incrementAccum(validators) } - roundState := NewRoundState(csc.height, round, csc.startTime, validators, csc.commits) - csc.roundState = roundState + roundState := NewRoundState(cs.height, round, cs.startTime, validators, cs.commits) + cs.roundState = roundState } //----------------------------------------------------------------------------- diff --git a/consensus/validator.go b/consensus/validator.go deleted file mode 100644 index 499119495..000000000 --- a/consensus/validator.go +++ /dev/null @@ -1,66 +0,0 @@ -package consensus - -import ( - "io" - - . "github.com/tendermint/tendermint/binary" - . "github.com/tendermint/tendermint/blocks" - //. "github.com/tendermint/tendermint/common" - db_ "github.com/tendermint/tendermint/db" -) - -// Holds state for a Validator at a given height+round. -// Meant to be discarded every round of the consensus protocol. -type Validator struct { - Account - BondHeight uint32 - VotingPower uint64 - Accum int64 -} - -// Used to persist the state of ConsensusStateControl. -func ReadValidator(r io.Reader) *Validator { - return &Validator{ - Account: Account{ - Id: Readuint64(r), - PubKey: ReadByteSlice(r), - }, - BondHeight: Readuint32(r), - VotingPower: Readuint64(r), - Accum: Readint64(r), - } -} - -// Creates a new copy of the validator so we can mutate accum. -func (v *Validator) Copy() *Validator { - return &Validator{ - Account: v.Account, - BondHeight: v.BondHeight, - VotingPower: v.VotingPower, - Accum: v.Accum, - } -} - -// Used to persist the state of ConsensusStateControl. -func (v *Validator) WriteTo(w io.Writer) (n int64, err error) { - n, err = WriteTo(UInt64(v.Id), w, n, err) - n, err = WriteTo(v.PubKey, w, n, err) - n, err = WriteTo(UInt32(v.BondHeight), w, n, err) - n, err = WriteTo(UInt64(v.VotingPower), w, n, err) - n, err = WriteTo(Int64(v.Accum), w, n, err) - return -} - -//----------------------------------------------------------------------------- - -// TODO: Ensure that double signing never happens via an external persistent check. -type PrivValidator struct { - PrivAccount - db *db_.LevelDB -} - -// Modifies the vote object in memory. -// Double signing results in an error. -func (pv *PrivValidator) SignVote(vote *Vote) error { - return nil -} diff --git a/p2p/connection.go b/p2p/connection.go index 609c2a724..dcf230eeb 100644 --- a/p2p/connection.go +++ b/p2p/connection.go @@ -30,7 +30,7 @@ const ( /* A MConnection wraps a network connection and handles buffering and multiplexing. Binary messages are sent with ".Send(channelId, msg)". -Inbound ByteSlices are pushed to the designated chan<- InboundBytes. +Inbound byteslices are pushed to the designated chan<- InboundBytes. */ type MConnection struct { conn net.Conn @@ -217,6 +217,7 @@ func (c *MConnection) sendRoutine() { FOR_LOOP: for { + var n int64 var err error select { case <-c.flushTimer.Ch: @@ -228,13 +229,11 @@ FOR_LOOP: channel.updateStats() } case <-c.pingTimer.Ch: - var n int64 - n, err = packetTypePing.WriteTo(c.bufWriter) + WriteByte(c.bufWriter, packetTypePing, &n, &err) c.sendMonitor.Update(int(n)) c.flush() case <-c.pong: - var n int64 - n, err = packetTypePong.WriteTo(c.bufWriter) + WriteByte(c.bufWriter, packetTypePong, &n, &err) c.sendMonitor.Update(int(n)) c.flush() case <-c.quit: @@ -331,7 +330,9 @@ FOR_LOOP: c.recvMonitor.Limit(maxPacketSize, atomic.LoadInt64(&c.recvRate), true) // Read packet type - pktType, n, err := ReadUInt8Safe(c.bufReader) + var n int64 + var err error + pktType := ReadByte(c.bufReader, &n, &err) c.recvMonitor.Update(int(n)) if err != nil { if atomic.LoadUint32(&c.stopped) != 1 { @@ -409,10 +410,10 @@ type Channel struct { desc *ChannelDescriptor id byte recvQueue chan InboundBytes - sendQueue chan ByteSlice + sendQueue chan []byte sendQueueSize uint32 - recving ByteSlice - sending ByteSlice + recving []byte + sending []byte priority uint recentlySent int64 // exponential moving average } @@ -426,7 +427,7 @@ func newChannel(conn *MConnection, desc *ChannelDescriptor) *Channel { desc: desc, id: desc.Id, recvQueue: desc.recvQueue, - sendQueue: make(chan ByteSlice, desc.SendQueueCapacity), + sendQueue: make(chan []byte, desc.SendQueueCapacity), recving: make([]byte, 0, desc.RecvBufferSize), priority: desc.DefaultPriority, } @@ -434,7 +435,7 @@ func newChannel(conn *MConnection, desc *ChannelDescriptor) *Channel { // Queues message to send to this channel. // Goroutine-safe -func (ch *Channel) sendBytes(bytes ByteSlice) { +func (ch *Channel) sendBytes(bytes []byte) { ch.sendQueue <- bytes atomic.AddUint32(&ch.sendQueueSize, 1) } @@ -442,7 +443,7 @@ func (ch *Channel) sendBytes(bytes ByteSlice) { // Queues message to send to this channel. // Nonblocking, returns true if successful. // Goroutine-safe -func (ch *Channel) trySendBytes(bytes ByteSlice) bool { +func (ch *Channel) trySendBytes(bytes []byte) bool { select { case ch.sendQueue <- bytes: atomic.AddUint32(&ch.sendQueueSize, 1) @@ -480,14 +481,14 @@ func (ch *Channel) sendPending() bool { // Not goroutine-safe func (ch *Channel) nextPacket() packet { packet := packet{} - packet.ChannelId = Byte(ch.id) + packet.ChannelId = byte(ch.id) packet.Bytes = ch.sending[:MinInt(maxPacketSize, len(ch.sending))] if len(ch.sending) <= maxPacketSize { - packet.EOF = Byte(0x01) + packet.EOF = byte(0x01) ch.sending = nil atomic.AddUint32(&ch.sendQueueSize, ^uint32(0)) // decrement sendQueueSize } else { - packet.EOF = Byte(0x00) + packet.EOF = byte(0x00) ch.sending = ch.sending[MinInt(maxPacketSize, len(ch.sending)):] } return packet @@ -497,8 +498,8 @@ func (ch *Channel) nextPacket() packet { // Not goroutine-safe func (ch *Channel) writePacketTo(w io.Writer) (n int64, err error) { packet := ch.nextPacket() - n, err = WriteTo(packetTypeMessage, w, n, err) - n, err = WriteTo(packet, w, n, err) + WriteByte(w, packetTypeMessage, &n, &err) + WriteBinary(w, packet, &n, &err) if err != nil { ch.recentlySent += n } @@ -509,7 +510,7 @@ func (ch *Channel) writePacketTo(w io.Writer) (n int64, err error) { // Not goroutine-safe func (ch *Channel) recvPacket(pkt packet) { ch.recving = append(ch.recving, pkt.Bytes...) - if pkt.EOF == Byte(0x01) { + if pkt.EOF == byte(0x01) { ch.recvQueue <- InboundBytes{ch.conn, ch.recving} ch.recving = make([]byte, 0, ch.desc.RecvBufferSize) } @@ -527,22 +528,22 @@ func (ch *Channel) updateStats() { const ( maxPacketSize = 1024 - packetTypePing = UInt8(0x00) - packetTypePong = UInt8(0x01) - packetTypeMessage = UInt8(0x10) + packetTypePing = byte(0x00) + packetTypePong = byte(0x01) + packetTypeMessage = byte(0x10) ) // Messages in channels are chopped into smaller packets for multiplexing. type packet struct { - ChannelId Byte - EOF Byte // 1 means message ends here. - Bytes ByteSlice + ChannelId byte + EOF byte // 1 means message ends here. + Bytes []byte } func (p packet) WriteTo(w io.Writer) (n int64, err error) { - n, err = WriteTo(p.ChannelId, w, n, err) - n, err = WriteTo(p.EOF, w, n, err) - n, err = WriteTo(p.Bytes, w, n, err) + WriteByte(w, p.ChannelId, &n, &err) + WriteByte(w, p.EOF, &n, &err) + WriteByteSlice(w, p.Bytes, &n, &err) return } @@ -551,44 +552,32 @@ func (p packet) String() string { } func readPacketSafe(r io.Reader) (pkt packet, n int64, err error) { - chId, n_, err := ReadByteSafe(r) - n += n_ - if err != nil { - return - } - eof, n_, err := ReadByteSafe(r) - n += n_ - if err != nil { - return - } - // TODO: packet length sanity check. - bytes, n_, err := ReadByteSliceSafe(r) - n += n_ - if err != nil { - return - } - return packet{chId, eof, bytes}, n, nil + chId := ReadByte(r, &n, &err) + eof := ReadByte(r, &n, &err) + bytes := ReadByteSlice(r, &n, &err) + pkt = packet{chId, eof, bytes} + return } //----------------------------------------------------------------------------- type InboundBytes struct { MConn *MConnection - Bytes ByteSlice + Bytes []byte } //----------------------------------------------------------------------------- // Convenience struct for writing typed messages. -// Reading requires a custom decoder that switches on the first type byte of a ByteSlice. +// Reading requires a custom decoder that switches on the first type byte of a byteslice. type TypedMessage struct { - Type Byte + Type byte Msg Binary } func (tm TypedMessage) WriteTo(w io.Writer) (n int64, err error) { - n, err = WriteTo(tm.Type, w, n, err) - n, err = WriteTo(tm.Msg, w, n, err) + WriteByte(w, tm.Type, &n, &err) + WriteBinary(w, tm.Msg, &n, &err) return } @@ -596,6 +585,6 @@ func (tm TypedMessage) String() string { return fmt.Sprintf("<%X:%v>", tm.Type, tm.Msg) } -func (tm TypedMessage) Bytes() ByteSlice { +func (tm TypedMessage) Bytes() []byte { return BinaryBytes(tm) } diff --git a/p2p/listener.go b/p2p/listener.go index f26242f2e..0b79944c7 100644 --- a/p2p/listener.go +++ b/p2p/listener.go @@ -6,7 +6,6 @@ import ( "strconv" "sync/atomic" - . "github.com/tendermint/tendermint/binary" . "github.com/tendermint/tendermint/common" "github.com/tendermint/tendermint/p2p/upnp" ) @@ -159,7 +158,7 @@ func getUPNPExternalAddress(externalPort, internalPort int) *NetAddress { } log.Debug("Got UPNP external address: %v", ext) - return NewNetAddressIPPort(ext, UInt16(externalPort)) + return NewNetAddressIPPort(ext, uint16(externalPort)) } // TODO: use syscalls: http://pastebin.com/9exZG4rh @@ -178,7 +177,7 @@ func getNaiveExternalAddress(port int) *NetAddress { if v4 == nil || v4[0] == 127 { continue } // loopback - return NewNetAddressIPPort(ipnet.IP, UInt16(port)) + return NewNetAddressIPPort(ipnet.IP, uint16(port)) } return nil } diff --git a/p2p/netaddress.go b/p2p/netaddress.go index a13220b7e..c3e3c6d7d 100644 --- a/p2p/netaddress.go +++ b/p2p/netaddress.go @@ -18,7 +18,7 @@ import ( type NetAddress struct { IP net.IP - Port UInt16 + Port uint16 str string } @@ -29,7 +29,7 @@ func NewNetAddress(addr net.Addr) *NetAddress { Panicf("Only TCPAddrs are supported. Got: %v", addr) } ip := tcpAddr.IP - port := UInt16(tcpAddr.Port) + port := uint16(tcpAddr.Port) return NewNetAddressIPPort(ip, port) } @@ -43,17 +43,17 @@ func NewNetAddressString(addr string) *NetAddress { if err != nil { panic(err) } - na := NewNetAddressIPPort(ip, UInt16(port)) + na := NewNetAddressIPPort(ip, uint16(port)) return na } -func ReadNetAddress(r io.Reader) *NetAddress { - ipBytes := ReadByteSlice(r) - port := ReadUInt16(r) +func ReadNetAddress(r io.Reader, n *int64, err *error) *NetAddress { + ipBytes := ReadByteSlice(r, n, err) + port := ReadUInt16(r, n, err) return NewNetAddressIPPort(net.IP(ipBytes), port) } -func NewNetAddressIPPort(ip net.IP, port UInt16) *NetAddress { +func NewNetAddressIPPort(ip net.IP, port uint16) *NetAddress { na := &NetAddress{ IP: ip, Port: port, @@ -66,8 +66,8 @@ func NewNetAddressIPPort(ip net.IP, port UInt16) *NetAddress { } func (na *NetAddress) WriteTo(w io.Writer) (n int64, err error) { - n, err = WriteTo(ByteSlice(na.IP.To16()), w, n, err) - n, err = WriteTo(na.Port, w, n, err) + WriteByteSlice(w, na.IP.To16(), &n, &err) + WriteUInt16(w, na.Port, &n, &err) return } diff --git a/p2p/peer.go b/p2p/peer.go index 7ebb7f18f..c331c0c89 100644 --- a/p2p/peer.go +++ b/p2p/peer.go @@ -77,7 +77,8 @@ func (p *Peer) CanSend(chId byte) bool { } func (p *Peer) WriteTo(w io.Writer) (n int64, err error) { - return String(p.Key).WriteTo(w) + WriteString(w, p.Key, &n, &err) + return } func (p *Peer) String() string { diff --git a/p2p/peer_manager.go b/p2p/peer_manager.go index 1c2152b4f..0a88b0f50 100644 --- a/p2p/peer_manager.go +++ b/p2p/peer_manager.go @@ -210,19 +210,21 @@ func (pm *PeerManager) requestRoutine() { /* Messages */ const ( - msgTypeUnknown = Byte(0x00) - msgTypeRequest = Byte(0x01) - msgTypeAddrs = Byte(0x02) + msgTypeUnknown = byte(0x00) + msgTypeRequest = byte(0x01) + msgTypeAddrs = byte(0x02) ) // TODO: check for unnecessary extra bytes at the end. -func decodeMessage(bz ByteSlice) (msg interface{}) { +func decodeMessage(bz []byte) (msg interface{}) { + var n int64 + var err error // log.Debug("decoding msg bytes: %X", bz) - switch Byte(bz[0]) { + switch bz[0] { case msgTypeRequest: return &pexRequestMessage{} case msgTypeAddrs: - return readPexAddrsMessage(bytes.NewReader(bz[1:])) + return readPexAddrsMessage(bytes.NewReader(bz[1:]), &n, &err) default: return nil } @@ -235,7 +237,7 @@ type pexRequestMessage struct { } func (m *pexRequestMessage) WriteTo(w io.Writer) (n int64, err error) { - n, err = WriteTo(msgTypeRequest, w, n, err) + WriteByte(w, msgTypeRequest, &n, &err) return } @@ -250,11 +252,11 @@ type pexAddrsMessage struct { Addrs []*NetAddress } -func readPexAddrsMessage(r io.Reader) *pexAddrsMessage { - numAddrs := int(ReadUInt32(r)) +func readPexAddrsMessage(r io.Reader, n *int64, err *error) *pexAddrsMessage { + numAddrs := int(ReadUInt32(r, n, err)) addrs := []*NetAddress{} for i := 0; i < numAddrs; i++ { - addr := ReadNetAddress(r) + addr := ReadNetAddress(r, n, err) addrs = append(addrs, addr) } return &pexAddrsMessage{ @@ -263,10 +265,10 @@ func readPexAddrsMessage(r io.Reader) *pexAddrsMessage { } func (m *pexAddrsMessage) WriteTo(w io.Writer) (n int64, err error) { - n, err = WriteTo(msgTypeAddrs, w, n, err) - n, err = WriteTo(UInt32(len(m.Addrs)), w, n, err) + WriteByte(w, msgTypeAddrs, &n, &err) + WriteUInt32(w, uint32(len(m.Addrs)), &n, &err) for _, addr := range m.Addrs { - n, err = WriteTo(addr, w, n, err) + WriteBinary(w, addr, &n, &err) } return } diff --git a/p2p/switch_test.go b/p2p/switch_test.go index 508e7a4b4..612ca3126 100644 --- a/p2p/switch_test.go +++ b/p2p/switch_test.go @@ -1,13 +1,22 @@ package p2p import ( + "bytes" "encoding/hex" + "io" "testing" "time" . "github.com/tendermint/tendermint/binary" ) +type String string + +func (s String) WriteTo(w io.Writer) (n int64, err error) { + WriteString(w, string(s), &n, &err) + return +} + // convenience method for creating two switches connected to each other. func makeSwitchPair(t testing.TB, numChannels int, sendQueueCapacity int, recvBufferSize int, recvQueueCapacity int) (*Switch, *Switch, []*ChannelDescriptor) { @@ -86,10 +95,12 @@ func TestSwitches(t *testing.T) { // Receive message from channel 1 and check inMsg, ok := s2.Receive(byte(0x01)) + var n int64 + var err error if !ok { t.Errorf("Failed to receive from channel one") } - if ReadString(inMsg.Bytes.Reader()) != "channel one" { + if ReadString(bytes.NewBuffer(inMsg.Bytes), &n, &err) != "channel one" { t.Errorf("Unexpected received message bytes:\n%v", hex.Dump(inMsg.Bytes)) } @@ -98,7 +109,7 @@ func TestSwitches(t *testing.T) { if !ok { t.Errorf("Failed to receive from channel zero") } - if ReadString(inMsg.Bytes.Reader()) != "channel zero" { + if ReadString(bytes.NewBuffer(inMsg.Bytes), &n, &err) != "channel zero" { t.Errorf("Unexpected received message bytes:\n%v", hex.Dump(inMsg.Bytes)) } } diff --git a/state/state.go b/state/state.go new file mode 100644 index 000000000..13018c353 --- /dev/null +++ b/state/state.go @@ -0,0 +1,127 @@ +package state + +import ( + "bytes" + "sync" + "time" + + . "github.com/tendermint/tendermint/binary" + . "github.com/tendermint/tendermint/blocks" + db_ "github.com/tendermint/tendermint/db" + "github.com/tendermint/tendermint/merkle" +) + +var ( + stateKey = []byte("stateKey") +) + +type State struct { + mtx sync.Mutex + db db_.Db + height uint32 + commitTime time.Time + accounts merkle.Tree + validators *ValidatorSet +} + +func LoadState(db db_.Db) *State { + s := &State{} + buf := db.Get(stateKey) + if len(buf) == 0 { + s.height = uint32(0) + s.commitTime = time.Unix(0, 0) // XXX BOOTSTRAP + s.accounts = merkle.NewIAVLTree(db) // XXX BOOTSTRAP + s.validators = NewValidatorSet(nil) // XXX BOOTSTRAP + } else { + reader := bytes.NewReader(buf) + var n int64 + var err error + s.height = ReadUInt32(reader, &n, &err) + s.commitTime = ReadTime(reader, &n, &err) + accountsMerkleRoot := ReadByteSlice(reader, &n, &err) + s.accounts = merkle.NewIAVLTreeFromHash(db, accountsMerkleRoot) + s.validators = NewValidatorSet(nil) + for reader.Len() > 0 { + validator := ReadValidator(reader, &n, &err) + s.validators.Add(validator) + } + if err != nil { + panic(err) + } + } + return s +} + +func (s *State) Save() { + s.mtx.Lock() + defer s.mtx.Unlock() + s.accounts.Save() + var buf bytes.Buffer + var n int64 + var err error + WriteUInt32(&buf, s.height, &n, &err) + WriteTime(&buf, s.commitTime, &n, &err) + WriteByteSlice(&buf, s.accounts.Hash(), &n, &err) + for _, validator := range s.validators.Map() { + WriteBinary(&buf, validator, &n, &err) + } + if err != nil { + panic(err) + } + s.db.Set(stateKey, buf.Bytes()) +} + +func (s *State) Copy() *State { + s.mtx.Lock() + defer s.mtx.Unlock() + return &State{ + db: s.db, + height: s.height, + commitTime: s.commitTime, + accounts: s.accounts.Copy(), + validators: s.validators.Copy(), + } +} + +func (s *State) CommitTx(tx *Tx) error { + s.mtx.Lock() + defer s.mtx.Unlock() + // TODO commit the tx + panic("Implement CommitTx()") + return nil +} + +func (s *State) CommitBlock(b *Block, commitTime time.Time) error { + s.mtx.Lock() + defer s.mtx.Unlock() + // TODO commit the txs + // XXX also increment validator accum. + panic("Implement CommitBlock()") + return nil +} + +func (s *State) Height() uint32 { + s.mtx.Lock() + defer s.mtx.Unlock() + return s.height +} + +func (s *State) CommitTime() time.Time { + s.mtx.Lock() + defer s.mtx.Unlock() + return s.commitTime +} + +// The returned ValidatorSet gets mutated upon s.Commit*(). +func (s *State) Validators() *ValidatorSet { + s.mtx.Lock() + defer s.mtx.Unlock() + return s.validators +} + +func (s *State) Account(accountId uint64) *Account { + s.mtx.Lock() + defer s.mtx.Unlock() + // XXX: figure out a way to load an Account Binary type. + return s.accounts.Get(accountId) +} diff --git a/state/validator.go b/state/validator.go index 884b0836c..0d8900929 100644 --- a/state/validator.go +++ b/state/validator.go @@ -5,7 +5,7 @@ import ( . "github.com/tendermint/tendermint/binary" . "github.com/tendermint/tendermint/blocks" - //. "github.com/tendermint/tendermint/common" + . "github.com/tendermint/tendermint/common" db_ "github.com/tendermint/tendermint/db" ) @@ -20,15 +20,15 @@ type Validator struct { } // Used to persist the state of ConsensusStateControl. -func ReadValidator(r io.Reader) *Validator { +func ReadValidator(r io.Reader, n *int64, err *error) *Validator { return &Validator{ Account: Account{ - Id: Readuint64(r), - PubKey: ReadByteSlice(r), + Id: ReadUInt64(r, n, err), + PubKey: ReadByteSlice(r, n, err), }, - BondHeight: Readuint32(r), - VotingPower: Readuint64(r), - Accum: Readint64(r), + BondHeight: ReadUInt32(r, n, err), + VotingPower: ReadUInt64(r, n, err), + Accum: ReadInt64(r, n, err), } } @@ -44,11 +44,11 @@ func (v *Validator) Copy() *Validator { // Used to persist the state of ConsensusStateControl. func (v *Validator) WriteTo(w io.Writer) (n int64, err error) { - n, err = WriteTo(UInt64(v.Id), w, n, err) - n, err = WriteTo(v.PubKey, w, n, err) - n, err = WriteTo(UInt32(v.BondHeight), w, n, err) - n, err = WriteTo(UInt64(v.VotingPower), w, n, err) - n, err = WriteTo(Int64(v.Accum), w, n, err) + WriteUInt64(w, v.Id, &n, &err) + WriteByteSlice(w, v.PubKey, &n, &err) + WriteUInt32(w, v.BondHeight, &n, &err) + WriteUInt64(w, v.VotingPower, &n, &err) + WriteInt64(w, v.Accum, &n, &err) return } @@ -78,7 +78,7 @@ func NewValidatorSet(validators map[uint64]*Validator) *ValidatorSet { validators = make(map[uint64]*Validator) } return &ValidatorSet{ - valdiators: validators, + validators: validators, } } @@ -104,7 +104,7 @@ func (v *ValidatorSet) IncrementAccum() { func (v *ValidatorSet) Copy() *ValidatorSet { mapCopy := map[uint64]*Validator{} - for _, val := range validators { + for _, val := range v.validators { mapCopy[val.Id] = val.Copy() } return &ValidatorSet{ @@ -112,12 +112,12 @@ func (v *ValidatorSet) Copy() *ValidatorSet { } } -func (v *ValidatorSet) Add(validator *Valdaitor) { +func (v *ValidatorSet) Add(validator *Validator) { v.validators[validator.Id] = validator } func (v *ValidatorSet) Get(id uint64) *Validator { - return v.validators[validator.Id] + return v.validators[id] } func (v *ValidatorSet) Map() map[uint64]*Validator { diff --git a/consensus/vote.go b/state/vote.go similarity index 92% rename from consensus/vote.go rename to state/vote.go index ae160d711..dd1bfec36 100644 --- a/consensus/vote.go +++ b/state/vote.go @@ -1,4 +1,4 @@ -package consensus +package state import ( "bytes" @@ -35,22 +35,22 @@ type Vote struct { Signature } -func ReadVote(r io.Reader) *Vote { +func ReadVote(r io.Reader, n *int64, err *error) *Vote { return &Vote{ - Height: Readuint32(r), - Round: Readuint16(r), - Type: Readbyte(r), - Hash: ReadByteSlice(r), - Signature: ReadSignature(r), + Height: ReadUInt32(r, n, err), + Round: ReadUInt16(r, n, err), + Type: ReadByte(r, n, err), + Hash: ReadByteSlice(r, n, err), + Signature: ReadSignature(r, n, err), } } func (v *Vote) WriteTo(w io.Writer) (n int64, err error) { - n, err = WriteTo(UInt32(v.Height), w, n, err) - n, err = WriteTo(UInt16(v.Round), w, n, err) - n, err = WriteTo(Byte(v.Type), w, n, err) - n, err = WriteTo(ByteSlice(v.Hash), w, n, err) - n, err = WriteTo(v.Signature, w, n, err) + WriteUInt32(w, v.Height, &n, &err) + WriteUInt16(w, v.Round, &n, &err) + WriteByte(w, v.Type, &n, &err) + WriteByteSlice(w, v.Hash, &n, &err) + WriteBinary(w, v.Signature, &n, &err) return }