You cannot select more than 25 topics. Topics must start with a letter or number, can include dashes ('-'), and can be up to 35 characters long.

285 lines
7.6 KiB

10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
  1. package node
  2. import (
  3. "bytes"
  4. "fmt"
  5. "math/rand"
  6. "net"
  7. "net/http"
  8. "os"
  9. "strconv"
  10. "strings"
  11. "time"
  12. "github.com/tendermint/tendermint/binary"
  13. bc "github.com/tendermint/tendermint/blockchain"
  14. . "github.com/tendermint/tendermint/common"
  15. "github.com/tendermint/tendermint/consensus"
  16. dbm "github.com/tendermint/tendermint/db"
  17. "github.com/tendermint/tendermint/events"
  18. mempl "github.com/tendermint/tendermint/mempool"
  19. "github.com/tendermint/tendermint/p2p"
  20. "github.com/tendermint/tendermint/rpc/core"
  21. "github.com/tendermint/tendermint/rpc/server"
  22. sm "github.com/tendermint/tendermint/state"
  23. "github.com/tendermint/tendermint/types"
  24. )
  25. import _ "net/http/pprof"
  26. func init() {
  27. go func() {
  28. fmt.Println(http.ListenAndServe("0.0.0.0:6060", nil))
  29. }()
  30. }
  31. type Node struct {
  32. sw *p2p.Switch
  33. evsw *events.EventSwitch
  34. book *p2p.AddrBook
  35. blockStore *bc.BlockStore
  36. pexReactor *p2p.PEXReactor
  37. bcReactor *bc.BlockchainReactor
  38. mempoolReactor *mempl.MempoolReactor
  39. consensusState *consensus.ConsensusState
  40. consensusReactor *consensus.ConsensusReactor
  41. privValidator *sm.PrivValidator
  42. genDoc *sm.GenesisDoc
  43. }
  44. func NewNode() *Node {
  45. // Get BlockStore
  46. blockStoreDB := dbm.GetDB("blockstore")
  47. blockStore := bc.NewBlockStore(blockStoreDB)
  48. // Get State
  49. stateDB := dbm.GetDB("state")
  50. state := sm.LoadState(stateDB)
  51. var genDoc *sm.GenesisDoc
  52. if state == nil {
  53. genDoc, state = sm.MakeGenesisStateFromFile(stateDB, config.GetString("genesis_file"))
  54. state.Save()
  55. // write the gendoc to db
  56. buf, n, err := new(bytes.Buffer), new(int64), new(error)
  57. binary.WriteJSON(genDoc, buf, n, err)
  58. stateDB.Set(sm.GenDocKey, buf.Bytes())
  59. if *err != nil {
  60. panic(Fmt("Unable to write gendoc to db: %v", err))
  61. }
  62. } else {
  63. genDocBytes := stateDB.Get(sm.GenDocKey)
  64. err := new(error)
  65. binary.ReadJSON(&genDoc, genDocBytes, err)
  66. if *err != nil {
  67. panic(Fmt("Unable to read gendoc from db: %v", err))
  68. }
  69. }
  70. // add the chainid to the global config
  71. config.Set("chain_id", state.ChainID)
  72. // Get PrivValidator
  73. var privValidator *sm.PrivValidator
  74. privValidatorFile := config.GetString("priv_validator_file")
  75. if _, err := os.Stat(privValidatorFile); err == nil {
  76. privValidator = sm.LoadPrivValidator(privValidatorFile)
  77. log.Info("Loaded PrivValidator",
  78. "file", privValidatorFile, "privValidator", privValidator)
  79. } else {
  80. privValidator = sm.GenPrivValidator()
  81. privValidator.SetFile(privValidatorFile)
  82. privValidator.Save()
  83. log.Info("Generated PrivValidator", "file", privValidatorFile)
  84. }
  85. eventSwitch := new(events.EventSwitch)
  86. eventSwitch.Start()
  87. // Get PEXReactor
  88. book := p2p.NewAddrBook(config.GetString("addrbook_file"))
  89. pexReactor := p2p.NewPEXReactor(book)
  90. // Get BlockchainReactor
  91. bcReactor := bc.NewBlockchainReactor(state, blockStore, config.GetBool("fast_sync"))
  92. // Get MempoolReactor
  93. mempool := mempl.NewMempool(state.Copy())
  94. mempoolReactor := mempl.NewMempoolReactor(mempool)
  95. // Get ConsensusReactor
  96. consensusState := consensus.NewConsensusState(state, blockStore, mempoolReactor)
  97. consensusReactor := consensus.NewConsensusReactor(consensusState, blockStore, config.GetBool("fast_sync"))
  98. if privValidator != nil {
  99. consensusReactor.SetPrivValidator(privValidator)
  100. }
  101. sw := p2p.NewSwitch()
  102. sw.AddReactor("PEX", pexReactor)
  103. sw.AddReactor("MEMPOOL", mempoolReactor)
  104. sw.AddReactor("BLOCKCHAIN", bcReactor)
  105. sw.AddReactor("CONSENSUS", consensusReactor)
  106. // add the event switch to all services
  107. // they should all satisfy events.Eventable
  108. SetFireable(eventSwitch, pexReactor, bcReactor, mempoolReactor, consensusReactor)
  109. return &Node{
  110. sw: sw,
  111. evsw: eventSwitch,
  112. book: book,
  113. blockStore: blockStore,
  114. pexReactor: pexReactor,
  115. bcReactor: bcReactor,
  116. mempoolReactor: mempoolReactor,
  117. consensusState: consensusState,
  118. consensusReactor: consensusReactor,
  119. privValidator: privValidator,
  120. genDoc: genDoc,
  121. }
  122. }
  123. // Call Start() after adding the listeners.
  124. func (n *Node) Start() {
  125. log.Info("Starting Node")
  126. n.book.Start()
  127. nodeInfo := makeNodeInfo(n.sw)
  128. n.sw.SetNodeInfo(nodeInfo)
  129. n.sw.Start()
  130. }
  131. func (n *Node) Stop() {
  132. log.Info("Stopping Node")
  133. // TODO: gracefully disconnect from peers.
  134. n.sw.Stop()
  135. n.book.Stop()
  136. }
  137. // Add the event switch to reactors, mempool, etc.
  138. func SetFireable(evsw *events.EventSwitch, eventables ...events.Eventable) {
  139. for _, e := range eventables {
  140. e.SetFireable(evsw)
  141. }
  142. }
  143. // Add a Listener to accept inbound peer connections.
  144. // Add listeners before starting the Node.
  145. // The first listener is the primary listener (in NodeInfo)
  146. func (n *Node) AddListener(l p2p.Listener) {
  147. log.Info(Fmt("Added %v", l))
  148. n.sw.AddListener(l)
  149. n.book.AddOurAddress(l.ExternalAddress())
  150. }
  151. // NOTE: Blocking
  152. func (n *Node) DialSeed() {
  153. // permute the list, dial them in random order.
  154. seeds := strings.Split(config.GetString("seeds"), ",")
  155. perm := rand.Perm(len(seeds))
  156. for i := 0; i < len(perm); i++ {
  157. go func(i int) {
  158. time.Sleep(time.Duration(rand.Int63n(3000)) * time.Millisecond)
  159. j := perm[i]
  160. addr := p2p.NewNetAddressString(seeds[j])
  161. n.dialSeed(addr)
  162. }(i)
  163. }
  164. }
  165. func (n *Node) dialSeed(addr *p2p.NetAddress) {
  166. peer, err := n.sw.DialPeerWithAddress(addr)
  167. if err != nil {
  168. log.Error("Error dialing seed", "error", err)
  169. //n.book.MarkAttempt(addr)
  170. return
  171. } else {
  172. log.Info("Connected to seed", "peer", peer)
  173. n.book.AddAddress(addr, addr)
  174. }
  175. }
  176. func (n *Node) StartRPC() {
  177. core.SetBlockStore(n.blockStore)
  178. core.SetConsensusState(n.consensusState)
  179. core.SetConsensusReactor(n.consensusReactor)
  180. core.SetMempoolReactor(n.mempoolReactor)
  181. core.SetSwitch(n.sw)
  182. core.SetPrivValidator(n.privValidator)
  183. core.SetGenDoc(n.genDoc)
  184. listenAddr := config.GetString("rpc_laddr")
  185. mux := http.NewServeMux()
  186. rpcserver.RegisterEventsHandler(mux, n.evsw)
  187. rpcserver.RegisterRPCFuncs(mux, core.Routes)
  188. rpcserver.StartHTTPServer(listenAddr, mux)
  189. }
  190. func (n *Node) Switch() *p2p.Switch {
  191. return n.sw
  192. }
  193. func (n *Node) BlockStore() *bc.BlockStore {
  194. return n.blockStore
  195. }
  196. func (n *Node) ConsensusState() *consensus.ConsensusState {
  197. return n.consensusState
  198. }
  199. func (n *Node) MempoolReactor() *mempl.MempoolReactor {
  200. return n.mempoolReactor
  201. }
  202. func (n *Node) EventSwitch() *events.EventSwitch {
  203. return n.evsw
  204. }
  205. func makeNodeInfo(sw *p2p.Switch) *types.NodeInfo {
  206. nodeInfo := &types.NodeInfo{
  207. ChainID: config.GetString("chain_id"),
  208. Moniker: config.GetString("moniker"),
  209. Version: config.GetString("version"),
  210. }
  211. if !sw.IsListening() {
  212. return nodeInfo
  213. }
  214. p2pListener := sw.Listeners()[0]
  215. p2pHost := p2pListener.ExternalAddress().IP.String()
  216. p2pPort := p2pListener.ExternalAddress().Port
  217. rpcListenAddr := config.GetString("rpc_laddr")
  218. _, rpcPortStr, _ := net.SplitHostPort(rpcListenAddr)
  219. rpcPort, err := strconv.Atoi(rpcPortStr)
  220. if err != nil {
  221. panic(Fmt("Expected numeric RPC.ListenAddr port but got %v", rpcPortStr))
  222. }
  223. // We assume that the rpcListener has the same ExternalAddress.
  224. // This is probably true because both P2P and RPC listeners use UPnP.
  225. nodeInfo.Host = p2pHost
  226. nodeInfo.P2PPort = p2pPort
  227. nodeInfo.RPCPort = uint16(rpcPort)
  228. return nodeInfo
  229. }
  230. //------------------------------------------------------------------------------
  231. func RunNode() {
  232. // Create & start node
  233. n := NewNode()
  234. l := p2p.NewDefaultListener("tcp", config.GetString("node_laddr"), false)
  235. n.AddListener(l)
  236. n.Start()
  237. // If seedNode is provided by config, dial out.
  238. if len(config.GetString("seeds")) > 0 {
  239. n.DialSeed()
  240. }
  241. // Run the RPC server.
  242. if config.GetString("rpc_laddr") != "" {
  243. n.StartRPC()
  244. }
  245. // Sleep forever and then...
  246. TrapSignal(func() {
  247. n.Stop()
  248. })
  249. }