Browse Source

Merge remote-tracking branch 'origin/websockets' into develop

pull/55/head
Jae Kwon 10 years ago
parent
commit
a9467414d6
27 changed files with 982 additions and 214 deletions
  1. +6
    -0
      account/account.go
  2. +2
    -2
      blockchain/reactor.go
  3. +5
    -4
      common/word.go
  4. +3
    -2
      consensus/reactor.go
  5. +37
    -16
      consensus/state.go
  6. +41
    -0
      events/event_cache.go
  7. +6
    -1
      events/events.go
  8. +2
    -2
      mempool/mempool.go
  9. +2
    -2
      mempool/reactor.go
  10. +7
    -3
      node/node.go
  11. +2
    -2
      p2p/pex_reactor.go
  12. +0
    -4
      rpc/core/routes.go
  13. +2
    -2
      rpc/core/txs.go
  14. +124
    -87
      rpc/handlers.go
  15. +17
    -0
      rpc/http_server.go
  16. +302
    -0
      rpc/test/client_ws_test.go
  17. +66
    -5
      rpc/test/helpers_test.go
  18. +36
    -54
      rpc/test/tests_test.go
  19. +136
    -0
      rpc/test/ws_helpers_test.go
  20. +6
    -6
      state/block_cache.go
  21. +39
    -4
      state/execution.go
  22. +10
    -0
      state/state.go
  23. +1
    -1
      state/state_test.go
  24. +10
    -6
      state/tx_cache.go
  25. +84
    -0
      types/events.go
  26. +2
    -2
      vm/test/vm_test.go
  27. +34
    -9
      vm/vm.go

+ 6
- 0
account/account.go View File

@ -6,6 +6,7 @@ import (
"io"
"github.com/tendermint/tendermint/binary"
"github.com/tendermint/tendermint/merkle"
)
// Signable is an interface for all signable things.
@ -24,6 +25,11 @@ func SignBytes(o Signable) []byte {
return buf.Bytes()
}
// HashSignBytes is a convenience method for getting the hash of the bytes of a signable
func HashSignBytes(o Signable) []byte {
return merkle.HashFromBinary(SignBytes(o))
}
//-----------------------------------------------------------------------------
// Account resides in the application state, and is mutated by transactions


+ 2
- 2
blockchain/reactor.go View File

