You cannot select more than 25 topics. Topics must start with a letter or number, can include dashes ('-'), and can be up to 35 characters long.

267 lines
7.2 KiB

9 years ago
10 years ago
10 years ago
10 years ago
9 years ago
10 years ago
9 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
9 years ago
10 years ago
10 years ago
9 years ago
10 years ago
10 years ago
10 years ago
9 years ago
  1. package node
  2. import (
  3. "fmt"
  4. "math/rand"
  5. "net"
  6. "net/http"
  7. "os"
  8. "strconv"
  9. bc "github.com/tendermint/tendermint/blockchain"
  10. . "github.com/tendermint/tendermint/common"
  11. "github.com/tendermint/tendermint/config"
  12. "github.com/tendermint/tendermint/consensus"
  13. dbm "github.com/tendermint/tendermint/db"
  14. "github.com/tendermint/tendermint/events"
  15. mempl "github.com/tendermint/tendermint/mempool"
  16. "github.com/tendermint/tendermint/p2p"
  17. "github.com/tendermint/tendermint/rpc"
  18. "github.com/tendermint/tendermint/rpc/core"
  19. sm "github.com/tendermint/tendermint/state"
  20. "github.com/tendermint/tendermint/types"
  21. )
  22. import _ "net/http/pprof"
  // init starts a background goroutine that serves HTTP on 0.0.0.0:6060 with
  // the default mux (nil handler) and prints whatever error ListenAndServe
  // returns when the server exits. Together with the blank import of
  // net/http/pprof in this file, this exposes the pprof endpoints.
  //
  // NOTE(review): the server binds to all interfaces; confirm that exposing
  // the debug endpoints beyond localhost is intended.
  func init() {
  	go func() {
  		serveErr := http.ListenAndServe("0.0.0.0:6060", nil)
  		fmt.Println(serveErr)
  	}()
  }
  28. type Node struct {
  29. sw *p2p.Switch
  30. evsw *events.EventSwitch
  31. book *p2p.AddrBook
  32. blockStore *bc.BlockStore
  33. pexReactor *p2p.PEXReactor
  34. bcReactor *bc.BlockchainReactor
  35. mempoolReactor *mempl.MempoolReactor
  36. consensusState *consensus.ConsensusState
  37. consensusReactor *consensus.ConsensusReactor
  38. privValidator *sm.PrivValidator
  39. }
  40. func NewNode() *Node {
  41. // Get BlockStore
  42. blockStoreDB := dbm.GetDB("blockstore")
  43. blockStore := bc.NewBlockStore(blockStoreDB)
  44. // Get State
  45. stateDB := dbm.GetDB("state")
  46. state := sm.LoadState(stateDB)
  47. if state == nil {
  48. state = sm.MakeGenesisStateFromFile(stateDB, config.App().GetString("GenesisFile"))
  49. state.Save()
  50. }
  51. // Get PrivValidator
  52. var privValidator *sm.PrivValidator
  53. privValidatorFile := config.App().GetString("PrivValidatorFile")
  54. if _, err := os.Stat(privValidatorFile); err == nil {
  55. privValidator = sm.LoadPrivValidator(privValidatorFile)
  56. log.Info("Loaded PrivValidator",
  57. "file", privValidatorFile, "privValidator", privValidator)
  58. } else {
  59. privValidator = sm.GenPrivValidator()
  60. privValidator.SetFile(privValidatorFile)
  61. privValidator.Save()
  62. log.Info("Generated PrivValidator", "file", privValidatorFile)
  63. }
  64. eventSwitch := new(events.EventSwitch)
  65. eventSwitch.Start()
  66. // Get PEXReactor
  67. book := p2p.NewAddrBook(config.App().GetString("AddrBookFile"))
  68. pexReactor := p2p.NewPEXReactor(book)
  69. // Get BlockchainReactor
  70. bcReactor := bc.NewBlockchainReactor(state, blockStore, config.App().GetBool("FastSync"))
  71. // Get MempoolReactor
  72. mempool := mempl.NewMempool(state.Copy())
  73. mempoolReactor := mempl.NewMempoolReactor(mempool)
  74. // Get ConsensusReactor
  75. consensusState := consensus.NewConsensusState(state, blockStore, mempoolReactor)
  76. consensusReactor := consensus.NewConsensusReactor(consensusState, blockStore)
  77. if privValidator != nil {
  78. consensusReactor.SetPrivValidator(privValidator)
  79. }
  80. // so the consensus reactor won't do anything until we're synced
  81. if config.App().GetBool("FastSync") {
  82. consensusReactor.SetSyncing(true)
  83. }
  84. sw := p2p.NewSwitch()
  85. sw.AddReactor("PEX", pexReactor)
  86. sw.AddReactor("MEMPOOL", mempoolReactor)
  87. sw.AddReactor("BLOCKCHAIN", bcReactor)
  88. sw.AddReactor("CONSENSUS", consensusReactor)
  89. // add the event switch to all services
  90. // they should all satisfy events.Eventable
  91. SetFireable(eventSwitch, pexReactor, bcReactor, mempoolReactor, consensusReactor)
  92. return &Node{
  93. sw: sw,
  94. evsw: eventSwitch,
  95. book: book,
  96. blockStore: blockStore,
  97. pexReactor: pexReactor,
  98. bcReactor: bcReactor,
  99. mempoolReactor: mempoolReactor,
  100. consensusState: consensusState,
  101. consensusReactor: consensusReactor,
  102. privValidator: privValidator,
  103. }
  104. }
  105. // Call Start() after adding the listeners.
  106. func (n *Node) Start() {
  107. log.Info("Starting Node")
  108. n.book.Start()
  109. nodeInfo := makeNodeInfo(n.sw)
  110. n.sw.SetNodeInfo(nodeInfo)
  111. n.sw.Start()
  112. }
  113. func (n *Node) Stop() {
  114. log.Info("Stopping Node")
  115. // TODO: gracefully disconnect from peers.
  116. n.sw.Stop()
  117. n.book.Stop()
  118. }
  119. // Add the event switch to reactors, mempool, etc.
  120. func SetFireable(evsw *events.EventSwitch, eventables ...events.Eventable) {
  121. for _, e := range eventables {
  122. e.SetFireable(evsw)
  123. }
  124. }
  125. // Add a Listener to accept inbound peer connections.
  126. // Add listeners before starting the Node.
  127. // The first listener is the primary listener (in NodeInfo)
  128. func (n *Node) AddListener(l p2p.Listener) {
  129. log.Info(Fmt("Added %v", l))
  130. n.sw.AddListener(l)
  131. n.book.AddOurAddress(l.ExternalAddress())
  132. }
  133. func (n *Node) DialSeed() {
  134. // if the single seed node is available, use only it
  135. prioritySeed := config.App().GetString("SeedNode")
  136. if prioritySeed != "" {
  137. addr := p2p.NewNetAddressString(prioritySeed)
  138. n.dialSeed(addr)
  139. return
  140. }
  141. // permute the list, dial half of them
  142. seeds := config.App().GetStringSlice("SeedNodes")
  143. perm := rand.Perm(len(seeds))
  144. // TODO: we shouldn't necessarily connect to all of them every time ...
  145. for i := 0; i < len(perm); i++ {
  146. j := perm[i]
  147. addr := p2p.NewNetAddressString(seeds[j])
  148. n.dialSeed(addr)
  149. }
  150. }
  151. func (n *Node) dialSeed(addr *p2p.NetAddress) {
  152. peer, err := n.sw.DialPeerWithAddress(addr)
  153. if err != nil {
  154. log.Error("Error dialing seed", "error", err)
  155. //n.book.MarkAttempt(addr)
  156. return
  157. } else {
  158. log.Info("Connected to seed", "peer", peer)
  159. n.book.AddAddress(addr, addr)
  160. }
  161. }
  162. func (n *Node) StartRPC() {
  163. core.SetBlockStore(n.blockStore)
  164. core.SetConsensusState(n.consensusState)
  165. core.SetConsensusReactor(n.consensusReactor)
  166. core.SetMempoolReactor(n.mempoolReactor)
  167. core.SetSwitch(n.sw)
  168. listenAddr := config.App().GetString("RPC.HTTP.ListenAddr")
  169. mux := http.NewServeMux()
  170. rpc.RegisterEventsHandler(mux, n.evsw)
  171. rpc.RegisterRPCFuncs(mux, core.Routes)
  172. rpc.StartHTTPServer(listenAddr, mux)
  173. }
  174. func (n *Node) Switch() *p2p.Switch {
  175. return n.sw
  176. }
  177. func (n *Node) ConsensusState() *consensus.ConsensusState {
  178. return n.consensusState
  179. }
  180. func (n *Node) MempoolReactor() *mempl.MempoolReactor {
  181. return n.mempoolReactor
  182. }
  183. func (n *Node) EventSwitch() *events.EventSwitch {
  184. return n.evsw
  185. }
  186. func makeNodeInfo(sw *p2p.Switch) *types.NodeInfo {
  187. nodeInfo := &types.NodeInfo{
  188. Moniker: config.App().GetString("Moniker"),
  189. Network: config.App().GetString("Network"),
  190. Version: "0.0.2", // Bumped for new SignBytes.
  191. }
  192. if !sw.IsListening() {
  193. return nodeInfo
  194. }
  195. p2pListener := sw.Listeners()[0]
  196. p2pHost := p2pListener.ExternalAddress().IP.String()
  197. p2pPort := p2pListener.ExternalAddress().Port
  198. rpcListenAddr := config.App().GetString("RPC.HTTP.ListenAddr")
  199. _, rpcPortStr, _ := net.SplitHostPort(rpcListenAddr)
  200. rpcPort, err := strconv.Atoi(rpcPortStr)
  201. if err != nil {
  202. panic(Fmt("Expected numeric RPC.HTTP.ListenAddr port but got %v", rpcPortStr))
  203. }
  204. // We assume that the rpcListener has the same ExternalAddress.
  205. // This is probably true because both P2P and RPC listeners use UPnP.
  206. nodeInfo.Host = p2pHost
  207. nodeInfo.P2PPort = p2pPort
  208. nodeInfo.RPCPort = uint16(rpcPort)
  209. return nodeInfo
  210. }
  211. //------------------------------------------------------------------------------
  212. func RunNode() {
  213. // Create & start node
  214. n := NewNode()
  215. l := p2p.NewDefaultListener("tcp", config.App().GetString("ListenAddr"), false)
  216. n.AddListener(l)
  217. n.Start()
  218. // If seedNode is provided by config, dial out.
  219. if config.App().GetString("SeedNode") != "" || len(config.App().GetStringSlice("SeedNodes")) != 0 {
  220. n.DialSeed()
  221. }
  222. // Run the RPC server.
  223. if config.App().GetString("RPC.HTTP.ListenAddr") != "" {
  224. n.StartRPC()
  225. }
  226. // Sleep forever and then...
  227. TrapSignal(func() {
  228. n.Stop()
  229. })
  230. }