You cannot select more than 25 topics. Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

339 lines
9.3 KiB

10 years ago
10 years ago
9 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
9 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
  1. package node
  2. import (
  3. "bytes"
  4. "io/ioutil"
  5. "math/rand"
  6. "net"
  7. "net/http"
  8. "strings"
  9. "time"
  10. . "github.com/tendermint/go-common"
  11. "github.com/tendermint/go-crypto"
  12. dbm "github.com/tendermint/go-db"
  13. "github.com/tendermint/go-p2p"
  14. "github.com/tendermint/go-wire"
  15. bc "github.com/tendermint/tendermint/blockchain"
  16. "github.com/tendermint/tendermint/consensus"
  17. "github.com/tendermint/tendermint/events"
  18. mempl "github.com/tendermint/tendermint/mempool"
  19. "github.com/tendermint/tendermint/proxy"
  20. "github.com/tendermint/tendermint/rpc"
  21. "github.com/tendermint/tendermint/rpc/core"
  22. "github.com/tendermint/tendermint/rpc/server"
  23. sm "github.com/tendermint/tendermint/state"
  24. "github.com/tendermint/tendermint/types"
  25. )
  26. import _ "net/http/pprof"
  27. type Node struct {
  28. sw *p2p.Switch
  29. evsw *events.EventSwitch
  30. blockStore *bc.BlockStore
  31. bcReactor *bc.BlockchainReactor
  32. mempoolReactor *mempl.MempoolReactor
  33. consensusState *consensus.ConsensusState
  34. consensusReactor *consensus.ConsensusReactor
  35. privValidator *types.PrivValidator
  36. genesisDoc *types.GenesisDoc
  37. privKey crypto.PrivKeyEd25519
  38. }
  39. func NewNode() *Node {
  40. // Get BlockStore
  41. blockStoreDB := dbm.GetDB("blockstore")
  42. blockStore := bc.NewBlockStore(blockStoreDB)
  43. // Get State
  44. state := getState()
  45. // Create two proxyAppCtx connections,
  46. // one for the consensus and one for the mempool.
  47. proxyAddr := config.GetString("proxy_app")
  48. proxyAppCtxMempool := getProxyApp(proxyAddr, state.LastAppHash)
  49. proxyAppCtxConsensus := getProxyApp(proxyAddr, state.LastAppHash)
  50. // add the chainid to the global config
  51. config.Set("chain_id", state.ChainID)
  52. // Get PrivValidator
  53. privValidatorFile := config.GetString("priv_validator_file")
  54. privValidator := types.LoadOrGenPrivValidator(privValidatorFile)
  55. // Generate node PrivKey
  56. privKey := crypto.GenPrivKeyEd25519()
  57. // Make event switch
  58. eventSwitch := events.NewEventSwitch()
  59. _, err := eventSwitch.Start()
  60. if err != nil {
  61. Exit(Fmt("Failed to start switch: %v", err))
  62. }
  63. // Make BlockchainReactor
  64. bcReactor := bc.NewBlockchainReactor(state.Copy(), proxyAppCtxConsensus, blockStore, config.GetBool("fast_sync"))
  65. // Make MempoolReactor
  66. mempool := mempl.NewMempool(proxyAppCtxMempool)
  67. mempoolReactor := mempl.NewMempoolReactor(mempool)
  68. // Make ConsensusReactor
  69. consensusState := consensus.NewConsensusState(state.Copy(), proxyAppCtxConsensus, blockStore, mempool)
  70. consensusReactor := consensus.NewConsensusReactor(consensusState, blockStore, config.GetBool("fast_sync"))
  71. if privValidator != nil {
  72. consensusReactor.SetPrivValidator(privValidator)
  73. }
  74. // Make p2p network switch
  75. sw := p2p.NewSwitch()
  76. sw.AddReactor("MEMPOOL", mempoolReactor)
  77. sw.AddReactor("BLOCKCHAIN", bcReactor)
  78. sw.AddReactor("CONSENSUS", consensusReactor)
  79. // add the event switch to all services
  80. // they should all satisfy events.Eventable
  81. SetEventSwitch(eventSwitch, bcReactor, mempoolReactor, consensusReactor)
  82. // run the profile server
  83. profileHost := config.GetString("prof_laddr")
  84. if profileHost != "" {
  85. go func() {
  86. log.Warn("Profile server", "error", http.ListenAndServe(profileHost, nil))
  87. }()
  88. }
  89. return &Node{
  90. sw: sw,
  91. evsw: eventSwitch,
  92. blockStore: blockStore,
  93. bcReactor: bcReactor,
  94. mempoolReactor: mempoolReactor,
  95. consensusState: consensusState,
  96. consensusReactor: consensusReactor,
  97. privValidator: privValidator,
  98. genesisDoc: state.GenesisDoc,
  99. privKey: privKey,
  100. }
  101. }
  102. // Call Start() after adding the listeners.
  103. func (n *Node) Start() error {
  104. n.sw.SetNodeInfo(makeNodeInfo(n.sw, n.privKey))
  105. n.sw.SetNodePrivKey(n.privKey)
  106. _, err := n.sw.Start()
  107. return err
  108. }
  109. func (n *Node) Stop() {
  110. log.Notice("Stopping Node")
  111. // TODO: gracefully disconnect from peers.
  112. n.sw.Stop()
  113. }
  114. // Add the event switch to reactors, mempool, etc.
  115. func SetEventSwitch(evsw *events.EventSwitch, eventables ...events.Eventable) {
  116. for _, e := range eventables {
  117. e.SetEventSwitch(evsw)
  118. }
  119. }
  120. // Add a Listener to accept inbound peer connections.
  121. // Add listeners before starting the Node.
  122. // The first listener is the primary listener (in NodeInfo)
  123. func (n *Node) AddListener(l p2p.Listener) {
  124. log.Notice(Fmt("Added %v", l))
  125. n.sw.AddListener(l)
  126. }
  127. // Dial a list of seeds in random order
  128. // Spawns a go routine for each dial
  129. func (n *Node) DialSeed() {
  130. // permute the list, dial them in random order.
  131. seeds := strings.Split(config.GetString("seeds"), ",")
  132. perm := rand.Perm(len(seeds))
  133. for i := 0; i < len(perm); i++ {
  134. go func(i int) {
  135. time.Sleep(time.Duration(rand.Int63n(3000)) * time.Millisecond)
  136. j := perm[i]
  137. addr := p2p.NewNetAddressString(seeds[j])
  138. n.dialSeed(addr)
  139. }(i)
  140. }
  141. }
  142. func (n *Node) dialSeed(addr *p2p.NetAddress) {
  143. peer, err := n.sw.DialPeerWithAddress(addr)
  144. if err != nil {
  145. log.Error("Error dialing seed", "error", err)
  146. return
  147. } else {
  148. log.Notice("Connected to seed", "peer", peer)
  149. }
  150. }
  151. func (n *Node) StartRPC() (net.Listener, error) {
  152. core.SetBlockStore(n.blockStore)
  153. core.SetConsensusState(n.consensusState)
  154. core.SetConsensusReactor(n.consensusReactor)
  155. core.SetMempoolReactor(n.mempoolReactor)
  156. core.SetSwitch(n.sw)
  157. core.SetPrivValidator(n.privValidator)
  158. core.SetGenesisDoc(n.genesisDoc)
  159. listenAddr := config.GetString("rpc_laddr")
  160. mux := http.NewServeMux()
  161. wm := rpcserver.NewWebsocketManager(core.Routes, n.evsw)
  162. mux.HandleFunc("/websocket", wm.WebsocketHandler)
  163. rpcserver.RegisterRPCFuncs(mux, core.Routes)
  164. return rpcserver.StartHTTPServer(listenAddr, mux)
  165. }
  166. func (n *Node) Switch() *p2p.Switch {
  167. return n.sw
  168. }
  169. func (n *Node) BlockStore() *bc.BlockStore {
  170. return n.blockStore
  171. }
  172. func (n *Node) ConsensusState() *consensus.ConsensusState {
  173. return n.consensusState
  174. }
  175. func (n *Node) MempoolReactor() *mempl.MempoolReactor {
  176. return n.mempoolReactor
  177. }
  178. func (n *Node) EventSwitch() *events.EventSwitch {
  179. return n.evsw
  180. }
  181. func makeNodeInfo(sw *p2p.Switch, privKey crypto.PrivKeyEd25519) *p2p.NodeInfo {
  182. nodeInfo := &p2p.NodeInfo{
  183. PubKey: privKey.PubKey().(crypto.PubKeyEd25519),
  184. Moniker: config.GetString("moniker"),
  185. Network: config.GetString("chain_id"),
  186. Version: Version,
  187. Other: []string{
  188. Fmt("p2p_version=%v", p2p.Version),
  189. Fmt("rpc_version=%v", rpc.Version),
  190. Fmt("wire_version=%v", wire.Version),
  191. },
  192. }
  193. // include git hash in the nodeInfo if available
  194. if rev, err := ReadFile(config.GetString("revision_file")); err == nil {
  195. nodeInfo.Other = append(nodeInfo.Other, Fmt("revision=%v", string(rev)))
  196. }
  197. if !sw.IsListening() {
  198. return nodeInfo
  199. }
  200. p2pListener := sw.Listeners()[0]
  201. p2pHost := p2pListener.ExternalAddress().IP.String()
  202. p2pPort := p2pListener.ExternalAddress().Port
  203. rpcListenAddr := config.GetString("rpc_laddr")
  204. // We assume that the rpcListener has the same ExternalAddress.
  205. // This is probably true because both P2P and RPC listeners use UPnP,
  206. // except of course if the rpc is only bound to localhost
  207. nodeInfo.ListenAddr = Fmt("%v:%v", p2pHost, p2pPort)
  208. nodeInfo.Other = append(nodeInfo.Other, Fmt("rpc_addr=%v", rpcListenAddr))
  209. return nodeInfo
  210. }
  211. //------------------------------------------------------------------------------
  212. func RunNode() {
  213. // Wait until the genesis doc becomes available
  214. genDocFile := config.GetString("genesis_file")
  215. if !FileExists(genDocFile) {
  216. log.Notice(Fmt("Waiting for genesis file %v...", genDocFile))
  217. for {
  218. time.Sleep(time.Second)
  219. if !FileExists(genDocFile) {
  220. continue
  221. }
  222. jsonBlob, err := ioutil.ReadFile(genDocFile)
  223. if err != nil {
  224. Exit(Fmt("Couldn't read GenesisDoc file: %v", err))
  225. }
  226. genDoc := types.GenesisDocFromJSON(jsonBlob)
  227. if genDoc.ChainID == "" {
  228. PanicSanity(Fmt("Genesis doc %v must include non-empty chain_id", genDocFile))
  229. }
  230. config.Set("chain_id", genDoc.ChainID)
  231. config.Set("genesis_doc", genDoc)
  232. }
  233. }
  234. // Create & start node
  235. n := NewNode()
  236. l := p2p.NewDefaultListener("tcp", config.GetString("node_laddr"), config.GetBool("skip_upnp"))
  237. n.AddListener(l)
  238. err := n.Start()
  239. if err != nil {
  240. Exit(Fmt("Failed to start node: %v", err))
  241. }
  242. log.Notice("Started node", "nodeInfo", n.sw.NodeInfo())
  243. // If seedNode is provided by config, dial out.
  244. if config.GetString("seeds") != "" {
  245. n.DialSeed()
  246. }
  247. // Run the RPC server.
  248. if config.GetString("rpc_laddr") != "" {
  249. _, err := n.StartRPC()
  250. if err != nil {
  251. PanicCrisis(err)
  252. }
  253. }
  254. // Sleep forever and then...
  255. TrapSignal(func() {
  256. n.Stop()
  257. })
  258. }
  259. // Load the most recent state from "state" db,
  260. // or create a new one (and save) from genesis.
  261. func getState() *sm.State {
  262. stateDB := dbm.GetDB("state")
  263. state := sm.LoadState(stateDB)
  264. if state == nil {
  265. state = sm.MakeGenesisStateFromFile(stateDB, config.GetString("genesis_file"))
  266. state.Save()
  267. }
  268. return state
  269. }
  270. // Get a connection to the proxyAppCtx addr.
  271. // Check the current hash, and panic if it doesn't match.
  272. func getProxyApp(addr string, hash []byte) proxy.AppContext {
  273. proxyConn, err := Connect(addr)
  274. if err != nil {
  275. Exit(Fmt("Failed to connect to proxy for mempool: %v", err))
  276. }
  277. proxyAppCtx := proxy.NewRemoteAppContext(proxyConn, 1024)
  278. proxyAppCtx.Start()
  279. // Check the hash
  280. currentHash, err := proxyAppCtx.GetHashSync()
  281. if err != nil {
  282. PanicCrisis(Fmt("Error in getting proxyAppCtx hash: %v", err))
  283. }
  284. if !bytes.Equal(hash, currentHash) {
  285. PanicCrisis(Fmt("ProxyApp hash does not match. Expected %X, got %X", hash, currentHash))
  286. }
  287. return proxyAppCtx
  288. }