Browse Source

libs/bits: inline defer and change order of mutexes (#5187)

* libs/bits: inline defer and change order of mutexes

Closes #3217

* abci/client: unexpose StopForError func

a) it's not called outside
b) the reason for exposing it in the first place is unclear
c) Stop already exist if someone from outside wants to stop the client
pull/5290/head
Anton Kaliaev 4 years ago
committed by GitHub
parent
commit
4827ed121f
No known key found for this signature in database GPG Key ID: 4AEE18F83AFDEB23
3 changed files with 80 additions and 89 deletions
  1. +77
    -60
      abci/client/socket_client.go
  2. +0
    -24
      abci/client/socket_client_test.go
  3. +3
    -5
      libs/bits/bit_array.go

+ 77
- 60
abci/client/socket_client.go View File

@ -17,15 +17,13 @@ import (
"github.com/tendermint/tendermint/libs/timer"
)
const reqQueueSize = 256 // TODO make configurable
// const maxResponseSize = 1048576 // 1MB TODO make configurable
const flushThrottleMS = 20 // Don't wait longer than...
var _ Client = (*socketClient)(nil)
const (
reqQueueSize = 256 // TODO make configurable
flushThrottleMS = 20 // Don't wait longer than...
)
// This is goroutine-safe, but users should beware that
// the application in general is not meant to be interfaced
// with concurrent callers.
// This is goroutine-safe, but users should beware that the application in
// general is not meant to be interfaced with concurrent callers.
type socketClient struct {
service.BaseService
@ -40,9 +38,13 @@ type socketClient struct {
err error
reqSent *list.List // list of requests sent, waiting for response
resCb func(*types.Request, *types.Response) // called on all requests, if set.
}
var _ Client = (*socketClient)(nil)
// NewSocketClient creates a new socket client, which connects to a given
// address. If mustConnect is true, the client will return an error upon start
// if it fails to connect.
func NewSocketClient(addr string, mustConnect bool) Client {
cli := &socketClient{
reqQueue: make(chan *ReqRes, reqQueueSize),
@ -57,19 +59,24 @@ func NewSocketClient(addr string, mustConnect bool) Client {
return cli
}
// OnStart implements Service by connecting to the server and spawning reading
// and writing goroutines.
func (cli *socketClient) OnStart() error {
var err error
var conn net.Conn
RETRY_LOOP:
var (
err error
conn net.Conn
)
for {
conn, err = tmnet.Connect(cli.addr)
if err != nil {
if cli.mustConnect {
return err
}
cli.Logger.Error(fmt.Sprintf("abci.socketClient failed to connect to %v. Retrying...", cli.addr), "err", err)
cli.Logger.Error(fmt.Sprintf("abci.socketClient failed to connect to %v. Retrying after %vs...",
cli.addr, dialRetryIntervalSeconds), "err", err)
time.Sleep(time.Second * dialRetryIntervalSeconds)
continue RETRY_LOOP
continue
}
cli.conn = conn
@ -80,39 +87,26 @@ RETRY_LOOP:
}
}
// OnStop implements Service by closing connection and flushing all queues.
func (cli *socketClient) OnStop() {
if cli.conn != nil {
cli.conn.Close()
}
cli.mtx.Lock()
defer cli.mtx.Unlock()
cli.flushQueue()
cli.flushTimer.Stop()
}
// Stop the client and set the error
func (cli *socketClient) StopForError(err error) {
if !cli.IsRunning() {
return
}
cli.mtx.Lock()
if cli.err == nil {
cli.err = err
}
cli.mtx.Unlock()
cli.Logger.Error(fmt.Sprintf("Stopping abci.socketClient for error: %v", err.Error()))
cli.Stop()
}
// Error returns an error if the client was stopped abruptly.
func (cli *socketClient) Error() error {
cli.mtx.Lock()
defer cli.mtx.Unlock()
return cli.err
}
// Set listener for all responses
// SetResponseCallback sets a callback, which will be executed for each
// non-error & non-empty response from the server.
//
// NOTE: callback may get internally generated flush responses.
func (cli *socketClient) SetResponseCallback(resCb Callback) {
cli.mtx.Lock()
@ -123,57 +117,60 @@ func (cli *socketClient) SetResponseCallback(resCb Callback) {
//----------------------------------------
func (cli *socketClient) sendRequestsRoutine(conn io.Writer) {
w := bufio.NewWriter(conn)
for {
select {
case <-cli.flushTimer.Ch:
select {
case cli.reqQueue <- NewReqRes(types.ToRequestFlush()):
default:
// Probably will fill the buffer, or retry later.
}
case <-cli.Quit():
return
case reqres := <-cli.reqQueue:
// cli.Logger.Debug("Sent request", "requestType", reflect.TypeOf(reqres.Request), "request", reqres.Request)
cli.willSendReq(reqres)
err := types.WriteMessage(reqres.Request, w)
if err != nil {
cli.StopForError(fmt.Errorf("error writing msg: %v", err))
cli.stopForError(fmt.Errorf("write to buffer: %w", err))
return
}
// cli.Logger.Debug("Sent request", "requestType", reflect.TypeOf(reqres.Request), "request", reqres.Request)
// If it's a flush request, flush the current buffer.
if _, ok := reqres.Request.Value.(*types.Request_Flush); ok {
err = w.Flush()
if err != nil {
cli.StopForError(fmt.Errorf("error flushing writer: %v", err))
cli.stopForError(fmt.Errorf("flush buffer: %w", err))
return
}
}
case <-cli.flushTimer.Ch: // flush queue
select {
case cli.reqQueue <- NewReqRes(types.ToRequestFlush()):
default:
// Probably will fill the buffer, or retry later.
}
case <-cli.Quit():
return
}
}
}
func (cli *socketClient) recvResponseRoutine(conn io.Reader) {
r := bufio.NewReader(conn) // Buffer reads
r := bufio.NewReader(conn)
for {
var res = &types.Response{}
err := types.ReadMessage(r, res)
if err != nil {
cli.StopForError(err)
cli.stopForError(fmt.Errorf("read message: %w", err))
return
}
// cli.Logger.Debug("Received response", "responseType", reflect.TypeOf(res), "response", res)
switch r := res.Value.(type) {
case *types.Response_Exception:
case *types.Response_Exception: // app responded with error
// XXX After setting cli.err, release waiters (e.g. reqres.Done())
cli.StopForError(errors.New(r.Exception.Error))
cli.stopForError(errors.New(r.Exception.Error))
return
default:
// cli.Logger.Debug("Received response", "responseType", reflect.TypeOf(res), "response", res)
err := cli.didRecvResponse(res)
if err != nil {
cli.StopForError(err)
cli.stopForError(err)
return
}
}
@ -190,20 +187,21 @@ func (cli *socketClient) didRecvResponse(res *types.Response) error {
cli.mtx.Lock()
defer cli.mtx.Unlock()
// Get the first ReqRes
// Get the first ReqRes.
next := cli.reqSent.Front()
if next == nil {
return fmt.Errorf("unexpected result type %v when nothing expected", reflect.TypeOf(res.Value))
return fmt.Errorf("unexpected %v when nothing expected", reflect.TypeOf(res.Value))
}
reqres := next.Value.(*ReqRes)
if !resMatchesReq(reqres.Request, res) {
return fmt.Errorf("unexpected result type %v when response to %v expected",
return fmt.Errorf("unexpected %v when response to %v expected",
reflect.TypeOf(res.Value), reflect.TypeOf(reqres.Request.Value))
}
reqres.Response = res // Set response
reqres.Done() // Release waiters
cli.reqSent.Remove(next) // Pop first item from linked list
reqres.Response = res
reqres.Done() // release waiters
cli.reqSent.Remove(next) // pop first item from linked list
// Notify client listener if set (global callback).
if cli.resCb != nil {
@ -211,8 +209,9 @@ func (cli *socketClient) didRecvResponse(res *types.Response) error {
}
// Notify reqRes listener if set (request specific callback).
// NOTE: it is possible this callback isn't set on the reqres object.
// at this point, in which case it will be called after, when it is set.
//
// NOTE: It is possible this callback isn't set on the reqres object. At this
// point, in which case it will be called after, when it is set.
if cb := reqres.GetCallback(); cb != nil {
cb(res)
}
@ -438,6 +437,9 @@ func (cli *socketClient) queueRequest(req *types.Request) *ReqRes {
}
func (cli *socketClient) flushQueue() {
cli.mtx.Lock()
defer cli.mtx.Unlock()
// mark all in-flight messages as resolved (they will get cli.Error())
for req := cli.reqSent.Front(); req != nil; req = req.Next() {
reqres := req.Value.(*ReqRes)
@ -485,3 +487,18 @@ func resMatchesReq(req *types.Request, res *types.Response) (ok bool) {
}
return ok
}
func (cli *socketClient) stopForError(err error) {
if !cli.IsRunning() {
return
}
cli.mtx.Lock()
if cli.err == nil {
cli.err = err
}
cli.mtx.Unlock()
cli.Logger.Error(fmt.Sprintf("Stopping abci.socketClient for error: %v", err.Error()))
cli.Stop()
}

+ 0
- 24
abci/client/socket_client_test.go View File

@ -1,7 +1,6 @@
package abcicli_test
import (
"errors"
"fmt"
"testing"
"time"
@ -16,29 +15,6 @@ import (
"github.com/tendermint/tendermint/libs/service"
)
type errorStopper interface {
StopForError(error)
}
func TestSocketClientStopForErrorDeadlock(t *testing.T) {
c := abcicli.NewSocketClient(":80", false).(errorStopper)
err := errors.New("foo-tendermint")
// See Issue https://github.com/tendermint/abci/issues/114
doneChan := make(chan bool)
go func() {
defer close(doneChan)
c.StopForError(err)
c.StopForError(err)
}()
select {
case <-doneChan:
case <-time.After(time.Second * 4):
t.Fatalf("Test took too long, potential deadlock still exists")
}
}
func TestProperSyncCalls(t *testing.T) {
app := slowApp{}


+ 3
- 5
libs/bits/bit_array.go View File

@ -355,14 +355,12 @@ func (bA *BitArray) Update(o *BitArray) {
if bA == nil || o == nil {
return
}
bA.mtx.Lock()
o.mtx.Lock()
defer func() {
bA.mtx.Unlock()
o.mtx.Unlock()
}()
copy(bA.Elems, o.Elems)
o.mtx.Unlock()
bA.mtx.Unlock()
}
// MarshalJSON implements json.Marshaler interface by marshaling bit array


Loading…
Cancel
Save