|
@ -1,43 +1,57 @@ |
|
|
package blocks |
|
|
package blocks |
|
|
|
|
|
|
|
|
import ( |
|
|
import ( |
|
|
|
|
|
db_ "github.com/tendermint/tendermint/db" |
|
|
"github.com/tendermint/tendermint/p2p" |
|
|
"github.com/tendermint/tendermint/p2p" |
|
|
) |
|
|
) |
|
|
|
|
|
|
|
|
const ( |
|
|
const ( |
|
|
BlocksCh = "block" |
|
|
|
|
|
|
|
|
blocksCh = "block" |
|
|
|
|
|
|
|
|
msgTypeUnknown = Byte(0x00) |
|
|
msgTypeUnknown = Byte(0x00) |
|
|
msgTypeState = Byte(0x01) |
|
|
msgTypeState = Byte(0x01) |
|
|
msgTypeRequest = Byte(0x02) |
|
|
msgTypeRequest = Byte(0x02) |
|
|
msgTypeData = Byte(0x03) |
|
|
msgTypeData = Byte(0x03) |
|
|
|
|
|
|
|
|
dataTypeAll = byte(0x00) |
|
|
|
|
|
dataTypeValidation = byte(0x01) |
|
|
|
|
|
dataTypeTxs = byte(0x02) |
|
|
|
|
|
|
|
|
DataTypeFullBlock = byte(0x00) |
|
|
|
|
|
DataTypeValidation = byte(0x01) |
|
|
|
|
|
DataTypeTxs = byte(0x02) |
|
|
// dataTypeCheckpoint = byte(0x04)
|
|
|
// dataTypeCheckpoint = byte(0x04)
|
|
|
|
|
|
|
|
|
|
|
|
dbKeyState = "state" |
|
|
) |
|
|
) |
|
|
|
|
|
|
|
|
/* |
|
|
/* |
|
|
*/ |
|
|
*/ |
|
|
type BlockManager struct { |
|
|
type BlockManager struct { |
|
|
quit chan struct{} |
|
|
|
|
|
started uint32 |
|
|
|
|
|
stopped uint32 |
|
|
|
|
|
|
|
|
db *db_.LevelDB |
|
|
|
|
|
sw *p2p.Switch |
|
|
|
|
|
swEvents chan interface{} |
|
|
|
|
|
state blockManagerState |
|
|
|
|
|
peerStates map[string]*blockManagerState |
|
|
|
|
|
quit chan struct{} |
|
|
|
|
|
started uint32 |
|
|
|
|
|
stopped uint32 |
|
|
} |
|
|
} |
|
|
|
|
|
|
|
|
func NewBlockManager() *BlockManager { |
|
|
|
|
|
|
|
|
func NewBlockManager(sw *p2p.Switch, db *db_.LevelDB) *BlockManager { |
|
|
|
|
|
swEvents := make(chan interface{}) |
|
|
|
|
|
sw.AddEventListener("BlockManager.swEvents", swEvents) |
|
|
bm := &BlockManager{ |
|
|
bm := &BlockManager{ |
|
|
sw: sw, |
|
|
|
|
|
quit: make(chan struct{}), |
|
|
|
|
|
|
|
|
db: db, |
|
|
|
|
|
sw: sw, |
|
|
|
|
|
swEvents: swEvents, |
|
|
|
|
|
peerStates: make(map[string]*blockManagerState), |
|
|
|
|
|
quit: make(chan struct{}), |
|
|
} |
|
|
} |
|
|
|
|
|
bm.loadState() |
|
|
return bm |
|
|
return bm |
|
|
} |
|
|
} |
|
|
|
|
|
|
|
|
func (bm *BlockManager) Start() { |
|
|
func (bm *BlockManager) Start() { |
|
|
if atomic.CompareAndSwapUint32(&bm.started, 0, 1) { |
|
|
if atomic.CompareAndSwapUint32(&bm.started, 0, 1) { |
|
|
log.Info("Starting BlockManager") |
|
|
log.Info("Starting BlockManager") |
|
|
go bm.XXX() |
|
|
|
|
|
|
|
|
go bm.switchEventsHandler() |
|
|
} |
|
|
} |
|
|
} |
|
|
} |
|
|
|
|
|
|
|
@ -45,10 +59,90 @@ func (bm *BlockManager) Stop() { |
|
|
if atomic.CompareAndSwapUint32(&bm.stopped, 0, 1) { |
|
|
if atomic.CompareAndSwapUint32(&bm.stopped, 0, 1) { |
|
|
log.Info("Stopping BlockManager") |
|
|
log.Info("Stopping BlockManager") |
|
|
close(bm.quit) |
|
|
close(bm.quit) |
|
|
|
|
|
close(bm.swEvents) |
|
|
|
|
|
} |
|
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
|
|
func (bm *BlockManager) StoreData(dataType byte, dataObj interface{}) { |
|
|
|
|
|
// Validate data if possible.
|
|
|
|
|
|
// If we have new data that extends our contiguous range, then announce it.
|
|
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
|
|
func (bm *BlockManager) LoadData(dataType byte) interface{} { |
|
|
|
|
|
// NOTE: who's calling?
|
|
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
|
|
func (bm *BlockManager) loadState() { |
|
|
|
|
|
// Load the state
|
|
|
|
|
|
stateBytes := bm.db.Get(dbKeyState) |
|
|
|
|
|
if stateBytes == nil { |
|
|
|
|
|
log.Info("New BlockManager with no state") |
|
|
|
|
|
} else { |
|
|
|
|
|
err := json.Unmarshal(stateBytes, &bm.state) |
|
|
|
|
|
if err != nil { |
|
|
|
|
|
panic("Could not unmarshal state bytes: %X", stateBytes) |
|
|
|
|
|
} |
|
|
} |
|
|
} |
|
|
} |
|
|
} |
|
|
|
|
|
|
|
|
func (bm *BlockManager) XXX() { |
|
|
|
|
|
|
|
|
func (bm *BlockManager) saveState() { |
|
|
|
|
|
stateBytes, err := json.Marshal(&bm.state) |
|
|
|
|
|
if err != nil { |
|
|
|
|
|
panic("Could not marshal state bytes") |
|
|
|
|
|
} |
|
|
|
|
|
bm.db.Set(dbKeyState, stateBytes) |
|
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
|
|
func (bm *BlockManager) switchEventsHandler() { |
|
|
|
|
|
for { |
|
|
|
|
|
swEvent, ok := <-bm.swEvents |
|
|
|
|
|
if !ok { |
|
|
|
|
|
break |
|
|
|
|
|
} |
|
|
|
|
|
switch swEvent.(type) { |
|
|
|
|
|
case p2p.SwitchEventNewPeer: |
|
|
|
|
|
event := swEvent.(p2p.SwitchEventNewPeer) |
|
|
|
|
|
// Create entry in .peerStates
|
|
|
|
|
|
// Share our state with event.Peer
|
|
|
|
|
|
event.Peer |
|
|
|
|
|
case p2p.SwitchEventDonePeer: |
|
|
|
|
|
// Remove entry from .peerStates
|
|
|
|
|
|
} |
|
|
|
|
|
} |
|
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
|
|
//-----------------------------------------------------------------------------
|
|
|
|
|
|
|
|
|
|
|
|
/* This is just to persist the block manager state in the db. */ |
|
|
|
|
|
type blockManagerState struct { |
|
|
|
|
|
LastHeaderHeight uint64 // Last contiguous header height
|
|
|
|
|
|
OtherHeaderHeights []uint64 |
|
|
|
|
|
LastValidationHeight uint64 // Last contiguous validation height
|
|
|
|
|
|
OtherValidationHeights []uint64 |
|
|
|
|
|
LastTxsHeight uint64 // Last contiguous txs height
|
|
|
|
|
|
OtherTxsHeights []uint64 |
|
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
|
|
//-----------------------------------------------------------------------------
|
|
|
|
|
|
|
|
|
|
|
|
/* |
|
|
|
|
|
Each part of a block are stored separately in the db. |
|
|
|
|
|
*/ |
|
|
|
|
|
|
|
|
|
|
|
func headerKey(height int) { |
|
|
|
|
|
return fmt.Sprintf("B%v", height) |
|
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
|
|
func validationKey(height int) { |
|
|
|
|
|
return fmt.Sprintf("V%v", height) |
|
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
|
|
func txsKey(height int) { |
|
|
|
|
|
return fmt.Sprintf("T%v", height) |
|
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
|
|
func checkpointKey(height int) { |
|
|
|
|
|
return fmt.Sprintf("C%v", height) |
|
|
} |
|
|
} |
|
|
|
|
|
|
|
|
//-----------------------------------------------------------------------------
|
|
|
//-----------------------------------------------------------------------------
|
|
|