You cannot select more than 25 topics. Topics must start with a letter or number, can include dashes ('-'), and can be up to 35 characters long.

463 lines
14 KiB

10 years ago
8 years ago
10 years ago
11 years ago
11 years ago
8 years ago
8 years ago
8 years ago
8 years ago
8 years ago
8 years ago
11 years ago
8 years ago
11 years ago
8 years ago
11 years ago
8 years ago
8 years ago
11 years ago
11 years ago
8 years ago
10 years ago
8 years ago
9 years ago
  1. package node
  2. import (
  3. "bytes"
  4. "errors"
  5. "net"
  6. "net/http"
  7. "strings"
  8. abci "github.com/tendermint/abci/types"
  9. crypto "github.com/tendermint/go-crypto"
  10. wire "github.com/tendermint/go-wire"
  11. bc "github.com/tendermint/tendermint/blockchain"
  12. cfg "github.com/tendermint/tendermint/config"
  13. "github.com/tendermint/tendermint/consensus"
  14. mempl "github.com/tendermint/tendermint/mempool"
  15. p2p "github.com/tendermint/tendermint/p2p"
  16. "github.com/tendermint/tendermint/proxy"
  17. rpccore "github.com/tendermint/tendermint/rpc/core"
  18. grpccore "github.com/tendermint/tendermint/rpc/grpc"
  19. rpc "github.com/tendermint/tendermint/rpc/lib"
  20. rpcserver "github.com/tendermint/tendermint/rpc/lib/server"
  21. sm "github.com/tendermint/tendermint/state"
  22. "github.com/tendermint/tendermint/state/txindex"
  23. "github.com/tendermint/tendermint/state/txindex/kv"
  24. "github.com/tendermint/tendermint/state/txindex/null"
  25. "github.com/tendermint/tendermint/types"
  26. "github.com/tendermint/tendermint/version"
  27. cmn "github.com/tendermint/tmlibs/common"
  28. dbm "github.com/tendermint/tmlibs/db"
  29. "github.com/tendermint/tmlibs/log"
  30. _ "net/http/pprof"
  31. )
