skipped 7 lines 8 8 ) 9 9 10 10 // remoteRecord is a record of a recently seen remote peer, with the time it was 11 - // last seen and a send queue. 11 + // last seen and queues of outgoing packets. 12 12 type remoteRecord struct { 13 13 Addr net.Addr 14 14 LastSeen time.Time 15 15 SendQueue chan []byte 16 + Stash chan []byte 16 17 } 17 18 18 19 // RemoteMap manages a mapping of live remote peers, keyed by address, to their 19 - // respective send queues. RemoteMap's functions are safe to call from multiple 20 - // goroutines. 20 + // respective send queues. Each peer has two queues: a primary send queue, and a 21 + // "stash". The primary send queue is returned by the SendQueue method. The 22 + // stash is an auxiliary one-element queue accessed using the Stash and Unstash 23 + // methods. The stash is meant for use by callers that need to "unread" a packet 24 + // that's already been removed from the primary send queue. 25 + // 26 + // RemoteMap's functions are safe to call from multiple goroutines. 21 27 type RemoteMap struct { 22 28 // We use an inner structure to avoid exposing public heap.Interface 23 29 // functions to users of remoteMap. skipped 38 lines 62 68 func (m *RemoteMap) SendQueue(addr net.Addr) chan []byte { 63 69 m.lock.Lock() 64 70 defer m.lock.Unlock() 65 - return m.inner.SendQueue (addr, time.Now()) 71 + return m.inner.Lookup (addr, time.Now()). SendQueue 72 + } 73 + 74 + // Stash places p in the stash corresponding to addr, if the stash is not 75 + // already occupied. Returns true if p was placed in the stash, false 76 + // otherwise. 77 + func (m *RemoteMap) Stash(addr net.Addr, p []byte) bool { 78 + m.lock.Lock() 79 + defer m.lock.Unlock() 80 + select { 81 + case m.inner.Lookup(addr, time.Now()).Stash <- p: 82 + return true 83 + default: 84 + return false 85 + } 86 + } 87 + 88 + // Unstash returns the channel that reads from the stash for addr. 
89 + func (m *RemoteMap) Unstash(addr net.Addr) <-chan []byte { 90 + m.lock.Lock() 91 + defer m.lock.Unlock() 92 + return m.inner.Lookup(addr, time.Now()).Stash 66 93 } 67 94 68 95 // remoteMapInner is the inner type of RemoteMap, implementing heap.Interface. skipped 15 lines 84 111 } 85 112 } 86 113 87 - // SendQueue finds the existing record corresponding to addr, or creates a new 88 - // one if none exists yet. It updates the record's LastSeen time and returns its 89 - // SendQueue. 90 - func (inner *remoteMapInner) SendQueue (addr net.Addr, now time.Time) chan [ ] byte { 114 + // Lookup finds the existing record corresponding to addr, or creates a new 115 + // one if none exists yet. It updates the record's LastSeen time and returns the 116 + // record. 117 + func (inner *remoteMapInner) Lookup (addr net.Addr, now time.Time) * remoteRecord { 91 118 var record *remoteRecord 92 119 i, ok := inner.byAddr[addr] 93 120 if ok { skipped 7 lines 101 128 Addr: addr, 102 129 LastSeen: now, 103 130 SendQueue: make(chan []byte, queueSize), 131 + Stash: make(chan []byte, 1), 104 132 } 105 133 heap.Push(inner, record) 106 134 } 107 - return record. SendQueue 135 + return record 108 136 } 109 137 110 138 // heap.Interface for remoteMapInner. skipped 40 lines