From 65f669160fff441a807929ff443f19e3ad454933 Mon Sep 17 00:00:00 2001 From: Ethan Buchman Date: Fri, 17 Apr 2015 13:18:50 -0700 Subject: [PATCH] more fixes from review --- events/event_cache.go | 2 + node/node.go | 4 ++ rpc/handlers.go | 48 +++++++++++-------- rpc/test/{helpers.go => helpers_test.go} | 0 rpc/test/{tests.go => tests_test.go} | 0 .../{ws_helpers.go => ws_helpers_test.go} | 0 state/state.go | 1 + state/state_test.go | 2 +- vm/vm.go | 7 +-- 9 files changed, 39 insertions(+), 25 deletions(-) rename rpc/test/{helpers.go => helpers_test.go} (100%) rename rpc/test/{tests.go => tests_test.go} (100%) rename rpc/test/{ws_helpers.go => ws_helpers_test.go} (100%) diff --git a/events/event_cache.go b/events/event_cache.go index e1af09fa4..3231fe0d4 100644 --- a/events/event_cache.go +++ b/events/event_cache.go @@ -32,8 +32,10 @@ func (evc *EventCache) FireEvent(event string, msg interface{}) { } // Fire events by running evsw.FireEvent on all cached events. Blocks. +// Clears cached events func (evc *EventCache) Flush() { for _, ei := range evc.events { evc.evsw.FireEvent(ei.event, ei.msg) } + evc.events = make([]eventInfo, eventsBufferSize) } diff --git a/node/node.go b/node/node.go index 311828349..2bcc66857 100644 --- a/node/node.go +++ b/node/node.go @@ -161,6 +161,10 @@ func (n *Node) MempoolReactor() *mempl.MempoolReactor { return n.mempoolReactor } +func (n *Node) EventSwitch() *events.EventSwitch { + return n.evsw +} + //------------------------------------------------------------------------------ func RunNode() { diff --git a/rpc/handlers.go b/rpc/handlers.go index d3b969189..b7c235447 100644 --- a/rpc/handlers.go +++ b/rpc/handlers.go @@ -212,8 +212,9 @@ type WSResponse struct { } // a single websocket connection -// contains the listeners id -type Connection struct { +// contains listener id, underlying ws connection, +// and the event switch for subscribing to events +type WSConnection struct { id string wsConn *websocket.Conn writeChan chan WSResponse @@ -225,8 +226,8 @@ type Connection struct { } // new websocket connection wrapper -func NewConnection(wsConn *websocket.Conn) *Connection { - return &Connection{ +func NewWSConnection(wsConn *websocket.Conn) *WSConnection { + return &WSConnection{ id: wsConn.RemoteAddr().String(), wsConn: wsConn, writeChan: make(chan WSResponse, WriteChanBufferSize), // buffered. we keep track when its full @@ -234,7 +235,7 @@ func NewConnection(wsConn *websocket.Conn) *Connection { } // start the connection and hand her the event switch -func (con *Connection) Start(evsw *events.EventSwitch) { +func (con *WSConnection) Start(evsw *events.EventSwitch) { if atomic.CompareAndSwapUint32(&con.started, 0, 1) { con.evsw = evsw @@ -246,15 +247,29 @@ func (con *Connection) Start(evsw *events.EventSwitch) { } // close the connection -func (con *Connection) Stop() { +func (con *WSConnection) Stop() { if atomic.CompareAndSwapUint32(&con.stopped, 0, 1) { con.wsConn.Close() close(con.writeChan) } } +// attempt to write response to writeChan and record failures +func (con *WSConnection) safeWrite(resp WSResponse) { + select { + case con.writeChan <- resp: + // yay + con.failedSends = 0 + default: + // channel is full + // if this happens too many times in a row, + // close connection + con.failedSends += 1 + } +} + // read from the socket and subscribe to or unsubscribe from events -func (con *Connection) read() { +func (con *WSConnection) read() { reaper := time.Tick(time.Second * WSConnectionReaperSeconds) for { select { @@ -278,7 +293,7 @@ func (con *Connection) read() { err = json.Unmarshal(in, &req) if err != nil { errStr := fmt.Sprintf("Error unmarshaling data: %s", err.Error()) - con.writeChan <- WSResponse{Error: errStr} + con.safeWrite(WSResponse{Error: errStr}) continue } switch req.Type { @@ -289,16 +304,7 @@ func (con *Connection) read() { Event: req.Event, Data: msg, } - select { - case con.writeChan <- resp: - // yay - con.failedSends = 0 - default: - // channel is full - // if this happens too many times, - // close connection - con.failedSends += 1 - } + con.safeWrite(resp) }) case "unsubscribe": if req.Event != "" { @@ -307,7 +313,7 @@ func (con *Connection) read() { con.evsw.RemoveListener(con.id) } default: - con.writeChan <- WSResponse{Error: "Unknown request type: " + req.Type} + con.safeWrite(WSResponse{Error: "Unknown request type: " + req.Type}) } } @@ -315,7 +321,7 @@ func (con *Connection) read() { } // receives on a write channel and writes out on the socket -func (con *Connection) write() { +func (con *WSConnection) write() { n, err := new(int64), new(error) for { msg, more := <-con.writeChan @@ -369,7 +375,7 @@ func (wm *WebsocketManager) websocketHandler(w http.ResponseWriter, r *http.Requ } // register connection - con := NewConnection(wsConn) + con := NewWSConnection(wsConn) log.Info("New websocket connection", "origin", con.id) con.Start(wm.evsw) } diff --git a/rpc/test/helpers.go b/rpc/test/helpers_test.go similarity index 100% rename from rpc/test/helpers.go rename to rpc/test/helpers_test.go diff --git a/rpc/test/tests.go b/rpc/test/tests_test.go similarity index 100% rename from rpc/test/tests.go rename to rpc/test/tests_test.go diff --git a/rpc/test/ws_helpers.go b/rpc/test/ws_helpers_test.go similarity index 100% rename from rpc/test/ws_helpers.go rename to rpc/test/ws_helpers_test.go diff --git a/state/state.go b/state/state.go index 01270a241..182d2692b 100644 --- a/state/state.go +++ b/state/state.go @@ -101,6 +101,7 @@ func (s *State) Copy() *State { UnbondingValidators: s.UnbondingValidators.Copy(), // copy the valSet lazily. accounts: s.accounts.Copy(), validatorInfos: s.validatorInfos.Copy(), + evc: nil, } } diff --git a/state/state_test.go b/state/state_test.go index 58a673195..c2d766e31 100644 --- a/state/state_test.go +++ b/state/state_test.go @@ -12,7 +12,7 @@ import ( func execTxWithState(state *State, tx types.Tx, runCall bool) error { cache := NewBlockCache(state) - err := ExecTx(cache, tx, runCall, false) + err := ExecTx(cache, tx, runCall, nil) if err != nil { return err } else { diff --git a/vm/vm.go b/vm/vm.go index f2e869cb4..ba16d846a 100644 --- a/vm/vm.go +++ b/vm/vm.go @@ -307,7 +307,6 @@ func (vm *VM) call(caller, callee *Account, code, input []byte, value uint64, ga x, y := stack.Pop64(), stack.Pop64() stack.Push64(x & y) dbg.Printf(" %v & %v = %v\n", x, y, x&y) - case OR: // 0x17 x, y := stack.Pop64(), stack.Pop64() stack.Push64(x | y) @@ -381,7 +380,7 @@ func (vm *VM) call(caller, callee *Account, code, input []byte, value uint64, ga return nil, firstErr(err, ErrInputOutOfBounds) } stack.Push(RightPadWord256(data)) - dbg.Printf(" => 0x%X\n", data) + dbg.Printf(" => 0x%X\n", RightPadWord256(data)) case CALLDATASIZE: // 0x36 stack.Push64(uint64(len(input))) @@ -721,10 +720,12 @@ func subslice(data []byte, offset, length uint64, flip_ bool) (ret []byte, ok bo if size < offset { return nil, false } else if size < offset+length { - ret, ok = data[offset:], false + ret, ok = data[offset:], true + ret = RightPadBytes(ret, 32) } else { ret, ok = data[offset:offset+length], true } + if flip_ { ret = flip(ret) }