
Merge pull request #412 from tendermint/feature/237-tx-indexing

tx indexing (Refs #237)
Ethan Buchman, committed via GitHub, 8 years ago
Parent commit: 07143a4353
38 changed files with 1166 additions and 323 deletions
  1. blockchain/pool_test.go (+2, -4)
  2. blockchain/reactor.go (+0, -1)
  3. config/tendermint/config.go (+2, -0)
  4. config/tendermint_test/config.go (+2, -0)
  5. consensus/replay.go (+84, -51)
  6. consensus/replay_test.go (+1, -1)
  7. consensus/state.go (+25, -8)
  8. consensus/test_data/build.sh (+4, -4)
  9. consensus/test_data/empty_block.cswal (+1, -1)
  10. consensus/test_data/many_blocks.cswal (+6, -6)
  11. consensus/test_data/small_block1.cswal (+1, -1)
  12. consensus/test_data/small_block2.cswal (+1, -1)
  13. consensus/wal.go (+3, -9)
  14. glide.lock (+4, -4)
  15. node/node.go (+36, -14)
  16. rpc/client/httpclient.go (+13, -0)
  17. rpc/client/interface.go (+1, -0)
  18. rpc/client/localclient.go (+4, -0)
  19. rpc/client/mock/abci.go (+2, -2)
  20. rpc/client/rpc_test.go (+22, -13)
  21. rpc/core/mempool.go (+10, -5)
  22. rpc/core/pipe.go (+11, -5)
  23. rpc/core/routes.go (+31, -115)
  24. rpc/core/tx.go (+43, -0)
  25. rpc/core/types/responses.go (+29, -0)
  26. rpc/core/types/responses_test.go (+38, -0)
  27. rpc/test/client_test.go (+87, -0)
  28. state/execution.go (+83, -67)
  29. state/execution_test.go (+90, -0)
  30. state/state.go (+92, -5)
  31. state/state_test.go (+31, -0)
  32. state/txindex/indexer.go (+57, -0)
  33. state/txindex/kv/kv.go (+56, -0)
  34. state/txindex/kv/kv_test.go (+63, -0)
  35. state/txindex/null/null.go (+21, -0)
  36. types/events.go (+7, -6)
  37. types/tx.go (+81, -0)
  38. types/tx_test.go (+122, -0)

blockchain/pool_test.go (+2, -4)

@ -35,6 +35,7 @@ func TestBasic(t *testing.T) {
requestsCh := make(chan BlockRequest, 100)
pool := NewBlockPool(start, requestsCh, timeoutsCh)
pool.Start()
defer pool.Stop()
// Introduce each peer.
go func() {
@ -76,8 +77,6 @@ func TestBasic(t *testing.T) {
}()
}
}
pool.Stop()
}
func TestTimeout(t *testing.T) {
@ -87,6 +86,7 @@ func TestTimeout(t *testing.T) {
requestsCh := make(chan BlockRequest, 100)
pool := NewBlockPool(start, requestsCh, timeoutsCh)
pool.Start()
defer pool.Stop()
for _, peer := range peers {
log.Info("Peer", "id", peer.id)
@ -131,6 +131,4 @@ func TestTimeout(t *testing.T) {
log.Info("TEST: Pulled new BlockRequest", "request", request)
}
}
pool.Stop()
}

blockchain/reactor.go (+0, -1)

@ -251,7 +251,6 @@ FOR_LOOP:
// TODO This is bad, are we zombie?
cmn.PanicQ(cmn.Fmt("Failed to process committed block (%d:%X): %v", first.Height, first.Hash(), err))
}
bcR.state.Save()
}
}
continue FOR_LOOP


config/tendermint/config.go (+2, -0)

@ -101,6 +101,8 @@ func GetConfig(rootDir string) cfg.Config {
mapConfig.SetDefault("mempool_broadcast", true)
mapConfig.SetDefault("mempool_wal_dir", rootDir+"/data/mempool.wal")
mapConfig.SetDefault("tx_index", "kv")
return mapConfig
}
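The `tx_index` default added here (and mirrored in the test config below) is what node/node.go switches on later in this diff: "kv" turns on the LevelDB-backed index, while any other value falls back to the null indexer and effectively disables indexing. A minimal sketch of that interpretation, assuming only the go-config accessors already used in this PR:

```go
// Sketch only: how the tx_index value is interpreted (mirrors the switch in node/node.go).
switch config.GetString("tx_index") {
case "kv":
	// txs are indexed by hash in a dedicated "tx_index" database
default:
	// null indexer: nothing is stored, and the /tx RPC endpoint returns an error
}
```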


config/tendermint_test/config.go (+2, -0)

@ -107,6 +107,8 @@ func ResetConfig(localPath string) cfg.Config {
mapConfig.SetDefault("mempool_broadcast", true)
mapConfig.SetDefault("mempool_wal_dir", "")
mapConfig.SetDefault("tx_index", "kv")
logger.SetLogLevel(mapConfig.GetString("log_level"))
return mapConfig


consensus/replay.go (+84, -51)

@ -100,27 +100,51 @@ func (cs *ConsensusState) catchupReplay(csHeight int) error {
cs.replayMode = true
defer func() { cs.replayMode = false }()
// Ensure that height+1 doesn't exist
gr, found, err := cs.wal.group.Search("#HEIGHT: ", makeHeightSearchFunc(csHeight+1))
if found {
return errors.New(Fmt("WAL should not contain height %d.", csHeight+1))
}
// Ensure that ENDHEIGHT for this height doesn't exist
// NOTE: This is just a sanity check. As far as we know things work fine without it,
// and Handshake could reuse ConsensusState if it weren't for this check (since we can crash after writing ENDHEIGHT).
gr, found, err := cs.wal.group.Search("#ENDHEIGHT: ", makeHeightSearchFunc(csHeight))
if gr != nil {
gr.Close()
}
if found {
return errors.New(Fmt("WAL should not contain #ENDHEIGHT %d.", csHeight))
}
// Search for height marker
gr, found, err = cs.wal.group.Search("#HEIGHT: ", makeHeightSearchFunc(csHeight))
// Search for last height marker
gr, found, err = cs.wal.group.Search("#ENDHEIGHT: ", makeHeightSearchFunc(csHeight-1))
if err == io.EOF {
log.Warn("Replay: wal.group.Search returned EOF", "height", csHeight)
return nil
log.Warn("Replay: wal.group.Search returned EOF", "#ENDHEIGHT", csHeight-1)
// if we upgraded from 0.9 to 0.9.1, we may have #HEIGHT instead
// TODO (0.10.0): remove this
gr, found, err = cs.wal.group.Search("#HEIGHT: ", makeHeightSearchFunc(csHeight))
if err == io.EOF {
log.Warn("Replay: wal.group.Search returned EOF", "#HEIGHT", csHeight)
return nil
} else if err != nil {
return err
}
} else if err != nil {
return err
} else {
defer gr.Close()
}
if !found {
return errors.New(Fmt("WAL does not contain height %d.", csHeight))
// if we upgraded from 0.9 to 0.9.1, we may have #HEIGHT instead
// TODO (0.10.0): remove this
gr, found, err = cs.wal.group.Search("#HEIGHT: ", makeHeightSearchFunc(csHeight))
if err == io.EOF {
log.Warn("Replay: wal.group.Search returned EOF", "#HEIGHT", csHeight)
return nil
} else if err != nil {
return err
} else {
defer gr.Close()
}
// TODO (0.10.0): uncomment
// return errors.New(Fmt("Cannot replay height %d. WAL does not contain #ENDHEIGHT for %d.", csHeight, csHeight-1))
}
defer gr.Close()
log.Notice("Catchup by replaying consensus messages", "height", csHeight)
@ -147,7 +171,7 @@ func (cs *ConsensusState) catchupReplay(csHeight int) error {
//--------------------------------------------------------------------------------
// Parses marker lines of the form:
// #HEIGHT: 12345
// #ENDHEIGHT: 12345
func makeHeightSearchFunc(height int) auto.SearchFunc {
return func(line string) (int, error) {
line = strings.TrimRight(line, "\n")
@ -273,15 +297,18 @@ func (h *Handshaker) ReplayBlocks(appHash []byte, appBlockHeight int, proxyApp p
} else if appBlockHeight == stateBlockHeight {
// We haven't run Commit (both the state and app are one block behind),
// so run through consensus with the real app
// so replayBlock with the real app.
// NOTE: We could instead use the cs.WAL on cs.Start,
// but we'd have to allow the WAL to replay a block that wrote its ENDHEIGHT
log.Info("Replay last block using real app")
return h.replayLastBlock(proxyApp.Consensus())
return h.replayBlock(storeBlockHeight, proxyApp.Consensus())
} else if appBlockHeight == storeBlockHeight {
// We ran Commit, but didn't save the state, so run through consensus with mock app
mockApp := newMockProxyApp(appHash)
// We ran Commit, but didn't save the state, so replayBlock with mock app
abciResponses := h.state.LoadABCIResponses()
mockApp := newMockProxyApp(appHash, abciResponses)
log.Info("Replay last block using mock app")
return h.replayLastBlock(mockApp)
return h.replayBlock(storeBlockHeight, mockApp)
}
}
@ -290,24 +317,23 @@ func (h *Handshaker) ReplayBlocks(appHash []byte, appBlockHeight int, proxyApp p
return nil, nil
}
func (h *Handshaker) replayBlocks(proxyApp proxy.AppConns, appBlockHeight, storeBlockHeight int, useReplayFunc bool) ([]byte, error) {
func (h *Handshaker) replayBlocks(proxyApp proxy.AppConns, appBlockHeight, storeBlockHeight int, mutateState bool) ([]byte, error) {
// App is further behind than it should be, so we need to replay blocks.
// We replay all blocks from appBlockHeight+1.
// If useReplayFunc == true, stop short of the last block
// so it can be replayed using the WAL in ReplayBlocks.
// Note that we don't have an old version of the state,
// so we by-pass state validation using sm.ApplyBlock.
// so we by-pass state validation/mutation using sm.ExecCommitBlock.
// If mutateState == true, the final block is replayed with h.replayBlock()
var appHash []byte
var err error
finalBlock := storeBlockHeight
if useReplayFunc {
if mutateState {
finalBlock -= 1
}
for i := appBlockHeight + 1; i <= finalBlock; i++ {
log.Info("Applying block", "height", i)
block := h.store.LoadBlock(i)
appHash, err = sm.ApplyBlock(proxyApp.Consensus(), block)
appHash, err = sm.ExecCommitBlock(proxyApp.Consensus(), block)
if err != nil {
return nil, err
}
@ -315,44 +341,29 @@ func (h *Handshaker) replayBlocks(proxyApp proxy.AppConns, appBlockHeight, store
h.nBlocks += 1
}
if useReplayFunc {
if mutateState {
// sync the final block
return h.ReplayBlocks(appHash, finalBlock, proxyApp)
return h.replayBlock(storeBlockHeight, proxyApp.Consensus())
}
return appHash, h.checkAppHash(appHash)
}
// Replay the last block through the consensus and return the AppHash from after Commit.
func (h *Handshaker) replayLastBlock(proxyApp proxy.AppConnConsensus) ([]byte, error) {
// ApplyBlock on the proxyApp with the last block.
func (h *Handshaker) replayBlock(height int, proxyApp proxy.AppConnConsensus) ([]byte, error) {
mempool := types.MockMempool{}
cs := NewConsensusState(h.config, h.state, proxyApp, h.store, mempool)
evsw := types.NewEventSwitch()
evsw.Start()
defer evsw.Stop()
cs.SetEventSwitch(evsw)
newBlockCh := subscribeToEvent(evsw, "consensus-replay", types.EventStringNewBlock(), 1)
var eventCache types.Fireable // nil
block := h.store.LoadBlock(height)
meta := h.store.LoadBlockMeta(height)
// run through the WAL, commit new block, stop
if _, err := cs.Start(); err != nil {
if err := h.state.ApplyBlock(eventCache, proxyApp, block, meta.BlockID.PartsHeader, mempool); err != nil {
return nil, err
}
defer cs.Stop()
timeout := h.config.GetInt("timeout_handshake")
timer := time.NewTimer(time.Duration(timeout) * time.Millisecond)
log.Notice("Attempting to replay last block", "height", h.store.Height(), "timeout", timeout)
select {
case <-newBlockCh:
case <-timer.C:
return nil, ErrReplayLastBlockTimeout
}
h.nBlocks += 1
return cs.state.AppHash, nil
return h.state.AppHash, nil
}
func (h *Handshaker) checkAppHash(appHash []byte) error {
@ -364,9 +375,14 @@ func (h *Handshaker) checkAppHash(appHash []byte) error {
}
//--------------------------------------------------------------------------------
func newMockProxyApp(appHash []byte) proxy.AppConnConsensus {
clientCreator := proxy.NewLocalClientCreator(&mockProxyApp{appHash: appHash})
// mockProxyApp uses ABCIResponses to give the right results
// Useful because we don't want to call Commit() twice for the same block on the real app.
func newMockProxyApp(appHash []byte, abciResponses *sm.ABCIResponses) proxy.AppConnConsensus {
clientCreator := proxy.NewLocalClientCreator(&mockProxyApp{
appHash: appHash,
abciResponses: abciResponses,
})
cli, _ := clientCreator.NewABCIClient()
return proxy.NewAppConnConsensus(cli)
}
@ -374,7 +390,24 @@ func newMockProxyApp(appHash []byte) proxy.AppConnConsensus {
type mockProxyApp struct {
abci.BaseApplication
appHash []byte
appHash []byte
txCount int
abciResponses *sm.ABCIResponses
}
func (mock *mockProxyApp) DeliverTx(tx []byte) abci.Result {
r := mock.abciResponses.DeliverTx[mock.txCount]
mock.txCount += 1
return abci.Result{
r.Code,
r.Data,
r.Log,
}
}
func (mock *mockProxyApp) EndBlock(height uint64) abci.ResponseEndBlock {
mock.txCount = 0
return mock.abciResponses.EndBlock
}
func (mock *mockProxyApp) Commit() abci.Result {


consensus/replay_test.go (+1, -1)

@ -443,7 +443,7 @@ func buildTMStateFromChain(config cfg.Config, state *sm.State, chain []*types.Bl
func makeBlockchainFromWAL(wal *WAL) ([]*types.Block, []*types.Commit, error) {
// Search for height marker
gr, found, err := wal.group.Search("#HEIGHT: ", makeHeightSearchFunc(1))
gr, found, err := wal.group.Search("#ENDHEIGHT: ", makeHeightSearchFunc(0))
if err != nil {
return nil, nil, err
}


consensus/state.go (+25, -8)

@ -283,7 +283,7 @@ func NewConsensusState(config cfg.Config, state *sm.State, proxyAppConn proxy.Ap
//----------------------------------------
// Public interface
// implements events.Eventable
// SetEventSwitch implements events.Eventable
func (cs *ConsensusState) SetEventSwitch(evsw types.EventSwitch) {
cs.evsw = evsw
}
@ -1216,13 +1216,26 @@ func (cs *ConsensusState) finalizeCommit(height int) {
fail.Fail() // XXX
// Finish writing to the WAL for this height.
// NOTE: If we fail before writing this, we'll never write it,
// and just recover by running ApplyBlock in the Handshake.
// If we moved it before persisting the block, we'd have to allow
// WAL replay for blocks with an #ENDHEIGHT
// As is, ConsensusState should not be started again
// until we successfully call ApplyBlock (ie. here or in Handshake after restart)
if cs.wal != nil {
cs.wal.writeEndHeight(height)
}
fail.Fail() // XXX
// Create a copy of the state for staging
// and an event cache for txs
stateCopy := cs.state.Copy()
eventCache := types.NewEventCache(cs.evsw)
// Execute and commit the block, and update the mempool.
// All calls to the proxyAppConn should come here.
// Execute and commit the block, update and save the state, and update the mempool.
// All calls to the proxyAppConn come here.
// NOTE: the block.AppHash won't reflect these txs until the next block
err := stateCopy.ApplyBlock(eventCache, cs.proxyAppConn, block, blockParts.Header(), cs.mempool)
if err != nil {
@ -1232,20 +1245,24 @@ func (cs *ConsensusState) finalizeCommit(height int) {
fail.Fail() // XXX
// Fire off event for new block.
// TODO: Handle app failure. See #177
// Fire event for new block.
// NOTE: If we fail before firing, these events will never fire
//
// TODO: Either
// * Fire before persisting state, in ApplyBlock
// * Fire on start up if we haven't written any new WAL msgs
// Both options mean we may fire more than once. Is that fine ?
types.FireEventNewBlock(cs.evsw, types.EventDataNewBlock{block})
types.FireEventNewBlockHeader(cs.evsw, types.EventDataNewBlockHeader{block.Header})
eventCache.Flush()
// Save the state.
stateCopy.Save()
fail.Fail() // XXX
// NewHeightStep!
cs.updateToState(stateCopy)
fail.Fail() // XXX
// cs.StartTime is already set.
// Schedule Round0 to start soon.
cs.scheduleRound0(&cs.RoundState)


consensus/test_data/build.sh (+4, -4)

@ -27,7 +27,7 @@ killall tendermint
# /q would print up to and including the match, then quit.
# /Q doesn't include the match.
# http://unix.stackexchange.com/questions/11305/grep-show-all-the-file-up-to-the-match
sed '/HEIGHT: 2/Q' ~/.tendermint/data/cs.wal/wal > consensus/test_data/empty_block.cswal
sed '/ENDHEIGHT: 1/Q' ~/.tendermint/data/cs.wal/wal > consensus/test_data/empty_block.cswal
reset
}
@ -41,7 +41,7 @@ sleep 7
killall tendermint
kill -9 $PID
sed '/HEIGHT: 7/Q' ~/.tendermint/data/cs.wal/wal > consensus/test_data/many_blocks.cswal
sed '/ENDHEIGHT: 6/Q' ~/.tendermint/data/cs.wal/wal > consensus/test_data/many_blocks.cswal
reset
}
@ -56,7 +56,7 @@ sleep 10
killall tendermint
kill -9 $PID
sed '/HEIGHT: 2/Q' ~/.tendermint/data/cs.wal/wal > consensus/test_data/small_block1.cswal
sed '/ENDHEIGHT: 1/Q' ~/.tendermint/data/cs.wal/wal > consensus/test_data/small_block1.cswal
reset
}
@ -73,7 +73,7 @@ sleep 5
killall tendermint
kill -9 $PID
sed '/HEIGHT: 2/Q' ~/.tendermint/data/cs.wal/wal > consensus/test_data/small_block2.cswal
sed '/ENDHEIGHT: 1/Q' ~/.tendermint/data/cs.wal/wal > consensus/test_data/small_block2.cswal
reset
}


consensus/test_data/empty_block.cswal (+1, -1)

@ -1,4 +1,4 @@
#HEIGHT: 1
#ENDHEIGHT: 0
{"time":"2016-12-18T05:05:33.502Z","msg":[3,{"duration":974084551,"height":1,"round":0,"step":1}]}
{"time":"2016-12-18T05:05:33.505Z","msg":[1,{"height":1,"round":0,"step":"RoundStepPropose"}]}
{"time":"2016-12-18T05:05:33.505Z","msg":[2,{"msg":[17,{"Proposal":{"height":1,"round":0,"block_parts_header":{"total":1,"hash":"71D2DA2336A9F84C22A28FF6C67F35F3478FC0AF"},"pol_round":-1,"pol_block_id":{"hash":"","parts":{"total":0,"hash":""}},"signature":[1,"62C0F2BCCB491399EEDAF8E85837ADDD4E25BAB7A84BFC4F0E88594531FBC6D4755DEC7E6427F04AD7EB8BB89502762AB4380C7BBA93A4C297E6180EC78E3504"]}}],"peer_key":""}]}


consensus/test_data/many_blocks.cswal (+6, -6)

@ -1,4 +1,4 @@
#HEIGHT: 1
#ENDHEIGHT: 0
{"time":"2017-02-17T23:54:19.013Z","msg":[3,{"duration":969121813,"height":1,"round":0,"step":1}]}
{"time":"2017-02-17T23:54:19.014Z","msg":[1,{"height":1,"round":0,"step":"RoundStepPropose"}]}
{"time":"2017-02-17T23:54:19.014Z","msg":[2,{"msg":[17,{"Proposal":{"height":1,"round":0,"block_parts_header":{"total":1,"hash":"2E32C8D500E936D27A47FCE3FF4BE7C1AFB3FAE1"},"pol_round":-1,"pol_block_id":{"hash":"","parts":{"total":0,"hash":""}},"signature":[1,"105A5A834E9AE2FA2191CAB5CB20D63594BA7859BD3EB92F055C8A35476D71F0D89F9FD5B0FF030D021533C71A81BF6E8F026BF4A37FC637CF38CA35291A9D00"]}}],"peer_key":""}]}
@ -8,7 +8,7 @@
{"time":"2017-02-17T23:54:19.016Z","msg":[1,{"height":1,"round":0,"step":"RoundStepPrecommit"}]}
{"time":"2017-02-17T23:54:19.016Z","msg":[2,{"msg":[20,{"Vote":{"validator_address":"D028C9981F7A87F3093672BF0D5B0E2A1B3ED456","validator_index":0,"height":1,"round":0,"type":2,"block_id":{"hash":"3F32EE37F9EA674A2173CAD651836A8EE628B5C7","parts":{"total":1,"hash":"2E32C8D500E936D27A47FCE3FF4BE7C1AFB3FAE1"}},"signature":[1,"2B1070A5AB9305612A3AE74A8036D82B5E49E0DBBFBC7D723DB985CC8A8E72A52FF8E34D85273FEB8B901945CA541FA5142C3C4D43A04E9205ACECF53FD19B01"]}}],"peer_key":""}]}
{"time":"2017-02-17T23:54:19.017Z","msg":[1,{"height":1,"round":0,"step":"RoundStepCommit"}]}
#HEIGHT: 2
#ENDHEIGHT: 1
{"time":"2017-02-17T23:54:19.019Z","msg":[1,{"height":2,"round":0,"step":"RoundStepNewHeight"}]}
{"time":"2017-02-17T23:54:20.017Z","msg":[3,{"duration":998073370,"height":2,"round":0,"step":1}]}
{"time":"2017-02-17T23:54:20.018Z","msg":[1,{"height":2,"round":0,"step":"RoundStepPropose"}]}
@ -19,7 +19,7 @@
{"time":"2017-02-17T23:54:20.021Z","msg":[1,{"height":2,"round":0,"step":"RoundStepPrecommit"}]}
{"time":"2017-02-17T23:54:20.021Z","msg":[2,{"msg":[20,{"Vote":{"validator_address":"D028C9981F7A87F3093672BF0D5B0E2A1B3ED456","validator_index":0,"height":2,"round":0,"type":2,"block_id":{"hash":"32310D174A99844713693C9815D2CA660364E028","parts":{"total":1,"hash":"D008E9014CDDEA8EC95E1E99E21333241BD52DFC"}},"signature":[1,"AA9F03D0707752301D7CBFCF4F0BCDBD666A46C1CAED3910BD64A3C5C2874AAF328172646C951C5E2FD962359C382A3CBBA2C73EC9B533668C6386995B83EC08"]}}],"peer_key":""}]}
{"time":"2017-02-17T23:54:20.022Z","msg":[1,{"height":2,"round":0,"step":"RoundStepCommit"}]}
#HEIGHT: 3
#ENDHEIGHT: 2
{"time":"2017-02-17T23:54:20.025Z","msg":[1,{"height":3,"round":0,"step":"RoundStepNewHeight"}]}
{"time":"2017-02-17T23:54:21.022Z","msg":[3,{"duration":997103974,"height":3,"round":0,"step":1}]}
{"time":"2017-02-17T23:54:21.024Z","msg":[1,{"height":3,"round":0,"step":"RoundStepPropose"}]}
@ -30,7 +30,7 @@
{"time":"2017-02-17T23:54:21.028Z","msg":[1,{"height":3,"round":0,"step":"RoundStepPrecommit"}]}
{"time":"2017-02-17T23:54:21.028Z","msg":[2,{"msg":[20,{"Vote":{"validator_address":"D028C9981F7A87F3093672BF0D5B0E2A1B3ED456","validator_index":0,"height":3,"round":0,"type":2,"block_id":{"hash":"37AF6866DA8C3167CFC280FAE47B6ED441B00D5B","parts":{"total":1,"hash":"2E5DE5777A5AD899CD2531304F42A470509DE989"}},"signature":[1,"C900519E305EC03392E7D197D5FAB535DB240C9C0BA5375A1679C75BAAA07C7410C0EF43CF97D98F2C08A1D739667D5ACFF6233A1FAE75D3DA275AEA422EFD0F"]}}],"peer_key":""}]}
{"time":"2017-02-17T23:54:21.028Z","msg":[1,{"height":3,"round":0,"step":"RoundStepCommit"}]}
#HEIGHT: 4
#ENDHEIGHT: 3
{"time":"2017-02-17T23:54:21.032Z","msg":[1,{"height":4,"round":0,"step":"RoundStepNewHeight"}]}
{"time":"2017-02-17T23:54:22.028Z","msg":[3,{"duration":996302067,"height":4,"round":0,"step":1}]}
{"time":"2017-02-17T23:54:22.030Z","msg":[1,{"height":4,"round":0,"step":"RoundStepPropose"}]}
@ -41,7 +41,7 @@
{"time":"2017-02-17T23:54:22.033Z","msg":[1,{"height":4,"round":0,"step":"RoundStepPrecommit"}]}
{"time":"2017-02-17T23:54:22.033Z","msg":[2,{"msg":[20,{"Vote":{"validator_address":"D028C9981F7A87F3093672BF0D5B0E2A1B3ED456","validator_index":0,"height":4,"round":0,"type":2,"block_id":{"hash":"04715E223BF4327FFA9B0D5AD849B74A099D5DEC","parts":{"total":1,"hash":"24CEBCBEB833F56D47AD14354071B3B7A243068A"}},"signature":[1,"F544743F17479A61F94B0F68C63D254BD60493D78E818D48A5859133619AEE5E92C47CAD89C654DF64E0911C3152091E047555D5F14655D95B9681AE9B336505"]}}],"peer_key":""}]}
{"time":"2017-02-17T23:54:22.034Z","msg":[1,{"height":4,"round":0,"step":"RoundStepCommit"}]}
#HEIGHT: 5
#ENDHEIGHT: 4
{"time":"2017-02-17T23:54:22.036Z","msg":[1,{"height":5,"round":0,"step":"RoundStepNewHeight"}]}
{"time":"2017-02-17T23:54:23.034Z","msg":[3,{"duration":997096276,"height":5,"round":0,"step":1}]}
{"time":"2017-02-17T23:54:23.035Z","msg":[1,{"height":5,"round":0,"step":"RoundStepPropose"}]}
@ -52,7 +52,7 @@
{"time":"2017-02-17T23:54:23.038Z","msg":[1,{"height":5,"round":0,"step":"RoundStepPrecommit"}]}
{"time":"2017-02-17T23:54:23.038Z","msg":[2,{"msg":[20,{"Vote":{"validator_address":"D028C9981F7A87F3093672BF0D5B0E2A1B3ED456","validator_index":0,"height":5,"round":0,"type":2,"block_id":{"hash":"FDC6D837995BEBBBFCBF3E7D7CF44F8FDA448543","parts":{"total":1,"hash":"A52BAA9C2E52E633A1605F4B930205613E3E7A2F"}},"signature":[1,"DF51D23D5D2C57598F67791D953A6C2D9FC5865A3048ADA4469B37500D2996B95732E0DC6F99EAEAEA12B4818CE355C7B701D16857D2AC767D740C2E30E9260C"]}}],"peer_key":""}]}
{"time":"2017-02-17T23:54:23.038Z","msg":[1,{"height":5,"round":0,"step":"RoundStepCommit"}]}
#HEIGHT: 6
#ENDHEIGHT: 5
{"time":"2017-02-17T23:54:23.041Z","msg":[1,{"height":6,"round":0,"step":"RoundStepNewHeight"}]}
{"time":"2017-02-17T23:54:24.038Z","msg":[3,{"duration":997341910,"height":6,"round":0,"step":1}]}
{"time":"2017-02-17T23:54:24.040Z","msg":[1,{"height":6,"round":0,"step":"RoundStepPropose"}]}


consensus/test_data/small_block1.cswal (+1, -1)

@ -1,4 +1,4 @@
#HEIGHT: 1
#ENDHEIGHT: 0
{"time":"2016-12-18T05:05:38.593Z","msg":[3,{"duration":970717663,"height":1,"round":0,"step":1}]}
{"time":"2016-12-18T05:05:38.595Z","msg":[1,{"height":1,"round":0,"step":"RoundStepPropose"}]}
{"time":"2016-12-18T05:05:38.595Z","msg":[2,{"msg":[17,{"Proposal":{"height":1,"round":0,"block_parts_header":{"total":1,"hash":"A434EC796DF1CECC01296E953839C4675863A4E5"},"pol_round":-1,"pol_block_id":{"hash":"","parts":{"total":0,"hash":""}},"signature":[1,"39563C3C7EDD9855B2971457A5DABF05CFDAF52805658847EB1F05115B8341344A77761CC85E670AF1B679DA9FC0905231957174699FE8326DBE7706209BDD0B"]}}],"peer_key":""}]}


consensus/test_data/small_block2.cswal (+1, -1)

@ -1,4 +1,4 @@
#HEIGHT: 1
#ENDHEIGHT: 0
{"time":"2016-12-18T05:05:43.641Z","msg":[3,{"duration":969409681,"height":1,"round":0,"step":1}]}
{"time":"2016-12-18T05:05:43.643Z","msg":[1,{"height":1,"round":0,"step":"RoundStepPropose"}]}
{"time":"2016-12-18T05:05:43.643Z","msg":[2,{"msg":[17,{"Proposal":{"height":1,"round":0,"block_parts_header":{"total":5,"hash":"C916905C3C444501DDDAA1BF52E959B7531E762E"},"pol_round":-1,"pol_block_id":{"hash":"","parts":{"total":0,"hash":""}},"signature":[1,"F1A8E9928889C68FD393F3983B5362AECA4A95AA13FE3C78569B2515EC046893CB718071CAF54F3F1507DCD851B37CD5557EA17BB5471D2DC6FB5AC5FBB72E02"]}}],"peer_key":""}]}


consensus/wal.go (+3, -9)

@ -59,7 +59,7 @@ func (wal *WAL) OnStart() error {
if err != nil {
return err
} else if size == 0 {
wal.writeHeight(1)
wal.writeEndHeight(0)
}
_, err = wal.group.Start()
return err
@ -83,12 +83,6 @@ func (wal *WAL) Save(wmsg WALMessage) {
}
}
}
// Write #HEIGHT: XYZ if new height
if edrs, ok := wmsg.(types.EventDataRoundState); ok {
if edrs.Step == RoundStepNewHeight.String() {
wal.writeHeight(edrs.Height)
}
}
// Write the wal message
var wmsgBytes = wire.JSONBytes(TimedWALMessage{time.Now(), wmsg})
err := wal.group.WriteLine(string(wmsgBytes))
@ -101,8 +95,8 @@ func (wal *WAL) Save(wmsg WALMessage) {
}
}
func (wal *WAL) writeHeight(height int) {
wal.group.WriteLine(Fmt("#HEIGHT: %v", height))
func (wal *WAL) writeEndHeight(height int) {
wal.group.WriteLine(Fmt("#ENDHEIGHT: %v", height))
// TODO: only flush when necessary
if err := wal.group.Flush(); err != nil {


glide.lock (+4, -4)

@ -1,5 +1,5 @@
hash: d9724aa287c40d1b3856b6565f09235d809c8b2f7c6537c04f597137c0d6cd26
updated: 2017-04-12T13:43:29.528413882-04:00
updated: 2017-04-14T17:17:31.933202871-04:00
imports:
- name: github.com/btcsuite/btcd
version: b8df516b4b267acf2de46be593a9d948d1d2c420
@ -85,7 +85,7 @@ imports:
- name: github.com/tendermint/go-clist
version: 3baa390bbaf7634251c42ad69a8682e7e3990552
- name: github.com/tendermint/go-common
version: 6af2364fa91ef2f3afc8ba0db33b66d9d3ae006c
version: 714fdaee3bb3f8670e721a75c5ddda8787b256dd
subpackages:
- test
- name: github.com/tendermint/go-config
@ -111,13 +111,13 @@ imports:
subpackages:
- upnp
- name: github.com/tendermint/go-rpc
version: c3295f4878019ff3fdfcac37a4c0e4bcf4bb02a7
version: 4671c44b2d124f7f6f6243dbfbf4ae2bf42ee809
subpackages:
- client
- server
- types
- name: github.com/tendermint/go-wire
version: ad797c70affa2c81fccc5edaed63ac25144397c6
version: 50889e2b4a9ba65b67be86a486f25853d514b937
- name: github.com/tendermint/log15
version: ae0f3d6450da9eac7074b439c8e1c3cabf0d5ce6
subpackages:


node/node.go (+36, -14)

@ -10,12 +10,12 @@ import (
abci "github.com/tendermint/abci/types"
cmn "github.com/tendermint/go-common"
cfg "github.com/tendermint/go-config"
"github.com/tendermint/go-crypto"
crypto "github.com/tendermint/go-crypto"
dbm "github.com/tendermint/go-db"
"github.com/tendermint/go-p2p"
"github.com/tendermint/go-rpc"
"github.com/tendermint/go-rpc/server"
"github.com/tendermint/go-wire"
p2p "github.com/tendermint/go-p2p"
rpc "github.com/tendermint/go-rpc"
rpcserver "github.com/tendermint/go-rpc/server"
wire "github.com/tendermint/go-wire"
bc "github.com/tendermint/tendermint/blockchain"
"github.com/tendermint/tendermint/consensus"
mempl "github.com/tendermint/tendermint/mempool"
@ -23,6 +23,9 @@ import (
rpccore "github.com/tendermint/tendermint/rpc/core"
grpccore "github.com/tendermint/tendermint/rpc/grpc"
sm "github.com/tendermint/tendermint/state"
"github.com/tendermint/tendermint/state/txindex"
"github.com/tendermint/tendermint/state/txindex/kv"
"github.com/tendermint/tendermint/state/txindex/null"
"github.com/tendermint/tendermint/types"
"github.com/tendermint/tendermint/version"
@ -51,6 +54,7 @@ type Node struct {
consensusReactor *consensus.ConsensusReactor // for participating in the consensus
proxyApp proxy.AppConns // connection to the application
rpcListeners []net.Listener // rpc servers
txIndexer txindex.TxIndexer
}
func NewNodeDefault(config cfg.Config) *Node {
@ -84,6 +88,17 @@ func NewNode(config cfg.Config, privValidator *types.PrivValidator, clientCreato
// reload the state (it may have been updated by the handshake)
state = sm.LoadState(stateDB)
// Transaction indexing
var txIndexer txindex.TxIndexer
switch config.GetString("tx_index") {
case "kv":
store := dbm.NewDB("tx_index", config.GetString("db_backend"), config.GetString("db_dir"))
txIndexer = kv.NewTxIndex(store)
default:
txIndexer = &null.TxIndex{}
}
state.TxIndexer = txIndexer
// Generate node PrivKey
privKey := crypto.GenPrivKeyEd25519()
@ -188,6 +203,7 @@ func NewNode(config cfg.Config, privValidator *types.PrivValidator, clientCreato
consensusState: consensusState,
consensusReactor: consensusReactor,
proxyApp: proxyApp,
txIndexer: txIndexer,
}
node.BaseService = *cmn.NewBaseService(log, "Node", node)
return node
@ -201,7 +217,7 @@ func (n *Node) OnStart() error {
n.sw.AddListener(l)
// Start the switch
n.sw.SetNodeInfo(makeNodeInfo(n.config, n.sw, n.privKey))
n.sw.SetNodeInfo(n.makeNodeInfo())
n.sw.SetNodePrivKey(n.privKey)
_, err := n.sw.Start()
if err != nil {
@ -278,6 +294,7 @@ func (n *Node) ConfigureRPC() {
rpccore.SetGenesisDoc(n.genesisDoc)
rpccore.SetAddrBook(n.addrBook)
rpccore.SetProxyAppQuery(n.proxyApp.Query())
rpccore.SetTxIndexer(n.txIndexer)
}
func (n *Node) startRPC() ([]net.Listener, error) {
@ -348,34 +365,39 @@ func (n *Node) ProxyApp() proxy.AppConns {
return n.proxyApp
}
func makeNodeInfo(config cfg.Config, sw *p2p.Switch, privKey crypto.PrivKeyEd25519) *p2p.NodeInfo {
func (n *Node) makeNodeInfo() *p2p.NodeInfo {
txIndexerStatus := "on"
if _, ok := n.txIndexer.(*null.TxIndex); ok {
txIndexerStatus = "off"
}
nodeInfo := &p2p.NodeInfo{
PubKey: privKey.PubKey().(crypto.PubKeyEd25519),
Moniker: config.GetString("moniker"),
Network: config.GetString("chain_id"),
PubKey: n.privKey.PubKey().(crypto.PubKeyEd25519),
Moniker: n.config.GetString("moniker"),
Network: n.config.GetString("chain_id"),
Version: version.Version,
Other: []string{
cmn.Fmt("wire_version=%v", wire.Version),
cmn.Fmt("p2p_version=%v", p2p.Version),
cmn.Fmt("consensus_version=%v", consensus.Version),
cmn.Fmt("rpc_version=%v/%v", rpc.Version, rpccore.Version),
cmn.Fmt("tx_index=%v", txIndexerStatus),
},
}
// include git hash in the nodeInfo if available
if rev, err := cmn.ReadFile(config.GetString("revision_file")); err == nil {
if rev, err := cmn.ReadFile(n.config.GetString("revision_file")); err == nil {
nodeInfo.Other = append(nodeInfo.Other, cmn.Fmt("revision=%v", string(rev)))
}
if !sw.IsListening() {
if !n.sw.IsListening() {
return nodeInfo
}
p2pListener := sw.Listeners()[0]
p2pListener := n.sw.Listeners()[0]
p2pHost := p2pListener.ExternalAddress().IP.String()
p2pPort := p2pListener.ExternalAddress().Port
rpcListenAddr := config.GetString("rpc_laddr")
rpcListenAddr := n.config.GetString("rpc_laddr")
// We assume that the rpcListener has the same ExternalAddress.
// This is probably true because both P2P and RPC listeners use UPnP,


rpc/client/httpclient.go (+13, -0)

@ -160,6 +160,19 @@ func (c *HTTP) Commit(height int) (*ctypes.ResultCommit, error) {
return (*tmResult).(*ctypes.ResultCommit), nil
}
func (c *HTTP) Tx(hash []byte, prove bool) (*ctypes.ResultTx, error) {
tmResult := new(ctypes.TMResult)
query := map[string]interface{}{
"hash": hash,
"prove": prove,
}
_, err := c.rpc.Call("tx", query, tmResult)
if err != nil {
return nil, errors.Wrap(err, "Tx")
}
return (*tmResult).(*ctypes.ResultTx), nil
}
func (c *HTTP) Validators() (*ctypes.ResultValidators, error) {
tmResult := new(ctypes.TMResult)
_, err := c.rpc.Call("validators", map[string]interface{}{}, tmResult)


rpc/client/interface.go (+1, -0)

@ -44,6 +44,7 @@ type SignClient interface {
Block(height int) (*ctypes.ResultBlock, error)
Commit(height int) (*ctypes.ResultCommit, error)
Validators() (*ctypes.ResultValidators, error)
Tx(hash []byte, prove bool) (*ctypes.ResultTx, error)
}
// HistoryClient shows us data from genesis to now in large chunks.


rpc/client/localclient.go (+4, -0)

@ -103,3 +103,7 @@ func (c Local) Commit(height int) (*ctypes.ResultCommit, error) {
func (c Local) Validators() (*ctypes.ResultValidators, error) {
return core.Validators()
}
func (c Local) Tx(hash []byte, prove bool) (*ctypes.ResultTx, error) {
return core.Tx(hash, prove)
}
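With Tx now exposed on both the HTTP and Local clients (and TxID returned by the broadcast calls, see rpc/core/mempool.go below), a client round trip looks roughly like this sketch. The address and payload are illustrative and the constructor is the one the rpc tests use; this is not lifted verbatim from the PR.

```go
package main

import (
	"fmt"

	"github.com/tendermint/tendermint/rpc/client"
)

func main() {
	// Illustrative address; use your node's rpc_laddr.
	c := client.NewHTTP("tcp://127.0.0.1:46657", "/websocket")

	tx := []byte("name=satoshi") // illustrative payload
	bres, err := c.BroadcastTxCommit(tx)
	if err != nil {
		panic(err)
	}

	// TxID is simply the tx hash, so it can be fed straight back into Tx.
	ptx, err := c.Tx(bres.TxID, false)
	if err != nil {
		panic(err) // e.g. the node was started with tx indexing disabled
	}
	fmt.Printf("tx committed at height %d (index %d)\n", ptx.Height, ptx.Index)
}
```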

rpc/client/mock/abci.go (+2, -2)

@ -45,7 +45,7 @@ func (a ABCIApp) BroadcastTxAsync(tx types.Tx) (*ctypes.ResultBroadcastTx, error
if c.IsOK() {
go func() { a.App.DeliverTx(tx) }()
}
return &ctypes.ResultBroadcastTx{c.Code, c.Data, c.Log}, nil
return &ctypes.ResultBroadcastTx{c.Code, c.Data, c.Log, tx.Hash()}, nil
}
func (a ABCIApp) BroadcastTxSync(tx types.Tx) (*ctypes.ResultBroadcastTx, error) {
@ -54,7 +54,7 @@ func (a ABCIApp) BroadcastTxSync(tx types.Tx) (*ctypes.ResultBroadcastTx, error)
if c.IsOK() {
go func() { a.App.DeliverTx(tx) }()
}
return &ctypes.ResultBroadcastTx{c.Code, c.Data, c.Log}, nil
return &ctypes.ResultBroadcastTx{c.Code, c.Data, c.Log, tx.Hash()}, nil
}
// ABCIMock will send all abci related request to the named app,


rpc/client/rpc_test.go (+22, -13)

@ -3,7 +3,6 @@ package client_test
import (
"strings"
"testing"
"time"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
@ -11,6 +10,7 @@ import (
merktest "github.com/tendermint/merkleeyes/testutil"
"github.com/tendermint/tendermint/rpc/client"
rpctest "github.com/tendermint/tendermint/rpc/test"
"github.com/tendermint/tendermint/types"
)
func getHTTPClient() *client.HTTP {
@ -117,49 +117,58 @@ func TestAppCalls(t *testing.T) {
// write something
k, v, tx := merktest.MakeTxKV()
_, err = c.BroadcastTxCommit(tx)
bres, err := c.BroadcastTxCommit(tx)
require.Nil(err, "%d: %+v", i, err)
require.True(bres.DeliverTx.GetCode().IsOK())
txh := bres.Height
apph := txh + 1 // this is where the tx will be applied to the state
// wait before querying
time.Sleep(time.Second * 1)
client.WaitForHeight(c, apph, nil)
qres, err := c.ABCIQuery("/key", k, false)
if assert.Nil(err) && assert.True(qres.Response.Code.IsOK()) {
data := qres.Response
// assert.Equal(k, data.GetKey()) // only returned for proofs
assert.Equal(v, data.GetValue())
}
// +/- 1 making my head hurt
h := int(qres.Response.Height) - 1
// make sure we can lookup the tx with proof
// ptx, err := c.Tx(bres.TxID, true)
ptx, err := c.Tx(bres.TxID, true)
require.Nil(err, "%d: %+v", i, err)
assert.Equal(txh, ptx.Height)
assert.Equal(types.Tx(tx), ptx.Tx)
// and we can even check the block is added
block, err := c.Block(h)
block, err := c.Block(apph)
require.Nil(err, "%d: %+v", i, err)
appHash := block.BlockMeta.Header.AppHash
assert.True(len(appHash) > 0)
assert.EqualValues(h, block.BlockMeta.Header.Height)
assert.EqualValues(apph, block.BlockMeta.Header.Height)
// check blockchain info, now that we know there is info
// TODO: is this commented somewhere that they are returned
// in order of descending height???
info, err := c.BlockchainInfo(h-2, h)
info, err := c.BlockchainInfo(apph, apph)
require.Nil(err, "%d: %+v", i, err)
assert.True(info.LastHeight > 2)
if assert.Equal(3, len(info.BlockMetas)) {
assert.True(info.LastHeight >= apph)
if assert.Equal(1, len(info.BlockMetas)) {
lastMeta := info.BlockMetas[0]
assert.EqualValues(h, lastMeta.Header.Height)
assert.EqualValues(apph, lastMeta.Header.Height)
bMeta := block.BlockMeta
assert.Equal(bMeta.Header.AppHash, lastMeta.Header.AppHash)
assert.Equal(bMeta.BlockID, lastMeta.BlockID)
}
// and get the corresponding commit with the same apphash
commit, err := c.Commit(h)
commit, err := c.Commit(apph)
require.Nil(err, "%d: %+v", i, err)
cappHash := commit.Header.AppHash
assert.Equal(appHash, cappHash)
assert.NotNil(commit.Commit)
// compare the commits (note Commit(2) has commit from Block(3))
commit2, err := c.Commit(h - 1)
commit2, err := c.Commit(apph - 1)
require.Nil(err, "%d: %+v", i, err)
assert.Equal(block.Block.LastCommit, commit2.Commit)


rpc/core/mempool.go (+10, -5)

@ -4,9 +4,9 @@ import (
"fmt"
"time"
abci "github.com/tendermint/abci/types"
ctypes "github.com/tendermint/tendermint/rpc/core/types"
"github.com/tendermint/tendermint/types"
abci "github.com/tendermint/abci/types"
)
//-----------------------------------------------------------------------------
@ -18,7 +18,7 @@ func BroadcastTxAsync(tx types.Tx) (*ctypes.ResultBroadcastTx, error) {
if err != nil {
return nil, fmt.Errorf("Error broadcasting transaction: %v", err)
}
return &ctypes.ResultBroadcastTx{}, nil
return &ctypes.ResultBroadcastTx{TxID: tx.Hash()}, nil
}
// Returns with the response from CheckTx
@ -36,6 +36,7 @@ func BroadcastTxSync(tx types.Tx) (*ctypes.ResultBroadcastTx, error) {
Code: r.Code,
Data: r.Data,
Log: r.Log,
TxID: tx.Hash(),
}, nil
}
@ -65,8 +66,9 @@ func BroadcastTxCommit(tx types.Tx) (*ctypes.ResultBroadcastTxCommit, error) {
if checkTxR.Code != abci.CodeType_OK {
// CheckTx failed!
return &ctypes.ResultBroadcastTxCommit{
CheckTx: checkTxR,
CheckTx: checkTxR,
DeliverTx: nil,
TxID: tx.Hash(),
}, nil
}
@ -84,14 +86,17 @@ func BroadcastTxCommit(tx types.Tx) (*ctypes.ResultBroadcastTxCommit, error) {
}
log.Notice("DeliverTx passed ", "tx", []byte(tx), "response", deliverTxR)
return &ctypes.ResultBroadcastTxCommit{
CheckTx: checkTxR,
CheckTx: checkTxR,
DeliverTx: deliverTxR,
TxID: tx.Hash(),
Height: deliverTxRes.Height,
}, nil
case <-timer.C:
log.Error("failed to include tx")
return &ctypes.ResultBroadcastTxCommit{
CheckTx: checkTxR,
CheckTx: checkTxR,
DeliverTx: nil,
TxID: tx.Hash(),
}, fmt.Errorf("Timed out waiting for transaction to be included in a block")
}


rpc/core/pipe.go (+11, -5)

@ -2,11 +2,12 @@ package core
import (
cfg "github.com/tendermint/go-config"
"github.com/tendermint/go-crypto"
"github.com/tendermint/go-p2p"
crypto "github.com/tendermint/go-crypto"
p2p "github.com/tendermint/go-p2p"
"github.com/tendermint/tendermint/consensus"
"github.com/tendermint/tendermint/proxy"
"github.com/tendermint/tendermint/state/txindex"
"github.com/tendermint/tendermint/types"
)
@ -42,9 +43,10 @@ var (
p2pSwitch P2P
// objects
pubKey crypto.PubKey
genDoc *types.GenesisDoc // cache the genesis structure
addrBook *p2p.AddrBook
pubKey crypto.PubKey
genDoc *types.GenesisDoc // cache the genesis structure
addrBook *p2p.AddrBook
txIndexer txindex.TxIndexer
)
func SetConfig(c cfg.Config) {
@ -86,3 +88,7 @@ func SetAddrBook(book *p2p.AddrBook) {
func SetProxyAppQuery(appConn proxy.AppConnQuery) {
proxyAppQuery = appConn
}
func SetTxIndexer(indexer txindex.TxIndexer) {
txIndexer = indexer
}

rpc/core/routes.go (+31, -115)

@ -19,6 +19,7 @@ var Routes = map[string]*rpc.RPCFunc{
"genesis": rpc.NewRPCFunc(GenesisResult, ""),
"block": rpc.NewRPCFunc(BlockResult, "height"),
"commit": rpc.NewRPCFunc(CommitResult, "height"),
"tx": rpc.NewRPCFunc(TxResult, "hash,prove"),
"validators": rpc.NewRPCFunc(ValidatorsResult, ""),
"dump_consensus_state": rpc.NewRPCFunc(DumpConsensusStateResult, ""),
"unconfirmed_txs": rpc.NewRPCFunc(UnconfirmedTxsResult, ""),
@ -45,185 +46,100 @@ var Routes = map[string]*rpc.RPCFunc{
}
func SubscribeResult(wsCtx rpctypes.WSRPCContext, event string) (ctypes.TMResult, error) {
if r, err := Subscribe(wsCtx, event); err != nil {
return nil, err
} else {
return r, nil
}
return Subscribe(wsCtx, event)
}
func UnsubscribeResult(wsCtx rpctypes.WSRPCContext, event string) (ctypes.TMResult, error) {
if r, err := Unsubscribe(wsCtx, event); err != nil {
return nil, err
} else {
return r, nil
}
return Unsubscribe(wsCtx, event)
}
func StatusResult() (ctypes.TMResult, error) {
if r, err := Status(); err != nil {
return nil, err
} else {
return r, nil
}
return Status()
}
func NetInfoResult() (ctypes.TMResult, error) {
if r, err := NetInfo(); err != nil {
return nil, err
} else {
return r, nil
}
return NetInfo()
}
func UnsafeDialSeedsResult(seeds []string) (ctypes.TMResult, error) {
if r, err := UnsafeDialSeeds(seeds); err != nil {
return nil, err
} else {
return r, nil
}
return UnsafeDialSeeds(seeds)
}
func BlockchainInfoResult(min, max int) (ctypes.TMResult, error) {
if r, err := BlockchainInfo(min, max); err != nil {
return nil, err
} else {
return r, nil
}
return BlockchainInfo(min, max)
}
func GenesisResult() (ctypes.TMResult, error) {
if r, err := Genesis(); err != nil {
return nil, err
} else {
return r, nil
}
return Genesis()
}
func BlockResult(height int) (ctypes.TMResult, error) {
if r, err := Block(height); err != nil {
return nil, err
} else {
return r, nil
}
return Block(height)
}
func CommitResult(height int) (ctypes.TMResult, error) {
if r, err := Commit(height); err != nil {
return nil, err
} else {
return r, nil
}
return Commit(height)
}
func ValidatorsResult() (ctypes.TMResult, error) {
if r, err := Validators(); err != nil {
return nil, err
} else {
return r, nil
}
return Validators()
}
func DumpConsensusStateResult() (ctypes.TMResult, error) {
if r, err := DumpConsensusState(); err != nil {
return nil, err
} else {
return r, nil
}
return DumpConsensusState()
}
func UnconfirmedTxsResult() (ctypes.TMResult, error) {
if r, err := UnconfirmedTxs(); err != nil {
return nil, err
} else {
return r, nil
}
return UnconfirmedTxs()
}
func NumUnconfirmedTxsResult() (ctypes.TMResult, error) {
if r, err := NumUnconfirmedTxs(); err != nil {
return nil, err
} else {
return r, nil
}
return NumUnconfirmedTxs()
}
// Tx allows users to query transaction results. `nil` could mean the
// transaction is in the mempool, was invalidated, or was not sent in the first
// place.
func TxResult(hash []byte, prove bool) (ctypes.TMResult, error) {
return Tx(hash, prove)
}
func BroadcastTxCommitResult(tx []byte) (ctypes.TMResult, error) {
if r, err := BroadcastTxCommit(tx); err != nil {
return nil, err
} else {
return r, nil
}
return BroadcastTxCommit(tx)
}
func BroadcastTxSyncResult(tx []byte) (ctypes.TMResult, error) {
if r, err := BroadcastTxSync(tx); err != nil {
return nil, err
} else {
return r, nil
}
return BroadcastTxSync(tx)
}
func BroadcastTxAsyncResult(tx []byte) (ctypes.TMResult, error) {
if r, err := BroadcastTxAsync(tx); err != nil {
return nil, err
} else {
return r, nil
}
return BroadcastTxAsync(tx)
}
func ABCIQueryResult(path string, data []byte, prove bool) (ctypes.TMResult, error) {
if r, err := ABCIQuery(path, data, prove); err != nil {
return nil, err
} else {
return r, nil
}
return ABCIQuery(path, data, prove)
}
func ABCIInfoResult() (ctypes.TMResult, error) {
if r, err := ABCIInfo(); err != nil {
return nil, err
} else {
return r, nil
}
return ABCIInfo()
}
func UnsafeFlushMempoolResult() (ctypes.TMResult, error) {
if r, err := UnsafeFlushMempool(); err != nil {
return nil, err
} else {
return r, nil
}
return UnsafeFlushMempool()
}
func UnsafeSetConfigResult(typ, key, value string) (ctypes.TMResult, error) {
if r, err := UnsafeSetConfig(typ, key, value); err != nil {
return nil, err
} else {
return r, nil
}
return UnsafeSetConfig(typ, key, value)
}
func UnsafeStartCPUProfilerResult(filename string) (ctypes.TMResult, error) {
if r, err := UnsafeStartCPUProfiler(filename); err != nil {
return nil, err
} else {
return r, nil
}
return UnsafeStartCPUProfiler(filename)
}
func UnsafeStopCPUProfilerResult() (ctypes.TMResult, error) {
if r, err := UnsafeStopCPUProfiler(); err != nil {
return nil, err
} else {
return r, nil
}
return UnsafeStopCPUProfiler()
}
func UnsafeWriteHeapProfileResult(filename string) (ctypes.TMResult, error) {
if r, err := UnsafeWriteHeapProfile(filename); err != nil {
return nil, err
} else {
return r, nil
}
return UnsafeWriteHeapProfile(filename)
}

rpc/core/tx.go (+43, -0)

@ -0,0 +1,43 @@
package core
import (
"fmt"
ctypes "github.com/tendermint/tendermint/rpc/core/types"
"github.com/tendermint/tendermint/state/txindex/null"
"github.com/tendermint/tendermint/types"
)
func Tx(hash []byte, prove bool) (*ctypes.ResultTx, error) {
// if index is disabled, return error
if _, ok := txIndexer.(*null.TxIndex); ok {
return nil, fmt.Errorf("Transaction indexing is disabled.")
}
r, err := txIndexer.Get(hash)
if err != nil {
return nil, err
}
if r == nil {
return nil, fmt.Errorf("Tx (%X) not found", hash)
}
height := int(r.Height) // XXX
index := int(r.Index)
var proof types.TxProof
if prove {
block := blockStore.LoadBlock(height)
proof = block.Data.Txs.Proof(index)
}
return &ctypes.ResultTx{
Height: height,
Index: index,
TxResult: r.Result,
Tx: r.Tx,
Proof: proof,
}, nil
}
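When prove is true the result carries a Merkle proof built from the block's Txs. The check below mirrors the assertions in rpc/test/client_test.go further down; verifyTx itself is an illustrative helper, not part of the PR.

```go
package example

import (
	"fmt"

	"github.com/tendermint/tendermint/rpc/client"
)

// verifyTx fetches a tx with its proof and checks it against the block's tx root.
func verifyTx(c *client.HTTP, hash []byte) error {
	res, err := c.Tx(hash, true) // prove = true
	if err != nil {
		return err // e.g. indexing disabled, or tx not found
	}
	p := res.Proof
	if !p.Proof.Verify(p.Index, p.Total, res.Tx.Hash(), p.RootHash) {
		return fmt.Errorf("invalid merkle proof for tx %X", hash)
	}
	return nil
}
```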

rpc/core/types/responses.go (+29, -0)

@ -1,6 +1,8 @@
package core_types
import (
"strings"
abci "github.com/tendermint/abci/types"
"github.com/tendermint/go-crypto"
"github.com/tendermint/go-p2p"
@ -38,6 +40,19 @@ type ResultStatus struct {
LatestBlockTime int64 `json:"latest_block_time"` // nano
}
func (s *ResultStatus) TxIndexEnabled() bool {
if s == nil || s.NodeInfo == nil {
return false
}
for _, s := range s.NodeInfo.Other {
info := strings.Split(s, "=")
if len(info) == 2 && info[0] == "tx_index" {
return info[1] == "on"
}
}
return false
}
type ResultNetInfo struct {
Listening bool `json:"listening"`
Listeners []string `json:"listeners"`
@ -68,11 +83,23 @@ type ResultBroadcastTx struct {
Code abci.CodeType `json:"code"`
Data []byte `json:"data"`
Log string `json:"log"`
TxID []byte `json:"tx_id"`
}
type ResultBroadcastTxCommit struct {
CheckTx *abci.ResponseCheckTx `json:"check_tx"`
DeliverTx *abci.ResponseDeliverTx `json:"deliver_tx"`
TxID []byte `json:"tx_id"`
Height int `json:"height"`
}
type ResultTx struct {
Height int `json:"height"`
Index int `json:"index"`
TxResult abci.ResponseDeliverTx `json:"tx_result"`
Tx types.Tx `json:"tx"`
Proof types.TxProof `json:"proof,omitempty"`
}
type ResultUnconfirmedTxs struct {
@ -128,6 +155,7 @@ const (
ResultTypeBroadcastTx = byte(0x60)
ResultTypeUnconfirmedTxs = byte(0x61)
ResultTypeBroadcastTxCommit = byte(0x62)
ResultTypeTx = byte(0x63)
// 0x7 bytes are for querying the application
ResultTypeABCIQuery = byte(0x70)
@ -164,6 +192,7 @@ var _ = wire.RegisterInterface(
wire.ConcreteType{&ResultDumpConsensusState{}, ResultTypeDumpConsensusState},
wire.ConcreteType{&ResultBroadcastTx{}, ResultTypeBroadcastTx},
wire.ConcreteType{&ResultBroadcastTxCommit{}, ResultTypeBroadcastTxCommit},
wire.ConcreteType{&ResultTx{}, ResultTypeTx},
wire.ConcreteType{&ResultUnconfirmedTxs{}, ResultTypeUnconfirmedTxs},
wire.ConcreteType{&ResultSubscribe{}, ResultTypeSubscribe},
wire.ConcreteType{&ResultUnsubscribe{}, ResultTypeUnsubscribe},


rpc/core/types/responses_test.go (+38, -0)

@ -0,0 +1,38 @@
package core_types
import (
"testing"
"github.com/stretchr/testify/assert"
"github.com/tendermint/go-p2p"
)
func TestStatusIndexer(t *testing.T) {
assert := assert.New(t)
var status *ResultStatus
assert.False(status.TxIndexEnabled())
status = &ResultStatus{}
assert.False(status.TxIndexEnabled())
status.NodeInfo = &p2p.NodeInfo{}
assert.False(status.TxIndexEnabled())
cases := []struct {
expected bool
other []string
}{
{false, nil},
{false, []string{}},
{false, []string{"a=b"}},
{false, []string{"tx_indexiskv", "some=dood"}},
{true, []string{"tx_index=on", "tx_index=other"}},
{true, []string{"^(*^(", "tx_index=on", "a=n=b=d="}},
}
for _, tc := range cases {
status.NodeInfo.Other = tc.other
assert.Equal(tc.expected, status.TxIndexEnabled())
}
}
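Clients can use the helper exercised above to decide, via Status, whether a node will answer tx queries at all. A minimal sketch (the helper name is ours):

```go
package example

import "github.com/tendermint/tendermint/rpc/client"

// txQueriesSupported reports whether the connected node advertises
// "tx_index=on" in its NodeInfo (see ResultStatus.TxIndexEnabled above).
func txQueriesSupported(c *client.HTTP) (bool, error) {
	status, err := c.Status()
	if err != nil {
		return false, err
	}
	return status.TxIndexEnabled(), nil
}
```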

rpc/test/client_test.go (+87, -0)

@ -13,7 +13,9 @@ import (
abci "github.com/tendermint/abci/types"
. "github.com/tendermint/go-common"
rpc "github.com/tendermint/go-rpc/client"
"github.com/tendermint/tendermint/rpc/core"
ctypes "github.com/tendermint/tendermint/rpc/core/types"
"github.com/tendermint/tendermint/state/txindex/null"
"github.com/tendermint/tendermint/types"
)
@ -149,6 +151,91 @@ func testBroadcastTxCommit(t *testing.T, client rpc.HTTPClient) {
// TODO: find tx in block
}
//--------------------------------------------------------------------------------
// query tx
func TestURITx(t *testing.T) {
testTx(t, GetURIClient(), true)
core.SetTxIndexer(&null.TxIndex{})
testTx(t, GetJSONClient(), false)
core.SetTxIndexer(node.ConsensusState().GetState().TxIndexer)
}
func TestJSONTx(t *testing.T) {
testTx(t, GetJSONClient(), true)
core.SetTxIndexer(&null.TxIndex{})
testTx(t, GetJSONClient(), false)
core.SetTxIndexer(node.ConsensusState().GetState().TxIndexer)
}
func testTx(t *testing.T, client rpc.HTTPClient, withIndexer bool) {
assert, require := assert.New(t), require.New(t)
// first we broadcast a tx
tmResult := new(ctypes.TMResult)
txBytes := randBytes(t)
tx := types.Tx(txBytes)
_, err := client.Call("broadcast_tx_commit", map[string]interface{}{"tx": txBytes}, tmResult)
require.Nil(err)
res := (*tmResult).(*ctypes.ResultBroadcastTxCommit)
checkTx := res.CheckTx
require.Equal(abci.CodeType_OK, checkTx.Code)
deliverTx := res.DeliverTx
require.Equal(abci.CodeType_OK, deliverTx.Code)
mem := node.MempoolReactor().Mempool
require.Equal(0, mem.Size())
txHash := tx.Hash()
txHash2 := types.Tx("a different tx").Hash()
cases := []struct {
valid bool
hash []byte
prove bool
}{
// only valid if correct hash provided
{true, txHash, false},
{true, txHash, true},
{false, txHash2, false},
{false, txHash2, true},
{false, nil, false},
{false, nil, true},
}
for i, tc := range cases {
idx := fmt.Sprintf("%d", i)
// now we query for the tx.
// since there's only one tx, we know index=0.
tmResult = new(ctypes.TMResult)
query := map[string]interface{}{
"hash": tc.hash,
"prove": tc.prove,
}
_, err = client.Call("tx", query, tmResult)
valid := (withIndexer && tc.valid)
if !valid {
require.NotNil(err, idx)
} else {
require.Nil(err, idx)
res2 := (*tmResult).(*ctypes.ResultTx)
assert.Equal(tx, res2.Tx, idx)
assert.Equal(res.Height, res2.Height, idx)
assert.Equal(0, res2.Index, idx)
assert.Equal(abci.CodeType_OK, res2.TxResult.Code, idx)
// time to verify the proof
proof := res2.Proof
if tc.prove && assert.Equal(tx, proof.Data, idx) {
assert.True(proof.Proof.Verify(proof.Index, proof.Total, tx.Hash(), proof.RootHash), idx)
}
}
}
}
//--------------------------------------------------------------------------------
// Test the websocket service


state/execution.go (+83, -67)

@ -2,68 +2,49 @@ package state
import (
"errors"
"fmt"
"github.com/ebuchman/fail-test"
fail "github.com/ebuchman/fail-test"
abci "github.com/tendermint/abci/types"
. "github.com/tendermint/go-common"
"github.com/tendermint/go-crypto"
crypto "github.com/tendermint/go-crypto"
"github.com/tendermint/tendermint/proxy"
"github.com/tendermint/tendermint/state/txindex"
"github.com/tendermint/tendermint/types"
)
//--------------------------------------------------
// Execute the block
// Execute the block to mutate State.
// Validates block and then executes Data.Txs in the block.
func (s *State) ExecBlock(eventCache types.Fireable, proxyAppConn proxy.AppConnConsensus, block *types.Block, blockPartsHeader types.PartSetHeader) error {
// ValExecBlock executes the block, but does NOT mutate State.
// + validates the block
// + executes block.Txs on the proxyAppConn
func (s *State) ValExecBlock(eventCache types.Fireable, proxyAppConn proxy.AppConnConsensus, block *types.Block) (*ABCIResponses, error) {
// Validate the block.
if err := s.validateBlock(block); err != nil {
return ErrInvalidBlock(err)
return nil, ErrInvalidBlock(err)
}
// compute bitarray of validators that signed
signed := commitBitArrayFromBlock(block)
_ = signed // TODO send on begin block
// copy the valset
valSet := s.Validators.Copy()
nextValSet := valSet.Copy()
// Execute the block txs
changedValidators, err := execBlockOnProxyApp(eventCache, proxyAppConn, block)
abciResponses, err := execBlockOnProxyApp(eventCache, proxyAppConn, block)
if err != nil {
// There was some error in proxyApp
// TODO Report error and wait for proxyApp to be available.
return ErrProxyAppConn(err)
return nil, ErrProxyAppConn(err)
}
// update the validator set
err = updateValidators(nextValSet, changedValidators)
if err != nil {
log.Warn("Error changing validator set", "error", err)
// TODO: err or carry on?
}
// All good!
// Update validator accums and set state variables
nextValSet.IncrementAccum(1)
s.SetBlockAndValidators(block.Header, blockPartsHeader, valSet, nextValSet)
fail.Fail() // XXX
return nil
return abciResponses, nil
}
// Executes block's transactions on proxyAppConn.
// Returns a list of updates to the validator set
// Returns a list of transaction results and updates to the validator set
// TODO: Generate a bitmap or otherwise store tx validity in state.
func execBlockOnProxyApp(eventCache types.Fireable, proxyAppConn proxy.AppConnConsensus, block *types.Block) ([]*abci.Validator, error) {
func execBlockOnProxyApp(eventCache types.Fireable, proxyAppConn proxy.AppConnConsensus, block *types.Block) (*ABCIResponses, error) {
var validTxs, invalidTxs = 0, 0
txIndex := 0
abciResponses := NewABCIResponses(block)
// Execute transactions and get hash
proxyCb := func(req *abci.Request, res *abci.Response) {
switch r := res.Value.(type) {
@ -73,22 +54,27 @@ func execBlockOnProxyApp(eventCache types.Fireable, proxyAppConn proxy.AppConnCo
// Blocks may include invalid txs.
// reqDeliverTx := req.(abci.RequestDeliverTx)
txError := ""
apTx := r.DeliverTx
if apTx.Code == abci.CodeType_OK {
validTxs += 1
txResult := r.DeliverTx
if txResult.Code == abci.CodeType_OK {
validTxs++
} else {
log.Debug("Invalid tx", "code", r.DeliverTx.Code, "log", r.DeliverTx.Log)
invalidTxs += 1
txError = apTx.Code.String()
log.Debug("Invalid tx", "code", txResult.Code, "log", txResult.Log)
invalidTxs++
txError = txResult.Code.String()
}
abciResponses.DeliverTx[txIndex] = txResult
txIndex++
// NOTE: if we count we can access the tx from the block instead of
// pulling it from the req
event := types.EventDataTx{
Tx: req.GetDeliverTx().Tx,
Data: apTx.Data,
Code: apTx.Code,
Log: apTx.Log,
Error: txError,
Height: block.Height,
Tx: types.Tx(req.GetDeliverTx().Tx),
Data: txResult.Data,
Code: txResult.Code,
Log: txResult.Log,
Error: txError,
}
types.FireEventTx(eventCache, event)
}
@ -102,33 +88,29 @@ func execBlockOnProxyApp(eventCache types.Fireable, proxyAppConn proxy.AppConnCo
return nil, err
}
fail.Fail() // XXX
// Run txs of block
for _, tx := range block.Txs {
fail.FailRand(len(block.Txs)) // XXX
proxyAppConn.DeliverTxAsync(tx)
if err := proxyAppConn.Error(); err != nil {
return nil, err
}
}
fail.Fail() // XXX
// End block
respEndBlock, err := proxyAppConn.EndBlockSync(uint64(block.Height))
abciResponses.EndBlock, err = proxyAppConn.EndBlockSync(uint64(block.Height))
if err != nil {
log.Warn("Error in proxyAppConn.EndBlock", "error", err)
return nil, err
}
fail.Fail() // XXX
valDiff := abciResponses.EndBlock.Diffs
log.Info("Executed block", "height", block.Height, "valid txs", validTxs, "invalid txs", invalidTxs)
if len(respEndBlock.Diffs) > 0 {
log.Info("Update to validator set", "updates", abci.ValidatorsString(respEndBlock.Diffs))
if len(valDiff) > 0 {
log.Info("Update to validator set", "updates", abci.ValidatorsString(valDiff))
}
return respEndBlock.Diffs, nil
return abciResponses, nil
}
func updateValidators(validators *types.ValidatorSet, changedValidators []*abci.Validator) error {
@ -219,25 +201,43 @@ func (s *State) validateBlock(block *types.Block) error {
}
//-----------------------------------------------------------------------------
// ApplyBlock executes the block, then commits and updates the mempool atomically
// ApplyBlock validates & executes the block, updates state w/ ABCI responses,
// then commits and updates the mempool atomically, then saves state.
// Transaction results are optionally indexed.
// Execute and commit block against app, save block and state
// Validate, execute, and commit block against app, save block and state
func (s *State) ApplyBlock(eventCache types.Fireable, proxyAppConn proxy.AppConnConsensus,
block *types.Block, partsHeader types.PartSetHeader, mempool types.Mempool) error {
// Run the block on the State:
// + update validator sets
// + run txs on the proxyAppConn
err := s.ExecBlock(eventCache, proxyAppConn, block, partsHeader)
abciResponses, err := s.ValExecBlock(eventCache, proxyAppConn, block)
if err != nil {
return errors.New(Fmt("Exec failed for application: %v", err))
return fmt.Errorf("Exec failed for application: %v", err)
}
fail.Fail() // XXX
// index txs. This could run in the background
s.indexTxs(abciResponses)
// save the results before we commit
s.SaveABCIResponses(abciResponses)
fail.Fail() // XXX
// now update the block and validators
s.SetBlockAndValidators(block.Header, partsHeader, abciResponses)
// lock mempool, commit state, update mempool
err = s.CommitStateUpdateMempool(proxyAppConn, block, mempool)
if err != nil {
return errors.New(Fmt("Commit failed for application: %v", err))
return fmt.Errorf("Commit failed for application: %v", err)
}
fail.Fail() // XXX
// save the state
s.Save()
return nil
}
@ -268,9 +268,25 @@ func (s *State) CommitStateUpdateMempool(proxyAppConn proxy.AppConnConsensus, bl
return nil
}
// Apply and commit a block, but without all the state validation.
func (s *State) indexTxs(abciResponses *ABCIResponses) {
// save the tx results using the TxIndexer
// NOTE: these may be overwriting, but the values should be the same.
batch := txindex.NewBatch(len(abciResponses.DeliverTx))
for i, d := range abciResponses.DeliverTx {
tx := abciResponses.txs[i]
batch.Add(types.TxResult{
Height: uint64(abciResponses.Height),
Index: uint32(i),
Tx: tx,
Result: *d,
})
}
s.TxIndexer.AddBatch(batch)
}
// Exec and commit a block on the proxyApp without validating or mutating the state
// Returns the application root hash (result of abci.Commit)
func ApplyBlock(appConnConsensus proxy.AppConnConsensus, block *types.Block) ([]byte, error) {
func ExecCommitBlock(appConnConsensus proxy.AppConnConsensus, block *types.Block) ([]byte, error) {
var eventCache types.Fireable // nil
_, err := execBlockOnProxyApp(eventCache, appConnConsensus, block)
if err != nil {


state/execution_test.go (+90, -0)

@ -0,0 +1,90 @@
package state
import (
"testing"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"github.com/tendermint/abci/example/dummy"
crypto "github.com/tendermint/go-crypto"
dbm "github.com/tendermint/go-db"
cfg "github.com/tendermint/tendermint/config/tendermint_test"
"github.com/tendermint/tendermint/mempool"
"github.com/tendermint/tendermint/proxy"
"github.com/tendermint/tendermint/state/txindex"
"github.com/tendermint/tendermint/types"
)
var (
privKey = crypto.GenPrivKeyEd25519FromSecret([]byte("execution_test"))
chainID = "execution_chain"
testPartSize = 65536
nTxsPerBlock = 10
)
func TestApplyBlock(t *testing.T) {
cc := proxy.NewLocalClientCreator(dummy.NewDummyApplication())
config := cfg.ResetConfig("execution_test_")
proxyApp := proxy.NewAppConns(config, cc, nil)
_, err := proxyApp.Start()
require.Nil(t, err)
defer proxyApp.Stop()
mempool := mempool.NewMempool(config, proxyApp.Mempool())
state := state()
indexer := &dummyIndexer{0}
state.TxIndexer = indexer
// make block
block := makeBlock(1, state)
err = state.ApplyBlock(nil, proxyApp.Consensus(), block, block.MakePartSet(testPartSize).Header(), mempool)
require.Nil(t, err)
assert.Equal(t, nTxsPerBlock, indexer.Indexed) // test indexing works
// TODO check state and mempool
}
//----------------------------------------------------------------------------
// make some bogus txs
func makeTxs(blockNum int) (txs []types.Tx) {
for i := 0; i < nTxsPerBlock; i++ {
txs = append(txs, types.Tx([]byte{byte(blockNum), byte(i)}))
}
return txs
}
func state() *State {
return MakeGenesisState(dbm.NewMemDB(), &types.GenesisDoc{
ChainID: chainID,
Validators: []types.GenesisValidator{
types.GenesisValidator{privKey.PubKey(), 10000, "test"},
},
AppHash: nil,
})
}
func makeBlock(num int, state *State) *types.Block {
prevHash := state.LastBlockID.Hash
prevParts := types.PartSetHeader{}
valHash := state.Validators.Hash()
prevBlockID := types.BlockID{prevHash, prevParts}
block, _ := types.MakeBlock(num, chainID, makeTxs(num), new(types.Commit),
prevBlockID, valHash, state.AppHash, testPartSize)
return block
}
// dummyIndexer increments the counter every time we index a transaction.
type dummyIndexer struct {
Indexed int
}
func (indexer *dummyIndexer) Get(hash []byte) (*types.TxResult, error) {
return nil, nil
}
func (indexer *dummyIndexer) AddBatch(batch *txindex.Batch) error {
indexer.Indexed += batch.Size()
return nil
}

+ 92
- 5
state/state.go View File

@ -6,15 +6,19 @@ import (
"sync"
"time"
abci "github.com/tendermint/abci/types"
. "github.com/tendermint/go-common"
cfg "github.com/tendermint/go-config"
dbm "github.com/tendermint/go-db"
"github.com/tendermint/go-wire"
"github.com/tendermint/tendermint/state/txindex"
"github.com/tendermint/tendermint/state/txindex/null"
"github.com/tendermint/tendermint/types"
)
var (
stateKey = []byte("stateKey")
stateKey = []byte("stateKey")
abciResponsesKey = []byte("abciResponsesKey")
)
//-----------------------------------------------------------------------------
@ -29,7 +33,7 @@ type State struct {
GenesisDoc *types.GenesisDoc
ChainID string
// updated at end of ExecBlock
// updated at end of SetBlockAndValidators
LastBlockHeight int // Genesis state has this set to 0. So, Block(H=0) does not exist.
LastBlockID types.BlockID
LastBlockTime time.Time
@ -38,6 +42,12 @@ type State struct {
// AppHash is updated after Commit
AppHash []byte
TxIndexer txindex.TxIndexer `json:"-"` // Transaction indexer.
// Intermediate results from processing
// Persisted separately from the state
abciResponses *ABCIResponses
}
func LoadState(db dbm.DB) *State {
@ -45,7 +55,7 @@ func LoadState(db dbm.DB) *State {
}
func loadState(db dbm.DB, key []byte) *State {
s := &State{db: db}
s := &State{db: db, TxIndexer: &null.TxIndex{}}
buf := db.Get(key)
if len(buf) == 0 {
return nil
@ -54,7 +64,7 @@ func loadState(db dbm.DB, key []byte) *State {
wire.ReadBinaryPtr(&s, r, 0, n, err)
if *err != nil {
// DATA HAS BEEN CORRUPTED OR THE SPEC HAS CHANGED
Exit(Fmt("Data has been corrupted or its spec has changed: %v\n", *err))
Exit(Fmt("LoadState: Data has been corrupted or its spec has changed: %v\n", *err))
}
// TODO: ensure that buf is completely read.
}
@ -72,6 +82,7 @@ func (s *State) Copy() *State {
Validators: s.Validators.Copy(),
LastValidators: s.LastValidators.Copy(),
AppHash: s.AppHash,
TxIndexer: s.TxIndexer, // pointer here, not value
}
}
@ -81,6 +92,29 @@ func (s *State) Save() {
s.db.SetSync(stateKey, s.Bytes())
}
// SaveABCIResponses persists the ABCIResponses to disk so they can be recovered
// if we crash after app.Commit and before s.Save()
func (s *State) SaveABCIResponses(abciResponses *ABCIResponses) {
// save the ABCI responses to the db
s.db.SetSync(abciResponsesKey, abciResponses.Bytes())
}
func (s *State) LoadABCIResponses() *ABCIResponses {
abciResponses := new(ABCIResponses)
buf := s.db.Get(abciResponsesKey)
if len(buf) != 0 {
r, n, err := bytes.NewReader(buf), new(int), new(error)
wire.ReadBinaryPtr(abciResponses, r, 0, n, err)
if *err != nil {
// DATA HAS BEEN CORRUPTED OR THE SPEC HAS CHANGED
Exit(Fmt("LoadABCIResponses: Data has been corrupted or its spec has changed: %v\n", *err))
}
// TODO: ensure that buf is completely read.
}
return abciResponses
}
func (s *State) Equals(s2 *State) bool {
return bytes.Equal(s.Bytes(), s2.Bytes())
}
@ -96,7 +130,22 @@ func (s *State) Bytes() []byte {
// Mutate state variables to match block and validators
// after running EndBlock
func (s *State) SetBlockAndValidators(header *types.Header, blockPartsHeader types.PartSetHeader, prevValSet, nextValSet *types.ValidatorSet) {
func (s *State) SetBlockAndValidators(header *types.Header, blockPartsHeader types.PartSetHeader, abciResponses *ABCIResponses) {
// copy the valset so we can apply changes from EndBlock
// and update s.LastValidators and s.Validators
prevValSet := s.Validators.Copy()
nextValSet := prevValSet.Copy()
// update the validator set with the latest abciResponses
err := updateValidators(nextValSet, abciResponses.EndBlock.Diffs)
if err != nil {
log.Warn("Error changing validator set", "error", err)
// TODO: err or carry on?
}
// Update validator accums and set state variables
nextValSet.IncrementAccum(1)
s.setBlockAndValidators(header.Height,
types.BlockID{header.Hash(), blockPartsHeader}, header.Time,
prevValSet, nextValSet)
@ -125,12 +174,46 @@ func GetState(config cfg.Config, stateDB dbm.DB) *State {
state = MakeGenesisStateFromFile(stateDB, config.GetString("genesis_file"))
state.Save()
}
return state
}
//--------------------------------------------------
// ABCIResponses holds intermediate state during block processing
type ABCIResponses struct {
Height int
DeliverTx []*abci.ResponseDeliverTx
EndBlock abci.ResponseEndBlock
txs types.Txs // reference for indexing results by hash
}
func NewABCIResponses(block *types.Block) *ABCIResponses {
return &ABCIResponses{
Height: block.Height,
DeliverTx: make([]*abci.ResponseDeliverTx, block.NumTxs),
txs: block.Data.Txs,
}
}
// Bytes serializes the ABCIResponses
func (a *ABCIResponses) Bytes() []byte {
buf, n, err := new(bytes.Buffer), new(int), new(error)
wire.WriteBinary(*a, buf, n, err)
if *err != nil {
PanicCrisis(*err)
}
return buf.Bytes()
}
//-----------------------------------------------------------------------------
// Genesis
// MakeGenesisStateFromFile reads and unmarshals state from the given file.
//
// Used during replay and in tests.
func MakeGenesisStateFromFile(db dbm.DB, genDocFile string) *State {
genDocJSON, err := ioutil.ReadFile(genDocFile)
if err != nil {
@ -143,6 +226,9 @@ func MakeGenesisStateFromFile(db dbm.DB, genDocFile string) *State {
return MakeGenesisState(db, genDoc)
}
// MakeGenesisState creates state from types.GenesisDoc.
//
// Used in tests.
func MakeGenesisState(db dbm.DB, genDoc *types.GenesisDoc) *State {
if len(genDoc.Validators) == 0 {
Exit(Fmt("The genesis file has no validators"))
@ -176,5 +262,6 @@ func MakeGenesisState(db dbm.DB, genDoc *types.GenesisDoc) *State {
Validators: types.NewValidatorSet(validators),
LastValidators: types.NewValidatorSet(nil),
AppHash: genDoc.AppHash,
TxIndexer: &null.TxIndex{}, // we do not need indexer during replay and in tests
}
}

+ 31
- 0
state/state_test.go View File

@ -1,8 +1,12 @@
package state
import (
"fmt"
"testing"
"github.com/stretchr/testify/assert"
abci "github.com/tendermint/abci/types"
"github.com/tendermint/go-crypto"
dbm "github.com/tendermint/go-db"
"github.com/tendermint/tendermint/config/tendermint_test"
)
@ -40,3 +44,30 @@ func TestStateSaveLoad(t *testing.T) {
t.Fatal("expected state and its copy to be identical. got %v\n expected %v\n", loadedState, state)
}
}
func TestABCIResponsesSaveLoad(t *testing.T) {
assert := assert.New(t)
config := tendermint_test.ResetConfig("state_")
stateDB := dbm.NewDB("state", config.GetString("db_backend"), config.GetString("db_dir"))
state := GetState(config, stateDB)
state.LastBlockHeight += 1
// build mock responses
block := makeBlock(2, state)
abciResponses := NewABCIResponses(block)
abciResponses.DeliverTx[0] = &abci.ResponseDeliverTx{Data: []byte("foo")}
abciResponses.DeliverTx[1] = &abci.ResponseDeliverTx{Data: []byte("bar"), Log: "ok"}
abciResponses.EndBlock = abci.ResponseEndBlock{Diffs: []*abci.Validator{
{
PubKey: crypto.GenPrivKeyEd25519().PubKey().Bytes(),
Power: 10,
},
}}
abciResponses.txs = nil
state.SaveABCIResponses(abciResponses)
abciResponses2 := state.LoadABCIResponses()
assert.Equal(abciResponses, abciResponses2, fmt.Sprintf("ABCIResponses don't match: Got %v, Expected %v", abciResponses2, abciResponses))
}

+ 57
- 0
state/txindex/indexer.go View File

@ -0,0 +1,57 @@
package txindex
import (
"errors"
"github.com/tendermint/tendermint/types"
)
// TxIndexer interface defines methods to index and search transactions.
type TxIndexer interface {
// AddBatch analyzes, indexes, or stores a batch of transactions.
//
// NOTE: We do not specify an Index method for analyzing a single transaction
// here because it would incur heavy performance losses. Almost all advanced
// indexers support batching.
AddBatch(b *Batch) error
// Get returns the specified transaction, or nil if the transaction is not
// indexed or stored.
Get(hash []byte) (*types.TxResult, error)
}
//----------------------------------------------------
// Txs are written as a batch
// A Batch groups together multiple Index operations you would like performed
// at the same time. The Batch structure is NOT thread-safe. You should only
// perform operations on a batch from a single thread at a time. Once batch
// execution has started, you may not modify it.
type Batch struct {
Ops []types.TxResult
}
// NewBatch creates a new Batch.
func NewBatch(n int) *Batch {
return &Batch{
Ops: make([]types.TxResult, n),
}
}
// Add adds or updates the entry for the given result.Index.
func (b *Batch) Add(result types.TxResult) error {
b.Ops[result.Index] = result
return nil
}
// Size returns the total number of operations inside the batch.
func (b *Batch) Size() int {
return len(b.Ops)
}
//----------------------------------------------------
// Errors
// ErrorEmptyHash indicates empty hash
var ErrorEmptyHash = errors.New("Transaction hash cannot be empty")
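A minimal usage sketch of the TxIndexer/Batch API above, using the kv implementation added below and an in-memory go-db store; the wiring here is illustrative only and is not part of the diff:

package main

import (
	"fmt"

	abci "github.com/tendermint/abci/types"
	db "github.com/tendermint/go-db"

	"github.com/tendermint/tendermint/state/txindex"
	"github.com/tendermint/tendermint/state/txindex/kv"
	"github.com/tendermint/tendermint/types"
)

func main() {
	// back the indexer with an in-memory store (a node would use levelDB)
	indexer := kv.NewTxIndex(db.NewMemDB())

	// collect per-tx results into a batch, keyed by their index in the block
	tx := types.Tx("HELLO WORLD")
	batch := txindex.NewBatch(1)
	batch.Add(types.TxResult{
		Height: 1,
		Index:  0,
		Tx:     tx,
		Result: abci.ResponseDeliverTx{Code: abci.CodeType_OK},
	})

	// write the whole batch at once, then look the result up by tx hash
	if err := indexer.AddBatch(batch); err != nil {
		panic(err)
	}
	res, err := indexer.Get(tx.Hash())
	if err != nil {
		panic(err)
	}
	fmt.Printf("indexed tx at height %d, index %d\n", res.Height, res.Index)
}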

+ 56
- 0
state/txindex/kv/kv.go View File

@ -0,0 +1,56 @@
package kv
import (
"bytes"
"fmt"
db "github.com/tendermint/go-db"
"github.com/tendermint/go-wire"
"github.com/tendermint/tendermint/state/txindex"
"github.com/tendermint/tendermint/types"
)
// TxIndex is the simplest possible indexer, backed by key-value storage (levelDB).
// It can only index a transaction by its hash.
type TxIndex struct {
store db.DB
}
// NewTxIndex returns a new instance of TxIndex.
func NewTxIndex(store db.DB) *TxIndex {
return &TxIndex{store: store}
}
// Get gets a transaction from the TxIndex storage and returns it, or nil if the
// transaction is not found.
func (txi *TxIndex) Get(hash []byte) (*types.TxResult, error) {
if len(hash) == 0 {
return nil, txindex.ErrorEmptyHash
}
rawBytes := txi.store.Get(hash)
if rawBytes == nil {
return nil, nil
}
r := bytes.NewReader(rawBytes)
var n int
var err error
txResult := wire.ReadBinary(&types.TxResult{}, r, 0, &n, &err).(*types.TxResult)
if err != nil {
return nil, fmt.Errorf("Error reading TxResult: %v", err)
}
return txResult, nil
}
// AddBatch writes a batch of transaction results into the TxIndex storage.
func (txi *TxIndex) AddBatch(b *txindex.Batch) error {
storeBatch := txi.store.NewBatch()
for _, result := range b.Ops {
rawBytes := wire.BinaryBytes(&result)
storeBatch.Set(result.Tx.Hash(), rawBytes)
}
storeBatch.Write()
return nil
}

+ 63
- 0
state/txindex/kv/kv_test.go View File

@ -0,0 +1,63 @@
package kv
import (
"io/ioutil"
"os"
"testing"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
abci "github.com/tendermint/abci/types"
db "github.com/tendermint/go-db"
"github.com/tendermint/tendermint/state/txindex"
"github.com/tendermint/tendermint/types"
)
func TestTxIndex(t *testing.T) {
indexer := &TxIndex{store: db.NewMemDB()}
tx := types.Tx("HELLO WORLD")
txResult := &types.TxResult{1, 0, tx, abci.ResponseDeliverTx{Data: []byte{0}, Code: abci.CodeType_OK, Log: ""}}
hash := tx.Hash()
batch := txindex.NewBatch(1)
batch.Add(*txResult)
err := indexer.AddBatch(batch)
require.Nil(t, err)
loadedTxResult, err := indexer.Get(hash)
require.Nil(t, err)
assert.Equal(t, txResult, loadedTxResult)
}
func benchmarkTxIndex(txsCount int, b *testing.B) {
tx := types.Tx("HELLO WORLD")
txResult := &types.TxResult{1, 0, tx, abci.ResponseDeliverTx{Data: []byte{0}, Code: abci.CodeType_OK, Log: ""}}
dir, err := ioutil.TempDir("", "tx_index_db")
if err != nil {
b.Fatal(err)
}
defer os.RemoveAll(dir)
store := db.NewDB("tx_index", "leveldb", dir)
indexer := &TxIndex{store: store}
batch := txindex.NewBatch(txsCount)
for i := 0; i < txsCount; i++ {
txResult.Index += 1
batch.Add(*txResult)
}
b.ResetTimer()
for n := 0; n < b.N; n++ {
err = indexer.AddBatch(batch)
}
}
func BenchmarkTxIndex1(b *testing.B) { benchmarkTxIndex(1, b) }
func BenchmarkTxIndex500(b *testing.B) { benchmarkTxIndex(500, b) }
func BenchmarkTxIndex1000(b *testing.B) { benchmarkTxIndex(1000, b) }
func BenchmarkTxIndex2000(b *testing.B) { benchmarkTxIndex(2000, b) }
func BenchmarkTxIndex10000(b *testing.B) { benchmarkTxIndex(10000, b) }

+ 21
- 0
state/txindex/null/null.go View File

@ -0,0 +1,21 @@
package null
import (
"errors"
"github.com/tendermint/tendermint/state/txindex"
"github.com/tendermint/tendermint/types"
)
// TxIndex acts as a /dev/null.
type TxIndex struct{}
// Get returns an error because indexing is disabled.
func (txi *TxIndex) Get(hash []byte) (*types.TxResult, error) {
return nil, errors.New(`Indexing is disabled (set 'tx_index = "kv"' in config)`)
}
// AddBatch is a no-op and returns nil.
func (txi *TxIndex) AddBatch(batch *txindex.Batch) error {
return nil
}
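For context, a hedged sketch of how a node might pick between the kv and null indexers from the new "tx_index" config value; the actual wiring lives in node.go (not shown in this section), so the helper name below is hypothetical:

package node

import (
	cfg "github.com/tendermint/go-config"
	db "github.com/tendermint/go-db"

	"github.com/tendermint/tendermint/state/txindex"
	"github.com/tendermint/tendermint/state/txindex/kv"
	"github.com/tendermint/tendermint/state/txindex/null"
)

// newTxIndexer is a hypothetical helper: "kv" selects the key-value backed
// index, anything else disables indexing via the null indexer.
func newTxIndexer(config cfg.Config, store db.DB) txindex.TxIndexer {
	if config.GetString("tx_index") == "kv" {
		return kv.NewTxIndex(store)
	}
	return &null.TxIndex{}
}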

+ 7
- 6
types/events.go View File

@ -2,10 +2,10 @@ package types
import (
// for registering TMEventData as events.EventData
abci "github.com/tendermint/abci/types"
. "github.com/tendermint/go-common"
"github.com/tendermint/go-events"
"github.com/tendermint/go-wire"
abci "github.com/tendermint/abci/types"
)
// Functions to generate eventId strings
@ -73,11 +73,12 @@ type EventDataNewBlockHeader struct {
// All txs fire EventDataTx
type EventDataTx struct {
Tx Tx `json:"tx"`
Data []byte `json:"data"`
Log string `json:"log"`
Code abci.CodeType `json:"code"`
Error string `json:"error"` // this is redundant information for now
Height int `json:"height"`
Tx Tx `json:"tx"`
Data []byte `json:"data"`
Log string `json:"log"`
Code abci.CodeType `json:"code"`
Error string `json:"error"` // this is redundant information for now
}
// NOTE: This goes into the replay WAL


+ 81
- 0
types/tx.go View File

@ -1,6 +1,10 @@
package types
import (
"bytes"
"errors"
abci "github.com/tendermint/abci/types"
"github.com/tendermint/go-merkle"
)
@ -30,3 +34,80 @@ func (txs Txs) Hash() []byte {
return merkle.SimpleHashFromTwoHashes(left, right)
}
}
// Index returns the index of this transaction in the list, or -1 if not found
func (txs Txs) Index(tx Tx) int {
for i := range txs {
if bytes.Equal(txs[i], tx) {
return i
}
}
return -1
}
// IndexByHash returns the index of this transaction hash in the list, or -1 if not found
func (txs Txs) IndexByHash(hash []byte) int {
for i := range txs {
if bytes.Equal(txs[i].Hash(), hash) {
return i
}
}
return -1
}
// Proof returns a simple merkle proof for the transaction at index i.
//
// Panics if i < 0 or i >= len(txs)
//
// TODO: optimize this!
func (txs Txs) Proof(i int) TxProof {
l := len(txs)
hashables := make([]merkle.Hashable, l)
for i := 0; i < l; i++ {
hashables[i] = txs[i]
}
root, proofs := merkle.SimpleProofsFromHashables(hashables)
return TxProof{
Index: i,
Total: l,
RootHash: root,
Data: txs[i],
Proof: *proofs[i],
}
}
type TxProof struct {
Index, Total int
RootHash []byte
Data Tx
Proof merkle.SimpleProof
}
func (tp TxProof) LeafHash() []byte {
return tp.Data.Hash()
}
// Validate returns nil if the proof matches the given dataHash and is internally
// consistent; otherwise it returns a sensible error
func (tp TxProof) Validate(dataHash []byte) error {
if !bytes.Equal(dataHash, tp.RootHash) {
return errors.New("Proof matches different data hash")
}
valid := tp.Proof.Verify(tp.Index, tp.Total, tp.LeafHash(), tp.RootHash)
if !valid {
return errors.New("Proof is not internally consistent")
}
return nil
}
// TxResult contains results of executing the transaction.
//
// One usage is indexing transaction results.
type TxResult struct {
Height uint64 `json:"height"`
Index uint32 `json:"index"`
Tx Tx `json:"tx"`
Result abci.ResponseDeliverTx `json:"result"`
}
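A short sketch exercising the new Txs.Proof / TxProof.Validate pair above; the three-tx slice is arbitrary, and root is the merkle root the proofs are checked against (the same value the block header's DataHash carries):

package main

import (
	"fmt"

	"github.com/tendermint/tendermint/types"
)

func main() {
	txs := types.Txs{types.Tx("foo"), types.Tx("bar"), types.Tx("baz")}
	root := txs.Hash() // merkle root over the txs

	// prove that "bar" is the tx at index 1 under root, then verify the proof
	proof := txs.Proof(1)
	if err := proof.Validate(root); err != nil {
		panic(err)
	}
	fmt.Printf("tx %X proven at index %d of %d under root %X\n",
		proof.LeafHash(), proof.Index, proof.Total, root)
}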

+ 122
- 0
types/tx_test.go View File

@ -0,0 +1,122 @@
package types
import (
"bytes"
"testing"
"github.com/stretchr/testify/assert"
cmn "github.com/tendermint/go-common"
ctest "github.com/tendermint/go-common/test"
wire "github.com/tendermint/go-wire"
)
func makeTxs(cnt, size int) Txs {
txs := make(Txs, cnt)
for i := 0; i < cnt; i++ {
txs[i] = cmn.RandBytes(size)
}
return txs
}
func randInt(low, high int) int {
off := cmn.RandInt() % (high - low)
return low + off
}
func TestTxIndex(t *testing.T) {
assert := assert.New(t)
for i := 0; i < 20; i++ {
txs := makeTxs(15, 60)
for j := 0; j < len(txs); j++ {
tx := txs[j]
idx := txs.Index(tx)
assert.Equal(j, idx)
}
assert.Equal(-1, txs.Index(nil))
assert.Equal(-1, txs.Index(Tx("foodnwkf")))
}
}
func TestValidTxProof(t *testing.T) {
assert := assert.New(t)
cases := []struct {
txs Txs
}{
{Txs{{1, 4, 34, 87, 163, 1}}},
{Txs{{5, 56, 165, 2}, {4, 77}}},
{Txs{Tx("foo"), Tx("bar"), Tx("baz")}},
{makeTxs(20, 5)},
{makeTxs(7, 81)},
{makeTxs(61, 15)},
}
for h, tc := range cases {
txs := tc.txs
root := txs.Hash()
// make sure valid proof for every tx
for i := range txs {
leaf := txs[i]
leafHash := leaf.Hash()
proof := txs.Proof(i)
assert.Equal(i, proof.Index, "%d: %d", h, i)
assert.Equal(len(txs), proof.Total, "%d: %d", h, i)
assert.Equal(root, proof.RootHash, "%d: %d", h, i)
assert.Equal(leaf, proof.Data, "%d: %d", h, i)
assert.Equal(leafHash, proof.LeafHash(), "%d: %d", h, i)
assert.Nil(proof.Validate(root), "%d: %d", h, i)
assert.NotNil(proof.Validate([]byte("foobar")), "%d: %d", h, i)
// read-write must also work
var p2 TxProof
bin := wire.BinaryBytes(proof)
err := wire.ReadBinaryBytes(bin, &p2)
if assert.Nil(err, "%d: %d: %+v", h, i, err) {
assert.Nil(p2.Validate(root), "%d: %d", h, i)
}
}
}
}
func TestTxProofUnchangable(t *testing.T) {
// run the other test a bunch...
for i := 0; i < 40; i++ {
testTxProofUnchangable(t)
}
}
func testTxProofUnchangable(t *testing.T) {
assert := assert.New(t)
// make some proof
txs := makeTxs(randInt(2, 100), randInt(16, 128))
root := txs.Hash()
i := randInt(0, len(txs)-1)
proof := txs.Proof(i)
// make sure it is valid to start with
assert.Nil(proof.Validate(root))
bin := wire.BinaryBytes(proof)
// try mutating the data and make sure nothing breaks
for j := 0; j < 500; j++ {
bad := ctest.MutateByteSlice(bin)
if !bytes.Equal(bad, bin) {
assertBadProof(t, root, bad, proof)
}
}
}
// this makes sure the proof doesn't deserialize into something valid
func assertBadProof(t *testing.T, root []byte, bad []byte, good TxProof) {
var proof TxProof
err := wire.ReadBinaryBytes(bad, &proof)
if err == nil {
err = proof.Validate(root)
if err == nil {
// okay, this can happen if we have a slightly different total
// (where the path ends up the same), if it is something else, we have
// a real problem
assert.NotEqual(t, proof.Total, good.Total, "bad: %#v\ngood: %#v", proof, good)
}
}
}
