You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

447 lines
13 KiB

10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
9 years ago
10 years ago
10 years ago
10 years ago
10 years ago
9 years ago
  1. package node
  2. import (
  3. "bytes"
  4. "io/ioutil"
  5. "net"
  6. "net/http"
  7. "strings"
  8. "sync"
  9. "time"
  10. . "github.com/tendermint/go-common"
  11. cfg "github.com/tendermint/go-config"
  12. "github.com/tendermint/go-crypto"
  13. dbm "github.com/tendermint/go-db"
  14. "github.com/tendermint/go-events"
  15. "github.com/tendermint/go-p2p"
  16. "github.com/tendermint/go-rpc"
  17. "github.com/tendermint/go-rpc/server"
  18. "github.com/tendermint/go-wire"
  19. bc "github.com/tendermint/tendermint/blockchain"
  20. "github.com/tendermint/tendermint/consensus"
  21. mempl "github.com/tendermint/tendermint/mempool"
  22. "github.com/tendermint/tendermint/proxy"
  23. rpccore "github.com/tendermint/tendermint/rpc/core"
  24. sm "github.com/tendermint/tendermint/state"
  25. "github.com/tendermint/tendermint/types"
  26. "github.com/tendermint/tendermint/version"
  27. tmspcli "github.com/tendermint/tmsp/client"
  28. "github.com/tendermint/tmsp/example/dummy"
  29. "github.com/tendermint/tmsp/example/nil"
  30. )
  31. import _ "net/http/pprof"
  32. type Node struct {
  33. config cfg.Config
  34. sw *p2p.Switch
  35. evsw *events.EventSwitch
  36. blockStore *bc.BlockStore
  37. bcReactor *bc.BlockchainReactor
  38. mempoolReactor *mempl.MempoolReactor
  39. consensusState *consensus.ConsensusState
  40. consensusReactor *consensus.ConsensusReactor
  41. privValidator *types.PrivValidator
  42. genesisDoc *types.GenesisDoc
  43. privKey crypto.PrivKeyEd25519
  44. }
  45. func NewNode(config cfg.Config, privValidator *types.PrivValidator, getProxyApp func(proxyAddr, transport string, appHash []byte) proxy.AppConn) *Node {
  46. EnsureDir(config.GetString("db_dir"), 0700) // incase we use memdb, cswal still gets written here
  47. // Get BlockStore
  48. blockStoreDB := dbm.NewDB("blockstore", config.GetString("db_backend"), config.GetString("db_dir"))
  49. blockStore := bc.NewBlockStore(blockStoreDB)
  50. // Get State db
  51. stateDB := dbm.NewDB("state", config.GetString("db_backend"), config.GetString("db_dir"))
  52. // Get State
  53. state := getState(config, stateDB)
  54. // Create two proxyAppConn connections,
  55. // one for the consensus and one for the mempool.
  56. proxyAddr := config.GetString("proxy_app")
  57. transport := config.GetString("tmsp")
  58. proxyAppConnMempool := getProxyApp(proxyAddr, transport, state.AppHash)
  59. proxyAppConnConsensus := getProxyApp(proxyAddr, transport, state.AppHash)
  60. // add the chainid and number of validators to the global config
  61. config.Set("chain_id", state.ChainID)
  62. config.Set("num_vals", state.Validators.Size())
  63. // Generate node PrivKey
  64. privKey := crypto.GenPrivKeyEd25519()
  65. // Make event switch
  66. eventSwitch := events.NewEventSwitch()
  67. _, err := eventSwitch.Start()
  68. if err != nil {
  69. Exit(Fmt("Failed to start switch: %v", err))
  70. }
  71. // Decide whether to fast-sync or not
  72. // We don't fast-sync when the only validator is us.
  73. fastSync := config.GetBool("fast_sync")
  74. if state.Validators.Size() == 1 {
  75. addr, _ := state.Validators.GetByIndex(0)
  76. if bytes.Equal(privValidator.Address, addr) {
  77. fastSync = false
  78. }
  79. }
  80. // Make BlockchainReactor
  81. bcReactor := bc.NewBlockchainReactor(state.Copy(), proxyAppConnConsensus, blockStore, fastSync)
  82. // Make MempoolReactor
  83. mempool := mempl.NewMempool(config, proxyAppConnMempool)
  84. mempoolReactor := mempl.NewMempoolReactor(config, mempool)
  85. // Make ConsensusReactor
  86. consensusState := consensus.NewConsensusState(config, state.Copy(), proxyAppConnConsensus, blockStore, mempool)
  87. consensusReactor := consensus.NewConsensusReactor(consensusState, blockStore, fastSync)
  88. if privValidator != nil {
  89. consensusReactor.SetPrivValidator(privValidator)
  90. }
  91. // deterministic accountability
  92. err = consensusState.OpenWAL(config.GetString("cswal"))
  93. if err != nil {
  94. log.Error("Failed to open cswal", "error", err.Error())
  95. }
  96. // Make p2p network switch
  97. sw := p2p.NewSwitch(config)
  98. sw.AddReactor("MEMPOOL", mempoolReactor)
  99. sw.AddReactor("BLOCKCHAIN", bcReactor)
  100. sw.AddReactor("CONSENSUS", consensusReactor)
  101. // add the event switch to all services
  102. // they should all satisfy events.Eventable
  103. SetEventSwitch(eventSwitch, bcReactor, mempoolReactor, consensusReactor)
  104. // run the profile server
  105. profileHost := config.GetString("prof_laddr")
  106. if profileHost != "" {
  107. go func() {
  108. log.Warn("Profile server", "error", http.ListenAndServe(profileHost, nil))
  109. }()
  110. }
  111. return &Node{
  112. config: config,
  113. sw: sw,
  114. evsw: eventSwitch,
  115. blockStore: blockStore,
  116. bcReactor: bcReactor,
  117. mempoolReactor: mempoolReactor,
  118. consensusState: consensusState,
  119. consensusReactor: consensusReactor,
  120. privValidator: privValidator,
  121. genesisDoc: state.GenesisDoc,
  122. privKey: privKey,
  123. }
  124. }
  125. // Call Start() after adding the listeners.
  126. func (n *Node) Start() error {
  127. n.sw.SetNodeInfo(makeNodeInfo(n.config, n.sw, n.privKey))
  128. n.sw.SetNodePrivKey(n.privKey)
  129. _, err := n.sw.Start()
  130. return err
  131. }
  132. func (n *Node) Stop() {
  133. log.Notice("Stopping Node")
  134. // TODO: gracefully disconnect from peers.
  135. n.sw.Stop()
  136. }
  137. // Add the event switch to reactors, mempool, etc.
  138. func SetEventSwitch(evsw *events.EventSwitch, eventables ...events.Eventable) {
  139. for _, e := range eventables {
  140. e.SetEventSwitch(evsw)
  141. }
  142. }
  143. // Add a Listener to accept inbound peer connections.
  144. // Add listeners before starting the Node.
  145. // The first listener is the primary listener (in NodeInfo)
  146. func (n *Node) AddListener(l p2p.Listener) {
  147. log.Notice(Fmt("Added %v", l))
  148. n.sw.AddListener(l)
  149. }
  150. func (n *Node) StartRPC() ([]net.Listener, error) {
  151. rpccore.SetBlockStore(n.blockStore)
  152. rpccore.SetConsensusState(n.consensusState)
  153. rpccore.SetConsensusReactor(n.consensusReactor)
  154. rpccore.SetMempoolReactor(n.mempoolReactor)
  155. rpccore.SetSwitch(n.sw)
  156. rpccore.SetPrivValidator(n.privValidator)
  157. rpccore.SetGenesisDoc(n.genesisDoc)
  158. listenAddrs := strings.Split(n.config.GetString("rpc_laddr"), ",")
  159. // we may expose the rpc over both a unix and tcp socket
  160. listeners := make([]net.Listener, len(listenAddrs))
  161. for i, listenAddr := range listenAddrs {
  162. mux := http.NewServeMux()
  163. wm := rpcserver.NewWebsocketManager(rpccore.Routes, n.evsw)
  164. mux.HandleFunc("/websocket", wm.WebsocketHandler)
  165. rpcserver.RegisterRPCFuncs(mux, rpccore.Routes)
  166. listener, err := rpcserver.StartHTTPServer(listenAddr, mux)
  167. if err != nil {
  168. return nil, err
  169. }
  170. listeners[i] = listener
  171. }
  172. return listeners, nil
  173. }
  174. func (n *Node) Switch() *p2p.Switch {
  175. return n.sw
  176. }
  177. func (n *Node) BlockStore() *bc.BlockStore {
  178. return n.blockStore
  179. }
  180. func (n *Node) ConsensusState() *consensus.ConsensusState {
  181. return n.consensusState
  182. }
  183. func (n *Node) ConsensusReactor() *consensus.ConsensusReactor {
  184. return n.consensusReactor
  185. }
  186. func (n *Node) MempoolReactor() *mempl.MempoolReactor {
  187. return n.mempoolReactor
  188. }
  189. func (n *Node) EventSwitch() *events.EventSwitch {
  190. return n.evsw
  191. }
  192. // XXX: for convenience
  193. func (n *Node) PrivValidator() *types.PrivValidator {
  194. return n.privValidator
  195. }
  196. func makeNodeInfo(config cfg.Config, sw *p2p.Switch, privKey crypto.PrivKeyEd25519) *p2p.NodeInfo {
  197. nodeInfo := &p2p.NodeInfo{
  198. PubKey: privKey.PubKey().(crypto.PubKeyEd25519),
  199. Moniker: config.GetString("moniker"),
  200. Network: config.GetString("chain_id"),
  201. Version: version.Version,
  202. Other: []string{
  203. Fmt("wire_version=%v", wire.Version),
  204. Fmt("p2p_version=%v", p2p.Version),
  205. Fmt("consensus_version=%v", consensus.Version),
  206. Fmt("rpc_version=%v/%v", rpc.Version, rpccore.Version),
  207. },
  208. }
  209. // include git hash in the nodeInfo if available
  210. if rev, err := ReadFile(config.GetString("revision_file")); err == nil {
  211. nodeInfo.Other = append(nodeInfo.Other, Fmt("revision=%v", string(rev)))
  212. }
  213. if !sw.IsListening() {
  214. return nodeInfo
  215. }
  216. p2pListener := sw.Listeners()[0]
  217. p2pHost := p2pListener.ExternalAddress().IP.String()
  218. p2pPort := p2pListener.ExternalAddress().Port
  219. rpcListenAddr := config.GetString("rpc_laddr")
  220. // We assume that the rpcListener has the same ExternalAddress.
  221. // This is probably true because both P2P and RPC listeners use UPnP,
  222. // except of course if the rpc is only bound to localhost
  223. nodeInfo.ListenAddr = Fmt("%v:%v", p2pHost, p2pPort)
  224. nodeInfo.Other = append(nodeInfo.Other, Fmt("rpc_addr=%v", rpcListenAddr))
  225. return nodeInfo
  226. }
  227. // Get a connection to the proxyAppConn addr.
  228. // Check the current hash, and panic if it doesn't match.
  229. func GetProxyApp(addr, transport string, hash []byte) (proxyAppConn proxy.AppConn) {
  230. // use local app (for testing)
  231. switch addr {
  232. case "nilapp":
  233. app := nilapp.NewNilApplication()
  234. mtx := new(sync.Mutex)
  235. proxyAppConn = tmspcli.NewLocalClient(mtx, app)
  236. case "dummy":
  237. app := dummy.NewDummyApplication()
  238. mtx := new(sync.Mutex)
  239. proxyAppConn = tmspcli.NewLocalClient(mtx, app)
  240. default:
  241. // Run forever in a loop
  242. remoteApp, err := proxy.NewRemoteAppConn(addr, transport)
  243. if err != nil {
  244. Exit(Fmt("Failed to connect to proxy for mempool: %v", err))
  245. }
  246. proxyAppConn = remoteApp
  247. }
  248. // Check the hash
  249. res := proxyAppConn.CommitSync()
  250. if res.IsErr() {
  251. PanicCrisis(Fmt("Error in getting proxyAppConn hash: %v", res))
  252. }
  253. if !bytes.Equal(hash, res.Data) {
  254. log.Warn(Fmt("ProxyApp hash does not match. Expected %X, got %X", hash, res.Data))
  255. }
  256. return proxyAppConn
  257. }
  258. // Load the most recent state from "state" db,
  259. // or create a new one (and save) from genesis.
  260. func getState(config cfg.Config, stateDB dbm.DB) *sm.State {
  261. state := sm.LoadState(stateDB)
  262. if state == nil {
  263. state = sm.MakeGenesisStateFromFile(stateDB, config.GetString("genesis_file"))
  264. state.Save()
  265. }
  266. return state
  267. }
  268. //------------------------------------------------------------------------------
  269. // Users wishing to use an external signer for their validators
  270. // should fork tendermint/tendermint and implement RunNode to
  271. // load their custom priv validator and call NewNode(privVal, getProxyFunc)
  272. func RunNode(config cfg.Config) {
  273. // Wait until the genesis doc becomes available
  274. genDocFile := config.GetString("genesis_file")
  275. if !FileExists(genDocFile) {
  276. log.Notice(Fmt("Waiting for genesis file %v...", genDocFile))
  277. for {
  278. time.Sleep(time.Second)
  279. if !FileExists(genDocFile) {
  280. continue
  281. }
  282. jsonBlob, err := ioutil.ReadFile(genDocFile)
  283. if err != nil {
  284. Exit(Fmt("Couldn't read GenesisDoc file: %v", err))
  285. }
  286. genDoc := types.GenesisDocFromJSON(jsonBlob)
  287. if genDoc.ChainID == "" {
  288. PanicSanity(Fmt("Genesis doc %v must include non-empty chain_id", genDocFile))
  289. }
  290. config.Set("chain_id", genDoc.ChainID)
  291. }
  292. }
  293. // Get PrivValidator
  294. privValidatorFile := config.GetString("priv_validator_file")
  295. privValidator := types.LoadOrGenPrivValidator(privValidatorFile)
  296. // Create & start node
  297. n := NewNode(config, privValidator, GetProxyApp)
  298. l := p2p.NewDefaultListener("tcp", config.GetString("node_laddr"), config.GetBool("skip_upnp"))
  299. n.AddListener(l)
  300. err := n.Start()
  301. if err != nil {
  302. Exit(Fmt("Failed to start node: %v", err))
  303. }
  304. log.Notice("Started node", "nodeInfo", n.sw.NodeInfo())
  305. // If seedNode is provided by config, dial out.
  306. if config.GetString("seeds") != "" {
  307. seeds := strings.Split(config.GetString("seeds"), ",")
  308. n.sw.DialSeeds(seeds)
  309. }
  310. // Run the RPC server.
  311. if config.GetString("rpc_laddr") != "" {
  312. _, err := n.StartRPC()
  313. if err != nil {
  314. PanicCrisis(err)
  315. }
  316. }
  317. // Sleep forever and then...
  318. TrapSignal(func() {
  319. n.Stop()
  320. })
  321. }
  322. func (n *Node) NodeInfo() *p2p.NodeInfo {
  323. return n.sw.NodeInfo()
  324. }
  325. func (n *Node) DialSeeds(seeds []string) {
  326. n.sw.DialSeeds(seeds)
  327. }
  328. //------------------------------------------------------------------------------
  329. // replay
  330. // convenience for replay mode
  331. func newConsensusState(config cfg.Config) *consensus.ConsensusState {
  332. // Get BlockStore
  333. blockStoreDB := dbm.NewDB("blockstore", config.GetString("db_backend"), config.GetString("db_dir"))
  334. blockStore := bc.NewBlockStore(blockStoreDB)
  335. // Get State
  336. stateDB := dbm.NewDB("state", config.GetString("db_backend"), config.GetString("db_dir"))
  337. state := sm.MakeGenesisStateFromFile(stateDB, config.GetString("genesis_file"))
  338. // Create two proxyAppConn connections,
  339. // one for the consensus and one for the mempool.
  340. proxyAddr := config.GetString("proxy_app")
  341. transport := config.GetString("tmsp")
  342. proxyAppConnMempool := GetProxyApp(proxyAddr, transport, state.AppHash)
  343. proxyAppConnConsensus := GetProxyApp(proxyAddr, transport, state.AppHash)
  344. // add the chainid to the global config
  345. config.Set("chain_id", state.ChainID)
  346. // Make event switch
  347. eventSwitch := events.NewEventSwitch()
  348. _, err := eventSwitch.Start()
  349. if err != nil {
  350. Exit(Fmt("Failed to start event switch: %v", err))
  351. }
  352. mempool := mempl.NewMempool(config, proxyAppConnMempool)
  353. consensusState := consensus.NewConsensusState(config, state.Copy(), proxyAppConnConsensus, blockStore, mempool)
  354. consensusState.SetEventSwitch(eventSwitch)
  355. return consensusState
  356. }
  357. func RunReplayConsole(config cfg.Config) {
  358. walFile := config.GetString("cswal")
  359. if walFile == "" {
  360. Exit("cswal file name not set in tendermint config")
  361. }
  362. consensusState := newConsensusState(config)
  363. if err := consensusState.ReplayConsole(walFile); err != nil {
  364. Exit(Fmt("Error during consensus replay: %v", err))
  365. }
  366. }
  367. func RunReplay(config cfg.Config) {
  368. walFile := config.GetString("cswal")
  369. if walFile == "" {
  370. Exit("cswal file name not set in tendermint config")
  371. }
  372. consensusState := newConsensusState(config)
  373. if err := consensusState.ReplayMessages(walFile); err != nil {
  374. Exit(Fmt("Error during consensus replay: %v", err))
  375. }
  376. log.Notice("Replay run successfully")
  377. }