Browse Source

event listeners take EventData, not interface{}; EventData type assertions

pull/130/head
Jae Kwon 9 years ago
parent
commit
ffcc657be6
16 changed files with 62 additions and 73 deletions
  1. +1
    -1
      crawler/crawl.go
  2. +1
    -1
      events/event_cache.go
  3. +7
    -6
      events/events.go
  4. +8
    -7
      rpc/server/handlers.go
  5. +6
    -12
      rpc/test/ws_helpers.go
  6. +0
    -3
      rpc/types/types.go
  7. +7
    -7
      state/execution.go
  8. +1
    -1
      state/permissions_test.go
  9. +0
    -6
      state/tx_cache.go
  10. +10
    -2
      types/events.go
  11. +0
    -6
      vm/test/fake_app_state.go
  12. +1
    -1
      vm/test/log_event_test.go
  13. +1
    -2
      vm/test/vm_test.go
  14. +0
    -3
      vm/types.go
  15. +7
    -9
      vm/vm.go
  16. +12
    -6
      wire/reflect.go

+ 1
- 1
crawler/crawl.go View File

@ -181,7 +181,7 @@ func (c *Crawler) readLoop(node *Node) {
}
func (c *Crawler) consumeMessage(eventMsg ctypes.ResultEvent, node *Node) error {
block := eventMsg.Data.(*types.Block)
block := eventMsg.Data.(*types.EventDataNewBlock).Block
node.LastSeen = time.Now()
node.BlockHeight = block.Height
node.BlockHistory[block.Height] = node.LastSeen


+ 1
- 1
events/event_cache.go View File

@ -30,7 +30,7 @@ type eventInfo struct {
}
// Cache an event to be fired upon finality.
func (evc *EventCache) FireEvent(event string, data interface{}) {
func (evc *EventCache) FireEvent(event string, data types.EventData) {
// append to list
evc.events = append(evc.events, eventInfo{event, data})
}


+ 7
- 6
events/events.go View File

@ -4,6 +4,7 @@ import (
"sync"
. "github.com/tendermint/tendermint/common"
"github.com/tendermint/tendermint/types"
)
// reactors and other modules should export
@ -14,7 +15,7 @@ type Eventable interface {
// an event switch or cache implements fireable
type Fireable interface {
FireEvent(event string, msg interface{})
FireEvent(event string, data types.EventData)
}
type EventSwitch struct {
@ -108,7 +109,7 @@ func (evsw *EventSwitch) RemoveListenerForEvent(event string, listenerId string)
}
}
func (evsw *EventSwitch) FireEvent(event string, msg interface{}) {
func (evsw *EventSwitch) FireEvent(event string, data types.EventData) {
// Get the eventCell
evsw.mtx.RLock()
eventCell := evsw.eventCells[event]
@ -119,7 +120,7 @@ func (evsw *EventSwitch) FireEvent(event string, msg interface{}) {
}
// Fire event for all listeners in eventCell
eventCell.FireEvent(msg)
eventCell.FireEvent(data)
}
//-----------------------------------------------------------------------------
@ -150,17 +151,17 @@ func (cell *eventCell) RemoveListener(listenerId string) int {
return numListeners
}
func (cell *eventCell) FireEvent(msg interface{}) {
func (cell *eventCell) FireEvent(data types.EventData) {
cell.mtx.RLock()
for _, listener := range cell.listeners {
listener(msg)
listener(data)
}
cell.mtx.RUnlock()
}
//-----------------------------------------------------------------------------
type eventCallback func(msg interface{})
type eventCallback func(data types.EventData)
type eventListener struct {
id string


+ 8
- 7
rpc/server/handlers.go View File

@ -16,6 +16,7 @@ import (
"github.com/tendermint/tendermint/events"
ctypes "github.com/tendermint/tendermint/rpc/core/types"
. "github.com/tendermint/tendermint/rpc/types"
"github.com/tendermint/tendermint/types"
"github.com/tendermint/tendermint/wire"
)
@ -54,22 +55,22 @@ func NewRPCFunc(f interface{}, args []string) *RPCFunc {
func funcArgTypes(f interface{}) []reflect.Type {
t := reflect.TypeOf(f)
n := t.NumIn()
types := make([]reflect.Type, n)
typez := make([]reflect.Type, n)
for i := 0; i < n; i++ {
types[i] = t.In(i)
typez[i] = t.In(i)
}
return types
return typez
}
// return a function's return types
func funcReturnTypes(f interface{}) []reflect.Type {
t := reflect.TypeOf(f)
n := t.NumOut()
types := make([]reflect.Type, n)
typez := make([]reflect.Type, n)
for i := 0; i < n; i++ {
types[i] = t.Out(i)
typez[i] = t.Out(i)
}
return types
return typez
}
// function introspection
@ -337,7 +338,7 @@ func (wsc *WSConnection) readRoutine() {
continue
} else {
log.Notice("Subscribe to event", "id", wsc.id, "event", event)
wsc.evsw.AddListenerForEvent(wsc.id, event, func(msg interface{}) {
wsc.evsw.AddListenerForEvent(wsc.id, event, func(msg types.EventData) {
// NOTE: RPCResponses of subscribed events have id suffix "#event"
wsc.writeRPCResponse(NewRPCResponse(request.Id+"#event", ctypes.ResultEvent{event, msg}, ""))
})


+ 6
- 12
rpc/test/ws_helpers.go View File

@ -82,7 +82,8 @@ func waitForEvent(t *testing.T, con *websocket.Conn, eventid string, dieOnTimeou
errCh <- err
break
}
if response.Result.(*ctypes.ResultEvent).Event == eventid {
event, ok := response.Result.(*ctypes.ResultEvent)
if ok && event.Event == eventid {
goodCh <- p
break
}
@ -109,8 +110,8 @@ func waitForEvent(t *testing.T, con *websocket.Conn, eventid string, dieOnTimeou
// run the check
err := check(eventid, p)
if err != nil {
panic(err)
t.Fatal(err)
panic(err) // Show the stack trace.
}
} else {
con.Close()
@ -118,6 +119,7 @@ func waitForEvent(t *testing.T, con *websocket.Conn, eventid string, dieOnTimeou
}
case err := <-errCh:
t.Fatal(err)
panic(err) // Show the stack trace.
}
}
@ -140,15 +142,7 @@ func unmarshalResponseNewBlock(b []byte) (*types.Block, error) {
func unmarshalResponseNameReg(b []byte) (*types.NameTx, error) {
// unmarshall and assert somethings
var response struct {
JSONRPC string `json:"jsonrpc"`
Id string `json:"id"`
Result struct {
Event string `json:"event"`
Data *types.NameTx `json:"data"`
} `json:"result"`
Error string `json:"error"`
}
var response ctypes.Response
var err error
wire.ReadJSON(&response, b, &err)
if err != nil {
@ -157,7 +151,7 @@ func unmarshalResponseNameReg(b []byte) (*types.NameTx, error) {
if response.Error != "" {
return nil, fmt.Errorf(response.Error)
}
tx := response.Result.Data
tx := response.Result.(*ctypes.ResultEvent).Data.(types.EventDataTx).Tx.(*types.NameTx)
return tx, nil
}


+ 0
- 3
rpc/types/types.go View File

@ -15,9 +15,6 @@ type RPCResponse struct {
}
func NewRPCResponse(id string, res interface{}, err string) RPCResponse {
if res == nil {
res = struct{}{}
}
return RPCResponse{
JSONRPC: "2.0",
Id: id,


+ 7
- 7
state/execution.go View File

@ -628,8 +628,8 @@ func ExecTx(blockCache *BlockCache, tx types.Tx, runCall bool, evc events.Fireab
// TODO: maybe we want to take funds on error and allow txs in that don't do anythingi?
if evc != nil {
evc.FireEvent(types.EventStringAccInput(tx.Input.Address), tx)
evc.FireEvent(types.EventStringNameReg(tx.Name), tx)
evc.FireEvent(types.EventStringAccInput(tx.Input.Address), types.EventDataTx{tx, nil, ""})
evc.FireEvent(types.EventStringNameReg(tx.Name), types.EventDataTx{tx, nil, ""})
}
return nil
@ -711,7 +711,7 @@ func ExecTx(blockCache *BlockCache, tx types.Tx, runCall bool, evc events.Fireab
}
if evc != nil {
// TODO: fire for all inputs
evc.FireEvent(types.EventStringBond(), tx)
evc.FireEvent(types.EventStringBond(), types.EventDataTx{tx, nil, ""})
}
return nil
@ -736,7 +736,7 @@ func ExecTx(blockCache *BlockCache, tx types.Tx, runCall bool, evc events.Fireab
// Good!
_s.unbondValidator(val)
if evc != nil {
evc.FireEvent(types.EventStringUnbond(), tx)
evc.FireEvent(types.EventStringUnbond(), types.EventDataTx{tx, nil, ""})
}
return nil
@ -764,7 +764,7 @@ func ExecTx(blockCache *BlockCache, tx types.Tx, runCall bool, evc events.Fireab
// Good!
_s.rebondValidator(val)
if evc != nil {
evc.FireEvent(types.EventStringRebond(), tx)
evc.FireEvent(types.EventStringRebond(), types.EventDataTx{tx, nil, ""})
}
return nil
@ -803,7 +803,7 @@ func ExecTx(blockCache *BlockCache, tx types.Tx, runCall bool, evc events.Fireab
// Good! (Bad validator!)
_s.destroyValidator(accused)
if evc != nil {
evc.FireEvent(types.EventStringDupeout(), tx)
evc.FireEvent(types.EventStringDupeout(), types.EventDataTx{tx, nil, ""})
}
return nil
@ -894,7 +894,7 @@ func ExecTx(blockCache *BlockCache, tx types.Tx, runCall bool, evc events.Fireab
if evc != nil {
evc.FireEvent(types.EventStringAccInput(tx.Input.Address), types.EventDataTx{tx, nil, ""})
evc.FireEvent(types.EventStringPermissions(ptypes.PermFlagToString(permFlag)), tx)
evc.FireEvent(types.EventStringPermissions(ptypes.PermFlagToString(permFlag)), types.EventDataTx{tx, nil, ""})
}
return nil


+ 1
- 1
state/permissions_test.go View File

@ -1055,7 +1055,7 @@ func execTxWaitEvent(t *testing.T, blockCache *BlockCache, tx types.Tx, eventid
evsw := events.NewEventSwitch()
evsw.Start()
ch := make(chan interface{})
evsw.AddListenerForEvent("test", eventid, func(msg interface{}) {
evsw.AddListenerForEvent("test", eventid, func(msg types.EventData) {
ch <- msg
})
evc := events.NewEventCache(evsw)


+ 0
- 6
state/tx_cache.go View File

@ -12,7 +12,6 @@ type TxCache struct {
backend *BlockCache
accounts map[Word256]vmAccountInfo
storages map[Tuple256]Word256
logs []types.EventDataLog
}
func NewTxCache(backend *BlockCache) *TxCache {
@ -20,7 +19,6 @@ func NewTxCache(backend *BlockCache) *TxCache {
backend: backend,
accounts: make(map[Word256]vmAccountInfo),
storages: make(map[Tuple256]Word256),
logs: make([]types.EventDataLog, 0),
}
}
@ -138,10 +136,6 @@ func (cache *TxCache) Sync() {
}
}
func (cache *TxCache) AddLog(log types.EventDataLog) {
cache.logs = append(cache.logs, log)
}
//-----------------------------------------------------------------------------
// Convenience function to return address of new contract


+ 10
- 2
types/events.go View File

@ -14,6 +14,7 @@ func EventStringAccOutput(addr []byte) string { return fmt.Sprintf("Acc/%X/Out
func EventStringAccCall(addr []byte) string { return fmt.Sprintf("Acc/%X/Call", addr) }
func EventStringLogEvent(addr []byte) string { return fmt.Sprintf("Log/%X", addr) }
func EventStringPermissions(name string) string { return fmt.Sprintf("Permissions/%s", name) }
func EventStringNameReg(name string) string { return fmt.Sprintf("NameReg/%s", name) }
func EventStringBond() string { return "Bond" }
func EventStringUnbond() string { return "Unbond" }
func EventStringRebond() string { return "Rebond" }
@ -31,12 +32,14 @@ const (
EventDataTypeLog = byte(0x05)
)
type EventData interface{}
type EventData interface {
AssertIsEventData()
}
var _ = wire.RegisterInterface(
struct{ EventData }{},
wire.ConcreteType{EventDataNewBlock{}, EventDataTypeNewBlock},
// wire.ConcreteType{EventDataNewBlock{}, EventDataTypeFork },
// wire.ConcreteType{EventDataFork{}, EventDataTypeFork },
wire.ConcreteType{EventDataTx{}, EventDataTypeTx},
wire.ConcreteType{EventDataCall{}, EventDataTypeCall},
wire.ConcreteType{EventDataLog{}, EventDataTypeLog},
@ -80,3 +83,8 @@ type EventDataLog struct {
Data []byte `json:"data"`
Height int64 `json:"height"`
}
func (_ EventDataNewBlock) AssertIsEventData() {}
func (_ EventDataTx) AssertIsEventData() {}
func (_ EventDataCall) AssertIsEventData() {}
func (_ EventDataLog) AssertIsEventData() {}

+ 0
- 6
vm/test/fake_app_state.go View File

@ -2,7 +2,6 @@ package vm
import (
. "github.com/tendermint/tendermint/common"
"github.com/tendermint/tendermint/types"
. "github.com/tendermint/tendermint/vm"
"github.com/tendermint/tendermint/vm/sha3"
)
@ -10,7 +9,6 @@ import (
type FakeAppState struct {
accounts map[string]*Account
storage map[string]Word256
logs []types.EventDataLog
}
func (fas *FakeAppState) GetAccount(addr Word256) *Account {
@ -70,10 +68,6 @@ func (fas *FakeAppState) SetStorage(addr Word256, key Word256, value Word256) {
fas.storage[addr.String()+key.String()] = value
}
func (fas *FakeAppState) AddLog(log types.EventDataLog) {
fas.logs = append(fas.logs, log)
}
// Creates a 20 byte address and bumps the nonce.
func createAddress(creator *Account) Word256 {
nonce := creator.Nonce


+ 1
- 1
vm/test/log_event_test.go View File

@ -44,7 +44,7 @@ func TestLog4(t *testing.T) {
doneChan := make(chan struct{}, 1)
eventSwitch.AddListenerForEvent("test", eventId, func(event interface{}) {
eventSwitch.AddListenerForEvent("test", eventId, func(event types.EventData) {
logEvent := event.(types.EventDataLog)
// No need to test address as this event would not happen if it wasn't correct
if !reflect.DeepEqual(logEvent.Topics, expectedTopics) {


+ 1
- 2
vm/test/vm_test.go View File

@ -19,7 +19,6 @@ func newAppState() *FakeAppState {
fas := &FakeAppState{
accounts: make(map[string]*Account),
storage: make(map[string]Word256),
logs: nil,
}
// For default permissions
fas.accounts[ptypes.GlobalPermissionsAddress256.String()] = &Account{
@ -158,7 +157,7 @@ func runVMWaitEvents(t *testing.T, ourVm *VM, caller, callee *Account, subscribe
evsw.Start()
ch := make(chan interface{})
fmt.Printf("subscribe to %x\n", subscribeAddr)
evsw.AddListenerForEvent("test", types.EventStringAccCall(subscribeAddr), func(msg interface{}) {
evsw.AddListenerForEvent("test", types.EventStringAccCall(subscribeAddr), func(msg types.EventData) {
ch <- msg
})
evc := events.NewEventCache(evsw)


+ 0
- 3
vm/types.go View File

@ -3,7 +3,6 @@ package vm
import (
. "github.com/tendermint/tendermint/common"
ptypes "github.com/tendermint/tendermint/permission/types"
"github.com/tendermint/tendermint/types"
)
const (
@ -40,8 +39,6 @@ type AppState interface {
GetStorage(Word256, Word256) Word256
SetStorage(Word256, Word256, Word256) // Setting to Zero is deleting.
// Logs
AddLog(types.EventDataLog)
}
type Params struct {


+ 7
- 9
vm/vm.go View File

@ -694,20 +694,18 @@ func (vm *VM) call(caller, callee *Account, code, input []byte, value int64, gas
if !ok {
return nil, firstErr(err, ErrMemoryOutOfBounds)
}
log := types.EventDataLog{
callee.Address,
topics,
data,
vm.params.BlockHeight,
}
vm.appState.AddLog(log)
if vm.evc != nil {
eventId := types.EventStringLogEvent(callee.Address.Postfix(20))
fmt.Printf("eventId: %s\n", eventId)
log := types.EventDataLog{
callee.Address,
topics,
data,
vm.params.BlockHeight,
}
vm.evc.FireEvent(eventId, log)
}
// Using sol-log for this as well since 'log' will print garbage.
dbg.Printf(" => T:%X D:%X\n", log.Topics, log.Data)
dbg.Printf(" => T:%X D:%X\n", topics, data)
case CREATE: // 0xF0
if !HasPermission(vm.appState, callee, ptypes.CreateContract) {


+ 12
- 6
wire/reflect.go View File

@ -419,11 +419,14 @@ func writeReflectBinary(rv reflect.Value, rt reflect.Type, opts Options, w io.Wr
if !ok {
switch crt.Kind() {
case reflect.Ptr:
*err = errors.New(Fmt("Unexpected pointer type %v. Was it registered as a value receiver rather than as a pointer receiver?", crt))
*err = errors.New(Fmt("Unexpected pointer type %v for registered interface %v. "+
"Was it registered as a value receiver rather than as a pointer receiver?", crt, rt.Name()))
case reflect.Struct:
*err = errors.New(Fmt("Unexpected struct type %v. Was it registered as a pointer receiver rather than as a value receiver?", crt))
*err = errors.New(Fmt("Unexpected struct type %v for registered interface %v. "+
"Was it registered as a pointer receiver rather than as a value receiver?", crt, rt.Name()))
default:
*err = errors.New(Fmt("Unexpected type %v.", crt))
*err = errors.New(Fmt("Unexpected type %v for registered interface %v. "+
"If this is intentional, please register it.", crt, rt.Name()))
}
return
}
@ -819,11 +822,14 @@ func writeReflectJSON(rv reflect.Value, rt reflect.Type, w io.Writer, n *int64,
if !ok {
switch crt.Kind() {
case reflect.Ptr:
*err = errors.New(Fmt("Unexpected pointer type %v. Was it registered as a value receiver rather than as a pointer receiver?", crt))
*err = errors.New(Fmt("Unexpected pointer type %v for registered interface %v. "+
"Was it registered as a value receiver rather than as a pointer receiver?", crt, rt.Name()))
case reflect.Struct:
*err = errors.New(Fmt("Unexpected struct type %v. Was it registered as a pointer receiver rather than as a value receiver?", crt))
*err = errors.New(Fmt("Unexpected struct type %v for registered interface %v. "+
"Was it registered as a pointer receiver rather than as a value receiver?", crt, rt.Name()))
default:
*err = errors.New(Fmt("Unexpected type %v.", crt))
*err = errors.New(Fmt("Unexpected type %v for registered interface %v. "+
"If this is intentional, please register it.", crt, rt.Name()))
}
return
}


Loading…
Cancel
Save