diff --git a/config/config.go b/config/config.go index 7d2613313..a9b68b90b 100644 --- a/config/config.go +++ b/config/config.go @@ -22,7 +22,7 @@ var Config Config_ func setFlags(printHelp *bool) { flag.BoolVar(printHelp, "help", false, "Print this help message.") flag.StringVar(&Config.LAddr, "laddr", Config.LAddr, "Listen address. (0.0.0.0:0 means any interface, any port)") - flag.StringVar(&Config.Seed, "seed", Config.Seed, "Address of seed node") + flag.StringVar(&Config.SeedNode, "seed", Config.SeedNode, "Address of seed node") } func ParseFlags() { @@ -67,9 +67,9 @@ func ParseFlags() { /* Default configuration */ var defaultConfig = Config_{ - Network: "tendermint_testnet0", - LAddr: "0.0.0.0:0", - Seed: "", + Network: "tendermint_testnet0", + LAddr: "0.0.0.0:0", + SeedNode: "", Db: DbConfig{ Type: "level", Dir: RootDir + "/data", @@ -80,11 +80,11 @@ var defaultConfig = Config_{ /* Configuration types */ type Config_ struct { - Network string - LAddr string - Seed string - Db DbConfig - Twilio TwilioConfig + Network string + LAddr string + SeedNode string + Db DbConfig + Twilio TwilioConfig } type TwilioConfig struct { @@ -107,8 +107,8 @@ func (cfg *Config_) validate() error { if cfg.LAddr == "" { cfg.LAddr = defaultConfig.LAddr } - if cfg.Seed == "" { - cfg.Seed = defaultConfig.Seed + if cfg.SeedNode == "" { + cfg.SeedNode = defaultConfig.SeedNode } if cfg.Db.Type == "" { return errors.New("Db.Type must be set") @@ -136,11 +136,3 @@ func (cfg *Config_) write(configFile string) { panic(err) } } - -/* TODO: generate priv/pub keys -func generateKeys() string { - bytes := &[30]byte{} - rand.Read(bytes[:]) - return hex.EncodeToString(bytes[:]) -} -*/ diff --git a/consensus/consensus.go b/consensus/reactor.go similarity index 98% rename from consensus/consensus.go rename to consensus/reactor.go index 17b5d8a79..d62f6f79b 100644 --- a/consensus/consensus.go +++ b/consensus/reactor.go @@ -98,6 +98,8 @@ type RoundAction struct { Action RoundActionType // Action to perform. } +//----------------------------------------------------------------------------- + type ConsensusReactor struct { sw *p2p.Switch quit chan struct{} @@ -108,10 +110,9 @@ type ConsensusReactor struct { doActionCh chan RoundAction } -func NewConsensusReactor(sw *p2p.Switch, blockStore *BlockStore, mempool *mempool.Mempool, state *state.State) *ConsensusReactor { +func NewConsensusReactor(blockStore *BlockStore, mempool *mempool.Mempool, state *state.State) *ConsensusReactor { conS := NewConsensusState(state, blockStore, mempool) conR := &ConsensusReactor{ - sw: sw, quit: make(chan struct{}), conS: conS, @@ -120,18 +121,16 @@ func NewConsensusReactor(sw *p2p.Switch, blockStore *BlockStore, mempool *mempoo return conR } -// Sets our private validator account for signing votes. -func (conR *ConsensusReactor) SetPrivValidator(priv *PrivValidator) { - conR.conS.SetPrivValidator(priv) -} - -func (conR *ConsensusReactor) Start() { +// Implements Reactor +func (conR *ConsensusReactor) Start(sw *p2p.Switch) { if atomic.CompareAndSwapUint32(&conR.started, 0, 1) { log.Info("Starting ConsensusReactor") + conR.sw = sw go conR.stepTransitionRoutine() } } +// Implements Reactor func (conR *ConsensusReactor) Stop() { if atomic.CompareAndSwapUint32(&conR.stopped, 0, 1) { log.Info("Stopping ConsensusReactor") @@ -139,10 +138,6 @@ func (conR *ConsensusReactor) Stop() { } } -func (conR *ConsensusReactor) IsStopped() bool { - return atomic.LoadUint32(&conR.stopped) == 1 -} - // Implements Reactor func (conR *ConsensusReactor) GetChannels() []*p2p.ChannelDescriptor { // TODO optimize @@ -280,6 +275,15 @@ func (conR *ConsensusReactor) Receive(chId byte, peer *p2p.Peer, msgBytes []byte } } +// Sets our private validator account for signing votes. +func (conR *ConsensusReactor) SetPrivValidator(priv *PrivValidator) { + conR.conS.SetPrivValidator(priv) +} + +func (conR *ConsensusReactor) IsStopped() bool { + return atomic.LoadUint32(&conR.stopped) == 1 +} + //-------------------------------------- // Source of all round state transitions (and votes). diff --git a/main.go b/main.go index 9e3ffc8fc..3d74e925f 100644 --- a/main.go +++ b/main.go @@ -4,27 +4,55 @@ import ( "os" "os/signal" + "github.com/tendermint/tendermint/blocks" "github.com/tendermint/tendermint/config" - //"github.com/tendermint/tendermint/consensus" + "github.com/tendermint/tendermint/consensus" + db_ "github.com/tendermint/tendermint/db" + mempool_ "github.com/tendermint/tendermint/mempool" "github.com/tendermint/tendermint/p2p" + state_ "github.com/tendermint/tendermint/state" ) type Node struct { - lz []p2p.Listener - sw *p2p.Switch - book *p2p.AddrBook - pexReactor *p2p.PEXReactor + lz []p2p.Listener + sw *p2p.Switch + book *p2p.AddrBook + pexReactor *p2p.PEXReactor + mempoolReactor *mempool_.MempoolReactor + consensusReactor *consensus.ConsensusReactor } func NewNode() *Node { - sw := p2p.NewSwitch(nil) // XXX create and pass reactors + // Get BlockStore + blockStoreDB := db_.NewMemDB() // TODO configurable db. + blockStore := blocks.NewBlockStore(blockStoreDB) + + // Get State + stateDB := db_.NewMemDB() // TODO configurable db. + state := state_.LoadState(stateDB) + if state == nil { + state = state_.GenesisStateFromFile(stateDB, config.RootDir+"/genesis.json") + } + + // Get PEXReactor book := p2p.NewAddrBook(config.RootDir + "/addrbook.json") - pexReactor := p2p.NewPEXReactor(sw, book) + pexReactor := p2p.NewPEXReactor(book) + + // Get MempoolReactor + mempool := mempool_.NewMempool(state) + mempoolReactor := mempool_.NewMempoolReactor(mempool) + + // Get ConsensusReactor + consensusReactor := consensus.NewConsensusReactor(blockStore, mempool, state) + + sw := p2p.NewSwitch([]p2p.Reactor{pexReactor, mempoolReactor, consensusReactor}) return &Node{ - sw: sw, - book: book, - pexReactor: pexReactor, + sw: sw, + book: book, + pexReactor: pexReactor, + mempoolReactor: mempoolReactor, + consensusReactor: consensusReactor, } } @@ -33,9 +61,8 @@ func (n *Node) Start() { for _, l := range n.lz { go n.inboundConnectionRoutine(l) } - n.sw.Start() n.book.Start() - n.pexReactor.Start() + n.sw.Start() } func (n *Node) Stop() { @@ -43,7 +70,6 @@ func (n *Node) Stop() { // TODO: gracefully disconnect from peers. n.sw.Stop() n.book.Stop() - n.pexReactor.Stop() } // Add a Listener to accept inbound peer connections. @@ -87,9 +113,9 @@ func main() { n.AddListener(l) n.Start() - // Seed? - if config.Config.Seed != "" { - peer, err := n.sw.DialPeerWithAddress(p2p.NewNetAddressString(config.Config.Seed)) + // If seedNode is provided by config, dial out. + if config.Config.SeedNode != "" { + peer, err := n.sw.DialPeerWithAddress(p2p.NewNetAddressString(config.Config.SeedNode)) if err != nil { log.Error("Error dialing seed: %v", err) //n.book.MarkAttempt(addr) diff --git a/mempool/reactor.go b/mempool/reactor.go index 22d3b7487..05d7278dc 100644 --- a/mempool/reactor.go +++ b/mempool/reactor.go @@ -25,21 +25,23 @@ type MempoolReactor struct { mempool *Mempool } -func NewMempoolReactor(sw *p2p.Switch, mempool *Mempool) *MempoolReactor { +func NewMempoolReactor(mempool *Mempool) *MempoolReactor { memR := &MempoolReactor{ - sw: sw, quit: make(chan struct{}), mempool: mempool, } return memR } -func (memR *MempoolReactor) Start() { +// Implements Reactor +func (memR *MempoolReactor) Start(sw *p2p.Switch) { if atomic.CompareAndSwapUint32(&memR.started, 0, 1) { + memR.sw = sw log.Info("Starting MempoolReactor") } } +// Implements Reactor func (memR *MempoolReactor) Stop() { if atomic.CompareAndSwapUint32(&memR.stopped, 0, 1) { log.Info("Stopping MempoolReactor") @@ -47,14 +49,14 @@ func (memR *MempoolReactor) Stop() { } } -func (memR *MempoolReactor) BroadcastTx(tx Tx) error { - err := memR.mempool.AddTx(tx) - if err != nil { - return err +// Implements Reactor +func (memR *MempoolReactor) GetChannels() []*p2p.ChannelDescriptor { + return []*p2p.ChannelDescriptor{ + &p2p.ChannelDescriptor{ + Id: MempoolCh, + Priority: 5, + }, } - msg := &TxMessage{Tx: tx} - memR.sw.Broadcast(MempoolCh, msg) - return nil } // Implements Reactor @@ -62,9 +64,10 @@ func (pexR *MempoolReactor) AddPeer(peer *p2p.Peer) { } // Implements Reactor -func (pexR *MempoolReactor) RemovePeer(peer *p2p.Peer, err error) { +func (pexR *MempoolReactor) RemovePeer(peer *p2p.Peer, reason interface{}) { } +// Implements Reactor func (memR *MempoolReactor) Receive(chId byte, src *p2p.Peer, msgBytes []byte) { _, msg_ := decodeMessage(msgBytes) log.Info("MempoolReactor received %v", msg_) @@ -95,6 +98,16 @@ func (memR *MempoolReactor) Receive(chId byte, src *p2p.Peer, msgBytes []byte) { } } +func (memR *MempoolReactor) BroadcastTx(tx Tx) error { + err := memR.mempool.AddTx(tx) + if err != nil { + return err + } + msg := &TxMessage{Tx: tx} + memR.sw.Broadcast(MempoolCh, msg) + return nil +} + //----------------------------------------------------------------------------- // Messages diff --git a/p2p/pex_reactor.go b/p2p/pex_reactor.go index e0c0713ba..85b94ffb7 100644 --- a/p2p/pex_reactor.go +++ b/p2p/pex_reactor.go @@ -34,22 +34,24 @@ type PEXReactor struct { book *AddrBook } -func NewPEXReactor(sw *Switch, book *AddrBook) *PEXReactor { +func NewPEXReactor(book *AddrBook) *PEXReactor { pexR := &PEXReactor{ - sw: sw, quit: make(chan struct{}), book: book, } return pexR } -func (pexR *PEXReactor) Start() { +// Implements Reactor +func (pexR *PEXReactor) Start(sw *Switch) { if atomic.CompareAndSwapUint32(&pexR.started, 0, 1) { log.Info("Starting PEXReactor") + pexR.sw = sw go pexR.ensurePeersRoutine() } } +// Implements Reactor func (pexR *PEXReactor) Stop() { if atomic.CompareAndSwapUint32(&pexR.stopped, 0, 1) { log.Info("Stopping PEXReactor") @@ -57,15 +59,6 @@ func (pexR *PEXReactor) Stop() { } } -// Asks peer for more addresses. -func (pexR *PEXReactor) RequestPEX(peer *Peer) { - peer.TrySend(PexCh, &pexRequestMessage{}) -} - -func (pexR *PEXReactor) SendAddrs(peer *Peer, addrs []*NetAddress) { - peer.Send(PexCh, &pexRddrsMessage{Addrs: addrs}) -} - // Implements Reactor func (pexR *PEXReactor) GetChannels() []*ChannelDescriptor { return []*ChannelDescriptor{ @@ -87,7 +80,7 @@ func (pexR *PEXReactor) AddPeer(peer *Peer) { } // Implements Reactor -func (pexR *PEXReactor) RemovePeer(peer *Peer, err error) { +func (pexR *PEXReactor) RemovePeer(peer *Peer, reason interface{}) { // TODO } @@ -123,6 +116,15 @@ func (pexR *PEXReactor) Receive(chId byte, src *Peer, msgBytes []byte) { } +// Asks peer for more addresses. +func (pexR *PEXReactor) RequestPEX(peer *Peer) { + peer.TrySend(PexCh, &pexRequestMessage{}) +} + +func (pexR *PEXReactor) SendAddrs(peer *Peer, addrs []*NetAddress) { + peer.Send(PexCh, &pexRddrsMessage{Addrs: addrs}) +} + // Ensures that sufficient peers are connected. (continuous) func (pexR *PEXReactor) ensurePeersRoutine() { // fire once immediately. diff --git a/p2p/switch.go b/p2p/switch.go index 32cbcc8be..57433a753 100644 --- a/p2p/switch.go +++ b/p2p/switch.go @@ -11,6 +11,8 @@ import ( ) type Reactor interface { + Start(sw *Switch) + Stop() GetChannels() []*ChannelDescriptor AddPeer(peer *Peer) RemovePeer(peer *Peer, reason interface{}) @@ -72,7 +74,7 @@ func NewSwitch(reactors []Reactor) *Switch { } } - s := &Switch{ + sw := &Switch{ reactors: reactors, chDescs: chDescs, reactorsByCh: reactorsByCh, @@ -83,41 +85,47 @@ func NewSwitch(reactors []Reactor) *Switch { stopped: 0, } - return s + return sw } -func (s *Switch) Start() { - if atomic.CompareAndSwapUint32(&s.started, 0, 1) { +func (sw *Switch) Start() { + if atomic.CompareAndSwapUint32(&sw.started, 0, 1) { log.Info("Starting switch") + for _, reactor := range sw.reactors { + reactor.Start(sw) + } } } -func (s *Switch) Stop() { - if atomic.CompareAndSwapUint32(&s.stopped, 0, 1) { +func (sw *Switch) Stop() { + if atomic.CompareAndSwapUint32(&sw.stopped, 0, 1) { log.Info("Stopping switch") - close(s.quit) - // stop each peer. - for _, peer := range s.peers.List() { + close(sw.quit) + // Stop each peer. + for _, peer := range sw.peers.List() { peer.stop() } - // empty tree. - s.peers = NewPeerSet() + sw.peers = NewPeerSet() + // Stop all reactors. + for _, reactor := range sw.reactors { + reactor.Stop() + } } } -func (s *Switch) Reactors() []Reactor { - return s.reactors +func (sw *Switch) Reactors() []Reactor { + return sw.reactors } -func (s *Switch) AddPeerWithConnection(conn net.Conn, outbound bool) (*Peer, error) { - if atomic.LoadUint32(&s.stopped) == 1 { +func (sw *Switch) AddPeerWithConnection(conn net.Conn, outbound bool) (*Peer, error) { + if atomic.LoadUint32(&sw.stopped) == 1 { return nil, ErrSwitchStopped } - peer := newPeer(conn, outbound, s.reactorsByCh, s.chDescs, s.StopPeerForError) + peer := newPeer(conn, outbound, sw.reactorsByCh, sw.chDescs, sw.StopPeerForError) // Add the peer to .peers - if s.peers.Add(peer) { + if sw.peers.Add(peer) { log.Info("+ %v", peer) } else { log.Info("Ignoring duplicate: %v", peer) @@ -128,42 +136,42 @@ func (s *Switch) AddPeerWithConnection(conn net.Conn, outbound bool) (*Peer, err go peer.start() // Notify listeners. - s.doAddPeer(peer) + sw.doAddPeer(peer) return peer, nil } -func (s *Switch) DialPeerWithAddress(addr *NetAddress) (*Peer, error) { - if atomic.LoadUint32(&s.stopped) == 1 { +func (sw *Switch) DialPeerWithAddress(addr *NetAddress) (*Peer, error) { + if atomic.LoadUint32(&sw.stopped) == 1 { return nil, ErrSwitchStopped } log.Info("Dialing peer @ %v", addr) - s.dialing.Set(addr.String(), addr) + sw.dialing.Set(addr.String(), addr) conn, err := addr.DialTimeout(peerDialTimeoutSeconds * time.Second) - s.dialing.Delete(addr.String()) + sw.dialing.Delete(addr.String()) if err != nil { return nil, err } - peer, err := s.AddPeerWithConnection(conn, true) + peer, err := sw.AddPeerWithConnection(conn, true) if err != nil { return nil, err } return peer, nil } -func (s *Switch) IsDialing(addr *NetAddress) bool { - return s.dialing.Has(addr.String()) +func (sw *Switch) IsDialing(addr *NetAddress) bool { + return sw.dialing.Has(addr.String()) } // XXX: This is wrong, we can't just ignore failures on TrySend. -func (s *Switch) Broadcast(chId byte, msg Binary) (numSuccess, numFailure int) { - if atomic.LoadUint32(&s.stopped) == 1 { +func (sw *Switch) Broadcast(chId byte, msg Binary) (numSuccess, numFailure int) { + if atomic.LoadUint32(&sw.stopped) == 1 { return } log.Debug("Broadcast on [%X]", chId, msg) - for _, peer := range s.peers.List() { + for _, peer := range sw.peers.List() { success := peer.TrySend(chId, msg) log.Debug("Broadcast for peer %v success: %v", peer, success) if success { @@ -177,8 +185,8 @@ func (s *Switch) Broadcast(chId byte, msg Binary) (numSuccess, numFailure int) { } // Returns the count of outbound/inbound and outbound-dialing peers. -func (s *Switch) NumPeers() (outbound, inbound, dialing int) { - peers := s.peers.List() +func (sw *Switch) NumPeers() (outbound, inbound, dialing int) { + peers := sw.peers.List() for _, peer := range peers { if peer.outbound { outbound++ @@ -186,44 +194,44 @@ func (s *Switch) NumPeers() (outbound, inbound, dialing int) { inbound++ } } - dialing = s.dialing.Size() + dialing = sw.dialing.Size() return } -func (s *Switch) Peers() IPeerSet { - return s.peers +func (sw *Switch) Peers() IPeerSet { + return sw.peers } // Disconnect from a peer due to external error. // TODO: make record depending on reason. -func (s *Switch) StopPeerForError(peer *Peer, reason interface{}) { +func (sw *Switch) StopPeerForError(peer *Peer, reason interface{}) { log.Info("- %v !! reason: %v", peer, reason) - s.peers.Remove(peer) + sw.peers.Remove(peer) peer.stop() // Notify listeners - s.doRemovePeer(peer, reason) + sw.doRemovePeer(peer, reason) } // Disconnect from a peer gracefully. // TODO: handle graceful disconnects. -func (s *Switch) StopPeerGracefully(peer *Peer) { +func (sw *Switch) StopPeerGracefully(peer *Peer) { log.Info("- %v", peer) - s.peers.Remove(peer) + sw.peers.Remove(peer) peer.stop() // Notify listeners - s.doRemovePeer(peer, nil) + sw.doRemovePeer(peer, nil) } -func (s *Switch) doAddPeer(peer *Peer) { - for _, reactor := range s.reactors { +func (sw *Switch) doAddPeer(peer *Peer) { + for _, reactor := range sw.reactors { reactor.AddPeer(peer) } } -func (s *Switch) doRemovePeer(peer *Peer, reason interface{}) { - for _, reactor := range s.reactors { +func (sw *Switch) doRemovePeer(peer *Peer, reason interface{}) { + for _, reactor := range sw.reactors { reactor.RemovePeer(peer, reason) } } diff --git a/p2p/switch_test.go b/p2p/switch_test.go index 33745a9de..0b9f00d3a 100644 --- a/p2p/switch_test.go +++ b/p2p/switch_test.go @@ -43,6 +43,12 @@ func NewTestReactor(channels []*ChannelDescriptor, logMessages bool) *TestReacto } } +func (tr *TestReactor) Start(sw *Switch) { +} + +func (tr *TestReactor) Stop() { +} + func (tr *TestReactor) GetChannels() []*ChannelDescriptor { return tr.channels } diff --git a/state/genesis.go b/state/genesis.go new file mode 100644 index 000000000..fe6307fee --- /dev/null +++ b/state/genesis.go @@ -0,0 +1,71 @@ +package state + +import ( + "encoding/json" + "io/ioutil" + "time" + + . "github.com/tendermint/tendermint/binary" + . "github.com/tendermint/tendermint/common" + db_ "github.com/tendermint/tendermint/db" + "github.com/tendermint/tendermint/merkle" +) + +type GenesisDoc struct { + GenesisTime time.Time + AccountDetails []*AccountDetail +} + +func ReadGenesisDocJSON(jsonBlob []byte) (genState *GenesisDoc) { + err := json.Unmarshal(jsonBlob, &genState) + if err != nil { + Panicf("Couldn't read GenesisDoc: %v", err) + } + return +} + +func GenesisStateFromFile(db db_.DB, genDocFile string) *State { + jsonBlob, err := ioutil.ReadFile(genDocFile) + if err != nil { + Panicf("Couldn't read GenesisDoc file: %v", err) + } + genDoc := ReadGenesisDocJSON(jsonBlob) + return GenesisStateFromDoc(db, genDoc) +} + +func GenesisStateFromDoc(db db_.DB, genDoc *GenesisDoc) *State { + return GenesisState(db, genDoc.GenesisTime, genDoc.AccountDetails) +} + +func GenesisState(db db_.DB, genesisTime time.Time, accDets []*AccountDetail) *State { + + // TODO: Use "uint64Codec" instead of BasicCodec + accountDetails := merkle.NewIAVLTree(BasicCodec, AccountDetailCodec, defaultAccountDetailsCacheCapacity, db) + validators := []*Validator{} + + for _, accDet := range accDets { + accountDetails.Set(accDet.Id, accDet) + if accDet.Status == AccountStatusBonded { + validators = append(validators, &Validator{ + Account: accDet.Account, + BondHeight: 0, + VotingPower: accDet.Balance, + Accum: 0, + }) + } + } + + if len(validators) == 0 { + panic("Must have some validators") + } + + return &State{ + DB: db, + Height: 0, + BlockHash: nil, + BlockTime: genesisTime, + BondedValidators: NewValidatorSet(validators), + UnbondingValidators: NewValidatorSet(nil), + accountDetails: accountDetails, + } +} diff --git a/state/state.go b/state/state.go index 658a1b968..780f0f176 100644 --- a/state/state.go +++ b/state/state.go @@ -51,39 +51,6 @@ type State struct { accountDetails merkle.Tree // Shouldn't be accessed directly. } -func GenesisState(db db_.DB, genesisTime time.Time, accDets []*AccountDetail) *State { - - // TODO: Use "uint64Codec" instead of BasicCodec - accountDetails := merkle.NewIAVLTree(BasicCodec, AccountDetailCodec, defaultAccountDetailsCacheCapacity, db) - validators := []*Validator{} - - for _, accDet := range accDets { - accountDetails.Set(accDet.Id, accDet) - if accDet.Status == AccountStatusBonded { - validators = append(validators, &Validator{ - Account: accDet.Account, - BondHeight: 0, - VotingPower: accDet.Balance, - Accum: 0, - }) - } - } - - if len(validators) == 0 { - panic("Must have some validators") - } - - return &State{ - DB: db, - Height: 0, - BlockHash: nil, - BlockTime: genesisTime, - BondedValidators: NewValidatorSet(validators), - UnbondingValidators: NewValidatorSet(nil), - accountDetails: accountDetails, - } -} - func LoadState(db db_.DB) *State { s := &State{DB: db} buf := db.Get(stateKey)