A Glicko-2 based round-robin ranking system designed to test C++ battleship submissions (battleship.dunkirk.sh)
at main 4.7 kB view raw
1package sftp 2 3import ( 4 "context" 5 "encoding" 6 "fmt" 7 "io" 8 "sync" 9) 10 11// conn implements a bidirectional channel on which client and server 12// connections are multiplexed. 13type conn struct { 14 io.Reader 15 io.WriteCloser 16 // this is the same allocator used in packet manager 17 alloc *allocator 18 sync.Mutex // used to serialise writes to sendPacket 19} 20 21// the orderID is used in server mode if the allocator is enabled. 22// For the client mode just pass 0. 23// It returns io.EOF if the connection is closed and 24// there are no more packets to read. 25func (c *conn) recvPacket(orderID uint32) (fxp, []byte, error) { 26 return recvPacket(c, c.alloc, orderID) 27} 28 29func (c *conn) sendPacket(m encoding.BinaryMarshaler) error { 30 c.Lock() 31 defer c.Unlock() 32 33 return sendPacket(c, m) 34} 35 36func (c *conn) Close() error { 37 c.Lock() 38 defer c.Unlock() 39 return c.WriteCloser.Close() 40} 41 42type clientConn struct { 43 conn 44 wg sync.WaitGroup 45 46 wait func() error // if non-nil, call this during Wait() to get a possible remote status error. 47 48 sync.Mutex // protects inflight 49 inflight map[uint32]chan<- result // outstanding requests 50 51 closed chan struct{} 52 err error 53} 54 55// Wait blocks until the conn has shut down, and return the error 56// causing the shutdown. It can be called concurrently from multiple 57// goroutines. 58func (c *clientConn) Wait() error { 59 <-c.closed 60 61 if c.wait == nil { 62 // Only return this error if c.wait won't return something more useful. 63 return c.err 64 } 65 66 if err := c.wait(); err != nil { 67 68 // TODO: when https://github.com/golang/go/issues/35025 is fixed, 69 // we can remove this if block entirely. 70 // Right now, it’s always going to return this, so it is not useful. 71 // But we have this code here so that as soon as the ssh library is updated, 72 // we can return a possibly more useful error. 
73 if err.Error() == "ssh: session not started" { 74 return c.err 75 } 76 77 return err 78 } 79 80 // c.wait returned no error; so, let's return something maybe more useful. 81 return c.err 82} 83 84// Close closes the SFTP session. 85func (c *clientConn) Close() error { 86 defer c.wg.Wait() 87 return c.conn.Close() 88} 89 90// recv continuously reads from the server and forwards responses to the 91// appropriate channel. 92func (c *clientConn) recv() error { 93 defer c.conn.Close() 94 95 for { 96 typ, data, err := c.recvPacket(0) 97 if err != nil { 98 return err 99 } 100 sid, _, err := unmarshalUint32Safe(data) 101 if err != nil { 102 return err 103 } 104 105 ch, ok := c.getChannel(sid) 106 if !ok { 107 // This is an unexpected occurrence. Send the error 108 // back to all listeners so that they terminate 109 // gracefully. 110 return fmt.Errorf("sid not found: %d", sid) 111 } 112 113 ch <- result{typ: typ, data: data} 114 } 115} 116 117func (c *clientConn) putChannel(ch chan<- result, sid uint32) bool { 118 c.Lock() 119 defer c.Unlock() 120 121 select { 122 case <-c.closed: 123 // already closed with broadcastErr, return error on chan. 
124 ch <- result{err: ErrSSHFxConnectionLost} 125 return false 126 default: 127 } 128 129 c.inflight[sid] = ch 130 return true 131} 132 133func (c *clientConn) getChannel(sid uint32) (chan<- result, bool) { 134 c.Lock() 135 defer c.Unlock() 136 137 ch, ok := c.inflight[sid] 138 delete(c.inflight, sid) 139 140 return ch, ok 141} 142 143// result captures the result of receiving the a packet from the server 144type result struct { 145 typ fxp 146 data []byte 147 err error 148} 149 150type idmarshaler interface { 151 id() uint32 152 encoding.BinaryMarshaler 153} 154 155func (c *clientConn) sendPacket(ctx context.Context, ch chan result, p idmarshaler) (fxp, []byte, error) { 156 if cap(ch) < 1 { 157 ch = make(chan result, 1) 158 } 159 160 c.dispatchRequest(ch, p) 161 162 select { 163 case <-ctx.Done(): 164 return 0, nil, ctx.Err() 165 case s := <-ch: 166 return s.typ, s.data, s.err 167 } 168} 169 170// dispatchRequest should ideally only be called by race-detection tests outside of this file, 171// where you have to ensure two packets are in flight sequentially after each other. 172func (c *clientConn) dispatchRequest(ch chan<- result, p idmarshaler) { 173 sid := p.id() 174 175 if !c.putChannel(ch, sid) { 176 // already closed. 177 return 178 } 179 180 if err := c.conn.sendPacket(p); err != nil { 181 if ch, ok := c.getChannel(sid); ok { 182 ch <- result{err: err} 183 } 184 } 185} 186 187// broadcastErr sends an error to all goroutines waiting for a response. 188func (c *clientConn) broadcastErr(err error) { 189 c.Lock() 190 defer c.Unlock() 191 192 bcastRes := result{err: ErrSSHFxConnectionLost} 193 for sid, ch := range c.inflight { 194 ch <- bcastRes 195 196 // Replace the chan in inflight, 197 // we have hijacked this chan, 198 // and this guarantees always-only-once sending. 
199 c.inflight[sid] = make(chan<- result, 1) 200 } 201 202 c.err = err 203 close(c.closed) 204} 205 206type serverConn struct { 207 conn 208} 209 210func (s *serverConn) sendError(id uint32, err error) error { 211 return s.sendPacket(statusFromError(id, err)) 212}