From a714d1208545cad6958b8bba29e32c6ae779c2a2 Mon Sep 17 00:00:00 2001 From: Jae Kwon Date: Mon, 30 Jun 2014 16:53:04 -0700 Subject: [PATCH] first client/server connection test passes. --- binary/byteslice.go | 23 +++++------ binary/int.go | 98 +++++++++++++++++++++++++++++++-------------- binary/string.go | 17 +++++--- peer/client.go | 29 +++++++------- peer/client_test.go | 6 +-- peer/connection.go | 70 +++++++++++++++++--------------- peer/msg.go | 36 ++++++++++++++--- peer/peer.go | 78 ++++++++++++++++++------------------ peer/set.go | 67 ------------------------------- 9 files changed, 216 insertions(+), 208 deletions(-) delete mode 100644 peer/set.go diff --git a/binary/byteslice.go b/binary/byteslice.go index bfe92bea2..238207f67 100644 --- a/binary/byteslice.go +++ b/binary/byteslice.go @@ -33,18 +33,17 @@ func (self ByteSlice) WriteTo(w io.Writer) (n int64, err error) { return int64(n_+4), err } -func ReadByteSlice(r io.Reader) ByteSlice { - length := int(ReadUInt32(r)) - bytes := make([]byte, length) - _, err := io.ReadFull(r, bytes) - if err != nil { panic(err) } - return ByteSlice(bytes) -} - func ReadByteSliceSafe(r io.Reader) (ByteSlice, error) { - length := int(ReadUInt32(r)) - bytes := make([]byte, length) - _, err := io.ReadFull(r, bytes) + length, err := ReadUInt32Safe(r) + if err != nil { return nil, err } + bytes := make([]byte, int(length)) + _, err = io.ReadFull(r, bytes) if err != nil { return nil, err } - return ByteSlice(bytes), nil + return bytes, nil +} + +func ReadByteSlice(r io.Reader) ByteSlice { + bytes, err := ReadByteSliceSafe(r) + if r != nil { panic(err) } + return bytes } diff --git a/binary/int.go b/binary/int.go index 351b2a23b..7d381014a 100644 --- a/binary/int.go +++ b/binary/int.go @@ -41,20 +41,19 @@ func (self Byte) WriteTo(w io.Writer) (int64, error) { return int64(n), err } -func ReadByte(r io.Reader) Byte { - buf := [1]byte{0} - _, err := io.ReadFull(r, buf[:]) - if err != nil { panic(err) } - return Byte(buf[0]) -} - func ReadByteSafe(r io.Reader) (Byte, error) { buf := [1]byte{0} _, err := io.ReadFull(r, buf[:]) - if err != nil { return Byte(0), err } + if err != nil { return 0, err } return Byte(buf[0]), nil } +func ReadByte(r io.Reader) (Byte) { + b, err := ReadByteSafe(r) + if err != nil { panic(err) } + return b +} + // Int8 @@ -79,11 +78,17 @@ func (self Int8) WriteTo(w io.Writer) (int64, error) { return int64(n), err } -func ReadInt8(r io.Reader) Int8 { +func ReadInt8Safe(r io.Reader) (Int8, error) { buf := [1]byte{0} _, err := io.ReadFull(r, buf[:]) + if err != nil { return Int8(0), err } + return Int8(buf[0]), nil +} + +func ReadInt8(r io.Reader) (Int8) { + b, err := ReadInt8Safe(r) if err != nil { panic(err) } - return Int8(buf[0]) + return b } @@ -110,13 +115,6 @@ func (self UInt8) WriteTo(w io.Writer) (int64, error) { return int64(n), err } -func ReadUInt8(r io.Reader) UInt8 { - buf := [1]byte{0} - _, err := io.ReadFull(r, buf[:]) - if err != nil { panic(err) } - return UInt8(buf[0]) -} - func ReadUInt8Safe(r io.Reader) (UInt8, error) { buf := [1]byte{0} _, err := io.ReadFull(r, buf[:]) @@ -124,6 +122,12 @@ func ReadUInt8Safe(r io.Reader) (UInt8, error) { return UInt8(buf[0]), nil } +func ReadUInt8(r io.Reader) (UInt8) { + b, err := ReadUInt8Safe(r) + if err != nil { panic(err) } + return b +} + // Int16 @@ -148,11 +152,17 @@ func (self Int16) WriteTo(w io.Writer) (int64, error) { return 2, err } -func ReadInt16(r io.Reader) Int16 { +func ReadInt16Safe(r io.Reader) (Int16, error) { buf := [2]byte{0} _, err := io.ReadFull(r, buf[:]) + if err != nil { return Int16(0), err } + return Int16(binary.LittleEndian.Uint16(buf[:])), nil +} + +func ReadInt16(r io.Reader) (Int16) { + b, err := ReadInt16Safe(r) if err != nil { panic(err) } - return Int16(binary.LittleEndian.Uint16(buf[:])) + return b } @@ -179,11 +189,17 @@ func (self UInt16) WriteTo(w io.Writer) (int64, error) { return 2, err } -func ReadUInt16(r io.Reader) UInt16 { +func ReadUInt16Safe(r io.Reader) (UInt16, error) { buf := [2]byte{0} _, err := io.ReadFull(r, buf[:]) + if err != nil { return UInt16(0), err } + return UInt16(binary.LittleEndian.Uint16(buf[:])), nil +} + +func ReadUInt16(r io.Reader) (UInt16) { + b, err := ReadUInt16Safe(r) if err != nil { panic(err) } - return UInt16(binary.LittleEndian.Uint16(buf[:])) + return b } @@ -210,11 +226,17 @@ func (self Int32) WriteTo(w io.Writer) (int64, error) { return 4, err } -func ReadInt32(r io.Reader) Int32 { +func ReadInt32Safe(r io.Reader) (Int32, error) { buf := [4]byte{0} _, err := io.ReadFull(r, buf[:]) + if err != nil { return Int32(0), err } + return Int32(binary.LittleEndian.Uint32(buf[:])), nil +} + +func ReadInt32(r io.Reader) (Int32) { + b, err := ReadInt32Safe(r) if err != nil { panic(err) } - return Int32(binary.LittleEndian.Uint32(buf[:])) + return b } @@ -241,11 +263,17 @@ func (self UInt32) WriteTo(w io.Writer) (int64, error) { return 4, err } -func ReadUInt32(r io.Reader) UInt32 { +func ReadUInt32Safe(r io.Reader) (UInt32, error) { buf := [4]byte{0} _, err := io.ReadFull(r, buf[:]) + if err != nil { return UInt32(0), err } + return UInt32(binary.LittleEndian.Uint32(buf[:])), nil +} + +func ReadUInt32(r io.Reader) (UInt32) { + b, err := ReadUInt32Safe(r) if err != nil { panic(err) } - return UInt32(binary.LittleEndian.Uint32(buf[:])) + return b } @@ -272,11 +300,17 @@ func (self Int64) WriteTo(w io.Writer) (int64, error) { return 8, err } -func ReadInt64(r io.Reader) Int64 { +func ReadInt64Safe(r io.Reader) (Int64, error) { buf := [8]byte{0} _, err := io.ReadFull(r, buf[:]) + if err != nil { return Int64(0), err } + return Int64(binary.LittleEndian.Uint64(buf[:])), nil +} + +func ReadInt64(r io.Reader) (Int64) { + b, err := ReadInt64Safe(r) if err != nil { panic(err) } - return Int64(binary.LittleEndian.Uint64(buf[:])) + return b } @@ -303,11 +337,17 @@ func (self UInt64) WriteTo(w io.Writer) (int64, error) { return 8, err } -func ReadUInt64(r io.Reader) UInt64 { +func ReadUInt64Safe(r io.Reader) (UInt64, error) { buf := [8]byte{0} _, err := io.ReadFull(r, buf[:]) + if err != nil { return UInt64(0), err } + return UInt64(binary.LittleEndian.Uint64(buf[:])), nil +} + +func ReadUInt64(r io.Reader) (UInt64) { + b, err := ReadUInt64Safe(r) if err != nil { panic(err) } - return UInt64(binary.LittleEndian.Uint64(buf[:])) + return b } diff --git a/binary/string.go b/binary/string.go index d84fc3253..063385d88 100644 --- a/binary/string.go +++ b/binary/string.go @@ -30,10 +30,17 @@ func (self String) WriteTo(w io.Writer) (n int64, err error) { return int64(n_+4), err } +func ReadStringSafe(r io.Reader) (String, error) { + length, err := ReadUInt32Safe(r) + if err != nil { return "", err } + bytes := make([]byte, int(length)) + _, err = io.ReadFull(r, bytes) + if err != nil { return "", err } + return String(bytes), nil +} + func ReadString(r io.Reader) String { - length := int(ReadUInt32(r)) - bytes := make([]byte, length) - _, err := io.ReadFull(r, bytes) - if err != nil { panic(err) } - return String(bytes) + str, err := ReadStringSafe(r) + if r != nil { panic(err) } + return str } diff --git a/peer/client.go b/peer/client.go index 4a8c88f94..94390456c 100644 --- a/peer/client.go +++ b/peer/client.go @@ -2,6 +2,7 @@ package peer import ( . "github.com/tendermint/tendermint/common" + . "github.com/tendermint/tendermint/binary" "github.com/tendermint/tendermint/merkle" "sync/atomic" "sync" @@ -24,7 +25,7 @@ type Client struct { targetNumPeers int makePeerFn func(*Connection) *Peer self *Peer - inQueues map[string]chan *InboundMsg + recvQueues map[String]chan *InboundPacket mtx sync.Mutex peers merkle.Tree // addr -> *Peer @@ -43,9 +44,9 @@ func NewClient(makePeerFn func(*Connection) *Peer) *Client { Panicf("makePeerFn(nil) must return a prototypical peer for self") } - inQueues := make(map[string]chan *InboundMsg) + recvQueues := make(map[String]chan *InboundPacket) for chName, _ := range self.channels { - inQueues[chName] = make(chan *InboundMsg) + recvQueues[chName] = make(chan *InboundPacket) } c := &Client{ @@ -53,7 +54,7 @@ func NewClient(makePeerFn func(*Connection) *Peer) *Client { targetNumPeers: 0, // TODO makePeerFn: makePeerFn, self: self, - inQueues: inQueues, + recvQueues: recvQueues, peers: merkle.NewIAVLTree(nil), quit: make(chan struct{}), @@ -89,18 +90,18 @@ func (c *Client) AddPeerWithConnection(conn *Connection, outgoing bool) (*Peer, err := c.addPeer(peer) if err != nil { return nil, err } - go peer.Start(c.inQueues) + go peer.Start(c.recvQueues) return peer, nil } -func (c *Client) Broadcast(chName string, msg Msg) { +func (c *Client) Broadcast(pkt Packet) { if atomic.LoadUint32(&c.stopped) == 1 { return } - log.Tracef("Broadcast on [%v] msg: %v", chName, msg) + log.Tracef("Broadcast on [%v] len: %v", pkt.Channel, len(pkt.Bytes)) for v := range c.Peers().Values() { peer := v.(*Peer) - success := peer.TryQueueOut(chName , msg) + success := peer.TrySend(pkt) log.Tracef("Broadcast for peer %v success: %v", peer, success) if !success { // TODO: notify the peer @@ -110,19 +111,19 @@ func (c *Client) Broadcast(chName string, msg Msg) { } // blocks until a message is popped. -func (c *Client) PopMessage(chName string) *InboundMsg { +func (c *Client) Receive(chName String) *InboundPacket { if atomic.LoadUint32(&c.stopped) == 1 { return nil } - log.Tracef("PopMessage on [%v]", chName) - q := c.inQueues[chName] - if q == nil { Panicf("Expected inQueues[%f], found none", chName) } + log.Tracef("Receive on [%v]", chName) + q := c.recvQueues[chName] + if q == nil { Panicf("Expected recvQueues[%f], found none", chName) } for { select { case <-c.quit: return nil - case inMsg := <-q: - return inMsg + case inPacket := <-q: + return inPacket } } } diff --git a/peer/client_test.go b/peer/client_test.go index 6c6a11a25..424268ee1 100644 --- a/peer/client_test.go +++ b/peer/client_test.go @@ -11,7 +11,7 @@ func TestConnection(t *testing.T) { peerMaker := func(conn *Connection) *Peer { bufferSize := 10 p := NewPeer(conn) - p.channels = map[string]*Channel{} + p.channels = map[String]*Channel{} p.channels["ch1"] = NewChannel("ch1", bufferSize) p.channels["ch2"] = NewChannel("ch2", bufferSize) return p @@ -44,9 +44,9 @@ func TestConnection(t *testing.T) { } // TODO: test the transmission of information on channels. - c1.Broadcast("ch1", Msg{Bytes:ByteSlice("test data")}) + c1.Broadcast(NewPacket("ch1", ByteSlice("test data"))) time.Sleep(100 * time.Millisecond) - inMsg := c2.PopMessage("ch1") + inMsg := c2.Receive("ch1") t.Logf("c2 popped message: %v", inMsg) diff --git a/peer/connection.go b/peer/connection.go index d04aba037..756e77ddb 100644 --- a/peer/connection.go +++ b/peer/connection.go @@ -19,7 +19,7 @@ const ( type Connection struct { ioStats IOStats - outQueue chan ByteSlice // never closes. + sendQueue chan Packet // never closes conn net.Conn quit chan struct{} stopped uint32 @@ -35,7 +35,7 @@ var ( func NewConnection(conn net.Conn) *Connection { return &Connection{ - outQueue: make(chan ByteSlice, OUT_QUEUE_SIZE), + sendQueue: make(chan Packet, OUT_QUEUE_SIZE), conn: conn, quit: make(chan struct{}), pingDebouncer: NewDebouncer(PING_TIMEOUT_MINUTES * time.Minute), @@ -46,19 +46,19 @@ func NewConnection(conn net.Conn) *Connection { // returns true if successfully queued, // returns false if connection was closed. // blocks. -func (c *Connection) QueueOut(msg ByteSlice) bool { +func (c *Connection) Send(pkt Packet) bool { select { - case c.outQueue <- msg: + case c.sendQueue <- pkt: return true case <-c.quit: return false } } -func (c *Connection) Start() { +func (c *Connection) Start(channels map[String]*Channel) { log.Debugf("Starting %v", c) - go c.outHandler() - go c.inHandler() + go c.sendHandler() + go c.recvHandler(channels) } func (c *Connection) Stop() { @@ -68,9 +68,9 @@ func (c *Connection) Stop() { c.conn.Close() c.pingDebouncer.Stop() // We can't close pong safely here because - // inHandler may write to it after we've stopped. + // recvHandler may write to it after we've stopped. // Though it doesn't need to get closed at all, - // we close it @ inHandler. + // we close it @ recvHandler. // close(c.pong) } } @@ -91,8 +91,10 @@ func (c *Connection) flush() { // TODO flush? (turn off nagel, turn back on, etc) } -func (c *Connection) outHandler() { - log.Tracef("Connection %v outHandler", c) +func (c *Connection) sendHandler() { + log.Tracef("Connection %v sendHandler", c) + + // TODO: catch panics & stop connection. FOR_LOOP: for { @@ -100,11 +102,11 @@ func (c *Connection) outHandler() { select { case <-c.pingDebouncer.Ch: _, err = PACKET_TYPE_PING.WriteTo(c.conn) - case outMsg := <-c.outQueue: - log.Tracef("Found msg from outQueue. Writing msg to underlying connection") + case sendPkt := <-c.sendQueue: + log.Tracef("Found pkt from sendQueue. Writing pkt to underlying connection") _, err = PACKET_TYPE_MSG.WriteTo(c.conn) if err != nil { break } - _, err = outMsg.WriteTo(c.conn) + _, err = sendPkt.WriteTo(c.conn) case <-c.pong: _, err = PACKET_TYPE_PONG.WriteTo(c.conn) case <-c.quit: @@ -112,7 +114,7 @@ func (c *Connection) outHandler() { } if err != nil { - log.Infof("Connection %v failed @ outHandler:\n%v", c, err) + log.Infof("Connection %v failed @ sendHandler:\n%v", c, err) c.Stop() break FOR_LOOP } @@ -120,53 +122,55 @@ func (c *Connection) outHandler() { c.flush() } - log.Tracef("Connection %v outHandler done", c) + log.Tracef("Connection %v sendHandler done", c) // cleanup } -func (c *Connection) inHandler() { - log.Tracef("Connection %v inHandler", c) +func (c *Connection) recvHandler(channels map[String]*Channel) { + log.Tracef("Connection %v recvHandler with %v channels", c, len(channels)) + + // TODO: catch panics & stop connection. FOR_LOOP: for { - msgType, err := ReadUInt8Safe(c.conn) + pktType, err := ReadUInt8Safe(c.conn) if err != nil { if atomic.LoadUint32(&c.stopped) != 1 { - log.Infof("Connection %v failed @ inHandler", c) + log.Infof("Connection %v failed @ recvHandler", c) c.Stop() } break FOR_LOOP } else { - log.Tracef("Found msgType %v", msgType) + log.Tracef("Found pktType %v", pktType) } - switch msgType { + switch pktType { case PACKET_TYPE_PING: c.pong <- struct{}{} case PACKET_TYPE_PONG: // do nothing case PACKET_TYPE_MSG: - msg, err := ReadByteSliceSafe(c.conn) + pkt, err := ReadPacketSafe(c.conn) if err != nil { if atomic.LoadUint32(&c.stopped) != 1 { - log.Infof("Connection %v failed @ inHandler", c) + log.Infof("Connection %v failed @ recvHandler", c) c.Stop() } break FOR_LOOP } - // What to do? - // XXX - XXX well, we need to push it into the channel or something. - or at least provide an inQueue. - log.Tracef("%v", msg) + channel := channels[pkt.Channel] + if channel == nil { + Panicf("Unknown channel %v", pkt.Channel) + } + channel.recvQueue <- pkt default: - Panicf("Unknown message type %v", msgType) + Panicf("Unknown message type %v", pktType) } c.pingDebouncer.Reset() } - log.Tracef("Connection %v inHandler done", c) + log.Tracef("Connection %v recvHandler done", c) // cleanup close(c.pong) for _ = range c.pong { @@ -182,6 +186,6 @@ type IOStats struct { LastRecv Time BytesRecv UInt64 BytesSent UInt64 - MsgsRecv UInt64 - MsgsSent UInt64 + PktsRecv UInt64 + PktsSent UInt64 } diff --git a/peer/msg.go b/peer/msg.go index fed5c5c3f..396612c5c 100644 --- a/peer/msg.go +++ b/peer/msg.go @@ -5,21 +5,44 @@ import ( "io" ) -/* Msg */ +/* Packet */ -type Msg struct { +type Packet struct { + Channel String Bytes ByteSlice - Hash ByteSlice + // Hash +} + +func NewPacket(chName String, bytes ByteSlice) Packet { + return Packet{ + Channel: chName, + Bytes: bytes, + } +} + +func (p Packet) WriteTo(w io.Writer) (n int64, err error) { + n, err = WriteOnto(&p.Channel, w, n, err) + n, err = WriteOnto(&p.Bytes, w, n, err) + return +} + +func ReadPacketSafe(r io.Reader) (pkt Packet, err error) { + chName, err := ReadStringSafe(r) + if err != nil { return } + // TODO: packet length sanity check. + bytes, err := ReadByteSliceSafe(r) + if err != nil { return } + return NewPacket(chName, bytes), nil } -/* InboundMsg */ +/* InboundPacket */ -type InboundMsg struct { +type InboundPacket struct { Peer *Peer Channel *Channel Time Time - Msg + Packet } @@ -31,5 +54,6 @@ type NewFilterMsg struct { } func (m *NewFilterMsg) WriteTo(w io.Writer) (int64, error) { + panic("TODO: implement") return 0, nil // TODO } diff --git a/peer/peer.go b/peer/peer.go index cf59a883b..cbd7016fe 100644 --- a/peer/peer.go +++ b/peer/peer.go @@ -14,7 +14,7 @@ import ( type Peer struct { outgoing bool conn *Connection - channels map[string]*Channel + channels map[String]*Channel mtx sync.Mutex quit chan struct{} @@ -29,12 +29,12 @@ func NewPeer(conn *Connection) *Peer { } } -func (p *Peer) Start(peerInQueues map[string]chan *InboundMsg ) { +func (p *Peer) Start(peerRecvQueues map[String]chan *InboundPacket ) { log.Debugf("Starting %v", p) - p.conn.Start() + p.conn.Start(p.channels) for chName, _ := range p.channels { - go p.inHandler(chName, peerInQueues[chName]) - go p.outHandler(chName) + go p.recvHandler(chName, peerRecvQueues[chName]) + go p.sendHandler(chName) } } @@ -58,21 +58,21 @@ func (p *Peer) RemoteAddress() *NetAddress { return p.conn.RemoteAddress() } -func (p *Peer) Channel(chName string) *Channel { +func (p *Peer) Channel(chName String) *Channel { return p.channels[chName] } -// Queue the msg for output. -// If the queue is full, just return false. -func (p *Peer) TryQueueOut(chName string, msg Msg) bool { - channel := p.Channel(chName) - outQueue := channel.OutQueue() +// If the channel's queue is full, just return false. +// Later the sendHandler will send the pkt to the underlying connection. +func (p *Peer) TrySend(pkt Packet) bool { + channel := p.Channel(pkt.Channel) + sendQueue := channel.SendQueue() // lock & defer p.mtx.Lock(); defer p.mtx.Unlock() if p.stopped == 1 { return false } select { - case outQueue <- msg: + case sendQueue <- pkt: return true default: // buffer full return false @@ -88,55 +88,55 @@ func (p *Peer) String() string { return fmt.Sprintf("Peer{%v-%v,o:%v}", p.LocalAddress(), p.RemoteAddress(), p.outgoing) } -func (p *Peer) inHandler(chName string, inboundMsgQueue chan<- *InboundMsg) { - log.Tracef("Peer %v inHandler [%v]", p, chName) +func (p *Peer) recvHandler(chName String, inboundPacketQueue chan<- *InboundPacket) { + log.Tracef("Peer %v recvHandler [%v]", p, chName) channel := p.channels[chName] - inQueue := channel.InQueue() + recvQueue := channel.RecvQueue() FOR_LOOP: for { select { case <-p.quit: break FOR_LOOP - case msg := <-inQueue: - // send to inboundMsgQueue - inboundMsg := &InboundMsg{ + case pkt := <-recvQueue: + // send to inboundPacketQueue + inboundPacket := &InboundPacket{ Peer: p, Channel: channel, Time: Time{time.Now()}, - Msg: msg, + Packet: pkt, } select { case <-p.quit: break FOR_LOOP - case inboundMsgQueue <- inboundMsg: + case inboundPacketQueue <- inboundPacket: continue } } } - log.Tracef("Peer %v inHandler [%v] closed", p, chName) + log.Tracef("Peer %v recvHandler [%v] closed", p, chName) // cleanup // (none) } -func (p *Peer) outHandler(chName string) { - log.Tracef("Peer %v outHandler [%v]", p, chName) - outQueue := p.channels[chName].outQueue +func (p *Peer) sendHandler(chName String) { + log.Tracef("Peer %v sendHandler [%v]", p, chName) + chSendQueue := p.channels[chName].sendQueue FOR_LOOP: for { select { case <-p.quit: break FOR_LOOP - case msg := <-outQueue: - log.Tracef("Sending msg to peer outQueue") + case pkt := <-chSendQueue: + log.Tracef("Sending packet to peer chSendQueue") // blocks until the connection is Stop'd, // which happens when this peer is Stop'd. - p.conn.QueueOut(msg.Bytes) + p.conn.Send(pkt) } } - log.Tracef("Peer %v outHandler [%v] closed", p, chName) + log.Tracef("Peer %v sendHandler [%v] closed", p, chName) // cleanup // (none) } @@ -145,28 +145,28 @@ func (p *Peer) outHandler(chName string) { /* Channel */ type Channel struct { - name string - inQueue chan Msg - outQueue chan Msg + name String + recvQueue chan Packet + sendQueue chan Packet //stats Stats } -func NewChannel(name string, bufferSize int) *Channel { +func NewChannel(name String, bufferSize int) *Channel { return &Channel{ name: name, - inQueue: make(chan Msg, bufferSize), - outQueue: make(chan Msg, bufferSize), + recvQueue: make(chan Packet, bufferSize), + sendQueue: make(chan Packet, bufferSize), } } -func (c *Channel) Name() string { +func (c *Channel) Name() String { return c.name } -func (c *Channel) InQueue() <-chan Msg { - return c.inQueue +func (c *Channel) RecvQueue() <-chan Packet { + return c.recvQueue } -func (c *Channel) OutQueue() chan<- Msg { - return c.outQueue +func (c *Channel) SendQueue() chan<- Packet { + return c.sendQueue } diff --git a/peer/set.go b/peer/set.go deleted file mode 100644 index d832830c2..000000000 --- a/peer/set.go +++ /dev/null @@ -1,67 +0,0 @@ -package peer - -import ( - . "github.com/tendermint/tendermint/binary" - "io" -) - -/* Set - - A Set could be a bloom filter for lossy filtering, or could be a lossless filter. -*/ -type Set interface { - Binary - Add(Msg) - Has(Msg) bool - - // Loads a new set. - // Convenience factory method - Load(ByteSlice) Set -} - - -/* BloomFilterSet */ - -type BloomFilterSet struct { - lastBlockHeight UInt64 - lastHeaderHeight UInt64 -} - -func (bs *BloomFilterSet) WriteTo(w io.Writer) (n int64, err error) { - n, err = WriteOnto(String("block"), w, n, err) - n, err = WriteOnto(bs.lastBlockHeight, w, n, err) - n, err = WriteOnto(bs.lastHeaderHeight, w, n, err) - return -} - -func (bs *BloomFilterSet) Add(msg Msg) { -} - -func (bs *BloomFilterSet) Has(msg Msg) bool { - return false -} - -func (bs *BloomFilterSet) Load(bytes ByteSlice) Set { - return nil -} - - -/* BitarraySet */ - -type BlockSet struct { -} - -func (bs *BlockSet) WriteTo(w io.Writer) (n int64, err error) { - return -} - -func (bs *BlockSet) Add(msg Msg) { -} - -func (bs *BlockSet) Has(msg Msg) bool { - return false -} - -func (bs *BlockSet) Load(bytes ByteSlice) Set { - return nil -}