Browse Source

send ValidatorSetUpdates event when validator set changes (#2161)

Refs #1916
pull/2225/head
Anton Kaliaev 6 years ago
committed by GitHub
parent
commit
80e49abada
No known key found for this signature in database GPG Key ID: 4AEE18F83AFDEB23
8 changed files with 200 additions and 107 deletions
  1. +2
    -11
      Gopkg.lock
  2. +36
    -0
      docs/app-dev/subscribing-to-events-via-websocket.md
  3. +7
    -0
      state/execution.go
  4. +63
    -5
      state/execution_test.go
  5. +35
    -35
      types/event_bus.go
  6. +3
    -1
      types/event_bus_test.go
  7. +38
    -39
      types/events.go
  8. +16
    -16
      types/nop_event_bus.go

+ 2
- 11
Gopkg.lock View File

@ -244,7 +244,7 @@
[[projects]]
branch = "master"
digest = "1:dad2e5a2153ee7a6c9ab8fc13673a16ee4fb64434a7da980965a3741b0c981a3"
digest = "1:63b68062b8968092eb86bedc4e68894bd096ea6b24920faca8b9dcf451f54bb5"
name = "github.com/prometheus/common"
packages = [
"expfmt",
@ -377,14 +377,6 @@
revision = "a8328986c1608950fa5d3d1c0472cccc4f8fc02c"
version = "v0.12.0-rc0"
[[projects]]
digest = "1:cefd237469d443fe56cbeadf68a1baf46cef293f071dc0e317f8b8062c3ffe72"
name = "github.com/tendermint/go-crypto"
packages = ["tmhash"]
pruneopts = "UT"
revision = "6a6b591a3d7592a04b46af95451cb5be3b114f76"
version = "v0.9.0"
[[projects]]
branch = "master"
digest = "1:c31a37cafc12315b8bd745c8ad6a006ac25350472488162a821e557b3e739d67"
@ -426,7 +418,7 @@
[[projects]]
branch = "master"
digest = "1:70656e26ab4a96e683a21d677630edb5239a3d60b2d54bdc861c808ab5aa42c7"
digest = "1:bb0fe59917bdd5b89f49b9a8b26e5f465e325d9223b3a8e32254314bdf51e0f1"
name = "golang.org/x/sys"
packages = [
"cpu",
@ -547,7 +539,6 @@
"github.com/tendermint/ed25519",
"github.com/tendermint/ed25519/extra25519",
"github.com/tendermint/go-amino",
"github.com/tendermint/go-crypto/tmhash",
"golang.org/x/crypto/bcrypt",
"golang.org/x/crypto/chacha20poly1305",
"golang.org/x/crypto/curve25519",


+ 36
- 0
docs/app-dev/subscribing-to-events-via-websocket.md View File

@ -26,3 +26,39 @@ more information on query syntax and other options.
You can also use tags, given you had included them into DeliverTx
response, to query transaction results. See [Indexing
transactions](./indexing-transactions.md) for details.
### ValidatorSetUpdates
When validator set changes, ValidatorSetUpdates event is published. The
event carries a list of pubkey/power pairs. The list is the same
Tendermint receives from ABCI application (see [EndBlock
section](https://tendermint.com/docs/app-dev/abci-spec.html#endblock) in
the ABCI spec).
Response:
```
{
"jsonrpc": "2.0",
"id": "0#event",
"result": {
"query": "tm.event='ValidatorSetUpdates'",
"data": {
"type": "tendermint/event/ValidatorSetUpdates",
"value": {
"validator_updates": [
{
"address": "09EAD022FD25DE3A02E64B0FE9610B1417183EE4",
"pub_key": {
"type": "tendermint/PubKeyEd25519",
"value": "ww0z4WaZ0Xg+YI10w43wTWbBmM3dpVza4mmSQYsd0ck="
},
"voting_power": "10",
"accum": "0"
}
]
}
}
}
}
```

+ 7
- 0
state/execution.go View File

@ -380,6 +380,13 @@ func fireEvents(logger log.Logger, eventBus types.BlockEventPublisher, block *ty
Result: *(abciResponses.DeliverTx[i]),
}})
}
if len(abciResponses.EndBlock.ValidatorUpdates) > 0 {
// if there were an error, we would've stopped in updateValidators
updates, _ := types.PB2TM.Validators(abciResponses.EndBlock.ValidatorUpdates)
eventBus.PublishEventValidatorSetUpdates(
types.EventDataValidatorSetUpdates{ValidatorUpdates: updates})
}
}
//----------------------------------------------------------------------------------------------------


