abci: localClient improvements & bugfixes & pubsub Unsubscribe issues (#2748)
* use READ lock/unlock in ConsensusState#GetLastHeight
Refs #2721
* do not use defers when there's no need
* fix peer formatting (output its address instead of the pointer)
```
[54310]: E[11-02|11:59:39.851] Connection failed @ sendRoutine module=p2p peer=0xb78f00 conn=MConn{74.207.236.148:26656} err="pong timeout"
```
https://github.com/tendermint/tendermint/issues/2721#issuecomment-435326581
* panic if peer has no state
https://github.com/tendermint/tendermint/issues/2721#issuecomment-435347165
It's confusing that sometimes we check if peer has a state, but most of
the times we expect it to be there
1. https://github.com/tendermint/tendermint/blob/add79700b5fe84417538202b6c927c8cc5383672/mempool/reactor.go#L138
2. https://github.com/tendermint/tendermint/blob/add79700b5fe84417538202b6c927c8cc5383672/rpc/core/consensus.go#L196 (edited)
I will change everything to always assume peer has a state and panic
otherwise
that should help identify issues earlier
* abci/localclient: extend lock on app callback
App callback should be protected by lock as well (note this was already
done for InitChainAsync, why not for others???). Otherwise, when we
execute the block, tx might come in and call the callback in the same
time we're updating it in execBlockOnProxyApp => DATA RACE
Fixes #2721
Consensus state is locked
```
goroutine 113333 [semacquire, 309 minutes]:
sync.runtime_SemacquireMutex(0xc00180009c, 0xc0000c7e00)
/usr/local/go/src/runtime/sema.go:71 +0x3d
sync.(*RWMutex).RLock(0xc001800090)
/usr/local/go/src/sync/rwmutex.go:50 +0x4e
github.com/MinterTeam/minter-go-node/vendor/github.com/tendermint/tendermint/consensus.(*ConsensusState).GetRoundState(0xc001800000, 0x0)
/root/go/src/github.com/MinterTeam/minter-go-node/vendor/github.com/tendermint/tendermint/consensus/state.go:218 +0x46
github.com/MinterTeam/minter-go-node/vendor/github.com/tendermint/tendermint/consensus.(*ConsensusReactor).queryMaj23Routine(0xc0017def80, 0x11104a0, 0xc0072488f0, 0xc007248
9c0)
/root/go/src/github.com/MinterTeam/minter-go-node/vendor/github.com/tendermint/tendermint/consensus/reactor.go:735 +0x16d
created by github.com/MinterTeam/minter-go-node/vendor/github.com/tendermint/tendermint/consensus.(*ConsensusReactor).AddPeer
/root/go/src/github.com/MinterTeam/minter-go-node/vendor/github.com/tendermint/tendermint/consensus/reactor.go:172 +0x236
```
because localClient is locked
```
goroutine 1899 [semacquire, 309 minutes]:
sync.runtime_SemacquireMutex(0xc00003363c, 0xc0000cb500)
/usr/local/go/src/runtime/sema.go:71 +0x3d
sync.(*Mutex).Lock(0xc000033638)
/usr/local/go/src/sync/mutex.go:134 +0xff
github.com/MinterTeam/minter-go-node/vendor/github.com/tendermint/tendermint/abci/client.(*localClient).SetResponseCallback(0xc0001fb560, 0xc007868540)
/root/go/src/github.com/MinterTeam/minter-go-node/vendor/github.com/tendermint/tendermint/abci/client/local_client.go:32 +0x33
github.com/MinterTeam/minter-go-node/vendor/github.com/tendermint/tendermint/proxy.(*appConnConsensus).SetResponseCallback(0xc00002f750, 0xc007868540)
/root/go/src/github.com/MinterTeam/minter-go-node/vendor/github.com/tendermint/tendermint/proxy/app_conn.go:57 +0x40
github.com/MinterTeam/minter-go-node/vendor/github.com/tendermint/tendermint/state.execBlockOnProxyApp(0x1104e20, 0xc002ca0ba0, 0x11092a0, 0xc00002f750, 0xc0001fe960, 0xc000bfc660, 0x110cfe0, 0xc000090330, 0xc9d12, 0xc000d9d5a0, ...)
/root/go/src/github.com/MinterTeam/minter-go-node/vendor/github.com/tendermint/tendermint/state/execution.go:230 +0x1fd
github.com/MinterTeam/minter-go-node/vendor/github.com/tendermint/tendermint/state.(*BlockExecutor).ApplyBlock(0xc002c2a230, 0x7, 0x0, 0xc000eae880, 0x6, 0xc002e52c60, 0x16, 0x1f927, 0xc9d12, 0xc000d9d5a0, ...)
/root/go/src/github.com/MinterTeam/minter-go-node/vendor/github.com/tendermint/tendermint/state/execution.go:96 +0x142
github.com/MinterTeam/minter-go-node/vendor/github.com/tendermint/tendermint/consensus.(*ConsensusState).finalizeCommit(0xc001800000, 0x1f928)
/root/go/src/github.com/MinterTeam/minter-go-node/vendor/github.com/tendermint/tendermint/consensus/state.go:1339 +0xa3e
github.com/MinterTeam/minter-go-node/vendor/github.com/tendermint/tendermint/consensus.(*ConsensusState).tryFinalizeCommit(0xc001800000, 0x1f928)
/root/go/src/github.com/MinterTeam/minter-go-node/vendor/github.com/tendermint/tendermint/consensus/state.go:1270 +0x451
github.com/MinterTeam/minter-go-node/vendor/github.com/tendermint/tendermint/consensus.(*ConsensusState).enterCommit.func1(0xc001800000, 0x0, 0x1f928)
/root/go/src/github.com/MinterTeam/minter-go-node/vendor/github.com/tendermint/tendermint/consensus/state.go:1218 +0x90
github.com/MinterTeam/minter-go-node/vendor/github.com/tendermint/tendermint/consensus.(*ConsensusState).enterCommit(0xc001800000, 0x1f928, 0x0)
/root/go/src/github.com/MinterTeam/minter-go-node/vendor/github.com/tendermint/tendermint/consensus/state.go:1247 +0x6b8
github.com/MinterTeam/minter-go-node/vendor/github.com/tendermint/tendermint/consensus.(*ConsensusState).addVote(0xc001800000, 0xc003d8dea0, 0xc000cf4cc0, 0x28, 0xf1, 0xc003bc7ad0, 0xc003bc7b10)
/root/go/src/github.com/MinterTeam/minter-go-node/vendor/github.com/tendermint/tendermint/consensus/state.go:1659 +0xbad
github.com/MinterTeam/minter-go-node/vendor/github.com/tendermint/tendermint/consensus.(*ConsensusState).tryAddVote(0xc001800000, 0xc003d8dea0, 0xc000cf4cc0, 0x28, 0xf1, 0xf1, 0xf1)
/root/go/src/github.com/MinterTeam/minter-go-node/vendor/github.com/tendermint/tendermint/consensus/state.go:1517 +0x59
github.com/MinterTeam/minter-go-node/vendor/github.com/tendermint/tendermint/consensus.(*ConsensusState).handleMsg(0xc001800000, 0xd98200, 0xc0070dbed0, 0xc000cf4cc0, 0x28)
/root/go/src/github.com/MinterTeam/minter-go-node/vendor/github.com/tendermint/tendermint/consensus/state.go:660 +0x64b
github.com/MinterTeam/minter-go-node/vendor/github.com/tendermint/tendermint/consensus.(*ConsensusState).receiveRoutine(0xc001800000, 0x0)
/root/go/src/github.com/MinterTeam/minter-go-node/vendor/github.com/tendermint/tendermint/consensus/state.go:617 +0x670
created by github.com/MinterTeam/minter-go-node/vendor/github.com/tendermint/tendermint/consensus.(*ConsensusState).OnStart
/root/go/src/github.com/MinterTeam/minter-go-node/vendor/github.com/tendermint/tendermint/consensus/state.go:311 +0x132
```
tx comes in and CheckTx is executed right when we execute the block
```
goroutine 111044 [semacquire, 309 minutes]:
sync.runtime_SemacquireMutex(0xc00003363c, 0x0)
/usr/local/go/src/runtime/sema.go:71 +0x3d
sync.(*Mutex).Lock(0xc000033638)
/usr/local/go/src/sync/mutex.go:134 +0xff
github.com/MinterTeam/minter-go-node/vendor/github.com/tendermint/tendermint/abci/client.(*localClient).CheckTxAsync(0xc0001fb0e0, 0xc002d94500, 0x13f, 0x280, 0x0)
/root/go/src/github.com/MinterTeam/minter-go-node/vendor/github.com/tendermint/tendermint/abci/client/local_client.go:85 +0x47
github.com/MinterTeam/minter-go-node/vendor/github.com/tendermint/tendermint/proxy.(*appConnMempool).CheckTxAsync(0xc00002f720, 0xc002d94500, 0x13f, 0x280, 0x1)
/root/go/src/github.com/MinterTeam/minter-go-node/vendor/github.com/tendermint/tendermint/proxy/app_conn.go:114 +0x51
github.com/MinterTeam/minter-go-node/vendor/github.com/tendermint/tendermint/mempool.(*Mempool).CheckTx(0xc002d3a320, 0xc002d94500, 0x13f, 0x280, 0xc0072355f0, 0x0, 0x0)
/root/go/src/github.com/MinterTeam/minter-go-node/vendor/github.com/tendermint/tendermint/mempool/mempool.go:316 +0x17b
github.com/MinterTeam/minter-go-node/vendor/github.com/tendermint/tendermint/rpc/core.BroadcastTxSync(0xc002d94500, 0x13f, 0x280, 0x0, 0x0, 0x0)
/root/go/src/github.com/MinterTeam/minter-go-node/vendor/github.com/tendermint/tendermint/rpc/core/mempool.go:93 +0xb8
reflect.Value.call(0xd85560, 0x10326c0, 0x13, 0xec7b8b, 0x4, 0xc00663f180, 0x1, 0x1, 0xc00663f180, 0xc00663f188, ...)
/usr/local/go/src/reflect/value.go:447 +0x449
reflect.Value.Call(0xd85560, 0x10326c0, 0x13, 0xc00663f180, 0x1, 0x1, 0x0, 0x0, 0xc005cc9344)
/usr/local/go/src/reflect/value.go:308 +0xa4
github.com/MinterTeam/minter-go-node/vendor/github.com/tendermint/tendermint/rpc/lib/server.makeHTTPHandler.func2(0x1102060, 0xc00663f100, 0xc0082d7900)
/root/go/src/github.com/MinterTeam/minter-go-node/vendor/github.com/tendermint/tendermint/rpc/lib/server/handlers.go:269 +0x188
net/http.HandlerFunc.ServeHTTP(0xc002c81f20, 0x1102060, 0xc00663f100, 0xc0082d7900)
/usr/local/go/src/net/http/server.go:1964 +0x44
net/http.(*ServeMux).ServeHTTP(0xc002c81b60, 0x1102060, 0xc00663f100, 0xc0082d7900)
/usr/local/go/src/net/http/server.go:2361 +0x127
github.com/MinterTeam/minter-go-node/vendor/github.com/tendermint/tendermint/rpc/lib/server.maxBytesHandler.ServeHTTP(0x10f8a40, 0xc002c81b60, 0xf4240, 0x1102060, 0xc00663f100, 0xc0082d7900)
/root/go/src/github.com/MinterTeam/minter-go-node/vendor/github.com/tendermint/tendermint/rpc/lib/server/http_server.go:219 +0xcf
github.com/MinterTeam/minter-go-node/vendor/github.com/tendermint/tendermint/rpc/lib/server.RecoverAndLogHandler.func1(0x1103220, 0xc00121e620, 0xc0082d7900)
/root/go/src/github.com/MinterTeam/minter-go-node/vendor/github.com/tendermint/tendermint/rpc/lib/server/http_server.go:192 +0x394
net/http.HandlerFunc.ServeHTTP(0xc002c06ea0, 0x1103220, 0xc00121e620, 0xc0082d7900)
/usr/local/go/src/net/http/server.go:1964 +0x44
net/http.serverHandler.ServeHTTP(0xc001a1aa90, 0x1103220, 0xc00121e620, 0xc0082d7900)
/usr/local/go/src/net/http/server.go:2741 +0xab
net/http.(*conn).serve(0xc00785a3c0, 0x11041a0, 0xc000f844c0)
/usr/local/go/src/net/http/server.go:1847 +0x646
created by net/http.(*Server).Serve
/usr/local/go/src/net/http/server.go:2851 +0x2f5
```
* consensus: use read lock in Receive#VoteMessage
* use defer to unlock mutex because application might panic
* use defer in every method of the localClient
* add a changelog entry
* drain channels before Unsubscribe(All)
Read https://github.com/tendermint/tendermint/blob/55362ed76630f3e1ebec159a598f6a9fb5892cb1/libs/pubsub/pubsub.go#L13
for the detailed explanation of the issue.
We'll need to fix it someday. Make sure to keep an eye on
https://github.com/tendermint/tendermint/blob/master/docs/architecture/adr-033-pubsub.md
* retry instead of panic when peer has no state in reactors other than consensus
in /dump_consensus_state RPC endpoint, skip a peer with no state
* rpc/core/mempool: simplify error messages
* rpc/core/mempool: use time.After instead of timer
also, do not log DeliverTx result (to be consistent with other memthods)
* unlock before calling the callback in reqRes#SetCallback
6 years ago abci: localClient improvements & bugfixes & pubsub Unsubscribe issues (#2748)
* use READ lock/unlock in ConsensusState#GetLastHeight
Refs #2721
* do not use defers when there's no need
* fix peer formatting (output its address instead of the pointer)
```
[54310]: E[11-02|11:59:39.851] Connection failed @ sendRoutine module=p2p peer=0xb78f00 conn=MConn{74.207.236.148:26656} err="pong timeout"
```
https://github.com/tendermint/tendermint/issues/2721#issuecomment-435326581
* panic if peer has no state
https://github.com/tendermint/tendermint/issues/2721#issuecomment-435347165
It's confusing that sometimes we check if peer has a state, but most of
the times we expect it to be there
1. https://github.com/tendermint/tendermint/blob/add79700b5fe84417538202b6c927c8cc5383672/mempool/reactor.go#L138
2. https://github.com/tendermint/tendermint/blob/add79700b5fe84417538202b6c927c8cc5383672/rpc/core/consensus.go#L196 (edited)
I will change everything to always assume peer has a state and panic
otherwise
that should help identify issues earlier
* abci/localclient: extend lock on app callback
App callback should be protected by lock as well (note this was already
done for InitChainAsync, why not for others???). Otherwise, when we
execute the block, tx might come in and call the callback in the same
time we're updating it in execBlockOnProxyApp => DATA RACE
Fixes #2721
Consensus state is locked
```
goroutine 113333 [semacquire, 309 minutes]:
sync.runtime_SemacquireMutex(0xc00180009c, 0xc0000c7e00)
/usr/local/go/src/runtime/sema.go:71 +0x3d
sync.(*RWMutex).RLock(0xc001800090)
/usr/local/go/src/sync/rwmutex.go:50 +0x4e
github.com/MinterTeam/minter-go-node/vendor/github.com/tendermint/tendermint/consensus.(*ConsensusState).GetRoundState(0xc001800000, 0x0)
/root/go/src/github.com/MinterTeam/minter-go-node/vendor/github.com/tendermint/tendermint/consensus/state.go:218 +0x46
github.com/MinterTeam/minter-go-node/vendor/github.com/tendermint/tendermint/consensus.(*ConsensusReactor).queryMaj23Routine(0xc0017def80, 0x11104a0, 0xc0072488f0, 0xc007248
9c0)
/root/go/src/github.com/MinterTeam/minter-go-node/vendor/github.com/tendermint/tendermint/consensus/reactor.go:735 +0x16d
created by github.com/MinterTeam/minter-go-node/vendor/github.com/tendermint/tendermint/consensus.(*ConsensusReactor).AddPeer
/root/go/src/github.com/MinterTeam/minter-go-node/vendor/github.com/tendermint/tendermint/consensus/reactor.go:172 +0x236
```
because localClient is locked
```
goroutine 1899 [semacquire, 309 minutes]:
sync.runtime_SemacquireMutex(0xc00003363c, 0xc0000cb500)
/usr/local/go/src/runtime/sema.go:71 +0x3d
sync.(*Mutex).Lock(0xc000033638)
/usr/local/go/src/sync/mutex.go:134 +0xff
github.com/MinterTeam/minter-go-node/vendor/github.com/tendermint/tendermint/abci/client.(*localClient).SetResponseCallback(0xc0001fb560, 0xc007868540)
/root/go/src/github.com/MinterTeam/minter-go-node/vendor/github.com/tendermint/tendermint/abci/client/local_client.go:32 +0x33
github.com/MinterTeam/minter-go-node/vendor/github.com/tendermint/tendermint/proxy.(*appConnConsensus).SetResponseCallback(0xc00002f750, 0xc007868540)
/root/go/src/github.com/MinterTeam/minter-go-node/vendor/github.com/tendermint/tendermint/proxy/app_conn.go:57 +0x40
github.com/MinterTeam/minter-go-node/vendor/github.com/tendermint/tendermint/state.execBlockOnProxyApp(0x1104e20, 0xc002ca0ba0, 0x11092a0, 0xc00002f750, 0xc0001fe960, 0xc000bfc660, 0x110cfe0, 0xc000090330, 0xc9d12, 0xc000d9d5a0, ...)
/root/go/src/github.com/MinterTeam/minter-go-node/vendor/github.com/tendermint/tendermint/state/execution.go:230 +0x1fd
github.com/MinterTeam/minter-go-node/vendor/github.com/tendermint/tendermint/state.(*BlockExecutor).ApplyBlock(0xc002c2a230, 0x7, 0x0, 0xc000eae880, 0x6, 0xc002e52c60, 0x16, 0x1f927, 0xc9d12, 0xc000d9d5a0, ...)
/root/go/src/github.com/MinterTeam/minter-go-node/vendor/github.com/tendermint/tendermint/state/execution.go:96 +0x142
github.com/MinterTeam/minter-go-node/vendor/github.com/tendermint/tendermint/consensus.(*ConsensusState).finalizeCommit(0xc001800000, 0x1f928)
/root/go/src/github.com/MinterTeam/minter-go-node/vendor/github.com/tendermint/tendermint/consensus/state.go:1339 +0xa3e
github.com/MinterTeam/minter-go-node/vendor/github.com/tendermint/tendermint/consensus.(*ConsensusState).tryFinalizeCommit(0xc001800000, 0x1f928)
/root/go/src/github.com/MinterTeam/minter-go-node/vendor/github.com/tendermint/tendermint/consensus/state.go:1270 +0x451
github.com/MinterTeam/minter-go-node/vendor/github.com/tendermint/tendermint/consensus.(*ConsensusState).enterCommit.func1(0xc001800000, 0x0, 0x1f928)
/root/go/src/github.com/MinterTeam/minter-go-node/vendor/github.com/tendermint/tendermint/consensus/state.go:1218 +0x90
github.com/MinterTeam/minter-go-node/vendor/github.com/tendermint/tendermint/consensus.(*ConsensusState).enterCommit(0xc001800000, 0x1f928, 0x0)
/root/go/src/github.com/MinterTeam/minter-go-node/vendor/github.com/tendermint/tendermint/consensus/state.go:1247 +0x6b8
github.com/MinterTeam/minter-go-node/vendor/github.com/tendermint/tendermint/consensus.(*ConsensusState).addVote(0xc001800000, 0xc003d8dea0, 0xc000cf4cc0, 0x28, 0xf1, 0xc003bc7ad0, 0xc003bc7b10)
/root/go/src/github.com/MinterTeam/minter-go-node/vendor/github.com/tendermint/tendermint/consensus/state.go:1659 +0xbad
github.com/MinterTeam/minter-go-node/vendor/github.com/tendermint/tendermint/consensus.(*ConsensusState).tryAddVote(0xc001800000, 0xc003d8dea0, 0xc000cf4cc0, 0x28, 0xf1, 0xf1, 0xf1)
/root/go/src/github.com/MinterTeam/minter-go-node/vendor/github.com/tendermint/tendermint/consensus/state.go:1517 +0x59
github.com/MinterTeam/minter-go-node/vendor/github.com/tendermint/tendermint/consensus.(*ConsensusState).handleMsg(0xc001800000, 0xd98200, 0xc0070dbed0, 0xc000cf4cc0, 0x28)
/root/go/src/github.com/MinterTeam/minter-go-node/vendor/github.com/tendermint/tendermint/consensus/state.go:660 +0x64b
github.com/MinterTeam/minter-go-node/vendor/github.com/tendermint/tendermint/consensus.(*ConsensusState).receiveRoutine(0xc001800000, 0x0)
/root/go/src/github.com/MinterTeam/minter-go-node/vendor/github.com/tendermint/tendermint/consensus/state.go:617 +0x670
created by github.com/MinterTeam/minter-go-node/vendor/github.com/tendermint/tendermint/consensus.(*ConsensusState).OnStart
/root/go/src/github.com/MinterTeam/minter-go-node/vendor/github.com/tendermint/tendermint/consensus/state.go:311 +0x132
```
tx comes in and CheckTx is executed right when we execute the block
```
goroutine 111044 [semacquire, 309 minutes]:
sync.runtime_SemacquireMutex(0xc00003363c, 0x0)
/usr/local/go/src/runtime/sema.go:71 +0x3d
sync.(*Mutex).Lock(0xc000033638)
/usr/local/go/src/sync/mutex.go:134 +0xff
github.com/MinterTeam/minter-go-node/vendor/github.com/tendermint/tendermint/abci/client.(*localClient).CheckTxAsync(0xc0001fb0e0, 0xc002d94500, 0x13f, 0x280, 0x0)
/root/go/src/github.com/MinterTeam/minter-go-node/vendor/github.com/tendermint/tendermint/abci/client/local_client.go:85 +0x47
github.com/MinterTeam/minter-go-node/vendor/github.com/tendermint/tendermint/proxy.(*appConnMempool).CheckTxAsync(0xc00002f720, 0xc002d94500, 0x13f, 0x280, 0x1)
/root/go/src/github.com/MinterTeam/minter-go-node/vendor/github.com/tendermint/tendermint/proxy/app_conn.go:114 +0x51
github.com/MinterTeam/minter-go-node/vendor/github.com/tendermint/tendermint/mempool.(*Mempool).CheckTx(0xc002d3a320, 0xc002d94500, 0x13f, 0x280, 0xc0072355f0, 0x0, 0x0)
/root/go/src/github.com/MinterTeam/minter-go-node/vendor/github.com/tendermint/tendermint/mempool/mempool.go:316 +0x17b
github.com/MinterTeam/minter-go-node/vendor/github.com/tendermint/tendermint/rpc/core.BroadcastTxSync(0xc002d94500, 0x13f, 0x280, 0x0, 0x0, 0x0)
/root/go/src/github.com/MinterTeam/minter-go-node/vendor/github.com/tendermint/tendermint/rpc/core/mempool.go:93 +0xb8
reflect.Value.call(0xd85560, 0x10326c0, 0x13, 0xec7b8b, 0x4, 0xc00663f180, 0x1, 0x1, 0xc00663f180, 0xc00663f188, ...)
/usr/local/go/src/reflect/value.go:447 +0x449
reflect.Value.Call(0xd85560, 0x10326c0, 0x13, 0xc00663f180, 0x1, 0x1, 0x0, 0x0, 0xc005cc9344)
/usr/local/go/src/reflect/value.go:308 +0xa4
github.com/MinterTeam/minter-go-node/vendor/github.com/tendermint/tendermint/rpc/lib/server.makeHTTPHandler.func2(0x1102060, 0xc00663f100, 0xc0082d7900)
/root/go/src/github.com/MinterTeam/minter-go-node/vendor/github.com/tendermint/tendermint/rpc/lib/server/handlers.go:269 +0x188
net/http.HandlerFunc.ServeHTTP(0xc002c81f20, 0x1102060, 0xc00663f100, 0xc0082d7900)
/usr/local/go/src/net/http/server.go:1964 +0x44
net/http.(*ServeMux).ServeHTTP(0xc002c81b60, 0x1102060, 0xc00663f100, 0xc0082d7900)
/usr/local/go/src/net/http/server.go:2361 +0x127
github.com/MinterTeam/minter-go-node/vendor/github.com/tendermint/tendermint/rpc/lib/server.maxBytesHandler.ServeHTTP(0x10f8a40, 0xc002c81b60, 0xf4240, 0x1102060, 0xc00663f100, 0xc0082d7900)
/root/go/src/github.com/MinterTeam/minter-go-node/vendor/github.com/tendermint/tendermint/rpc/lib/server/http_server.go:219 +0xcf
github.com/MinterTeam/minter-go-node/vendor/github.com/tendermint/tendermint/rpc/lib/server.RecoverAndLogHandler.func1(0x1103220, 0xc00121e620, 0xc0082d7900)
/root/go/src/github.com/MinterTeam/minter-go-node/vendor/github.com/tendermint/tendermint/rpc/lib/server/http_server.go:192 +0x394
net/http.HandlerFunc.ServeHTTP(0xc002c06ea0, 0x1103220, 0xc00121e620, 0xc0082d7900)
/usr/local/go/src/net/http/server.go:1964 +0x44
net/http.serverHandler.ServeHTTP(0xc001a1aa90, 0x1103220, 0xc00121e620, 0xc0082d7900)
/usr/local/go/src/net/http/server.go:2741 +0xab
net/http.(*conn).serve(0xc00785a3c0, 0x11041a0, 0xc000f844c0)
/usr/local/go/src/net/http/server.go:1847 +0x646
created by net/http.(*Server).Serve
/usr/local/go/src/net/http/server.go:2851 +0x2f5
```
* consensus: use read lock in Receive#VoteMessage
* use defer to unlock mutex because application might panic
* use defer in every method of the localClient
* add a changelog entry
* drain channels before Unsubscribe(All)
Read https://github.com/tendermint/tendermint/blob/55362ed76630f3e1ebec159a598f6a9fb5892cb1/libs/pubsub/pubsub.go#L13
for the detailed explanation of the issue.
We'll need to fix it someday. Make sure to keep an eye on
https://github.com/tendermint/tendermint/blob/master/docs/architecture/adr-033-pubsub.md
* retry instead of panic when peer has no state in reactors other than consensus
in /dump_consensus_state RPC endpoint, skip a peer with no state
* rpc/core/mempool: simplify error messages
* rpc/core/mempool: use time.After instead of timer
also, do not log DeliverTx result (to be consistent with other memthods)
* unlock before calling the callback in reqRes#SetCallback
6 years ago abci: localClient improvements & bugfixes & pubsub Unsubscribe issues (#2748)
* use READ lock/unlock in ConsensusState#GetLastHeight
Refs #2721
* do not use defers when there's no need
* fix peer formatting (output its address instead of the pointer)
```
[54310]: E[11-02|11:59:39.851] Connection failed @ sendRoutine module=p2p peer=0xb78f00 conn=MConn{74.207.236.148:26656} err="pong timeout"
```
https://github.com/tendermint/tendermint/issues/2721#issuecomment-435326581
* panic if peer has no state
https://github.com/tendermint/tendermint/issues/2721#issuecomment-435347165
It's confusing that sometimes we check if peer has a state, but most of
the times we expect it to be there
1. https://github.com/tendermint/tendermint/blob/add79700b5fe84417538202b6c927c8cc5383672/mempool/reactor.go#L138
2. https://github.com/tendermint/tendermint/blob/add79700b5fe84417538202b6c927c8cc5383672/rpc/core/consensus.go#L196 (edited)
I will change everything to always assume peer has a state and panic
otherwise
that should help identify issues earlier
* abci/localclient: extend lock on app callback
App callback should be protected by lock as well (note this was already
done for InitChainAsync, why not for others???). Otherwise, when we
execute the block, tx might come in and call the callback in the same
time we're updating it in execBlockOnProxyApp => DATA RACE
Fixes #2721
Consensus state is locked
```
goroutine 113333 [semacquire, 309 minutes]:
sync.runtime_SemacquireMutex(0xc00180009c, 0xc0000c7e00)
/usr/local/go/src/runtime/sema.go:71 +0x3d
sync.(*RWMutex).RLock(0xc001800090)
/usr/local/go/src/sync/rwmutex.go:50 +0x4e
github.com/MinterTeam/minter-go-node/vendor/github.com/tendermint/tendermint/consensus.(*ConsensusState).GetRoundState(0xc001800000, 0x0)
/root/go/src/github.com/MinterTeam/minter-go-node/vendor/github.com/tendermint/tendermint/consensus/state.go:218 +0x46
github.com/MinterTeam/minter-go-node/vendor/github.com/tendermint/tendermint/consensus.(*ConsensusReactor).queryMaj23Routine(0xc0017def80, 0x11104a0, 0xc0072488f0, 0xc007248
9c0)
/root/go/src/github.com/MinterTeam/minter-go-node/vendor/github.com/tendermint/tendermint/consensus/reactor.go:735 +0x16d
created by github.com/MinterTeam/minter-go-node/vendor/github.com/tendermint/tendermint/consensus.(*ConsensusReactor).AddPeer
/root/go/src/github.com/MinterTeam/minter-go-node/vendor/github.com/tendermint/tendermint/consensus/reactor.go:172 +0x236
```
because localClient is locked
```
goroutine 1899 [semacquire, 309 minutes]:
sync.runtime_SemacquireMutex(0xc00003363c, 0xc0000cb500)
/usr/local/go/src/runtime/sema.go:71 +0x3d
sync.(*Mutex).Lock(0xc000033638)
/usr/local/go/src/sync/mutex.go:134 +0xff
github.com/MinterTeam/minter-go-node/vendor/github.com/tendermint/tendermint/abci/client.(*localClient).SetResponseCallback(0xc0001fb560, 0xc007868540)
/root/go/src/github.com/MinterTeam/minter-go-node/vendor/github.com/tendermint/tendermint/abci/client/local_client.go:32 +0x33
github.com/MinterTeam/minter-go-node/vendor/github.com/tendermint/tendermint/proxy.(*appConnConsensus).SetResponseCallback(0xc00002f750, 0xc007868540)
/root/go/src/github.com/MinterTeam/minter-go-node/vendor/github.com/tendermint/tendermint/proxy/app_conn.go:57 +0x40
github.com/MinterTeam/minter-go-node/vendor/github.com/tendermint/tendermint/state.execBlockOnProxyApp(0x1104e20, 0xc002ca0ba0, 0x11092a0, 0xc00002f750, 0xc0001fe960, 0xc000bfc660, 0x110cfe0, 0xc000090330, 0xc9d12, 0xc000d9d5a0, ...)
/root/go/src/github.com/MinterTeam/minter-go-node/vendor/github.com/tendermint/tendermint/state/execution.go:230 +0x1fd
github.com/MinterTeam/minter-go-node/vendor/github.com/tendermint/tendermint/state.(*BlockExecutor).ApplyBlock(0xc002c2a230, 0x7, 0x0, 0xc000eae880, 0x6, 0xc002e52c60, 0x16, 0x1f927, 0xc9d12, 0xc000d9d5a0, ...)
/root/go/src/github.com/MinterTeam/minter-go-node/vendor/github.com/tendermint/tendermint/state/execution.go:96 +0x142
github.com/MinterTeam/minter-go-node/vendor/github.com/tendermint/tendermint/consensus.(*ConsensusState).finalizeCommit(0xc001800000, 0x1f928)
/root/go/src/github.com/MinterTeam/minter-go-node/vendor/github.com/tendermint/tendermint/consensus/state.go:1339 +0xa3e
github.com/MinterTeam/minter-go-node/vendor/github.com/tendermint/tendermint/consensus.(*ConsensusState).tryFinalizeCommit(0xc001800000, 0x1f928)
/root/go/src/github.com/MinterTeam/minter-go-node/vendor/github.com/tendermint/tendermint/consensus/state.go:1270 +0x451
github.com/MinterTeam/minter-go-node/vendor/github.com/tendermint/tendermint/consensus.(*ConsensusState).enterCommit.func1(0xc001800000, 0x0, 0x1f928)
/root/go/src/github.com/MinterTeam/minter-go-node/vendor/github.com/tendermint/tendermint/consensus/state.go:1218 +0x90
github.com/MinterTeam/minter-go-node/vendor/github.com/tendermint/tendermint/consensus.(*ConsensusState).enterCommit(0xc001800000, 0x1f928, 0x0)
/root/go/src/github.com/MinterTeam/minter-go-node/vendor/github.com/tendermint/tendermint/consensus/state.go:1247 +0x6b8
github.com/MinterTeam/minter-go-node/vendor/github.com/tendermint/tendermint/consensus.(*ConsensusState).addVote(0xc001800000, 0xc003d8dea0, 0xc000cf4cc0, 0x28, 0xf1, 0xc003bc7ad0, 0xc003bc7b10)
/root/go/src/github.com/MinterTeam/minter-go-node/vendor/github.com/tendermint/tendermint/consensus/state.go:1659 +0xbad
github.com/MinterTeam/minter-go-node/vendor/github.com/tendermint/tendermint/consensus.(*ConsensusState).tryAddVote(0xc001800000, 0xc003d8dea0, 0xc000cf4cc0, 0x28, 0xf1, 0xf1, 0xf1)
/root/go/src/github.com/MinterTeam/minter-go-node/vendor/github.com/tendermint/tendermint/consensus/state.go:1517 +0x59
github.com/MinterTeam/minter-go-node/vendor/github.com/tendermint/tendermint/consensus.(*ConsensusState).handleMsg(0xc001800000, 0xd98200, 0xc0070dbed0, 0xc000cf4cc0, 0x28)
/root/go/src/github.com/MinterTeam/minter-go-node/vendor/github.com/tendermint/tendermint/consensus/state.go:660 +0x64b
github.com/MinterTeam/minter-go-node/vendor/github.com/tendermint/tendermint/consensus.(*ConsensusState).receiveRoutine(0xc001800000, 0x0)
/root/go/src/github.com/MinterTeam/minter-go-node/vendor/github.com/tendermint/tendermint/consensus/state.go:617 +0x670
created by github.com/MinterTeam/minter-go-node/vendor/github.com/tendermint/tendermint/consensus.(*ConsensusState).OnStart
/root/go/src/github.com/MinterTeam/minter-go-node/vendor/github.com/tendermint/tendermint/consensus/state.go:311 +0x132
```
tx comes in and CheckTx is executed right when we execute the block
```
goroutine 111044 [semacquire, 309 minutes]:
sync.runtime_SemacquireMutex(0xc00003363c, 0x0)
/usr/local/go/src/runtime/sema.go:71 +0x3d
sync.(*Mutex).Lock(0xc000033638)
/usr/local/go/src/sync/mutex.go:134 +0xff
github.com/MinterTeam/minter-go-node/vendor/github.com/tendermint/tendermint/abci/client.(*localClient).CheckTxAsync(0xc0001fb0e0, 0xc002d94500, 0x13f, 0x280, 0x0)
/root/go/src/github.com/MinterTeam/minter-go-node/vendor/github.com/tendermint/tendermint/abci/client/local_client.go:85 +0x47
github.com/MinterTeam/minter-go-node/vendor/github.com/tendermint/tendermint/proxy.(*appConnMempool).CheckTxAsync(0xc00002f720, 0xc002d94500, 0x13f, 0x280, 0x1)
/root/go/src/github.com/MinterTeam/minter-go-node/vendor/github.com/tendermint/tendermint/proxy/app_conn.go:114 +0x51
github.com/MinterTeam/minter-go-node/vendor/github.com/tendermint/tendermint/mempool.(*Mempool).CheckTx(0xc002d3a320, 0xc002d94500, 0x13f, 0x280, 0xc0072355f0, 0x0, 0x0)
/root/go/src/github.com/MinterTeam/minter-go-node/vendor/github.com/tendermint/tendermint/mempool/mempool.go:316 +0x17b
github.com/MinterTeam/minter-go-node/vendor/github.com/tendermint/tendermint/rpc/core.BroadcastTxSync(0xc002d94500, 0x13f, 0x280, 0x0, 0x0, 0x0)
/root/go/src/github.com/MinterTeam/minter-go-node/vendor/github.com/tendermint/tendermint/rpc/core/mempool.go:93 +0xb8
reflect.Value.call(0xd85560, 0x10326c0, 0x13, 0xec7b8b, 0x4, 0xc00663f180, 0x1, 0x1, 0xc00663f180, 0xc00663f188, ...)
/usr/local/go/src/reflect/value.go:447 +0x449
reflect.Value.Call(0xd85560, 0x10326c0, 0x13, 0xc00663f180, 0x1, 0x1, 0x0, 0x0, 0xc005cc9344)
/usr/local/go/src/reflect/value.go:308 +0xa4
github.com/MinterTeam/minter-go-node/vendor/github.com/tendermint/tendermint/rpc/lib/server.makeHTTPHandler.func2(0x1102060, 0xc00663f100, 0xc0082d7900)
/root/go/src/github.com/MinterTeam/minter-go-node/vendor/github.com/tendermint/tendermint/rpc/lib/server/handlers.go:269 +0x188
net/http.HandlerFunc.ServeHTTP(0xc002c81f20, 0x1102060, 0xc00663f100, 0xc0082d7900)
/usr/local/go/src/net/http/server.go:1964 +0x44
net/http.(*ServeMux).ServeHTTP(0xc002c81b60, 0x1102060, 0xc00663f100, 0xc0082d7900)
/usr/local/go/src/net/http/server.go:2361 +0x127
github.com/MinterTeam/minter-go-node/vendor/github.com/tendermint/tendermint/rpc/lib/server.maxBytesHandler.ServeHTTP(0x10f8a40, 0xc002c81b60, 0xf4240, 0x1102060, 0xc00663f100, 0xc0082d7900)
/root/go/src/github.com/MinterTeam/minter-go-node/vendor/github.com/tendermint/tendermint/rpc/lib/server/http_server.go:219 +0xcf
github.com/MinterTeam/minter-go-node/vendor/github.com/tendermint/tendermint/rpc/lib/server.RecoverAndLogHandler.func1(0x1103220, 0xc00121e620, 0xc0082d7900)
/root/go/src/github.com/MinterTeam/minter-go-node/vendor/github.com/tendermint/tendermint/rpc/lib/server/http_server.go:192 +0x394
net/http.HandlerFunc.ServeHTTP(0xc002c06ea0, 0x1103220, 0xc00121e620, 0xc0082d7900)
/usr/local/go/src/net/http/server.go:1964 +0x44
net/http.serverHandler.ServeHTTP(0xc001a1aa90, 0x1103220, 0xc00121e620, 0xc0082d7900)
/usr/local/go/src/net/http/server.go:2741 +0xab
net/http.(*conn).serve(0xc00785a3c0, 0x11041a0, 0xc000f844c0)
/usr/local/go/src/net/http/server.go:1847 +0x646
created by net/http.(*Server).Serve
/usr/local/go/src/net/http/server.go:2851 +0x2f5
```
* consensus: use read lock in Receive#VoteMessage
* use defer to unlock mutex because application might panic
* use defer in every method of the localClient
* add a changelog entry
* drain channels before Unsubscribe(All)
Read https://github.com/tendermint/tendermint/blob/55362ed76630f3e1ebec159a598f6a9fb5892cb1/libs/pubsub/pubsub.go#L13
for the detailed explanation of the issue.
We'll need to fix it someday. Make sure to keep an eye on
https://github.com/tendermint/tendermint/blob/master/docs/architecture/adr-033-pubsub.md
* retry instead of panic when peer has no state in reactors other than consensus
in /dump_consensus_state RPC endpoint, skip a peer with no state
* rpc/core/mempool: simplify error messages
* rpc/core/mempool: use time.After instead of timer
also, do not log DeliverTx result (to be consistent with other memthods)
* unlock before calling the callback in reqRes#SetCallback
6 years ago |
|
- // Modified for Tendermint
- // Originally Copyright (c) 2013-2014 Conformal Systems LLC.
- // https://github.com/conformal/btcd/blob/master/LICENSE
-
- package pex
-
- import (
- "crypto/sha256"
- "encoding/binary"
- "fmt"
- "math"
- "net"
- "sync"
- "time"
-
- "github.com/tendermint/tendermint/crypto"
- cmn "github.com/tendermint/tendermint/libs/common"
- "github.com/tendermint/tendermint/p2p"
- )
-
- const (
- bucketTypeNew = 0x01
- bucketTypeOld = 0x02
- )
-
- // AddrBook is an address book used for tracking peers
- // so we can gossip about them to others and select
- // peers to dial.
- // TODO: break this up?
- type AddrBook interface {
- cmn.Service
-
- // Add our own addresses so we don't later add ourselves
- AddOurAddress(*p2p.NetAddress)
- // Check if it is our address
- OurAddress(*p2p.NetAddress) bool
-
- AddPrivateIDs([]string)
-
- // Add and remove an address
- AddAddress(addr *p2p.NetAddress, src *p2p.NetAddress) error
- RemoveAddress(*p2p.NetAddress)
-
- // Check if the address is in the book
- HasAddress(*p2p.NetAddress) bool
-
- // Do we need more peers?
- NeedMoreAddrs() bool
- // Is Address Book Empty? Answer should not depend on being in your own
- // address book, or private peers
- Empty() bool
-
- // Pick an address to dial
- PickAddress(biasTowardsNewAddrs int) *p2p.NetAddress
-
- // Mark address
- MarkGood(*p2p.NetAddress)
- MarkAttempt(*p2p.NetAddress)
- MarkBad(*p2p.NetAddress)
-
- IsGood(*p2p.NetAddress) bool
-
- // Send a selection of addresses to peers
- GetSelection() []*p2p.NetAddress
- // Send a selection of addresses with bias
- GetSelectionWithBias(biasTowardsNewAddrs int) []*p2p.NetAddress
-
- // TODO: remove
- ListOfKnownAddresses() []*knownAddress
-
- // Persist to disk
- Save()
- }
-
- var _ AddrBook = (*addrBook)(nil)
-
- // addrBook - concurrency safe peer address manager.
- // Implements AddrBook.
- type addrBook struct {
- cmn.BaseService
-
- // immutable after creation
- filePath string
- routabilityStrict bool
- key string // random prefix for bucket placement
-
- // accessed concurrently
- mtx sync.Mutex
- rand *cmn.Rand
- ourAddrs map[string]struct{}
- privateIDs map[p2p.ID]struct{}
- addrLookup map[p2p.ID]*knownAddress // new & old
- bucketsOld []map[string]*knownAddress
- bucketsNew []map[string]*knownAddress
- nOld int
- nNew int
-
- wg sync.WaitGroup
- }
-
- // NewAddrBook creates a new address book.
- // Use Start to begin processing asynchronous address updates.
- func NewAddrBook(filePath string, routabilityStrict bool) *addrBook {
- am := &addrBook{
- rand: cmn.NewRand(),
- ourAddrs: make(map[string]struct{}),
- privateIDs: make(map[p2p.ID]struct{}),
- addrLookup: make(map[p2p.ID]*knownAddress),
- filePath: filePath,
- routabilityStrict: routabilityStrict,
- }
- am.init()
- am.BaseService = *cmn.NewBaseService(nil, "AddrBook", am)
- return am
- }
-
- // Initialize the buckets.
- // When modifying this, don't forget to update loadFromFile()
- func (a *addrBook) init() {
- a.key = crypto.CRandHex(24) // 24/2 * 8 = 96 bits
- // New addr buckets
- a.bucketsNew = make([]map[string]*knownAddress, newBucketCount)
- for i := range a.bucketsNew {
- a.bucketsNew[i] = make(map[string]*knownAddress)
- }
- // Old addr buckets
- a.bucketsOld = make([]map[string]*knownAddress, oldBucketCount)
- for i := range a.bucketsOld {
- a.bucketsOld[i] = make(map[string]*knownAddress)
- }
- }
-
- // OnStart implements Service.
- func (a *addrBook) OnStart() error {
- if err := a.BaseService.OnStart(); err != nil {
- return err
- }
- a.loadFromFile(a.filePath)
-
- // wg.Add to ensure that any invocation of .Wait()
- // later on will wait for saveRoutine to terminate.
- a.wg.Add(1)
- go a.saveRoutine()
-
- return nil
- }
-
- // OnStop implements Service.
- func (a *addrBook) OnStop() {
- a.BaseService.OnStop()
- }
-
- func (a *addrBook) Wait() {
- a.wg.Wait()
- }
-
- func (a *addrBook) FilePath() string {
- return a.filePath
- }
-
- //-------------------------------------------------------
-
- // AddOurAddress one of our addresses.
- func (a *addrBook) AddOurAddress(addr *p2p.NetAddress) {
- a.mtx.Lock()
- defer a.mtx.Unlock()
-
- a.Logger.Info("Add our address to book", "addr", addr)
- a.ourAddrs[addr.String()] = struct{}{}
- }
-
- // OurAddress returns true if it is our address.
- func (a *addrBook) OurAddress(addr *p2p.NetAddress) bool {
- a.mtx.Lock()
- defer a.mtx.Unlock()
-
- _, ok := a.ourAddrs[addr.String()]
- return ok
- }
-
- func (a *addrBook) AddPrivateIDs(IDs []string) {
- a.mtx.Lock()
- defer a.mtx.Unlock()
-
- for _, id := range IDs {
- a.privateIDs[p2p.ID(id)] = struct{}{}
- }
- }
-
- // AddAddress implements AddrBook
- // Add address to a "new" bucket. If it's already in one, only add it probabilistically.
- // Returns error if the addr is non-routable. Does not add self.
- // NOTE: addr must not be nil
- func (a *addrBook) AddAddress(addr *p2p.NetAddress, src *p2p.NetAddress) error {
- a.mtx.Lock()
- defer a.mtx.Unlock()
-
- return a.addAddress(addr, src)
- }
-
- // RemoveAddress implements AddrBook - removes the address from the book.
- func (a *addrBook) RemoveAddress(addr *p2p.NetAddress) {
- a.mtx.Lock()
- defer a.mtx.Unlock()
-
- ka := a.addrLookup[addr.ID]
- if ka == nil {
- return
- }
- a.Logger.Info("Remove address from book", "addr", addr)
- a.removeFromAllBuckets(ka)
- }
-
- // IsGood returns true if peer was ever marked as good and haven't
- // done anything wrong since then.
- func (a *addrBook) IsGood(addr *p2p.NetAddress) bool {
- a.mtx.Lock()
- defer a.mtx.Unlock()
-
- return a.addrLookup[addr.ID].isOld()
- }
-
- // HasAddress returns true if the address is in the book.
- func (a *addrBook) HasAddress(addr *p2p.NetAddress) bool {
- a.mtx.Lock()
- defer a.mtx.Unlock()
-
- ka := a.addrLookup[addr.ID]
- return ka != nil
- }
-
- // NeedMoreAddrs implements AddrBook - returns true if there are not have enough addresses in the book.
- func (a *addrBook) NeedMoreAddrs() bool {
- return a.Size() < needAddressThreshold
- }
-
- // Empty implements AddrBook - returns true if there are no addresses in the address book.
- // Does not count the peer appearing in its own address book, or private peers.
- func (a *addrBook) Empty() bool {
- return a.Size() == 0
- }
-
- // PickAddress implements AddrBook. It picks an address to connect to.
- // The address is picked randomly from an old or new bucket according
- // to the biasTowardsNewAddrs argument, which must be between [0, 100] (or else is truncated to that range)
- // and determines how biased we are to pick an address from a new bucket.
- // PickAddress returns nil if the AddrBook is empty or if we try to pick
- // from an empty bucket.
- func (a *addrBook) PickAddress(biasTowardsNewAddrs int) *p2p.NetAddress {
- a.mtx.Lock()
- defer a.mtx.Unlock()
-
- bookSize := a.size()
- if bookSize <= 0 {
- if bookSize < 0 {
- a.Logger.Error("Addrbook size less than 0", "nNew", a.nNew, "nOld", a.nOld)
- }
- return nil
- }
- if biasTowardsNewAddrs > 100 {
- biasTowardsNewAddrs = 100
- }
- if biasTowardsNewAddrs < 0 {
- biasTowardsNewAddrs = 0
- }
-
- // Bias between new and old addresses.
- oldCorrelation := math.Sqrt(float64(a.nOld)) * (100.0 - float64(biasTowardsNewAddrs))
- newCorrelation := math.Sqrt(float64(a.nNew)) * float64(biasTowardsNewAddrs)
-
- // pick a random peer from a random bucket
- var bucket map[string]*knownAddress
- pickFromOldBucket := (newCorrelation+oldCorrelation)*a.rand.Float64() < oldCorrelation
- if (pickFromOldBucket && a.nOld == 0) ||
- (!pickFromOldBucket && a.nNew == 0) {
- return nil
- }
- // loop until we pick a random non-empty bucket
- for len(bucket) == 0 {
- if pickFromOldBucket {
- bucket = a.bucketsOld[a.rand.Intn(len(a.bucketsOld))]
- } else {
- bucket = a.bucketsNew[a.rand.Intn(len(a.bucketsNew))]
- }
- }
- // pick a random index and loop over the map to return that index
- randIndex := a.rand.Intn(len(bucket))
- for _, ka := range bucket {
- if randIndex == 0 {
- return ka.Addr
- }
- randIndex--
- }
- return nil
- }
-
- // MarkGood implements AddrBook - it marks the peer as good and
- // moves it into an "old" bucket.
- func (a *addrBook) MarkGood(addr *p2p.NetAddress) {
- a.mtx.Lock()
- defer a.mtx.Unlock()
-
- ka := a.addrLookup[addr.ID]
- if ka == nil {
- return
- }
- ka.markGood()
- if ka.isNew() {
- a.moveToOld(ka)
- }
- }
-
- // MarkAttempt implements AddrBook - it marks that an attempt was made to connect to the address.
- func (a *addrBook) MarkAttempt(addr *p2p.NetAddress) {
- a.mtx.Lock()
- defer a.mtx.Unlock()
-
- ka := a.addrLookup[addr.ID]
- if ka == nil {
- return
- }
- ka.markAttempt()
- }
-
- // MarkBad implements AddrBook. Currently it just ejects the address.
- // TODO: black list for some amount of time
- func (a *addrBook) MarkBad(addr *p2p.NetAddress) {
- a.RemoveAddress(addr)
- }
-
- // GetSelection implements AddrBook.
- // It randomly selects some addresses (old & new). Suitable for peer-exchange protocols.
- // Must never return a nil address.
- func (a *addrBook) GetSelection() []*p2p.NetAddress {
- a.mtx.Lock()
- defer a.mtx.Unlock()
-
- bookSize := a.size()
- if bookSize <= 0 {
- if bookSize < 0 {
- a.Logger.Error("Addrbook size less than 0", "nNew", a.nNew, "nOld", a.nOld)
- }
- return nil
- }
-
- numAddresses := cmn.MaxInt(
- cmn.MinInt(minGetSelection, bookSize),
- bookSize*getSelectionPercent/100)
- numAddresses = cmn.MinInt(maxGetSelection, numAddresses)
-
- // XXX: instead of making a list of all addresses, shuffling, and slicing a random chunk,
- // could we just select a random numAddresses of indexes?
- allAddr := make([]*p2p.NetAddress, bookSize)
- i := 0
- for _, ka := range a.addrLookup {
- allAddr[i] = ka.Addr
- i++
- }
-
- // Fisher-Yates shuffle the array. We only need to do the first
- // `numAddresses' since we are throwing the rest.
- for i := 0; i < numAddresses; i++ {
- // pick a number between current index and the end
- j := cmn.RandIntn(len(allAddr)-i) + i
- allAddr[i], allAddr[j] = allAddr[j], allAddr[i]
- }
-
- // slice off the limit we are willing to share.
- return allAddr[:numAddresses]
- }
-
- func percentageOfNum(p, n int) int {
- return int(math.Round((float64(p) / float64(100)) * float64(n)))
- }
-
- // GetSelectionWithBias implements AddrBook.
- // It randomly selects some addresses (old & new). Suitable for peer-exchange protocols.
- // Must never return a nil address.
- //
- // Each address is picked randomly from an old or new bucket according to the
- // biasTowardsNewAddrs argument, which must be between [0, 100] (or else is truncated to
- // that range) and determines how biased we are to pick an address from a new
- // bucket.
- func (a *addrBook) GetSelectionWithBias(biasTowardsNewAddrs int) []*p2p.NetAddress {
- a.mtx.Lock()
- defer a.mtx.Unlock()
-
- bookSize := a.size()
- if bookSize <= 0 {
- if bookSize < 0 {
- a.Logger.Error("Addrbook size less than 0", "nNew", a.nNew, "nOld", a.nOld)
- }
- return nil
- }
-
- if biasTowardsNewAddrs > 100 {
- biasTowardsNewAddrs = 100
- }
- if biasTowardsNewAddrs < 0 {
- biasTowardsNewAddrs = 0
- }
-
- numAddresses := cmn.MaxInt(
- cmn.MinInt(minGetSelection, bookSize),
- bookSize*getSelectionPercent/100)
- numAddresses = cmn.MinInt(maxGetSelection, numAddresses)
-
- selection := make([]*p2p.NetAddress, numAddresses)
-
- oldBucketToAddrsMap := make(map[int]map[string]struct{})
- var oldIndex int
- newBucketToAddrsMap := make(map[int]map[string]struct{})
- var newIndex int
-
- // initialize counters used to count old and new added addresses.
- // len(oldBucketToAddrsMap) cannot be used as multiple addresses can endup in the same bucket.
- var oldAddressesAdded int
- var newAddressesAdded int
-
- // number of new addresses that, if possible, should be in the beginning of the selection
- numRequiredNewAdd := percentageOfNum(biasTowardsNewAddrs, numAddresses)
-
- selectionIndex := 0
- ADDRS_LOOP:
- for selectionIndex < numAddresses {
- // biasedTowardsOldAddrs indicates if the selection can switch to old addresses
- biasedTowardsOldAddrs := selectionIndex >= numRequiredNewAdd
- // An old addresses is selected if:
- // - the bias is for old and old addressees are still available or,
- // - there are no new addresses or all new addresses have been selected.
- // numAddresses <= a.nOld + a.nNew therefore it is guaranteed that there are enough
- // addresses to fill the selection
- pickFromOldBucket :=
- (biasedTowardsOldAddrs && oldAddressesAdded < a.nOld) ||
- a.nNew == 0 || newAddressesAdded >= a.nNew
-
- bucket := make(map[string]*knownAddress)
-
- // loop until we pick a random non-empty bucket
- for len(bucket) == 0 {
- if pickFromOldBucket {
- oldIndex = a.rand.Intn(len(a.bucketsOld))
- bucket = a.bucketsOld[oldIndex]
- } else {
- newIndex = a.rand.Intn(len(a.bucketsNew))
- bucket = a.bucketsNew[newIndex]
- }
- }
-
- // pick a random index
- randIndex := a.rand.Intn(len(bucket))
-
- // loop over the map to return that index
- var selectedAddr *p2p.NetAddress
- for _, ka := range bucket {
- if randIndex == 0 {
- selectedAddr = ka.Addr
- break
- }
- randIndex--
- }
-
- // if we have selected the address before, restart the loop
- // otherwise, record it and continue
- if pickFromOldBucket {
- if addrsMap, ok := oldBucketToAddrsMap[oldIndex]; ok {
- if _, ok = addrsMap[selectedAddr.String()]; ok {
- continue ADDRS_LOOP
- }
- } else {
- oldBucketToAddrsMap[oldIndex] = make(map[string]struct{})
- }
- oldBucketToAddrsMap[oldIndex][selectedAddr.String()] = struct{}{}
- oldAddressesAdded++
- } else {
- if addrsMap, ok := newBucketToAddrsMap[newIndex]; ok {
- if _, ok = addrsMap[selectedAddr.String()]; ok {
- continue ADDRS_LOOP
- }
- } else {
- newBucketToAddrsMap[newIndex] = make(map[string]struct{})
- }
- newBucketToAddrsMap[newIndex][selectedAddr.String()] = struct{}{}
- newAddressesAdded++
- }
-
- selection[selectionIndex] = selectedAddr
- selectionIndex++
- }
-
- return selection
- }
-
- // ListOfKnownAddresses returns the new and old addresses.
- func (a *addrBook) ListOfKnownAddresses() []*knownAddress {
- a.mtx.Lock()
- defer a.mtx.Unlock()
-
- addrs := []*knownAddress{}
- for _, addr := range a.addrLookup {
- addrs = append(addrs, addr.copy())
- }
- return addrs
- }
-
- //------------------------------------------------
-
- // Size returns the number of addresses in the book.
- func (a *addrBook) Size() int {
- a.mtx.Lock()
- defer a.mtx.Unlock()
-
- return a.size()
- }
-
- func (a *addrBook) size() int {
- return a.nNew + a.nOld
- }
-
- //----------------------------------------------------------
-
- // Save persists the address book to disk.
- func (a *addrBook) Save() {
- a.saveToFile(a.filePath) // thread safe
- }
-
- func (a *addrBook) saveRoutine() {
- defer a.wg.Done()
-
- saveFileTicker := time.NewTicker(dumpAddressInterval)
- out:
- for {
- select {
- case <-saveFileTicker.C:
- a.saveToFile(a.filePath)
- case <-a.Quit():
- break out
- }
- }
- saveFileTicker.Stop()
- a.saveToFile(a.filePath)
- }
-
- //----------------------------------------------------------
-
- func (a *addrBook) getBucket(bucketType byte, bucketIdx int) map[string]*knownAddress {
- switch bucketType {
- case bucketTypeNew:
- return a.bucketsNew[bucketIdx]
- case bucketTypeOld:
- return a.bucketsOld[bucketIdx]
- default:
- cmn.PanicSanity("Should not happen")
- return nil
- }
- }
-
- // Adds ka to new bucket. Returns false if it couldn't do it cuz buckets full.
- // NOTE: currently it always returns true.
- func (a *addrBook) addToNewBucket(ka *knownAddress, bucketIdx int) {
- // Sanity check
- if ka.isOld() {
- a.Logger.Error("Failed Sanity Check! Cant add old address to new bucket", "ka", ka, "bucket", bucketIdx)
- return
- }
-
- addrStr := ka.Addr.String()
- bucket := a.getBucket(bucketTypeNew, bucketIdx)
-
- // Already exists?
- if _, ok := bucket[addrStr]; ok {
- return
- }
-
- // Enforce max addresses.
- if len(bucket) > newBucketSize {
- a.Logger.Info("new bucket is full, expiring new")
- a.expireNew(bucketIdx)
- }
-
- // Add to bucket.
- bucket[addrStr] = ka
- // increment nNew if the peer doesnt already exist in a bucket
- if ka.addBucketRef(bucketIdx) == 1 {
- a.nNew++
- }
-
- // Add it to addrLookup
- a.addrLookup[ka.ID()] = ka
- }
-
- // Adds ka to old bucket. Returns false if it couldn't do it cuz buckets full.
- func (a *addrBook) addToOldBucket(ka *knownAddress, bucketIdx int) bool {
- // Sanity check
- if ka.isNew() {
- a.Logger.Error(fmt.Sprintf("Cannot add new address to old bucket: %v", ka))
- return false
- }
- if len(ka.Buckets) != 0 {
- a.Logger.Error(fmt.Sprintf("Cannot add already old address to another old bucket: %v", ka))
- return false
- }
-
- addrStr := ka.Addr.String()
- bucket := a.getBucket(bucketTypeOld, bucketIdx)
-
- // Already exists?
- if _, ok := bucket[addrStr]; ok {
- return true
- }
-
- // Enforce max addresses.
- if len(bucket) > oldBucketSize {
- return false
- }
-
- // Add to bucket.
- bucket[addrStr] = ka
- if ka.addBucketRef(bucketIdx) == 1 {
- a.nOld++
- }
-
- // Ensure in addrLookup
- a.addrLookup[ka.ID()] = ka
-
- return true
- }
-
- func (a *addrBook) removeFromBucket(ka *knownAddress, bucketType byte, bucketIdx int) {
- if ka.BucketType != bucketType {
- a.Logger.Error(fmt.Sprintf("Bucket type mismatch: %v", ka))
- return
- }
- bucket := a.getBucket(bucketType, bucketIdx)
- delete(bucket, ka.Addr.String())
- if ka.removeBucketRef(bucketIdx) == 0 {
- if bucketType == bucketTypeNew {
- a.nNew--
- } else {
- a.nOld--
- }
- delete(a.addrLookup, ka.ID())
- }
- }
-
- func (a *addrBook) removeFromAllBuckets(ka *knownAddress) {
- for _, bucketIdx := range ka.Buckets {
- bucket := a.getBucket(ka.BucketType, bucketIdx)
- delete(bucket, ka.Addr.String())
- }
- ka.Buckets = nil
- if ka.BucketType == bucketTypeNew {
- a.nNew--
- } else {
- a.nOld--
- }
- delete(a.addrLookup, ka.ID())
- }
-
- //----------------------------------------------------------
-
- func (a *addrBook) pickOldest(bucketType byte, bucketIdx int) *knownAddress {
- bucket := a.getBucket(bucketType, bucketIdx)
- var oldest *knownAddress
- for _, ka := range bucket {
- if oldest == nil || ka.LastAttempt.Before(oldest.LastAttempt) {
- oldest = ka
- }
- }
- return oldest
- }
-
- // adds the address to a "new" bucket. if its already in one,
- // it only adds it probabilistically
- func (a *addrBook) addAddress(addr, src *p2p.NetAddress) error {
- if addr == nil || src == nil {
- return ErrAddrBookNilAddr{addr, src}
- }
-
- if a.routabilityStrict && !addr.Routable() {
- return ErrAddrBookNonRoutable{addr}
- }
-
- if !addr.Valid() {
- return ErrAddrBookInvalidAddr{addr}
- }
-
- if !addr.HasID() {
- return ErrAddrBookInvalidAddrNoID{addr}
- }
-
- // TODO: we should track ourAddrs by ID and by IP:PORT and refuse both.
- if _, ok := a.ourAddrs[addr.String()]; ok {
- return ErrAddrBookSelf{addr}
- }
-
- if _, ok := a.privateIDs[addr.ID]; ok {
- return ErrAddrBookPrivate{addr}
- }
-
- if _, ok := a.privateIDs[src.ID]; ok {
- return ErrAddrBookPrivateSrc{src}
- }
-
- ka := a.addrLookup[addr.ID]
- if ka != nil {
- // If its already old and the addr is the same, ignore it.
- if ka.isOld() && ka.Addr.Equals(addr) {
- return nil
- }
- // Already in max new buckets.
- if len(ka.Buckets) == maxNewBucketsPerAddress {
- return nil
- }
- // The more entries we have, the less likely we are to add more.
- factor := int32(2 * len(ka.Buckets))
- if a.rand.Int31n(factor) != 0 {
- return nil
- }
- } else {
- ka = newKnownAddress(addr, src)
- }
-
- bucket := a.calcNewBucket(addr, src)
- a.addToNewBucket(ka, bucket)
- return nil
- }
-
- // Make space in the new buckets by expiring the really bad entries.
- // If no bad entries are available we remove the oldest.
- func (a *addrBook) expireNew(bucketIdx int) {
- for addrStr, ka := range a.bucketsNew[bucketIdx] {
- // If an entry is bad, throw it away
- if ka.isBad() {
- a.Logger.Info(fmt.Sprintf("expiring bad address %v", addrStr))
- a.removeFromBucket(ka, bucketTypeNew, bucketIdx)
- return
- }
- }
-
- // If we haven't thrown out a bad entry, throw out the oldest entry
- oldest := a.pickOldest(bucketTypeNew, bucketIdx)
- a.removeFromBucket(oldest, bucketTypeNew, bucketIdx)
- }
-
- // Promotes an address from new to old. If the destination bucket is full,
- // demote the oldest one to a "new" bucket.
- // TODO: Demote more probabilistically?
- func (a *addrBook) moveToOld(ka *knownAddress) {
- // Sanity check
- if ka.isOld() {
- a.Logger.Error(fmt.Sprintf("Cannot promote address that is already old %v", ka))
- return
- }
- if len(ka.Buckets) == 0 {
- a.Logger.Error(fmt.Sprintf("Cannot promote address that isn't in any new buckets %v", ka))
- return
- }
-
- // Remove from all (new) buckets.
- a.removeFromAllBuckets(ka)
- // It's officially old now.
- ka.BucketType = bucketTypeOld
-
- // Try to add it to its oldBucket destination.
- oldBucketIdx := a.calcOldBucket(ka.Addr)
- added := a.addToOldBucket(ka, oldBucketIdx)
- if !added {
- // No room; move the oldest to a new bucket
- oldest := a.pickOldest(bucketTypeOld, oldBucketIdx)
- a.removeFromBucket(oldest, bucketTypeOld, oldBucketIdx)
- newBucketIdx := a.calcNewBucket(oldest.Addr, oldest.Src)
- a.addToNewBucket(oldest, newBucketIdx)
-
- // Finally, add our ka to old bucket again.
- added = a.addToOldBucket(ka, oldBucketIdx)
- if !added {
- a.Logger.Error(fmt.Sprintf("Could not re-add ka %v to oldBucketIdx %v", ka, oldBucketIdx))
- }
- }
- }
-
- //---------------------------------------------------------------------
- // calculate bucket placements
-
- // doublesha256( key + sourcegroup +
- // int64(doublesha256(key + group + sourcegroup))%bucket_per_group ) % num_new_buckets
- func (a *addrBook) calcNewBucket(addr, src *p2p.NetAddress) int {
- data1 := []byte{}
- data1 = append(data1, []byte(a.key)...)
- data1 = append(data1, []byte(a.groupKey(addr))...)
- data1 = append(data1, []byte(a.groupKey(src))...)
- hash1 := doubleSha256(data1)
- hash64 := binary.BigEndian.Uint64(hash1)
- hash64 %= newBucketsPerGroup
- var hashbuf [8]byte
- binary.BigEndian.PutUint64(hashbuf[:], hash64)
- data2 := []byte{}
- data2 = append(data2, []byte(a.key)...)
- data2 = append(data2, a.groupKey(src)...)
- data2 = append(data2, hashbuf[:]...)
-
- hash2 := doubleSha256(data2)
- return int(binary.BigEndian.Uint64(hash2) % newBucketCount)
- }
-
- // doublesha256( key + group +
- // int64(doublesha256(key + addr))%buckets_per_group ) % num_old_buckets
- func (a *addrBook) calcOldBucket(addr *p2p.NetAddress) int {
- data1 := []byte{}
- data1 = append(data1, []byte(a.key)...)
- data1 = append(data1, []byte(addr.String())...)
- hash1 := doubleSha256(data1)
- hash64 := binary.BigEndian.Uint64(hash1)
- hash64 %= oldBucketsPerGroup
- var hashbuf [8]byte
- binary.BigEndian.PutUint64(hashbuf[:], hash64)
- data2 := []byte{}
- data2 = append(data2, []byte(a.key)...)
- data2 = append(data2, a.groupKey(addr)...)
- data2 = append(data2, hashbuf[:]...)
-
- hash2 := doubleSha256(data2)
- return int(binary.BigEndian.Uint64(hash2) % oldBucketCount)
- }
-
- // Return a string representing the network group of this address.
- // This is the /16 for IPv4, the /32 (/36 for he.net) for IPv6, the string
- // "local" for a local address and the string "unroutable" for an unroutable
- // address.
- func (a *addrBook) groupKey(na *p2p.NetAddress) string {
- if a.routabilityStrict && na.Local() {
- return "local"
- }
- if a.routabilityStrict && !na.Routable() {
- return "unroutable"
- }
-
- if ipv4 := na.IP.To4(); ipv4 != nil {
- return (&net.IPNet{IP: na.IP, Mask: net.CIDRMask(16, 32)}).String()
- }
- if na.RFC6145() || na.RFC6052() {
- // last four bytes are the ip address
- ip := net.IP(na.IP[12:16])
- return (&net.IPNet{IP: ip, Mask: net.CIDRMask(16, 32)}).String()
- }
-
- if na.RFC3964() {
- ip := net.IP(na.IP[2:7])
- return (&net.IPNet{IP: ip, Mask: net.CIDRMask(16, 32)}).String()
-
- }
- if na.RFC4380() {
- // teredo tunnels have the last 4 bytes as the v4 address XOR
- // 0xff.
- ip := net.IP(make([]byte, 4))
- for i, byte := range na.IP[12:16] {
- ip[i] = byte ^ 0xff
- }
- return (&net.IPNet{IP: ip, Mask: net.CIDRMask(16, 32)}).String()
- }
-
- // OK, so now we know ourselves to be a IPv6 address.
- // bitcoind uses /32 for everything, except for Hurricane Electric's
- // (he.net) IP range, which it uses /36 for.
- bits := 32
- heNet := &net.IPNet{IP: net.ParseIP("2001:470::"),
- Mask: net.CIDRMask(32, 128)}
- if heNet.Contains(na.IP) {
- bits = 36
- }
-
- return (&net.IPNet{IP: na.IP, Mask: net.CIDRMask(bits, 128)}).String()
- }
-
- // doubleSha256 calculates sha256(sha256(b)) and returns the resulting bytes.
- func doubleSha256(b []byte) []byte {
- hasher := sha256.New()
- hasher.Write(b) // nolint: errcheck, gas
- sum := hasher.Sum(nil)
- hasher.Reset()
- hasher.Write(sum) // nolint: errcheck, gas
- return hasher.Sum(nil)
- }
|