Projects STRLCPY dnstt Commits 938463ce
🤬
  • Add a one-packet "stash" to QueuePacketConn.

    I want a way to "unread" a packet from an send queue, in the case where
    I'm packing packets into a fixed space and don't know when I'm done
    until I've read one too many packets. There's no way to insert the extra
    packet at the head of the cannel representing the send queue, so that it
    will be the next thing received from OutgoingQueue. Instead, add an
    separate one-element queue called the stash. The caller can stash an
    excess packet, then prioritize checking it in the next round by calling
    Unstash before OutgoingQueue.
  • Loading...
  • David Fifield committed 4 years ago
    938463ce
    1 parent 8526369e
  • ■ ■ ■ ■ ■ ■
    turbotunnel/queuepacketconn.go
    skipped 20 lines
    21 21  // method inserts a packet into the incoming queue, to eventually be returned by
    22 22  // ReadFrom. WriteTo inserts a packet into an address-specific outgoing queue,
    23 23  // which can later by accessed through the OutgoingQueue method.
     24 +//
     25 +// Besides the outgoing queues, there is also a one-element "stash" for each
     26 +// remote peer address. You can stash a packet using the Stash method, and get
     27 +// it back later by receiving from the channel returned by Unstash. The stash is
     28 +// meant as a convenient place to temporarily store a single packet, such as
     29 +// when you've read one too many packets from the send queue and need to store
     30 +// the extra packet to be processed first in the next pass. It's the caller's
     31 +// responsibility to Unstash what they have Stashed. Calling Stash does not put
     32 +// the packet at the head of the send queue; if there is the possibility that a
     33 +// packet has been stashed, it must be checked for by calling Unstash in
     34 +// addition to OutgoingQueue.
    24 35  type QueuePacketConn struct {
    25 36   remotes *RemoteMap
    26 37   localAddr net.Addr
    skipped 39 lines
    66 77  // written to the address in question using WriteTo.
    67 78  func (c *QueuePacketConn) OutgoingQueue(addr net.Addr) <-chan []byte {
    68 79   return c.remotes.SendQueue(addr)
     80 +}
     81 + 
     82 +// Stash places p in the stash for addr, if the stash is not already occupied.
     83 +// Returns true if the packet was placed in the stash, or false if the stash was
     84 +// already occupied. This method is similar to WriteTo, except that it puts the
     85 +// packet in the stash queue (accessible via Unstash), rather than the outgoing
     86 +// queue (accessible via OutgoingQueue).
     87 +func (c *QueuePacketConn) Stash(p []byte, addr net.Addr) bool {
     88 + return c.remotes.Stash(addr, p)
     89 +}
     90 + 
     91 +// Unstash returns the channel that represents the stash for addr.
     92 +func (c *QueuePacketConn) Unstash(addr net.Addr) <-chan []byte {
     93 + return c.remotes.Unstash(addr)
    69 94  }
    70 95   
    71 96  // ReadFrom returns a packet and address previously stored by QueueIncoming.
    skipped 67 lines
  • ■ ■ ■ ■ ■
    turbotunnel/remotemap.go
    skipped 7 lines
    8 8  )
    9 9   
    10 10  // remoteRecord is a record of a recently seen remote peer, with the time it was
    11  -// last seen and a send queue.
     11 +// last seen and queues of outgoing packets.
    12 12  type remoteRecord struct {
    13 13   Addr net.Addr
    14 14   LastSeen time.Time
    15 15   SendQueue chan []byte
     16 + Stash chan []byte
    16 17  }
    17 18   
    18 19  // RemoteMap manages a mapping of live remote peers, keyed by address, to their
    19  -// respective send queues. RemoteMap's functions are safe to call from multiple
    20  -// goroutines.
     20 +// respective send queues. Each peer has two queues: a primary send queue, and a
     21 +// "stash". The primary send queue is returned by the SendQueue method. The
     22 +// stash is an auxiliary one-element queue accessed using the Stash and Unstash
     23 +// methods. The stash is meant for use by callers that need to "unread" a packet
     24 +// that's already been removed from the primary send queue.
     25 +//
     26 +// RemoteMap's functions are safe to call from multiple goroutines.
    21 27  type RemoteMap struct {
    22 28   // We use an inner structure to avoid exposing public heap.Interface
    23 29   // functions to users of remoteMap.
    skipped 38 lines
    62 68  func (m *RemoteMap) SendQueue(addr net.Addr) chan []byte {
    63 69   m.lock.Lock()
    64 70   defer m.lock.Unlock()
    65  - return m.inner.SendQueue(addr, time.Now())
     71 + return m.inner.Lookup(addr, time.Now()).SendQueue
     72 +}
     73 + 
     74 +// Stash places p in the stash corresponding to addr, if the stash is not
     75 +// already occupied. Returns true if the p was placed in the stash, false
     76 +// otherwise.
     77 +func (m *RemoteMap) Stash(addr net.Addr, p []byte) bool {
     78 + m.lock.Lock()
     79 + defer m.lock.Unlock()
     80 + select {
     81 + case m.inner.Lookup(addr, time.Now()).Stash <- p:
     82 + return true
     83 + default:
     84 + return false
     85 + }
     86 +}
     87 + 
     88 +// Unstash returns the channel that reads from the stash for addr.
     89 +func (m *RemoteMap) Unstash(addr net.Addr) <-chan []byte {
     90 + m.lock.Lock()
     91 + defer m.lock.Unlock()
     92 + return m.inner.Lookup(addr, time.Now()).Stash
    66 93  }
    67 94   
    68 95  // remoteMapInner is the inner type of RemoteMap, implementing heap.Interface.
    skipped 15 lines
    84 111   }
    85 112  }
    86 113   
    87  -// SendQueue finds the existing record corresponding to addr, or creates a new
    88  -// one if none exists yet. It updates the record's LastSeen time and returns its
    89  -// SendQueue.
    90  -func (inner *remoteMapInner) SendQueue(addr net.Addr, now time.Time) chan []byte {
     114 +// Lookup finds the existing record corresponding to addr, or creates a new
     115 +// one if none exists yet. It updates the record's LastSeen time and returns the
     116 +// record.
     117 +func (inner *remoteMapInner) Lookup(addr net.Addr, now time.Time) *remoteRecord {
    91 118   var record *remoteRecord
    92 119   i, ok := inner.byAddr[addr]
    93 120   if ok {
    skipped 7 lines
    101 128   Addr: addr,
    102 129   LastSeen: now,
    103 130   SendQueue: make(chan []byte, queueSize),
     131 + Stash: make(chan []byte, 1),
    104 132   }
    105 133   heap.Push(inner, record)
    106 134   }
    107  - return record.SendQueue
     135 + return record
    108 136  }
    109 137   
    110 138  // heap.Interface for remoteMapInner.
    skipped 40 lines
Please wait...
Page is in error, reload to recover