You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

202 lines
5.4 KiB

10 years ago
11 years ago
11 years ago
11 years ago
10 years ago
10 years ago
10 years ago
11 years ago
11 years ago
11 years ago
11 years ago
11 years ago
11 years ago
10 years ago
11 years ago
11 years ago
10 years ago
11 years ago
11 years ago
10 years ago
11 years ago
10 years ago
  1. package node
  2. import (
  3. "net/http"
  4. "os"
  5. bc "github.com/tendermint/tendermint/blockchain"
  6. . "github.com/tendermint/tendermint/common"
  7. "github.com/tendermint/tendermint/config"
  8. "github.com/tendermint/tendermint/consensus"
  9. dbm "github.com/tendermint/tendermint/db"
  10. "github.com/tendermint/tendermint/events"
  11. mempl "github.com/tendermint/tendermint/mempool"
  12. "github.com/tendermint/tendermint/p2p"
  13. "github.com/tendermint/tendermint/rpc"
  14. "github.com/tendermint/tendermint/rpc/core"
  15. sm "github.com/tendermint/tendermint/state"
  16. )
  17. type Node struct {
  18. sw *p2p.Switch
  19. evsw *events.EventSwitch
  20. book *p2p.AddrBook
  21. blockStore *bc.BlockStore
  22. pexReactor *p2p.PEXReactor
  23. bcReactor *bc.BlockchainReactor
  24. mempoolReactor *mempl.MempoolReactor
  25. consensusState *consensus.ConsensusState
  26. consensusReactor *consensus.ConsensusReactor
  27. privValidator *sm.PrivValidator
  28. }
  29. func NewNode() *Node {
  30. // Get BlockStore
  31. blockStoreDB := dbm.GetDB("blockstore")
  32. blockStore := bc.NewBlockStore(blockStoreDB)
  33. // Get State
  34. stateDB := dbm.GetDB("state")
  35. state := sm.LoadState(stateDB)
  36. if state == nil {
  37. state = sm.MakeGenesisStateFromFile(stateDB, config.App().GetString("GenesisFile"))
  38. state.Save()
  39. }
  40. // Get PrivValidator
  41. var privValidator *sm.PrivValidator
  42. privValidatorFile := config.App().GetString("PrivValidatorFile")
  43. if _, err := os.Stat(privValidatorFile); err == nil {
  44. privValidator = sm.LoadPrivValidator(privValidatorFile)
  45. log.Info("Loaded PrivValidator",
  46. "file", privValidatorFile, "privValidator", privValidator)
  47. } else {
  48. privValidator = sm.GenPrivValidator()
  49. privValidator.SetFile(privValidatorFile)
  50. privValidator.Save()
  51. log.Info("Generated PrivValidator", "file", privValidatorFile)
  52. }
  53. eventSwitch := new(events.EventSwitch)
  54. eventSwitch.Start()
  55. // Get PEXReactor
  56. book := p2p.NewAddrBook(config.App().GetString("AddrBookFile"))
  57. pexReactor := p2p.NewPEXReactor(book)
  58. // Get BlockchainReactor
  59. bcReactor := bc.NewBlockchainReactor(state, blockStore, config.App().GetBool("FastSync"))
  60. // Get MempoolReactor
  61. mempool := mempl.NewMempool(state.Copy())
  62. mempoolReactor := mempl.NewMempoolReactor(mempool)
  63. // Get ConsensusReactor
  64. consensusState := consensus.NewConsensusState(state, blockStore, mempoolReactor)
  65. consensusReactor := consensus.NewConsensusReactor(consensusState, blockStore)
  66. if privValidator != nil {
  67. consensusReactor.SetPrivValidator(privValidator)
  68. }
  69. sw := p2p.NewSwitch()
  70. sw.SetNetwork(config.App().GetString("Network"))
  71. sw.AddReactor("PEX", pexReactor)
  72. sw.AddReactor("MEMPOOL", mempoolReactor)
  73. sw.AddReactor("BLOCKCHAIN", bcReactor)
  74. sw.AddReactor("CONSENSUS", consensusReactor)
  75. // add the event switch to all services
  76. // they should all satisfy events.Eventable
  77. SetFireable(eventSwitch, pexReactor, bcReactor, mempoolReactor, consensusReactor)
  78. return &Node{
  79. sw: sw,
  80. evsw: eventSwitch,
  81. book: book,
  82. blockStore: blockStore,
  83. pexReactor: pexReactor,
  84. bcReactor: bcReactor,
  85. mempoolReactor: mempoolReactor,
  86. consensusState: consensusState,
  87. consensusReactor: consensusReactor,
  88. privValidator: privValidator,
  89. }
  90. }
  91. func (n *Node) Start() {
  92. log.Info("Starting Node")
  93. n.book.Start()
  94. n.sw.Start()
  95. if config.App().GetBool("FastSync") {
  96. // TODO: When FastSync is done, start CONSENSUS.
  97. n.sw.Reactor("CONSENSUS").Stop()
  98. }
  99. }
  100. func (n *Node) Stop() {
  101. log.Info("Stopping Node")
  102. // TODO: gracefully disconnect from peers.
  103. n.sw.Stop()
  104. n.book.Stop()
  105. }
  106. // Add the event switch to reactors, mempool, etc.
  107. func SetFireable(evsw *events.EventSwitch, eventables ...events.Eventable) {
  108. for _, e := range eventables {
  109. e.SetFireable(evsw)
  110. }
  111. }
  112. // Add a Listener to accept inbound peer connections.
  113. func (n *Node) AddListener(l p2p.Listener) {
  114. log.Info(Fmt("Added %v", l))
  115. n.sw.AddListener(l)
  116. n.book.AddOurAddress(l.ExternalAddress())
  117. }
  118. func (n *Node) DialSeed() {
  119. addr := p2p.NewNetAddressString(config.App().GetString("SeedNode"))
  120. peer, err := n.sw.DialPeerWithAddress(addr)
  121. if err != nil {
  122. log.Error("Error dialing seed", "error", err)
  123. //n.book.MarkAttempt(addr)
  124. return
  125. } else {
  126. log.Info("Connected to seed", "peer", peer)
  127. n.book.AddAddress(addr, addr)
  128. }
  129. }
  130. func (n *Node) StartRPC() {
  131. core.SetBlockStore(n.blockStore)
  132. core.SetConsensusState(n.consensusState)
  133. core.SetMempoolReactor(n.mempoolReactor)
  134. core.SetSwitch(n.sw)
  135. listenAddr := config.App().GetString("RPC.HTTP.ListenAddr")
  136. mux := http.NewServeMux()
  137. rpc.RegisterEventsHandler(mux, n.evsw)
  138. rpc.RegisterRPCFuncs(mux, core.Routes)
  139. rpc.StartHTTPServer(listenAddr, mux)
  140. }
  141. func (n *Node) Switch() *p2p.Switch {
  142. return n.sw
  143. }
  144. func (n *Node) ConsensusState() *consensus.ConsensusState {
  145. return n.consensusState
  146. }
  147. func (n *Node) MempoolReactor() *mempl.MempoolReactor {
  148. return n.mempoolReactor
  149. }
  150. func (n *Node) EventSwitch() *events.EventSwitch {
  151. return n.evsw
  152. }
  153. //------------------------------------------------------------------------------
  154. func RunNode() {
  155. // Create & start node
  156. n := NewNode()
  157. l := p2p.NewDefaultListener("tcp", config.App().GetString("ListenAddr"), false)
  158. n.AddListener(l)
  159. n.Start()
  160. // If seedNode is provided by config, dial out.
  161. if config.App().GetString("SeedNode") != "" {
  162. n.DialSeed()
  163. }
  164. // Run the RPC server.
  165. if config.App().GetString("RPC.HTTP.ListenAddr") != "" {
  166. n.StartRPC()
  167. }
  168. // Sleep forever and then...
  169. TrapSignal(func() {
  170. n.Stop()
  171. })
  172. }