You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

462 lines
14 KiB

10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
9 years ago
10 years ago
10 years ago
10 years ago
10 years ago
9 years ago
  1. package node
  2. import (
  3. "bytes"
  4. "io/ioutil"
  5. "net"
  6. "net/http"
  7. "strings"
  8. "sync"
  9. "time"
  10. . "github.com/tendermint/go-common"
  11. cfg "github.com/tendermint/go-config"
  12. "github.com/tendermint/go-crypto"
  13. dbm "github.com/tendermint/go-db"
  14. "github.com/tendermint/go-events"
  15. "github.com/tendermint/go-p2p"
  16. "github.com/tendermint/go-rpc"
  17. "github.com/tendermint/go-rpc/server"
  18. "github.com/tendermint/go-wire"
  19. bc "github.com/tendermint/tendermint/blockchain"
  20. "github.com/tendermint/tendermint/consensus"
  21. mempl "github.com/tendermint/tendermint/mempool"
  22. "github.com/tendermint/tendermint/proxy"
  23. rpccore "github.com/tendermint/tendermint/rpc/core"
  24. sm "github.com/tendermint/tendermint/state"
  25. "github.com/tendermint/tendermint/types"
  26. "github.com/tendermint/tendermint/version"
  27. tmspcli "github.com/tendermint/tmsp/client"
  28. "github.com/tendermint/tmsp/example/dummy"
  29. "github.com/tendermint/tmsp/example/nil"
  30. )
  31. import _ "net/http/pprof"
  32. type Node struct {
  33. config cfg.Config
  34. sw *p2p.Switch
  35. evsw *events.EventSwitch
  36. blockStore *bc.BlockStore
  37. bcReactor *bc.BlockchainReactor
  38. mempoolReactor *mempl.MempoolReactor
  39. consensusState *consensus.ConsensusState
  40. consensusReactor *consensus.ConsensusReactor
  41. privValidator *types.PrivValidator
  42. genesisDoc *types.GenesisDoc
  43. privKey crypto.PrivKeyEd25519
  44. }
  45. func NewNode(config cfg.Config, privValidator *types.PrivValidator, getProxyApp func(proxyAddr, transport string, appHash []byte) proxy.AppConn) *Node {
  46. EnsureDir(config.GetString("db_dir"), 0700) // incase we use memdb, cswal still gets written here
  47. // Get BlockStore
  48. blockStoreDB := dbm.NewDB("blockstore", config.GetString("db_backend"), config.GetString("db_dir"))
  49. blockStore := bc.NewBlockStore(blockStoreDB)
  50. // Get State db
  51. stateDB := dbm.NewDB("state", config.GetString("db_backend"), config.GetString("db_dir"))
  52. // Get State
  53. state := getState(config, stateDB)
  54. // Create two proxyAppConn connections,
  55. // one for the consensus and one for the mempool.
  56. proxyAddr := config.GetString("proxy_app")
  57. transport := config.GetString("tmsp")
  58. proxyAppConnMempool := getProxyApp(proxyAddr, transport, state.AppHash)
  59. proxyAppConnConsensus := getProxyApp(proxyAddr, transport, state.AppHash)
  60. // add the chainid and number of validators to the global config
  61. config.Set("chain_id", state.ChainID)
  62. config.Set("num_vals", state.Validators.Size())
  63. // Generate node PrivKey
  64. privKey := crypto.GenPrivKeyEd25519()
  65. // Make event switch
  66. eventSwitch := events.NewEventSwitch()
  67. _, err := eventSwitch.Start()
  68. if err != nil {
  69. Exit(Fmt("Failed to start switch: %v", err))
  70. }
  71. // Decide whether to fast-sync or not
  72. // We don't fast-sync when the only validator is us.
  73. fastSync := config.GetBool("fast_sync")
  74. if state.Validators.Size() == 1 {
  75. addr, _ := state.Validators.GetByIndex(0)
  76. if bytes.Equal(privValidator.Address, addr) {
  77. fastSync = false
  78. }
  79. }
  80. // Make BlockchainReactor
  81. bcReactor := bc.NewBlockchainReactor(state.Copy(), proxyAppConnConsensus, blockStore, fastSync)
  82. // Make MempoolReactor
  83. mempool := mempl.NewMempool(config, proxyAppConnMempool)
  84. mempoolReactor := mempl.NewMempoolReactor(config, mempool)
  85. // Make ConsensusReactor
  86. consensusState := consensus.NewConsensusState(config, state.Copy(), proxyAppConnConsensus, blockStore, mempool)
  87. consensusReactor := consensus.NewConsensusReactor(consensusState, blockStore, fastSync)
  88. if privValidator != nil {
  89. consensusReactor.SetPrivValidator(privValidator)
  90. }
  91. // deterministic accountability
  92. err = consensusState.OpenWAL(config.GetString("cswal"))
  93. if err != nil {
  94. log.Error("Failed to open cswal", "error", err.Error())
  95. }
  96. // Make p2p network switch
  97. sw := p2p.NewSwitch(config.GetConfig("p2p"))
  98. sw.AddReactor("MEMPOOL", mempoolReactor)
  99. sw.AddReactor("BLOCKCHAIN", bcReactor)
  100. sw.AddReactor("CONSENSUS", consensusReactor)
  101. // add the event switch to all services
  102. // they should all satisfy events.Eventable
  103. SetEventSwitch(eventSwitch, bcReactor, mempoolReactor, consensusReactor)
  104. // run the profile server
  105. profileHost := config.GetString("prof_laddr")
  106. if profileHost != "" {
  107. go func() {
  108. log.Warn("Profile server", "error", http.ListenAndServe(profileHost, nil))
  109. }()
  110. }
  111. return &Node{
  112. config: config,
  113. sw: sw,
  114. evsw: eventSwitch,
  115. blockStore: blockStore,
  116. bcReactor: bcReactor,
  117. mempoolReactor: mempoolReactor,
  118. consensusState: consensusState,
  119. consensusReactor: consensusReactor,
  120. privValidator: privValidator,
  121. genesisDoc: state.GenesisDoc,
  122. privKey: privKey,
  123. }
  124. }
  125. // Call Start() after adding the listeners.
  126. func (n *Node) Start() error {
  127. n.sw.SetNodeInfo(makeNodeInfo(n.config, n.sw, n.privKey))
  128. n.sw.SetNodePrivKey(n.privKey)
  129. _, err := n.sw.Start()
  130. return err
  131. }
  132. func (n *Node) Stop() {
  133. log.Notice("Stopping Node")
  134. // TODO: gracefully disconnect from peers.
  135. n.sw.Stop()
  136. }
  137. // Add the event switch to reactors, mempool, etc.
  138. func SetEventSwitch(evsw *events.EventSwitch, eventables ...events.Eventable) {
  139. for _, e := range eventables {
  140. e.SetEventSwitch(evsw)
  141. }
  142. }
  143. // Add a Listener to accept inbound peer connections.
  144. // Add listeners before starting the Node.
  145. // The first listener is the primary listener (in NodeInfo)
  146. func (n *Node) AddListener(l p2p.Listener) {
  147. log.Notice(Fmt("Added %v", l))
  148. n.sw.AddListener(l)
  149. }
  150. func (n *Node) StartRPC() ([]net.Listener, error) {
  151. rpccore.SetConfig(n.config)
  152. rpccore.SetEventSwitch(n.evsw)
  153. rpccore.SetBlockStore(n.blockStore)
  154. rpccore.SetConsensusState(n.consensusState)
  155. rpccore.SetConsensusReactor(n.consensusReactor)
  156. rpccore.SetMempoolReactor(n.mempoolReactor)
  157. rpccore.SetSwitch(n.sw)
  158. rpccore.SetPrivValidator(n.privValidator)
  159. rpccore.SetGenesisDoc(n.genesisDoc)
  160. listenAddrs := strings.Split(n.config.GetString("rpc_laddr"), ",")
  161. // we may expose the rpc over both a unix and tcp socket
  162. listeners := make([]net.Listener, len(listenAddrs))
  163. for i, listenAddr := range listenAddrs {
  164. mux := http.NewServeMux()
  165. wm := rpcserver.NewWebsocketManager(rpccore.Routes, n.evsw)
  166. mux.HandleFunc("/websocket", wm.WebsocketHandler)
  167. rpcserver.RegisterRPCFuncs(mux, rpccore.Routes)
  168. listener, err := rpcserver.StartHTTPServer(listenAddr, mux)
  169. if err != nil {
  170. return nil, err
  171. }
  172. listeners[i] = listener
  173. }
  174. return listeners, nil
  175. }
  176. func (n *Node) Switch() *p2p.Switch {
  177. return n.sw
  178. }
  179. func (n *Node) BlockStore() *bc.BlockStore {
  180. return n.blockStore
  181. }
  182. func (n *Node) ConsensusState() *consensus.ConsensusState {
  183. return n.consensusState
  184. }
  185. func (n *Node) ConsensusReactor() *consensus.ConsensusReactor {
  186. return n.consensusReactor
  187. }
  188. func (n *Node) MempoolReactor() *mempl.MempoolReactor {
  189. return n.mempoolReactor
  190. }
  191. func (n *Node) EventSwitch() *events.EventSwitch {
  192. return n.evsw
  193. }
  194. // XXX: for convenience
  195. func (n *Node) PrivValidator() *types.PrivValidator {
  196. return n.privValidator
  197. }
  198. func makeNodeInfo(config cfg.Config, sw *p2p.Switch, privKey crypto.PrivKeyEd25519) *p2p.NodeInfo {
  199. nodeInfo := &p2p.NodeInfo{
  200. PubKey: privKey.PubKey().(crypto.PubKeyEd25519),
  201. Moniker: config.GetString("moniker"),
  202. Network: config.GetString("chain_id"),
  203. Version: version.Version,
  204. Other: []string{
  205. Fmt("wire_version=%v", wire.Version),
  206. Fmt("p2p_version=%v", p2p.Version),
  207. Fmt("consensus_version=%v", consensus.Version),
  208. Fmt("rpc_version=%v/%v", rpc.Version, rpccore.Version),
  209. },
  210. }
  211. // include git hash in the nodeInfo if available
  212. if rev, err := ReadFile(config.GetString("revision_file")); err == nil {
  213. nodeInfo.Other = append(nodeInfo.Other, Fmt("revision=%v", string(rev)))
  214. }
  215. if !sw.IsListening() {
  216. return nodeInfo
  217. }
  218. p2pListener := sw.Listeners()[0]
  219. p2pHost := p2pListener.ExternalAddress().IP.String()
  220. p2pPort := p2pListener.ExternalAddress().Port
  221. rpcListenAddr := config.GetString("rpc_laddr")
  222. // We assume that the rpcListener has the same ExternalAddress.
  223. // This is probably true because both P2P and RPC listeners use UPnP,
  224. // except of course if the rpc is only bound to localhost
  225. nodeInfo.ListenAddr = Fmt("%v:%v", p2pHost, p2pPort)
  226. nodeInfo.Other = append(nodeInfo.Other, Fmt("rpc_addr=%v", rpcListenAddr))
  227. return nodeInfo
  228. }
  229. // Get a connection to the proxyAppConn addr.
  230. // Check the current hash, and panic if it doesn't match.
  231. func GetProxyApp(addr, transport string, hash []byte) (proxyAppConn proxy.AppConn) {
  232. // use local app (for testing)
  233. switch addr {
  234. case "nilapp":
  235. app := nilapp.NewNilApplication()
  236. mtx := new(sync.Mutex)
  237. proxyAppConn = tmspcli.NewLocalClient(mtx, app)
  238. case "dummy":
  239. app := dummy.NewDummyApplication()
  240. mtx := new(sync.Mutex)
  241. proxyAppConn = tmspcli.NewLocalClient(mtx, app)
  242. default:
  243. // Run forever in a loop
  244. remoteApp, err := proxy.NewRemoteAppConn(addr, transport)
  245. if err != nil {
  246. Exit(Fmt("Failed to connect to proxy for mempool: %v", err))
  247. }
  248. proxyAppConn = remoteApp
  249. }
  250. // Check the hash
  251. res := proxyAppConn.CommitSync()
  252. if res.IsErr() {
  253. PanicCrisis(Fmt("Error in getting proxyAppConn hash: %v", res))
  254. }
  255. if !bytes.Equal(hash, res.Data) {
  256. log.Warn(Fmt("ProxyApp hash does not match. Expected %X, got %X", hash, res.Data))
  257. }
  258. return proxyAppConn
  259. }
  260. // Load the most recent state from "state" db,
  261. // or create a new one (and save) from genesis.
  262. func getState(config cfg.Config, stateDB dbm.DB) *sm.State {
  263. state := sm.LoadState(stateDB)
  264. if state == nil {
  265. state = sm.MakeGenesisStateFromFile(stateDB, config.GetString("genesis_file"))
  266. state.Save()
  267. }
  268. return state
  269. }
  270. //------------------------------------------------------------------------------
  271. // Users wishing to use an external signer for their validators
  272. // should fork tendermint/tendermint and implement RunNode to
  273. // load their custom priv validator and call NewNode(privVal, getProxyFunc)
  274. func RunNode(config cfg.Config) {
  275. // Wait until the genesis doc becomes available
  276. genDocFile := config.GetString("genesis_file")
  277. if !FileExists(genDocFile) {
  278. log.Notice(Fmt("Waiting for genesis file %v...", genDocFile))
  279. for {
  280. time.Sleep(time.Second)
  281. if !FileExists(genDocFile) {
  282. continue
  283. }
  284. jsonBlob, err := ioutil.ReadFile(genDocFile)
  285. if err != nil {
  286. Exit(Fmt("Couldn't read GenesisDoc file: %v", err))
  287. }
  288. genDoc := types.GenesisDocFromJSON(jsonBlob)
  289. if genDoc.ChainID == "" {
  290. PanicSanity(Fmt("Genesis doc %v must include non-empty chain_id", genDocFile))
  291. }
  292. config.Set("chain_id", genDoc.ChainID)
  293. }
  294. }
  295. // Get PrivValidator
  296. privValidatorFile := config.GetString("priv_validator_file")
  297. privValidator := types.LoadOrGenPrivValidator(privValidatorFile)
  298. // Create & start node
  299. n := NewNode(config, privValidator, GetProxyApp)
  300. protocol, address := ProtocolAndAddress(config.GetString("node_laddr"))
  301. l := p2p.NewDefaultListener(protocol, address, config.GetBool("skip_upnp"))
  302. n.AddListener(l)
  303. err := n.Start()
  304. if err != nil {
  305. Exit(Fmt("Failed to start node: %v", err))
  306. }
  307. log.Notice("Started node", "nodeInfo", n.sw.NodeInfo())
  308. // If seedNode is provided by config, dial out.
  309. if config.GetString("seeds") != "" {
  310. seeds := strings.Split(config.GetString("seeds"), ",")
  311. n.sw.DialSeeds(seeds)
  312. }
  313. // Run the RPC server.
  314. if config.GetString("rpc_laddr") != "" {
  315. _, err := n.StartRPC()
  316. if err != nil {
  317. PanicCrisis(err)
  318. }
  319. }
  320. // Sleep forever and then...
  321. TrapSignal(func() {
  322. n.Stop()
  323. })
  324. }
  325. func (n *Node) NodeInfo() *p2p.NodeInfo {
  326. return n.sw.NodeInfo()
  327. }
  328. func (n *Node) DialSeeds(seeds []string) {
  329. n.sw.DialSeeds(seeds)
  330. }
  331. //------------------------------------------------------------------------------
  332. // replay
  333. // convenience for replay mode
  334. func newConsensusState(config cfg.Config) *consensus.ConsensusState {
  335. // Get BlockStore
  336. blockStoreDB := dbm.NewDB("blockstore", config.GetString("db_backend"), config.GetString("db_dir"))
  337. blockStore := bc.NewBlockStore(blockStoreDB)
  338. // Get State
  339. stateDB := dbm.NewDB("state", config.GetString("db_backend"), config.GetString("db_dir"))
  340. state := sm.MakeGenesisStateFromFile(stateDB, config.GetString("genesis_file"))
  341. // Create two proxyAppConn connections,
  342. // one for the consensus and one for the mempool.
  343. proxyAddr := config.GetString("proxy_app")
  344. transport := config.GetString("tmsp")
  345. proxyAppConnMempool := GetProxyApp(proxyAddr, transport, state.AppHash)
  346. proxyAppConnConsensus := GetProxyApp(proxyAddr, transport, state.AppHash)
  347. // add the chainid to the global config
  348. config.Set("chain_id", state.ChainID)
  349. // Make event switch
  350. eventSwitch := events.NewEventSwitch()
  351. _, err := eventSwitch.Start()
  352. if err != nil {
  353. Exit(Fmt("Failed to start event switch: %v", err))
  354. }
  355. mempool := mempl.NewMempool(config, proxyAppConnMempool)
  356. consensusState := consensus.NewConsensusState(config, state.Copy(), proxyAppConnConsensus, blockStore, mempool)
  357. consensusState.SetEventSwitch(eventSwitch)
  358. return consensusState
  359. }
  360. func RunReplayConsole(config cfg.Config) {
  361. walFile := config.GetString("cswal")
  362. if walFile == "" {
  363. Exit("cswal file name not set in tendermint config")
  364. }
  365. consensusState := newConsensusState(config)
  366. if err := consensusState.ReplayConsole(walFile); err != nil {
  367. Exit(Fmt("Error during consensus replay: %v", err))
  368. }
  369. }
  370. func RunReplay(config cfg.Config) {
  371. walFile := config.GetString("cswal")
  372. if walFile == "" {
  373. Exit("cswal file name not set in tendermint config")
  374. }
  375. consensusState := newConsensusState(config)
  376. if err := consensusState.ReplayMessages(walFile); err != nil {
  377. Exit(Fmt("Error during consensus replay: %v", err))
  378. }
  379. log.Notice("Replay run successfully")
  380. }
  // ProtocolAndAddress splits a listen address of the form
  // "protocol://address" at the first "://" and returns both parts.
  // When listenAddr contains no "://", the protocol defaults to "tcp" and the
  // whole input is returned as the address.
  func ProtocolAndAddress(listenAddr string) (string, string) {
  	const sep = "://"

  	idx := strings.Index(listenAddr, sep)
  	if idx < 0 {
  		// Defaults to tcp
  		return "tcp", listenAddr
  	}
  	return listenAddr[:idx], listenAddr[idx+len(sep):]
  }