You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

269 lines
7.1 KiB

10 years ago
10 years ago
10 years ago
10 years ago
11 years ago
11 years ago
11 years ago
10 years ago
11 years ago
11 years ago
11 years ago
11 years ago
11 years ago
11 years ago
11 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
11 years ago
11 years ago
11 years ago
11 years ago
11 years ago
  1. package node
  2. import (
  3. "fmt"
  4. "math/rand"
  5. "net"
  6. "net/http"
  7. "os"
  8. "strconv"
  9. "strings"
  10. "time"
  11. bc "github.com/tendermint/tendermint/blockchain"
  12. . "github.com/tendermint/tendermint/common"
  13. "github.com/tendermint/tendermint/config"
  14. "github.com/tendermint/tendermint/consensus"
  15. dbm "github.com/tendermint/tendermint/db"
  16. "github.com/tendermint/tendermint/events"
  17. mempl "github.com/tendermint/tendermint/mempool"
  18. "github.com/tendermint/tendermint/p2p"
  19. "github.com/tendermint/tendermint/rpc"
  20. "github.com/tendermint/tendermint/rpc/core"
  21. sm "github.com/tendermint/tendermint/state"
  22. "github.com/tendermint/tendermint/types"
  23. )
  24. import _ "net/http/pprof"
  25. func init() {
  26. go func() {
  27. fmt.Println(http.ListenAndServe("0.0.0.0:6060", nil))
  28. }()
  29. }
  30. type Node struct {
  31. sw *p2p.Switch
  32. evsw *events.EventSwitch
  33. book *p2p.AddrBook
  34. blockStore *bc.BlockStore
  35. pexReactor *p2p.PEXReactor
  36. bcReactor *bc.BlockchainReactor
  37. mempoolReactor *mempl.MempoolReactor
  38. consensusState *consensus.ConsensusState
  39. consensusReactor *consensus.ConsensusReactor
  40. privValidator *sm.PrivValidator
  41. }
  42. func NewNode() *Node {
  43. // Get BlockStore
  44. blockStoreDB := dbm.GetDB("blockstore")
  45. blockStore := bc.NewBlockStore(blockStoreDB)
  46. // Get State
  47. stateDB := dbm.GetDB("state")
  48. state := sm.LoadState(stateDB)
  49. if state == nil {
  50. state = sm.MakeGenesisStateFromFile(stateDB, config.App().GetString("genesis_file"))
  51. state.Save()
  52. }
  53. // Get PrivValidator
  54. var privValidator *sm.PrivValidator
  55. privValidatorFile := config.App().GetString("priv_validator_file")
  56. if _, err := os.Stat(privValidatorFile); err == nil {
  57. privValidator = sm.LoadPrivValidator(privValidatorFile)
  58. log.Info("Loaded PrivValidator",
  59. "file", privValidatorFile, "privValidator", privValidator)
  60. } else {
  61. privValidator = sm.GenPrivValidator()
  62. privValidator.SetFile(privValidatorFile)
  63. privValidator.Save()
  64. log.Info("Generated PrivValidator", "file", privValidatorFile)
  65. }
  66. eventSwitch := new(events.EventSwitch)
  67. eventSwitch.Start()
  68. // Get PEXReactor
  69. book := p2p.NewAddrBook(config.App().GetString("addrbook_file"))
  70. pexReactor := p2p.NewPEXReactor(book)
  71. // Get BlockchainReactor
  72. bcReactor := bc.NewBlockchainReactor(state, blockStore, config.App().GetBool("fast_sync"))
  73. // Get MempoolReactor
  74. mempool := mempl.NewMempool(state.Copy())
  75. mempoolReactor := mempl.NewMempoolReactor(mempool)
  76. // Get ConsensusReactor
  77. consensusState := consensus.NewConsensusState(state, blockStore, mempoolReactor)
  78. consensusReactor := consensus.NewConsensusReactor(consensusState, blockStore)
  79. if privValidator != nil {
  80. consensusReactor.SetPrivValidator(privValidator)
  81. }
  82. // so the consensus reactor won't do anything until we're synced
  83. if config.App().GetBool("fast_sync") {
  84. consensusReactor.SetSyncing(true)
  85. }
  86. sw := p2p.NewSwitch()
  87. sw.AddReactor("PEX", pexReactor)
  88. sw.AddReactor("MEMPOOL", mempoolReactor)
  89. sw.AddReactor("BLOCKCHAIN", bcReactor)
  90. sw.AddReactor("CONSENSUS", consensusReactor)
  91. // add the event switch to all services
  92. // they should all satisfy events.Eventable
  93. SetFireable(eventSwitch, pexReactor, bcReactor, mempoolReactor, consensusReactor)
  94. return &Node{
  95. sw: sw,
  96. evsw: eventSwitch,
  97. book: book,
  98. blockStore: blockStore,
  99. pexReactor: pexReactor,
  100. bcReactor: bcReactor,
  101. mempoolReactor: mempoolReactor,
  102. consensusState: consensusState,
  103. consensusReactor: consensusReactor,
  104. privValidator: privValidator,
  105. }
  106. }
  107. // Call Start() after adding the listeners.
  108. func (n *Node) Start() {
  109. log.Info("Starting Node")
  110. n.book.Start()
  111. nodeInfo := makeNodeInfo(n.sw)
  112. n.sw.SetNodeInfo(nodeInfo)
  113. n.sw.Start()
  114. }
  115. func (n *Node) Stop() {
  116. log.Info("Stopping Node")
  117. // TODO: gracefully disconnect from peers.
  118. n.sw.Stop()
  119. n.book.Stop()
  120. }
  121. // Add the event switch to reactors, mempool, etc.
  122. func SetFireable(evsw *events.EventSwitch, eventables ...events.Eventable) {
  123. for _, e := range eventables {
  124. e.SetFireable(evsw)
  125. }
  126. }
  127. // Add a Listener to accept inbound peer connections.
  128. // Add listeners before starting the Node.
  129. // The first listener is the primary listener (in NodeInfo)
  130. func (n *Node) AddListener(l p2p.Listener) {
  131. log.Info(Fmt("Added %v", l))
  132. n.sw.AddListener(l)
  133. n.book.AddOurAddress(l.ExternalAddress())
  134. }
  135. // NOTE: Blocking
  136. func (n *Node) DialSeed() {
  137. // permute the list, dial them in random order.
  138. seeds := strings.Split(config.App().GetString("seeds"), ",")
  139. perm := rand.Perm(len(seeds))
  140. for i := 0; i < len(perm); i++ {
  141. go func(i int) {
  142. time.Sleep(time.Duration(rand.Int63n(3000)) * time.Millisecond)
  143. j := perm[i]
  144. addr := p2p.NewNetAddressString(seeds[j])
  145. n.dialSeed(addr)
  146. }(i)
  147. }
  148. }
  149. func (n *Node) dialSeed(addr *p2p.NetAddress) {
  150. peer, err := n.sw.DialPeerWithAddress(addr)
  151. if err != nil {
  152. log.Error("Error dialing seed", "error", err)
  153. //n.book.MarkAttempt(addr)
  154. return
  155. } else {
  156. log.Info("Connected to seed", "peer", peer)
  157. n.book.AddAddress(addr, addr)
  158. }
  159. }
  160. func (n *Node) StartRPC() {
  161. core.SetBlockStore(n.blockStore)
  162. core.SetConsensusState(n.consensusState)
  163. core.SetConsensusReactor(n.consensusReactor)
  164. core.SetMempoolReactor(n.mempoolReactor)
  165. core.SetSwitch(n.sw)
  166. core.SetPrivValidator(n.privValidator)
  167. listenAddr := config.App().GetString("rpc_laddr")
  168. mux := http.NewServeMux()
  169. rpc.RegisterEventsHandler(mux, n.evsw)
  170. rpc.RegisterRPCFuncs(mux, core.Routes)
  171. rpc.StartHTTPServer(listenAddr, mux)
  172. }
  173. func (n *Node) Switch() *p2p.Switch {
  174. return n.sw
  175. }
  176. func (n *Node) BlockStore() *bc.BlockStore {
  177. return n.blockStore
  178. }
  179. func (n *Node) ConsensusState() *consensus.ConsensusState {
  180. return n.consensusState
  181. }
  182. func (n *Node) MempoolReactor() *mempl.MempoolReactor {
  183. return n.mempoolReactor
  184. }
  185. func (n *Node) EventSwitch() *events.EventSwitch {
  186. return n.evsw
  187. }
  188. func makeNodeInfo(sw *p2p.Switch) *types.NodeInfo {
  189. nodeInfo := &types.NodeInfo{
  190. Network: config.App().GetString("network"),
  191. Moniker: config.App().GetString("moniker"),
  192. Version: config.App().GetString("version"),
  193. }
  194. if !sw.IsListening() {
  195. return nodeInfo
  196. }
  197. p2pListener := sw.Listeners()[0]
  198. p2pHost := p2pListener.ExternalAddress().IP.String()
  199. p2pPort := p2pListener.ExternalAddress().Port
  200. rpcListenAddr := config.App().GetString("rpc_laddr")
  201. _, rpcPortStr, _ := net.SplitHostPort(rpcListenAddr)
  202. rpcPort, err := strconv.Atoi(rpcPortStr)
  203. if err != nil {
  204. panic(Fmt("Expected numeric RPC.ListenAddr port but got %v", rpcPortStr))
  205. }
  206. // We assume that the rpcListener has the same ExternalAddress.
  207. // This is probably true because both P2P and RPC listeners use UPnP.
  208. nodeInfo.Host = p2pHost
  209. nodeInfo.P2PPort = p2pPort
  210. nodeInfo.RPCPort = uint16(rpcPort)
  211. return nodeInfo
  212. }
  213. //------------------------------------------------------------------------------
  214. func RunNode() {
  215. // Create & start node
  216. n := NewNode()
  217. l := p2p.NewDefaultListener("tcp", config.App().GetString("node_laddr"), false)
  218. n.AddListener(l)
  219. n.Start()
  220. // If seedNode is provided by config, dial out.
  221. if len(config.App().GetStringSlice("seeds")) != 0 {
  222. n.DialSeed()
  223. }
  224. // Run the RPC server.
  225. if config.App().GetString("rpc_laddr") != "" {
  226. n.StartRPC()
  227. }
  228. // Sleep forever and then...
  229. TrapSignal(func() {
  230. n.Stop()
  231. })
  232. }