diff --git a/Godeps/Godeps.json b/Godeps/Godeps.json index 797135a4e..c71d4b139 100644 --- a/Godeps/Godeps.json +++ b/Godeps/Godeps.json @@ -86,7 +86,7 @@ }, { "ImportPath": "github.com/tendermint/go-event-meter", - "Rev": "1d455b3fc1930f0d12609342b668aed385655ed8" + "Rev": "48308fb1a262b55db2fce92eb5cd55c5316217c1" }, { "ImportPath": "github.com/tendermint/go-events", diff --git a/handlers/callbacks.go b/handlers/callbacks.go index eda140738..0b1f94829 100644 --- a/handlers/callbacks.go +++ b/handlers/callbacks.go @@ -1,6 +1,8 @@ package handlers import ( + "time" + "github.com/tendermint/go-event-meter" "github.com/tendermint/go-events" @@ -12,9 +14,16 @@ import ( /* Each chain-validator gets an eventmeter which maintains the websocket Certain pre-defined events may update the netmon state: latency pongs, new blocks + All callbacks are called in a go-routine by the event-meter TODO: config changes for new validators and changing ip/port */ +func (tn *TendermintNetwork) registerCallbacks(chainState *types.ChainState, v *types.ValidatorState) error { + v.EventMeter().RegisterLatencyCallback(tn.latencyCallback(chainState, v)) + v.EventMeter().RegisterDisconnectCallback(tn.disconnectCallback(chainState, v)) + return v.EventMeter().Subscribe(tmtypes.EventStringNewBlock(), tn.newBlockCallback(chainState, v)) +} + // implements eventmeter.EventCallbackFunc // updates validator and possibly chain with new block func (tn *TendermintNetwork) newBlockCallback(chainState *types.ChainState, val *types.ValidatorState) eventmeter.EventCallbackFunc { @@ -47,7 +56,23 @@ func (tn *TendermintNetwork) disconnectCallback(chain *types.ChainState, val *ty // Validator is down! chain.SetOnline(val, false) - // Start reconnect routine - go chain.ReconnectValidator(val) + // reconnect + // TODO: stop trying eventually ... + for { + time.Sleep(time.Second) + + if err := val.Start(); err != nil { + log.Debug("Can't connect to validator", "valID", val.Config.Validator.ID) + } else { + // register callbacks for the validator + tn.registerCallbacks(chain, val) + + chain.SetOnline(val, true) + + // TODO: authenticate pubkey + + return + } + } } } diff --git a/handlers/handlers.go b/handlers/handlers.go index 7cef48e61..7e9284387 100644 --- a/handlers/handlers.go +++ b/handlers/handlers.go @@ -9,7 +9,6 @@ import ( "github.com/tendermint/go-wire" "github.com/tendermint/netmon/types" - tmtypes "github.com/tendermint/tendermint/types" ) type NetMonResult interface { @@ -22,12 +21,14 @@ var _ = wire.RegisterInterface( wire.ConcreteType{&types.ChainState{}, 0x02}, wire.ConcreteType{&types.ValidatorSet{}, 0x10}, wire.ConcreteType{&types.Validator{}, 0x11}, + wire.ConcreteType{&types.ValidatorConfig{}, 0x12}, wire.ConcreteType{&eventmeter.EventMetric{}, 0x20}, ) //--------------------------------------------- // global state and backend functions +// TODO: relax the locking (use RWMutex, reduce scope) type TendermintNetwork struct { mtx sync.Mutex Chains map[string]*types.ChainState `json:"blockchains"` @@ -125,18 +126,14 @@ func (tn *TendermintNetwork) RegisterChain(chainConfig *types.BlockchainConfig) return nil, err } - v.EventMeter().RegisterLatencyCallback(tn.latencyCallback(chainState, v)) - v.EventMeter().RegisterDisconnectCallback(tn.disconnectCallback(chainState, v)) - err := v.EventMeter().Subscribe(tmtypes.EventStringNewBlock(), tn.newBlockCallback(chainState, v)) - if err != nil { - return nil, err - } + // register callbacks for the validator + tn.registerCallbacks(chainState, v) // the DisconnectCallback will set us offline and start a reconnect routine chainState.Status.SetOnline(v, true) // get/set the validator's pub key - // TODO: possibly remove? why should we depend on this here? + // TODO: make this authenticate... v.PubKey() } @@ -180,6 +177,20 @@ func (tn *TendermintNetwork) GetValidator(valSetID, valID string) (*types.Valida return val, nil } +// Update the validator's rpc address (for now its the only thing that can be updated!) +func (tn *TendermintNetwork) UpdateValidator(chainID, valID, rpcAddr string) (*types.ValidatorConfig, error) { + tn.mtx.Lock() + defer tn.mtx.Unlock() + val, err := tn.getChainVal(chainID, valID) + if err != nil { + return nil, err + } + + val.Config.UpdateRPCAddress(rpcAddr) + log.Debug("Update validator rpc address", "chain", chainID, "val", valID, "rpcAddr", rpcAddr) + return val.Config, nil +} + //------------------ // Event metering diff --git a/handlers/log.go b/handlers/log.go new file mode 100644 index 000000000..a2ee08dfd --- /dev/null +++ b/handlers/log.go @@ -0,0 +1,18 @@ +package handlers + +import ( + "github.com/tendermint/go-logger" +) + +var log = logger.New("module", "handlers") + +/* +func init() { + log.SetHandler( + logger.LvlFilterHandler( + logger.LvlDebug, + logger.BypassHandler(), + ), + ) +} +*/ diff --git a/handlers/routes.go b/handlers/routes.go index 7a81f1660..5de29a438 100644 --- a/handlers/routes.go +++ b/handlers/routes.go @@ -12,11 +12,12 @@ func Routes(network *TendermintNetwork) map[string]*rpc.RPCFunc { // "unsubscribe": rpc.NewWSRPCFunc(Unsubscribe, []string{"event"}), "status": rpc.NewRPCFunc(StatusResult(network), ""), - "blockchain": rpc.NewRPCFunc(GetChainResult(network), "chain"), + "get_chain": rpc.NewRPCFunc(GetChainResult(network), "chainID"), "register_chain": rpc.NewRPCFunc(RegisterChainResult(network), "chainConfig"), "validator_set": rpc.NewRPCFunc(GetValidatorSetResult(network), "valsetID"), - "register_validator_set": rpc.NewRPCFunc(RegisterValidatorSetResult(network), "valSet"), + "register_validator_set": rpc.NewRPCFunc(RegisterValidatorSetResult(network), "valSetID"), "validator": rpc.NewRPCFunc(GetValidatorResult(network), "valSetID,valID"), + "update_validator": rpc.NewRPCFunc(UpdateValidatorResult(network), "chainID,valID,rpcAddr"), "start_meter": rpc.NewRPCFunc(network.StartMeter, "chainID,valID,event"), "stop_meter": rpc.NewRPCFunc(network.StopMeter, "chainID,valID,event"), @@ -60,6 +61,12 @@ func GetValidatorResult(network *TendermintNetwork) interface{} { } } +func UpdateValidatorResult(network *TendermintNetwork) interface{} { + return func(chainID, valID, rpcAddr string) (NetMonResult, error) { + return network.UpdateValidator(chainID, valID, rpcAddr) + } +} + func GetMeterResult(network *TendermintNetwork) interface{} { return func(chainID, valID, eventID string) (NetMonResult, error) { return network.GetMeter(chainID, valID, eventID) diff --git a/types/chain.go b/types/chain.go index ba9357437..cfd1c1689 100644 --- a/types/chain.go +++ b/types/chain.go @@ -46,10 +46,6 @@ func (cs *ChainState) SetOnline(val *ValidatorState, isOnline bool) { cs.Status.SetOnline(val, isOnline) } -func (cs *ChainState) ReconnectValidator(val *ValidatorState) { - cs.Status.ReconnectValidator(val) -} - //------------------------------------------------ // Blockchain Config: id, validator config @@ -230,13 +226,6 @@ func (s *BlockchainStatus) SetOnline(val *ValidatorState, isOnline bool) { } } -// called in a go routine -func (s *BlockchainStatus) ReconnectValidator(val *ValidatorState) { - for { - - } -} - func TwoThirdsMaj(count, total int) bool { return float64(count) > (2.0/3.0)*float64(total) } diff --git a/types/val.go b/types/val.go index d37458d1e..6cc81e35b 100644 --- a/types/val.go +++ b/types/val.go @@ -63,12 +63,17 @@ type ValidatorState struct { // Start a new event meter, including the websocket connection // Also create the http rpc client for convenienve func (vs *ValidatorState) Start() error { - em := eventmeter.NewEventMeter(fmt.Sprintf("ws://%s/websocket", vs.Config.RPCAddr), UnmarshalEvent) + // we need the lock because RPCAddr can be updated concurrently + vs.Config.mtx.Lock() + rpcAddr := vs.Config.RPCAddr + vs.Config.mtx.Unlock() + + em := eventmeter.NewEventMeter(fmt.Sprintf("ws://%s/websocket", rpcAddr), UnmarshalEvent) if err := em.Start(); err != nil { return err } vs.em = em - vs.client = client.NewClientURI(fmt.Sprintf("http://%s", vs.Config.RPCAddr)) + vs.client = client.NewClientURI(fmt.Sprintf("http://%s", rpcAddr)) return nil } @@ -127,6 +132,14 @@ type ValidatorConfig struct { Index int `json:"index,omitempty"` } +// TODO: update p2p address + +func (vc *ValidatorConfig) UpdateRPCAddress(rpcAddr string) { + vc.mtx.Lock() + defer vc.mtx.Unlock() + vc.RPCAddr = rpcAddr +} + type ValidatorStatus struct { mtx sync.Mutex Online bool `json:"online"` diff --git a/vendor/github.com/tendermint/go-event-meter/event_meter.go b/vendor/github.com/tendermint/go-event-meter/event_meter.go index 19139427e..32fefc0cf 100644 --- a/vendor/github.com/tendermint/go-event-meter/event_meter.go +++ b/vendor/github.com/tendermint/go-event-meter/event_meter.go @@ -78,8 +78,6 @@ type DisconnectCallbackFunc func() // Each node gets an event meter to track events for that node type EventMeter struct { - QuitService // inherits from the wsc - wsc *client.WSClient mtx sync.Mutex @@ -103,12 +101,11 @@ func NewEventMeter(addr string, unmarshalEvent EventUnmarshalFunc) *EventMeter { receivedPong: true, unmarshalEvent: unmarshalEvent, } - em.QuitService = em.wsc.QuitService return em } func (em *EventMeter) Start() error { - if err := em.wsc.OnStart(); err != nil { + if _, err := em.wsc.Start(); err != nil { return err } @@ -128,7 +125,7 @@ func (em *EventMeter) Start() error { } func (em *EventMeter) Stop() { - em.wsc.OnStop() + em.wsc.Stop() if em.disconnectCallback != nil { em.disconnectCallback() } @@ -217,6 +214,11 @@ func (em *EventMeter) receiveRoutine() { return } case r := <-em.wsc.ResultsCh: + if r == nil { + // we might receive the closed ResultsCh before the Quit + em.Stop() // call stop to trigger the disconnect callback + return + } eventID, data, err := em.unmarshalEvent(r) if err != nil { log.Error(err.Error()) @@ -225,8 +227,9 @@ func (em *EventMeter) receiveRoutine() { if eventID != "" { em.updateMetric(eventID, data) } - case <-em.Quit: - break + case <-em.wsc.Quit: + em.Stop() // call stop to trigger the disconnect callback + return } }