// Node bundles the configuration, network components and services that
// NewNode wires together. It embeds cmn.BaseService, and the OnStart/OnStop
// methods in this file implement its start and stop hooks.
type Node struct {
	cmn.BaseService

	// config
	config        *cfg.Config
	genesisDoc    *types.GenesisDoc    // initial validator set
	privValidator *types.PrivValidator // local node's validator key

	// network
	privKey  crypto.PrivKeyEd25519 // local node's p2p key
	sw       *p2p.Switch           // p2p connections
	addrBook *p2p.AddrBook         // known peers

	// services
	evsw             types.EventSwitch           // pub/sub for services
	blockStore       *bc.BlockStore              // store the blockchain to disk
	bcReactor        *bc.BlockchainReactor       // for fast-syncing
	mempoolReactor   *mempl.MempoolReactor       // for gossipping transactions
	consensusState   *consensus.ConsensusState   // latest consensus state
	consensusReactor *consensus.ConsensusReactor // for participating in the consensus
	proxyApp         proxy.AppConns              // connection to the application
	rpcListeners     []net.Listener              // rpc servers
	txIndexer        txindex.TxIndexer           // transaction indexer selected from config.TxIndex
}
  53. func NewNodeDefault(config *cfg.Config, logger log.Logger) *Node {
  54. // Get PrivValidator
  55. privValidator := types.LoadOrGenPrivValidator(config.PrivValidatorFile(), logger)
  56. return NewNode(config, privValidator,
  57. proxy.DefaultClientCreator(config.ProxyApp, config.ABCI, config.DBDir()), logger)
  58. }
  59. func NewNode(config *cfg.Config, privValidator *types.PrivValidator, clientCreator proxy.ClientCreator, logger log.Logger) *Node {
  60. // Get BlockStore
  61. blockStoreDB := dbm.NewDB("blockstore", config.DBBackend, config.DBDir())
  62. blockStore := bc.NewBlockStore(blockStoreDB)
  63. consensusLogger := logger.With("module", "consensus")
  64. stateLogger := logger.With("module", "state")
  65. // Get State
  66. stateDB := dbm.NewDB("state", config.DBBackend, config.DBDir())
  67. state := sm.GetState(stateDB, config.GenesisFile())
  68. state.SetLogger(stateLogger)
  69. // Create the proxyApp, which manages connections (consensus, mempool, query)
  70. // and sync tendermint and the app by replaying any necessary blocks
  71. handshaker := consensus.NewHandshaker(state, blockStore)
  72. handshaker.SetLogger(consensusLogger)
  73. proxyApp := proxy.NewAppConns(clientCreator, handshaker)
  74. proxyApp.SetLogger(logger.With("module", "proxy"))
  75. if _, err := proxyApp.Start(); err != nil {
  76. cmn.Exit(cmn.Fmt("Error starting proxy app connections: %v", err))
  77. }
  78. // reload the state (it may have been updated by the handshake)
  79. state = sm.LoadState(stateDB)
  80. state.SetLogger(stateLogger)
  81. // Transaction indexing
  82. var txIndexer txindex.TxIndexer
  83. switch config.TxIndex {
  84. case "kv":
  85. store := dbm.NewDB("tx_index", config.DBBackend, config.DBDir())
  86. txIndexer = kv.NewTxIndex(store)
  87. default:
  88. txIndexer = &null.TxIndex{}
  89. }
  90. state.TxIndexer = txIndexer
  91. // Generate node PrivKey
  92. privKey := crypto.GenPrivKeyEd25519()
  93. // Make event switch
  94. eventSwitch := types.NewEventSwitch()
  95. eventSwitch.SetLogger(logger.With("module", "types"))
  96. _, err := eventSwitch.Start()
  97. if err != nil {
  98. cmn.Exit(cmn.Fmt("Failed to start switch: %v", err))
  99. }
  100. // Decide whether to fast-sync or not
  101. // We don't fast-sync when the only validator is us.
  102. fastSync := config.FastSync
  103. if state.Validators.Size() == 1 {
  104. addr, _ := state.Validators.GetByIndex(0)
  105. if bytes.Equal(privValidator.Address, addr) {
  106. fastSync = false
  107. }
  108. }
  109. // Log whether this node is a validator or an observer
  110. if state.Validators.HasAddress(privValidator.Address) {
  111. consensusLogger.Info("This node is a validator")
  112. } else {
  113. consensusLogger.Info("This node is not a validator")
  114. }
  115. // Make BlockchainReactor
  116. bcReactor := bc.NewBlockchainReactor(state.Copy(), proxyApp.Consensus(), blockStore, fastSync)
  117. bcReactor.SetLogger(logger.With("module", "blockchain"))
  118. // Make MempoolReactor
  119. mempoolLogger := logger.With("module", "mempool")
  120. mempool := mempl.NewMempool(config.Mempool, proxyApp.Mempool(), state.LastBlockHeight)
  121. mempool.SetLogger(mempoolLogger)
  122. mempoolReactor := mempl.NewMempoolReactor(config.Mempool, mempool)
  123. mempoolReactor.SetLogger(mempoolLogger)
  124. if config.Consensus.WaitForTxs() {
  125. mempool.EnableTxsAvailable()
  126. }
  127. // Make ConsensusReactor
  128. consensusState := consensus.NewConsensusState(config.Consensus, state.Copy(), proxyApp.Consensus(), blockStore, mempool)
  129. consensusState.SetLogger(consensusLogger)
  130. if privValidator != nil {
  131. consensusState.SetPrivValidator(privValidator)
  132. }
  133. consensusReactor := consensus.NewConsensusReactor(consensusState, fastSync)
  134. consensusReactor.SetLogger(consensusLogger)
  135. p2pLogger := logger.With("module", "p2p")
  136. sw := p2p.NewSwitch(config.P2P)
  137. sw.SetLogger(p2pLogger)
  138. sw.AddReactor("MEMPOOL", mempoolReactor)
  139. sw.AddReactor("BLOCKCHAIN", bcReactor)
  140. sw.AddReactor("CONSENSUS", consensusReactor)
  141. // Optionally, start the pex reactor
  142. var addrBook *p2p.AddrBook
  143. if config.P2P.PexReactor {
  144. addrBook = p2p.NewAddrBook(config.P2P.AddrBookFile(), config.P2P.AddrBookStrict)
  145. addrBook.SetLogger(p2pLogger.With("book", config.P2P.AddrBookFile()))
  146. pexReactor := p2p.NewPEXReactor(addrBook)
  147. pexReactor.SetLogger(p2pLogger)
  148. sw.AddReactor("PEX", pexReactor)
  149. }
  150. // Filter peers by addr or pubkey with an ABCI query.
  151. // If the query return code is OK, add peer.
  152. // XXX: Query format subject to change
  153. if config.FilterPeers {
  154. // NOTE: addr is ip:port
  155. sw.SetAddrFilter(func(addr net.Addr) error {
  156. resQuery, err := proxyApp.Query().QuerySync(abci.RequestQuery{Path: cmn.Fmt("/p2p/filter/addr/%s", addr.String())})
  157. if err != nil {
  158. return err
  159. }
  160. if resQuery.Code.IsOK() {
  161. return nil
  162. }
  163. return errors.New(resQuery.Code.String())
  164. })
  165. sw.SetPubKeyFilter(func(pubkey crypto.PubKeyEd25519) error {
  166. resQuery, err := proxyApp.Query().QuerySync(abci.RequestQuery{Path: cmn.Fmt("/p2p/filter/pubkey/%X", pubkey.Bytes())})
  167. if err != nil {
  168. return err
  169. }
  170. if resQuery.Code.IsOK() {
  171. return nil
  172. }
  173. return errors.New(resQuery.Code.String())
  174. })
  175. }
  176. // add the event switch to all services
  177. // they should all satisfy events.Eventable
  178. SetEventSwitch(eventSwitch, bcReactor, mempoolReactor, consensusReactor)
  179. // run the profile server
  180. profileHost := config.ProfListenAddress
  181. if profileHost != "" {
  182. go func() {
  183. logger.Error("Profile server", "err", http.ListenAndServe(profileHost, nil))
  184. }()
  185. }
  186. node := &Node{
  187. config: config,
  188. genesisDoc: state.GenesisDoc,
  189. privValidator: privValidator,
  190. privKey: privKey,
  191. sw: sw,
  192. addrBook: addrBook,
  193. evsw: eventSwitch,
  194. blockStore: blockStore,
  195. bcReactor: bcReactor,
  196. mempoolReactor: mempoolReactor,
  197. consensusState: consensusState,
  198. consensusReactor: consensusReactor,
  199. proxyApp: proxyApp,
  200. txIndexer: txIndexer,
  201. }
  202. node.BaseService = *cmn.NewBaseService(logger, "Node", node)
  203. return node
  204. }
  205. func (n *Node) OnStart() error {
  206. // Create & add listener
  207. protocol, address := ProtocolAndAddress(n.config.P2P.ListenAddress)
  208. l := p2p.NewDefaultListener(protocol, address, n.config.P2P.SkipUPNP, n.Logger.With("module", "p2p"))
  209. n.sw.AddListener(l)
  210. // Start the switch
  211. n.sw.SetNodeInfo(n.makeNodeInfo())
  212. n.sw.SetNodePrivKey(n.privKey)
  213. _, err := n.sw.Start()
  214. if err != nil {
  215. return err
  216. }
  217. // If seeds exist, add them to the address book and dial out
  218. if n.config.P2P.Seeds != "" {
  219. // dial out
  220. seeds := strings.Split(n.config.P2P.Seeds, ",")
  221. if err := n.DialSeeds(seeds); err != nil {
  222. return err
  223. }
  224. }
  225. // Run the RPC server
  226. if n.config.RPC.ListenAddress != "" {
  227. listeners, err := n.startRPC()
  228. if err != nil {
  229. return err
  230. }
  231. n.rpcListeners = listeners
  232. }
  233. return nil
  234. }
  235. func (n *Node) OnStop() {
  236. n.BaseService.OnStop()
  237. n.Logger.Info("Stopping Node")
  238. // TODO: gracefully disconnect from peers.
  239. n.sw.Stop()
  240. for _, l := range n.rpcListeners {
  241. n.Logger.Info("Closing rpc listener", "listener", l)
  242. if err := l.Close(); err != nil {
  243. n.Logger.Error("Error closing listener", "listener", l, "err", err)
  244. }
  245. }
  246. }
  247. func (n *Node) RunForever() {
  248. // Sleep forever and then...
  249. cmn.TrapSignal(func() {
  250. n.Stop()
  251. })
  252. }
  253. // Add the event switch to reactors, mempool, etc.
  254. func SetEventSwitch(evsw types.EventSwitch, eventables ...types.Eventable) {
  255. for _, e := range eventables {
  256. e.SetEventSwitch(evsw)
  257. }
  258. }