@ -43,7 +43,7 @@ type BlockchainReactor struct {
quit chan struct{}
running uint32
evsw *events.EventSwitch
evsw events.Fireable
}
func NewBlockchainReactor(state *sm.State, store *BlockStore, sync bool) *BlockchainReactor {
@ -244,7 +244,7 @@ func (bcR *BlockchainReactor) BroadcastStatus() error {
}
// implements events.Eventable
func (bcR *BlockchainReactor) SetEventSwitch(evsw *events.EventSwitch) {
func (bcR *BlockchainReactor) SetFireable(evsw events.Fireable) {
bcR.evsw = evsw
}


+ 5
- 4
common/word.go View File

@ -13,10 +13,11 @@ var (
type Word256 [32]byte
func (w Word256) String() string { return string(w[:]) }
func (w Word256) Copy() Word256 { return w }
func (w Word256) Bytes() []byte { return w[:] } // copied.
func (w Word256) Prefix(n int) []byte { return w[:n] }
func (w Word256) String() string { return string(w[:]) }
func (w Word256) Copy() Word256 { return w }
func (w Word256) Bytes() []byte { return w[:] } // copied.
func (w Word256) Prefix(n int) []byte { return w[:n] }
func (w Word256) Postfix(n int) []byte { return w[32-n:] }
func (w Word256) IsZero() bool {
accum := byte(0)
for _, byt := range w {


+ 3
- 2
consensus/reactor.go View File

@ -41,7 +41,7 @@ type ConsensusReactor struct {
blockStore *bc.BlockStore
conS *ConsensusState
evsw *events.EventSwitch
evsw events.Fireable
}
func NewConsensusReactor(consensusState *ConsensusState, blockStore *bc.BlockStore) *ConsensusReactor {
@ -235,8 +235,9 @@ func (conR *ConsensusReactor) ResetToState(state *sm.State) {
}
// implements events.Eventable
func (conR *ConsensusReactor) SetEventSwitch(evsw *events.EventSwitch) {
func (conR *ConsensusReactor) SetFireable(evsw events.Fireable) {
conR.evsw = evsw
conR.conS.SetFireable(evsw)
}
//--------------------------------------


+ 37
- 16
consensus/state.go View File

@ -66,17 +66,21 @@ import (
. "github.com/tendermint/tendermint/common"
"github.com/tendermint/tendermint/config"
. "github.com/tendermint/tendermint/consensus/types"
"github.com/tendermint/tendermint/events"
mempl "github.com/tendermint/tendermint/mempool"
sm "github.com/tendermint/tendermint/state"
"github.com/tendermint/tendermint/types"
)
const (
roundDuration0 = 10 * time.Second // The first round is 60 seconds long.
roundDurationDelta = 3 * time.Second // Each successive round lasts 15 seconds longer.
roundDeadlinePrevote = float64(1.0 / 3.0) // When the prevote is due.
roundDeadlinePrecommit = float64(2.0 / 3.0) // When the precommit vote is due.
newHeightDelta = roundDuration0 / 3 // The time to wait between commitTime and startTime of next consensus rounds.
)
var (
RoundDuration0 = 10 * time.Second // The first round is 60 seconds long.
RoundDurationDelta = 3 * time.Second // Each successive round lasts 15 seconds longer.
newHeightDelta = RoundDuration0 / 3 // The time to wait between commitTime and startTime of next consensus rounds.
)
var (
@ -246,6 +250,9 @@ type ConsensusState struct {
stagedBlock *types.Block // Cache last staged block.
stagedState *sm.State // Cache result of staged block.
lastCommitVoteHeight uint // Last called commitVoteBlock() or saveCommitVoteBlock() on.
evsw events.Fireable
evc *events.EventCache // set in stageBlock and passed into state
}
func NewConsensusState(state *sm.State, blockStore *bc.BlockStore, mempoolReactor *mempl.MempoolReactor) *ConsensusState {
@ -315,14 +322,14 @@ func (cs *ConsensusState) stepTransitionRoutine() {
// NOTE: We can push directly to runActionCh because
// we're running in a separate goroutine, which avoids deadlocks.
rs := cs.getRoundState()
round, roundStartTime, roundDuration, _, elapsedRatio := calcRoundInfo(rs.StartTime)
round, roundStartTime, RoundDuration, _, elapsedRatio := calcRoundInfo(rs.StartTime)
log.Debug("Scheduling next action", "height", rs.Height, "round", round, "step", rs.Step, "roundStartTime", roundStartTime, "elapsedRatio", elapsedRatio)
switch rs.Step {
case RoundStepNewHeight:
// We should run RoundActionPropose when rs.StartTime passes.
if elapsedRatio < 0 {
// startTime is in the future.
time.Sleep(time.Duration((-1.0 * elapsedRatio) * float64(roundDuration)))
time.Sleep(time.Duration((-1.0 * elapsedRatio) * float64(RoundDuration)))
}
cs.runActionCh <- RoundAction{rs.Height, rs.Round, RoundActionPropose}
case RoundStepNewRound:
@ -330,15 +337,15 @@ func (cs *ConsensusState) stepTransitionRoutine() {
cs.runActionCh <- RoundAction{rs.Height, rs.Round, RoundActionPropose}
case RoundStepPropose:
// Wake up when it's time to vote.
time.Sleep(time.Duration((roundDeadlinePrevote - elapsedRatio) * float64(roundDuration)))
time.Sleep(time.Duration((roundDeadlinePrevote - elapsedRatio) * float64(RoundDuration)))
cs.runActionCh <- RoundAction{rs.Height, rs.Round, RoundActionPrevote}
case RoundStepPrevote:
// Wake up when it's time to precommit.
time.Sleep(time.Duration((roundDeadlinePrecommit - elapsedRatio) * float64(roundDuration)))
time.Sleep(time.Duration((roundDeadlinePrecommit - elapsedRatio) * float64(RoundDuration)))
cs.runActionCh <- RoundAction{rs.Height, rs.Round, RoundActionPrecommit}
case RoundStepPrecommit:
// Wake up when the round is over.
time.Sleep(time.Duration((1.0 - elapsedRatio) * float64(roundDuration)))
time.Sleep(time.Duration((1.0 - elapsedRatio) * float64(RoundDuration)))
cs.runActionCh <- RoundAction{rs.Height, rs.Round, RoundActionTryCommit}
case RoundStepCommit:
// There's nothing to scheudle, we're waiting for
@ -437,6 +444,12 @@ ACTION_LOOP:
if cs.TryFinalizeCommit(rs.Height) {
// Now at new height
// cs.Step is at RoundStepNewHeight or RoundStepNewRound.
// fire some events!
go func() {
newBlock := cs.blockStore.LoadBlock(cs.state.LastBlockHeight)
cs.evsw.FireEvent(types.EventStringNewBlock(), newBlock)
cs.evc.Flush()
}()
scheduleNextAction()
continue ACTION_LOOP
} else {
@ -1023,6 +1036,9 @@ func (cs *ConsensusState) stageBlock(block *types.Block, blockParts *types.PartS
// Create a copy of the state for staging
stateCopy := cs.state.Copy()
// reset the event cache and pass it into the state
cs.evc = events.NewEventCache(cs.evsw)
stateCopy.SetFireable(cs.evc)
// Commit block onto the copied state.
// NOTE: Basic validation is done in state.AppendBlock().
@ -1107,17 +1123,22 @@ func (cs *ConsensusState) saveCommitVoteBlock(block *types.Block, blockParts *ty
}
}
// implements events.Eventable
func (cs *ConsensusState) SetFireable(evsw events.Fireable) {
cs.evsw = evsw
}
//-----------------------------------------------------------------------------
// total duration of given round
func calcRoundDuration(round uint) time.Duration {
return roundDuration0 + roundDurationDelta*time.Duration(round)
return RoundDuration0 + RoundDurationDelta*time.Duration(round)
}
// startTime is when round zero started.
func calcRoundStartTime(round uint, startTime time.Time) time.Time {
return startTime.Add(roundDuration0*time.Duration(round) +
roundDurationDelta*(time.Duration((int64(round)*int64(round)-int64(round))/2)))
return startTime.Add(RoundDuration0*time.Duration(round) +
RoundDurationDelta*(time.Duration((int64(round)*int64(round)-int64(round))/2)))
}
// calculates the current round given startTime of round zero.
@ -1131,8 +1152,8 @@ func calcRound(startTime time.Time) uint {
// D_delta * R^2 + (2D_0 - D_delta) * R + 2(Start - Now) <= 0.
// AR^2 + BR + C <= 0; A = D_delta, B = (2_D0 - D_delta), C = 2(Start - Now).
// R = Floor((-B + Sqrt(B^2 - 4AC))/2A)
A := float64(roundDurationDelta)
B := 2.0*float64(roundDuration0) - float64(roundDurationDelta)
A := float64(RoundDurationDelta)
B := 2.0*float64(RoundDuration0) - float64(RoundDurationDelta)
C := 2.0 * float64(startTime.Sub(now))
R := math.Floor((-B + math.Sqrt(B*B-4.0*A*C)) / (2 * A))
if math.IsNaN(R) {
@ -1149,12 +1170,12 @@ func calcRound(startTime time.Time) uint {
// convenience
// NOTE: elapsedRatio can be negative if startTime is in the future.
func calcRoundInfo(startTime time.Time) (round uint, roundStartTime time.Time, roundDuration time.Duration,
func calcRoundInfo(startTime time.Time) (round uint, roundStartTime time.Time, RoundDuration time.Duration,
roundElapsed time.Duration, elapsedRatio float64) {
round = calcRound(startTime)
roundStartTime = calcRoundStartTime(round, startTime)
roundDuration = calcRoundDuration(round)
RoundDuration = calcRoundDuration(round)
roundElapsed = time.Now().Sub(roundStartTime)
elapsedRatio = float64(roundElapsed) / float64(roundDuration)
elapsedRatio = float64(roundElapsed) / float64(RoundDuration)
return
}

+ 41
- 0
events/event_cache.go View File

@ -0,0 +1,41 @@
package events
const (
eventsBufferSize = 1000
)
// An EventCache buffers events for a Fireable
// All events are cached. Filtering happens on Flush
type EventCache struct {
evsw Fireable
events []eventInfo
}
// Create a new EventCache with an EventSwitch as backend
func NewEventCache(evsw Fireable) *EventCache {
return &EventCache{
evsw: evsw,
events: make([]eventInfo, eventsBufferSize),
}
}
// a cached event
type eventInfo struct {
event string
msg interface{}
}
// Cache an event to be fired upon finality.
func (evc *EventCache) FireEvent(event string, msg interface{}) {
// append to list
evc.events = append(evc.events, eventInfo{event, msg})
}
// Fire events by running evsw.FireEvent on all cached events. Blocks.
// Clears cached events
func (evc *EventCache) Flush() {
for _, ei := range evc.events {
evc.evsw.FireEvent(ei.event, ei.msg)
}
evc.events = make([]eventInfo, eventsBufferSize)
}

+ 6
- 1
events/events.go View File

@ -8,7 +8,12 @@ import (
// reactors and other modules should export
// this interface to become eventable
type Eventable interface {
SetEventSwitch(*EventSwitch)
SetFireable(Fireable)
}
// an event switch or cache implements fireable
type Fireable interface {
FireEvent(event string, msg interface{})
}
type EventSwitch struct {


+ 2
- 2
mempool/mempool.go View File

@ -42,7 +42,7 @@ func (mem *Mempool) GetCache() *sm.BlockCache {
func (mem *Mempool) AddTx(tx types.Tx) (err error) {
mem.mtx.Lock()
defer mem.mtx.Unlock()
err = sm.ExecTx(mem.cache, tx, false)
err = sm.ExecTx(mem.cache, tx, false, nil)
if err != nil {
log.Debug("AddTx() error", "tx", tx, "error", err)
return err
@ -93,7 +93,7 @@ func (mem *Mempool) ResetForBlockAndState(block *types.Block, state *sm.State) {
// Next, filter all txs that aren't valid given new state.
validTxs := []types.Tx{}
for _, tx := range txs {
err := sm.ExecTx(mem.cache, tx, false)
err := sm.ExecTx(mem.cache, tx, false, nil)
if err == nil {
log.Debug("Filter in, valid", "tx", tx)
validTxs = append(validTxs, tx)


+ 2
- 2
mempool/reactor.go View File

@ -26,7 +26,7 @@ type MempoolReactor struct {
Mempool *Mempool
evsw *events.EventSwitch
evsw events.Fireable
}
func NewMempoolReactor(mempool *Mempool) *MempoolReactor {
@ -116,7 +116,7 @@ func (memR *MempoolReactor) BroadcastTx(tx types.Tx) error {
}
// implements events.Eventable
func (memR *MempoolReactor) SetEventSwitch(evsw *events.EventSwitch) {
func (memR *MempoolReactor) SetFireable(evsw events.Fireable) {
memR.evsw = evsw
}


+ 7
- 3
node/node.go View File

@ -82,7 +82,7 @@ func NewNode() *Node {
// add the event switch to all services
// they should all satisfy events.Eventable
SetEventSwitch(eventSwitch, pexReactor, bcReactor, mempoolReactor, consensusReactor)
SetFireable(eventSwitch, pexReactor, bcReactor, mempoolReactor, consensusReactor)
return &Node{
sw: sw,
@ -116,9 +116,9 @@ func (n *Node) Stop() {
}
// Add the event switch to reactors, mempool, etc.
func SetEventSwitch(evsw *events.EventSwitch, eventables ...events.Eventable) {
func SetFireable(evsw *events.EventSwitch, eventables ...events.Eventable) {
for _, e := range eventables {
e.SetEventSwitch(evsw)
e.SetFireable(evsw)
}
}
@ -167,6 +167,10 @@ func (n *Node) MempoolReactor() *mempl.MempoolReactor {
return n.mempoolReactor
}
func (n *Node) EventSwitch() *events.EventSwitch {
return n.evsw
}
//------------------------------------------------------------------------------
func RunNode() {


+ 2
- 2
p2p/pex_reactor.go View File

@ -34,7 +34,7 @@ type PEXReactor struct {
book *AddrBook
evsw *events.EventSwitch
evsw events.Fireable
}
func NewPEXReactor(book *AddrBook) *PEXReactor {
@ -211,7 +211,7 @@ func (pexR *PEXReactor) ensurePeers() {
}
// implements events.Eventable
func (pexR *PEXReactor) SetEventSwitch(evsw *events.EventSwitch) {
func (pexR *PEXReactor) SetFireable(evsw events.Fireable) {
pexR.evsw = evsw
}


+ 0
- 4
rpc/core/routes.go View File

@ -4,10 +4,6 @@ import (
"github.com/tendermint/tendermint/rpc"
)
/*
TODO: support Call && GetStorage.
*/
var Routes = map[string]*rpc.RPCFunc{
"status": rpc.NewRPCFunc(Status, []string{}),
"net_info": rpc.NewRPCFunc(NetInfo, []string{}),


+ 2
- 2
rpc/core/txs.go View File

@ -43,7 +43,7 @@ func Call(address, data []byte) (*ctypes.ResponseCall, error) {
GasLimit: 10000000,
}
vmach := vm.NewVM(txCache, params, caller.Address)
vmach := vm.NewVM(txCache, params, caller.Address, nil)
gas := uint64(1000000000)
ret, err := vmach.Call(caller, callee, callee.Code, data, 0, &gas)
if err != nil {
@ -68,7 +68,7 @@ func CallCode(code, data []byte) (*ctypes.ResponseCall, error) {
GasLimit: 10000000,
}
vmach := vm.NewVM(txCache, params, caller.Address)
vmach := vm.NewVM(txCache, params, caller.Address, nil)
gas := uint64(1000000000)
ret, err := vmach.Call(caller, callee, code, data, 0, &gas)
if err != nil {


+ 124
- 87
rpc/handlers.go View File

@ -5,12 +5,13 @@ import (
"encoding/json"
"errors"
"fmt"
"github.com/gorilla/websocket"
"github.com/tendermint/tendermint/binary"
"github.com/tendermint/tendermint/events"
"golang.org/x/net/websocket"
"io/ioutil"
"net/http"
"reflect"
"sync/atomic"
"time"
)
@ -26,8 +27,8 @@ func RegisterRPCFuncs(mux *http.ServeMux, funcMap map[string]*RPCFunc) {
func RegisterEventsHandler(mux *http.ServeMux, evsw *events.EventSwitch) {
// websocket endpoint
w := NewWebsocketManager(evsw)
mux.Handle("/events", websocket.Handler(w.eventsHandler))
wm := NewWebsocketManager(evsw)
mux.HandleFunc("/events", wm.websocketHandler) // websocket.Handler(w.eventsHandler))
}
//-------------------------------------
@ -198,156 +199,192 @@ func _jsonStringToArg(ty reflect.Type, arg string) (reflect.Value, error) {
//-----------------------------------------------------------------------------
// rpc.websocket
const (
WSConnectionReaperSeconds = 5
MaxFailedSends = 10
WriteChanBufferSize = 10
)
// for requests coming in
type WsRequest struct {
type WSRequest struct {
Type string // subscribe or unsubscribe
Event string
}
// for responses going out
type WsResponse struct {
type WSResponse struct {
Event string
Data interface{}
Error string
}
// a single websocket connection
// contains the listeners id
type Connection struct {
// contains listener id, underlying ws connection,
// and the event switch for subscribing to events
type WSConnection struct {
id string
wsCon *websocket.Conn
writeChan chan WsResponse
quitChan chan struct{}
wsConn *websocket.Conn
writeChan chan WSResponse
failedSends uint
started uint32
stopped uint32
evsw *events.EventSwitch
}
// new websocket connection wrapper
func NewConnection(con *websocket.Conn) *Connection {
return &Connection{
id: con.RemoteAddr().String(),
wsCon: con,
writeChan: make(chan WsResponse, WriteChanBuffer), // buffered. we keep track when its full
func NewWSConnection(wsConn *websocket.Conn) *WSConnection {
return &WSConnection{
id: wsConn.RemoteAddr().String(),
wsConn: wsConn,
writeChan: make(chan WSResponse, WriteChanBufferSize), // buffered. we keep track when its full
}
}
// close the connection
func (c *Connection) Close() {
c.wsCon.Close()
close(c.writeChan)
close(c.quitChan)
}
// start the connection and hand her the event switch
func (con *WSConnection) Start(evsw *events.EventSwitch) {
if atomic.CompareAndSwapUint32(&con.started, 0, 1) {
con.evsw = evsw
// main manager for all websocket connections
// holds the event switch
type WebsocketManager struct {
ew *events.EventSwitch
cons map[string]*Connection
// read subscriptions/unsubscriptions to events
go con.read()
// write responses
con.write()
}
}
func NewWebsocketManager(ew *events.EventSwitch) *WebsocketManager {
return &WebsocketManager{
ew: ew,
cons: make(map[string]*Connection),
// close the connection
func (con *WSConnection) Stop() {
if atomic.CompareAndSwapUint32(&con.stopped, 0, 1) {
con.wsConn.Close()
close(con.writeChan)
}
}
func (w *WebsocketManager) eventsHandler(con *websocket.Conn) {
// register connection
c := NewConnection(con)
w.cons[con.RemoteAddr().String()] = c
// read subscriptions/unsubscriptions to events
go w.read(c)
// write responses
go w.write(c)
// attempt to write response to writeChan and record failures
func (con *WSConnection) safeWrite(resp WSResponse) {
select {
case con.writeChan <- resp:
// yay
con.failedSends = 0
default:
// channel is full
// if this happens too many times in a row,
// close connection
con.failedSends += 1
}
}
const (
WsConnectionReaperSeconds = 5
MaxFailedSendsSeconds = 10
WriteChanBuffer = 10
)
// read from the socket and subscribe to or unsubscribe from events
func (w *WebsocketManager) read(con *Connection) {
reaper := time.Tick(time.Second * WsConnectionReaperSeconds)
func (con *WSConnection) read() {
reaper := time.Tick(time.Second * WSConnectionReaperSeconds)
for {
select {
case <-reaper:
if con.failedSends > MaxFailedSendsSeconds {
if con.failedSends > MaxFailedSends {
// sending has failed too many times.
// kill the connection
con.quitChan <- struct{}{}
con.Stop()
return
}
default:
var in []byte
if err := websocket.Message.Receive(con.wsCon, &in); err != nil {
_, in, err := con.wsConn.ReadMessage()
if err != nil {
// an error reading the connection,
// so kill the connection
con.quitChan <- struct{}{}
// kill the connection
con.Stop()
return
}
var req WsRequest
err := json.Unmarshal(in, &req)
var req WSRequest
err = json.Unmarshal(in, &req)
if err != nil {
errStr := fmt.Sprintf("Error unmarshaling data: %s", err.Error())
con.writeChan <- WsResponse{Error: errStr}
con.safeWrite(WSResponse{Error: errStr})
continue
}
switch req.Type {
case "subscribe":
w.ew.AddListenerForEvent(con.id, req.Event, func(msg interface{}) {
resp := WsResponse{
log.Info("New event subscription", "con id", con.id, "event", req.Event)
con.evsw.AddListenerForEvent(con.id, req.Event, func(msg interface{}) {
resp := WSResponse{
Event: req.Event,
Data: msg,
}
select {
case con.writeChan <- resp:
// yay
con.failedSends = 0
default:
// channel is full
// if this happens too many times,
// close connection
con.failedSends += 1
}
con.safeWrite(resp)
})
case "unsubscribe":
if req.Event != "" {
w.ew.RemoveListenerForEvent(req.Event, con.id)
con.evsw.RemoveListenerForEvent(req.Event, con.id)
} else {
w.ew.RemoveListener(con.id)
con.evsw.RemoveListener(con.id)
}
default:
con.writeChan <- WsResponse{Error: "Unknown request type: " + req.Type}
con.safeWrite(WSResponse{Error: "Unknown request type: " + req.Type})
}
}
}
}
// receives on a write channel and writes out to the socket
func (w *WebsocketManager) write(con *Connection) {
// receives on a write channel and writes out on the socket
func (con *WSConnection) write() {
n, err := new(int64), new(error)
for {
select {
case msg := <-con.writeChan:
buf := new(bytes.Buffer)
binary.WriteJSON(msg, buf, n, err)
if *err != nil {
log.Error("Failed to write JSON WsResponse", "error", err)
} else {
websocket.Message.Send(con.wsCon, buf.Bytes())
}
case <-con.quitChan:
w.closeConn(con)
msg, more := <-con.writeChan
if !more {
// the channel was closed, so ensure
// connection is stopped and return
con.Stop()
return
}
buf := new(bytes.Buffer)
binary.WriteJSON(msg, buf, n, err)
if *err != nil {
log.Error("Failed to write JSON WSResponse", "error", err)
} else {
if err := con.wsConn.WriteMessage(websocket.TextMessage, buf.Bytes()); err != nil {
log.Error("Failed to write response on websocket", "error", err)
con.Stop()
return
}
}
}
}
// close a connection and delete from manager
func (w *WebsocketManager) closeConn(con *Connection) {
con.Close()
delete(w.cons, con.id)
// main manager for all websocket connections
// holds the event switch
type WebsocketManager struct {
websocket.Upgrader
evsw *events.EventSwitch
}
func NewWebsocketManager(evsw *events.EventSwitch) *WebsocketManager {
return &WebsocketManager{
evsw: evsw,
Upgrader: websocket.Upgrader{
ReadBufferSize: 1024,
WriteBufferSize: 1024,
CheckOrigin: func(r *http.Request) bool {
// TODO
return true
},
},
}
}
func (wm *WebsocketManager) websocketHandler(w http.ResponseWriter, r *http.Request) {
wsConn, err := wm.Upgrade(w, r, nil)
if err != nil {
// TODO - return http error
log.Error("Failed to upgrade to websocket connection", "error", err)
return
}
// register connection
con := NewWSConnection(wsConn)
log.Info("New websocket connection", "origin", con.id)
con.Start(wm.evsw)
}
// rpc.websocket


+ 17
- 0
rpc/http_server.go View File

@ -2,12 +2,15 @@
package rpc
import (
"bufio"
"bytes"
"fmt"
"net"
"net/http"
"runtime/debug"
"time"
"github.com/tendermint/tendermint/alert"
"github.com/tendermint/tendermint/binary"
. "github.com/tendermint/tendermint/common"
)
@ -96,3 +99,17 @@ func (w *ResponseWriterWrapper) WriteHeader(status int) {
w.Status = status
w.ResponseWriter.WriteHeader(status)
}
// implements http.Hijacker
func (w *ResponseWriterWrapper) Hijack() (net.Conn, *bufio.ReadWriter, error) {
return w.ResponseWriter.(http.Hijacker).Hijack()
}
// Stick it as a deferred statement in gouroutines to prevent the program from crashing.
func Recover(daemonName string) {
if e := recover(); e != nil {
stack := string(debug.Stack())
errorString := fmt.Sprintf("[%s] %s\n%s", daemonName, e, stack)
alert.Alert(errorString)
}
}

+ 302
- 0
rpc/test/client_ws_test.go View File

@ -0,0 +1,302 @@
package rpc
import (
"encoding/json"
"fmt"
"github.com/gorilla/websocket"
"github.com/tendermint/tendermint/account"
"github.com/tendermint/tendermint/binary"
"github.com/tendermint/tendermint/rpc"
"github.com/tendermint/tendermint/types"
"net/http"
"testing"
"time"
)
//--------------------------------------------------------------------------------
// Utilities for testing the websocket service
// create a new connection
func newWSCon(t *testing.T) *websocket.Conn {
dialer := websocket.DefaultDialer
rHeader := http.Header{}
con, r, err := dialer.Dial(websocketAddr, rHeader)
fmt.Println("response", r)
if err != nil {
t.Fatal(err)
}
return con
}
// subscribe to an event
func subscribe(t *testing.T, con *websocket.Conn, eventid string) {
err := con.WriteJSON(rpc.WSRequest{
Type: "subscribe",
Event: eventid,
})
if err != nil {
t.Fatal(err)
}
}
// unsubscribe from an event
func unsubscribe(t *testing.T, con *websocket.Conn, eventid string) {
err := con.WriteJSON(rpc.WSRequest{
Type: "unsubscribe",
Event: eventid,
})
if err != nil {
t.Fatal(err)
}
}
// wait for an event; do things that might trigger events, and check them when they are received
func waitForEvent(t *testing.T, con *websocket.Conn, eventid string, dieOnTimeout bool, f func(), check func(string, []byte) error) {
// go routine to wait for webscoket msg
gch := make(chan []byte) // good channel
ech := make(chan error) // error channel
go func() {
for {
_, p, err := con.ReadMessage()
if err != nil {
ech <- err
break
} else {
// if the event id isnt what we're waiting on
// ignore it
var response struct {
Event string
}
if err := json.Unmarshal(p, &response); err != nil {
ech <- err
break
}
if response.Event == eventid {
gch <- p
break
}
}
}
}()
// do stuff (transactions)
f()
// wait for an event or 10 seconds
ticker := time.Tick(10 * time.Second)
select {
case <-ticker:
if dieOnTimeout {
con.Close()
t.Fatalf("%s event was not received in time", eventid)
}
// else that's great, we didn't hear the event
// and we shouldn't have
case p := <-gch:
if dieOnTimeout {
// message was received and expected
// run the check
err := check(eventid, p)
if err != nil {
t.Fatal(err)
}
} else {
con.Close()
t.Fatalf("%s event was not expected", eventid)
}
case err := <-ech:
t.Fatal(err)
}
}
func unmarshalResponseNewBlock(b []byte) (*types.Block, error) {
// unmarshall and assert somethings
var response struct {
Event string
Data *types.Block
Error string
}
var err error
binary.ReadJSON(&response, b, &err)
if err != nil {
return nil, err
}
if response.Error != "" {
return nil, fmt.Errorf(response.Error)
}
block := response.Data
return block, nil
}
//--------------------------------------------------------------------------------
// Test the websocket service
// make a simple connection to the server
func TestWSConnect(t *testing.T) {
con := newWSCon(t)
con.Close()
}
// receive a new block message
func _TestWSNewBlock(t *testing.T) {
con := newWSCon(t)
eid := types.EventStringNewBlock()
subscribe(t, con, eid)
defer func() {
unsubscribe(t, con, eid)
con.Close()
}()
waitForEvent(t, con, eid, true, func() {}, func(eid string, b []byte) error {
fmt.Println("Check:", string(b))
return nil
})
}
// receive a few new block messages in a row, with increasing height
func TestWSBlockchainGrowth(t *testing.T) {
con := newWSCon(t)
eid := types.EventStringNewBlock()
subscribe(t, con, eid)
defer func() {
unsubscribe(t, con, eid)
con.Close()
}()
// listen for NewBlock, ensure height increases by 1
unmarshalValidateBlockchain(t, con, eid)
}
// send a transaction and validate the events from listening for both sender and receiver
func TestWSSend(t *testing.T) {
toAddr := []byte{20, 143, 25, 63, 16, 177, 83, 29, 91, 91, 54, 23, 233, 46, 190, 121, 122, 34, 86, 54}
amt := uint64(100)
con := newWSCon(t)
eidInput := types.EventStringAccInput(userByteAddr)
eidOutput := types.EventStringAccOutput(toAddr)
subscribe(t, con, eidInput)
subscribe(t, con, eidOutput)
defer func() {
unsubscribe(t, con, eidInput)
unsubscribe(t, con, eidOutput)
con.Close()
}()
waitForEvent(t, con, eidInput, true, func() {
broadcastTx(t, "JSONRPC", userByteAddr, toAddr, nil, userBytePriv, amt, 0, 0)
}, unmarshalValidateSend(amt, toAddr))
waitForEvent(t, con, eidOutput, true, func() {}, unmarshalValidateSend(amt, toAddr))
}
// ensure events are only fired once for a given transaction
func TestWSDoubleFire(t *testing.T) {
con := newWSCon(t)
eid := types.EventStringAccInput(userByteAddr)
subscribe(t, con, eid)
defer func() {
unsubscribe(t, con, eid)
con.Close()
}()
amt := uint64(100)
toAddr := []byte{20, 143, 25, 63, 16, 177, 83, 29, 91, 91, 54, 23, 233, 46, 190, 121, 122, 34, 86, 54}
// broadcast the transaction, wait to hear about it
waitForEvent(t, con, eid, true, func() {
broadcastTx(t, "JSONRPC", userByteAddr, toAddr, nil, userBytePriv, amt, 0, 0)
}, func(eid string, b []byte) error {
return nil
})
// but make sure we don't hear about it twice
waitForEvent(t, con, eid, false, func() {
}, func(eid string, b []byte) error {
return nil
})
}
// create a contract, wait for the event, and send it a msg, validate the return
func TestWSCallWait(t *testing.T) {
con := newWSCon(t)
eid1 := types.EventStringAccInput(userByteAddr)
subscribe(t, con, eid1)
defer func() {
unsubscribe(t, con, eid1)
con.Close()
}()
amt := uint64(10000)
code, returnCode, returnVal := simpleContract()
var contractAddr []byte
// wait for the contract to be created
waitForEvent(t, con, eid1, true, func() {
_, receipt := broadcastTx(t, "JSONRPC", userByteAddr, nil, code, userBytePriv, amt, 1000, 1000)
contractAddr = receipt.ContractAddr
}, unmarshalValidateCall(amt, returnCode))
// susbscribe to the new contract
amt = uint64(10001)
eid2 := types.EventStringAccReceive(contractAddr)
subscribe(t, con, eid2)
defer func() {
unsubscribe(t, con, eid2)
}()
// get the return value from a call
data := []byte{0x1} // just needs to be non empty for this to be a CallTx
waitForEvent(t, con, eid2, true, func() {
broadcastTx(t, "JSONRPC", userByteAddr, contractAddr, data, userBytePriv, amt, 1000, 1000)
}, unmarshalValidateCall(amt, returnVal))
}
// create a contract and send it a msg without waiting. wait for contract event
// and validate return
func TestWSCallNoWait(t *testing.T) {
con := newWSCon(t)
amt := uint64(10000)
code, _, returnVal := simpleContract()
_, receipt := broadcastTx(t, "JSONRPC", userByteAddr, nil, code, userBytePriv, amt, 1000, 1000)
contractAddr := receipt.ContractAddr
// susbscribe to the new contract
amt = uint64(10001)
eid := types.EventStringAccReceive(contractAddr)
subscribe(t, con, eid)
defer func() {
unsubscribe(t, con, eid)
con.Close()
}()
// get the return value from a call
data := []byte{0x1} // just needs to be non empty for this to be a CallTx
waitForEvent(t, con, eid, true, func() {
broadcastTx(t, "JSONRPC", userByteAddr, contractAddr, data, userBytePriv, amt, 1000, 1000)
}, unmarshalValidateCall(amt, returnVal))
}
// create two contracts, one of which calls the other
func TestWSCallCall(t *testing.T) {
con := newWSCon(t)
amt := uint64(10000)
code, _, returnVal := simpleContract()
txid := new([]byte)
// deploy the two contracts
_, receipt := broadcastTx(t, "JSONRPC", userByteAddr, nil, code, userBytePriv, amt, 1000, 1000)
contractAddr1 := receipt.ContractAddr
code, _, _ = simpleCallContract(contractAddr1)
_, receipt = broadcastTx(t, "JSONRPC", userByteAddr, nil, code, userBytePriv, amt, 1000, 1000)
contractAddr2 := receipt.ContractAddr
// susbscribe to the new contracts
amt = uint64(10001)
eid1 := types.EventStringAccReceive(contractAddr1)
eid2 := types.EventStringAccReceive(contractAddr2)
subscribe(t, con, eid1)
subscribe(t, con, eid2)
defer func() {
unsubscribe(t, con, eid1)
unsubscribe(t, con, eid2)
con.Close()
}()
// call contract2, which should call contract1, and wait for ev1
data := []byte{0x1} // just needs to be non empty for this to be a CallTx
waitForEvent(t, con, eid1, true, func() {
tx, _ := broadcastTx(t, "JSONRPC", userByteAddr, contractAddr2, data, userBytePriv, amt, 1000, 1000)
*txid = account.HashSignBytes(tx)
}, unmarshalValidateCallCall(userByteAddr, returnVal, txid))
}

rpc/test/helpers.go → rpc/test/helpers_test.go View File


rpc/test/tests.go → rpc/test/tests_test.go View File


+ 136
- 0
rpc/test/ws_helpers_test.go View File

@ -0,0 +1,136 @@
package rpc
import (
"bytes"
"fmt"
"github.com/gorilla/websocket"
"github.com/tendermint/tendermint/binary"
"github.com/tendermint/tendermint/types"
"testing"
)
func unmarshalValidateBlockchain(t *testing.T, con *websocket.Conn, eid string) {
var initBlockN uint
for i := 0; i < 2; i++ {
waitForEvent(t, con, eid, true, func() {}, func(eid string, b []byte) error {
block, err := unmarshalResponseNewBlock(b)
if err != nil {
return err
}
if i == 0 {
initBlockN = block.Header.Height
} else {
if block.Header.Height != initBlockN+uint(i) {
return fmt.Errorf("Expected block %d, got block %d", i, block.Header.Height)
}
}
return nil
})
}
}
func unmarshalValidateSend(amt uint64, toAddr []byte) func(string, []byte) error {
return func(eid string, b []byte) error {
// unmarshal and assert correctness
var response struct {
Event string
Data types.SendTx
Error string
}
var err error
binary.ReadJSON(&response, b, &err)
if err != nil {
return err
}
if response.Error != "" {
return fmt.Errorf(response.Error)
}
if eid != response.Event {
return fmt.Errorf("Eventid is not correct. Got %s, expected %s", response.Event, eid)
}
tx := response.Data
if bytes.Compare(tx.Inputs[0].Address, userByteAddr) != 0 {
return fmt.Errorf("Senders do not match up! Got %x, expected %x", tx.Inputs[0].Address, userByteAddr)
}
if tx.Inputs[0].Amount != amt {
return fmt.Errorf("Amt does not match up! Got %d, expected %d", tx.Inputs[0].Amount, amt)
}
if bytes.Compare(tx.Outputs[0].Address, toAddr) != 0 {
return fmt.Errorf("Receivers do not match up! Got %x, expected %x", tx.Outputs[0].Address, userByteAddr)
}
return nil
}
}
func unmarshalValidateCall(amt uint64, returnCode []byte) func(string, []byte) error {
return func(eid string, b []byte) error {
// unmarshall and assert somethings
var response struct {
Event string
Data struct {
Tx types.CallTx
Return []byte
Exception string
}
Error string
}
var err error
binary.ReadJSON(&response, b, &err)
if err != nil {
return err
}
if response.Error != "" {
return fmt.Errorf(response.Error)
}
if response.Data.Exception != "" {
return fmt.Errorf(response.Data.Exception)
}
tx := response.Data.Tx
if bytes.Compare(tx.Input.Address, userByteAddr) != 0 {
return fmt.Errorf("Senders do not match up! Got %x, expected %x", tx.Input.Address, userByteAddr)
}
if tx.Input.Amount != amt {
return fmt.Errorf("Amt does not match up! Got %d, expected %d", tx.Input.Amount, amt)
}
ret := response.Data.Return
if bytes.Compare(ret, returnCode) != 0 {
return fmt.Errorf("Call did not return correctly. Got %x, expected %x", ret, returnCode)
}
return nil
}
}
func unmarshalValidateCallCall(origin, returnCode []byte, txid *[]byte) func(string, []byte) error {
return func(eid string, b []byte) error {
// unmarshall and assert somethings
var response struct {
Event string
Data types.EventMsgCall
Error string
}
var err error
binary.ReadJSON(&response, b, &err)
if err != nil {
return err
}
if response.Error != "" {
return fmt.Errorf(response.Error)
}
if response.Data.Exception != "" {
return fmt.Errorf(response.Data.Exception)
}
if bytes.Compare(response.Data.Origin, origin) != 0 {
return fmt.Errorf("Origin does not match up! Got %x, expected %x", response.Data.Origin, origin)
}
ret := response.Data.Return
if bytes.Compare(ret, returnCode) != 0 {
return fmt.Errorf("Call did not return correctly. Got %x, expected %x", ret, returnCode)
}
if bytes.Compare(response.Data.TxId, *txid) != 0 {
return fmt.Errorf("TxIds do not match up! Got %x, expected %x", response.Data.TxId, *txid)
}
// calldata := response.Data.CallData
return nil
}
}

+ 6
- 6
state/block_cache.go View File

@ -91,13 +91,13 @@ func (cache *BlockCache) GetStorage(addr Word256, key Word256) (value Word256) {
}
// Get or load storage
acc, storage, removed, dirty := cache.accounts[string(addr.Prefix(20))].unpack()
acc, storage, removed, dirty := cache.accounts[string(addr.Postfix(20))].unpack()
if removed {
panic("GetStorage() on removed account")
}
if acc != nil && storage == nil {
storage = makeStorage(cache.db, acc.StorageRoot)
cache.accounts[string(addr.Prefix(20))] = accountInfo{acc, storage, false, dirty}
cache.accounts[string(addr.Postfix(20))] = accountInfo{acc, storage, false, dirty}
} else if acc == nil {
return Zero256
}
@ -114,7 +114,7 @@ func (cache *BlockCache) GetStorage(addr Word256, key Word256) (value Word256) {
// NOTE: Set value to zero to removed from the trie.
func (cache *BlockCache) SetStorage(addr Word256, key Word256, value Word256) {
_, _, removed, _ := cache.accounts[string(addr.Prefix(20))].unpack()
_, _, removed, _ := cache.accounts[string(addr.Postfix(20))].unpack()
if removed {
panic("SetStorage() on a removed account")
}
@ -146,7 +146,7 @@ func (cache *BlockCache) Sync() {
for _, storageKey := range storageKeys {
addr, key := Tuple256Split(storageKey)
if addr != curAddr || curAcc == nil {
acc, storage, removed, _ := cache.accounts[string(addr.Prefix(20))].unpack()
acc, storage, removed, _ := cache.accounts[string(addr.Postfix(20))].unpack()
if storage == nil {
storage = makeStorage(cache.db, acc.StorageRoot)
}
@ -166,7 +166,7 @@ func (cache *BlockCache) Sync() {
curStorage.Remove(key.Bytes())
} else {
curStorage.Set(key.Bytes(), value.Bytes())
cache.accounts[string(addr.Prefix(20))] = accountInfo{curAcc, curStorage, false, true}
cache.accounts[string(addr.Postfix(20))] = accountInfo{curAcc, curStorage, false, true}
}
}
@ -187,7 +187,7 @@ func (cache *BlockCache) Sync() {
}
} else {
if acc == nil {
panic(Fmt("Account should not be nil for addr: %X", acc.Address))
continue
}
if storage != nil {
newStorageRoot := storage.Save()


+ 39
- 4
state/execution.go View File

@ -7,6 +7,7 @@ import (
"github.com/tendermint/tendermint/account"
. "github.com/tendermint/tendermint/common"
"github.com/tendermint/tendermint/events"
"github.com/tendermint/tendermint/types"
"github.com/tendermint/tendermint/vm"
)
@ -112,7 +113,7 @@ func execBlock(s *State, block *types.Block, blockPartsHeader types.PartSetHeade
// Commit each tx
for _, tx := range block.Data.Txs {
err := ExecTx(blockCache, tx, true)
err := ExecTx(blockCache, tx, true, s.evc)
if err != nil {
return InvalidTxError{tx, err}
}
@ -292,11 +293,11 @@ func adjustByOutputs(accounts map[string]*account.Account, outs []*types.TxOutpu
// If the tx is invalid, an error will be returned.
// Unlike ExecBlock(), state will not be altered.
func ExecTx(blockCache *BlockCache, tx_ types.Tx, runCall bool) error {
func ExecTx(blockCache *BlockCache, tx_ types.Tx, runCall bool, evc events.Fireable) error {
// TODO: do something with fees
fees := uint64(0)
_s := blockCache.State() // hack to access validators.
_s := blockCache.State() // hack to access validators and event switch.
// Exec tx
switch tx := tx_.(type) {
@ -326,6 +327,17 @@ func ExecTx(blockCache *BlockCache, tx_ types.Tx, runCall bool) error {
for _, acc := range accounts {
blockCache.UpdateAccount(acc)
}
// if the evc is nil, nothing will happen
if evc != nil {
for _, i := range tx.Inputs {
evc.FireEvent(types.EventStringAccInput(i.Address), tx)
}
for _, o := range tx.Outputs {
evc.FireEvent(types.EventStringAccOutput(o.Address), tx)
}
}
return nil
case *types.CallTx:
@ -412,10 +424,14 @@ func ExecTx(blockCache *BlockCache, tx_ types.Tx, runCall bool) error {
txCache.UpdateAccount(caller) // because we adjusted by input above, and bumped nonce maybe.
txCache.UpdateAccount(callee) // because we adjusted by input above.
vmach := vm.NewVM(txCache, params, caller.Address)
vmach := vm.NewVM(txCache, params, caller.Address, account.HashSignBytes(tx))
vmach.SetFireable(_s.evc)
// NOTE: Call() transfers the value from caller to callee iff call succeeds.
ret, err := vmach.Call(caller, callee, code, tx.Data, value, &gas)
exception := ""
if err != nil {
exception = err.Error()
// Failure. Charge the gas fee. The 'value' was otherwise not transferred.
log.Debug(Fmt("Error on execution: %v", err))
inAcc.Balance -= tx.Fee
@ -432,6 +448,13 @@ func ExecTx(blockCache *BlockCache, tx_ types.Tx, runCall bool) error {
}
// Create a receipt from the ret and whether errored.
log.Info("VM call complete", "caller", caller, "callee", callee, "return", ret, "err", err)
// Fire Events for sender and receiver
// a separate event will be fired from vm for each additional call
if evc != nil {
evc.FireEvent(types.EventStringAccInput(tx.Input.Address), types.EventMsgCallTx{tx, ret, exception})
evc.FireEvent(types.EventStringAccReceive(tx.Address), types.EventMsgCallTx{tx, ret, exception})
}
} else {
// The mempool does not call txs until
// the proposer determines the order of txs.
@ -499,6 +522,9 @@ func ExecTx(blockCache *BlockCache, tx_ types.Tx, runCall bool) error {
if !added {
panic("Failed to add validator")
}
if evc != nil {
evc.FireEvent(types.EventStringBond(), tx)
}
return nil
case *types.UnbondTx:
@ -521,6 +547,9 @@ func ExecTx(blockCache *BlockCache, tx_ types.Tx, runCall bool) error {
// Good!
_s.unbondValidator(val)
if evc != nil {
evc.FireEvent(types.EventStringUnbond(), tx)
}
return nil
case *types.RebondTx:
@ -543,6 +572,9 @@ func ExecTx(blockCache *BlockCache, tx_ types.Tx, runCall bool) error {
// Good!
_s.rebondValidator(val)
if evc != nil {
evc.FireEvent(types.EventStringRebond(), tx)
}
return nil
case *types.DupeoutTx:
@ -586,6 +618,9 @@ func ExecTx(blockCache *BlockCache, tx_ types.Tx, runCall bool) error {
// Good! (Bad validator!)
_s.destroyValidator(accused)
if evc != nil {
evc.FireEvent(types.EventStringDupeout(), tx)
}
return nil
default:


+ 10
- 0
state/state.go View File

@ -8,6 +8,7 @@ import (
"github.com/tendermint/tendermint/account"
"github.com/tendermint/tendermint/binary"
dbm "github.com/tendermint/tendermint/db"
"github.com/tendermint/tendermint/events"
"github.com/tendermint/tendermint/merkle"
"github.com/tendermint/tendermint/types"
)
@ -34,6 +35,8 @@ type State struct {
UnbondingValidators *ValidatorSet
accounts merkle.Tree // Shouldn't be accessed directly.
validatorInfos merkle.Tree // Shouldn't be accessed directly.
evc events.Fireable // typically an events.EventCache
}
func LoadState(db dbm.DB) *State {
@ -98,6 +101,7 @@ func (s *State) Copy() *State {
UnbondingValidators: s.UnbondingValidators.Copy(), // copy the valSet lazily.
accounts: s.accounts.Copy(),
validatorInfos: s.validatorInfos.Copy(),
evc: nil,
}
}
@ -115,6 +119,7 @@ func (s *State) Hash() []byte {
// Mutates the block in place and updates it with new state hash.
func (s *State) SetBlockStateHash(block *types.Block) error {
sCopy := s.Copy()
// sCopy has no event cache in it, so this won't fire events
err := execBlock(sCopy, block, types.PartSetHeader{})
if err != nil {
return err
@ -264,6 +269,11 @@ func (s *State) LoadStorage(hash []byte) (storage merkle.Tree) {
// State.storage
//-------------------------------------
// Implements events.Eventable. Typically uses events.EventCache
func (s *State) SetFireable(evc events.Fireable) {
s.evc = evc
}
//-----------------------------------------------------------------------------
type InvalidTxError struct {


+ 1
- 1
state/state_test.go View File

@ -12,7 +12,7 @@ import (
func execTxWithState(state *State, tx types.Tx, runCall bool) error {
cache := NewBlockCache(state)
err := ExecTx(cache, tx, runCall)
err := ExecTx(cache, tx, runCall, nil)
if err != nil {
return err
} else {


+ 10
- 6
state/tx_cache.go View File

@ -30,9 +30,13 @@ func (cache *TxCache) GetAccount(addr Word256) *vm.Account {
acc, removed := vmUnpack(cache.accounts[addr])
if removed {
return nil
} else {
return acc
} else if acc == nil {
acc2 := cache.backend.GetAccount(addr.Postfix(20))
if acc2 != nil {
return toVMAccount(acc2)
}
}
return acc
}
func (cache *TxCache) UpdateAccount(acc *vm.Account) {
@ -64,7 +68,7 @@ func (cache *TxCache) CreateAccount(creator *vm.Account) *vm.Account {
nonce := creator.Nonce
creator.Nonce += 1
addr := RightPadWord256(NewContractAddress(creator.Address.Prefix(20), nonce))
addr := LeftPadWord256(NewContractAddress(creator.Address.Postfix(20), nonce))
// Create account from address.
account, removed := vmUnpack(cache.accounts[addr])
@ -124,7 +128,7 @@ func (cache *TxCache) Sync() {
for addr, accInfo := range cache.accounts {
acc, removed := vmUnpack(accInfo)
if removed {
cache.backend.RemoveAccount(addr.Prefix(20))
cache.backend.RemoveAccount(addr.Postfix(20))
} else {
cache.backend.UpdateAccount(toStateAccount(acc))
}
@ -150,7 +154,7 @@ func NewContractAddress(caller []byte, nonce uint64) []byte {
// Converts backend.Account to vm.Account struct.
func toVMAccount(acc *ac.Account) *vm.Account {
return &vm.Account{
Address: RightPadWord256(acc.Address),
Address: LeftPadWord256(acc.Address),
Balance: acc.Balance,
Code: acc.Code, // This is crazy.
Nonce: uint64(acc.Sequence),
@ -172,7 +176,7 @@ func toStateAccount(acc *vm.Account) *ac.Account {
storageRoot = acc.StorageRoot.Bytes()
}
return &ac.Account{
Address: acc.Address.Prefix(20),
Address: acc.Address.Postfix(20),
PubKey: pubKey,
Balance: acc.Balance,
Code: acc.Code,


+ 84
- 0
types/events.go View File

@ -0,0 +1,84 @@
package types
import (
"fmt"
)
// Functions to generate eventId strings
func EventStringAccInput(addr []byte) string {
return fmt.Sprintf("Acc/%x/Input", addr)
}
func EventStringAccOutput(addr []byte) string {
return fmt.Sprintf("Acc/%x/Output", addr)
}
func EventStringAccReceive(addr []byte) string {
return fmt.Sprintf("Acc/%x/Receive", addr)
}
func EventStringBond() string {
return "Bond"
}
func EventStringUnbond() string {
return "Unbond"
}
func EventStringRebond() string {
return "Rebond"
}
func EventStringDupeout() string {
return "Dupeout"
}
func EventStringNewBlock() string {
return "NewBlock"
}
func EventStringFork() string {
return "Fork"
}
// Most event messages are basic types (a block, a transaction)
// but some (an input to a call tx or a receive) are more exotic:
type EventMsgCallTx struct {
Tx Tx
Return []byte
Exception string
}
type CallData struct {
Caller []byte
Callee []byte
Data []byte
Value uint64
Gas uint64
}
type EventMsgCall struct {
CallData *CallData
Origin []byte
TxId []byte
Return []byte
Exception string
}
/*
Acc/XYZ/Input -> full tx or {full tx, return value, exception}
Acc/XYZ/Output -> full tx
Acc/XYZ/Receive -> full tx, return value, exception, (optionally?) calldata
Bond -> full tx
Unbond -> full tx
Rebond -> full tx
Dupeout -> full tx
NewBlock -> full block
Fork -> block A, block B
Log -> Fuck this
NewPeer -> peer
Alert -> alert msg
*/

+ 2
- 2
vm/test/vm_test.go View File

@ -36,7 +36,7 @@ func makeBytes(n int) []byte {
}
func TestVM(t *testing.T) {
ourVm := NewVM(newAppState(), newParams(), Zero256)
ourVm := NewVM(newAppState(), newParams(), Zero256, nil)
// Create accounts
account1 := &Account{
@ -72,7 +72,7 @@ func TestSubcurrency(t *testing.T) {
st.accounts[account1.Address.String()] = account1
st.accounts[account2.Address.String()] = account2
ourVm := NewVM(st, newParams(), Zero256)
ourVm := NewVM(st, newParams(), Zero256, nil)
var gas uint64 = 1000
code_parts := []string{"620f42403355",


+ 34
- 9
vm/vm.go View File

@ -6,6 +6,8 @@ import (
"math/big"
. "github.com/tendermint/tendermint/common"
"github.com/tendermint/tendermint/events"
"github.com/tendermint/tendermint/types"
"github.com/tendermint/tendermint/vm/sha3"
)
@ -43,19 +45,28 @@ type VM struct {
appState AppState
params Params
origin Word256
txid []byte
callDepth int
evc events.Fireable
}
func NewVM(appState AppState, params Params, origin Word256) *VM {
func NewVM(appState AppState, params Params, origin Word256, txid []byte) *VM {
return &VM{
appState: appState,
params: params,
origin: origin,
callDepth: 0,
txid: txid,
}
}
// satisfies events.Eventable
func (vm *VM) SetFireable(evc events.Fireable) {
vm.evc = evc
}
// CONTRACT appState is aware of caller and callee, so we can just mutate them.
// value: To be transferred from caller to callee. Refunded upon error.
// gas: Available gas. No refunds for gas.
@ -72,12 +83,25 @@ func (vm *VM) Call(caller, callee *Account, code, input []byte, value uint64, ga
vm.callDepth += 1
output, err = vm.call(caller, callee, code, input, value, gas)
vm.callDepth -= 1
exception := ""
if err != nil {
exception = err.Error()
err := transfer(callee, caller, value)
if err != nil {
panic("Could not return value to caller")
}
}
// if callDepth is 0 the event is fired from ExecTx (along with the Input event)
// otherwise, we fire from here.
if vm.callDepth != 0 && vm.evc != nil {
vm.evc.FireEvent(types.EventStringAccReceive(callee.Address.Postfix(20)), types.EventMsgCall{
&types.CallData{caller.Address.Postfix(20), callee.Address.Postfix(20), input, value, *gas},
vm.origin.Postfix(20),
vm.txid,
output,
exception,
})
}
return
}
@ -283,7 +307,6 @@ func (vm *VM) call(caller, callee *Account, code, input []byte, value uint64, ga
x, y := stack.Pop64(), stack.Pop64()
stack.Push64(x & y)
dbg.Printf(" %v & %v = %v\n", x, y, x&y)
case OR: // 0x17
x, y := stack.Pop64(), stack.Pop64()
stack.Push64(x | y)
@ -330,7 +353,7 @@ func (vm *VM) call(caller, callee *Account, code, input []byte, value uint64, ga
if ok = useGas(gas, GasGetAccount); !ok {
return nil, firstErr(err, ErrInsufficientGas)
}
acc := vm.appState.GetAccount(addr) // TODO ensure that 20byte lengths are supported.
acc := vm.appState.GetAccount(flipWord(addr)) // TODO ensure that 20byte lengths are supported.
if acc == nil {
return nil, firstErr(err, ErrUnknownAddress)
}
@ -357,7 +380,7 @@ func (vm *VM) call(caller, callee *Account, code, input []byte, value uint64, ga
return nil, firstErr(err, ErrInputOutOfBounds)
}
stack.Push(RightPadWord256(data))
dbg.Printf(" => 0x%X\n", data)
dbg.Printf(" => 0x%X\n", RightPadWord256(data))
case CALLDATASIZE: // 0x36
stack.Push64(uint64(len(input)))
@ -407,7 +430,7 @@ func (vm *VM) call(caller, callee *Account, code, input []byte, value uint64, ga
if ok = useGas(gas, GasGetAccount); !ok {
return nil, firstErr(err, ErrInsufficientGas)
}
acc := vm.appState.GetAccount(addr)
acc := vm.appState.GetAccount(flipWord(addr))
if acc == nil {
return nil, firstErr(err, ErrUnknownAddress)
}
@ -421,7 +444,7 @@ func (vm *VM) call(caller, callee *Account, code, input []byte, value uint64, ga
if ok = useGas(gas, GasGetAccount); !ok {
return nil, firstErr(err, ErrInsufficientGas)
}
acc := vm.appState.GetAccount(addr)
acc := vm.appState.GetAccount(flipWord(addr))
if acc == nil {
return nil, firstErr(err, ErrUnknownAddress)
}
@ -628,7 +651,7 @@ func (vm *VM) call(caller, callee *Account, code, input []byte, value uint64, ga
if ok = useGas(gas, GasGetAccount); !ok {
return nil, firstErr(err, ErrInsufficientGas)
}
acc := vm.appState.GetAccount(addr)
acc := vm.appState.GetAccount(flipWord(addr))
if acc == nil {
return nil, firstErr(err, ErrUnknownAddress)
}
@ -671,7 +694,7 @@ func (vm *VM) call(caller, callee *Account, code, input []byte, value uint64, ga
return nil, firstErr(err, ErrInsufficientGas)
}
// TODO if the receiver is , then make it the fee.
receiver := vm.appState.GetAccount(addr)
receiver := vm.appState.GetAccount(flipWord(addr))
if receiver == nil {
return nil, firstErr(err, ErrUnknownAddress)
}
@ -697,10 +720,12 @@ func subslice(data []byte, offset, length uint64, flip_ bool) (ret []byte, ok bo
if size < offset {
return nil, false
} else if size < offset+length {
ret, ok = data[offset:], false
ret, ok = data[offset:], true
ret = RightPadBytes(ret, 32)
} else {
ret, ok = data[offset:offset+length], true
}
if flip_ {
ret = flip(ret)
}


Loading…
Cancel
Save