From de1d5f635307280921e6e02a7145953e70c3c5ac Mon Sep 17 00:00:00 2001 From: Ethan Buchman Date: Thu, 21 Jan 2016 22:57:24 -0500 Subject: [PATCH] fixes; load from file with valsets and chains --- handlers/callbacks.go | 6 +- handlers/handlers.go | 14 +++- handlers/routes.go | 17 +++-- main.go | 163 +++++++++++++++++++++++++++++++++++++++--- types/chain.go | 13 +++- types/val.go | 54 ++++++++------ 6 files changed, 221 insertions(+), 46 deletions(-) diff --git a/handlers/callbacks.go b/handlers/callbacks.go index 3cba0f2c6..f75ca411b 100644 --- a/handlers/callbacks.go +++ b/handlers/callbacks.go @@ -20,7 +20,7 @@ func (tn *TendermintNetwork) newBlockCallback(chainID, valID string) eventmeter. val, _ := chain.Config.GetValidatorByID(valID) // update height for validator - val.BlockHeight = block.Header.Height + val.Status.BlockHeight = block.Header.Height // possibly update height and mean block time for chain if block.Header.Height > chain.Status.Height { @@ -42,8 +42,8 @@ func (tn *TendermintNetwork) latencyCallback(chainID, valID string) eventmeter.L // update latency for this validator and avg latency for chain mean := chain.Status.MeanLatency * float64(chain.Status.NumValidators) - mean = (mean - val.Latency + latency) / float64(chain.Status.NumValidators) - val.Latency = latency + mean = (mean - val.Status.Latency + latency) / float64(chain.Status.NumValidators) + val.Status.Latency = latency chain.Status.MeanLatency = mean // TODO: possibly update active nodes and uptime for chain diff --git a/handlers/handlers.go b/handlers/handlers.go index 6955b06a0..d23c53827 100644 --- a/handlers/handlers.go +++ b/handlers/handlers.go @@ -110,12 +110,13 @@ func (tn *TendermintNetwork) RegisterChain(chainConfig *types.BlockchainConfig) // start the event meter and listen for new blocks on each validator for _, v := range chainConfig.Validators { + v.Status = &types.ValidatorStatus{} if err := v.Start(); err != nil { return nil, err } - v.EventMeter().RegisterLatencyCallback(tn.latencyCallback(chainConfig.ID, v.Validator.ID)) - err := v.EventMeter().Subscribe(tmtypes.EventStringNewBlock(), tn.newBlockCallback(chainConfig.ID, v.Validator.ID)) + v.EventMeter().RegisterLatencyCallback(tn.latencyCallback(chainConfig.ID, v.Config.Validator.ID)) + err := v.EventMeter().Subscribe(tmtypes.EventStringNewBlock(), tn.newBlockCallback(chainConfig.ID, v.Config.Validator.ID)) if err != nil { return nil, err } @@ -137,6 +138,13 @@ func (tn *TendermintNetwork) GetValidatorSet(valSetID string) (*types.ValidatorS return valSet, nil } +func (tn *TendermintNetwork) RegisterValidatorSet(valSet *types.ValidatorSet) (*types.ValidatorSet, error) { + tn.mtx.Lock() + defer tn.mtx.Unlock() + tn.ValSets[valSet.ID] = valSet + return valSet, nil +} + func (tn *TendermintNetwork) GetValidator(valSetID, valID string) (*types.Validator, error) { tn.mtx.Lock() defer tn.mtx.Unlock() @@ -182,7 +190,7 @@ func (tn *TendermintNetwork) GetMeter(chainID, valID, eventID string) (*eventmet return val.EventMeter().GetMetric(eventID) } -func (tn *TendermintNetwork) getChainVal(chainID, valID string) (*types.ChainValidator, error) { +func (tn *TendermintNetwork) getChainVal(chainID, valID string) (*types.ValidatorState, error) { chain, ok := tn.Chains[chainID] if !ok { return nil, fmt.Errorf("Unknown chain %s", chainID) diff --git a/handlers/routes.go b/handlers/routes.go index 67f879036..7a81f1660 100644 --- a/handlers/routes.go +++ b/handlers/routes.go @@ -11,11 +11,12 @@ func Routes(network *TendermintNetwork) map[string]*rpc.RPCFunc { // "subscribe": rpc.NewWSRPCFunc(Subscribe, []string{"event"}), // "unsubscribe": rpc.NewWSRPCFunc(Unsubscribe, []string{"event"}), - "status": rpc.NewRPCFunc(StatusResult(network), ""), - "blockchain": rpc.NewRPCFunc(GetChainResult(network), "chain"), - "register_chain": rpc.NewRPCFunc(RegisterChainResult(network), "chainConfig"), - "validator_set": rpc.NewRPCFunc(GetValidatorSetResult(network), "valsetID"), - "validator": rpc.NewRPCFunc(GetValidatorResult(network), "valSetID,valID"), + "status": rpc.NewRPCFunc(StatusResult(network), ""), + "blockchain": rpc.NewRPCFunc(GetChainResult(network), "chain"), + "register_chain": rpc.NewRPCFunc(RegisterChainResult(network), "chainConfig"), + "validator_set": rpc.NewRPCFunc(GetValidatorSetResult(network), "valsetID"), + "register_validator_set": rpc.NewRPCFunc(RegisterValidatorSetResult(network), "valSet"), + "validator": rpc.NewRPCFunc(GetValidatorResult(network), "valSetID,valID"), "start_meter": rpc.NewRPCFunc(network.StartMeter, "chainID,valID,event"), "stop_meter": rpc.NewRPCFunc(network.StopMeter, "chainID,valID,event"), @@ -47,6 +48,12 @@ func GetValidatorSetResult(network *TendermintNetwork) interface{} { } } +func RegisterValidatorSetResult(network *TendermintNetwork) interface{} { + return func(valSet *types.ValidatorSet) (NetMonResult, error) { + return network.RegisterValidatorSet(valSet) + } +} + func GetValidatorResult(network *TendermintNetwork) interface{} { return func(valSetID, valID string) (NetMonResult, error) { return network.GetValidator(valSetID, valID) diff --git a/main.go b/main.go index cd7707e6b..fae7f3a23 100644 --- a/main.go +++ b/main.go @@ -3,8 +3,10 @@ package main import ( "encoding/json" "fmt" + "io/ioutil" "net/http" "os" + "path" "strconv" "strings" @@ -16,6 +18,7 @@ import ( cfg "github.com/tendermint/go-config" pcm "github.com/tendermint/go-process" "github.com/tendermint/go-rpc/server" + "github.com/tendermint/go-wire" tmcfg "github.com/tendermint/tendermint/config/tendermint" ) @@ -39,6 +42,32 @@ func main() { cmdConfig(c) }, }, + { + Name: "chains-and-vals", + Usage: "Add a chain or validator set to the main config file", + ArgsUsage: "", + Action: func(c *cli.Context) { + cmdChainsAndVals(c) + }, + Subcommands: []cli.Command{ + { + Name: "chain", + Usage: "Add a chain to the main config file", + ArgsUsage: "[configFile] [chainBaseDir]", + Action: func(c *cli.Context) { + cmdAddChain(c) + }, + }, + { + Name: "val", + Usage: "Add a validator set to the main config file", + ArgsUsage: "[configFile] [valsetBaseDir]", + Action: func(c *cli.Context) { + cmdAddValSet(c) + }, + }, + }, + }, { Name: "monitor", Usage: "Monitor a chain", @@ -51,21 +80,110 @@ func main() { app.Run(os.Args) } +func cmdChainsAndVals(c *cli.Context) { + cli.ShowAppHelp(c) +} + +func cmdAddChain(c *cli.Context) { + args := c.Args() + if len(args) != 2 { + Exit("add chain expectes 2 arg") + } + cfgFile, chainDir := args[0], args[1] + + // load major config + chainsAndVals := new(ChainsAndValidators) + if err := ReadJSONFile(chainsAndVals, cfgFile); err != nil { + Exit(err.Error()) + } + + // load new chain + chainCfg := new(types.BlockchainConfig) + if err := ReadJSONFile(chainCfg, path.Join(chainDir, "chain_config.json")); err != nil { + Exit(err.Error()) + } + + // append new chain + chainsAndVals.Blockchains = append(chainsAndVals.Blockchains, chainCfg) + + // write major config + b := wire.JSONBytes(chainsAndVals) + if err := ioutil.WriteFile(cfgFile, b, 0600); err != nil { + Exit(err.Error()) + } +} + +func ReadJSONFile(o interface{}, filename string) error { + b, err := ioutil.ReadFile(filename) + if err != nil { + return err + } + wire.ReadJSON(o, b, &err) + if err != nil { + return err + } + return nil +} + +func cmdAddValSet(c *cli.Context) { + args := c.Args() + if len(args) != 2 { + Exit("add chain expectes 2 arg") + } + cfgFile, valSetDir := args[0], args[1] + + // load major config + chainsAndVals := new(ChainsAndValidators) + if err := ReadJSONFile(chainsAndVals, cfgFile); err != nil { + Exit(err.Error()) + } + + // load new validator set + valSet := new(types.ValidatorSet) + if err := ReadJSONFile(valSet, path.Join(valSetDir, "validator_set.json")); err != nil { + Exit(err.Error()) + } + + // append new validator set + chainsAndVals.ValidatorSets = append(chainsAndVals.ValidatorSets, valSet) + + // write major config to file + b := wire.JSONBytes(chainsAndVals) + if err := ioutil.WriteFile(cfgFile, b, 0600); err != nil { + Exit(err.Error()) + } + +} + func cmdMonitor(c *cli.Context) { args := c.Args() if len(args) != 1 { Exit("monitor expectes 1 arg") } - chainConfigFile := args[0] - - chainConfig, err := types.LoadChainFromFile(chainConfigFile) + chainsAndValsFile := args[0] + chainsAndVals, err := LoadChainsAndValsFromFile(chainsAndValsFile) if err != nil { Exit(err.Error()) } // the main object that watches for changes and serves the rpc requests network := handlers.NewTendermintNetwork() - network.RegisterChain(chainConfig) + + for _, valSetCfg := range chainsAndVals.ValidatorSets { + // Register validator set + _, err := network.RegisterValidatorSet(valSetCfg) + if err != nil { + Exit(err.Error()) + } + } + + for _, chainCfg := range chainsAndVals.Blockchains { + // Register blockchain + _, err := network.RegisterChain(chainCfg) + if err != nil { + Exit(err.Error()) + } + } // the routes are functions on the network object routes := handlers.Routes(network) @@ -111,7 +229,7 @@ func ConfigFromMachines(chainID, prefix string, N int) (*types.BlockchainConfig, chain := &types.BlockchainConfig{ ID: chainID, - Validators: make([]*types.ChainValidator, N), + Validators: make([]*types.ValidatorState, N), } for i := 0; i < N; i++ { id := fmt.Sprintf("%s%d", prefix, i+1) @@ -124,10 +242,12 @@ func ConfigFromMachines(chainID, prefix string, N int) (*types.BlockchainConfig, ID: id, // TODO: pubkey } - chainVal := &types.ChainValidator{ - Validator: val, - Addr: fmt.Sprintf("%s:%d", strings.Trim(ip, "\n"), 46657), - Index: i, + chainVal := &types.ValidatorState{ + Config: &types.ValidatorConfig{ + Validator: val, + RPCAddr: fmt.Sprintf("%s:%d", strings.Trim(ip, "\n"), 46657), + Index: i, + }, } chain.Validators[i] = chainVal } @@ -152,3 +272,28 @@ func runProcessGetResult(label string, command string, args []string) (string, b return string(outFile.Bytes()), false } } + +//---------------------------------------------------------------------- + +type ChainsAndValidators struct { + ValidatorSets []*types.ValidatorSet `json:"validator_sets"` + Blockchains []*types.BlockchainConfig `json:"blockchains"` +} + +func LoadChainsAndValsFromFile(configFile string) (*ChainsAndValidators, error) { + + b, err := ioutil.ReadFile(configFile) + if err != nil { + return nil, err + } + + // for now we start with one blockchain loaded from file; + // eventually more can be uploaded or created through endpoints + chainsAndVals := new(ChainsAndValidators) + wire.ReadJSON(chainsAndVals, b, &err) + if err != nil { + return nil, err + } + + return chainsAndVals, nil +} diff --git a/types/chain.go b/types/chain.go index 2784b8de6..e71a9bde1 100644 --- a/types/chain.go +++ b/types/chain.go @@ -62,6 +62,13 @@ type ChainState struct { Status *BlockchainStatus `json:"status"` } +// chain config without ValidatorState +type BlockchainBaseConfig struct { + ID string `json:"id"` + ValSetID string `json:"val_set_id"` + Validators []*ValidatorConfig `json:"validators"` +} + // basic chain config // threadsafe type BlockchainConfig struct { @@ -69,7 +76,7 @@ type BlockchainConfig struct { ValSetID string `json:"val_set_id"` mtx sync.Mutex - Validators []*ChainValidator `json:"validators"` + Validators []*ValidatorState `json:"validators"` // TODO: this should be ValidatorConfig and the state in BlockchainStatus valIDMap map[string]int // map IDs to indices } @@ -79,11 +86,11 @@ func (bc *BlockchainConfig) PopulateValIDMap() { defer bc.mtx.Unlock() bc.valIDMap = make(map[string]int) for i, v := range bc.Validators { - bc.valIDMap[v.Validator.ID] = i + bc.valIDMap[v.Config.Validator.ID] = i } } -func (bc *BlockchainConfig) GetValidatorByID(valID string) (*ChainValidator, error) { +func (bc *BlockchainConfig) GetValidatorByID(valID string) (*ValidatorState, error) { bc.mtx.Lock() defer bc.mtx.Unlock() valIndex, ok := bc.valIDMap[valID] diff --git a/types/val.go b/types/val.go index b6972ad63..98a76a580 100644 --- a/types/val.go +++ b/types/val.go @@ -18,6 +18,7 @@ import ( // validator set (independent of chains) type ValidatorSet struct { + ID string `json:"id"` Validators []*Validator `json:"validators"` } @@ -34,61 +35,68 @@ func (vs *ValidatorSet) Validator(valID string) (*Validator, error) { type Validator struct { ID string `json:"id"` PubKey crypto.PubKey `json:"pub_key"` - Chains []string `json:"chains"` + Chains []string `json:"chains,omitempty"` // TODO: put this elsewhere (?) } -// Validator on a chain -// Responsible for communication with the validator -// Returned over RPC but also used to manage state -type ChainValidator struct { +type ValidatorConfig struct { Validator *Validator `json:"validator"` - Addr string `json:"addr"` // do we want multiple addrs? - Index int `json:"index"` + P2PAddr string `json:"p2p_addr"` + RPCAddr string `json:"rpc_addr"` + Index int `json:"index,omitempty"` +} +type ValidatorStatus struct { Latency float64 `json:"latency" wire:"unsafe"` BlockHeight int `json:"block_height"` +} + +// Validator on a chain +// Responsible for communication with the validator +// Returned over RPC but also used to manage state +type ValidatorState struct { + Config *ValidatorConfig `json:"config"` + Status *ValidatorStatus `json:"status"` em *eventmeter.EventMeter // holds a ws connection to the val client *client.ClientURI // rpc client - } // Start a new event meter, including the websocket connection // Also create the http rpc client for convenienve -func (cv *ChainValidator) Start() error { - em := eventmeter.NewEventMeter(fmt.Sprintf("ws://%s/websocket", cv.Addr), UnmarshalEvent) +func (vs *ValidatorState) Start() error { + em := eventmeter.NewEventMeter(fmt.Sprintf("ws://%s/websocket", vs.Config.RPCAddr), UnmarshalEvent) if err := em.Start(); err != nil { return err } - cv.em = em - cv.client = client.NewClientURI(fmt.Sprintf("http://%s", cv.Addr)) + vs.em = em + vs.client = client.NewClientURI(fmt.Sprintf("http://%s", vs.Config.RPCAddr)) return nil } -func (cv *ChainValidator) Stop() { - cv.em.Stop() +func (vs *ValidatorState) Stop() { + vs.em.Stop() } -func (cv *ChainValidator) EventMeter() *eventmeter.EventMeter { - return cv.em +func (vs *ValidatorState) EventMeter() *eventmeter.EventMeter { + return vs.em } // Return the validators pubkey. If it's not yet set, get it from the node // TODO: proof that it's the node's key -func (cv *ChainValidator) PubKey() crypto.PubKey { - if cv.Validator.PubKey != nil { - return cv.Validator.PubKey +func (vs *ValidatorState) PubKey() crypto.PubKey { + if vs.Config.Validator.PubKey != nil { + return vs.Config.Validator.PubKey } var result ctypes.TMResult - _, err := cv.client.Call("status", nil, &result) + _, err := vs.client.Call("status", nil, &result) if err != nil { - log.Error("Error getting validator pubkey", "addr", cv.Addr, "val", cv.Validator.ID, "error", err) + log.Error("Error getting validator pubkey", "addr", vs.Config.RPCAddr, "val", vs.Config.Validator.ID, "error", err) return nil } status := result.(*ctypes.ResultStatus) - cv.Validator.PubKey = status.PubKey - return cv.Validator.PubKey + vs.Config.Validator.PubKey = status.PubKey + return vs.Config.Validator.PubKey } //---------------------------------------------------