// AddListener adds a Listener to accept inbound peer connections by
// registering it on the node's p2p switch.
// Add listeners before starting the Node.
// The first listener is the primary listener (in NodeInfo).
func (n *Node) AddListener(l p2p.Listener) {
	n.sw.AddListener(l)
}
// ConfigureRPC sets all variables in rpccore so they will serve
// rpc calls from this node.
//
// NOTE(review): this dereferences n.privValidator to read its PubKey, so a
// Node built with a nil privValidator would panic here.
func (n *Node) ConfigureRPC() {
	rpccore.SetEventSwitch(n.evsw)
	rpccore.SetBlockStore(n.blockStore)
	rpccore.SetConsensusState(n.consensusState)
	rpccore.SetMempool(n.mempoolReactor.Mempool)
	rpccore.SetSwitch(n.sw)
	rpccore.SetPubKey(n.privValidator.PubKey)
	rpccore.SetGenesisDoc(n.genesisDoc)
	// addrBook is nil when the PEX reactor is disabled in the config.
	rpccore.SetAddrBook(n.addrBook)
	rpccore.SetProxyAppQuery(n.proxyApp.Query())
	rpccore.SetTxIndexer(n.txIndexer)
	rpccore.SetConsensusReactor(n.consensusReactor)
	rpccore.SetLogger(n.Logger.With("module", "rpc"))
}
  281. func (n *Node) startRPC() ([]net.Listener, error) {
  282. n.ConfigureRPC()
  283. listenAddrs := strings.Split(n.config.RPC.ListenAddress, ",")
  284. if n.config.RPC.Unsafe {
  285. rpccore.AddUnsafeRoutes()
  286. }
  287. // we may expose the rpc over both a unix and tcp socket
  288. listeners := make([]net.Listener, len(listenAddrs))
  289. for i, listenAddr := range listenAddrs {
  290. mux := http.NewServeMux()
  291. rpcLogger := n.Logger.With("module", "rpc-server")
  292. wm := rpcserver.NewWebsocketManager(rpccore.Routes, n.evsw)
  293. wm.SetLogger(rpcLogger.With("protocol", "websocket"))
  294. mux.HandleFunc("/websocket", wm.WebsocketHandler)
  295. rpcserver.RegisterRPCFuncs(mux, rpccore.Routes, rpcLogger)
  296. listener, err := rpcserver.StartHTTPServer(listenAddr, mux, rpcLogger)
  297. if err != nil {
  298. return nil, err
  299. }
  300. listeners[i] = listener
  301. }
  302. // we expose a simplified api over grpc for convenience to app devs
  303. grpcListenAddr := n.config.RPC.GRPCListenAddress
  304. if grpcListenAddr != "" {
  305. listener, err := grpccore.StartGRPCServer(grpcListenAddr)
  306. if err != nil {
  307. return nil, err
  308. }
  309. listeners = append(listeners, listener)
  310. }
  311. return listeners, nil
  312. }