+ 63
- 5
state/execution_test.go View File

@ -1,6 +1,7 @@
package state
import (
"context"
"fmt"
"testing"
"time"
@ -232,6 +233,62 @@ func TestUpdateValidators(t *testing.T) {
}
}
// TestEndBlockValidatorUpdates ensures we update validator set and send an event.
func TestEndBlockValidatorUpdates(t *testing.T) {
app := &testApp{}
cc := proxy.NewLocalClientCreator(app)
proxyApp := proxy.NewAppConns(cc, nil)
err := proxyApp.Start()
require.Nil(t, err)
defer proxyApp.Stop()
state, stateDB := state(1, 1)
blockExec := NewBlockExecutor(stateDB, log.TestingLogger(), proxyApp.Consensus(),
MockMempool{}, MockEvidencePool{})
eventBus := types.NewEventBus()
err = eventBus.Start()
require.NoError(t, err)
defer eventBus.Stop()
blockExec.SetEventBus(eventBus)
updatesCh := make(chan interface{}, 1)
err = eventBus.Subscribe(context.Background(), "TestEndBlockValidatorUpdates", types.EventQueryValidatorSetUpdates, updatesCh)
require.NoError(t, err)
block := makeBlock(state, 1)
blockID := types.BlockID{block.Hash(), block.MakePartSet(testPartSize).Header()}
pubkey := ed25519.GenPrivKey().PubKey()
app.ValidatorUpdates = []abci.Validator{
{PubKey: types.TM2PB.PubKey(pubkey), Power: 10},
}
state, err = blockExec.ApplyBlock(state, blockID, block)
require.Nil(t, err)
// test new validator was added to NextValidators
if assert.Equal(t, state.Validators.Size()+1, state.NextValidators.Size()) {
idx, _ := state.NextValidators.GetByAddress(pubkey.Address())
if idx < 0 {
t.Fatalf("can't find address %v in the set %v", pubkey.Address(), state.NextValidators)
}
}
// test we threw an event
select {
case e := <-updatesCh:
event, ok := e.(types.EventDataValidatorSetUpdates)
require.True(t, ok, "Expected event of type EventDataValidatorSetUpdates, got %T", e)
if assert.NotEmpty(t, event.ValidatorUpdates) {
assert.Equal(t, pubkey, event.ValidatorUpdates[0].PubKey)
assert.EqualValues(t, 10, event.ValidatorUpdates[0].VotingPower)
}
case <-time.After(1 * time.Second):
t.Fatal("Did not receive EventValidatorSetUpdates within 1 sec.")
}
}
//----------------------------------------------------------------------------
// make some bogus txs
@ -275,18 +332,15 @@ func makeBlock(state State, height int64) *types.Block {
//----------------------------------------------------------------------------
var _ abci.Application = (*testApp)(nil)
type testApp struct {
abci.BaseApplication
Validators []abci.SigningValidator
ByzantineValidators []abci.Evidence
ValidatorUpdates []abci.Validator
}
func NewKVStoreApplication() *testApp {
return &testApp{}
}
var _ abci.Application = (*testApp)(nil)
func (app *testApp) Info(req abci.RequestInfo) (resInfo abci.ResponseInfo) {
return abci.ResponseInfo{}
@ -298,6 +352,10 @@ func (app *testApp) BeginBlock(req abci.RequestBeginBlock) abci.ResponseBeginBlo
return abci.ResponseBeginBlock{}
}
func (app *testApp) EndBlock(req abci.RequestEndBlock) abci.ResponseEndBlock {
return abci.ResponseEndBlock{ValidatorUpdates: app.ValidatorUpdates}
}
func (app *testApp) DeliverTx(tx []byte) abci.ResponseDeliverTx {
return abci.ResponseDeliverTx{Tags: []cmn.KVPair{}}
}


+ 35
- 35
types/event_bus.go View File

@ -71,34 +71,32 @@ func (b *EventBus) Publish(eventType string, eventData TMEventData) error {
return nil
}
//--- block, tx, and vote events
func (b *EventBus) PublishEventNewBlock(event EventDataNewBlock) error {
return b.Publish(EventNewBlock, event)
func (b *EventBus) PublishEventNewBlock(data EventDataNewBlock) error {
return b.Publish(EventNewBlock, data)
}
func (b *EventBus) PublishEventNewBlockHeader(event EventDataNewBlockHeader) error {
return b.Publish(EventNewBlockHeader, event)
func (b *EventBus) PublishEventNewBlockHeader(data EventDataNewBlockHeader) error {
return b.Publish(EventNewBlockHeader, data)
}
func (b *EventBus) PublishEventVote(event EventDataVote) error {
return b.Publish(EventVote, event)
func (b *EventBus) PublishEventVote(data EventDataVote) error {
return b.Publish(EventVote, data)
}
// PublishEventTx publishes tx event with tags from Result. Note it will add
// predefined tags (EventTypeKey, TxHashKey). Existing tags with the same names
// will be overwritten.
func (b *EventBus) PublishEventTx(event EventDataTx) error {
func (b *EventBus) PublishEventTx(data EventDataTx) error {
// no explicit deadline for publishing events
ctx := context.Background()
tags := make(map[string]string)
// validate and fill tags from tx result
for _, tag := range event.Result.Tags {
for _, tag := range data.Result.Tags {
// basic validation
if len(tag.Key) == 0 {
b.Logger.Info("Got tag with an empty key (skipping)", "tag", tag, "tx", event.Tx)
b.Logger.Info("Got tag with an empty key (skipping)", "tag", tag, "tx", data.Tx)
continue
}
tags[string(tag.Key)] = string(tag.Value)
@ -109,55 +107,57 @@ func (b *EventBus) PublishEventTx(event EventDataTx) error {
tags[EventTypeKey] = EventTx
logIfTagExists(TxHashKey, tags, b.Logger)
tags[TxHashKey] = fmt.Sprintf("%X", event.Tx.Hash())
tags[TxHashKey] = fmt.Sprintf("%X", data.Tx.Hash())
logIfTagExists(TxHeightKey, tags, b.Logger)
tags[TxHeightKey] = fmt.Sprintf("%d", event.Height)
tags[TxHeightKey] = fmt.Sprintf("%d", data.Height)
b.pubsub.PublishWithTags(ctx, event, tmpubsub.NewTagMap(tags))
b.pubsub.PublishWithTags(ctx, data, tmpubsub.NewTagMap(tags))
return nil
}
func (b *EventBus) PublishEventProposalHeartbeat(event EventDataProposalHeartbeat) error {
return b.Publish(EventProposalHeartbeat, event)
func (b *EventBus) PublishEventProposalHeartbeat(data EventDataProposalHeartbeat) error {
return b.Publish(EventProposalHeartbeat, data)
}
//--- EventDataRoundState events
func (b *EventBus) PublishEventNewRoundStep(data EventDataRoundState) error {
return b.Publish(EventNewRoundStep, data)
}
func (b *EventBus) PublishEventNewRoundStep(event EventDataRoundState) error {
return b.Publish(EventNewRoundStep, event)
func (b *EventBus) PublishEventTimeoutPropose(data EventDataRoundState) error {
return b.Publish(EventTimeoutPropose, data)
}
func (b *EventBus) PublishEventTimeoutPropose(event EventDataRoundState) error {
return b.Publish(EventTimeoutPropose, event)
func (b *EventBus) PublishEventTimeoutWait(data EventDataRoundState) error {
return b.Publish(EventTimeoutWait, data)
}
func (b *EventBus) PublishEventTimeoutWait(event EventDataRoundState) error {
return b.Publish(EventTimeoutWait, event)
func (b *EventBus) PublishEventNewRound(data EventDataRoundState) error {
return b.Publish(EventNewRound, data)
}
func (b *EventBus) PublishEventNewRound(event EventDataRoundState) error {
return b.Publish(EventNewRound, event)
func (b *EventBus) PublishEventCompleteProposal(data EventDataRoundState) error {
return b.Publish(EventCompleteProposal, data)
}
func (b *EventBus) PublishEventCompleteProposal(event EventDataRoundState) error {
return b.Publish(EventCompleteProposal, event)
func (b *EventBus) PublishEventPolka(data EventDataRoundState) error {
return b.Publish(EventPolka, data)
}
func (b *EventBus) PublishEventPolka(event EventDataRoundState) error {
return b.Publish(EventPolka, event)
func (b *EventBus) PublishEventUnlock(data EventDataRoundState) error {
return b.Publish(EventUnlock, data)
}
func (b *EventBus) PublishEventUnlock(event EventDataRoundState) error {
return b.Publish(EventUnlock, event)
func (b *EventBus) PublishEventRelock(data EventDataRoundState) error {
return b.Publish(EventRelock, data)
}
func (b *EventBus) PublishEventRelock(event EventDataRoundState) error {
return b.Publish(EventRelock, event)
func (b *EventBus) PublishEventLock(data EventDataRoundState) error {
return b.Publish(EventLock, data)
}
func (b *EventBus) PublishEventLock(event EventDataRoundState) error {
return b.Publish(EventLock, event)
func (b *EventBus) PublishEventValidatorSetUpdates(data EventDataValidatorSetUpdates) error {
return b.Publish(EventValidatorSetUpdates, data)
}
func logIfTagExists(tag string, tags map[string]string, logger log.Logger) {


+ 3
- 1
types/event_bus_test.go View File

@ -68,7 +68,7 @@ func TestEventBusPublish(t *testing.T) {
err = eventBus.Subscribe(context.Background(), "test", tmquery.Empty{}, eventsCh)
require.NoError(t, err)
const numEventsExpected = 14
const numEventsExpected = 15
done := make(chan struct{})
go func() {
numEvents := 0
@ -108,6 +108,8 @@ func TestEventBusPublish(t *testing.T) {
require.NoError(t, err)
err = eventBus.PublishEventLock(EventDataRoundState{})
require.NoError(t, err)
err = eventBus.PublishEventValidatorSetUpdates(EventDataValidatorSetUpdates{})
require.NoError(t, err)
select {
case <-done:


+ 38
- 39
types/events.go View File

@ -8,42 +8,34 @@ import (
tmquery "github.com/tendermint/tendermint/libs/pubsub/query"
)
// Reserved event types
// Reserved event types (alphabetically sorted).
const (
EventCompleteProposal = "CompleteProposal"
EventLock = "Lock"
EventNewBlock = "NewBlock"
EventNewBlockHeader = "NewBlockHeader"
EventNewRound = "NewRound"
EventNewRoundStep = "NewRoundStep"
EventPolka = "Polka"
EventRelock = "Relock"
EventTimeoutPropose = "TimeoutPropose"
EventTimeoutWait = "TimeoutWait"
EventTx = "Tx"
EventUnlock = "Unlock"
EventVote = "Vote"
EventProposalHeartbeat = "ProposalHeartbeat"
EventCompleteProposal = "CompleteProposal"
EventLock = "Lock"
EventNewBlock = "NewBlock"
EventNewBlockHeader = "NewBlockHeader"
EventNewRound = "NewRound"
EventNewRoundStep = "NewRoundStep"
EventPolka = "Polka"
EventProposalHeartbeat = "ProposalHeartbeat"
EventRelock = "Relock"
EventTimeoutPropose = "TimeoutPropose"
EventTimeoutWait = "TimeoutWait"
EventTx = "Tx"
EventUnlock = "Unlock"
EventValidatorSetUpdates = "ValidatorSetUpdates"
EventVote = "Vote"
)
///////////////////////////////////////////////////////////////////////////////
// ENCODING / DECODING
///////////////////////////////////////////////////////////////////////////////
// implements events.EventData
// TMEventData implements events.EventData.
type TMEventData interface {
AssertIsTMEventData()
// empty interface
}
func (_ EventDataNewBlock) AssertIsTMEventData() {}
func (_ EventDataNewBlockHeader) AssertIsTMEventData() {}
func (_ EventDataTx) AssertIsTMEventData() {}
func (_ EventDataRoundState) AssertIsTMEventData() {}
func (_ EventDataVote) AssertIsTMEventData() {}
func (_ EventDataProposalHeartbeat) AssertIsTMEventData() {}
func (_ EventDataString) AssertIsTMEventData() {}
func RegisterEventDatas(cdc *amino.Codec) {
cdc.RegisterInterface((*TMEventData)(nil), nil)
cdc.RegisterConcrete(EventDataNewBlock{}, "tendermint/event/NewBlock", nil)
@ -52,6 +44,7 @@ func RegisterEventDatas(cdc *amino.Codec) {
cdc.RegisterConcrete(EventDataRoundState{}, "tendermint/event/RoundState", nil)
cdc.RegisterConcrete(EventDataVote{}, "tendermint/event/Vote", nil)
cdc.RegisterConcrete(EventDataProposalHeartbeat{}, "tendermint/event/ProposalHeartbeat", nil)
cdc.RegisterConcrete(EventDataValidatorSetUpdates{}, "tendermint/event/ValidatorSetUpdates", nil)
cdc.RegisterConcrete(EventDataString(""), "tendermint/event/ProposalString", nil)
}
@ -92,6 +85,10 @@ type EventDataVote struct {
type EventDataString string
type EventDataValidatorSetUpdates struct {
ValidatorUpdates []*Validator `json:"validator_updates"`
}
///////////////////////////////////////////////////////////////////////////////
// PUBSUB
///////////////////////////////////////////////////////////////////////////////
@ -108,20 +105,21 @@ const (
)
var (
EventQueryNewBlock = QueryForEvent(EventNewBlock)
EventQueryNewBlockHeader = QueryForEvent(EventNewBlockHeader)
EventQueryNewRound = QueryForEvent(EventNewRound)
EventQueryNewRoundStep = QueryForEvent(EventNewRoundStep)
EventQueryTimeoutPropose = QueryForEvent(EventTimeoutPropose)
EventQueryCompleteProposal = QueryForEvent(EventCompleteProposal)
EventQueryPolka = QueryForEvent(EventPolka)
EventQueryUnlock = QueryForEvent(EventUnlock)
EventQueryLock = QueryForEvent(EventLock)
EventQueryRelock = QueryForEvent(EventRelock)
EventQueryTimeoutWait = QueryForEvent(EventTimeoutWait)
EventQueryVote = QueryForEvent(EventVote)
EventQueryProposalHeartbeat = QueryForEvent(EventProposalHeartbeat)
EventQueryTx = QueryForEvent(EventTx)
EventQueryCompleteProposal = QueryForEvent(EventCompleteProposal)
EventQueryLock = QueryForEvent(EventLock)
EventQueryNewBlock = QueryForEvent(EventNewBlock)
EventQueryNewBlockHeader = QueryForEvent(EventNewBlockHeader)
EventQueryNewRound = QueryForEvent(EventNewRound)
EventQueryNewRoundStep = QueryForEvent(EventNewRoundStep)
EventQueryPolka = QueryForEvent(EventPolka)
EventQueryProposalHeartbeat = QueryForEvent(EventProposalHeartbeat)
EventQueryRelock = QueryForEvent(EventRelock)
EventQueryTimeoutPropose = QueryForEvent(EventTimeoutPropose)
EventQueryTimeoutWait = QueryForEvent(EventTimeoutWait)
EventQueryTx = QueryForEvent(EventTx)
EventQueryUnlock = QueryForEvent(EventUnlock)
EventQueryValidatorSetUpdates = QueryForEvent(EventValidatorSetUpdates)
EventQueryVote = QueryForEvent(EventVote)
)
func EventQueryTxFor(tx Tx) tmpubsub.Query {
@ -137,6 +135,7 @@ type BlockEventPublisher interface {
PublishEventNewBlock(block EventDataNewBlock) error
PublishEventNewBlockHeader(header EventDataNewBlockHeader) error
PublishEventTx(EventDataTx) error
PublishEventValidatorSetUpdates(EventDataValidatorSetUpdates) error
}
type TxEventPublisher interface {


+ 16
- 16
types/nop_event_bus.go View File

@ -20,58 +20,58 @@ func (NopEventBus) UnsubscribeAll(ctx context.Context, subscriber string) error
return nil
}
//--- block, tx, and vote events
func (NopEventBus) PublishEventNewBlock(block EventDataNewBlock) error {
func (NopEventBus) PublishEventNewBlock(data EventDataNewBlock) error {
return nil
}
func (NopEventBus) PublishEventNewBlockHeader(header EventDataNewBlockHeader) error {
func (NopEventBus) PublishEventNewBlockHeader(data EventDataNewBlockHeader) error {
return nil
}
func (NopEventBus) PublishEventVote(vote EventDataVote) error {
func (NopEventBus) PublishEventVote(data EventDataVote) error {
return nil
}
func (NopEventBus) PublishEventTx(tx EventDataTx) error {
func (NopEventBus) PublishEventTx(data EventDataTx) error {
return nil
}
//--- EventDataRoundState events
func (NopEventBus) PublishEventNewRoundStep(data EventDataRoundState) error {
return nil
}
func (NopEventBus) PublishEventNewRoundStep(rs EventDataRoundState) error {
func (NopEventBus) PublishEventTimeoutPropose(data EventDataRoundState) error {
return nil
}
func (NopEventBus) PublishEventTimeoutPropose(rs EventDataRoundState) error {
func (NopEventBus) PublishEventTimeoutWait(data EventDataRoundState) error {
return nil
}
func (NopEventBus) PublishEventTimeoutWait(rs EventDataRoundState) error {
func (NopEventBus) PublishEventNewRound(data EventDataRoundState) error {
return nil
}
func (NopEventBus) PublishEventNewRound(rs EventDataRoundState) error {
func (NopEventBus) PublishEventCompleteProposal(data EventDataRoundState) error {
return nil
}
func (NopEventBus) PublishEventCompleteProposal(rs EventDataRoundState) error {
func (NopEventBus) PublishEventPolka(data EventDataRoundState) error {
return nil
}
func (NopEventBus) PublishEventPolka(rs EventDataRoundState) error {
func (NopEventBus) PublishEventUnlock(data EventDataRoundState) error {
return nil
}
func (NopEventBus) PublishEventUnlock(rs EventDataRoundState) error {
func (NopEventBus) PublishEventRelock(data EventDataRoundState) error {
return nil
}
func (NopEventBus) PublishEventRelock(rs EventDataRoundState) error {
func (NopEventBus) PublishEventLock(data EventDataRoundState) error {
return nil
}
func (NopEventBus) PublishEventLock(rs EventDataRoundState) error {
func (NopEventBus) PublishEventValidatorSetUpdates(data EventDataValidatorSetUpdates) error {
return nil
}

Loading…
Cancel
Save