Browse Source

some refactoring; fixes; readme

pull/1943/head
Ethan Buchman 9 years ago
parent
commit
673e346ba4
8 changed files with 261 additions and 137 deletions
  1. +37
    -0
      README.md
  2. +18
    -5
      handlers/handlers.go
  3. +14
    -0
      local-chain.json
  4. +5
    -34
      main.go
  5. +89
    -0
      types/chain.go
  6. +7
    -0
      types/log.go
  7. +4
    -98
      types/types.go
  8. +87
    -0
      types/val.go

+ 37
- 0
README.md View File

@ -1,2 +1,39 @@
# netmon
blockchain network monitor
#Quick Start
To get started, [install golang](https://golang.org/doc/install) and [set your $GOPATH](https://github.com/tendermint/tendermint/wiki/Setting-GOPATH).
Install `tendermint`, `tmsp`, and the `netmon`:
```
go get github.com/tendermint/tendermint/cmd/tendermint
go get github.com/tendermint/tmsp/cmd/...
go get github.com/tendermint/netmon
```
Initialize and start a local tendermint node with
```
tendermint init
dummy &
tendermint node --fast_sync=false --log_level=debug
```
In another window, start the netmon with
```
netmon monitor $GOPATH/src/github.com/tendermint/netmon/local-chain.json
```
Then visit your browser at http://localhost:46670.
The chain's rpc can be found at http://localhost:46657.
# Notes
The netmon expects a config file with a list of chains/validators to get started. A default one for a local chain is provided as local-chain.json. `netmon config` can be used to create a config file for a chain deployed with `mintnet`.
The API is available as GET requests with URI encoded parameters, or as JSONRPC POST requests. The JSONRPC methods are also exposed over websocket.

+ 18
- 5
handlers/handlers.go View File

@ -18,7 +18,7 @@ type NetMonResult interface {
var _ = wire.RegisterInterface(
struct{ NetMonResult }{},
wire.ConcreteType{&types.ChainAndValidatorIDs{}, 0x01},
wire.ConcreteType{&types.ChainStatus{}, 0x02},
wire.ConcreteType{&types.ChainState{}, 0x02},
wire.ConcreteType{&types.Validator{}, 0x03},
wire.ConcreteType{&eventmeter.EventMetric{}, 0x04},
)
@ -28,14 +28,14 @@ var _ = wire.RegisterInterface(
type TendermintNetwork struct {
mtx sync.Mutex
Chains map[string]*types.ChainStatus `json:"blockchains"`
Chains map[string]*types.ChainState `json:"blockchains"`
ValSets map[string]*types.ValidatorSet `json:"validator_sets"`
}
// TODO: populate validator sets
func NewTendermintNetwork(chains ...*types.ChainStatus) *TendermintNetwork {
func NewTendermintNetwork(chains ...*types.ChainState) *TendermintNetwork {
network := &TendermintNetwork{
Chains: make(map[string]*types.ChainStatus),
Chains: make(map[string]*types.ChainState),
ValSets: make(map[string]*types.ValidatorSet),
}
for _, chain := range chains {
@ -44,6 +44,19 @@ func NewTendermintNetwork(chains ...*types.ChainStatus) *TendermintNetwork {
return network
}
//------------
// Public Methods
func (tn *TendermintNetwork) RegisterChain(chain *types.ChainState) {
tn.mtx.Lock()
defer tn.mtx.Unlock()
tn.Chains[chain.Config.ID] = chain
}
func (tn *TendermintNetwork) Stop() {
// TODO: for each chain, stop each validator
}
//------------
// RPC funcs
@ -71,7 +84,7 @@ func (tn *TendermintNetwork) Status() (*types.ChainAndValidatorIDs, error) {
}
func (tn *TendermintNetwork) GetChain(chainID string) (*types.ChainStatus, error) {
func (tn *TendermintNetwork) GetChain(chainID string) (*types.ChainState, error) {
tn.mtx.Lock()
defer tn.mtx.Unlock()
chain, ok := tn.Chains[chainID]


+ 14
- 0
local-chain.json View File

@ -0,0 +1,14 @@
{
"id": "mychain",
"val_set_id": "myvalset",
"validators": [
{
"validator": {
"id": "local",
"chains": ["mychain"]
},
"addr": "localhost:46657",
"index": 0
}
]
}

+ 5
- 34
main.go View File

@ -3,25 +3,20 @@ package main
import (
"encoding/json"
"fmt"
"io/ioutil"
"net/http"
"os"
"strconv"
"strings"
"github.com/tendermint/go-event-meter"
"github.com/tendermint/netmon/handlers"
"github.com/tendermint/netmon/types"
"github.com/codegangsta/cli"
. "github.com/tendermint/go-common"
cfg "github.com/tendermint/go-config"
"github.com/tendermint/go-events"
pcm "github.com/tendermint/go-process"
"github.com/tendermint/go-rpc/server"
tmcfg "github.com/tendermint/tendermint/config/tendermint"
tmtypes "github.com/tendermint/tendermint/types"
)
func init() {
@ -61,38 +56,16 @@ func cmdMonitor(c *cli.Context) {
if len(args) != 1 {
Exit("monitor expectes 1 arg")
}
configFile := args[0]
chainConfigFile := args[0]
b, err := ioutil.ReadFile(configFile)
chainState, err := types.LoadChainFromFile(chainConfigFile)
if err != nil {
Exit(err.Error())
}
// for now we start with one blockchain;
// eventually more can be uploaded or created through endpoints
chainConfig := new(types.BlockchainConfig)
if err := json.Unmarshal(b, chainConfig); err != nil {
Exit(err.Error())
}
chainStatus := &types.ChainStatus{Config: chainConfig}
// start the event meter and listen for new blocks on each validator
for _, v := range chainConfig.Validators {
if err := v.NewEventMeter(); err != nil {
Exit(err.Error())
}
if err := v.EventMeter().Subscribe(tmtypes.EventStringNewBlock(), func(metric *eventmeter.EventMetric, data events.EventData) {
// TODO: update chain status with block and metric
// chainStatus.NewBlock(data.(tmtypes.EventDataNewBlock).Block)
}); err != nil {
Exit(err.Error())
}
}
// the main object that watches for changes and serves the rpc requests
network := handlers.NewTendermintNetwork(chainStatus)
network := handlers.NewTendermintNetwork()
network.RegisterChain(chainState)
// the routes are functions on the network object
routes := handlers.Routes(network)
@ -108,9 +81,7 @@ func cmdMonitor(c *cli.Context) {
TrapSignal(func() {
// TODO: clean shutdown server, maybe persist last state
for _, v := range chainConfig.Validators {
v.EventMeter().Stop()
}
network.Stop()
})
}


+ 89
- 0
types/chain.go View File

@ -0,0 +1,89 @@
package types
import (
"encoding/json"
"fmt"
"io/ioutil"
"sync"
"github.com/tendermint/go-event-meter"
"github.com/tendermint/go-events"
tmtypes "github.com/tendermint/tendermint/types"
)
//------------------------------------------------
// blockchain types
// State of a chain
// Returned over RPC but also used to manage state
type ChainState struct {
Config *BlockchainConfig `json:"config"`
Status *BlockchainStatus `json:"status"`
}
// basic chain config
// threadsafe
type BlockchainConfig struct {
mtx sync.Mutex
ID string `json:"id"`
ValSetID string `json:"val_set_id"`
Validators []*ChainValidator `json:"validators"`
valIDMap map[string]int // map IDs to indices
}
// So we can fetch validator by id
func (bc *BlockchainConfig) PopulateValIDMap() {
bc.mtx.Lock()
defer bc.mtx.Unlock()
bc.valIDMap = make(map[string]int)
for i, v := range bc.Validators {
bc.valIDMap[v.Validator.ID] = i
}
}
func (bc *BlockchainConfig) GetValidatorByID(valID string) (*ChainValidator, error) {
bc.mtx.Lock()
defer bc.mtx.Unlock()
valIndex, ok := bc.valIDMap[valID]
if !ok {
return nil, fmt.Errorf("Unknown validator %s", valID)
}
return bc.Validators[valIndex], nil
}
func LoadChainFromFile(configFile string) (*ChainState, error) {
b, err := ioutil.ReadFile(configFile)
if err != nil {
return nil, err
}
// for now we start with one blockchain loaded from file;
// eventually more can be uploaded or created through endpoints
chainConfig := new(BlockchainConfig)
if err := json.Unmarshal(b, chainConfig); err != nil {
return nil, err
}
chainState := &ChainState{Config: chainConfig}
// start the event meter and listen for new blocks on each validator
for _, v := range chainConfig.Validators {
if err := v.Start(); err != nil {
return nil, err
}
err := v.EventMeter().Subscribe(tmtypes.EventStringNewBlock(), func(metric *eventmeter.EventMetric, data events.EventData) {
// TODO: update chain status with block and metric
// chainState.NewBlock(data.(tmtypes.EventDataNewBlock).Block)
})
if err != nil {
return nil, err
}
// get/set the validator's pub key
v.PubKey()
}
return chainState, nil
}

+ 7
- 0
types/log.go View File

@ -0,0 +1,7 @@
package types
import (
"github.com/tendermint/go-logger"
)
var log = logger.New("module", "types")

+ 4
- 98
types/types.go View File

@ -1,20 +1,12 @@
package types
import (
"encoding/json"
"fmt"
"reflect"
"sync"
"github.com/tendermint/go-crypto"
"github.com/tendermint/go-event-meter"
"github.com/tendermint/go-events"
"github.com/tendermint/go-wire"
ctypes "github.com/tendermint/tendermint/rpc/core/types"
)
//---------------------------------------------
// core types
// simple types
// Known chain and validator set IDs (from which anything else can be found)
type ChainAndValidatorIDs struct {
@ -22,47 +14,8 @@ type ChainAndValidatorIDs struct {
ValidatorSetIDs []string `json:"validator_set_ids"`
}
// state of a chain
type ChainStatus struct {
Config *BlockchainConfig `json:"config"`
Status *BlockchainStatus `json:"status"`
}
// basic chain config
// threadsafe
type BlockchainConfig struct {
mtx sync.Mutex
ID string `json:"id"`
ValSetID string `json:"val_set_id"`
Validators []*ChainValidator `json:"validators"`
valIDMap map[string]int // map IDs to indices
}
func (bc *BlockchainConfig) PopulateValIDMap() {
bc.mtx.Lock()
defer bc.mtx.Unlock()
bc.valIDMap = make(map[string]int)
for i, v := range bc.Validators {
bc.valIDMap[v.ID] = i
}
}
func (bc *BlockchainConfig) GetValidatorByID(valID string) (*ChainValidator, error) {
bc.mtx.Lock()
defer bc.mtx.Unlock()
valIndex, ok := bc.valIDMap[valID]
if !ok {
return nil, fmt.Errorf("Unknown validator %s", valID)
}
return bc.Validators[valIndex], nil
}
// basic chain status/metrics
// threadsafe
type BlockchainStatus struct {
mtx sync.Mutex
Height int `json:"height"`
MeanBlockTime float64 `json:"mean_block_time" wire:"unsafe"`
TxThroughput float64 `json:"tx_throughput" wire:"unsafe"`
@ -70,44 +23,6 @@ type BlockchainStatus struct {
BlockchainSize int64 `json:"blockchain_size"` // how might we get StateSize ?
}
// validator on a chain
type ChainValidator struct {
*Validator `json:"validator"`
Addr string `json:"addr"` // do we want multiple addrs?
Index int `json:"index"`
em *eventmeter.EventMeter // holds a ws connection to the val
Latency float64 `json:"latency,omitempty" wire:"unsafe"`
}
func unmarshalEvent(b json.RawMessage) (string, events.EventData, error) {
var err error
result := new(ctypes.TMResult)
wire.ReadJSONPtr(result, b, &err)
if err != nil {
return "", nil, err
}
event, ok := (*result).(*ctypes.ResultEvent)
if !ok {
return "", nil, fmt.Errorf("Result is not type *ctypes.ResultEvent. Got %v", reflect.TypeOf(*result))
}
return event.Name, event.Data, nil
}
func (cv *ChainValidator) NewEventMeter() error {
em := eventmeter.NewEventMeter(fmt.Sprintf("ws://%s/websocket", cv.Addr), unmarshalEvent)
if err := em.Start(); err != nil {
return err
}
cv.em = em
return nil
}
func (cv *ChainValidator) EventMeter() *eventmeter.EventMeter {
return cv.em
}
// validator set (independent of chains)
type ValidatorSet struct {
Validators []*Validator `json:"validators"`
@ -124,16 +39,7 @@ func (vs *ValidatorSet) Validator(valID string) (*Validator, error) {
// validator (independent of chain)
type Validator struct {
ID string `json:"id"`
PubKey crypto.PubKey `json:"pub_key,omitempty"`
Chains []*ChainStatus `json:"chains,omitempty"`
}
func (v *Validator) Chain(chainID string) (*ChainStatus, error) {
for _, c := range v.Chains {
if c.Config.ID == chainID {
return c, nil
}
}
return nil, fmt.Errorf("Unknwon chain %s", chainID)
ID string `json:"id"`
PubKey crypto.PubKey `json:"pub_key"`
Chains []string `json:"chains"`
}

+ 87
- 0
types/val.go View File

@ -0,0 +1,87 @@
package types
import (
"encoding/json"
"fmt"
"reflect"
"github.com/tendermint/go-crypto"
"github.com/tendermint/go-event-meter"
"github.com/tendermint/go-events"
client "github.com/tendermint/go-rpc/client"
"github.com/tendermint/go-wire"
ctypes "github.com/tendermint/tendermint/rpc/core/types"
)
//------------------------------------------------
// validator types
// Validator on a chain
// Responsible for communication with the validator
// Returned over RPC but also used to manage state
type ChainValidator struct {
Validator *Validator `json:"validator"`
Addr string `json:"addr"` // do we want multiple addrs?
Index int `json:"index"`
em *eventmeter.EventMeter // holds a ws connection to the val
client *client.ClientURI // rpc client
Latency float64 `json:"latency" wire:"unsafe"`
}
// Start a new event meter, including the websocket connection
// Also create the http rpc client for convenienve
func (cv *ChainValidator) Start() error {
em := eventmeter.NewEventMeter(fmt.Sprintf("ws://%s/websocket", cv.Addr), UnmarshalEvent)
if err := em.Start(); err != nil {
return err
}
cv.em = em
cv.client = client.NewClientURI(fmt.Sprintf("http://%s", cv.Addr))
return nil
}
func (cv *ChainValidator) Stop() {
cv.em.Stop()
}
func (cv *ChainValidator) EventMeter() *eventmeter.EventMeter {
return cv.em
}
// Return the validators pubkey. If it's not yet set, get it from the node
// TODO: proof that it's the node's key
func (cv *ChainValidator) PubKey() crypto.PubKey {
if cv.Validator.PubKey != nil {
return cv.Validator.PubKey
}
var result ctypes.TMResult
_, err := cv.client.Call("status", nil, &result)
if err != nil {
log.Error("Error getting validator pubkey", "addr", cv.Addr, "val", cv.Validator.ID, "error", err)
return nil
}
status := result.(*ctypes.ResultStatus)
cv.Validator.PubKey = status.PubKey
return cv.Validator.PubKey
}
//---------------------------------------------------
// utilities
// Unmarshal a json event
func UnmarshalEvent(b json.RawMessage) (string, events.EventData, error) {
var err error
result := new(ctypes.TMResult)
wire.ReadJSONPtr(result, b, &err)
if err != nil {
return "", nil, err
}
event, ok := (*result).(*ctypes.ResultEvent)
if !ok {
return "", nil, fmt.Errorf("Result is not type *ctypes.ResultEvent. Got %v", reflect.TypeOf(*result))
}
return event.Name, event.Data, nil
}

Loading…
Cancel
Save