You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

271 lines
7.3 KiB

10 years ago
10 years ago
10 years ago
10 years ago
11 years ago
11 years ago
11 years ago
10 years ago
10 years ago
10 years ago
11 years ago
11 years ago
11 years ago
11 years ago
11 years ago
11 years ago
11 years ago
10 years ago
10 years ago
10 years ago
10 years ago
11 years ago
11 years ago
10 years ago
11 years ago
11 years ago
11 years ago
10 years ago
  1. package node
  2. import (
  3. "fmt"
  4. "math/rand"
  5. "net"
  6. "net/http"
  7. "os"
  8. "strconv"
  9. "time"
  10. bc "github.com/tendermint/tendermint/blockchain"
  11. . "github.com/tendermint/tendermint/common"
  12. "github.com/tendermint/tendermint/config"
  13. "github.com/tendermint/tendermint/consensus"
  14. dbm "github.com/tendermint/tendermint/db"
  15. "github.com/tendermint/tendermint/events"
  16. mempl "github.com/tendermint/tendermint/mempool"
  17. "github.com/tendermint/tendermint/p2p"
  18. "github.com/tendermint/tendermint/rpc"
  19. "github.com/tendermint/tendermint/rpc/core"
  20. sm "github.com/tendermint/tendermint/state"
  21. "github.com/tendermint/tendermint/types"
  22. )
  23. import _ "net/http/pprof"
  24. func init() {
  25. go func() {
  26. fmt.Println(http.ListenAndServe("0.0.0.0:6060", nil))
  27. }()
  28. }
  29. type Node struct {
  30. sw *p2p.Switch
  31. evsw *events.EventSwitch
  32. book *p2p.AddrBook
  33. blockStore *bc.BlockStore
  34. pexReactor *p2p.PEXReactor
  35. bcReactor *bc.BlockchainReactor
  36. mempoolReactor *mempl.MempoolReactor
  37. consensusState *consensus.ConsensusState
  38. consensusReactor *consensus.ConsensusReactor
  39. privValidator *sm.PrivValidator
  40. }
  41. func NewNode() *Node {
  42. // Get BlockStore
  43. blockStoreDB := dbm.GetDB("blockstore")
  44. blockStore := bc.NewBlockStore(blockStoreDB)
  45. // Get State
  46. stateDB := dbm.GetDB("state")
  47. state := sm.LoadState(stateDB)
  48. if state == nil {
  49. state = sm.MakeGenesisStateFromFile(stateDB, config.App().GetString("GenesisFile"))
  50. state.Save()
  51. }
  52. // Get PrivValidator
  53. var privValidator *sm.PrivValidator
  54. privValidatorFile := config.App().GetString("PrivValidatorFile")
  55. if _, err := os.Stat(privValidatorFile); err == nil {
  56. privValidator = sm.LoadPrivValidator(privValidatorFile)
  57. log.Info("Loaded PrivValidator",
  58. "file", privValidatorFile, "privValidator", privValidator)
  59. } else {
  60. privValidator = sm.GenPrivValidator()
  61. privValidator.SetFile(privValidatorFile)
  62. privValidator.Save()
  63. log.Info("Generated PrivValidator", "file", privValidatorFile)
  64. }
  65. eventSwitch := new(events.EventSwitch)
  66. eventSwitch.Start()
  67. // Get PEXReactor
  68. book := p2p.NewAddrBook(config.App().GetString("AddrBookFile"))
  69. pexReactor := p2p.NewPEXReactor(book)
  70. // Get BlockchainReactor
  71. bcReactor := bc.NewBlockchainReactor(state, blockStore, config.App().GetBool("FastSync"))
  72. // Get MempoolReactor
  73. mempool := mempl.NewMempool(state.Copy())
  74. mempoolReactor := mempl.NewMempoolReactor(mempool)
  75. // Get ConsensusReactor
  76. consensusState := consensus.NewConsensusState(state, blockStore, mempoolReactor)
  77. consensusReactor := consensus.NewConsensusReactor(consensusState, blockStore)
  78. if privValidator != nil {
  79. consensusReactor.SetPrivValidator(privValidator)
  80. }
  81. // so the consensus reactor won't do anything until we're synced
  82. if config.App().GetBool("FastSync") {
  83. consensusReactor.SetSyncing(true)
  84. }
  85. sw := p2p.NewSwitch()
  86. sw.AddReactor("PEX", pexReactor)
  87. sw.AddReactor("MEMPOOL", mempoolReactor)
  88. sw.AddReactor("BLOCKCHAIN", bcReactor)
  89. sw.AddReactor("CONSENSUS", consensusReactor)
  90. // add the event switch to all services
  91. // they should all satisfy events.Eventable
  92. SetFireable(eventSwitch, pexReactor, bcReactor, mempoolReactor, consensusReactor)
  93. return &Node{
  94. sw: sw,
  95. evsw: eventSwitch,
  96. book: book,
  97. blockStore: blockStore,
  98. pexReactor: pexReactor,
  99. bcReactor: bcReactor,
  100. mempoolReactor: mempoolReactor,
  101. consensusState: consensusState,
  102. consensusReactor: consensusReactor,
  103. privValidator: privValidator,
  104. }
  105. }
  106. // Call Start() after adding the listeners.
  107. func (n *Node) Start() {
  108. log.Info("Starting Node")
  109. n.book.Start()
  110. nodeInfo := makeNodeInfo(n.sw)
  111. n.sw.SetNodeInfo(nodeInfo)
  112. n.sw.Start()
  113. }
  114. func (n *Node) Stop() {
  115. log.Info("Stopping Node")
  116. // TODO: gracefully disconnect from peers.
  117. n.sw.Stop()
  118. n.book.Stop()
  119. }
  120. // Add the event switch to reactors, mempool, etc.
  121. func SetFireable(evsw *events.EventSwitch, eventables ...events.Eventable) {
  122. for _, e := range eventables {
  123. e.SetFireable(evsw)
  124. }
  125. }
  126. // Add a Listener to accept inbound peer connections.
  127. // Add listeners before starting the Node.
  128. // The first listener is the primary listener (in NodeInfo)
  129. func (n *Node) AddListener(l p2p.Listener) {
  130. log.Info(Fmt("Added %v", l))
  131. n.sw.AddListener(l)
  132. n.book.AddOurAddress(l.ExternalAddress())
  133. }
  134. // NOTE: Blocking
  135. func (n *Node) DialSeed() {
  136. // if the single seed node is available, use only it
  137. prioritySeed := config.App().GetString("SeedNode")
  138. if prioritySeed != "" {
  139. addr := p2p.NewNetAddressString(prioritySeed)
  140. n.dialSeed(addr)
  141. return
  142. }
  143. // permute the list, dial them in random order.
  144. seeds := config.App().GetStringSlice("SeedNodes")
  145. perm := rand.Perm(len(seeds))
  146. for i := 0; i < len(perm); i++ {
  147. go func(i int) {
  148. time.Sleep(time.Duration(rand.Int63n(3000)) * time.Millisecond)
  149. j := perm[i]
  150. addr := p2p.NewNetAddressString(seeds[j])
  151. n.dialSeed(addr)
  152. }(i)
  153. }
  154. }
  155. func (n *Node) dialSeed(addr *p2p.NetAddress) {
  156. peer, err := n.sw.DialPeerWithAddress(addr)
  157. if err != nil {
  158. log.Error("Error dialing seed", "error", err)
  159. //n.book.MarkAttempt(addr)
  160. return
  161. } else {
  162. log.Info("Connected to seed", "peer", peer)
  163. n.book.AddAddress(addr, addr)
  164. }
  165. }
  166. func (n *Node) StartRPC() {
  167. core.SetBlockStore(n.blockStore)
  168. core.SetConsensusState(n.consensusState)
  169. core.SetConsensusReactor(n.consensusReactor)
  170. core.SetMempoolReactor(n.mempoolReactor)
  171. core.SetSwitch(n.sw)
  172. listenAddr := config.App().GetString("RPC.HTTP.ListenAddr")
  173. mux := http.NewServeMux()
  174. rpc.RegisterEventsHandler(mux, n.evsw)
  175. rpc.RegisterRPCFuncs(mux, core.Routes)
  176. rpc.StartHTTPServer(listenAddr, mux)
  177. }
  178. func (n *Node) Switch() *p2p.Switch {
  179. return n.sw
  180. }
  181. func (n *Node) ConsensusState() *consensus.ConsensusState {
  182. return n.consensusState
  183. }
  184. func (n *Node) MempoolReactor() *mempl.MempoolReactor {
  185. return n.mempoolReactor
  186. }
  187. func (n *Node) EventSwitch() *events.EventSwitch {
  188. return n.evsw
  189. }
  190. func makeNodeInfo(sw *p2p.Switch) *types.NodeInfo {
  191. nodeInfo := &types.NodeInfo{
  192. Moniker: config.App().GetString("Moniker"),
  193. Network: config.App().GetString("Network"),
  194. Version: "0.2.0", // Everything is in Big Endian.
  195. }
  196. if !sw.IsListening() {
  197. return nodeInfo
  198. }
  199. p2pListener := sw.Listeners()[0]
  200. p2pHost := p2pListener.ExternalAddress().IP.String()
  201. p2pPort := p2pListener.ExternalAddress().Port
  202. rpcListenAddr := config.App().GetString("RPC.HTTP.ListenAddr")
  203. _, rpcPortStr, _ := net.SplitHostPort(rpcListenAddr)
  204. rpcPort, err := strconv.Atoi(rpcPortStr)
  205. if err != nil {
  206. panic(Fmt("Expected numeric RPC.HTTP.ListenAddr port but got %v", rpcPortStr))
  207. }
  208. // We assume that the rpcListener has the same ExternalAddress.
  209. // This is probably true because both P2P and RPC listeners use UPnP.
  210. nodeInfo.Host = p2pHost
  211. nodeInfo.P2PPort = p2pPort
  212. nodeInfo.RPCPort = uint16(rpcPort)
  213. return nodeInfo
  214. }
  215. //------------------------------------------------------------------------------
  216. func RunNode() {
  217. // Create & start node
  218. n := NewNode()
  219. l := p2p.NewDefaultListener("tcp", config.App().GetString("ListenAddr"), false)
  220. n.AddListener(l)
  221. n.Start()
  222. // If seedNode is provided by config, dial out.
  223. if config.App().GetString("SeedNode") != "" || len(config.App().GetStringSlice("SeedNodes")) != 0 {
  224. n.DialSeed()
  225. }
  226. // Run the RPC server.
  227. if config.App().GetString("RPC.HTTP.ListenAddr") != "" {
  228. n.StartRPC()
  229. }
  230. // Sleep forever and then...
  231. TrapSignal(func() {
  232. n.Stop()
  233. })
  234. }