A Glicko-2 based round-robin ranking system designed to test C++ battleship submissions — battleship.dunkirk.sh
at main 9.4 kB view raw
1package sftp 2 3import ( 4 "context" 5 "errors" 6 "io" 7 "path" 8 "path/filepath" 9 "strconv" 10 "sync" 11) 12 13const defaultMaxTxPacket uint32 = 1 << 15 14 15// Handlers contains the 4 SFTP server request handlers. 16type Handlers struct { 17 FileGet FileReader 18 FilePut FileWriter 19 FileCmd FileCmder 20 FileList FileLister 21} 22 23// RequestServer abstracts the sftp protocol with an http request-like protocol 24type RequestServer struct { 25 Handlers Handlers 26 27 *serverConn 28 pktMgr *packetManager 29 30 startDirectory string 31 maxTxPacket uint32 32 33 mu sync.RWMutex 34 handleCount int 35 openRequests map[string]*Request 36} 37 38// A RequestServerOption is a function which applies configuration to a RequestServer. 39type RequestServerOption func(*RequestServer) 40 41// WithRSAllocator enable the allocator. 42// After processing a packet we keep in memory the allocated slices 43// and we reuse them for new packets. 44// The allocator is experimental 45func WithRSAllocator() RequestServerOption { 46 return func(rs *RequestServer) { 47 alloc := newAllocator() 48 rs.pktMgr.alloc = alloc 49 rs.conn.alloc = alloc 50 } 51} 52 53// WithStartDirectory sets a start directory to use as base for relative paths. 54// If unset the default is "/" 55func WithStartDirectory(startDirectory string) RequestServerOption { 56 return func(rs *RequestServer) { 57 rs.startDirectory = cleanPath(startDirectory) 58 } 59} 60 61// WithRSMaxTxPacket sets the maximum size of the payload returned to the client, 62// measured in bytes. The default value is 32768 bytes, and this option 63// can only be used to increase it. Setting this option to a larger value 64// should be safe, because the client decides the size of the requested payload. 65// 66// The default maximum packet size is 32768 bytes. 
67func WithRSMaxTxPacket(size uint32) RequestServerOption { 68 return func(rs *RequestServer) { 69 if size < defaultMaxTxPacket { 70 return 71 } 72 73 rs.maxTxPacket = size 74 } 75} 76 77// NewRequestServer creates/allocates/returns new RequestServer. 78// Normally there will be one server per user-session. 79func NewRequestServer(rwc io.ReadWriteCloser, h Handlers, options ...RequestServerOption) *RequestServer { 80 svrConn := &serverConn{ 81 conn: conn{ 82 Reader: rwc, 83 WriteCloser: rwc, 84 }, 85 } 86 rs := &RequestServer{ 87 Handlers: h, 88 89 serverConn: svrConn, 90 pktMgr: newPktMgr(svrConn), 91 92 startDirectory: "/", 93 maxTxPacket: defaultMaxTxPacket, 94 95 openRequests: make(map[string]*Request), 96 } 97 98 for _, o := range options { 99 o(rs) 100 } 101 return rs 102} 103 104// New Open packet/Request 105func (rs *RequestServer) nextRequest(r *Request) string { 106 rs.mu.Lock() 107 defer rs.mu.Unlock() 108 109 rs.handleCount++ 110 111 r.handle = strconv.Itoa(rs.handleCount) 112 rs.openRequests[r.handle] = r 113 114 return r.handle 115} 116 117// Returns Request from openRequests, bool is false if it is missing. 118// 119// The Requests in openRequests work essentially as open file descriptors that 120// you can do different things with. What you are doing with it are denoted by 121// the first packet of that type (read/write/etc). 
122func (rs *RequestServer) getRequest(handle string) (*Request, bool) { 123 rs.mu.RLock() 124 defer rs.mu.RUnlock() 125 126 r, ok := rs.openRequests[handle] 127 return r, ok 128} 129 130// Close the Request and clear from openRequests map 131func (rs *RequestServer) closeRequest(handle string) error { 132 rs.mu.Lock() 133 defer rs.mu.Unlock() 134 135 if r, ok := rs.openRequests[handle]; ok { 136 delete(rs.openRequests, handle) 137 return r.close() 138 } 139 140 return EBADF 141} 142 143// Close the read/write/closer to trigger exiting the main server loop 144func (rs *RequestServer) Close() error { return rs.conn.Close() } 145 146func (rs *RequestServer) serveLoop(pktChan chan<- orderedRequest) error { 147 defer close(pktChan) // shuts down sftpServerWorkers 148 149 var err error 150 var pkt requestPacket 151 var pktType fxp 152 var pktBytes []byte 153 154 for { 155 pktType, pktBytes, err = rs.serverConn.recvPacket(rs.pktMgr.getNextOrderID()) 156 if err != nil { 157 // we don't care about releasing allocated pages here, the server will quit and the allocator freed 158 return err 159 } 160 161 pkt, err = makePacket(rxPacket{pktType, pktBytes}) 162 if err != nil { 163 switch { 164 case errors.Is(err, errUnknownExtendedPacket): 165 // do nothing 166 default: 167 debug("makePacket err: %v", err) 168 rs.conn.Close() // shuts down recvPacket 169 return err 170 } 171 } 172 173 pktChan <- rs.pktMgr.newOrderedRequest(pkt) 174 } 175} 176 177// Serve requests for user session 178func (rs *RequestServer) Serve() error { 179 defer func() { 180 if rs.pktMgr.alloc != nil { 181 rs.pktMgr.alloc.Free() 182 } 183 }() 184 185 ctx, cancel := context.WithCancel(context.Background()) 186 defer cancel() 187 188 var wg sync.WaitGroup 189 runWorker := func(ch chan orderedRequest) { 190 wg.Add(1) 191 go func() { 192 defer wg.Done() 193 if err := rs.packetWorker(ctx, ch); err != nil { 194 rs.conn.Close() // shuts down recvPacket 195 } 196 }() 197 } 198 pktChan := 
rs.pktMgr.workerChan(runWorker) 199 200 err := rs.serveLoop(pktChan) 201 202 wg.Wait() // wait for all workers to exit 203 204 rs.mu.Lock() 205 defer rs.mu.Unlock() 206 207 // make sure all open requests are properly closed 208 // (eg. possible on dropped connections, client crashes, etc.) 209 for handle, req := range rs.openRequests { 210 if err == io.EOF { 211 err = io.ErrUnexpectedEOF 212 } 213 req.transferError(err) 214 215 delete(rs.openRequests, handle) 216 req.close() 217 } 218 219 return err 220} 221 222func (rs *RequestServer) packetWorker(ctx context.Context, pktChan chan orderedRequest) error { 223 for pkt := range pktChan { 224 orderID := pkt.orderID() 225 if epkt, ok := pkt.requestPacket.(*sshFxpExtendedPacket); ok { 226 if epkt.SpecificPacket != nil { 227 pkt.requestPacket = epkt.SpecificPacket 228 } 229 } 230 231 var rpkt responsePacket 232 switch pkt := pkt.requestPacket.(type) { 233 case *sshFxInitPacket: 234 rpkt = &sshFxVersionPacket{Version: sftpProtocolVersion, Extensions: sftpExtensions} 235 case *sshFxpClosePacket: 236 handle := pkt.getHandle() 237 rpkt = statusFromError(pkt.ID, rs.closeRequest(handle)) 238 case *sshFxpRealpathPacket: 239 var realPath string 240 var err error 241 242 switch pather := rs.Handlers.FileList.(type) { 243 case RealPathFileLister: 244 realPath, err = pather.RealPath(pkt.getPath()) 245 case legacyRealPathFileLister: 246 realPath = pather.RealPath(pkt.getPath()) 247 default: 248 realPath = cleanPathWithBase(rs.startDirectory, pkt.getPath()) 249 } 250 if err != nil { 251 rpkt = statusFromError(pkt.ID, err) 252 } else { 253 rpkt = cleanPacketPath(pkt, realPath) 254 } 255 case *sshFxpOpendirPacket: 256 request := requestFromPacket(ctx, pkt, rs.startDirectory) 257 handle := rs.nextRequest(request) 258 rpkt = request.opendir(rs.Handlers, pkt) 259 if _, ok := rpkt.(*sshFxpHandlePacket); !ok { 260 // if we return an error we have to remove the handle from the active ones 261 rs.closeRequest(handle) 262 } 263 case 
*sshFxpOpenPacket: 264 request := requestFromPacket(ctx, pkt, rs.startDirectory) 265 handle := rs.nextRequest(request) 266 rpkt = request.open(rs.Handlers, pkt) 267 if _, ok := rpkt.(*sshFxpHandlePacket); !ok { 268 // if we return an error we have to remove the handle from the active ones 269 rs.closeRequest(handle) 270 } 271 case *sshFxpFstatPacket: 272 handle := pkt.getHandle() 273 request, ok := rs.getRequest(handle) 274 if !ok { 275 rpkt = statusFromError(pkt.ID, EBADF) 276 } else { 277 request = &Request{ 278 Method: "Stat", 279 Filepath: cleanPathWithBase(rs.startDirectory, request.Filepath), 280 } 281 rpkt = request.call(rs.Handlers, pkt, rs.pktMgr.alloc, orderID, rs.maxTxPacket) 282 } 283 case *sshFxpFsetstatPacket: 284 handle := pkt.getHandle() 285 request, ok := rs.getRequest(handle) 286 if !ok { 287 rpkt = statusFromError(pkt.ID, EBADF) 288 } else { 289 request = &Request{ 290 Method: "Setstat", 291 Filepath: cleanPathWithBase(rs.startDirectory, request.Filepath), 292 } 293 rpkt = request.call(rs.Handlers, pkt, rs.pktMgr.alloc, orderID, rs.maxTxPacket) 294 } 295 case *sshFxpExtendedPacketPosixRename: 296 request := &Request{ 297 Method: "PosixRename", 298 Filepath: cleanPathWithBase(rs.startDirectory, pkt.Oldpath), 299 Target: cleanPathWithBase(rs.startDirectory, pkt.Newpath), 300 } 301 rpkt = request.call(rs.Handlers, pkt, rs.pktMgr.alloc, orderID, rs.maxTxPacket) 302 case *sshFxpExtendedPacketStatVFS: 303 request := &Request{ 304 Method: "StatVFS", 305 Filepath: cleanPathWithBase(rs.startDirectory, pkt.Path), 306 } 307 rpkt = request.call(rs.Handlers, pkt, rs.pktMgr.alloc, orderID, rs.maxTxPacket) 308 case hasHandle: 309 handle := pkt.getHandle() 310 request, ok := rs.getRequest(handle) 311 if !ok { 312 rpkt = statusFromError(pkt.id(), EBADF) 313 } else { 314 rpkt = request.call(rs.Handlers, pkt, rs.pktMgr.alloc, orderID, rs.maxTxPacket) 315 } 316 case hasPath: 317 request := requestFromPacket(ctx, pkt, rs.startDirectory) 318 rpkt = 
request.call(rs.Handlers, pkt, rs.pktMgr.alloc, orderID, rs.maxTxPacket) 319 request.close() 320 default: 321 rpkt = statusFromError(pkt.id(), ErrSSHFxOpUnsupported) 322 } 323 324 rs.pktMgr.readyPacket( 325 rs.pktMgr.newOrderedResponse(rpkt, orderID)) 326 } 327 return nil 328} 329 330// clean and return name packet for file 331func cleanPacketPath(pkt *sshFxpRealpathPacket, realPath string) responsePacket { 332 return &sshFxpNamePacket{ 333 ID: pkt.id(), 334 NameAttrs: []*sshFxpNameAttr{ 335 { 336 Name: realPath, 337 LongName: realPath, 338 Attrs: emptyFileStat, 339 }, 340 }, 341 } 342} 343 344// Makes sure we have a clean POSIX (/) absolute path to work with 345func cleanPath(p string) string { 346 return cleanPathWithBase("/", p) 347} 348 349func cleanPathWithBase(base, p string) string { 350 p = filepath.ToSlash(filepath.Clean(p)) 351 if !path.IsAbs(p) { 352 return path.Join(base, p) 353 } 354 return p 355}