Browse Source

Fix blockpool bugs and clean up log messages.

pull/43/merge
Jae Kwon 10 years ago
parent
commit
7171823fc6
7 changed files with 51 additions and 48 deletions
  1. +8
    -13
      blockchain/pool.go
  2. +33
    -28
      blockchain/reactor.go
  3. +1
    -1
      consensus/reactor.go
  4. +1
    -1
      p2p/addrbook.go
  5. +1
    -0
      p2p/connection.go
  6. +6
    -4
      p2p/pex_reactor.go
  7. +1
    -1
      p2p/switch.go

+ 8
- 13
blockchain/pool.go View File

@ -10,14 +10,12 @@ import (
)
const (
maxOutstandingRequestsPerPeer = 10
inputsChannelCapacity = 100
maxTries = 3
requestIntervalMS = 500
requestBatchSize = 50
maxPendingRequests = 50
maxTotalRequests = 100
maxRequestsPerPeer = 20
maxTries = 3
inputsChannelCapacity = 200
requestIntervalMS = 500
maxPendingRequests = 200
maxTotalRequests = 300
maxRequestsPerPeer = 300
)
var (
@ -85,9 +83,7 @@ RUN_LOOP:
if atomic.LoadInt32(&pool.running) == 0 {
break RUN_LOOP
}
height, numPending, numTotal := pool.GetStatus()
log.Debug("BlockPool.run", "height", height, "numPending", numPending,
"numTotal", numTotal)
_, numPending, numTotal := pool.GetStatus()
if numPending >= maxPendingRequests {
// sleep for a bit.
time.Sleep(requestIntervalMS * time.Millisecond)
@ -344,14 +340,13 @@ func requestRoutine(pool *BlockPool, height uint) {
}
peer = pool.pickIncrAvailablePeer(height)
if peer == nil {
log.Debug("No peers available", "height", height)
//log.Debug("No peers available", "height", height)
time.Sleep(requestIntervalMS * time.Millisecond)
continue PICK_LOOP
}
break PICK_LOOP
}
log.Debug("Selected peer for request", "height", height, "peerId", peer.id)
pool.setPeerForRequest(height, peer.id)
for try := 0; try < maxTries; try++ {


+ 33
- 28
blockchain/reactor.go View File

@ -90,16 +90,15 @@ func (bcR *BlockchainReactor) GetChannels() []*p2p.ChannelDescriptor {
&p2p.ChannelDescriptor{
Id: BlockchainChannel,
Priority: 5,
SendQueueCapacity: 20, // Queue 20 blocks to send to a peer.
SendQueueCapacity: 100,
},
}
}
// Implements Reactor
func (bcR *BlockchainReactor) AddPeer(peer *p2p.Peer) {
log.Debug("BlockchainReactor AddPeer", "peer", peer)
// Send peer our state.
peer.Send(BlockchainChannel, PeerStatusMessage{bcR.store.Height()})
peer.Send(BlockchainChannel, bcPeerStatusMessage{bcR.store.Height()})
}
// Implements Reactor
@ -115,14 +114,15 @@ func (bcR *BlockchainReactor) Receive(chId byte, src *p2p.Peer, msgBytes []byte)
log.Warn("Error decoding message", "error", err)
return
}
log.Debug("BlockchainReactor received message", "msg", msg_)
log.Info("Received message", "msg", msg_)
switch msg := msg_.(type) {
case BlockRequestMessage:
case bcBlockRequestMessage:
// Got a request for a block. Respond with block if we have it.
block := bcR.store.LoadBlock(msg.Height)
if block != nil {
msg := BlockResponseMessage{Block: block}
msg := bcBlockResponseMessage{Block: block}
queued := src.TrySend(BlockchainChannel, msg)
if !queued {
// queue is full, just ignore.
@ -130,10 +130,10 @@ func (bcR *BlockchainReactor) Receive(chId byte, src *p2p.Peer, msgBytes []byte)
} else {
// TODO peer is asking for things we don't have.
}
case BlockResponseMessage:
case bcBlockResponseMessage:
// Got a block.
bcR.pool.AddBlock(msg.Block, src.Key)
case PeerStatusMessage:
case bcPeerStatusMessage:
// Got a peer status.
bcR.pool.SetPeerHeight(src.Key, msg.Height)
default:
@ -155,7 +155,7 @@ FOR_LOOP:
// We can't fulfill the request.
continue FOR_LOOP
}
msg := BlockRequestMessage{request.Height}
msg := bcBlockRequestMessage{request.Height}
queued := peer.TrySend(BlockchainChannel, msg)
if !queued {
// We couldn't queue the request.
@ -174,24 +174,29 @@ FOR_LOOP:
for i := 0; i < 10; i++ {
// See if there are any blocks to sync.
first, second := bcR.pool.PeekTwoBlocks()
//log.Debug("TrySync peeked", "first", first, "second", second)
if first == nil || second == nil {
// We need both to sync the first block.
break SYNC_LOOP
}
firstParts := first.MakePartSet().Header()
firstParts := first.MakePartSet()
firstPartsHeader := firstParts.Header()
// Finally, verify the first block using the second's validation.
err := bcR.state.BondedValidators.VerifyValidation(
first.Hash(), firstParts, first.Height, second.Validation)
first.Hash(), firstPartsHeader, first.Height, second.Validation)
if err != nil {
log.Debug("error in validation", "error", err)
bcR.pool.RedoRequest(first.Height)
break SYNC_LOOP
} else {
bcR.pool.PopRequest()
err := bcR.state.AppendBlock(first, firstParts)
err := bcR.state.AppendBlock(first, firstPartsHeader)
if err != nil {
// TODO This is bad, are we zombie?
panic(Fmt("Failed to process committed block: %v", err))
}
bcR.store.SaveBlock(first, firstParts, second.Validation)
bcR.state.Save()
lastValidatedBlock = first
}
}
@ -224,7 +229,7 @@ FOR_LOOP:
}
func (bcR *BlockchainReactor) BroadcastStatus() error {
bcR.sw.Broadcast(BlockchainChannel, PeerStatusMessage{bcR.store.Height()})
bcR.sw.Broadcast(BlockchainChannel, bcPeerStatusMessage{bcR.store.Height()})
return nil
}
@ -245,11 +250,11 @@ func DecodeMessage(bz []byte) (msgType byte, msg interface{}, err error) {
r := bytes.NewReader(bz)
switch msgType {
case msgTypeBlockRequest:
msg = binary.ReadBinary(BlockRequestMessage{}, r, n, &err)
msg = binary.ReadBinary(bcBlockRequestMessage{}, r, n, &err)
case msgTypeBlockResponse:
msg = binary.ReadBinary(BlockResponseMessage{}, r, n, &err)
msg = binary.ReadBinary(bcBlockResponseMessage{}, r, n, &err)
case msgTypePeerStatus:
msg = binary.ReadBinary(PeerStatusMessage{}, r, n, &err)
msg = binary.ReadBinary(bcPeerStatusMessage{}, r, n, &err)
default:
msg = nil
}
@ -258,36 +263,36 @@ func DecodeMessage(bz []byte) (msgType byte, msg interface{}, err error) {
//-------------------------------------
type BlockRequestMessage struct {
type bcBlockRequestMessage struct {
Height uint
}
func (m BlockRequestMessage) TypeByte() byte { return msgTypeBlockRequest }
func (m bcBlockRequestMessage) TypeByte() byte { return msgTypeBlockRequest }
func (m BlockRequestMessage) String() string {
return fmt.Sprintf("[BlockRequestMessage %v]", m.Height)
func (m bcBlockRequestMessage) String() string {
return fmt.Sprintf("[bcBlockRequestMessage %v]", m.Height)
}
//-------------------------------------
type BlockResponseMessage struct {
type bcBlockResponseMessage struct {
Block *types.Block
}
func (m BlockResponseMessage) TypeByte() byte { return msgTypeBlockResponse }
func (m bcBlockResponseMessage) TypeByte() byte { return msgTypeBlockResponse }
func (m BlockResponseMessage) String() string {
return fmt.Sprintf("[BlockResponseMessage %v]", m.Block.Height)
func (m bcBlockResponseMessage) String() string {
return fmt.Sprintf("[bcBlockResponseMessage %v]", m.Block.Height)
}
//-------------------------------------
type PeerStatusMessage struct {
type bcPeerStatusMessage struct {
Height uint
}
func (m PeerStatusMessage) TypeByte() byte { return msgTypePeerStatus }
func (m bcPeerStatusMessage) TypeByte() byte { return msgTypePeerStatus }
func (m PeerStatusMessage) String() string {
return fmt.Sprintf("[PeerStatusMessage %v]", m.Height)
func (m bcPeerStatusMessage) String() string {
return fmt.Sprintf("[bcPeerStatusMessage %v]", m.Height)
}

+ 1
- 1
consensus/reactor.go View File

@ -69,7 +69,7 @@ func (conR *ConsensusReactor) Stop() {
}
func (conR *ConsensusReactor) IsRunning() bool {
return atomic.LoadUint32(&conR.running) == 0
return atomic.LoadUint32(&conR.running) == 1
}
// Implements Reactor


+ 1
- 1
p2p/addrbook.go View File

@ -381,7 +381,7 @@ out:
for {
select {
case <-dumpAddressTicker.C:
log.Debug("Saving book to file", "size", a.Size())
log.Debug("Saving AddrBook to file", "size", a.Size())
a.saveToFile(a.filePath)
case <-a.quit:
break out


+ 1
- 0
p2p/connection.go View File

@ -417,6 +417,7 @@ FOR_LOOP:
}
msgBytes := channel.recvMsgPacket(pkt)
if msgBytes != nil {
log.Debug("Received bytes", "chId", pkt.ChannelId, "msgBytes", msgBytes)
c.onReceive(pkt.ChannelId, msgBytes)
}
default:


+ 6
- 4
p2p/pex_reactor.go View File

@ -176,10 +176,12 @@ func (pexR *PEXReactor) ensurePeers() {
alreadyDialing := pexR.sw.IsDialing(try)
alreadyConnected := pexR.sw.Peers().Has(try.String())
if alreadySelected || alreadyDialing || alreadyConnected {
log.Debug("Cannot dial address", "addr", try,
"alreadySelected", alreadySelected,
"alreadyDialing", alreadyDialing,
"alreadyConnected", alreadyConnected)
/*
log.Debug("Cannot dial address", "addr", try,
"alreadySelected", alreadySelected,
"alreadyDialing", alreadyDialing,
"alreadyConnected", alreadyConnected)
*/
continue
} else {
log.Debug("Will dial address", "addr", try)


+ 1
- 1
p2p/switch.go View File

@ -132,7 +132,7 @@ func (sw *Switch) AddPeerWithConnection(conn net.Conn, outbound bool) (*Peer, er
}
// Start the peer
go peer.start()
peer.start()
// Notify listeners.
sw.doAddPeer(peer)


Loading…
Cancel
Save