Browse Source

basic main.go completion

pull/9/head
Jae Kwon 10 years ago
parent
commit
b615e51f95
9 changed files with 236 additions and 147 deletions
  1. +11
    -19
      config/config.go
  2. +16
    -12
      consensus/reactor.go
  3. +42
    -16
      main.go
  4. +24
    -11
      mempool/reactor.go
  5. +15
    -13
      p2p/pex_reactor.go
  6. +51
    -43
      p2p/switch.go
  7. +6
    -0
      p2p/switch_test.go
  8. +71
    -0
      state/genesis.go
  9. +0
    -33
      state/state.go

+ 11
- 19
config/config.go View File

@ -22,7 +22,7 @@ var Config Config_
func setFlags(printHelp *bool) {
flag.BoolVar(printHelp, "help", false, "Print this help message.")
flag.StringVar(&Config.LAddr, "laddr", Config.LAddr, "Listen address. (0.0.0.0:0 means any interface, any port)")
flag.StringVar(&Config.Seed, "seed", Config.Seed, "Address of seed node")
flag.StringVar(&Config.SeedNode, "seed", Config.SeedNode, "Address of seed node")
}
func ParseFlags() {
@ -67,9 +67,9 @@ func ParseFlags() {
/* Default configuration */
var defaultConfig = Config_{
Network: "tendermint_testnet0",
LAddr: "0.0.0.0:0",
Seed: "",
Network: "tendermint_testnet0",
LAddr: "0.0.0.0:0",
SeedNode: "",
Db: DbConfig{
Type: "level",
Dir: RootDir + "/data",
@ -80,11 +80,11 @@ var defaultConfig = Config_{
/* Configuration types */
type Config_ struct {
Network string
LAddr string
Seed string
Db DbConfig
Twilio TwilioConfig
Network string
LAddr string
SeedNode string
Db DbConfig
Twilio TwilioConfig
}
type TwilioConfig struct {
@ -107,8 +107,8 @@ func (cfg *Config_) validate() error {
if cfg.LAddr == "" {
cfg.LAddr = defaultConfig.LAddr
}
if cfg.Seed == "" {
cfg.Seed = defaultConfig.Seed
if cfg.SeedNode == "" {
cfg.SeedNode = defaultConfig.SeedNode
}
if cfg.Db.Type == "" {
return errors.New("Db.Type must be set")
@ -136,11 +136,3 @@ func (cfg *Config_) write(configFile string) {
panic(err)
}
}
/* TODO: generate priv/pub keys
func generateKeys() string {
bytes := &[30]byte{}
rand.Read(bytes[:])
return hex.EncodeToString(bytes[:])
}
*/

consensus/consensus.go → consensus/reactor.go View File


+ 42
- 16
main.go View File

@ -4,27 +4,55 @@ import (
"os"
"os/signal"
"github.com/tendermint/tendermint/blocks"
"github.com/tendermint/tendermint/config"
//"github.com/tendermint/tendermint/consensus"
"github.com/tendermint/tendermint/consensus"
db_ "github.com/tendermint/tendermint/db"
mempool_ "github.com/tendermint/tendermint/mempool"
"github.com/tendermint/tendermint/p2p"
state_ "github.com/tendermint/tendermint/state"
)
type Node struct {
lz []p2p.Listener
sw *p2p.Switch
book *p2p.AddrBook
pexReactor *p2p.PEXReactor
lz []p2p.Listener
sw *p2p.Switch
book *p2p.AddrBook
pexReactor *p2p.PEXReactor
mempoolReactor *mempool_.MempoolReactor
consensusReactor *consensus.ConsensusReactor
}
func NewNode() *Node {
sw := p2p.NewSwitch(nil) // XXX create and pass reactors
// Get BlockStore
blockStoreDB := db_.NewMemDB() // TODO configurable db.
blockStore := blocks.NewBlockStore(blockStoreDB)
// Get State
stateDB := db_.NewMemDB() // TODO configurable db.
state := state_.LoadState(stateDB)
if state == nil {
state = state_.GenesisStateFromFile(stateDB, config.RootDir+"/genesis.json")
}
// Get PEXReactor
book := p2p.NewAddrBook(config.RootDir + "/addrbook.json")
pexReactor := p2p.NewPEXReactor(sw, book)
pexReactor := p2p.NewPEXReactor(book)
// Get MempoolReactor
mempool := mempool_.NewMempool(state)
mempoolReactor := mempool_.NewMempoolReactor(mempool)
// Get ConsensusReactor
consensusReactor := consensus.NewConsensusReactor(blockStore, mempool, state)
sw := p2p.NewSwitch([]p2p.Reactor{pexReactor, mempoolReactor, consensusReactor})
return &Node{
sw: sw,
book: book,
pexReactor: pexReactor,
sw: sw,
book: book,
pexReactor: pexReactor,
mempoolReactor: mempoolReactor,
consensusReactor: consensusReactor,
}
}
@ -33,9 +61,8 @@ func (n *Node) Start() {
for _, l := range n.lz {
go n.inboundConnectionRoutine(l)
}
n.sw.Start()
n.book.Start()
n.pexReactor.Start()
n.sw.Start()
}
func (n *Node) Stop() {
@ -43,7 +70,6 @@ func (n *Node) Stop() {
// TODO: gracefully disconnect from peers.
n.sw.Stop()
n.book.Stop()
n.pexReactor.Stop()
}
// Add a Listener to accept inbound peer connections.
@ -87,9 +113,9 @@ func main() {
n.AddListener(l)
n.Start()
// Seed?
if config.Config.Seed != "" {
peer, err := n.sw.DialPeerWithAddress(p2p.NewNetAddressString(config.Config.Seed))
// If seedNode is provided by config, dial out.
if config.Config.SeedNode != "" {
peer, err := n.sw.DialPeerWithAddress(p2p.NewNetAddressString(config.Config.SeedNode))
if err != nil {
log.Error("Error dialing seed: %v", err)
//n.book.MarkAttempt(addr)


+ 24
- 11
mempool/reactor.go View File

@ -25,21 +25,23 @@ type MempoolReactor struct {
mempool *Mempool
}
func NewMempoolReactor(sw *p2p.Switch, mempool *Mempool) *MempoolReactor {
func NewMempoolReactor(mempool *Mempool) *MempoolReactor {
memR := &MempoolReactor{
sw: sw,
quit: make(chan struct{}),
mempool: mempool,
}
return memR
}
func (memR *MempoolReactor) Start() {
// Implements Reactor
func (memR *MempoolReactor) Start(sw *p2p.Switch) {
if atomic.CompareAndSwapUint32(&memR.started, 0, 1) {
memR.sw = sw
log.Info("Starting MempoolReactor")
}
}
// Implements Reactor
func (memR *MempoolReactor) Stop() {
if atomic.CompareAndSwapUint32(&memR.stopped, 0, 1) {
log.Info("Stopping MempoolReactor")
@ -47,14 +49,14 @@ func (memR *MempoolReactor) Stop() {
}
}
func (memR *MempoolReactor) BroadcastTx(tx Tx) error {
err := memR.mempool.AddTx(tx)
if err != nil {
return err
// Implements Reactor
func (memR *MempoolReactor) GetChannels() []*p2p.ChannelDescriptor {
return []*p2p.ChannelDescriptor{
&p2p.ChannelDescriptor{
Id: MempoolCh,
Priority: 5,
},
}
msg := &TxMessage{Tx: tx}
memR.sw.Broadcast(MempoolCh, msg)
return nil
}
// Implements Reactor
@ -62,9 +64,10 @@ func (pexR *MempoolReactor) AddPeer(peer *p2p.Peer) {
}
// Implements Reactor
func (pexR *MempoolReactor) RemovePeer(peer *p2p.Peer, err error) {
func (pexR *MempoolReactor) RemovePeer(peer *p2p.Peer, reason interface{}) {
}
// Implements Reactor
func (memR *MempoolReactor) Receive(chId byte, src *p2p.Peer, msgBytes []byte) {
_, msg_ := decodeMessage(msgBytes)
log.Info("MempoolReactor received %v", msg_)
@ -95,6 +98,16 @@ func (memR *MempoolReactor) Receive(chId byte, src *p2p.Peer, msgBytes []byte) {
}
}
func (memR *MempoolReactor) BroadcastTx(tx Tx) error {
err := memR.mempool.AddTx(tx)
if err != nil {
return err
}
msg := &TxMessage{Tx: tx}
memR.sw.Broadcast(MempoolCh, msg)
return nil
}
//-----------------------------------------------------------------------------
// Messages


+ 15
- 13
p2p/pex_reactor.go View File

@ -34,22 +34,24 @@ type PEXReactor struct {
book *AddrBook
}
func NewPEXReactor(sw *Switch, book *AddrBook) *PEXReactor {
func NewPEXReactor(book *AddrBook) *PEXReactor {
pexR := &PEXReactor{
sw: sw,
quit: make(chan struct{}),
book: book,
}
return pexR
}
func (pexR *PEXReactor) Start() {
// Implements Reactor
func (pexR *PEXReactor) Start(sw *Switch) {
if atomic.CompareAndSwapUint32(&pexR.started, 0, 1) {
log.Info("Starting PEXReactor")
pexR.sw = sw
go pexR.ensurePeersRoutine()
}
}
// Implements Reactor
func (pexR *PEXReactor) Stop() {
if atomic.CompareAndSwapUint32(&pexR.stopped, 0, 1) {
log.Info("Stopping PEXReactor")
@ -57,15 +59,6 @@ func (pexR *PEXReactor) Stop() {
}
}
// Asks peer for more addresses.
func (pexR *PEXReactor) RequestPEX(peer *Peer) {
peer.TrySend(PexCh, &pexRequestMessage{})
}
func (pexR *PEXReactor) SendAddrs(peer *Peer, addrs []*NetAddress) {
peer.Send(PexCh, &pexRddrsMessage{Addrs: addrs})
}
// Implements Reactor
func (pexR *PEXReactor) GetChannels() []*ChannelDescriptor {
return []*ChannelDescriptor{
@ -87,7 +80,7 @@ func (pexR *PEXReactor) AddPeer(peer *Peer) {
}
// Implements Reactor
func (pexR *PEXReactor) RemovePeer(peer *Peer, err error) {
func (pexR *PEXReactor) RemovePeer(peer *Peer, reason interface{}) {
// TODO
}
@ -123,6 +116,15 @@ func (pexR *PEXReactor) Receive(chId byte, src *Peer, msgBytes []byte) {
}
// Asks peer for more addresses.
func (pexR *PEXReactor) RequestPEX(peer *Peer) {
peer.TrySend(PexCh, &pexRequestMessage{})
}
func (pexR *PEXReactor) SendAddrs(peer *Peer, addrs []*NetAddress) {
peer.Send(PexCh, &pexRddrsMessage{Addrs: addrs})
}
// Ensures that sufficient peers are connected. (continuous)
func (pexR *PEXReactor) ensurePeersRoutine() {
// fire once immediately.


+ 51
- 43
p2p/switch.go View File

@ -11,6 +11,8 @@ import (
)
type Reactor interface {
Start(sw *Switch)
Stop()
GetChannels() []*ChannelDescriptor
AddPeer(peer *Peer)
RemovePeer(peer *Peer, reason interface{})
@ -72,7 +74,7 @@ func NewSwitch(reactors []Reactor) *Switch {
}
}
s := &Switch{
sw := &Switch{
reactors: reactors,
chDescs: chDescs,
reactorsByCh: reactorsByCh,
@ -83,41 +85,47 @@ func NewSwitch(reactors []Reactor) *Switch {
stopped: 0,
}
return s
return sw
}
func (s *Switch) Start() {
if atomic.CompareAndSwapUint32(&s.started, 0, 1) {
func (sw *Switch) Start() {
if atomic.CompareAndSwapUint32(&sw.started, 0, 1) {
log.Info("Starting switch")
for _, reactor := range sw.reactors {
reactor.Start(sw)
}
}
}
func (s *Switch) Stop() {
if atomic.CompareAndSwapUint32(&s.stopped, 0, 1) {
func (sw *Switch) Stop() {
if atomic.CompareAndSwapUint32(&sw.stopped, 0, 1) {
log.Info("Stopping switch")
close(s.quit)
// stop each peer.
for _, peer := range s.peers.List() {
close(sw.quit)
// Stop each peer.
for _, peer := range sw.peers.List() {
peer.stop()
}
// empty tree.
s.peers = NewPeerSet()
sw.peers = NewPeerSet()
// Stop all reactors.
for _, reactor := range sw.reactors {
reactor.Stop()
}
}
}
func (s *Switch) Reactors() []Reactor {
return s.reactors
func (sw *Switch) Reactors() []Reactor {
return sw.reactors
}
func (s *Switch) AddPeerWithConnection(conn net.Conn, outbound bool) (*Peer, error) {
if atomic.LoadUint32(&s.stopped) == 1 {
func (sw *Switch) AddPeerWithConnection(conn net.Conn, outbound bool) (*Peer, error) {
if atomic.LoadUint32(&sw.stopped) == 1 {
return nil, ErrSwitchStopped
}
peer := newPeer(conn, outbound, s.reactorsByCh, s.chDescs, s.StopPeerForError)
peer := newPeer(conn, outbound, sw.reactorsByCh, sw.chDescs, sw.StopPeerForError)
// Add the peer to .peers
if s.peers.Add(peer) {
if sw.peers.Add(peer) {
log.Info("+ %v", peer)
} else {
log.Info("Ignoring duplicate: %v", peer)
@ -128,42 +136,42 @@ func (s *Switch) AddPeerWithConnection(conn net.Conn, outbound bool) (*Peer, err
go peer.start()
// Notify listeners.
s.doAddPeer(peer)
sw.doAddPeer(peer)
return peer, nil
}
func (s *Switch) DialPeerWithAddress(addr *NetAddress) (*Peer, error) {
if atomic.LoadUint32(&s.stopped) == 1 {
func (sw *Switch) DialPeerWithAddress(addr *NetAddress) (*Peer, error) {
if atomic.LoadUint32(&sw.stopped) == 1 {
return nil, ErrSwitchStopped
}
log.Info("Dialing peer @ %v", addr)
s.dialing.Set(addr.String(), addr)
sw.dialing.Set(addr.String(), addr)
conn, err := addr.DialTimeout(peerDialTimeoutSeconds * time.Second)
s.dialing.Delete(addr.String())
sw.dialing.Delete(addr.String())
if err != nil {
return nil, err
}
peer, err := s.AddPeerWithConnection(conn, true)
peer, err := sw.AddPeerWithConnection(conn, true)
if err != nil {
return nil, err
}
return peer, nil
}
func (s *Switch) IsDialing(addr *NetAddress) bool {
return s.dialing.Has(addr.String())
func (sw *Switch) IsDialing(addr *NetAddress) bool {
return sw.dialing.Has(addr.String())
}
// XXX: This is wrong, we can't just ignore failures on TrySend.
func (s *Switch) Broadcast(chId byte, msg Binary) (numSuccess, numFailure int) {
if atomic.LoadUint32(&s.stopped) == 1 {
func (sw *Switch) Broadcast(chId byte, msg Binary) (numSuccess, numFailure int) {
if atomic.LoadUint32(&sw.stopped) == 1 {
return
}
log.Debug("Broadcast on [%X]", chId, msg)
for _, peer := range s.peers.List() {
for _, peer := range sw.peers.List() {
success := peer.TrySend(chId, msg)
log.Debug("Broadcast for peer %v success: %v", peer, success)
if success {
@ -177,8 +185,8 @@ func (s *Switch) Broadcast(chId byte, msg Binary) (numSuccess, numFailure int) {
}
// Returns the count of outbound/inbound and outbound-dialing peers.
func (s *Switch) NumPeers() (outbound, inbound, dialing int) {
peers := s.peers.List()
func (sw *Switch) NumPeers() (outbound, inbound, dialing int) {
peers := sw.peers.List()
for _, peer := range peers {
if peer.outbound {
outbound++
@ -186,44 +194,44 @@ func (s *Switch) NumPeers() (outbound, inbound, dialing int) {
inbound++
}
}
dialing = s.dialing.Size()
dialing = sw.dialing.Size()
return
}
func (s *Switch) Peers() IPeerSet {
return s.peers
func (sw *Switch) Peers() IPeerSet {
return sw.peers
}
// Disconnect from a peer due to external error.
// TODO: make record depending on reason.
func (s *Switch) StopPeerForError(peer *Peer, reason interface{}) {
func (sw *Switch) StopPeerForError(peer *Peer, reason interface{}) {
log.Info("- %v !! reason: %v", peer, reason)
s.peers.Remove(peer)
sw.peers.Remove(peer)
peer.stop()
// Notify listeners
s.doRemovePeer(peer, reason)
sw.doRemovePeer(peer, reason)
}
// Disconnect from a peer gracefully.
// TODO: handle graceful disconnects.
func (s *Switch) StopPeerGracefully(peer *Peer) {
func (sw *Switch) StopPeerGracefully(peer *Peer) {
log.Info("- %v", peer)
s.peers.Remove(peer)
sw.peers.Remove(peer)
peer.stop()
// Notify listeners
s.doRemovePeer(peer, nil)
sw.doRemovePeer(peer, nil)
}
func (s *Switch) doAddPeer(peer *Peer) {
for _, reactor := range s.reactors {
func (sw *Switch) doAddPeer(peer *Peer) {
for _, reactor := range sw.reactors {
reactor.AddPeer(peer)
}
}
func (s *Switch) doRemovePeer(peer *Peer, reason interface{}) {
for _, reactor := range s.reactors {
func (sw *Switch) doRemovePeer(peer *Peer, reason interface{}) {
for _, reactor := range sw.reactors {
reactor.RemovePeer(peer, reason)
}
}


+ 6
- 0
p2p/switch_test.go View File

@ -43,6 +43,12 @@ func NewTestReactor(channels []*ChannelDescriptor, logMessages bool) *TestReacto
}
}
func (tr *TestReactor) Start(sw *Switch) {
}
func (tr *TestReactor) Stop() {
}
func (tr *TestReactor) GetChannels() []*ChannelDescriptor {
return tr.channels
}


+ 71
- 0
state/genesis.go View File

@ -0,0 +1,71 @@
package state
import (
"encoding/json"
"io/ioutil"
"time"
. "github.com/tendermint/tendermint/binary"
. "github.com/tendermint/tendermint/common"
db_ "github.com/tendermint/tendermint/db"
"github.com/tendermint/tendermint/merkle"
)
type GenesisDoc struct {
GenesisTime time.Time
AccountDetails []*AccountDetail
}
func ReadGenesisDocJSON(jsonBlob []byte) (genState *GenesisDoc) {
err := json.Unmarshal(jsonBlob, &genState)
if err != nil {
Panicf("Couldn't read GenesisDoc: %v", err)
}
return
}
func GenesisStateFromFile(db db_.DB, genDocFile string) *State {
jsonBlob, err := ioutil.ReadFile(genDocFile)
if err != nil {
Panicf("Couldn't read GenesisDoc file: %v", err)
}
genDoc := ReadGenesisDocJSON(jsonBlob)
return GenesisStateFromDoc(db, genDoc)
}
func GenesisStateFromDoc(db db_.DB, genDoc *GenesisDoc) *State {
return GenesisState(db, genDoc.GenesisTime, genDoc.AccountDetails)
}
func GenesisState(db db_.DB, genesisTime time.Time, accDets []*AccountDetail) *State {
// TODO: Use "uint64Codec" instead of BasicCodec
accountDetails := merkle.NewIAVLTree(BasicCodec, AccountDetailCodec, defaultAccountDetailsCacheCapacity, db)
validators := []*Validator{}
for _, accDet := range accDets {
accountDetails.Set(accDet.Id, accDet)
if accDet.Status == AccountStatusBonded {
validators = append(validators, &Validator{
Account: accDet.Account,
BondHeight: 0,
VotingPower: accDet.Balance,
Accum: 0,
})
}
}
if len(validators) == 0 {
panic("Must have some validators")
}
return &State{
DB: db,
Height: 0,
BlockHash: nil,
BlockTime: genesisTime,
BondedValidators: NewValidatorSet(validators),
UnbondingValidators: NewValidatorSet(nil),
accountDetails: accountDetails,
}
}

+ 0
- 33
state/state.go View File

@ -51,39 +51,6 @@ type State struct {
accountDetails merkle.Tree // Shouldn't be accessed directly.
}
func GenesisState(db db_.DB, genesisTime time.Time, accDets []*AccountDetail) *State {
// TODO: Use "uint64Codec" instead of BasicCodec
accountDetails := merkle.NewIAVLTree(BasicCodec, AccountDetailCodec, defaultAccountDetailsCacheCapacity, db)
validators := []*Validator{}
for _, accDet := range accDets {
accountDetails.Set(accDet.Id, accDet)
if accDet.Status == AccountStatusBonded {
validators = append(validators, &Validator{
Account: accDet.Account,
BondHeight: 0,
VotingPower: accDet.Balance,
Accum: 0,
})
}
}
if len(validators) == 0 {
panic("Must have some validators")
}
return &State{
DB: db,
Height: 0,
BlockHash: nil,
BlockTime: genesisTime,
BondedValidators: NewValidatorSet(validators),
UnbondingValidators: NewValidatorSet(nil),
accountDetails: accountDetails,
}
}
func LoadState(db db_.DB) *State {
s := &State{DB: db}
buf := db.Get(stateKey)


Loading…
Cancel
Save