Browse Source

events: integrate event switch into services via Eventable interface

pull/46/head
Ethan Buchman 10 years ago
parent
commit
474bf31400
7 changed files with 100 additions and 41 deletions
  1. +8
    -1
      blockchain/reactor.go
  2. +8
    -0
      consensus/reactor.go
  3. +18
    -1
      daemon/daemon.go
  4. +6
    -0
      events/events.go
  5. +8
    -0
      mempool/reactor.go
  6. +8
    -0
      p2p/pex_reactor.go
  7. +44
    -39
      rpc/handlers.go

+ 8
- 1
blockchain/reactor.go View File

@ -9,6 +9,7 @@ import (
"github.com/tendermint/tendermint/binary"
. "github.com/tendermint/tendermint/common"
"github.com/tendermint/tendermint/events"
"github.com/tendermint/tendermint/p2p"
sm "github.com/tendermint/tendermint/state"
"github.com/tendermint/tendermint/types"
@ -19,7 +20,6 @@ const (
defaultChannelCapacity = 100
defaultSleepIntervalMS = 500
trySyncIntervalMS = 100
// stop syncing when last block's time is
// within this much of the system time.
stopSyncingDurationMinutes = 10
@ -41,6 +41,8 @@ type BlockchainReactor struct {
lastBlock *types.Block
quit chan struct{}
running uint32
evsw *events.EventSwitch
}
func NewBlockchainReactor(state *sm.State, store *BlockStore, sync bool) *BlockchainReactor {
@ -239,6 +241,11 @@ func (bcR *BlockchainReactor) BroadcastStatus() error {
return nil
}
// implements events.Eventable
func (bcR *BlockchainReactor) AddEventSwitch(evsw *events.EventSwitch) {
bcR.evsw = evsw
}
//-----------------------------------------------------------------------------
// Messages


+ 8
- 0
consensus/reactor.go View File

@ -12,6 +12,7 @@ import (
bc "github.com/tendermint/tendermint/blockchain"
. "github.com/tendermint/tendermint/common"
. "github.com/tendermint/tendermint/consensus/types"
"github.com/tendermint/tendermint/events"
"github.com/tendermint/tendermint/p2p"
sm "github.com/tendermint/tendermint/state"
"github.com/tendermint/tendermint/types"
@ -38,6 +39,8 @@ type ConsensusReactor struct {
blockStore *bc.BlockStore
conS *ConsensusState
evsw *events.EventSwitch
}
func NewConsensusReactor(consensusState *ConsensusState, blockStore *bc.BlockStore) *ConsensusReactor {
@ -230,6 +233,11 @@ func (conR *ConsensusReactor) ResetToState(state *sm.State) {
conR.conS.updateToState(state, false)
}
// implements events.Eventable
func (conR *ConsensusReactor) AddEventSwitch(evsw *events.EventSwitch) {
conR.evsw = evsw
}
//--------------------------------------
func makeRoundStepMessages(rs *RoundState) (nrsMsg *NewRoundStepMessage, csMsg *CommitStepMessage) {


+ 18
- 1
daemon/daemon.go View File

@ -9,6 +9,7 @@ import (
"github.com/tendermint/tendermint/config"
"github.com/tendermint/tendermint/consensus"
dbm "github.com/tendermint/tendermint/db"
"github.com/tendermint/tendermint/events"
mempl "github.com/tendermint/tendermint/mempool"
"github.com/tendermint/tendermint/p2p"
"github.com/tendermint/tendermint/rpc"
@ -18,6 +19,7 @@ import (
type Node struct {
sw *p2p.Switch
evsw *events.EventSwitch
book *p2p.AddrBook
blockStore *bc.BlockStore
pexReactor *p2p.PEXReactor
@ -50,6 +52,9 @@ func NewNode() *Node {
log.Info("No PrivValidator found", "file", config.App().GetString("PrivValidatorFile"))
}
eventSwitch := new(events.EventSwitch)
eventSwitch.Start()
// Get PEXReactor
book := p2p.NewAddrBook(config.App().GetString("AddrBookFile"))
pexReactor := p2p.NewPEXReactor(book)
@ -75,8 +80,13 @@ func NewNode() *Node {
sw.AddReactor("BLOCKCHAIN", bcReactor)
sw.AddReactor("CONSENSUS", consensusReactor)
// add the event switch to all services
// they should all satisfy events.Eventable
AddEventSwitch(eventSwitch, pexReactor, bcReactor, mempoolReactor, consensusReactor)
return &Node{
sw: sw,
evsw: eventSwitch,
book: book,
blockStore: blockStore,
pexReactor: pexReactor,
@ -105,6 +115,13 @@ func (n *Node) Stop() {
n.book.Stop()
}
// Add the event switch to reactors, mempool, etc.
func AddEventSwitch(evsw *events.EventSwitch, eventables ...events.Eventable) {
for _, e := range eventables {
e.AddEventSwitch(evsw)
}
}
// Add a Listener to accept inbound peer connections.
func (n *Node) AddListener(l p2p.Listener) {
log.Info(Fmt("Added %v", l))
@ -130,7 +147,7 @@ func (n *Node) StartRPC() {
core.SetConsensusState(n.consensusState)
core.SetMempoolReactor(n.mempoolReactor)
core.SetSwitch(n.sw)
rpc.StartHTTPServer()
rpc.StartHTTPServer(n.evsw)
}
func (n *Node) Switch() *p2p.Switch {


+ 6
- 0
events/events.go View File

@ -5,6 +5,12 @@ import (
"sync/atomic"
)
// reactors and other modules should export
// this interface to become eventable
type Eventable interface {
AddEventSwitch(*EventSwitch)
}
type EventSwitch struct {
mtx sync.RWMutex
eventCells map[string]*eventCell


+ 8
- 0
mempool/reactor.go View File

@ -6,6 +6,7 @@ import (
"sync/atomic"
"github.com/tendermint/tendermint/binary"
"github.com/tendermint/tendermint/events"
"github.com/tendermint/tendermint/p2p"
"github.com/tendermint/tendermint/types"
)
@ -22,6 +23,8 @@ type MempoolReactor struct {
stopped uint32
Mempool *Mempool
evsw *events.EventSwitch
}
func NewMempoolReactor(mempool *Mempool) *MempoolReactor {
@ -110,6 +113,11 @@ func (memR *MempoolReactor) BroadcastTx(tx types.Tx) error {
return nil
}
// implements events.Eventable
func (memR *MempoolReactor) AddEventSwitch(evsw *events.EventSwitch) {
memR.evsw = evsw
}
//-----------------------------------------------------------------------------
// Messages


+ 8
- 0
p2p/pex_reactor.go View File

@ -9,6 +9,7 @@ import (
"github.com/tendermint/tendermint/binary"
. "github.com/tendermint/tendermint/common"
"github.com/tendermint/tendermint/events"
)
var pexErrInvalidMessage = errors.New("Invalid PEX message")
@ -31,6 +32,8 @@ type PEXReactor struct {
stopped uint32
book *AddrBook
evsw *events.EventSwitch
}
func NewPEXReactor(book *AddrBook) *PEXReactor {
@ -207,6 +210,11 @@ func (pexR *PEXReactor) ensurePeers() {
}
}
// implements events.Eventable
func (pexR *PEXReactor) AddEventSwitch(evsw *events.EventSwitch) {
pexR.evsw = evsw
}
//-----------------------------------------------------------------------------
// Messages


+ 44
- 39
rpc/handlers.go View File

@ -226,6 +226,45 @@ func _jsonStringToArg(ty reflect.Type, arg string) (reflect.Value, error) {
//-----------------------------------------------------------------------------
// rpc.websocket
// for requests coming in
type WsRequest struct {
Type string // subscribe or unsubscribe
Event string
}
// for responses going out
type WsResponse struct {
Event string
Data interface{}
Error string
}
// a single websocket connection
// contains the listeners id
type Connection struct {
id string
wsCon *websocket.Conn
writeChan chan WsResponse
quitChan chan struct{}
failedSends uint
}
// new websocket connection wrapper
func NewConnection(con *websocket.Conn) *Connection {
return &Connection{
id: con.RemoteAddr().String(),
wsCon: con,
writeChan: make(chan WsResponse, WriteChanBuffer), // buffered. we keep track when its full
}
}
// close the connection
func (c *Connection) Close() {
c.wsCon.Close()
close(c.writeChan)
close(c.quitChan)
}
// main manager for all websocket connections
// holds the event switch
type WebsocketManager struct {
@ -327,50 +366,16 @@ func (w *WebsocketManager) write(con *Connection) {
websocket.Message.Send(con.wsCon, buf.Bytes())
}
case <-con.quitChan:
close(con.quitChan)
con.Close()
w.closeConn(con)
return
}
}
}
// a single websocket connection
// contains the listeners id
type Connection struct {
id string
wsCon *websocket.Conn
writeChan chan WsResponse
quitChan chan struct{}
failedSends uint
}
// for requests coming in
type WsRequest struct {
Type string // subscribe or unsubscribe
Event string
}
// for responses going out
type WsResponse struct {
Event string
Data interface{}
Error string
}
// new websocket connection wrapper
func NewConnection(con *websocket.Conn) *Connection {
return &Connection{
id: con.RemoteAddr().String(),
wsCon: con,
writeChan: make(chan WsResponse, WriteChanBuffer), // buffered. we keep track when its full
}
}
// close the channel
// should only be called by firing on c.quitChan
func (c *Connection) Close() {
close(c.writeChan)
c.wsCon.Close()
// close a connection and delete from manager
func (w *WebsocketManager) closeConn(con *Connection) {
con.Close()
delete(w.cons, con.id)
}
// rpc.websocket


Loading…
Cancel
Save