You cannot select more than 25 topics. Topics must start with a letter or number, can include dashes ('-'), and can be up to 35 characters long.

408 lines
12 KiB

10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
9 years ago
10 years ago
10 years ago
10 years ago
10 years ago
9 years ago
9 years ago
9 years ago
9 years ago
  1. package node
  2. import (
  3. "bytes"
  4. "io/ioutil"
  5. "net"
  6. "net/http"
  7. "strings"
  8. "sync"
  9. "time"
  10. . "github.com/tendermint/go-common"
  11. "github.com/tendermint/go-crypto"
  12. dbm "github.com/tendermint/go-db"
  13. "github.com/tendermint/go-events"
  14. "github.com/tendermint/go-p2p"
  15. "github.com/tendermint/go-rpc"
  16. "github.com/tendermint/go-rpc/server"
  17. "github.com/tendermint/go-wire"
  18. bc "github.com/tendermint/tendermint/blockchain"
  19. "github.com/tendermint/tendermint/consensus"
  20. mempl "github.com/tendermint/tendermint/mempool"
  21. "github.com/tendermint/tendermint/proxy"
  22. rpccore "github.com/tendermint/tendermint/rpc/core"
  23. sm "github.com/tendermint/tendermint/state"
  24. "github.com/tendermint/tendermint/types"
  25. "github.com/tendermint/tendermint/version"
  26. "github.com/tendermint/tmsp/example/dummy"
  27. "github.com/tendermint/tmsp/example/nil"
  28. )
  29. import _ "net/http/pprof"
  30. type Node struct {
  31. sw *p2p.Switch
  32. evsw *events.EventSwitch
  33. blockStore *bc.BlockStore
  34. bcReactor *bc.BlockchainReactor
  35. mempoolReactor *mempl.MempoolReactor
  36. consensusState *consensus.ConsensusState
  37. consensusReactor *consensus.ConsensusReactor
  38. privValidator *types.PrivValidator
  39. genesisDoc *types.GenesisDoc
  40. privKey crypto.PrivKeyEd25519
  41. }
  42. func NewNode(privValidator *types.PrivValidator) *Node {
  43. // Get BlockStore
  44. blockStoreDB := dbm.GetDB("blockstore")
  45. blockStore := bc.NewBlockStore(blockStoreDB)
  46. // Get State
  47. state := getState()
  48. // Create two proxyAppConn connections,
  49. // one for the consensus and one for the mempool.
  50. proxyAddr := config.GetString("proxy_app")
  51. proxyAppConnMempool := getProxyApp(proxyAddr, state.AppHash)
  52. proxyAppConnConsensus := getProxyApp(proxyAddr, state.AppHash)
  53. // add the chainid and number of validators to the global config
  54. config.Set("chain_id", state.ChainID)
  55. config.Set("num_vals", state.Validators.Size())
  56. // Generate node PrivKey
  57. privKey := crypto.GenPrivKeyEd25519()
  58. // Make event switch
  59. eventSwitch := events.NewEventSwitch()
  60. _, err := eventSwitch.Start()
  61. if err != nil {
  62. Exit(Fmt("Failed to start switch: %v", err))
  63. }
  64. // Make BlockchainReactor
  65. bcReactor := bc.NewBlockchainReactor(state.Copy(), proxyAppConnConsensus, blockStore, config.GetBool("fast_sync"))
  66. // Make MempoolReactor
  67. mempool := mempl.NewMempool(proxyAppConnMempool)
  68. mempoolReactor := mempl.NewMempoolReactor(mempool)
  69. // Make ConsensusReactor
  70. consensusState := consensus.NewConsensusState(state.Copy(), proxyAppConnConsensus, blockStore, mempool)
  71. consensusReactor := consensus.NewConsensusReactor(consensusState, blockStore, config.GetBool("fast_sync"))
  72. if privValidator != nil {
  73. consensusReactor.SetPrivValidator(privValidator)
  74. }
  75. // deterministic accountability
  76. err = consensusState.OpenWAL(config.GetString("cswal"))
  77. if err != nil {
  78. log.Error("Failed to open cswal", "error", err.Error())
  79. }
  80. // Make p2p network switch
  81. sw := p2p.NewSwitch()
  82. sw.AddReactor("MEMPOOL", mempoolReactor)
  83. sw.AddReactor("BLOCKCHAIN", bcReactor)
  84. sw.AddReactor("CONSENSUS", consensusReactor)
  85. // add the event switch to all services
  86. // they should all satisfy events.Eventable
  87. SetEventSwitch(eventSwitch, bcReactor, mempoolReactor, consensusReactor)
  88. // run the profile server
  89. profileHost := config.GetString("prof_laddr")
  90. if profileHost != "" {
  91. go func() {
  92. log.Warn("Profile server", "error", http.ListenAndServe(profileHost, nil))
  93. }()
  94. }
  95. return &Node{
  96. sw: sw,
  97. evsw: eventSwitch,
  98. blockStore: blockStore,
  99. bcReactor: bcReactor,
  100. mempoolReactor: mempoolReactor,
  101. consensusState: consensusState,
  102. consensusReactor: consensusReactor,
  103. privValidator: privValidator,
  104. genesisDoc: state.GenesisDoc,
  105. privKey: privKey,
  106. }
  107. }
  108. // Call Start() after adding the listeners.
  109. func (n *Node) Start() error {
  110. n.sw.SetNodeInfo(makeNodeInfo(n.sw, n.privKey))
  111. n.sw.SetNodePrivKey(n.privKey)
  112. _, err := n.sw.Start()
  113. return err
  114. }
  115. func (n *Node) Stop() {
  116. log.Notice("Stopping Node")
  117. // TODO: gracefully disconnect from peers.
  118. n.sw.Stop()
  119. }
  120. // Add the event switch to reactors, mempool, etc.
  121. func SetEventSwitch(evsw *events.EventSwitch, eventables ...events.Eventable) {
  122. for _, e := range eventables {
  123. e.SetEventSwitch(evsw)
  124. }
  125. }
  126. // Add a Listener to accept inbound peer connections.
  127. // Add listeners before starting the Node.
  128. // The first listener is the primary listener (in NodeInfo)
  129. func (n *Node) AddListener(l p2p.Listener) {
  130. log.Notice(Fmt("Added %v", l))
  131. n.sw.AddListener(l)
  132. }
  133. func (n *Node) StartRPC() ([]net.Listener, error) {
  134. rpccore.SetBlockStore(n.blockStore)
  135. rpccore.SetConsensusState(n.consensusState)
  136. rpccore.SetConsensusReactor(n.consensusReactor)
  137. rpccore.SetMempoolReactor(n.mempoolReactor)
  138. rpccore.SetSwitch(n.sw)
  139. rpccore.SetPrivValidator(n.privValidator)
  140. rpccore.SetGenesisDoc(n.genesisDoc)
  141. listenAddrs := strings.Split(config.GetString("rpc_laddr"), ",")
  142. // we may expose the rpc over both a unix and tcp socket
  143. listeners := make([]net.Listener, len(listenAddrs))
  144. for i, listenAddr := range listenAddrs {
  145. mux := http.NewServeMux()
  146. wm := rpcserver.NewWebsocketManager(rpccore.Routes, n.evsw)
  147. mux.HandleFunc("/websocket", wm.WebsocketHandler)
  148. rpcserver.RegisterRPCFuncs(mux, rpccore.Routes)
  149. listener, err := rpcserver.StartHTTPServer(listenAddr, mux)
  150. if err != nil {
  151. return nil, err
  152. }
  153. listeners[i] = listener
  154. }
  155. return listeners, nil
  156. }
  157. func (n *Node) Switch() *p2p.Switch {
  158. return n.sw
  159. }
  160. func (n *Node) BlockStore() *bc.BlockStore {
  161. return n.blockStore
  162. }
  163. func (n *Node) ConsensusState() *consensus.ConsensusState {
  164. return n.consensusState
  165. }
  166. func (n *Node) MempoolReactor() *mempl.MempoolReactor {
  167. return n.mempoolReactor
  168. }
  169. func (n *Node) EventSwitch() *events.EventSwitch {
  170. return n.evsw
  171. }
  172. func makeNodeInfo(sw *p2p.Switch, privKey crypto.PrivKeyEd25519) *p2p.NodeInfo {
  173. nodeInfo := &p2p.NodeInfo{
  174. PubKey: privKey.PubKey().(crypto.PubKeyEd25519),
  175. Moniker: config.GetString("moniker"),
  176. Network: config.GetString("chain_id"),
  177. Version: version.Version,
  178. Other: []string{
  179. Fmt("wire_version=%v", wire.Version),
  180. Fmt("p2p_version=%v", p2p.Version),
  181. Fmt("rpc_version=%v/%v", rpc.Version, rpccore.Version),
  182. },
  183. }
  184. // include git hash in the nodeInfo if available
  185. if rev, err := ReadFile(config.GetString("revision_file")); err == nil {
  186. nodeInfo.Other = append(nodeInfo.Other, Fmt("revision=%v", string(rev)))
  187. }
  188. if !sw.IsListening() {
  189. return nodeInfo
  190. }
  191. p2pListener := sw.Listeners()[0]
  192. p2pHost := p2pListener.ExternalAddress().IP.String()
  193. p2pPort := p2pListener.ExternalAddress().Port
  194. rpcListenAddr := config.GetString("rpc_laddr")
  195. // We assume that the rpcListener has the same ExternalAddress.
  196. // This is probably true because both P2P and RPC listeners use UPnP,
  197. // except of course if the rpc is only bound to localhost
  198. nodeInfo.ListenAddr = Fmt("%v:%v", p2pHost, p2pPort)
  199. nodeInfo.Other = append(nodeInfo.Other, Fmt("rpc_addr=%v", rpcListenAddr))
  200. return nodeInfo
  201. }
  202. // Get a connection to the proxyAppConn addr.
  203. // Check the current hash, and panic if it doesn't match.
  204. func getProxyApp(addr string, hash []byte) (proxyAppConn proxy.AppConn) {
  205. // use local app (for testing)
  206. switch addr {
  207. case "nilapp":
  208. app := nilapp.NewNilApplication()
  209. mtx := new(sync.Mutex)
  210. proxyAppConn = proxy.NewLocalAppConn(mtx, app)
  211. case "dummy":
  212. app := dummy.NewDummyApplication()
  213. mtx := new(sync.Mutex)
  214. proxyAppConn = proxy.NewLocalAppConn(mtx, app)
  215. default:
  216. remoteApp, err := proxy.NewRemoteAppConn(addr)
  217. if err != nil {
  218. Exit(Fmt("Failed to connect to proxy for mempool: %v", err))
  219. }
  220. proxyAppConn = remoteApp
  221. }
  222. // Check the hash
  223. currentHash, _, err := proxyAppConn.CommitSync()
  224. if err != nil {
  225. PanicCrisis(Fmt("Error in getting proxyAppConn hash: %v", err))
  226. }
  227. if !bytes.Equal(hash, currentHash) {
  228. PanicCrisis(Fmt("ProxyApp hash does not match. Expected %X, got %X", hash, currentHash))
  229. }
  230. return proxyAppConn
  231. }
  232. // Load the most recent state from "state" db,
  233. // or create a new one (and save) from genesis.
  234. func getState() *sm.State {
  235. stateDB := dbm.GetDB("state")
  236. state := sm.LoadState(stateDB)
  237. if state == nil {
  238. state = sm.MakeGenesisStateFromFile(stateDB, config.GetString("genesis_file"))
  239. state.Save()
  240. }
  241. return state
  242. }
  243. //------------------------------------------------------------------------------
  244. // Users wishing to use an external signer for their validators
  245. // should fork tendermint/tendermint and implement RunNode to
  246. // load their custom priv validator and call NewNode(privVal)
  247. func RunNode() {
  248. // Wait until the genesis doc becomes available
  249. genDocFile := config.GetString("genesis_file")
  250. if !FileExists(genDocFile) {
  251. log.Notice(Fmt("Waiting for genesis file %v...", genDocFile))
  252. for {
  253. time.Sleep(time.Second)
  254. if !FileExists(genDocFile) {
  255. continue
  256. }
  257. jsonBlob, err := ioutil.ReadFile(genDocFile)
  258. if err != nil {
  259. Exit(Fmt("Couldn't read GenesisDoc file: %v", err))
  260. }
  261. genDoc := types.GenesisDocFromJSON(jsonBlob)
  262. if genDoc.ChainID == "" {
  263. PanicSanity(Fmt("Genesis doc %v must include non-empty chain_id", genDocFile))
  264. }
  265. config.Set("chain_id", genDoc.ChainID)
  266. config.Set("genesis_doc", genDoc)
  267. }
  268. }
  269. // Get PrivValidator
  270. privValidatorFile := config.GetString("priv_validator_file")
  271. privValidator := types.LoadOrGenPrivValidator(privValidatorFile)
  272. // Create & start node
  273. n := NewNode(privValidator)
  274. l := p2p.NewDefaultListener("tcp", config.GetString("node_laddr"), config.GetBool("skip_upnp"))
  275. n.AddListener(l)
  276. err := n.Start()
  277. if err != nil {
  278. Exit(Fmt("Failed to start node: %v", err))
  279. }
  280. log.Notice("Started node", "nodeInfo", n.sw.NodeInfo())
  281. // If seedNode is provided by config, dial out.
  282. if config.GetString("seeds") != "" {
  283. seeds := strings.Split(config.GetString("seeds"), ",")
  284. n.sw.DialSeeds(seeds)
  285. }
  286. // Run the RPC server.
  287. if config.GetString("rpc_laddr") != "" {
  288. _, err := n.StartRPC()
  289. if err != nil {
  290. PanicCrisis(err)
  291. }
  292. }
  293. // Sleep forever and then...
  294. TrapSignal(func() {
  295. n.Stop()
  296. })
  297. }
  298. //------------------------------------------------------------------------------
  299. // replay
  300. // convenience for replay mode
  301. func newConsensusState() *consensus.ConsensusState {
  302. // Get BlockStore
  303. blockStoreDB := dbm.GetDB("blockstore")
  304. blockStore := bc.NewBlockStore(blockStoreDB)
  305. // Get State
  306. stateDB := dbm.GetDB("state")
  307. state := sm.MakeGenesisStateFromFile(stateDB, config.GetString("genesis_file"))
  308. // Create two proxyAppConn connections,
  309. // one for the consensus and one for the mempool.
  310. proxyAddr := config.GetString("proxy_app")
  311. proxyAppConnMempool := getProxyApp(proxyAddr, state.AppHash)
  312. proxyAppConnConsensus := getProxyApp(proxyAddr, state.AppHash)
  313. // add the chainid to the global config
  314. config.Set("chain_id", state.ChainID)
  315. // Make event switch
  316. eventSwitch := events.NewEventSwitch()
  317. _, err := eventSwitch.Start()
  318. if err != nil {
  319. Exit(Fmt("Failed to start event switch: %v", err))
  320. }
  321. mempool := mempl.NewMempool(proxyAppConnMempool)
  322. consensusState := consensus.NewConsensusState(state.Copy(), proxyAppConnConsensus, blockStore, mempool)
  323. consensusState.SetEventSwitch(eventSwitch)
  324. return consensusState
  325. }
  326. func RunReplayConsole() {
  327. walFile := config.GetString("cswal")
  328. if walFile == "" {
  329. Exit("cswal file name not set in tendermint config")
  330. }
  331. consensusState := newConsensusState()
  332. if err := consensusState.ReplayConsole(walFile); err != nil {
  333. Exit(Fmt("Error during consensus replay: %v", err))
  334. }
  335. }
  336. func RunReplay() {
  337. walFile := config.GetString("cswal")
  338. if walFile == "" {
  339. Exit("cswal file name not set in tendermint config")
  340. }
  341. consensusState := newConsensusState()
  342. if err := consensusState.ReplayMessages(walFile); err != nil {
  343. Exit(Fmt("Error during consensus replay: %v", err))
  344. }
  345. log.Notice("Replay run successfully")
  346. }