You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

432 lines
12 KiB

10 years ago
10 years ago
10 years ago
11 years ago
11 years ago
11 years ago
11 years ago
11 years ago
11 years ago
11 years ago
10 years ago
9 years ago
10 years ago
11 years ago
11 years ago
11 years ago
10 years ago
11 years ago
9 years ago
9 years ago
9 years ago
  1. package node
  2. import (
  3. "bytes"
  4. "io/ioutil"
  5. "net"
  6. "net/http"
  7. "strings"
  8. "sync"
  9. "time"
  10. . "github.com/tendermint/go-common"
  11. "github.com/tendermint/go-crypto"
  12. dbm "github.com/tendermint/go-db"
  13. "github.com/tendermint/go-events"
  14. "github.com/tendermint/go-p2p"
  15. "github.com/tendermint/go-rpc"
  16. "github.com/tendermint/go-rpc/server"
  17. "github.com/tendermint/go-wire"
  18. bc "github.com/tendermint/tendermint/blockchain"
  19. "github.com/tendermint/tendermint/consensus"
  20. mempl "github.com/tendermint/tendermint/mempool"
  21. "github.com/tendermint/tendermint/proxy"
  22. rpccore "github.com/tendermint/tendermint/rpc/core"
  23. sm "github.com/tendermint/tendermint/state"
  24. "github.com/tendermint/tendermint/types"
  25. "github.com/tendermint/tendermint/version"
  26. tmspcli "github.com/tendermint/tmsp/client"
  27. "github.com/tendermint/tmsp/example/dummy"
  28. "github.com/tendermint/tmsp/example/nil"
  29. )
  30. import _ "net/http/pprof"
  31. type Node struct {
  32. sw *p2p.Switch
  33. evsw *events.EventSwitch
  34. blockStore *bc.BlockStore
  35. bcReactor *bc.BlockchainReactor
  36. mempoolReactor *mempl.MempoolReactor
  37. consensusState *consensus.ConsensusState
  38. consensusReactor *consensus.ConsensusReactor
  39. privValidator *types.PrivValidator
  40. genesisDoc *types.GenesisDoc
  41. privKey crypto.PrivKeyEd25519
  42. }
  43. func NewNode(privValidator *types.PrivValidator, getProxyApp func(proxyAddr string, appHash []byte) proxy.AppConn) *Node {
  44. EnsureDir(config.GetString("db_dir"), 0700) // incase we use memdb, cswal still gets written here
  45. // Get BlockStore
  46. blockStoreDB := dbm.GetDB("blockstore")
  47. blockStore := bc.NewBlockStore(blockStoreDB)
  48. // Get State
  49. state := getState()
  50. // Create two proxyAppConn connections,
  51. // one for the consensus and one for the mempool.
  52. proxyAddr := config.GetString("proxy_app")
  53. proxyAppConnMempool := getProxyApp(proxyAddr, state.AppHash)
  54. proxyAppConnConsensus := getProxyApp(proxyAddr, state.AppHash)
  55. // add the chainid and number of validators to the global config
  56. config.Set("chain_id", state.ChainID)
  57. config.Set("num_vals", state.Validators.Size())
  58. // Generate node PrivKey
  59. privKey := crypto.GenPrivKeyEd25519()
  60. // Make event switch
  61. eventSwitch := events.NewEventSwitch()
  62. _, err := eventSwitch.Start()
  63. if err != nil {
  64. Exit(Fmt("Failed to start switch: %v", err))
  65. }
  66. // Decide whether to fast-sync or not
  67. // We don't fast-sync when the only validator is us.
  68. fastSync := config.GetBool("fast_sync")
  69. if state.Validators.Size() == 1 {
  70. addr, _ := state.Validators.GetByIndex(0)
  71. if bytes.Equal(privValidator.Address, addr) {
  72. fastSync = false
  73. }
  74. }
  75. // Make BlockchainReactor
  76. bcReactor := bc.NewBlockchainReactor(state.Copy(), proxyAppConnConsensus, blockStore, fastSync)
  77. // Make MempoolReactor
  78. mempool := mempl.NewMempool(proxyAppConnMempool)
  79. mempoolReactor := mempl.NewMempoolReactor(mempool)
  80. // Make ConsensusReactor
  81. consensusState := consensus.NewConsensusState(state.Copy(), proxyAppConnConsensus, blockStore, mempool)
  82. consensusReactor := consensus.NewConsensusReactor(consensusState, blockStore, fastSync)
  83. if privValidator != nil {
  84. consensusReactor.SetPrivValidator(privValidator)
  85. }
  86. // deterministic accountability
  87. err = consensusState.OpenWAL(config.GetString("cswal"))
  88. if err != nil {
  89. log.Error("Failed to open cswal", "error", err.Error())
  90. }
  91. // Make p2p network switch
  92. sw := p2p.NewSwitch()
  93. sw.AddReactor("MEMPOOL", mempoolReactor)
  94. sw.AddReactor("BLOCKCHAIN", bcReactor)
  95. sw.AddReactor("CONSENSUS", consensusReactor)
  96. // add the event switch to all services
  97. // they should all satisfy events.Eventable
  98. SetEventSwitch(eventSwitch, bcReactor, mempoolReactor, consensusReactor)
  99. // run the profile server
  100. profileHost := config.GetString("prof_laddr")
  101. if profileHost != "" {
  102. go func() {
  103. log.Warn("Profile server", "error", http.ListenAndServe(profileHost, nil))
  104. }()
  105. }
  106. return &Node{
  107. sw: sw,
  108. evsw: eventSwitch,
  109. blockStore: blockStore,
  110. bcReactor: bcReactor,
  111. mempoolReactor: mempoolReactor,
  112. consensusState: consensusState,
  113. consensusReactor: consensusReactor,
  114. privValidator: privValidator,
  115. genesisDoc: state.GenesisDoc,
  116. privKey: privKey,
  117. }
  118. }
  119. // Call Start() after adding the listeners.
  120. func (n *Node) Start() error {
  121. n.sw.SetNodeInfo(makeNodeInfo(n.sw, n.privKey))
  122. n.sw.SetNodePrivKey(n.privKey)
  123. _, err := n.sw.Start()
  124. return err
  125. }
  126. func (n *Node) Stop() {
  127. log.Notice("Stopping Node")
  128. // TODO: gracefully disconnect from peers.
  129. n.sw.Stop()
  130. }
  131. // Add the event switch to reactors, mempool, etc.
  132. func SetEventSwitch(evsw *events.EventSwitch, eventables ...events.Eventable) {
  133. for _, e := range eventables {
  134. e.SetEventSwitch(evsw)
  135. }
  136. }
  137. // Add a Listener to accept inbound peer connections.
  138. // Add listeners before starting the Node.
  139. // The first listener is the primary listener (in NodeInfo)
  140. func (n *Node) AddListener(l p2p.Listener) {
  141. log.Notice(Fmt("Added %v", l))
  142. n.sw.AddListener(l)
  143. }
  144. func (n *Node) StartRPC() ([]net.Listener, error) {
  145. rpccore.SetBlockStore(n.blockStore)
  146. rpccore.SetConsensusState(n.consensusState)
  147. rpccore.SetConsensusReactor(n.consensusReactor)
  148. rpccore.SetMempoolReactor(n.mempoolReactor)
  149. rpccore.SetSwitch(n.sw)
  150. rpccore.SetPrivValidator(n.privValidator)
  151. rpccore.SetGenesisDoc(n.genesisDoc)
  152. listenAddrs := strings.Split(config.GetString("rpc_laddr"), ",")
  153. // we may expose the rpc over both a unix and tcp socket
  154. listeners := make([]net.Listener, len(listenAddrs))
  155. for i, listenAddr := range listenAddrs {
  156. mux := http.NewServeMux()
  157. wm := rpcserver.NewWebsocketManager(rpccore.Routes, n.evsw)
  158. mux.HandleFunc("/websocket", wm.WebsocketHandler)
  159. rpcserver.RegisterRPCFuncs(mux, rpccore.Routes)
  160. listener, err := rpcserver.StartHTTPServer(listenAddr, mux)
  161. if err != nil {
  162. return nil, err
  163. }
  164. listeners[i] = listener
  165. }
  166. return listeners, nil
  167. }
  168. func (n *Node) Switch() *p2p.Switch {
  169. return n.sw
  170. }
  171. func (n *Node) BlockStore() *bc.BlockStore {
  172. return n.blockStore
  173. }
  174. func (n *Node) ConsensusState() *consensus.ConsensusState {
  175. return n.consensusState
  176. }
  177. func (n *Node) MempoolReactor() *mempl.MempoolReactor {
  178. return n.mempoolReactor
  179. }
  180. func (n *Node) EventSwitch() *events.EventSwitch {
  181. return n.evsw
  182. }
  183. func makeNodeInfo(sw *p2p.Switch, privKey crypto.PrivKeyEd25519) *p2p.NodeInfo {
  184. nodeInfo := &p2p.NodeInfo{
  185. PubKey: privKey.PubKey().(crypto.PubKeyEd25519),
  186. Moniker: config.GetString("moniker"),
  187. Network: config.GetString("chain_id"),
  188. Version: version.Version,
  189. Other: []string{
  190. Fmt("wire_version=%v", wire.Version),
  191. Fmt("p2p_version=%v", p2p.Version),
  192. Fmt("consensus_version=%v", consensus.Version),
  193. Fmt("rpc_version=%v/%v", rpc.Version, rpccore.Version),
  194. },
  195. }
  196. // include git hash in the nodeInfo if available
  197. if rev, err := ReadFile(config.GetString("revision_file")); err == nil {
  198. nodeInfo.Other = append(nodeInfo.Other, Fmt("revision=%v", string(rev)))
  199. }
  200. if !sw.IsListening() {
  201. return nodeInfo
  202. }
  203. p2pListener := sw.Listeners()[0]
  204. p2pHost := p2pListener.ExternalAddress().IP.String()
  205. p2pPort := p2pListener.ExternalAddress().Port
  206. rpcListenAddr := config.GetString("rpc_laddr")
  207. // We assume that the rpcListener has the same ExternalAddress.
  208. // This is probably true because both P2P and RPC listeners use UPnP,
  209. // except of course if the rpc is only bound to localhost
  210. nodeInfo.ListenAddr = Fmt("%v:%v", p2pHost, p2pPort)
  211. nodeInfo.Other = append(nodeInfo.Other, Fmt("rpc_addr=%v", rpcListenAddr))
  212. return nodeInfo
  213. }
  214. // Get a connection to the proxyAppConn addr.
  215. // Check the current hash, and panic if it doesn't match.
  216. func GetProxyApp(addr string, hash []byte) (proxyAppConn proxy.AppConn) {
  217. // use local app (for testing)
  218. switch addr {
  219. case "nilapp":
  220. app := nilapp.NewNilApplication()
  221. mtx := new(sync.Mutex)
  222. proxyAppConn = tmspcli.NewLocalClient(mtx, app)
  223. case "dummy":
  224. app := dummy.NewDummyApplication()
  225. mtx := new(sync.Mutex)
  226. proxyAppConn = tmspcli.NewLocalClient(mtx, app)
  227. default:
  228. // Run forever in a loop
  229. remoteApp, err := proxy.NewRemoteAppConn(addr)
  230. if err != nil {
  231. Exit(Fmt("Failed to connect to proxy for mempool: %v", err))
  232. }
  233. proxyAppConn = remoteApp
  234. }
  235. // Check the hash
  236. res := proxyAppConn.CommitSync()
  237. if res.IsErr() {
  238. PanicCrisis(Fmt("Error in getting proxyAppConn hash: %v", res))
  239. }
  240. if !bytes.Equal(hash, res.Data) {
  241. log.Warn(Fmt("ProxyApp hash does not match. Expected %X, got %X", hash, res.Data))
  242. }
  243. return proxyAppConn
  244. }
  245. // Load the most recent state from "state" db,
  246. // or create a new one (and save) from genesis.
  247. func getState() *sm.State {
  248. stateDB := dbm.GetDB("state")
  249. state := sm.LoadState(stateDB)
  250. if state == nil {
  251. state = sm.MakeGenesisStateFromFile(stateDB, config.GetString("genesis_file"))
  252. state.Save()
  253. }
  254. return state
  255. }
  256. //------------------------------------------------------------------------------
  257. // Users wishing to use an external signer for their validators
  258. // should fork tendermint/tendermint and implement RunNode to
  259. // load their custom priv validator and call NewNode(privVal, getProxyFunc)
  260. func RunNode() {
  261. // Wait until the genesis doc becomes available
  262. genDocFile := config.GetString("genesis_file")
  263. if !FileExists(genDocFile) {
  264. log.Notice(Fmt("Waiting for genesis file %v...", genDocFile))
  265. for {
  266. time.Sleep(time.Second)
  267. if !FileExists(genDocFile) {
  268. continue
  269. }
  270. jsonBlob, err := ioutil.ReadFile(genDocFile)
  271. if err != nil {
  272. Exit(Fmt("Couldn't read GenesisDoc file: %v", err))
  273. }
  274. genDoc := types.GenesisDocFromJSON(jsonBlob)
  275. if genDoc.ChainID == "" {
  276. PanicSanity(Fmt("Genesis doc %v must include non-empty chain_id", genDocFile))
  277. }
  278. config.Set("chain_id", genDoc.ChainID)
  279. config.Set("genesis_doc", genDoc)
  280. }
  281. }
  282. // Get PrivValidator
  283. privValidatorFile := config.GetString("priv_validator_file")
  284. privValidator := types.LoadOrGenPrivValidator(privValidatorFile)
  285. // Create & start node
  286. n := NewNode(privValidator, GetProxyApp)
  287. l := p2p.NewDefaultListener("tcp", config.GetString("node_laddr"), config.GetBool("skip_upnp"))
  288. n.AddListener(l)
  289. err := n.Start()
  290. if err != nil {
  291. Exit(Fmt("Failed to start node: %v", err))
  292. }
  293. log.Notice("Started node", "nodeInfo", n.sw.NodeInfo())
  294. // If seedNode is provided by config, dial out.
  295. if config.GetString("seeds") != "" {
  296. seeds := strings.Split(config.GetString("seeds"), ",")
  297. n.sw.DialSeeds(seeds)
  298. }
  299. // Run the RPC server.
  300. if config.GetString("rpc_laddr") != "" {
  301. _, err := n.StartRPC()
  302. if err != nil {
  303. PanicCrisis(err)
  304. }
  305. }
  306. // Sleep forever and then...
  307. TrapSignal(func() {
  308. n.Stop()
  309. })
  310. }
  311. func (n *Node) NodeInfo() *p2p.NodeInfo {
  312. return n.sw.NodeInfo()
  313. }
  314. func (n *Node) DialSeeds(seeds []string) {
  315. n.sw.DialSeeds(seeds)
  316. }
  317. //------------------------------------------------------------------------------
  318. // replay
  319. // convenience for replay mode
  320. func newConsensusState() *consensus.ConsensusState {
  321. // Get BlockStore
  322. blockStoreDB := dbm.GetDB("blockstore")
  323. blockStore := bc.NewBlockStore(blockStoreDB)
  324. // Get State
  325. stateDB := dbm.GetDB("state")
  326. state := sm.MakeGenesisStateFromFile(stateDB, config.GetString("genesis_file"))
  327. // Create two proxyAppConn connections,
  328. // one for the consensus and one for the mempool.
  329. proxyAddr := config.GetString("proxy_app")
  330. proxyAppConnMempool := GetProxyApp(proxyAddr, state.AppHash)
  331. proxyAppConnConsensus := GetProxyApp(proxyAddr, state.AppHash)
  332. // add the chainid to the global config
  333. config.Set("chain_id", state.ChainID)
  334. // Make event switch
  335. eventSwitch := events.NewEventSwitch()
  336. _, err := eventSwitch.Start()
  337. if err != nil {
  338. Exit(Fmt("Failed to start event switch: %v", err))
  339. }
  340. mempool := mempl.NewMempool(proxyAppConnMempool)
  341. consensusState := consensus.NewConsensusState(state.Copy(), proxyAppConnConsensus, blockStore, mempool)
  342. consensusState.SetEventSwitch(eventSwitch)
  343. return consensusState
  344. }
  345. func RunReplayConsole() {
  346. walFile := config.GetString("cswal")
  347. if walFile == "" {
  348. Exit("cswal file name not set in tendermint config")
  349. }
  350. consensusState := newConsensusState()
  351. if err := consensusState.ReplayConsole(walFile); err != nil {
  352. Exit(Fmt("Error during consensus replay: %v", err))
  353. }
  354. }
  355. func RunReplay() {
  356. walFile := config.GetString("cswal")
  357. if walFile == "" {
  358. Exit("cswal file name not set in tendermint config")
  359. }
  360. consensusState := newConsensusState()
  361. if err := consensusState.ReplayMessages(walFile); err != nil {
  362. Exit(Fmt("Error during consensus replay: %v", err))
  363. }
  364. log.Notice("Replay run successfully")
  365. }