Browse Source

Use rpc/client/ws_client; OnStart() returns error

pull/130/head
Jae Kwon 10 years ago
parent
commit
e86073ec96
19 changed files with 112 additions and 97 deletions
  1. +2
    -1
      blockchain/pool.go
  2. +3
    -2
      blockchain/reactor.go
  3. +2
    -2
      cmd/sim_txs/main.go
  4. +12
    -11
      common/service.go
  5. +7
    -3
      consensus/reactor.go
  6. +3
    -2
      consensus/state.go
  7. +29
    -47
      crawler/crawl.go
  8. +7
    -0
      crawler/log.go
  9. +2
    -1
      events/events.go
  10. +1
    -5
      mempool/reactor.go
  11. +3
    -2
      node/node.go
  12. +2
    -1
      p2p/addrbook.go
  13. +3
    -2
      p2p/connection.go
  14. +2
    -1
      p2p/listener.go
  15. +4
    -3
      p2p/peer.go
  16. +3
    -2
      p2p/pex_reactor.go
  17. +2
    -1
      p2p/switch.go
  18. +23
    -10
      rpc/client/ws_client.go
  19. +2
    -1
      rpc/server/handlers.go

+ 2
- 1
blockchain/pool.go View File

@ -68,10 +68,11 @@ func NewBlockPool(start int, requestsCh chan<- BlockRequest, timeoutsCh chan<- s
return bp
}
func (pool *BlockPool) OnStart() {
func (pool *BlockPool) OnStart() error {
pool.BaseService.OnStart()
pool.repeater = NewRepeatTimer("", requestIntervalMS*time.Millisecond)
go pool.run()
return nil
}
func (pool *BlockPool) OnStop() {


+ 3
- 2
blockchain/reactor.go View File

@ -7,12 +7,12 @@ import (
"reflect"
"time"
"github.com/tendermint/tendermint/wire"
. "github.com/tendermint/tendermint/common"
"github.com/tendermint/tendermint/events"
"github.com/tendermint/tendermint/p2p"
sm "github.com/tendermint/tendermint/state"
"github.com/tendermint/tendermint/types"
"github.com/tendermint/tendermint/wire"
)
const (
@ -76,12 +76,13 @@ func NewBlockchainReactor(state *sm.State, store *BlockStore, sync bool) *Blockc
return bcR
}
func (bcR *BlockchainReactor) OnStart() {
func (bcR *BlockchainReactor) OnStart() error {
bcR.BaseReactor.OnStart()
if bcR.sync {
bcR.pool.Start()
go bcR.poolRoutine()
}
return nil
}
func (bcR *BlockchainReactor) OnStop() {


+ 2
- 2
cmd/sim_txs/main.go View File

@ -75,12 +75,12 @@ func main() {
sendTx := makeRandomTransaction(10, rootAccount.Sequence+1, root, 2, accounts)
fmt.Println(sendTx)
wsClient, err := rpcclient.NewWSClient("ws://" + remote + "/websocket")
wsClient := rpcclient.NewWSClient("ws://" + remote + "/websocket")
_, err = wsClient.Start()
if err != nil {
Exit(Fmt("Failed to establish websocket connection: %v", err))
}
wsClient.Subscribe(types.EventStringAccInput(sendTx.Inputs[0].Address))
wsClient.Start()
go func() {
for {


+ 12
- 11
common/service.go View File

@ -23,13 +23,13 @@ func NewFooService() *FooService {
return fs
}
func (fs *FooService) OnStart() {
func (fs *FooService) OnStart() error {
fs.BaseService.OnStart() // Always call the overridden method.
// initialize private fields
// start subroutines, etc.
}
func (fs *FooService) OnStop() {
func (fs *FooService) OnStop() error {
fs.BaseService.OnStop() // Always call the overridden method.
// close/destroy private fields
// stop subroutines, etc.
@ -42,8 +42,8 @@ import "sync/atomic"
import "github.com/tendermint/tendermint/Godeps/_workspace/src/github.com/tendermint/log15"
type Service interface {
Start() bool
OnStart()
Start() (bool, error)
OnStart() error
Stop() bool
OnStop()
@ -72,24 +72,24 @@ func NewBaseService(log log15.Logger, name string, impl Service) *BaseService {
}
// Implements Servce
func (bs *BaseService) Start() bool {
func (bs *BaseService) Start() (bool, error) {
if atomic.CompareAndSwapUint32(&bs.started, 0, 1) {
if atomic.LoadUint32(&bs.stopped) == 1 {
bs.log.Warn(Fmt("Not starting %v -- already stopped", bs.name), "impl", bs.impl)
return false
return false, nil
} else {
bs.log.Notice(Fmt("Starting %v", bs.name), "impl", bs.impl)
}
bs.impl.OnStart()
return true
err := bs.impl.OnStart()
return true, err
} else {
bs.log.Info(Fmt("Not starting %v -- already started", bs.name), "impl", bs.impl)
return false
return false, nil
}
}
// Implements Service
func (bs *BaseService) OnStart() {}
func (bs *BaseService) OnStart() error { return nil }
// Implements Service
func (bs *BaseService) Stop() bool {
@ -131,8 +131,9 @@ func NewQuitService(log log15.Logger, name string, impl Service) *QuitService {
}
// NOTE: when overriding OnStart, must call .QuitService.OnStart().
func (qs *QuitService) OnStart() {
func (qs *QuitService) OnStart() error {
qs.Quit = make(chan struct{})
return nil
}
// NOTE: when overriding OnStop, must call .QuitService.OnStop().


+ 7
- 3
consensus/reactor.go View File

@ -8,7 +8,6 @@ import (
"sync"
"time"
"github.com/tendermint/tendermint/wire"
bc "github.com/tendermint/tendermint/blockchain"
. "github.com/tendermint/tendermint/common"
. "github.com/tendermint/tendermint/consensus/types"
@ -16,6 +15,7 @@ import (
"github.com/tendermint/tendermint/p2p"
sm "github.com/tendermint/tendermint/state"
"github.com/tendermint/tendermint/types"
"github.com/tendermint/tendermint/wire"
)
const (
@ -49,13 +49,17 @@ func NewConsensusReactor(consensusState *ConsensusState, blockStore *bc.BlockSto
return conR
}
func (conR *ConsensusReactor) OnStart() {
func (conR *ConsensusReactor) OnStart() error {
log.Notice("ConsensusReactor ", "fastSync", conR.fastSync)
conR.BaseReactor.OnStart()
if !conR.fastSync {
conR.conS.Start()
_, err := conR.conS.Start()
if err != nil {
return err
}
}
go conR.broadcastNewRoundStepRoutine()
return nil
}
func (conR *ConsensusReactor) OnStop() {


+ 3
- 2
consensus/state.go View File

@ -158,7 +158,6 @@ import (
"time"
acm "github.com/tendermint/tendermint/account"
"github.com/tendermint/tendermint/wire"
bc "github.com/tendermint/tendermint/blockchain"
. "github.com/tendermint/tendermint/common"
. "github.com/tendermint/tendermint/consensus/types"
@ -166,6 +165,7 @@ import (
mempl "github.com/tendermint/tendermint/mempool"
sm "github.com/tendermint/tendermint/state"
"github.com/tendermint/tendermint/types"
"github.com/tendermint/tendermint/wire"
)
var (
@ -360,9 +360,10 @@ func (cs *ConsensusState) NewStepCh() chan *RoundState {
return cs.newStepCh
}
func (cs *ConsensusState) OnStart() {
func (cs *ConsensusState) OnStart() error {
cs.BaseService.OnStart()
cs.scheduleRound0(cs.Height)
return nil
}
func (cs *ConsensusState) OnStop() {


+ 29
- 47
crawler/crawl.go View File

@ -2,13 +2,15 @@ package crawler
import (
"fmt"
"github.com/tendermint/tendermint/wire"
ctypes "github.com/tendermint/tendermint/rpc/core/types"
rpcclient "github.com/tendermint/tendermint/rpc/core_client"
"github.com/tendermint/tendermint/types"
"time"
"io/ioutil"
. "github.com/tendermint/tendermint/common"
"github.com/tendermint/tendermint/rpc/client"
ctypes "github.com/tendermint/tendermint/rpc/core/types"
core "github.com/tendermint/tendermint/rpc/core_client"
"github.com/tendermint/tendermint/rpc/types"
"github.com/tendermint/tendermint/types"
"github.com/tendermint/tendermint/wire"
)
const (
@ -58,14 +60,14 @@ func (n *Node) SetInfo(status *ctypes.ResponseStatus, netinfo *ctypes.ResponseNe
// A node client is used to talk to a node over rpc and websockets
type NodeClient struct {
rpc rpcclient.Client
rpc core.Client
ws *rpcclient.WSClient
}
// Create a new client for the node at the given addr
func NewNodeClient(addr string) *NodeClient {
return &NodeClient{
rpc: rpcclient.NewClient("http://"+addr, "JSONRPC"),
rpc: core.NewClient("http://"+addr, "JSONRPC"),
ws: rpcclient.NewWSClient("ws://" + addr + "/events"),
}
}
@ -90,6 +92,8 @@ func (ni nodeInfo) unpack() (string, uint16, string, bool, bool) {
// A crawler has a local node, a set of potential nodes in the nodePool, and connected nodes.
// Maps are only accessed by one go-routine, mediated by the checkQueue
type Crawler struct {
QuitService
self *Node
client *NodeClient
@ -98,22 +102,22 @@ type Crawler struct {
nodes map[string]*Node
nodeQueue chan *Node
quit chan struct{}
}
// Create a new Crawler using the local RPC server at addr
func NewCrawler(host string, port uint16) *Crawler {
return &Crawler{
crawler := &Crawler{
self: &Node{Host: host, RPCPort: port, client: NewNodeClient(fmt.Sprintf("%s:%d", host, port))},
checkQueue: make(chan nodeInfo, CheckQueueBufferSize),
nodePool: make(map[string]*Node),
nodes: make(map[string]*Node),
nodeQueue: make(chan *Node, NodeQueueBufferSize),
quit: make(chan struct{}),
}
crawler.QuitService = *NewQuitService(log, "Crawler", crawler)
return crawler
}
func (c *Crawler) Start() error {
func (c *Crawler) OnStart() error {
// connect to local node first, set info,
// and fire peers onto the checkQueue
if err := c.pollNode(c.self); err != nil {
@ -122,10 +126,8 @@ func (c *Crawler) Start() error {
// connect to weboscket, subscribe to local events
// and run the read loop to listen for new blocks
if r, err := c.self.client.ws.Dial(); err != nil {
fmt.Println(r)
b, _ := ioutil.ReadAll(r.Body)
fmt.Println(string(b))
_, err := c.self.client.ws.Start()
if err != nil {
return err
}
if err := c.self.client.ws.Subscribe(types.EventStringNewBlock()); err != nil {
@ -147,20 +149,16 @@ func (c *Crawler) Start() error {
return nil
}
func (c *Crawler) Stop() {
close(c.quit)
}
// listen for events from the node and ping it for peers on a ticker
func (c *Crawler) readLoop(node *Node) {
wsChan := node.client.ws.Read()
eventsCh := node.client.ws.EventsCh
getPeersTicker := time.Tick(time.Second * GetPeersTickerSeconds)
for {
select {
case wsMsg := <-wsChan:
case eventMsg := <-eventsCh:
// update the node with his new info
if err := c.consumeMessage(wsMsg, node); err != nil {
if err := c.consumeMessage(eventMsg, node); err != nil {
// lost the node, put him back on the checkQueu
c.checkNode(nodeInfo{
host: node.Host,
@ -178,37 +176,21 @@ func (c *Crawler) readLoop(node *Node) {
disconnected: true,
})
}
case <-c.quit:
case <-c.Quit:
return
}
}
}
func (c *Crawler) consumeMessage(wsMsg *rpcclient.WSMsg, node *Node) error {
if wsMsg.Error != nil {
return wsMsg.Error
}
// unmarshal block event
var response struct {
Event string
Data *types.Block
Error string
}
func (c *Crawler) consumeMessage(eventMsg rpctypes.RPCEventResult, node *Node) error {
var block *types.Block
var err error
wire.ReadJSON(&response, wsMsg.Data, &err)
if err != nil {
return err
}
if response.Error != "" {
return fmt.Errorf(response.Error)
}
block := response.Data
wire.ReadJSONObject(block, eventMsg.Data, &err)
node.LastSeen = time.Now()
node.BlockHeight = block.Height
node.BlockHistory[block.Height] = node.LastSeen
return nil
}
@ -251,7 +233,7 @@ func (c *Crawler) checkLoop() {
// queue it for connecting to
c.nodeQueue <- n
case <-c.quit:
case <-c.Quit:
return
}
}
@ -263,7 +245,7 @@ func (c *Crawler) connectLoop() {
select {
case node := <-c.nodeQueue:
go c.connectToNode(node)
case <-c.quit:
case <-c.Quit:
// close all connections
for addr, node := range c.nodes {
_, _ = addr, node
@ -278,9 +260,9 @@ func (c *Crawler) connectToNode(node *Node) {
addr := node.Address()
node.client = NewNodeClient(addr)
if b, err := node.client.ws.Dial(); err != nil {
fmt.Println("err on ws dial:", b, err)
_, err := node.client.ws.Start()
if err != nil {
fmt.Println("err on ws start:", err)
// set failed, return
}


+ 7
- 0
crawler/log.go View File

@ -0,0 +1,7 @@
package crawler
import (
"github.com/tendermint/tendermint/Godeps/_workspace/src/github.com/tendermint/log15"
)
var log = log15.New("module", "crawler")

+ 2
- 1
events/events.go View File

@ -31,10 +31,11 @@ func NewEventSwitch() *EventSwitch {
return evsw
}
func (evsw *EventSwitch) OnStart() {
func (evsw *EventSwitch) OnStart() error {
evsw.BaseService.OnStart()
evsw.eventCells = make(map[string]*eventCell)
evsw.listeners = make(map[string]*eventListener)
return nil
}
func (evsw *EventSwitch) OnStop() {


+ 1
- 5
mempool/reactor.go View File

@ -5,11 +5,11 @@ import (
"fmt"
"reflect"
"github.com/tendermint/tendermint/wire"
. "github.com/tendermint/tendermint/common"
"github.com/tendermint/tendermint/events"
"github.com/tendermint/tendermint/p2p"
"github.com/tendermint/tendermint/types"
"github.com/tendermint/tendermint/wire"
)
var (
@ -34,10 +34,6 @@ func NewMempoolReactor(mempool *Mempool) *MempoolReactor {
return memR
}
// func (memR *MempoolReactor) OnStart() { memR.BaseReactor.OnStart() }
// func (memR *MempoolReactor) OnStop() { memR.BaseReactor.OnStop() }
// Implements Reactor
func (memR *MempoolReactor) GetChannels() []*p2p.ChannelDescriptor {
return []*p2p.ChannelDescriptor{


+ 3
- 2
node/node.go View File

@ -11,7 +11,6 @@ import (
"time"
acm "github.com/tendermint/tendermint/account"
"github.com/tendermint/tendermint/wire"
bc "github.com/tendermint/tendermint/blockchain"
. "github.com/tendermint/tendermint/common"
"github.com/tendermint/tendermint/consensus"
@ -23,6 +22,7 @@ import (
"github.com/tendermint/tendermint/rpc/server"
sm "github.com/tendermint/tendermint/state"
"github.com/tendermint/tendermint/types"
"github.com/tendermint/tendermint/wire"
)
import _ "net/http/pprof"
@ -141,11 +141,12 @@ func NewNode() *Node {
}
// Call Start() after adding the listeners.
func (n *Node) Start() {
func (n *Node) Start() error {
n.book.Start()
n.sw.SetNodeInfo(makeNodeInfo(n.sw, n.privKey))
n.sw.SetNodePrivKey(n.privKey)
n.sw.Start()
return nil
}
func (n *Node) Stop() {


+ 2
- 1
p2p/addrbook.go View File

@ -121,11 +121,12 @@ func (a *AddrBook) init() {
}
}
func (a *AddrBook) OnStart() {
func (a *AddrBook) OnStart() error {
a.QuitService.OnStart()
a.loadFromFile(a.filePath)
a.wg.Add(1)
go a.saveRoutine()
return nil
}
func (a *AddrBook) OnStop() {


+ 3
- 2
p2p/connection.go View File

@ -11,8 +11,8 @@ import (
"time"
flow "github.com/tendermint/tendermint/Godeps/_workspace/src/code.google.com/p/mxk/go1/flowcontrol"
"github.com/tendermint/tendermint/wire" //"github.com/tendermint/log15"
. "github.com/tendermint/tendermint/common"
"github.com/tendermint/tendermint/wire" //"github.com/tendermint/log15"
)
const (
@ -127,7 +127,7 @@ func NewMConnection(conn net.Conn, chDescs []*ChannelDescriptor, onReceive recei
return mconn
}
func (c *MConnection) OnStart() {
func (c *MConnection) OnStart() error {
c.BaseService.OnStart()
c.quit = make(chan struct{})
c.flushTimer = NewThrottleTimer("flush", flushThrottleMS*time.Millisecond)
@ -135,6 +135,7 @@ func (c *MConnection) OnStart() {
c.chStatsTimer = NewRepeatTimer("chStats", updateStatsSeconds*time.Second)
go c.sendRoutine()
go c.recvRoutine()
return nil
}
func (c *MConnection) OnStop() {


+ 2
- 1
p2p/listener.go View File

@ -108,9 +108,10 @@ SKIP_UPNP:
return dl
}
func (l *DefaultListener) OnStart() {
func (l *DefaultListener) OnStart() error {
l.BaseService.OnStart()
go l.listenRoutine()
return nil
}
func (l *DefaultListener) OnStop() {


+ 4
- 3
p2p/peer.go View File

@ -5,9 +5,9 @@ import (
"io"
"net"
"github.com/tendermint/tendermint/wire"
. "github.com/tendermint/tendermint/common"
"github.com/tendermint/tendermint/types"
"github.com/tendermint/tendermint/wire"
)
type Peer struct {
@ -72,9 +72,10 @@ func newPeer(conn net.Conn, peerNodeInfo *types.NodeInfo, outbound bool, reactor
return p
}
func (p *Peer) OnStart() {
func (p *Peer) OnStart() error {
p.BaseService.OnStart()
p.mconn.Start()
_, err := p.mconn.Start()
return err
}
func (p *Peer) OnStop() {


+ 3
- 2
p2p/pex_reactor.go View File

@ -8,9 +8,9 @@ import (
"reflect"
"time"
"github.com/tendermint/tendermint/wire"
. "github.com/tendermint/tendermint/common"
"github.com/tendermint/tendermint/events"
"github.com/tendermint/tendermint/wire"
)
var pexErrInvalidMessage = errors.New("Invalid PEX message")
@ -41,9 +41,10 @@ func NewPEXReactor(book *AddrBook) *PEXReactor {
return pexR
}
func (pexR *PEXReactor) OnStart() {
func (pexR *PEXReactor) OnStart() error {
pexR.BaseReactor.OnStart()
go pexR.ensurePeersRoutine()
return nil
}
func (pexR *PEXReactor) OnStop() {


+ 2
- 1
p2p/switch.go View File

@ -153,7 +153,7 @@ func (sw *Switch) SetNodePrivKey(nodePrivKey acm.PrivKeyEd25519) {
}
// Switch.Start() starts all the reactors, peers, and listeners.
func (sw *Switch) OnStart() {
func (sw *Switch) OnStart() error {
sw.BaseService.OnStart()
// Start reactors
for _, reactor := range sw.reactors {
@ -167,6 +167,7 @@ func (sw *Switch) OnStart() {
for _, listener := range sw.listeners {
go sw.listenerRoutine(listener)
}
return nil
}
func (sw *Switch) OnStop() {


+ 23
- 10
rpc/client/ws_client.go View File

@ -16,31 +16,44 @@ const wsResponsesChannelCapacity = 10
type WSClient struct {
QuitService
Address string
*websocket.Conn
EventsCh chan rpctypes.RPCEventResult
ResponsesCh chan rpctypes.RPCResponse
}
// create a new connection
func NewWSClient(addr string) (*WSClient, error) {
dialer := websocket.DefaultDialer
rHeader := http.Header{}
con, _, err := dialer.Dial(addr, rHeader)
if err != nil {
return nil, err
}
func NewWSClient(addr string) *WSClient {
wsClient := &WSClient{
Conn: con,
Address: addr,
Conn: nil,
EventsCh: make(chan rpctypes.RPCEventResult, wsEventsChannelCapacity),
ResponsesCh: make(chan rpctypes.RPCResponse, wsResponsesChannelCapacity),
}
wsClient.QuitService = *NewQuitService(log, "WSClient", wsClient)
return wsClient, nil
return wsClient
}
func (wsc *WSClient) OnStart() {
func (wsc *WSClient) OnStart() error {
wsc.QuitService.OnStart()
err := wsc.dial()
if err != nil {
return err
}
go wsc.receiveEventsRoutine()
return nil
}
func (wsc *WSClient) dial() error {
// Dial
dialer := websocket.DefaultDialer
rHeader := http.Header{}
con, _, err := dialer.Dial(wsc.Address, rHeader)
if err != nil {
return err
}
wsc.Conn = con
return nil
}
func (wsc *WSClient) OnStop() {


+ 2
- 1
rpc/server/handlers.go View File

@ -240,7 +240,7 @@ func NewWSConnection(baseConn *websocket.Conn, funcMap map[string]*RPCFunc, evsw
}
// wsc.Start() blocks until the connection closes.
func (wsc *WSConnection) OnStart() {
func (wsc *WSConnection) OnStart() error {
wsc.QuitService.OnStart()
// Read subscriptions/unsubscriptions to events
@ -262,6 +262,7 @@ func (wsc *WSConnection) OnStart() {
// Write responses, BLOCKING.
wsc.writeRoutine()
return nil
}
func (wsc *WSConnection) OnStop() {


Loading…
Cancel
Save