You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

445 lines
13 KiB

10 years ago
10 years ago
10 years ago
11 years ago
11 years ago
11 years ago
11 years ago
11 years ago
11 years ago
11 years ago
10 years ago
9 years ago
10 years ago
11 years ago
11 years ago
11 years ago
10 years ago
11 years ago
9 years ago
  1. package node
  2. import (
  3. "bytes"
  4. "io/ioutil"
  5. "net"
  6. "net/http"
  7. "strings"
  8. "sync"
  9. "time"
  10. . "github.com/tendermint/go-common"
  11. cfg "github.com/tendermint/go-config"
  12. "github.com/tendermint/go-crypto"
  13. dbm "github.com/tendermint/go-db"
  14. "github.com/tendermint/go-events"
  15. "github.com/tendermint/go-p2p"
  16. "github.com/tendermint/go-rpc"
  17. "github.com/tendermint/go-rpc/server"
  18. "github.com/tendermint/go-wire"
  19. bc "github.com/tendermint/tendermint/blockchain"
  20. "github.com/tendermint/tendermint/consensus"
  21. mempl "github.com/tendermint/tendermint/mempool"
  22. "github.com/tendermint/tendermint/proxy"
  23. rpccore "github.com/tendermint/tendermint/rpc/core"
  24. sm "github.com/tendermint/tendermint/state"
  25. "github.com/tendermint/tendermint/types"
  26. "github.com/tendermint/tendermint/version"
  27. tmspcli "github.com/tendermint/tmsp/client"
  28. "github.com/tendermint/tmsp/example/dummy"
  29. "github.com/tendermint/tmsp/example/nil"
  30. )
  31. import _ "net/http/pprof"
  32. type Node struct {
  33. config cfg.Config
  34. sw *p2p.Switch
  35. evsw *events.EventSwitch
  36. blockStore *bc.BlockStore
  37. bcReactor *bc.BlockchainReactor
  38. mempoolReactor *mempl.MempoolReactor
  39. consensusState *consensus.ConsensusState
  40. consensusReactor *consensus.ConsensusReactor
  41. privValidator *types.PrivValidator
  42. genesisDoc *types.GenesisDoc
  43. privKey crypto.PrivKeyEd25519
  44. }
  45. func NewNode(config cfg.Config, privValidator *types.PrivValidator, getProxyApp func(proxyAddr string, appHash []byte) proxy.AppConn) *Node {
  46. EnsureDir(config.GetString("db_dir"), 0700) // incase we use memdb, cswal still gets written here
  47. // Get BlockStore
  48. blockStoreDB := dbm.NewDB("blockstore", config.GetString("db_backend"), config.GetString("db_dir"))
  49. blockStore := bc.NewBlockStore(blockStoreDB)
  50. // Get State db
  51. stateDB := dbm.NewDB("state", config.GetString("db_backend"), config.GetString("db_dir"))
  52. // Get State
  53. state := getState(config, stateDB)
  54. // Create two proxyAppConn connections,
  55. // one for the consensus and one for the mempool.
  56. proxyAddr := config.GetString("proxy_app")
  57. proxyAppConnMempool := getProxyApp(proxyAddr, state.AppHash)
  58. proxyAppConnConsensus := getProxyApp(proxyAddr, state.AppHash)
  59. // add the chainid and number of validators to the global config
  60. config.Set("chain_id", state.ChainID)
  61. config.Set("num_vals", state.Validators.Size())
  62. // Generate node PrivKey
  63. privKey := crypto.GenPrivKeyEd25519()
  64. // Make event switch
  65. eventSwitch := events.NewEventSwitch()
  66. _, err := eventSwitch.Start()
  67. if err != nil {
  68. Exit(Fmt("Failed to start switch: %v", err))
  69. }
  70. // Decide whether to fast-sync or not
  71. // We don't fast-sync when the only validator is us.
  72. fastSync := config.GetBool("fast_sync")
  73. if state.Validators.Size() == 1 {
  74. addr, _ := state.Validators.GetByIndex(0)
  75. if bytes.Equal(privValidator.Address, addr) {
  76. fastSync = false
  77. }
  78. }
  79. // Make BlockchainReactor
  80. bcReactor := bc.NewBlockchainReactor(state.Copy(), proxyAppConnConsensus, blockStore, fastSync)
  81. // Make MempoolReactor
  82. mempool := mempl.NewMempool(config, proxyAppConnMempool)
  83. mempoolReactor := mempl.NewMempoolReactor(config, mempool)
  84. // Make ConsensusReactor
  85. consensusState := consensus.NewConsensusState(config, state.Copy(), proxyAppConnConsensus, blockStore, mempool)
  86. consensusReactor := consensus.NewConsensusReactor(consensusState, blockStore, fastSync)
  87. if privValidator != nil {
  88. consensusReactor.SetPrivValidator(privValidator)
  89. }
  90. // deterministic accountability
  91. err = consensusState.OpenWAL(config.GetString("cswal"))
  92. if err != nil {
  93. log.Error("Failed to open cswal", "error", err.Error())
  94. }
  95. // Make p2p network switch
  96. sw := p2p.NewSwitch(config)
  97. sw.AddReactor("MEMPOOL", mempoolReactor)
  98. sw.AddReactor("BLOCKCHAIN", bcReactor)
  99. sw.AddReactor("CONSENSUS", consensusReactor)
  100. // add the event switch to all services
  101. // they should all satisfy events.Eventable
  102. SetEventSwitch(eventSwitch, bcReactor, mempoolReactor, consensusReactor)
  103. // run the profile server
  104. profileHost := config.GetString("prof_laddr")
  105. if profileHost != "" {
  106. go func() {
  107. log.Warn("Profile server", "error", http.ListenAndServe(profileHost, nil))
  108. }()
  109. }
  110. return &Node{
  111. config: config,
  112. sw: sw,
  113. evsw: eventSwitch,
  114. blockStore: blockStore,
  115. bcReactor: bcReactor,
  116. mempoolReactor: mempoolReactor,
  117. consensusState: consensusState,
  118. consensusReactor: consensusReactor,
  119. privValidator: privValidator,
  120. genesisDoc: state.GenesisDoc,
  121. privKey: privKey,
  122. }
  123. }
  124. // Call Start() after adding the listeners.
  125. func (n *Node) Start() error {
  126. n.sw.SetNodeInfo(makeNodeInfo(n.config, n.sw, n.privKey))
  127. n.sw.SetNodePrivKey(n.privKey)
  128. _, err := n.sw.Start()
  129. return err
  130. }
  131. func (n *Node) Stop() {
  132. log.Notice("Stopping Node")
  133. // TODO: gracefully disconnect from peers.
  134. n.sw.Stop()
  135. }
  136. // Add the event switch to reactors, mempool, etc.
  137. func SetEventSwitch(evsw *events.EventSwitch, eventables ...events.Eventable) {
  138. for _, e := range eventables {
  139. e.SetEventSwitch(evsw)
  140. }
  141. }
  142. // Add a Listener to accept inbound peer connections.
  143. // Add listeners before starting the Node.
  144. // The first listener is the primary listener (in NodeInfo)
  145. func (n *Node) AddListener(l p2p.Listener) {
  146. log.Notice(Fmt("Added %v", l))
  147. n.sw.AddListener(l)
  148. }
  149. func (n *Node) StartRPC() ([]net.Listener, error) {
  150. rpccore.SetBlockStore(n.blockStore)
  151. rpccore.SetConsensusState(n.consensusState)
  152. rpccore.SetConsensusReactor(n.consensusReactor)
  153. rpccore.SetMempoolReactor(n.mempoolReactor)
  154. rpccore.SetSwitch(n.sw)
  155. rpccore.SetPrivValidator(n.privValidator)
  156. rpccore.SetGenesisDoc(n.genesisDoc)
  157. listenAddrs := strings.Split(n.config.GetString("rpc_laddr"), ",")
  158. // we may expose the rpc over both a unix and tcp socket
  159. listeners := make([]net.Listener, len(listenAddrs))
  160. for i, listenAddr := range listenAddrs {
  161. mux := http.NewServeMux()
  162. wm := rpcserver.NewWebsocketManager(rpccore.Routes, n.evsw)
  163. mux.HandleFunc("/websocket", wm.WebsocketHandler)
  164. rpcserver.RegisterRPCFuncs(mux, rpccore.Routes)
  165. listener, err := rpcserver.StartHTTPServer(listenAddr, mux)
  166. if err != nil {
  167. return nil, err
  168. }
  169. listeners[i] = listener
  170. }
  171. return listeners, nil
  172. }
  173. func (n *Node) Switch() *p2p.Switch {
  174. return n.sw
  175. }
  176. func (n *Node) BlockStore() *bc.BlockStore {
  177. return n.blockStore
  178. }
  179. func (n *Node) ConsensusState() *consensus.ConsensusState {
  180. return n.consensusState
  181. }
  182. func (n *Node) ConsensusReactor() *consensus.ConsensusReactor {
  183. return n.consensusReactor
  184. }
  185. func (n *Node) MempoolReactor() *mempl.MempoolReactor {
  186. return n.mempoolReactor
  187. }
  188. func (n *Node) EventSwitch() *events.EventSwitch {
  189. return n.evsw
  190. }
  191. // XXX: for convenience
  192. func (n *Node) PrivValidator() *types.PrivValidator {
  193. return n.privValidator
  194. }
  195. func makeNodeInfo(config cfg.Config, sw *p2p.Switch, privKey crypto.PrivKeyEd25519) *p2p.NodeInfo {
  196. nodeInfo := &p2p.NodeInfo{
  197. PubKey: privKey.PubKey().(crypto.PubKeyEd25519),
  198. Moniker: config.GetString("moniker"),
  199. Network: config.GetString("chain_id"),
  200. Version: version.Version,
  201. Other: []string{
  202. Fmt("wire_version=%v", wire.Version),
  203. Fmt("p2p_version=%v", p2p.Version),
  204. Fmt("consensus_version=%v", consensus.Version),
  205. Fmt("rpc_version=%v/%v", rpc.Version, rpccore.Version),
  206. },
  207. }
  208. // include git hash in the nodeInfo if available
  209. if rev, err := ReadFile(config.GetString("revision_file")); err == nil {
  210. nodeInfo.Other = append(nodeInfo.Other, Fmt("revision=%v", string(rev)))
  211. }
  212. if !sw.IsListening() {
  213. return nodeInfo
  214. }
  215. p2pListener := sw.Listeners()[0]
  216. p2pHost := p2pListener.ExternalAddress().IP.String()
  217. p2pPort := p2pListener.ExternalAddress().Port
  218. rpcListenAddr := config.GetString("rpc_laddr")
  219. // We assume that the rpcListener has the same ExternalAddress.
  220. // This is probably true because both P2P and RPC listeners use UPnP,
  221. // except of course if the rpc is only bound to localhost
  222. nodeInfo.ListenAddr = Fmt("%v:%v", p2pHost, p2pPort)
  223. nodeInfo.Other = append(nodeInfo.Other, Fmt("rpc_addr=%v", rpcListenAddr))
  224. return nodeInfo
  225. }
  226. // Get a connection to the proxyAppConn addr.
  227. // Check the current hash, and panic if it doesn't match.
  228. func GetProxyApp(addr string, hash []byte) (proxyAppConn proxy.AppConn) {
  229. // use local app (for testing)
  230. switch addr {
  231. case "nilapp":
  232. app := nilapp.NewNilApplication()
  233. mtx := new(sync.Mutex)
  234. proxyAppConn = tmspcli.NewLocalClient(mtx, app)
  235. case "dummy":
  236. app := dummy.NewDummyApplication()
  237. mtx := new(sync.Mutex)
  238. proxyAppConn = tmspcli.NewLocalClient(mtx, app)
  239. default:
  240. // Run forever in a loop
  241. remoteApp, err := proxy.NewRemoteAppConn(addr)
  242. if err != nil {
  243. Exit(Fmt("Failed to connect to proxy for mempool: %v", err))
  244. }
  245. proxyAppConn = remoteApp
  246. }
  247. // Check the hash
  248. res := proxyAppConn.CommitSync()
  249. if res.IsErr() {
  250. PanicCrisis(Fmt("Error in getting proxyAppConn hash: %v", res))
  251. }
  252. if !bytes.Equal(hash, res.Data) {
  253. log.Warn(Fmt("ProxyApp hash does not match. Expected %X, got %X", hash, res.Data))
  254. }
  255. return proxyAppConn
  256. }
  257. // Load the most recent state from "state" db,
  258. // or create a new one (and save) from genesis.
  259. func getState(config cfg.Config, stateDB dbm.DB) *sm.State {
  260. state := sm.LoadState(stateDB)
  261. if state == nil {
  262. state = sm.MakeGenesisStateFromFile(stateDB, config.GetString("genesis_file"))
  263. state.Save()
  264. }
  265. return state
  266. }
  267. //------------------------------------------------------------------------------
  268. // Users wishing to use an external signer for their validators
  269. // should fork tendermint/tendermint and implement RunNode to
  270. // load their custom priv validator and call NewNode(privVal, getProxyFunc)
  271. func RunNode(config cfg.Config) {
  272. // Wait until the genesis doc becomes available
  273. genDocFile := config.GetString("genesis_file")
  274. if !FileExists(genDocFile) {
  275. log.Notice(Fmt("Waiting for genesis file %v...", genDocFile))
  276. for {
  277. time.Sleep(time.Second)
  278. if !FileExists(genDocFile) {
  279. continue
  280. }
  281. jsonBlob, err := ioutil.ReadFile(genDocFile)
  282. if err != nil {
  283. Exit(Fmt("Couldn't read GenesisDoc file: %v", err))
  284. }
  285. genDoc := types.GenesisDocFromJSON(jsonBlob)
  286. if genDoc.ChainID == "" {
  287. PanicSanity(Fmt("Genesis doc %v must include non-empty chain_id", genDocFile))
  288. }
  289. config.Set("chain_id", genDoc.ChainID)
  290. }
  291. }
  292. // Get PrivValidator
  293. privValidatorFile := config.GetString("priv_validator_file")
  294. privValidator := types.LoadOrGenPrivValidator(privValidatorFile)
  295. // Create & start node
  296. n := NewNode(config, privValidator, GetProxyApp)
  297. l := p2p.NewDefaultListener("tcp", config.GetString("node_laddr"), config.GetBool("skip_upnp"))
  298. n.AddListener(l)
  299. err := n.Start()
  300. if err != nil {
  301. Exit(Fmt("Failed to start node: %v", err))
  302. }
  303. log.Notice("Started node", "nodeInfo", n.sw.NodeInfo())
  304. // If seedNode is provided by config, dial out.
  305. if config.GetString("seeds") != "" {
  306. seeds := strings.Split(config.GetString("seeds"), ",")
  307. n.sw.DialSeeds(seeds)
  308. }
  309. // Run the RPC server.
  310. if config.GetString("rpc_laddr") != "" {
  311. _, err := n.StartRPC()
  312. if err != nil {
  313. PanicCrisis(err)
  314. }
  315. }
  316. // Sleep forever and then...
  317. TrapSignal(func() {
  318. n.Stop()
  319. })
  320. }
  321. func (n *Node) NodeInfo() *p2p.NodeInfo {
  322. return n.sw.NodeInfo()
  323. }
  324. func (n *Node) DialSeeds(seeds []string) {
  325. n.sw.DialSeeds(seeds)
  326. }
  327. //------------------------------------------------------------------------------
  328. // replay
  329. // convenience for replay mode
  330. func newConsensusState(config cfg.Config) *consensus.ConsensusState {
  331. // Get BlockStore
  332. blockStoreDB := dbm.NewDB("blockstore", config.GetString("db_backend"), config.GetString("db_dir"))
  333. blockStore := bc.NewBlockStore(blockStoreDB)
  334. // Get State
  335. stateDB := dbm.NewDB("state", config.GetString("db_backend"), config.GetString("db_dir"))
  336. state := sm.MakeGenesisStateFromFile(stateDB, config.GetString("genesis_file"))
  337. // Create two proxyAppConn connections,
  338. // one for the consensus and one for the mempool.
  339. proxyAddr := config.GetString("proxy_app")
  340. proxyAppConnMempool := GetProxyApp(proxyAddr, state.AppHash)
  341. proxyAppConnConsensus := GetProxyApp(proxyAddr, state.AppHash)
  342. // add the chainid to the global config
  343. config.Set("chain_id", state.ChainID)
  344. // Make event switch
  345. eventSwitch := events.NewEventSwitch()
  346. _, err := eventSwitch.Start()
  347. if err != nil {
  348. Exit(Fmt("Failed to start event switch: %v", err))
  349. }
  350. mempool := mempl.NewMempool(config, proxyAppConnMempool)
  351. consensusState := consensus.NewConsensusState(config, state.Copy(), proxyAppConnConsensus, blockStore, mempool)
  352. consensusState.SetEventSwitch(eventSwitch)
  353. return consensusState
  354. }
  355. func RunReplayConsole(config cfg.Config) {
  356. walFile := config.GetString("cswal")
  357. if walFile == "" {
  358. Exit("cswal file name not set in tendermint config")
  359. }
  360. consensusState := newConsensusState(config)
  361. if err := consensusState.ReplayConsole(walFile); err != nil {
  362. Exit(Fmt("Error during consensus replay: %v", err))
  363. }
  364. }
  365. func RunReplay(config cfg.Config) {
  366. walFile := config.GetString("cswal")
  367. if walFile == "" {
  368. Exit("cswal file name not set in tendermint config")
  369. }
  370. consensusState := newConsensusState(config)
  371. if err := consensusState.ReplayMessages(walFile); err != nil {
  372. Exit(Fmt("Error during consensus replay: %v", err))
  373. }
  374. log.Notice("Replay run successfully")
  375. }