Browse Source

rpc/lib/client: jitter test updates and only to-be run on releases

* Updated code with feedback from @melekes, @ebuchman and @silasdavis.
* Added Makefile clause `release` to only run the test on seeing tag
`release` during releases i.e
```shell
make release
```
which will run the comprehensive and long integration-ish tests.
pull/775/head
Emmanuel Odeke 7 years ago
parent
commit
6e5cd10399
No known key found for this signature in database GPG Key ID: 1CA47A292F89DD40
4 changed files with 73 additions and 58 deletions
  1. +3
    -0
      Makefile
  2. +66
    -0
      rpc/lib/client/integration_test.go
  3. +4
    -3
      rpc/lib/client/ws_client.go
  4. +0
    -55
      rpc/lib/client/ws_client_test.go

+ 3
- 0
Makefile View File

@ -35,6 +35,9 @@ test_race:
test_integrations: test_integrations:
@bash ./test/test.sh @bash ./test/test.sh
release:
@go test -tags release $(PACKAGES)
test100: test100:
@for i in {1..100}; do make test; done @for i in {1..100}; do make test; done


+ 66
- 0
rpc/lib/client/integration_test.go View File

@ -0,0 +1,66 @@
// +build release
// The code in here is comprehensive as an integration
// test and is long, hence is only run before releases.
package rpcclient
import (
"bytes"
"errors"
"net"
"regexp"
"testing"
"time"
"github.com/stretchr/testify/require"
"github.com/tendermint/tmlibs/log"
)
func TestWSClientReconnectWithJitter(t *testing.T) {
n := 8
maxReconnectAttempts := 3
// Max wait time is ceil(1+0.999) + ceil(2+0.999) + ceil(4+0.999) + ceil(...) = 2 + 3 + 5 = 10s + ...
maxSleepTime := time.Second * time.Duration(((1<<uint(maxReconnectAttempts))-1)+maxReconnectAttempts)
var errNotConnected = errors.New("not connected")
clientMap := make(map[int]*WSClient)
buf := new(bytes.Buffer)
logger := log.NewTMLogger(buf)
for i := 0; i < n; i++ {
c := NewWSClient("tcp://foo", "/websocket")
c.Dialer = func(string, string) (net.Conn, error) {
return nil, errNotConnected
}
c.SetLogger(logger)
c.maxReconnectAttempts = maxReconnectAttempts
// Not invoking defer c.Stop() because
// after all the reconnect attempts have been
// exhausted, c.Stop is implicitly invoked.
clientMap[i] = c
// Trigger the reconnect routine that performs exponential backoff.
go c.reconnect()
}
stopCount := 0
time.Sleep(maxSleepTime)
for key, c := range clientMap {
if !c.IsActive() {
delete(clientMap, key)
stopCount += 1
}
}
require.Equal(t, stopCount, n, "expecting all clients to have been stopped")
// Next we have to examine the logs to ensure that no single time was repeated
backoffDurRegexp := regexp.MustCompile(`backoff_duration=(.+)`)
matches := backoffDurRegexp.FindAll(buf.Bytes(), -1)
seenMap := make(map[string]int)
for i, match := range matches {
if origIndex, seen := seenMap[string(match)]; seen {
t.Errorf("Match #%d (%q) was seen originally at log entry #%d", i, match, origIndex)
} else {
seenMap[string(match)] = i
}
}
}

+ 4
- 3
rpc/lib/client/ws_client.go View File

@ -254,10 +254,11 @@ func (c *WSClient) reconnect() error {
c.mtx.Unlock() c.mtx.Unlock()
}() }()
_1sAsNs := float64(time.Second.Nanoseconds())
// 1s == (1e9 ns) == (1 Billion ns)
billionNs := float64(time.Second.Nanoseconds())
for { for {
jitter := time.Duration(rand.Float64() * _1sAsNs)
backoffDuration := jitter + ((1 << uint(attempt)) * time.Second)
jitterSeconds := time.Duration(rand.Float64() * billionNs)
backoffDuration := jitterSeconds + ((1 << uint(attempt)) * time.Second)
c.Logger.Info("reconnecting", "attempt", attempt+1, "backoff_duration", backoffDuration) c.Logger.Info("reconnecting", "attempt", attempt+1, "backoff_duration", backoffDuration)
time.Sleep(backoffDuration) time.Sleep(backoffDuration)


+ 0
- 55
rpc/lib/client/ws_client_test.go View File

@ -1,14 +1,11 @@
package rpcclient package rpcclient
import ( import (
"bytes"
"context" "context"
"encoding/json" "encoding/json"
"errors"
"net" "net"
"net/http" "net/http"
"net/http/httptest" "net/http/httptest"
"regexp"
"sync" "sync"
"testing" "testing"
"time" "time"
@ -194,55 +191,3 @@ func callWgDoneOnResult(t *testing.T, c *WSClient, wg *sync.WaitGroup) {
} }
} }
} }
func TestWSClientReconnectWithJitter(t *testing.T) {
if testing.Short() {
t.Skipf("This is a potentially long test")
}
n := 8
maxReconnectAttempts := 3
// Max wait time is ceil(1+0.999) + ceil(2+0.999) + ceil(4+0.999) + ceil(...) = 2 + 3 + 5 = 10s + ...
maxSleepTime := time.Second * time.Duration(((1<<uint(maxReconnectAttempts))-1)+maxReconnectAttempts)
var errNotConnected = errors.New("not connected")
clientMap := make(map[int]*WSClient)
buf := new(bytes.Buffer)
logger := log.NewTMLogger(buf)
for i := 0; i < n; i++ {
c := NewWSClient("tcp://foo", "/websocket")
c.Dialer = func(string, string) (net.Conn, error) {
return nil, errNotConnected
}
c.SetLogger(logger)
c.maxReconnectAttempts = maxReconnectAttempts
// Not invoking defer c.Stop() because
// after all the reconnect attempts have been
// exhausted, c.Stop is implicitly invoked.
clientMap[i] = c
// Trigger the reconnect routine that performs exponential backoff.
go c.reconnect()
}
stopCount := 0
time.Sleep(maxSleepTime)
for key, c := range clientMap {
if !c.IsActive() {
delete(clientMap, key)
stopCount += 1
}
}
require.Equal(t, stopCount, n, "expecting all clients to have been stopped")
// Next we have to examine the logs to ensure that no single time was repeated
backoffDurRegexp := regexp.MustCompile(`backoff_duration=(.+)`)
matches := backoffDurRegexp.FindAll(buf.Bytes(), -1)
seenMap := make(map[string]int)
for i, match := range matches {
if origIndex, seen := seenMap[string(match)]; seen {
t.Errorf("Match #%d (%q) was seen originally at log entry #%d", i, match, origIndex)
} else {
seenMap[string(match)] = i
}
}
}

Loading…
Cancel
Save