You cannot select more than 25 topics. Topics must start with a letter or number, can include dashes ('-'), and can be up to 35 characters long.

421 lines
12 KiB

10 years ago
10 years ago
10 years ago
11 years ago
8 years ago
11 years ago
8 years ago
11 years ago
8 years ago
11 years ago
11 years ago
11 years ago
11 years ago
10 years ago
9 years ago
10 years ago
11 years ago
11 years ago
11 years ago
10 years ago
11 years ago
9 years ago
8 years ago
  1. package node
  2. import (
  3. "bytes"
  4. "io/ioutil"
  5. "net"
  6. "net/http"
  7. "strings"
  8. "time"
  9. . "github.com/tendermint/go-common"
  10. cfg "github.com/tendermint/go-config"
  11. "github.com/tendermint/go-crypto"
  12. dbm "github.com/tendermint/go-db"
  13. "github.com/tendermint/go-events"
  14. "github.com/tendermint/go-p2p"
  15. "github.com/tendermint/go-rpc"
  16. "github.com/tendermint/go-rpc/server"
  17. "github.com/tendermint/go-wire"
  18. bc "github.com/tendermint/tendermint/blockchain"
  19. "github.com/tendermint/tendermint/consensus"
  20. mempl "github.com/tendermint/tendermint/mempool"
  21. "github.com/tendermint/tendermint/proxy"
  22. rpccore "github.com/tendermint/tendermint/rpc/core"
  23. sm "github.com/tendermint/tendermint/state"
  24. "github.com/tendermint/tendermint/types"
  25. "github.com/tendermint/tendermint/version"
  26. )
  27. import _ "net/http/pprof"
  28. type Node struct {
  29. config cfg.Config
  30. sw *p2p.Switch
  31. evsw *events.EventSwitch
  32. blockStore *bc.BlockStore
  33. bcReactor *bc.BlockchainReactor
  34. mempoolReactor *mempl.MempoolReactor
  35. consensusState *consensus.ConsensusState
  36. consensusReactor *consensus.ConsensusReactor
  37. privValidator *types.PrivValidator
  38. genesisDoc *types.GenesisDoc
  39. privKey crypto.PrivKeyEd25519
  40. proxyApp proxy.AppConns
  41. }
  42. func NewNode(config cfg.Config, privValidator *types.PrivValidator) *Node {
  43. EnsureDir(config.GetString("db_dir"), 0700) // incase we use memdb, cswal still gets written here
  44. // Get BlockStore
  45. blockStoreDB := dbm.NewDB("blockstore", config.GetString("db_backend"), config.GetString("db_dir"))
  46. blockStore := bc.NewBlockStore(blockStoreDB)
  47. // Get State db
  48. stateDB := dbm.NewDB("state", config.GetString("db_backend"), config.GetString("db_dir"))
  49. // Get State
  50. state := getState(config, stateDB)
  51. // Create the proxyApp, which houses two connections,
  52. // one for the consensus and one for the mempool.
  53. proxyApp := proxy.NewAppConns(config, state, blockStore)
  54. // add the chainid and number of validators to the global config
  55. config.Set("chain_id", state.ChainID)
  56. config.Set("num_vals", state.Validators.Size())
  57. // Generate node PrivKey
  58. privKey := crypto.GenPrivKeyEd25519()
  59. // Make event switch
  60. eventSwitch := events.NewEventSwitch()
  61. _, err := eventSwitch.Start()
  62. if err != nil {
  63. Exit(Fmt("Failed to start switch: %v", err))
  64. }
  65. // Decide whether to fast-sync or not
  66. // We don't fast-sync when the only validator is us.
  67. fastSync := config.GetBool("fast_sync")
  68. if state.Validators.Size() == 1 {
  69. addr, _ := state.Validators.GetByIndex(0)
  70. if bytes.Equal(privValidator.Address, addr) {
  71. fastSync = false
  72. }
  73. }
  74. // Make BlockchainReactor
  75. bcReactor := bc.NewBlockchainReactor(state.Copy(), proxyApp.Consensus(), blockStore, fastSync)
  76. // Make MempoolReactor
  77. mempool := mempl.NewMempool(config, proxyApp.Mempool())
  78. mempoolReactor := mempl.NewMempoolReactor(config, mempool)
  79. // Make ConsensusReactor
  80. consensusState := consensus.NewConsensusState(config, state.Copy(), proxyApp.Consensus(), blockStore, mempool)
  81. consensusReactor := consensus.NewConsensusReactor(consensusState, blockStore, fastSync)
  82. if privValidator != nil {
  83. consensusReactor.SetPrivValidator(privValidator)
  84. }
  85. // deterministic accountability
  86. err = consensusState.OpenWAL(config.GetString("cswal"))
  87. if err != nil {
  88. log.Error("Failed to open cswal", "error", err.Error())
  89. }
  90. // Make p2p network switch
  91. sw := p2p.NewSwitch(config.GetConfig("p2p"))
  92. sw.AddReactor("MEMPOOL", mempoolReactor)
  93. sw.AddReactor("BLOCKCHAIN", bcReactor)
  94. sw.AddReactor("CONSENSUS", consensusReactor)
  95. // add the event switch to all services
  96. // they should all satisfy events.Eventable
  97. SetEventSwitch(eventSwitch, bcReactor, mempoolReactor, consensusReactor)
  98. // run the profile server
  99. profileHost := config.GetString("prof_laddr")
  100. if profileHost != "" {
  101. go func() {
  102. log.Warn("Profile server", "error", http.ListenAndServe(profileHost, nil))
  103. }()
  104. }
  105. return &Node{
  106. config: config,
  107. sw: sw,
  108. evsw: eventSwitch,
  109. blockStore: blockStore,
  110. bcReactor: bcReactor,
  111. mempoolReactor: mempoolReactor,
  112. consensusState: consensusState,
  113. consensusReactor: consensusReactor,
  114. privValidator: privValidator,
  115. genesisDoc: state.GenesisDoc,
  116. privKey: privKey,
  117. proxyApp: proxyApp,
  118. }
  119. }
  120. // Call Start() after adding the listeners.
  121. func (n *Node) Start() error {
  122. n.sw.SetNodeInfo(makeNodeInfo(n.config, n.sw, n.privKey))
  123. n.sw.SetNodePrivKey(n.privKey)
  124. _, err := n.sw.Start()
  125. return err
  126. }
  127. func (n *Node) Stop() {
  128. log.Notice("Stopping Node")
  129. // TODO: gracefully disconnect from peers.
  130. n.sw.Stop()
  131. }
  132. // Add the event switch to reactors, mempool, etc.
  133. func SetEventSwitch(evsw *events.EventSwitch, eventables ...events.Eventable) {
  134. for _, e := range eventables {
  135. e.SetEventSwitch(evsw)
  136. }
  137. }
  138. // Add a Listener to accept inbound peer connections.
  139. // Add listeners before starting the Node.
  140. // The first listener is the primary listener (in NodeInfo)
  141. func (n *Node) AddListener(l p2p.Listener) {
  142. log.Notice(Fmt("Added %v", l))
  143. n.sw.AddListener(l)
  144. }
  145. func (n *Node) StartRPC() ([]net.Listener, error) {
  146. rpccore.SetConfig(n.config)
  147. rpccore.SetEventSwitch(n.evsw)
  148. rpccore.SetBlockStore(n.blockStore)
  149. rpccore.SetConsensusState(n.consensusState)
  150. rpccore.SetConsensusReactor(n.consensusReactor)
  151. rpccore.SetMempoolReactor(n.mempoolReactor)
  152. rpccore.SetSwitch(n.sw)
  153. rpccore.SetPrivValidator(n.privValidator)
  154. rpccore.SetGenesisDoc(n.genesisDoc)
  155. listenAddrs := strings.Split(n.config.GetString("rpc_laddr"), ",")
  156. // we may expose the rpc over both a unix and tcp socket
  157. listeners := make([]net.Listener, len(listenAddrs))
  158. for i, listenAddr := range listenAddrs {
  159. mux := http.NewServeMux()
  160. wm := rpcserver.NewWebsocketManager(rpccore.Routes, n.evsw)
  161. mux.HandleFunc("/websocket", wm.WebsocketHandler)
  162. rpcserver.RegisterRPCFuncs(mux, rpccore.Routes)
  163. listener, err := rpcserver.StartHTTPServer(listenAddr, mux)
  164. if err != nil {
  165. return nil, err
  166. }
  167. listeners[i] = listener
  168. }
  169. return listeners, nil
  170. }
  171. func (n *Node) Switch() *p2p.Switch {
  172. return n.sw
  173. }
  174. func (n *Node) BlockStore() *bc.BlockStore {
  175. return n.blockStore
  176. }
  177. func (n *Node) ConsensusState() *consensus.ConsensusState {
  178. return n.consensusState
  179. }
  180. func (n *Node) ConsensusReactor() *consensus.ConsensusReactor {
  181. return n.consensusReactor
  182. }
  183. func (n *Node) MempoolReactor() *mempl.MempoolReactor {
  184. return n.mempoolReactor
  185. }
  186. func (n *Node) EventSwitch() *events.EventSwitch {
  187. return n.evsw
  188. }
  189. // XXX: for convenience
  190. func (n *Node) PrivValidator() *types.PrivValidator {
  191. return n.privValidator
  192. }
  193. func makeNodeInfo(config cfg.Config, sw *p2p.Switch, privKey crypto.PrivKeyEd25519) *p2p.NodeInfo {
  194. nodeInfo := &p2p.NodeInfo{
  195. PubKey: privKey.PubKey().(crypto.PubKeyEd25519),
  196. Moniker: config.GetString("moniker"),
  197. Network: config.GetString("chain_id"),
  198. Version: version.Version,
  199. Other: []string{
  200. Fmt("wire_version=%v", wire.Version),
  201. Fmt("p2p_version=%v", p2p.Version),
  202. Fmt("consensus_version=%v", consensus.Version),
  203. Fmt("rpc_version=%v/%v", rpc.Version, rpccore.Version),
  204. },
  205. }
  206. // include git hash in the nodeInfo if available
  207. if rev, err := ReadFile(config.GetString("revision_file")); err == nil {
  208. nodeInfo.Other = append(nodeInfo.Other, Fmt("revision=%v", string(rev)))
  209. }
  210. if !sw.IsListening() {
  211. return nodeInfo
  212. }
  213. p2pListener := sw.Listeners()[0]
  214. p2pHost := p2pListener.ExternalAddress().IP.String()
  215. p2pPort := p2pListener.ExternalAddress().Port
  216. rpcListenAddr := config.GetString("rpc_laddr")
  217. // We assume that the rpcListener has the same ExternalAddress.
  218. // This is probably true because both P2P and RPC listeners use UPnP,
  219. // except of course if the rpc is only bound to localhost
  220. nodeInfo.ListenAddr = Fmt("%v:%v", p2pHost, p2pPort)
  221. nodeInfo.Other = append(nodeInfo.Other, Fmt("rpc_addr=%v", rpcListenAddr))
  222. return nodeInfo
  223. }
  224. // Load the most recent state from "state" db,
  225. // or create a new one (and save) from genesis.
  226. func getState(config cfg.Config, stateDB dbm.DB) *sm.State {
  227. state := sm.LoadState(stateDB)
  228. if state == nil {
  229. state = sm.MakeGenesisStateFromFile(stateDB, config.GetString("genesis_file"))
  230. state.Save()
  231. }
  232. return state
  233. }
  234. //------------------------------------------------------------------------------
  235. // Users wishing to use an external signer for their validators
  236. // should fork tendermint/tendermint and implement RunNode to
  237. // load their custom priv validator and call NewNode
  238. func RunNode(config cfg.Config) {
  239. // Wait until the genesis doc becomes available
  240. genDocFile := config.GetString("genesis_file")
  241. if !FileExists(genDocFile) {
  242. log.Notice(Fmt("Waiting for genesis file %v...", genDocFile))
  243. for {
  244. time.Sleep(time.Second)
  245. if !FileExists(genDocFile) {
  246. continue
  247. }
  248. jsonBlob, err := ioutil.ReadFile(genDocFile)
  249. if err != nil {
  250. Exit(Fmt("Couldn't read GenesisDoc file: %v", err))
  251. }
  252. genDoc := types.GenesisDocFromJSON(jsonBlob)
  253. if genDoc.ChainID == "" {
  254. PanicSanity(Fmt("Genesis doc %v must include non-empty chain_id", genDocFile))
  255. }
  256. config.Set("chain_id", genDoc.ChainID)
  257. }
  258. }
  259. // Get PrivValidator
  260. privValidatorFile := config.GetString("priv_validator_file")
  261. privValidator := types.LoadOrGenPrivValidator(privValidatorFile)
  262. // Create & start node
  263. n := NewNode(config, privValidator)
  264. protocol, address := ProtocolAndAddress(config.GetString("node_laddr"))
  265. l := p2p.NewDefaultListener(protocol, address, config.GetBool("skip_upnp"))
  266. n.AddListener(l)
  267. err := n.Start()
  268. if err != nil {
  269. Exit(Fmt("Failed to start node: %v", err))
  270. }
  271. log.Notice("Started node", "nodeInfo", n.sw.NodeInfo())
  272. // If seedNode is provided by config, dial out.
  273. if config.GetString("seeds") != "" {
  274. seeds := strings.Split(config.GetString("seeds"), ",")
  275. n.sw.DialSeeds(seeds)
  276. }
  277. // Run the RPC server.
  278. if config.GetString("rpc_laddr") != "" {
  279. _, err := n.StartRPC()
  280. if err != nil {
  281. PanicCrisis(err)
  282. }
  283. }
  284. // Sleep forever and then...
  285. TrapSignal(func() {
  286. n.Stop()
  287. })
  288. }
  289. func (n *Node) NodeInfo() *p2p.NodeInfo {
  290. return n.sw.NodeInfo()
  291. }
  292. func (n *Node) DialSeeds(seeds []string) {
  293. n.sw.DialSeeds(seeds)
  294. }
  295. //------------------------------------------------------------------------------
  296. // replay
  297. // convenience for replay mode
  298. func newConsensusState(config cfg.Config) *consensus.ConsensusState {
  299. // Get BlockStore
  300. blockStoreDB := dbm.NewDB("blockstore", config.GetString("db_backend"), config.GetString("db_dir"))
  301. blockStore := bc.NewBlockStore(blockStoreDB)
  302. // Get State
  303. stateDB := dbm.NewDB("state", config.GetString("db_backend"), config.GetString("db_dir"))
  304. state := sm.MakeGenesisStateFromFile(stateDB, config.GetString("genesis_file"))
  305. // Create two proxyAppConn connections,
  306. // one for the consensus and one for the mempool.
  307. proxyApp := proxy.NewAppConns(config, state, blockStore)
  308. // add the chainid to the global config
  309. config.Set("chain_id", state.ChainID)
  310. // Make event switch
  311. eventSwitch := events.NewEventSwitch()
  312. _, err := eventSwitch.Start()
  313. if err != nil {
  314. Exit(Fmt("Failed to start event switch: %v", err))
  315. }
  316. mempool := mempl.NewMempool(config, proxyApp.Mempool())
  317. consensusState := consensus.NewConsensusState(config, state.Copy(), proxyApp.Consensus(), blockStore, mempool)
  318. consensusState.SetEventSwitch(eventSwitch)
  319. return consensusState
  320. }
  321. func RunReplayConsole(config cfg.Config) {
  322. walFile := config.GetString("cswal")
  323. if walFile == "" {
  324. Exit("cswal file name not set in tendermint config")
  325. }
  326. consensusState := newConsensusState(config)
  327. if err := consensusState.ReplayConsole(walFile); err != nil {
  328. Exit(Fmt("Error during consensus replay: %v", err))
  329. }
  330. }
  331. func RunReplay(config cfg.Config) {
  332. walFile := config.GetString("cswal")
  333. if walFile == "" {
  334. Exit("cswal file name not set in tendermint config")
  335. }
  336. consensusState := newConsensusState(config)
  337. if err := consensusState.ReplayMessages(walFile); err != nil {
  338. Exit(Fmt("Error during consensus replay: %v", err))
  339. }
  340. log.Notice("Replay run successfully")
  341. }
  // ProtocolAndAddress splits a listen address of the form "proto://addr" at
  // the first "://" and returns both halves. When there is no "://" the
  // protocol defaults to tcp and the input is returned unchanged as the address.
  func ProtocolAndAddress(listenAddr string) (string, string) {
  	const sep = "://"

  	at := strings.Index(listenAddr, sep)
  	if at < 0 {
  		return "tcp", listenAddr
  	}
  	return listenAddr[:at], listenAddr[at+len(sep):]
  }