You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

235 lines
6.4 KiB

10 years ago
11 years ago
11 years ago
11 years ago
10 years ago
10 years ago
10 years ago
11 years ago
11 years ago
11 years ago
11 years ago
11 years ago
11 years ago
11 years ago
10 years ago
11 years ago
11 years ago
10 years ago
11 years ago
11 years ago
10 years ago
11 years ago
10 years ago
  1. package node
  2. import (
  3. "net"
  4. "net/http"
  5. "os"
  6. "strconv"
  7. bc "github.com/tendermint/tendermint/blockchain"
  8. . "github.com/tendermint/tendermint/common"
  9. "github.com/tendermint/tendermint/config"
  10. "github.com/tendermint/tendermint/consensus"
  11. dbm "github.com/tendermint/tendermint/db"
  12. "github.com/tendermint/tendermint/events"
  13. mempl "github.com/tendermint/tendermint/mempool"
  14. "github.com/tendermint/tendermint/p2p"
  15. "github.com/tendermint/tendermint/rpc"
  16. "github.com/tendermint/tendermint/rpc/core"
  17. sm "github.com/tendermint/tendermint/state"
  18. "github.com/tendermint/tendermint/types"
  19. )
  20. type Node struct {
  21. sw *p2p.Switch
  22. evsw *events.EventSwitch
  23. book *p2p.AddrBook
  24. blockStore *bc.BlockStore
  25. pexReactor *p2p.PEXReactor
  26. bcReactor *bc.BlockchainReactor
  27. mempoolReactor *mempl.MempoolReactor
  28. consensusState *consensus.ConsensusState
  29. consensusReactor *consensus.ConsensusReactor
  30. privValidator *sm.PrivValidator
  31. }
  32. func NewNode() *Node {
  33. // Get BlockStore
  34. blockStoreDB := dbm.GetDB("blockstore")
  35. blockStore := bc.NewBlockStore(blockStoreDB)
  36. // Get State
  37. stateDB := dbm.GetDB("state")
  38. state := sm.LoadState(stateDB)
  39. if state == nil {
  40. state = sm.MakeGenesisStateFromFile(stateDB, config.App().GetString("GenesisFile"))
  41. state.Save()
  42. }
  43. // Get PrivValidator
  44. var privValidator *sm.PrivValidator
  45. privValidatorFile := config.App().GetString("PrivValidatorFile")
  46. if _, err := os.Stat(privValidatorFile); err == nil {
  47. privValidator = sm.LoadPrivValidator(privValidatorFile)
  48. log.Info("Loaded PrivValidator",
  49. "file", privValidatorFile, "privValidator", privValidator)
  50. } else {
  51. privValidator = sm.GenPrivValidator()
  52. privValidator.SetFile(privValidatorFile)
  53. privValidator.Save()
  54. log.Info("Generated PrivValidator", "file", privValidatorFile)
  55. }
  56. eventSwitch := new(events.EventSwitch)
  57. eventSwitch.Start()
  58. // Get PEXReactor
  59. book := p2p.NewAddrBook(config.App().GetString("AddrBookFile"))
  60. pexReactor := p2p.NewPEXReactor(book)
  61. // Get BlockchainReactor
  62. bcReactor := bc.NewBlockchainReactor(state, blockStore, config.App().GetBool("FastSync"))
  63. // Get MempoolReactor
  64. mempool := mempl.NewMempool(state.Copy())
  65. mempoolReactor := mempl.NewMempoolReactor(mempool)
  66. // Get ConsensusReactor
  67. consensusState := consensus.NewConsensusState(state, blockStore, mempoolReactor)
  68. consensusReactor := consensus.NewConsensusReactor(consensusState, blockStore)
  69. if privValidator != nil {
  70. consensusReactor.SetPrivValidator(privValidator)
  71. }
  72. sw := p2p.NewSwitch()
  73. sw.AddReactor("PEX", pexReactor)
  74. sw.AddReactor("MEMPOOL", mempoolReactor)
  75. sw.AddReactor("BLOCKCHAIN", bcReactor)
  76. sw.AddReactor("CONSENSUS", consensusReactor)
  77. // add the event switch to all services
  78. // they should all satisfy events.Eventable
  79. SetFireable(eventSwitch, pexReactor, bcReactor, mempoolReactor, consensusReactor)
  80. return &Node{
  81. sw: sw,
  82. evsw: eventSwitch,
  83. book: book,
  84. blockStore: blockStore,
  85. pexReactor: pexReactor,
  86. bcReactor: bcReactor,
  87. mempoolReactor: mempoolReactor,
  88. consensusState: consensusState,
  89. consensusReactor: consensusReactor,
  90. privValidator: privValidator,
  91. }
  92. }
  93. // Call Start() after adding the listeners.
  94. func (n *Node) Start() {
  95. log.Info("Starting Node")
  96. n.book.Start()
  97. nodeInfo := makeNodeInfo(n.sw)
  98. n.sw.SetNodeInfo(nodeInfo)
  99. n.sw.Start()
  100. if config.App().GetBool("FastSync") {
  101. // TODO: When FastSync is done, start CONSENSUS.
  102. n.sw.Reactor("CONSENSUS").Stop()
  103. }
  104. }
  105. func (n *Node) Stop() {
  106. log.Info("Stopping Node")
  107. // TODO: gracefully disconnect from peers.
  108. n.sw.Stop()
  109. n.book.Stop()
  110. }
  111. // Add the event switch to reactors, mempool, etc.
  112. func SetFireable(evsw *events.EventSwitch, eventables ...events.Eventable) {
  113. for _, e := range eventables {
  114. e.SetFireable(evsw)
  115. }
  116. }
  117. // Add a Listener to accept inbound peer connections.
  118. // Add listeners before starting the Node.
  119. // The first listener is the primary listener (in NodeInfo)
  120. func (n *Node) AddListener(l p2p.Listener) {
  121. log.Info(Fmt("Added %v", l))
  122. n.sw.AddListener(l)
  123. n.book.AddOurAddress(l.ExternalAddress())
  124. }
  125. func (n *Node) DialSeed() {
  126. addr := p2p.NewNetAddressString(config.App().GetString("SeedNode"))
  127. peer, err := n.sw.DialPeerWithAddress(addr)
  128. if err != nil {
  129. log.Error("Error dialing seed", "error", err)
  130. //n.book.MarkAttempt(addr)
  131. return
  132. } else {
  133. log.Info("Connected to seed", "peer", peer)
  134. n.book.AddAddress(addr, addr)
  135. }
  136. }
  137. func (n *Node) StartRPC() {
  138. core.SetBlockStore(n.blockStore)
  139. core.SetConsensusState(n.consensusState)
  140. core.SetMempoolReactor(n.mempoolReactor)
  141. core.SetSwitch(n.sw)
  142. listenAddr := config.App().GetString("RPC.HTTP.ListenAddr")
  143. mux := http.NewServeMux()
  144. rpc.RegisterEventsHandler(mux, n.evsw)
  145. rpc.RegisterRPCFuncs(mux, core.Routes)
  146. rpc.StartHTTPServer(listenAddr, mux)
  147. }
  148. func (n *Node) Switch() *p2p.Switch {
  149. return n.sw
  150. }
  151. func (n *Node) ConsensusState() *consensus.ConsensusState {
  152. return n.consensusState
  153. }
  154. func (n *Node) MempoolReactor() *mempl.MempoolReactor {
  155. return n.mempoolReactor
  156. }
  157. func (n *Node) EventSwitch() *events.EventSwitch {
  158. return n.evsw
  159. }
  160. func makeNodeInfo(sw *p2p.Switch) *types.NodeInfo {
  161. nodeInfo := &types.NodeInfo{
  162. Moniker: config.App().GetString("Moniker"),
  163. Network: config.App().GetString("Network"),
  164. }
  165. if !sw.IsListening() {
  166. return nodeInfo
  167. }
  168. p2pListener := sw.Listeners()[0]
  169. p2pHost := p2pListener.ExternalAddress().IP.String()
  170. p2pPort := p2pListener.ExternalAddress().Port
  171. rpcListenAddr := config.App().GetString("RPC.HTTP.ListenAddr")
  172. _, rpcPortStr, _ := net.SplitHostPort(rpcListenAddr)
  173. rpcPort, err := strconv.Atoi(rpcPortStr)
  174. if err != nil {
  175. panic(Fmt("Expected numeric RPC.HTTP.ListenAddr port but got %v", rpcPortStr))
  176. }
  177. // We assume that the rpcListener has the same ExternalAddress.
  178. // This is probably true because both P2P and RPC listeners use UPnP.
  179. nodeInfo.Host = p2pHost
  180. nodeInfo.P2PPort = p2pPort
  181. nodeInfo.RPCPort = uint16(rpcPort)
  182. return nodeInfo
  183. }
  184. //------------------------------------------------------------------------------
  185. func RunNode() {
  186. // Create & start node
  187. n := NewNode()
  188. l := p2p.NewDefaultListener("tcp", config.App().GetString("ListenAddr"), false)
  189. n.AddListener(l)
  190. n.Start()
  191. // If seedNode is provided by config, dial out.
  192. if config.App().GetString("SeedNode") != "" {
  193. n.DialSeed()
  194. }
  195. // Run the RPC server.
  196. if config.App().GetString("RPC.HTTP.ListenAddr") != "" {
  197. n.StartRPC()
  198. }
  199. // Sleep forever and then...
  200. TrapSignal(func() {
  201. n.Stop()
  202. })
  203. }