You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

336 lines
9.1 KiB

10 years ago
10 years ago
9 years ago
10 years ago
11 years ago
11 years ago
11 years ago
10 years ago
11 years ago
11 years ago
11 years ago
11 years ago
11 years ago
11 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
9 years ago
10 years ago
11 years ago
11 years ago
11 years ago
11 years ago
10 years ago
11 years ago
  1. package node
  2. import (
  3. "bytes"
  4. "io/ioutil"
  5. "math/rand"
  6. "net"
  7. "net/http"
  8. "strconv"
  9. "strings"
  10. "time"
  11. . "github.com/tendermint/go-common"
  12. "github.com/tendermint/go-crypto"
  13. dbm "github.com/tendermint/go-db"
  14. "github.com/tendermint/go-p2p"
  15. "github.com/tendermint/go-wire"
  16. bc "github.com/tendermint/tendermint/blockchain"
  17. "github.com/tendermint/tendermint/consensus"
  18. "github.com/tendermint/tendermint/events"
  19. mempl "github.com/tendermint/tendermint/mempool"
  20. "github.com/tendermint/tendermint/rpc"
  21. "github.com/tendermint/tendermint/rpc/core"
  22. "github.com/tendermint/tendermint/rpc/server"
  23. sm "github.com/tendermint/tendermint/state"
  24. "github.com/tendermint/tendermint/types"
  25. )
  26. import _ "net/http/pprof"
  27. type Node struct {
  28. sw *p2p.Switch
  29. evsw *events.EventSwitch
  30. book *p2p.AddrBook
  31. blockStore *bc.BlockStore
  32. pexReactor *p2p.PEXReactor
  33. bcReactor *bc.BlockchainReactor
  34. mempoolReactor *mempl.MempoolReactor
  35. consensusState *consensus.ConsensusState
  36. consensusReactor *consensus.ConsensusReactor
  37. privValidator *types.PrivValidator
  38. genDoc *types.GenesisDoc
  39. privKey crypto.PrivKeyEd25519
  40. }
  41. func NewNode() *Node {
  42. // Get BlockStore
  43. blockStoreDB := dbm.GetDB("blockstore")
  44. blockStore := bc.NewBlockStore(blockStoreDB)
  45. // Get State
  46. stateDB := dbm.GetDB("state")
  47. state := sm.LoadState(stateDB)
  48. var genDoc *types.GenesisDoc
  49. if state == nil {
  50. genDoc, state = sm.MakeGenesisStateFromFile(stateDB, config.GetString("genesis_file"))
  51. state.Save()
  52. // write the gendoc to db
  53. buf, n, err := new(bytes.Buffer), new(int64), new(error)
  54. wire.WriteJSON(genDoc, buf, n, err)
  55. stateDB.Set(types.GenDocKey, buf.Bytes())
  56. if *err != nil {
  57. Exit(Fmt("Unable to write gendoc to db: %v", err))
  58. }
  59. } else {
  60. genDocBytes := stateDB.Get(types.GenDocKey)
  61. err := new(error)
  62. wire.ReadJSONPtr(&genDoc, genDocBytes, err)
  63. if *err != nil {
  64. Exit(Fmt("Unable to read gendoc from db: %v", err))
  65. }
  66. }
  67. // add the chainid to the global config
  68. config.Set("chain_id", state.ChainID)
  69. // Get PrivValidator
  70. privValidatorFile := config.GetString("priv_validator_file")
  71. privValidator := types.LoadOrGenPrivValidator(privValidatorFile)
  72. // Generate node PrivKey
  73. privKey := crypto.GenPrivKeyEd25519()
  74. // Make event switch
  75. eventSwitch := events.NewEventSwitch()
  76. _, err := eventSwitch.Start()
  77. if err != nil {
  78. Exit(Fmt("Failed to start switch: %v", err))
  79. }
  80. // Make PEXReactor
  81. book := p2p.NewAddrBook(config.GetString("addrbook_file"))
  82. pexReactor := p2p.NewPEXReactor(book)
  83. // Make BlockchainReactor
  84. bcReactor := bc.NewBlockchainReactor(state.Copy(), blockStore, config.GetBool("fast_sync"))
  85. // Make MempoolReactor
  86. mempool := mempl.NewMempool(state.Copy())
  87. mempoolReactor := mempl.NewMempoolReactor(mempool)
  88. // Make ConsensusReactor
  89. consensusState := consensus.NewConsensusState(state.Copy(), blockStore, mempoolReactor)
  90. consensusReactor := consensus.NewConsensusReactor(consensusState, blockStore, config.GetBool("fast_sync"))
  91. if privValidator != nil {
  92. consensusReactor.SetPrivValidator(privValidator)
  93. }
  94. // Make p2p network switch
  95. sw := p2p.NewSwitch()
  96. sw.AddReactor("PEX", pexReactor)
  97. sw.AddReactor("MEMPOOL", mempoolReactor)
  98. sw.AddReactor("BLOCKCHAIN", bcReactor)
  99. sw.AddReactor("CONSENSUS", consensusReactor)
  100. // add the event switch to all services
  101. // they should all satisfy events.Eventable
  102. SetFireable(eventSwitch, bcReactor, mempoolReactor, consensusReactor)
  103. // run the profile server
  104. profileHost := config.GetString("prof_laddr")
  105. if profileHost != "" {
  106. go func() {
  107. log.Warn("Profile server", "error", http.ListenAndServe(profileHost, nil))
  108. }()
  109. }
  110. return &Node{
  111. sw: sw,
  112. evsw: eventSwitch,
  113. book: book,
  114. blockStore: blockStore,
  115. pexReactor: pexReactor,
  116. bcReactor: bcReactor,
  117. mempoolReactor: mempoolReactor,
  118. consensusState: consensusState,
  119. consensusReactor: consensusReactor,
  120. privValidator: privValidator,
  121. genDoc: genDoc,
  122. privKey: privKey,
  123. }
  124. }
  125. // Call Start() after adding the listeners.
  126. func (n *Node) Start() error {
  127. n.book.Start()
  128. n.sw.SetNodeInfo(makeNodeInfo(n.sw, n.privKey))
  129. n.sw.SetNodePrivKey(n.privKey)
  130. _, err := n.sw.Start()
  131. return err
  132. }
  133. func (n *Node) Stop() {
  134. log.Notice("Stopping Node")
  135. // TODO: gracefully disconnect from peers.
  136. n.sw.Stop()
  137. n.book.Stop()
  138. }
  139. // Add the event switch to reactors, mempool, etc.
  140. func SetFireable(evsw *events.EventSwitch, eventables ...events.Eventable) {
  141. for _, e := range eventables {
  142. e.SetFireable(evsw)
  143. }
  144. }
  145. // Add a Listener to accept inbound peer connections.
  146. // Add listeners before starting the Node.
  147. // The first listener is the primary listener (in NodeInfo)
  148. func (n *Node) AddListener(l p2p.Listener) {
  149. log.Notice(Fmt("Added %v", l))
  150. n.sw.AddListener(l)
  151. n.book.AddOurAddress(l.ExternalAddress())
  152. }
  153. // Dial a list of seeds in random order
  154. // Spawns a go routine for each dial
  155. func (n *Node) DialSeed() {
  156. // permute the list, dial them in random order.
  157. seeds := strings.Split(config.GetString("seeds"), ",")
  158. perm := rand.Perm(len(seeds))
  159. for i := 0; i < len(perm); i++ {
  160. go func(i int) {
  161. time.Sleep(time.Duration(rand.Int63n(3000)) * time.Millisecond)
  162. j := perm[i]
  163. addr := p2p.NewNetAddressString(seeds[j])
  164. n.dialSeed(addr)
  165. }(i)
  166. }
  167. }
  168. func (n *Node) dialSeed(addr *p2p.NetAddress) {
  169. peer, err := n.sw.DialPeerWithAddress(addr)
  170. if err != nil {
  171. log.Error("Error dialing seed", "error", err)
  172. //n.book.MarkAttempt(addr)
  173. return
  174. } else {
  175. log.Notice("Connected to seed", "peer", peer)
  176. n.book.AddAddress(addr, addr)
  177. }
  178. }
  179. func (n *Node) StartRPC() (net.Listener, error) {
  180. core.SetBlockStore(n.blockStore)
  181. core.SetConsensusState(n.consensusState)
  182. core.SetConsensusReactor(n.consensusReactor)
  183. core.SetMempoolReactor(n.mempoolReactor)
  184. core.SetSwitch(n.sw)
  185. core.SetPrivValidator(n.privValidator)
  186. core.SetGenDoc(n.genDoc)
  187. listenAddr := config.GetString("rpc_laddr")
  188. mux := http.NewServeMux()
  189. wm := rpcserver.NewWebsocketManager(core.Routes, n.evsw)
  190. mux.HandleFunc("/websocket", wm.WebsocketHandler)
  191. rpcserver.RegisterRPCFuncs(mux, core.Routes)
  192. return rpcserver.StartHTTPServer(listenAddr, mux)
  193. }
  194. func (n *Node) Switch() *p2p.Switch {
  195. return n.sw
  196. }
  197. func (n *Node) BlockStore() *bc.BlockStore {
  198. return n.blockStore
  199. }
  200. func (n *Node) ConsensusState() *consensus.ConsensusState {
  201. return n.consensusState
  202. }
  203. func (n *Node) MempoolReactor() *mempl.MempoolReactor {
  204. return n.mempoolReactor
  205. }
  206. func (n *Node) EventSwitch() *events.EventSwitch {
  207. return n.evsw
  208. }
  209. func makeNodeInfo(sw *p2p.Switch, privKey crypto.PrivKeyEd25519) *p2p.NodeInfo {
  210. nodeInfo := &p2p.NodeInfo{
  211. PubKey: privKey.PubKey().(crypto.PubKeyEd25519),
  212. Moniker: config.GetString("moniker"),
  213. Network: config.GetString("chain_id"),
  214. Version: Version,
  215. Other: []string{
  216. Fmt("p2p_version=%v" + p2p.Version),
  217. Fmt("rpc_version=%v" + rpc.Version),
  218. Fmt("wire_version=%v" + wire.Version),
  219. },
  220. }
  221. // include git hash in the nodeInfo if available
  222. if rev, err := ReadFile(config.GetString("revision_file")); err == nil {
  223. nodeInfo.Other = append(nodeInfo.Other, Fmt("revision=%v", string(rev)))
  224. }
  225. if !sw.IsListening() {
  226. return nodeInfo
  227. }
  228. p2pListener := sw.Listeners()[0]
  229. p2pHost := p2pListener.ExternalAddress().IP.String()
  230. p2pPort := p2pListener.ExternalAddress().Port
  231. rpcListenAddr := config.GetString("rpc_laddr")
  232. _, rpcPortStr, _ := net.SplitHostPort(rpcListenAddr)
  233. rpcPort, err := strconv.Atoi(rpcPortStr)
  234. if err != nil {
  235. PanicSanity(Fmt("Expected numeric RPC.ListenAddr port but got %v", rpcPortStr))
  236. }
  237. // We assume that the rpcListener has the same ExternalAddress.
  238. // This is probably true because both P2P and RPC listeners use UPnP,
  239. // except of course if the rpc is only bound to localhost
  240. nodeInfo.Address = Fmt("%v:%v", p2pHost, p2pPort)
  241. nodeInfo.Other = append(nodeInfo.Other, Fmt("rpc_port=%v", rpcPort))
  242. return nodeInfo
  243. }
  244. //------------------------------------------------------------------------------
  245. func RunNode() {
  246. // Wait until the genesis doc becomes available
  247. genDocFile := config.GetString("genesis_file")
  248. if !FileExists(genDocFile) {
  249. log.Notice(Fmt("Waiting for genesis file %v...", genDocFile))
  250. for {
  251. time.Sleep(time.Second)
  252. if FileExists(genDocFile) {
  253. break
  254. }
  255. jsonBlob, err := ioutil.ReadFile(genDocFile)
  256. if err != nil {
  257. Exit(Fmt("Couldn't read GenesisDoc file: %v", err))
  258. }
  259. genDoc := types.GenesisDocFromJSON(jsonBlob)
  260. if genDoc.ChainID == "" {
  261. PanicSanity(Fmt("Genesis doc %v must include non-empty chain_id", genDocFile))
  262. }
  263. config.Set("chain_id", genDoc.ChainID)
  264. config.Set("genesis_doc", genDoc)
  265. }
  266. }
  267. // Create & start node
  268. n := NewNode()
  269. l := p2p.NewDefaultListener("tcp", config.GetString("node_laddr"), config.GetBool("skip_upnp"))
  270. n.AddListener(l)
  271. err := n.Start()
  272. if err != nil {
  273. Exit(Fmt("Failed to start node: %v", err))
  274. }
  275. log.Notice("Started node", "nodeInfo", n.sw.NodeInfo())
  276. // If seedNode is provided by config, dial out.
  277. if config.GetString("seeds") != "" {
  278. n.DialSeed()
  279. }
  280. // Run the RPC server.
  281. if config.GetString("rpc_laddr") != "" {
  282. _, err := n.StartRPC()
  283. if err != nil {
  284. PanicCrisis(err)
  285. }
  286. }
  287. // Sleep forever and then...
  288. TrapSignal(func() {
  289. n.Stop()
  290. })
  291. }