diff --git a/log.go b/log.go index 0ab002446..9f04a5083 100644 --- a/log.go +++ b/log.go @@ -6,6 +6,7 @@ import ( "github.com/op/go-logging" "github.com/tendermint/tendermint/blocks" "github.com/tendermint/tendermint/consensus" + "github.com/tendermint/tendermint/mempool" "github.com/tendermint/tendermint/p2p" ) @@ -28,7 +29,8 @@ func init() { Log.Error("error") */ - p2p.SetP2PLogger(log) blocks.SetBlocksLogger(log) consensus.SetConsensusLogger(log) + p2p.SetP2PLogger(log) + mempool.SetMempoolLogger(log) } diff --git a/mempool/agent.go b/mempool/agent.go index 4bbce327f..36733869e 100644 --- a/mempool/agent.go +++ b/mempool/agent.go @@ -1,9 +1,21 @@ -package mempol +package mempool import ( + "bytes" + "fmt" + "io" + "sync/atomic" + + . "github.com/tendermint/tendermint/binary" + . "github.com/tendermint/tendermint/blocks" "github.com/tendermint/tendermint/p2p" ) +var ( + MempoolCh = byte(0x30) +) + +// MempoolAgent handles mempool tx broadcasting amongst peers. type MempoolAgent struct { sw *p2p.Switch swEvents chan interface{} @@ -60,13 +72,13 @@ func (memA *MempoolAgent) switchEventsRoutine() { func (memA *MempoolAgent) gossipTxRoutine() { OUTER_LOOP: for { - // Receive incoming message on ProposalCh - inMsg, ok := memA.sw.Receive(ProposalCh) + // Receive incoming message on MempoolCh + inMsg, ok := memA.sw.Receive(MempoolCh) if !ok { break OUTER_LOOP // Client has stopped } _, msg_ := decodeMessage(inMsg.Bytes) - log.Info("gossipProposalRoutine received %v", msg_) + log.Info("gossipMempoolRoutine received %v", msg_) switch msg_.(type) { case *TxMessage: @@ -98,6 +110,7 @@ func decodeMessage(bz []byte) (msgType byte, msg interface{}) { switch msgType { case msgTypeTx: msg = readTxMessage(bytes.NewReader(bz[1:]), n, err) + // case ...: default: msg = nil } diff --git a/mempool/log.go b/mempool/log.go new file mode 100644 index 000000000..8e2775f95 --- /dev/null +++ b/mempool/log.go @@ -0,0 +1,15 @@ +package mempool + +import ( + "github.com/op/go-logging" +) + +var log = logging.MustGetLogger("mempool") + +func init() { + logging.SetFormatter(logging.MustStringFormatter("[%{level:.1s}] %{message}")) +} + +func SetMempoolLogger(l *logging.Logger) { + log = l +} diff --git a/mempool/mempool.go b/mempool/mempool.go index 209fcd2de..5bbe9e196 100644 --- a/mempool/mempool.go +++ b/mempool/mempool.go @@ -16,6 +16,7 @@ package mempool import ( "sync" + . "github.com/tendermint/tendermint/binary" . "github.com/tendermint/tendermint/blocks" . "github.com/tendermint/tendermint/state" ) diff --git a/p2p/README.md b/p2p/README.md index 0fda9fdb8..411fbb9ad 100644 --- a/p2p/README.md +++ b/p2p/README.md @@ -1,3 +1,118 @@ +# P2P Module + +P2P provides an abstraction around peer-to-peer communication.
+Communication happens via Agents that react to messages from peers.
+Each Agent has one or more Channels of communication for each Peer.
+Channels are multiplexed automatically and can be configured.
+A Switch is started upon app start, and handles Peer management.
+A PEXAgent implementation is provided to automate peer discovery.
+ +## Usage + +MempoolAgent started from the following template code.
+Modify the snippet below according to your needs.
+Check out the ConsensusAgent for an example of tracking peer state.
+ +```golang +package mempool + +import ( + "bytes" + "fmt" + "io" + "sync/atomic" + + . "github.com/tendermint/tendermint/binary" + . "github.com/tendermint/tendermint/blocks" + "github.com/tendermint/tendermint/p2p" +) + +var ( + MempoolCh = byte(0x30) +) + +// MempoolAgent handles mempool tx broadcasting amongst peers. +type MempoolAgent struct { + sw *p2p.Switch + swEvents chan interface{} + quit chan struct{} + started uint32 + stopped uint32 +} + +func NewMempoolAgent(sw *p2p.Switch) *MempoolAgent { + swEvents := make(chan interface{}) + sw.AddEventListener("MempoolAgent.swEvents", swEvents) + memA := &MempoolAgent{ + sw: sw, + swEvents: swEvents, + quit: make(chan struct{}), + } + return memA +} + +func (memA *MempoolAgent) Start() { + if atomic.CompareAndSwapUint32(&memA.started, 0, 1) { + log.Info("Starting MempoolAgent") + go memA.switchEventsRoutine() + go memA.gossipTxRoutine() + } +} + +func (memA *MempoolAgent) Stop() { + if atomic.CompareAndSwapUint32(&memA.stopped, 0, 1) { + log.Info("Stopping MempoolAgent") + close(memA.quit) + close(memA.swEvents) + } +} + +// Handle peer new/done events +func (memA *MempoolAgent) switchEventsRoutine() { + for { + swEvent, ok := <-memA.swEvents + if !ok { + break + } + switch swEvent.(type) { + case p2p.SwitchEventNewPeer: + // event := swEvent.(p2p.SwitchEventNewPeer) + case p2p.SwitchEventDonePeer: + // event := swEvent.(p2p.SwitchEventDonePeer) + default: + log.Warning("Unhandled switch event type") + } + } +} + +func (memA *MempoolAgent) gossipTxRoutine() { +OUTER_LOOP: + for { + // Receive incoming message on MempoolCh + inMsg, ok := memA.sw.Receive(MempoolCh) + if !ok { + break OUTER_LOOP // Client has stopped + } + _, msg_ := decodeMessage(inMsg.Bytes) + log.Info("gossipMempoolRoutine received %v", msg_) + + switch msg_.(type) { + case *TxMessage: + // msg := msg_.(*TxMessage) + // XXX + + default: + // Ignore unknown message + // memA.sw.StopPeerForError(inMsg.MConn.Peer, errInvalidMessage) + } + } + + // Cleanup +} + +``` + + ## Channels Each peer connection is multiplexed into channels.