// Switch returns the node's p2p switch.
func (n *Node) Switch() *p2p.Switch {
	return n.sw
}
// BlockStore returns the node's block store.
func (n *Node) BlockStore() *bc.BlockStore {
	return n.blockStore
}
// ConsensusState returns the node's consensus state.
func (n *Node) ConsensusState() *consensus.ConsensusState {
	return n.consensusState
}
// ConsensusReactor returns the node's consensus reactor.
func (n *Node) ConsensusReactor() *consensus.ConsensusReactor {
	return n.consensusReactor
}
// MempoolReactor returns the node's mempool reactor.
func (n *Node) MempoolReactor() *mempl.MempoolReactor {
	return n.mempoolReactor
}
// EventSwitch returns the event switch shared by the node's services.
func (n *Node) EventSwitch() types.EventSwitch {
	return n.evsw
}
// PrivValidator returns the validator key the node was constructed with.
// XXX: for convenience
func (n *Node) PrivValidator() *types.PrivValidator {
	return n.privValidator
}
// GenesisDoc returns the genesis document taken from the state when the node
// was constructed.
func (n *Node) GenesisDoc() *types.GenesisDoc {
	return n.genesisDoc
}
// ProxyApp returns the node's connections to the application.
func (n *Node) ProxyApp() proxy.AppConns {
	return n.proxyApp
}
  341. func (n *Node) makeNodeInfo() *p2p.NodeInfo {
  342. txIndexerStatus := "on"
  343. if _, ok := n.txIndexer.(*null.TxIndex); ok {
  344. txIndexerStatus = "off"
  345. }
  346. nodeInfo := &p2p.NodeInfo{
  347. PubKey: n.privKey.PubKey().Unwrap().(crypto.PubKeyEd25519),
  348. Moniker: n.config.Moniker,
  349. Network: n.consensusState.GetState().ChainID,
  350. Version: version.Version,
  351. Other: []string{
  352. cmn.Fmt("wire_version=%v", wire.Version),
  353. cmn.Fmt("p2p_version=%v", p2p.Version),
  354. cmn.Fmt("consensus_version=%v", consensus.Version),
  355. cmn.Fmt("rpc_version=%v/%v", rpc.Version, rpccore.Version),
  356. cmn.Fmt("tx_index=%v", txIndexerStatus),
  357. },
  358. }
  359. // include git hash in the nodeInfo if available
  360. // TODO: use ld-flags
  361. /*if rev, err := cmn.ReadFile(n.config.GetString("revision_file")); err == nil {
  362. nodeInfo.Other = append(nodeInfo.Other, cmn.Fmt("revision=%v", string(rev)))
  363. }*/
  364. if !n.sw.IsListening() {
  365. return nodeInfo
  366. }
  367. p2pListener := n.sw.Listeners()[0]
  368. p2pHost := p2pListener.ExternalAddress().IP.String()
  369. p2pPort := p2pListener.ExternalAddress().Port
  370. rpcListenAddr := n.config.RPC.ListenAddress
  371. // We assume that the rpcListener has the same ExternalAddress.
  372. // This is probably true because both P2P and RPC listeners use UPnP,
  373. // except of course if the rpc is only bound to localhost
  374. nodeInfo.ListenAddr = cmn.Fmt("%v:%v", p2pHost, p2pPort)
  375. nodeInfo.Other = append(nodeInfo.Other, cmn.Fmt("rpc_addr=%v", rpcListenAddr))
  376. return nodeInfo
  377. }
  378. //------------------------------------------------------------------------------
// NodeInfo returns the NodeInfo currently held by the node's p2p switch.
func (n *Node) NodeInfo() *p2p.NodeInfo {
	return n.sw.NodeInfo()
}
// DialSeeds asks the p2p switch to dial the given seed addresses, passing
// along the node's address book, and returns the switch's error unchanged.
// The address book is nil when the PEX reactor is disabled in the config.
func (n *Node) DialSeeds(seeds []string) error {
	return n.sw.DialSeeds(n.addrBook, seeds)
}
  385. // Defaults to tcp
  386. func ProtocolAndAddress(listenAddr string) (string, string) {
  387. protocol, address := "tcp", listenAddr
  388. parts := strings.SplitN(address, "://", 2)
  389. if len(parts) == 2 {
  390. protocol, address = parts[0], parts[1]
  391. }
  392. return protocol, address
  393. }
  394. //------------------------------------------------------------------------------