Browse Source

reconnect validator

pull/1943/head
Ethan Buchman 9 years ago
parent
commit
d52f690e56
8 changed files with 99 additions and 33 deletions
  1. +1
    -1
      Godeps/Godeps.json
  2. +27
    -2
      handlers/callbacks.go
  3. +19
    -8
      handlers/handlers.go
  4. +18
    -0
      handlers/log.go
  5. +9
    -2
      handlers/routes.go
  6. +0
    -11
      types/chain.go
  7. +15
    -2
      types/val.go
  8. +10
    -7
      vendor/github.com/tendermint/go-event-meter/event_meter.go

+ 1
- 1
Godeps/Godeps.json View File

@ -86,7 +86,7 @@
},
{
"ImportPath": "github.com/tendermint/go-event-meter",
"Rev": "1d455b3fc1930f0d12609342b668aed385655ed8"
"Rev": "48308fb1a262b55db2fce92eb5cd55c5316217c1"
},
{
"ImportPath": "github.com/tendermint/go-events",


+ 27
- 2
handlers/callbacks.go View File

@ -1,6 +1,8 @@
package handlers
import (
"time"
"github.com/tendermint/go-event-meter"
"github.com/tendermint/go-events"
@ -12,9 +14,16 @@ import (
/*
Each chain-validator gets an eventmeter which maintains the websocket
Certain pre-defined events may update the netmon state: latency pongs, new blocks
All callbacks are called in a go-routine by the event-meter
TODO: config changes for new validators and changing ip/port
*/
func (tn *TendermintNetwork) registerCallbacks(chainState *types.ChainState, v *types.ValidatorState) error {
v.EventMeter().RegisterLatencyCallback(tn.latencyCallback(chainState, v))
v.EventMeter().RegisterDisconnectCallback(tn.disconnectCallback(chainState, v))
return v.EventMeter().Subscribe(tmtypes.EventStringNewBlock(), tn.newBlockCallback(chainState, v))
}
// implements eventmeter.EventCallbackFunc
// updates validator and possibly chain with new block
func (tn *TendermintNetwork) newBlockCallback(chainState *types.ChainState, val *types.ValidatorState) eventmeter.EventCallbackFunc {
@ -47,7 +56,23 @@ func (tn *TendermintNetwork) disconnectCallback(chain *types.ChainState, val *ty
// Validator is down!
chain.SetOnline(val, false)
// Start reconnect routine
go chain.ReconnectValidator(val)
// reconnect
// TODO: stop trying eventually ...
for {
time.Sleep(time.Second)
if err := val.Start(); err != nil {
log.Debug("Can't connect to validator", "valID", val.Config.Validator.ID)
} else {
// register callbacks for the validator
tn.registerCallbacks(chain, val)
chain.SetOnline(val, true)
// TODO: authenticate pubkey
return
}
}
}
}

+ 19
- 8
handlers/handlers.go View File

@ -9,7 +9,6 @@ import (
"github.com/tendermint/go-wire"
"github.com/tendermint/netmon/types"
tmtypes "github.com/tendermint/tendermint/types"
)
type NetMonResult interface {
@ -22,12 +21,14 @@ var _ = wire.RegisterInterface(
wire.ConcreteType{&types.ChainState{}, 0x02},
wire.ConcreteType{&types.ValidatorSet{}, 0x10},
wire.ConcreteType{&types.Validator{}, 0x11},
wire.ConcreteType{&types.ValidatorConfig{}, 0x12},
wire.ConcreteType{&eventmeter.EventMetric{}, 0x20},
)
//---------------------------------------------
// global state and backend functions
// TODO: relax the locking (use RWMutex, reduce scope)
type TendermintNetwork struct {
mtx sync.Mutex
Chains map[string]*types.ChainState `json:"blockchains"`
@ -125,18 +126,14 @@ func (tn *TendermintNetwork) RegisterChain(chainConfig *types.BlockchainConfig)
return nil, err
}
v.EventMeter().RegisterLatencyCallback(tn.latencyCallback(chainState, v))
v.EventMeter().RegisterDisconnectCallback(tn.disconnectCallback(chainState, v))
err := v.EventMeter().Subscribe(tmtypes.EventStringNewBlock(), tn.newBlockCallback(chainState, v))
if err != nil {
return nil, err
}
// register callbacks for the validator
tn.registerCallbacks(chainState, v)
// the DisconnectCallback will set us offline and start a reconnect routine
chainState.Status.SetOnline(v, true)
// get/set the validator's pub key
// TODO: possibly remove? why should we depend on this here?
// TODO: make this authenticate...
v.PubKey()
}
@ -180,6 +177,20 @@ func (tn *TendermintNetwork) GetValidator(valSetID, valID string) (*types.Valida
return val, nil
}
// Update the validator's rpc address (for now its the only thing that can be updated!)
func (tn *TendermintNetwork) UpdateValidator(chainID, valID, rpcAddr string) (*types.ValidatorConfig, error) {
tn.mtx.Lock()
defer tn.mtx.Unlock()
val, err := tn.getChainVal(chainID, valID)
if err != nil {
return nil, err
}
val.Config.UpdateRPCAddress(rpcAddr)
log.Debug("Update validator rpc address", "chain", chainID, "val", valID, "rpcAddr", rpcAddr)
return val.Config, nil
}
//------------------
// Event metering


+ 18
- 0
handlers/log.go View File

@ -0,0 +1,18 @@
package handlers
import (
"github.com/tendermint/go-logger"
)
var log = logger.New("module", "handlers")
/*
func init() {
log.SetHandler(
logger.LvlFilterHandler(
logger.LvlDebug,
logger.BypassHandler(),
),
)
}
*/

+ 9
- 2
handlers/routes.go View File

@ -12,11 +12,12 @@ func Routes(network *TendermintNetwork) map[string]*rpc.RPCFunc {
// "unsubscribe": rpc.NewWSRPCFunc(Unsubscribe, []string{"event"}),
"status": rpc.NewRPCFunc(StatusResult(network), ""),
"blockchain": rpc.NewRPCFunc(GetChainResult(network), "chain"),
"get_chain": rpc.NewRPCFunc(GetChainResult(network), "chainID"),
"register_chain": rpc.NewRPCFunc(RegisterChainResult(network), "chainConfig"),
"validator_set": rpc.NewRPCFunc(GetValidatorSetResult(network), "valsetID"),
"register_validator_set": rpc.NewRPCFunc(RegisterValidatorSetResult(network), "valSet"),
"register_validator_set": rpc.NewRPCFunc(RegisterValidatorSetResult(network), "valSetID"),
"validator": rpc.NewRPCFunc(GetValidatorResult(network), "valSetID,valID"),
"update_validator": rpc.NewRPCFunc(UpdateValidatorResult(network), "chainID,valID,rpcAddr"),
"start_meter": rpc.NewRPCFunc(network.StartMeter, "chainID,valID,event"),
"stop_meter": rpc.NewRPCFunc(network.StopMeter, "chainID,valID,event"),
@ -60,6 +61,12 @@ func GetValidatorResult(network *TendermintNetwork) interface{} {
}
}
func UpdateValidatorResult(network *TendermintNetwork) interface{} {
return func(chainID, valID, rpcAddr string) (NetMonResult, error) {
return network.UpdateValidator(chainID, valID, rpcAddr)
}
}
func GetMeterResult(network *TendermintNetwork) interface{} {
return func(chainID, valID, eventID string) (NetMonResult, error) {
return network.GetMeter(chainID, valID, eventID)


+ 0
- 11
types/chain.go View File

@ -46,10 +46,6 @@ func (cs *ChainState) SetOnline(val *ValidatorState, isOnline bool) {
cs.Status.SetOnline(val, isOnline)
}
func (cs *ChainState) ReconnectValidator(val *ValidatorState) {
cs.Status.ReconnectValidator(val)
}
//------------------------------------------------
// Blockchain Config: id, validator config
@ -230,13 +226,6 @@ func (s *BlockchainStatus) SetOnline(val *ValidatorState, isOnline bool) {
}
}
// called in a go routine
func (s *BlockchainStatus) ReconnectValidator(val *ValidatorState) {
for {
}
}
func TwoThirdsMaj(count, total int) bool {
return float64(count) > (2.0/3.0)*float64(total)
}

+ 15
- 2
types/val.go View File

@ -63,12 +63,17 @@ type ValidatorState struct {
// Start a new event meter, including the websocket connection
// Also create the http rpc client for convenienve
func (vs *ValidatorState) Start() error {
em := eventmeter.NewEventMeter(fmt.Sprintf("ws://%s/websocket", vs.Config.RPCAddr), UnmarshalEvent)
// we need the lock because RPCAddr can be updated concurrently
vs.Config.mtx.Lock()
rpcAddr := vs.Config.RPCAddr
vs.Config.mtx.Unlock()
em := eventmeter.NewEventMeter(fmt.Sprintf("ws://%s/websocket", rpcAddr), UnmarshalEvent)
if err := em.Start(); err != nil {
return err
}
vs.em = em
vs.client = client.NewClientURI(fmt.Sprintf("http://%s", vs.Config.RPCAddr))
vs.client = client.NewClientURI(fmt.Sprintf("http://%s", rpcAddr))
return nil
}
@ -127,6 +132,14 @@ type ValidatorConfig struct {
Index int `json:"index,omitempty"`
}
// TODO: update p2p address
func (vc *ValidatorConfig) UpdateRPCAddress(rpcAddr string) {
vc.mtx.Lock()
defer vc.mtx.Unlock()
vc.RPCAddr = rpcAddr
}
type ValidatorStatus struct {
mtx sync.Mutex
Online bool `json:"online"`


+ 10
- 7
vendor/github.com/tendermint/go-event-meter/event_meter.go View File

@ -78,8 +78,6 @@ type DisconnectCallbackFunc func()
// Each node gets an event meter to track events for that node
type EventMeter struct {
QuitService // inherits from the wsc
wsc *client.WSClient
mtx sync.Mutex
@ -103,12 +101,11 @@ func NewEventMeter(addr string, unmarshalEvent EventUnmarshalFunc) *EventMeter {
receivedPong: true,
unmarshalEvent: unmarshalEvent,
}
em.QuitService = em.wsc.QuitService
return em
}
func (em *EventMeter) Start() error {
if err := em.wsc.OnStart(); err != nil {
if _, err := em.wsc.Start(); err != nil {
return err
}
@ -128,7 +125,7 @@ func (em *EventMeter) Start() error {
}
func (em *EventMeter) Stop() {
em.wsc.OnStop()
em.wsc.Stop()
if em.disconnectCallback != nil {
em.disconnectCallback()
}
@ -217,6 +214,11 @@ func (em *EventMeter) receiveRoutine() {
return
}
case r := <-em.wsc.ResultsCh:
if r == nil {
// we might receive the closed ResultsCh before the Quit
em.Stop() // call stop to trigger the disconnect callback
return
}
eventID, data, err := em.unmarshalEvent(r)
if err != nil {
log.Error(err.Error())
@ -225,8 +227,9 @@ func (em *EventMeter) receiveRoutine() {
if eventID != "" {
em.updateMetric(eventID, data)
}
case <-em.Quit:
break
case <-em.wsc.Quit:
em.Stop() // call stop to trigger the disconnect callback
return
}
}


Loading…
Cancel
Save