You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

311 lines
8.5 KiB

10 years ago
10 years ago
10 years ago
11 years ago
11 years ago
11 years ago
10 years ago
11 years ago
11 years ago
11 years ago
11 years ago
11 years ago
11 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
11 years ago
11 years ago
11 years ago
11 years ago
10 years ago
11 years ago
  1. package node
  2. import (
  3. "bytes"
  4. "math/rand"
  5. "net"
  6. "net/http"
  7. "os"
  8. "strconv"
  9. "strings"
  10. "time"
  11. acm "github.com/tendermint/tendermint/account"
  12. bc "github.com/tendermint/tendermint/blockchain"
  13. . "github.com/tendermint/tendermint/common"
  14. "github.com/tendermint/tendermint/consensus"
  15. dbm "github.com/tendermint/tendermint/db"
  16. "github.com/tendermint/tendermint/events"
  17. mempl "github.com/tendermint/tendermint/mempool"
  18. "github.com/tendermint/tendermint/p2p"
  19. "github.com/tendermint/tendermint/rpc/core"
  20. "github.com/tendermint/tendermint/rpc/server"
  21. sm "github.com/tendermint/tendermint/state"
  22. stypes "github.com/tendermint/tendermint/state/types"
  23. "github.com/tendermint/tendermint/types"
  24. "github.com/tendermint/tendermint/wire"
  25. )
  26. import _ "net/http/pprof"
  27. type Node struct {
  28. sw *p2p.Switch
  29. evsw *events.EventSwitch
  30. book *p2p.AddrBook
  31. blockStore *bc.BlockStore
  32. pexReactor *p2p.PEXReactor
  33. bcReactor *bc.BlockchainReactor
  34. mempoolReactor *mempl.MempoolReactor
  35. consensusState *consensus.ConsensusState
  36. consensusReactor *consensus.ConsensusReactor
  37. privValidator *types.PrivValidator
  38. genDoc *stypes.GenesisDoc
  39. privKey acm.PrivKeyEd25519
  40. }
  41. func NewNode() *Node {
  42. // Get BlockStore
  43. blockStoreDB := dbm.GetDB("blockstore")
  44. blockStore := bc.NewBlockStore(blockStoreDB)
  45. // Get State
  46. stateDB := dbm.GetDB("state")
  47. state := sm.LoadState(stateDB)
  48. var genDoc *stypes.GenesisDoc
  49. if state == nil {
  50. genDoc, state = sm.MakeGenesisStateFromFile(stateDB, config.GetString("genesis_file"))
  51. state.Save()
  52. // write the gendoc to db
  53. buf, n, err := new(bytes.Buffer), new(int64), new(error)
  54. wire.WriteJSON(genDoc, buf, n, err)
  55. stateDB.Set(stypes.GenDocKey, buf.Bytes())
  56. if *err != nil {
  57. Exit(Fmt("Unable to write gendoc to db: %v", err))
  58. }
  59. } else {
  60. genDocBytes := stateDB.Get(stypes.GenDocKey)
  61. err := new(error)
  62. wire.ReadJSONPtr(&genDoc, genDocBytes, err)
  63. if *err != nil {
  64. Exit(Fmt("Unable to read gendoc from db: %v", err))
  65. }
  66. }
  67. // add the chainid to the global config
  68. config.Set("chain_id", state.ChainID)
  69. // Get PrivValidator
  70. var privValidator *types.PrivValidator
  71. privValidatorFile := config.GetString("priv_validator_file")
  72. if _, err := os.Stat(privValidatorFile); err == nil {
  73. privValidator = types.LoadPrivValidator(privValidatorFile)
  74. log.Notice("Loaded PrivValidator",
  75. "file", privValidatorFile, "privValidator", privValidator)
  76. } else {
  77. privValidator = types.GenPrivValidator()
  78. privValidator.SetFile(privValidatorFile)
  79. privValidator.Save()
  80. log.Notice("Generated PrivValidator", "file", privValidatorFile)
  81. }
  82. // Generate node PrivKey
  83. privKey := acm.GenPrivKeyEd25519()
  84. // Make event switch
  85. eventSwitch := events.NewEventSwitch()
  86. _, err := eventSwitch.Start()
  87. if err != nil {
  88. Exit(Fmt("Failed to start switch: %v", err))
  89. }
  90. // Make PEXReactor
  91. book := p2p.NewAddrBook(config.GetString("addrbook_file"))
  92. pexReactor := p2p.NewPEXReactor(book)
  93. // Make BlockchainReactor
  94. bcReactor := bc.NewBlockchainReactor(state.Copy(), blockStore, config.GetBool("fast_sync"))
  95. // Make MempoolReactor
  96. mempool := mempl.NewMempool(state.Copy())
  97. mempoolReactor := mempl.NewMempoolReactor(mempool)
  98. // Make ConsensusReactor
  99. consensusState := consensus.NewConsensusState(state.Copy(), blockStore, mempoolReactor)
  100. consensusReactor := consensus.NewConsensusReactor(consensusState, blockStore, config.GetBool("fast_sync"))
  101. if privValidator != nil {
  102. consensusReactor.SetPrivValidator(privValidator)
  103. }
  104. // Make p2p network switch
  105. sw := p2p.NewSwitch()
  106. sw.AddReactor("PEX", pexReactor)
  107. sw.AddReactor("MEMPOOL", mempoolReactor)
  108. sw.AddReactor("BLOCKCHAIN", bcReactor)
  109. sw.AddReactor("CONSENSUS", consensusReactor)
  110. // add the event switch to all services
  111. // they should all satisfy events.Eventable
  112. SetFireable(eventSwitch, pexReactor, bcReactor, mempoolReactor, consensusReactor)
  113. return &Node{
  114. sw: sw,
  115. evsw: eventSwitch,
  116. book: book,
  117. blockStore: blockStore,
  118. pexReactor: pexReactor,
  119. bcReactor: bcReactor,
  120. mempoolReactor: mempoolReactor,
  121. consensusState: consensusState,
  122. consensusReactor: consensusReactor,
  123. privValidator: privValidator,
  124. genDoc: genDoc,
  125. privKey: privKey,
  126. }
  127. }
  128. // Call Start() after adding the listeners.
  129. func (n *Node) Start() error {
  130. n.book.Start()
  131. n.sw.SetNodeInfo(makeNodeInfo(n.sw, n.privKey))
  132. n.sw.SetNodePrivKey(n.privKey)
  133. _, err := n.sw.Start()
  134. return err
  135. }
  136. func (n *Node) Stop() {
  137. log.Notice("Stopping Node")
  138. // TODO: gracefully disconnect from peers.
  139. n.sw.Stop()
  140. n.book.Stop()
  141. }
  142. // Add the event switch to reactors, mempool, etc.
  143. func SetFireable(evsw *events.EventSwitch, eventables ...events.Eventable) {
  144. for _, e := range eventables {
  145. e.SetFireable(evsw)
  146. }
  147. }
  148. // Add a Listener to accept inbound peer connections.
  149. // Add listeners before starting the Node.
  150. // The first listener is the primary listener (in NodeInfo)
  151. func (n *Node) AddListener(l p2p.Listener) {
  152. log.Notice(Fmt("Added %v", l))
  153. n.sw.AddListener(l)
  154. n.book.AddOurAddress(l.ExternalAddress())
  155. }
  156. // Dial a list of seeds in random order
  157. // Spawns a go routine for each dial
  158. func (n *Node) DialSeed() {
  159. // permute the list, dial them in random order.
  160. seeds := strings.Split(config.GetString("seeds"), ",")
  161. perm := rand.Perm(len(seeds))
  162. for i := 0; i < len(perm); i++ {
  163. go func(i int) {
  164. time.Sleep(time.Duration(rand.Int63n(3000)) * time.Millisecond)
  165. j := perm[i]
  166. addr := p2p.NewNetAddressString(seeds[j])
  167. n.dialSeed(addr)
  168. }(i)
  169. }
  170. }
  171. func (n *Node) dialSeed(addr *p2p.NetAddress) {
  172. peer, err := n.sw.DialPeerWithAddress(addr)
  173. if err != nil {
  174. log.Error("Error dialing seed", "error", err)
  175. //n.book.MarkAttempt(addr)
  176. return
  177. } else {
  178. log.Notice("Connected to seed", "peer", peer)
  179. n.book.AddAddress(addr, addr)
  180. }
  181. }
  182. func (n *Node) StartRPC() (net.Listener, error) {
  183. core.SetBlockStore(n.blockStore)
  184. core.SetConsensusState(n.consensusState)
  185. core.SetConsensusReactor(n.consensusReactor)
  186. core.SetMempoolReactor(n.mempoolReactor)
  187. core.SetSwitch(n.sw)
  188. core.SetPrivValidator(n.privValidator)
  189. core.SetGenDoc(n.genDoc)
  190. listenAddr := config.GetString("rpc_laddr")
  191. mux := http.NewServeMux()
  192. wm := rpcserver.NewWebsocketManager(core.Routes, n.evsw)
  193. mux.HandleFunc("/websocket", wm.WebsocketHandler)
  194. rpcserver.RegisterRPCFuncs(mux, core.Routes)
  195. return rpcserver.StartHTTPServer(listenAddr, mux)
  196. }
  197. func (n *Node) Switch() *p2p.Switch {
  198. return n.sw
  199. }
  200. func (n *Node) BlockStore() *bc.BlockStore {
  201. return n.blockStore
  202. }
  203. func (n *Node) ConsensusState() *consensus.ConsensusState {
  204. return n.consensusState
  205. }
  206. func (n *Node) MempoolReactor() *mempl.MempoolReactor {
  207. return n.mempoolReactor
  208. }
  209. func (n *Node) EventSwitch() *events.EventSwitch {
  210. return n.evsw
  211. }
  212. func makeNodeInfo(sw *p2p.Switch, privKey acm.PrivKeyEd25519) *types.NodeInfo {
  213. nodeInfo := &types.NodeInfo{
  214. PubKey: privKey.PubKey().(acm.PubKeyEd25519),
  215. Moniker: config.GetString("moniker"),
  216. ChainID: config.GetString("chain_id"),
  217. Version: config.GetString("version"),
  218. }
  219. // include git hash in the nodeInfo if available
  220. if rev, err := ReadFile(config.GetString("revisions_file")); err == nil {
  221. nodeInfo.Revision = string(rev)
  222. }
  223. if !sw.IsListening() {
  224. return nodeInfo
  225. }
  226. p2pListener := sw.Listeners()[0]
  227. p2pHost := p2pListener.ExternalAddress().IP.String()
  228. p2pPort := p2pListener.ExternalAddress().Port
  229. rpcListenAddr := config.GetString("rpc_laddr")
  230. _, rpcPortStr, _ := net.SplitHostPort(rpcListenAddr)
  231. rpcPort, err := strconv.Atoi(rpcPortStr)
  232. if err != nil {
  233. PanicSanity(Fmt("Expected numeric RPC.ListenAddr port but got %v", rpcPortStr))
  234. }
  235. // We assume that the rpcListener has the same ExternalAddress.
  236. // This is probably true because both P2P and RPC listeners use UPnP,
  237. // except of course if the rpc is only bound to localhost
  238. nodeInfo.Host = p2pHost
  239. nodeInfo.P2PPort = p2pPort
  240. nodeInfo.RPCPort = uint16(rpcPort)
  241. return nodeInfo
  242. }
  243. //------------------------------------------------------------------------------
  244. func RunNode() {
  245. // Create & start node
  246. n := NewNode()
  247. l := p2p.NewDefaultListener("tcp", config.GetString("node_laddr"))
  248. n.AddListener(l)
  249. err := n.Start()
  250. if err != nil {
  251. Exit(Fmt("Failed to start node: %v", err))
  252. }
  253. log.Notice("Started node", "nodeInfo", n.sw.NodeInfo())
  254. // If seedNode is provided by config, dial out.
  255. if config.GetString("seeds") != "" {
  256. n.DialSeed()
  257. }
  258. // Run the RPC server.
  259. if config.GetString("rpc_laddr") != "" {
  260. _, err := n.StartRPC()
  261. if err != nil {
  262. PanicCrisis(err)
  263. }
  264. }
  265. // Sleep forever and then...
  266. TrapSignal(func() {
  267. n.Stop()
  268. })
  269. }