// Modified for Tendermint
// Originally Copyright (c) 2013-2014 Conformal Systems LLC.
// https://github.com/conformal/btcd/blob/master/LICENSE

package pex

import (
	"crypto/sha256"
	"encoding/binary"
	"fmt"
	"math"
	"net"
	"sync"
	"time"

	"github.com/tendermint/tendermint/crypto"
	cmn "github.com/tendermint/tendermint/libs/common"
	"github.com/tendermint/tendermint/p2p"
)

const (
	bucketTypeNew = 0x01
	bucketTypeOld = 0x02
)

// AddrBook is an address book used for tracking peers
// so we can gossip about them to others and select
// peers to dial.
// TODO: break this up?
type AddrBook interface {
	cmn.Service

	// Add our own addresses so we don't later add ourselves
	AddOurAddress(*p2p.NetAddress)
	// Check if it is our address
	OurAddress(*p2p.NetAddress) bool

	AddPrivateIDs([]string)

	// Add and remove an address
	AddAddress(addr *p2p.NetAddress, src *p2p.NetAddress) error
	RemoveAddress(*p2p.NetAddress)

	// Check if the address is in the book
	HasAddress(*p2p.NetAddress) bool

	// Do we need more peers?
	NeedMoreAddrs() bool

	// Is Address Book Empty? Answer should not depend on being in your own
	// address book, or private peers
	Empty() bool

	// Pick an address to dial
	PickAddress(biasTowardsNewAddrs int) *p2p.NetAddress

	// Mark address
	MarkGood(*p2p.NetAddress)
	MarkAttempt(*p2p.NetAddress)
	MarkBad(*p2p.NetAddress)

	IsGood(*p2p.NetAddress) bool

	// Send a selection of addresses to peers
	GetSelection() []*p2p.NetAddress
	// Send a selection of addresses with bias
	GetSelectionWithBias(biasTowardsNewAddrs int) []*p2p.NetAddress

	// TODO: remove
	ListOfKnownAddresses() []*knownAddress

	// Persist to disk
	Save()
}
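// Illustrative usage sketch (added for clarity, not part of the original file):
// a caller such as the PEX reactor might drive the AddrBook roughly as below,
// where ourAddr, newAddr and srcAddr are hypothetical *p2p.NetAddress values
// obtained elsewhere:
//
//	book := NewAddrBook("addrbook.json", true) // file path, routabilityStrict
//	book.AddOurAddress(ourAddr)
//	if err := book.AddAddress(newAddr, srcAddr); err != nil {
//		// non-routable, private, or our own address; ignore it
//	}
//	if book.NeedMoreAddrs() {
//		addr := book.PickAddress(30) // 30% bias towards "new" addresses
//		_ = addr                     // may be nil if the book is empty
//	}
//	book.Save()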
var _ AddrBook = (*addrBook)(nil)

// addrBook - concurrency safe peer address manager.
// Implements AddrBook.
type addrBook struct {
	cmn.BaseService

	// immutable after creation
	filePath          string
	routabilityStrict bool
	key               string // random prefix for bucket placement

	// accessed concurrently
	mtx        sync.Mutex
	rand       *cmn.Rand
	ourAddrs   map[string]struct{}
	privateIDs map[p2p.ID]struct{}
	addrLookup map[p2p.ID]*knownAddress // new & old
	bucketsOld []map[string]*knownAddress
	bucketsNew []map[string]*knownAddress
	nOld       int
	nNew       int

	wg sync.WaitGroup
}

// NewAddrBook creates a new address book.
// Use Start to begin processing asynchronous address updates.
func NewAddrBook(filePath string, routabilityStrict bool) *addrBook {
	am := &addrBook{
		rand:              cmn.NewRand(),
		ourAddrs:          make(map[string]struct{}),
		privateIDs:        make(map[p2p.ID]struct{}),
		addrLookup:        make(map[p2p.ID]*knownAddress),
		filePath:          filePath,
		routabilityStrict: routabilityStrict,
	}
	am.init()
	am.BaseService = *cmn.NewBaseService(nil, "AddrBook", am)
	return am
}

// Initialize the buckets.
// When modifying this, don't forget to update loadFromFile()
func (a *addrBook) init() {
	a.key = crypto.CRandHex(24) // 24/2 * 8 = 96 bits
	// New addr buckets
	a.bucketsNew = make([]map[string]*knownAddress, newBucketCount)
	for i := range a.bucketsNew {
		a.bucketsNew[i] = make(map[string]*knownAddress)
	}
	// Old addr buckets
	a.bucketsOld = make([]map[string]*knownAddress, oldBucketCount)
	for i := range a.bucketsOld {
		a.bucketsOld[i] = make(map[string]*knownAddress)
	}
}

// OnStart implements Service.
func (a *addrBook) OnStart() error {
	if err := a.BaseService.OnStart(); err != nil {
		return err
	}
	a.loadFromFile(a.filePath)

	// wg.Add to ensure that any invocation of .Wait()
	// later on will wait for saveRoutine to terminate.
	a.wg.Add(1)
	go a.saveRoutine()

	return nil
}

// OnStop implements Service.
func (a *addrBook) OnStop() {
	a.BaseService.OnStop()
}

func (a *addrBook) Wait() {
	a.wg.Wait()
}

func (a *addrBook) FilePath() string {
	return a.filePath
}
//-------------------------------------------------------

// AddOurAddress adds one of our addresses.
func (a *addrBook) AddOurAddress(addr *p2p.NetAddress) {
	a.mtx.Lock()
	defer a.mtx.Unlock()
	a.Logger.Info("Add our address to book", "addr", addr)
	a.ourAddrs[addr.String()] = struct{}{}
}

// OurAddress returns true if it is our address.
func (a *addrBook) OurAddress(addr *p2p.NetAddress) bool {
	a.mtx.Lock()
	defer a.mtx.Unlock()
	_, ok := a.ourAddrs[addr.String()]
	return ok
}

func (a *addrBook) AddPrivateIDs(IDs []string) {
	a.mtx.Lock()
	defer a.mtx.Unlock()
	for _, id := range IDs {
		a.privateIDs[p2p.ID(id)] = struct{}{}
	}
}

// AddAddress implements AddrBook.
// Add address to a "new" bucket. If it's already in one, only add it probabilistically.
// Returns error if the addr is non-routable. Does not add self.
// NOTE: addr must not be nil
func (a *addrBook) AddAddress(addr *p2p.NetAddress, src *p2p.NetAddress) error {
	a.mtx.Lock()
	defer a.mtx.Unlock()
	return a.addAddress(addr, src)
}

// RemoveAddress implements AddrBook - removes the address from the book.
func (a *addrBook) RemoveAddress(addr *p2p.NetAddress) {
	a.mtx.Lock()
	defer a.mtx.Unlock()
	ka := a.addrLookup[addr.ID]
	if ka == nil {
		return
	}
	a.Logger.Info("Remove address from book", "addr", addr)
	a.removeFromAllBuckets(ka)
}

// IsGood returns true if the peer was ever marked as good and hasn't
// done anything wrong since then.
func (a *addrBook) IsGood(addr *p2p.NetAddress) bool {
	a.mtx.Lock()
	defer a.mtx.Unlock()
	return a.addrLookup[addr.ID].isOld()
}

// HasAddress returns true if the address is in the book.
func (a *addrBook) HasAddress(addr *p2p.NetAddress) bool {
	a.mtx.Lock()
	defer a.mtx.Unlock()
	ka := a.addrLookup[addr.ID]
	return ka != nil
}

// NeedMoreAddrs implements AddrBook - returns true if the book does not have enough addresses.
func (a *addrBook) NeedMoreAddrs() bool {
	return a.Size() < needAddressThreshold
}

// Empty implements AddrBook - returns true if there are no addresses in the address book.
// Does not count the peer appearing in its own address book, or private peers.
func (a *addrBook) Empty() bool {
	return a.Size() == 0
}
// PickAddress implements AddrBook. It picks an address to connect to.
// The address is picked randomly from an old or new bucket according
// to the biasTowardsNewAddrs argument, which must be between [0, 100] (or else is truncated to that range)
// and determines how biased we are to pick an address from a new bucket.
// PickAddress returns nil if the AddrBook is empty or if we try to pick
// from an empty bucket.
func (a *addrBook) PickAddress(biasTowardsNewAddrs int) *p2p.NetAddress {
	a.mtx.Lock()
	defer a.mtx.Unlock()

	bookSize := a.size()
	if bookSize <= 0 {
		if bookSize < 0 {
			a.Logger.Error("Addrbook size less than 0", "nNew", a.nNew, "nOld", a.nOld)
		}
		return nil
	}
	if biasTowardsNewAddrs > 100 {
		biasTowardsNewAddrs = 100
	}
	if biasTowardsNewAddrs < 0 {
		biasTowardsNewAddrs = 0
	}

	// Bias between new and old addresses.
	oldCorrelation := math.Sqrt(float64(a.nOld)) * (100.0 - float64(biasTowardsNewAddrs))
	newCorrelation := math.Sqrt(float64(a.nNew)) * float64(biasTowardsNewAddrs)

	// pick a random peer from a random bucket
	var bucket map[string]*knownAddress
	pickFromOldBucket := (newCorrelation+oldCorrelation)*a.rand.Float64() < oldCorrelation
	if (pickFromOldBucket && a.nOld == 0) ||
		(!pickFromOldBucket && a.nNew == 0) {
		return nil
	}
	// loop until we pick a random non-empty bucket
	for len(bucket) == 0 {
		if pickFromOldBucket {
			bucket = a.bucketsOld[a.rand.Intn(len(a.bucketsOld))]
		} else {
			bucket = a.bucketsNew[a.rand.Intn(len(a.bucketsNew))]
		}
	}
	// pick a random index and loop over the map to return that index
	randIndex := a.rand.Intn(len(bucket))
	for _, ka := range bucket {
		if randIndex == 0 {
			return ka.Addr
		}
		randIndex--
	}
	return nil
}
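// Worked example of the bias above (added for illustration, not part of the
// original file): with nOld = 100, nNew = 400 and biasTowardsNewAddrs = 30,
// oldCorrelation = sqrt(100)*70 = 700 and newCorrelation = sqrt(400)*30 = 600,
// so an old-bucket address is picked with probability 700/(700+600) ≈ 0.54.
// The square roots dampen the effect of one side of the book being much
// larger than the other.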
// MarkGood implements AddrBook - it marks the peer as good and
// moves it into an "old" bucket.
func (a *addrBook) MarkGood(addr *p2p.NetAddress) {
	a.mtx.Lock()
	defer a.mtx.Unlock()
	ka := a.addrLookup[addr.ID]
	if ka == nil {
		return
	}
	ka.markGood()
	if ka.isNew() {
		a.moveToOld(ka)
	}
}

// MarkAttempt implements AddrBook - it marks that an attempt was made to connect to the address.
func (a *addrBook) MarkAttempt(addr *p2p.NetAddress) {
	a.mtx.Lock()
	defer a.mtx.Unlock()
	ka := a.addrLookup[addr.ID]
	if ka == nil {
		return
	}
	ka.markAttempt()
}

// MarkBad implements AddrBook. Currently it just ejects the address.
// TODO: black list for some amount of time
func (a *addrBook) MarkBad(addr *p2p.NetAddress) {
	a.RemoveAddress(addr)
}

// GetSelection implements AddrBook.
// It randomly selects some addresses (old & new). Suitable for peer-exchange protocols.
// Must never return a nil address.
func (a *addrBook) GetSelection() []*p2p.NetAddress {
	a.mtx.Lock()
	defer a.mtx.Unlock()

	bookSize := a.size()
	if bookSize <= 0 {
		if bookSize < 0 {
			a.Logger.Error("Addrbook size less than 0", "nNew", a.nNew, "nOld", a.nOld)
		}
		return nil
	}

	numAddresses := cmn.MaxInt(
		cmn.MinInt(minGetSelection, bookSize),
		bookSize*getSelectionPercent/100)
	numAddresses = cmn.MinInt(maxGetSelection, numAddresses)

	// XXX: instead of making a list of all addresses, shuffling, and slicing a random chunk,
	// could we just select a random numAddresses of indexes?
	allAddr := make([]*p2p.NetAddress, bookSize)
	i := 0
	for _, ka := range a.addrLookup {
		allAddr[i] = ka.Addr
		i++
	}

	// Fisher-Yates shuffle the array. We only need to do the first
	// `numAddresses' since we are throwing away the rest.
	for i := 0; i < numAddresses; i++ {
		// pick a number between current index and the end
		j := cmn.RandIntn(len(allAddr)-i) + i
		allAddr[i], allAddr[j] = allAddr[j], allAddr[i]
	}

	// slice off the limit we are willing to share.
	return allAddr[:numAddresses]
}
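// Note added for clarity (not in the original file): stopping the
// Fisher-Yates shuffle after numAddresses iterations is sufficient because
// each completed iteration places a uniformly random remaining element into
// the next output slot; the positions beyond numAddresses-1 are discarded by
// the final slice, so there is no need to shuffle them.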
// GetSelectionWithBias implements AddrBook.
// It randomly selects some addresses (old & new). Suitable for peer-exchange protocols.
// Must never return a nil address.
//
// Each address is picked randomly from an old or new bucket according to the
// biasTowardsNewAddrs argument, which must be between [0, 100] (or else is truncated to
// that range) and determines how biased we are to pick an address from a new
// bucket.
func (a *addrBook) GetSelectionWithBias(biasTowardsNewAddrs int) []*p2p.NetAddress {
	a.mtx.Lock()
	defer a.mtx.Unlock()

	bookSize := a.size()
	if bookSize <= 0 {
		if bookSize < 0 {
			a.Logger.Error("Addrbook size less than 0", "nNew", a.nNew, "nOld", a.nOld)
		}
		return nil
	}

	if biasTowardsNewAddrs > 100 {
		biasTowardsNewAddrs = 100
	}
	if biasTowardsNewAddrs < 0 {
		biasTowardsNewAddrs = 0
	}

	numAddresses := cmn.MaxInt(
		cmn.MinInt(minGetSelection, bookSize),
		bookSize*getSelectionPercent/100)
	numAddresses = cmn.MinInt(maxGetSelection, numAddresses)

	selection := make([]*p2p.NetAddress, numAddresses)

	oldBucketToAddrsMap := make(map[int]map[string]struct{})
	var oldIndex int
	newBucketToAddrsMap := make(map[int]map[string]struct{})
	var newIndex int

	selectionIndex := 0
ADDRS_LOOP:
	for selectionIndex < numAddresses {
		// determine whether to pick from an old bucket.
		biasedTowardsOldAddrs := int((float64(selectionIndex)/float64(numAddresses))*100) >= biasTowardsNewAddrs
		// if there aren't enough old addresses to pick from, we can't pick from an old bucket.
		pickFromOldBucket := biasedTowardsOldAddrs && a.nOld > len(oldBucketToAddrsMap)
		// if there aren't enough new addrs to pick from, just return early.
		if !pickFromOldBucket && a.nNew <= len(newBucketToAddrsMap) {
			return selection
		}

		bucket := make(map[string]*knownAddress)

		// loop until we pick a random non-empty bucket
		for len(bucket) == 0 {
			if pickFromOldBucket {
				oldIndex = a.rand.Intn(len(a.bucketsOld))
				bucket = a.bucketsOld[oldIndex]
			} else {
				newIndex = a.rand.Intn(len(a.bucketsNew))
				bucket = a.bucketsNew[newIndex]
			}
		}

		// pick a random index
		randIndex := a.rand.Intn(len(bucket))

		// loop over the map to return that index
		var selectedAddr *p2p.NetAddress
		for _, ka := range bucket {
			if randIndex == 0 {
				selectedAddr = ka.Addr
				break
			}
			randIndex--
		}

		// if we have selected the address before, restart the loop
		// otherwise, record it and continue
		if pickFromOldBucket {
			if addrsMap, ok := oldBucketToAddrsMap[oldIndex]; ok {
				if _, ok = addrsMap[selectedAddr.String()]; ok {
					continue ADDRS_LOOP
				}
			} else {
				oldBucketToAddrsMap[oldIndex] = make(map[string]struct{})
			}
			oldBucketToAddrsMap[oldIndex][selectedAddr.String()] = struct{}{}
		} else {
			if addrsMap, ok := newBucketToAddrsMap[newIndex]; ok {
				if _, ok = addrsMap[selectedAddr.String()]; ok {
					continue ADDRS_LOOP
				}
			} else {
				newBucketToAddrsMap[newIndex] = make(map[string]struct{})
			}
			newBucketToAddrsMap[newIndex][selectedAddr.String()] = struct{}{}
		}

		selection[selectionIndex] = selectedAddr
		selectionIndex++
	}

	return selection
}
// ListOfKnownAddresses returns the new and old addresses.
func (a *addrBook) ListOfKnownAddresses() []*knownAddress {
	a.mtx.Lock()
	defer a.mtx.Unlock()

	addrs := []*knownAddress{}
	for _, addr := range a.addrLookup {
		addrs = append(addrs, addr.copy())
	}
	return addrs
}

//------------------------------------------------

// Size returns the number of addresses in the book.
func (a *addrBook) Size() int {
	a.mtx.Lock()
	defer a.mtx.Unlock()
	return a.size()
}

func (a *addrBook) size() int {
	return a.nNew + a.nOld
}

//----------------------------------------------------------

// Save persists the address book to disk.
func (a *addrBook) Save() {
	a.saveToFile(a.filePath) // thread safe
}

func (a *addrBook) saveRoutine() {
	defer a.wg.Done()

	saveFileTicker := time.NewTicker(dumpAddressInterval)
out:
	for {
		select {
		case <-saveFileTicker.C:
			a.saveToFile(a.filePath)
		case <-a.Quit():
			break out
		}
	}
	saveFileTicker.Stop()
	a.saveToFile(a.filePath)
}
//----------------------------------------------------------

func (a *addrBook) getBucket(bucketType byte, bucketIdx int) map[string]*knownAddress {
	switch bucketType {
	case bucketTypeNew:
		return a.bucketsNew[bucketIdx]
	case bucketTypeOld:
		return a.bucketsOld[bucketIdx]
	default:
		cmn.PanicSanity("Should not happen")
		return nil
	}
}

// Adds ka to a new bucket. Returns false if it couldn't do it because the bucket is full.
// NOTE: currently it always returns true.
func (a *addrBook) addToNewBucket(ka *knownAddress, bucketIdx int) {
	// Sanity check
	if ka.isOld() {
		a.Logger.Error("Failed Sanity Check! Cant add old address to new bucket", "ka", ka, "bucket", bucketIdx)
		return
	}

	addrStr := ka.Addr.String()
	bucket := a.getBucket(bucketTypeNew, bucketIdx)

	// Already exists?
	if _, ok := bucket[addrStr]; ok {
		return
	}

	// Enforce max addresses.
	if len(bucket) > newBucketSize {
		a.Logger.Info("new bucket is full, expiring new")
		a.expireNew(bucketIdx)
	}

	// Add to bucket.
	bucket[addrStr] = ka
	// increment nNew if the peer doesn't already exist in a bucket
	if ka.addBucketRef(bucketIdx) == 1 {
		a.nNew++
	}

	// Add it to addrLookup
	a.addrLookup[ka.ID()] = ka
}

// Adds ka to an old bucket. Returns false if it couldn't do it because the bucket is full.
func (a *addrBook) addToOldBucket(ka *knownAddress, bucketIdx int) bool {
	// Sanity check
	if ka.isNew() {
		a.Logger.Error(fmt.Sprintf("Cannot add new address to old bucket: %v", ka))
		return false
	}
	if len(ka.Buckets) != 0 {
		a.Logger.Error(fmt.Sprintf("Cannot add already old address to another old bucket: %v", ka))
		return false
	}

	addrStr := ka.Addr.String()
	bucket := a.getBucket(bucketTypeOld, bucketIdx)

	// Already exists?
	if _, ok := bucket[addrStr]; ok {
		return true
	}

	// Enforce max addresses.
	if len(bucket) > oldBucketSize {
		return false
	}

	// Add to bucket.
	bucket[addrStr] = ka
	if ka.addBucketRef(bucketIdx) == 1 {
		a.nOld++
	}

	// Ensure in addrLookup
	a.addrLookup[ka.ID()] = ka

	return true
}
func (a *addrBook) removeFromBucket(ka *knownAddress, bucketType byte, bucketIdx int) {
	if ka.BucketType != bucketType {
		a.Logger.Error(fmt.Sprintf("Bucket type mismatch: %v", ka))
		return
	}
	bucket := a.getBucket(bucketType, bucketIdx)
	delete(bucket, ka.Addr.String())
	if ka.removeBucketRef(bucketIdx) == 0 {
		if bucketType == bucketTypeNew {
			a.nNew--
		} else {
			a.nOld--
		}
		delete(a.addrLookup, ka.ID())
	}
}

func (a *addrBook) removeFromAllBuckets(ka *knownAddress) {
	for _, bucketIdx := range ka.Buckets {
		bucket := a.getBucket(ka.BucketType, bucketIdx)
		delete(bucket, ka.Addr.String())
	}
	ka.Buckets = nil
	if ka.BucketType == bucketTypeNew {
		a.nNew--
	} else {
		a.nOld--
	}
	delete(a.addrLookup, ka.ID())
}

//----------------------------------------------------------

func (a *addrBook) pickOldest(bucketType byte, bucketIdx int) *knownAddress {
	bucket := a.getBucket(bucketType, bucketIdx)
	var oldest *knownAddress
	for _, ka := range bucket {
		if oldest == nil || ka.LastAttempt.Before(oldest.LastAttempt) {
			oldest = ka
		}
	}
	return oldest
}
// adds the address to a "new" bucket. if it's already in one,
// it only adds it probabilistically
func (a *addrBook) addAddress(addr, src *p2p.NetAddress) error {
	if addr == nil || src == nil {
		return ErrAddrBookNilAddr{addr, src}
	}

	if a.routabilityStrict && !addr.Routable() {
		return ErrAddrBookNonRoutable{addr}
	}

	if !addr.Valid() {
		return ErrAddrBookInvalidAddr{addr}
	}

	if !addr.HasID() {
		return ErrAddrBookInvalidAddrNoID{addr}
	}

	// TODO: we should track ourAddrs by ID and by IP:PORT and refuse both.
	if _, ok := a.ourAddrs[addr.String()]; ok {
		return ErrAddrBookSelf{addr}
	}

	if _, ok := a.privateIDs[addr.ID]; ok {
		return ErrAddrBookPrivate{addr}
	}

	if _, ok := a.privateIDs[src.ID]; ok {
		return ErrAddrBookPrivateSrc{src}
	}

	ka := a.addrLookup[addr.ID]
	if ka != nil {
		// If it's already old and the addr is the same, ignore it.
		if ka.isOld() && ka.Addr.Equals(addr) {
			return nil
		}
		// Already in max new buckets.
		if len(ka.Buckets) == maxNewBucketsPerAddress {
			return nil
		}
		// The more entries we have, the less likely we are to add more.
		factor := int32(2 * len(ka.Buckets))
		if a.rand.Int31n(factor) != 0 {
			return nil
		}
	} else {
		ka = newKnownAddress(addr, src)
	}

	bucket := a.calcNewBucket(addr, src)
	a.addToNewBucket(ka, bucket)
	return nil
}

// Make space in the new buckets by expiring the really bad entries.
// If no bad entries are available we remove the oldest.
func (a *addrBook) expireNew(bucketIdx int) {
	for addrStr, ka := range a.bucketsNew[bucketIdx] {
		// If an entry is bad, throw it away
		if ka.isBad() {
			a.Logger.Info(fmt.Sprintf("expiring bad address %v", addrStr))
			a.removeFromBucket(ka, bucketTypeNew, bucketIdx)
			return
		}
	}

	// If we haven't thrown out a bad entry, throw out the oldest entry
	oldest := a.pickOldest(bucketTypeNew, bucketIdx)
	a.removeFromBucket(oldest, bucketTypeNew, bucketIdx)
}

// Promotes an address from new to old. If the destination bucket is full,
// demote the oldest one to a "new" bucket.
// TODO: Demote more probabilistically?
func (a *addrBook) moveToOld(ka *knownAddress) {
	// Sanity check
	if ka.isOld() {
		a.Logger.Error(fmt.Sprintf("Cannot promote address that is already old %v", ka))
		return
	}
	if len(ka.Buckets) == 0 {
		a.Logger.Error(fmt.Sprintf("Cannot promote address that isn't in any new buckets %v", ka))
		return
	}

	// Remove from all (new) buckets.
	a.removeFromAllBuckets(ka)
	// It's officially old now.
	ka.BucketType = bucketTypeOld

	// Try to add it to its oldBucket destination.
	oldBucketIdx := a.calcOldBucket(ka.Addr)
	added := a.addToOldBucket(ka, oldBucketIdx)
	if !added {
		// No room; move the oldest to a new bucket
		oldest := a.pickOldest(bucketTypeOld, oldBucketIdx)
		a.removeFromBucket(oldest, bucketTypeOld, oldBucketIdx)
		newBucketIdx := a.calcNewBucket(oldest.Addr, oldest.Src)
		a.addToNewBucket(oldest, newBucketIdx)

		// Finally, add our ka to old bucket again.
		added = a.addToOldBucket(ka, oldBucketIdx)
		if !added {
			a.Logger.Error(fmt.Sprintf("Could not re-add ka %v to oldBucketIdx %v", ka, oldBucketIdx))
		}
	}
}
//---------------------------------------------------------------------
// calculate bucket placements

// doublesha256( key + sourcegroup +
//	int64(doublesha256(key + group + sourcegroup))%bucket_per_group ) % num_new_buckets
func (a *addrBook) calcNewBucket(addr, src *p2p.NetAddress) int {
	data1 := []byte{}
	data1 = append(data1, []byte(a.key)...)
	data1 = append(data1, []byte(a.groupKey(addr))...)
	data1 = append(data1, []byte(a.groupKey(src))...)
	hash1 := doubleSha256(data1)
	hash64 := binary.BigEndian.Uint64(hash1)
	hash64 %= newBucketsPerGroup
	var hashbuf [8]byte
	binary.BigEndian.PutUint64(hashbuf[:], hash64)
	data2 := []byte{}
	data2 = append(data2, []byte(a.key)...)
	data2 = append(data2, a.groupKey(src)...)
	data2 = append(data2, hashbuf[:]...)

	hash2 := doubleSha256(data2)
	return int(binary.BigEndian.Uint64(hash2) % newBucketCount)
}

// doublesha256( key + group +
//	int64(doublesha256(key + addr))%buckets_per_group ) % num_old_buckets
func (a *addrBook) calcOldBucket(addr *p2p.NetAddress) int {
	data1 := []byte{}
	data1 = append(data1, []byte(a.key)...)
	data1 = append(data1, []byte(addr.String())...)
	hash1 := doubleSha256(data1)
	hash64 := binary.BigEndian.Uint64(hash1)
	hash64 %= oldBucketsPerGroup
	var hashbuf [8]byte
	binary.BigEndian.PutUint64(hashbuf[:], hash64)
	data2 := []byte{}
	data2 = append(data2, []byte(a.key)...)
	data2 = append(data2, a.groupKey(addr)...)
	data2 = append(data2, hashbuf[:]...)

	hash2 := doubleSha256(data2)
	return int(binary.BigEndian.Uint64(hash2) % oldBucketCount)
}
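// Note added for clarity (not in the original file): the two-step hashing
// above first maps (key, addr group, src group) onto one of
// newBucketsPerGroup values, then hashes that small value together with the
// key and the source group to choose the final bucket. For a fixed source
// group there are therefore at most newBucketsPerGroup possible new buckets,
// which limits how much of the address book addresses reported by a single
// source group (e.g. one /16 network) can occupy.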
// Return a string representing the network group of this address.
// This is the /16 for IPv4, the /32 (/36 for he.net) for IPv6, the string
// "local" for a local address and the string "unroutable" for an unroutable
// address.
func (a *addrBook) groupKey(na *p2p.NetAddress) string {
	if a.routabilityStrict && na.Local() {
		return "local"
	}
	if a.routabilityStrict && !na.Routable() {
		return "unroutable"
	}

	if ipv4 := na.IP.To4(); ipv4 != nil {
		return (&net.IPNet{IP: na.IP, Mask: net.CIDRMask(16, 32)}).String()
	}
	if na.RFC6145() || na.RFC6052() {
		// last four bytes are the ip address
		ip := net.IP(na.IP[12:16])
		return (&net.IPNet{IP: ip, Mask: net.CIDRMask(16, 32)}).String()
	}

	if na.RFC3964() {
		ip := net.IP(na.IP[2:7])
		return (&net.IPNet{IP: ip, Mask: net.CIDRMask(16, 32)}).String()
	}
	if na.RFC4380() {
		// teredo tunnels have the last 4 bytes as the v4 address XOR
		// 0xff.
		ip := net.IP(make([]byte, 4))
		for i, byte := range na.IP[12:16] {
			ip[i] = byte ^ 0xff
		}

		return (&net.IPNet{IP: ip, Mask: net.CIDRMask(16, 32)}).String()
	}

	// OK, so now we know ourselves to be an IPv6 address.
	// bitcoind uses /32 for everything, except for Hurricane Electric's
	// (he.net) IP range, which it uses /36 for.
	bits := 32
	heNet := &net.IPNet{IP: net.ParseIP("2001:470::"),
		Mask: net.CIDRMask(32, 128)}
	if heNet.Contains(na.IP) {
		bits = 36
	}

	return (&net.IPNet{IP: na.IP, Mask: net.CIDRMask(bits, 128)}).String()
}

// doubleSha256 calculates sha256(sha256(b)) and returns the resulting bytes.
func doubleSha256(b []byte) []byte {
	hasher := sha256.New()
	hasher.Write(b) // nolint: errcheck, gas
	sum := hasher.Sum(nil)
	hasher.Reset()
	hasher.Write(sum) // nolint: errcheck, gas
	return hasher.Sum(nil)
}
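// For reference (illustrative sketch, not part of the original file), the
// same double-SHA256 value can be computed with the standard library's
// convenience helper:
//
//	first := sha256.Sum256(b)
//	second := sha256.Sum256(first[:])
//	// second[:] equals doubleSha256(b)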