package sftp

import (
	"bytes"
	"context"
	"encoding/binary"
	"errors"
	"fmt"
	"io"
	"math"
	"os"
	"path"
	"sync"
	"sync/atomic"
	"syscall"
	"time"

	"github.com/kr/fs"
	"golang.org/x/crypto/ssh"

	"github.com/pkg/sftp/internal/encoding/ssh/filexfer/openssh"
)

var (
	// ErrInternalInconsistency indicates that the packets sent and the data
	// queued to be written to the file don't match up. It is an unusual error,
	// usually caused by bad behavior on the server side or by connection issues.
	// The error is limited in scope to the call where it happened; the client
	// object is still OK to use as long as the connection is still open.
	ErrInternalInconsistency = errors.New("internal inconsistency")
	// InternalInconsistency alias for ErrInternalInconsistency.
	//
	// Deprecated: please use ErrInternalInconsistency
	InternalInconsistency = ErrInternalInconsistency
)

// A ClientOption is a function which applies configuration to a Client.
type ClientOption func(*Client) error

// MaxPacketChecked sets the maximum size of the payload, measured in bytes.
// This option only accepts sizes servers should support, i.e. <= 32768 bytes.
//
// If you get the error "failed to send packet header: EOF" when copying a
// large file, try lowering this number.
//
// The default packet size is 32768 bytes.
func MaxPacketChecked(size int) ClientOption {
	return func(c *Client) error {
		if size < 1 {
			return errors.New("size must be greater or equal to 1")
		}
		if size > 32768 {
			return errors.New("sizes larger than 32KB might not work with all servers")
		}
		c.maxPacket = size
		return nil
	}
}

// MaxPacketUnchecked sets the maximum size of the payload, measured in bytes.
// It accepts sizes larger than the 32768 bytes all servers should support.
// Only use a setting higher than 32768 if your application always connects to
// the same server, or after sufficiently broad testing.
//
// If you get the error "failed to send packet header: EOF" when copying a
// large file, try lowering this number.
//
// The default packet size is 32768 bytes.
func MaxPacketUnchecked(size int) ClientOption {
	return func(c *Client) error {
		if size < 1 {
			return errors.New("size must be greater or equal to 1")
		}
		c.maxPacket = size
		return nil
	}
}

// MaxPacket sets the maximum size of the payload, measured in bytes.
// This option only accepts sizes servers should support, i.e. <= 32768 bytes.
// This is a synonym for MaxPacketChecked that provides backward compatibility.
//
// If you get the error "failed to send packet header: EOF" when copying a
// large file, try lowering this number.
//
// The default packet size is 32768 bytes.
func MaxPacket(size int) ClientOption {
	return MaxPacketChecked(size)
}

// MaxConcurrentRequestsPerFile sets the maximum concurrent requests allowed for a single file.
//
// The default maximum concurrent requests is 64.
func MaxConcurrentRequestsPerFile(n int) ClientOption {
	return func(c *Client) error {
		if n < 1 {
			return errors.New("n must be greater or equal to 1")
		}
		c.maxConcurrentRequests = n
		return nil
	}
}

// UseConcurrentWrites allows the Client to perform concurrent Writes.
//
// Using concurrency while doing writes requires special consideration.
// A write to a later offset in a file after an error could end up with a
// file length longer than what was successfully written.
//
// When using this option, if you receive an error during `io.Copy` or `io.WriteTo`,
// you may need to `Truncate` the target Writer to avoid “holes” in the data written.
func UseConcurrentWrites(value bool) ClientOption {
	return func(c *Client) error {
		c.useConcurrentWrites = value
		return nil
	}
}
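
// As an illustration, a minimal sketch of recovering from a failed concurrent
// upload; client, remotePath, dst, src, and safeLen are hypothetical names,
// and safeLen is the length the caller knows was completely written:
//
//	if _, err := io.Copy(dst, src); err != nil {
//		// Truncate to cut off any "holes" past the known-good length.
//		if terr := client.Truncate(remotePath, safeLen); terr != nil {
//			log.Printf("truncate after failed copy: %v", terr)
//		}
//	}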

// UseConcurrentReads allows the Client to perform concurrent Reads.
//
// Concurrent reads are generally safe to use, and not using them will degrade
// performance, so this option is enabled by default.
//
// When enabled, WriteTo will use Stat/Fstat to get the file size and determine
// how many concurrent workers to use.
// Some "read once" servers will delete the file if they receive a stat call on an
// open file, and then the download will fail.
// Disabling concurrent reads will allow you to download files from these servers.
// If concurrent reads are disabled, the UseFstat option is ignored.
func UseConcurrentReads(value bool) ClientOption {
	return func(c *Client) error {
		c.disableConcurrentReads = !value
		return nil
	}
}

// UseFstat sets whether to use Fstat or Stat when File.WriteTo is called
// (usually when copying files).
// Some servers limit the number of open files, and calling Stat after opening
// the file will throw an error from the server. Setting this flag will call
// Fstat instead of Stat, which is supposed to be called on an open file handle.
//
// This has been observed with IBM Sterling SFTP servers that have the
// "extractability" level set to 1, which means only one file can be opened at
// any given time.
//
// If the server you are working with still has an issue with both Stat and
// Fstat calls, you can always open a file and read it until the end.
//
// Another reason to read the file until its end when Fstat doesn't work is
// that some servers automatically delete a file once it has been fully read,
// because these mainframes map the file to a message in a queue.
func UseFstat(value bool) ClientOption {
	return func(c *Client) error {
		c.useFstat = value
		return nil
	}
}

// CopyStderrTo specifies a writer to which the standard error of the remote sftp-server command should be written.
//
// The writer passed in will not be automatically closed.
// It is the responsibility of the caller to coordinate closure of any writers.
func CopyStderrTo(wr io.Writer) ClientOption {
	return func(c *Client) error {
		c.stderrTo = wr
		return nil
	}
}

// Client represents an SFTP session on a *ssh.ClientConn SSH connection.
// Multiple Clients can be active on a single SSH connection, and a Client
// may be called concurrently from multiple Goroutines.
//
// Client implements the github.com/kr/fs.FileSystem interface.
type Client struct {
	clientConn

	stderrTo io.Writer

	ext map[string]string // Extensions (name -> data).

	maxPacket             int // max packet size read or written.
	maxConcurrentRequests int
	nextid                uint32

	// write concurrency is… error prone.
	// Default behavior should be to not use it.
	useConcurrentWrites    bool
	useFstat               bool
	disableConcurrentReads bool
}

// NewClient creates a new SFTP client on conn, using zero or more option
// functions.
func NewClient(conn *ssh.Client, opts ...ClientOption) (*Client, error) {
	s, err := conn.NewSession()
	if err != nil {
		return nil, err
	}

	pw, err := s.StdinPipe()
	if err != nil {
		return nil, err
	}
	pr, err := s.StdoutPipe()
	if err != nil {
		return nil, err
	}
	perr, err := s.StderrPipe()
	if err != nil {
		return nil, err
	}

	if err := s.RequestSubsystem("sftp"); err != nil {
		return nil, err
	}

	return newClientPipe(pr, perr, pw, s.Wait, opts...)
}
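
// A minimal construction sketch from a caller's perspective (sshConn is a
// hypothetical, already-established *ssh.Client; error handling shortened):
//
//	client, err := sftp.NewClient(sshConn,
//		sftp.MaxPacket(1<<15),
//		sftp.MaxConcurrentRequestsPerFile(64),
//	)
//	if err != nil {
//		log.Fatal(err)
//	}
//	defer client.Close()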

// NewClientPipe creates a new SFTP client given a Reader and a WriteCloser.
// This can be used for connecting to an SFTP server over TCP/TLS or by using
// the system's ssh client program (e.g. via exec.Command).
func NewClientPipe(rd io.Reader, wr io.WriteCloser, opts ...ClientOption) (*Client, error) {
	return newClientPipe(rd, nil, wr, nil, opts...)
}
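
// A sketch of the exec.Command approach mentioned above (assumes a local ssh
// binary on PATH that can authenticate non-interactively; "-s sftp" requests
// the sftp subsystem):
//
//	cmd := exec.Command("ssh", "user@host", "-s", "sftp")
//	wr, _ := cmd.StdinPipe()
//	rd, _ := cmd.StdoutPipe()
//	if err := cmd.Start(); err != nil {
//		log.Fatal(err)
//	}
//	client, err := sftp.NewClientPipe(rd, wr)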

func newClientPipe(rd, stderr io.Reader, wr io.WriteCloser, wait func() error, opts ...ClientOption) (*Client, error) {
	c := &Client{
		clientConn: clientConn{
			conn: conn{
				Reader:      rd,
				WriteCloser: wr,
			},
			inflight: make(map[uint32]chan<- result),
			closed:   make(chan struct{}),
			wait:     wait,
		},

		ext: make(map[string]string),

		maxPacket:             1 << 15,
		maxConcurrentRequests: 64,
	}

	for _, opt := range opts {
		if err := opt(c); err != nil {
			wr.Close()
			return nil, err
		}
	}

	if stderr != nil {
		wr := io.Discard
		if c.stderrTo != nil {
			wr = c.stderrTo
		}

		go func() {
			// DO NOT close the writer!
			// Programs may pass in `os.Stderr` to write the remote stderr to,
			// and the program may continue after disconnect by reconnecting.
			// But if we've closed their stderr, then we just messed everything up.

			if _, err := io.Copy(wr, stderr); err != nil {
				debug("error copying stderr: %v", err)
			}
		}()
	}

	if err := c.sendInit(); err != nil {
		wr.Close()
		return nil, fmt.Errorf("error sending init packet to server: %w", err)
	}

	if err := c.recvVersion(); err != nil {
		wr.Close()
		return nil, fmt.Errorf("error receiving version packet from server: %w", err)
	}

	c.clientConn.wg.Add(1)
	go func() {
		defer c.clientConn.wg.Done()

		if err := c.clientConn.recv(); err != nil {
			c.clientConn.broadcastErr(err)
		}
	}()

	return c, nil
}

// Create creates the named file mode 0666 (before umask), truncating it if it
// already exists. If successful, methods on the returned File can be used for
// I/O; the associated file descriptor has mode O_RDWR. If you need more
// control over the flags/mode used to open the file, see client.OpenFile.
//
// Note that some SFTP servers (e.g. AWS Transfer) do not support opening files
// read/write at the same time. For those services you will need to use
// `client.OpenFile(os.O_WRONLY|os.O_CREATE|os.O_TRUNC)`.
func (c *Client) Create(path string) (*File, error) {
	return c.open(path, toPflags(os.O_RDWR|os.O_CREATE|os.O_TRUNC))
}

const sftpProtocolVersion = 3 // https://filezilla-project.org/specs/draft-ietf-secsh-filexfer-02.txt

func (c *Client) sendInit() error {
	return c.clientConn.conn.sendPacket(&sshFxInitPacket{
		Version: sftpProtocolVersion, // https://filezilla-project.org/specs/draft-ietf-secsh-filexfer-02.txt
	})
}

// returns the next value of c.nextid
func (c *Client) nextID() uint32 {
	return atomic.AddUint32(&c.nextid, 1)
}

func (c *Client) recvVersion() error {
	typ, data, err := c.recvPacket(0)
	if err != nil {
		if err == io.EOF {
			return fmt.Errorf("server unexpectedly closed connection: %w", io.ErrUnexpectedEOF)
		}

		return err
	}

	if typ != sshFxpVersion {
		return &unexpectedPacketErr{sshFxpVersion, typ}
	}

	version, data, err := unmarshalUint32Safe(data)
	if err != nil {
		return err
	}

	if version != sftpProtocolVersion {
		return &unexpectedVersionErr{sftpProtocolVersion, version}
	}

	for len(data) > 0 {
		var ext extensionPair
		ext, data, err = unmarshalExtensionPair(data)
		if err != nil {
			return err
		}
		c.ext[ext.Name] = ext.Data
	}

	return nil
}

// HasExtension checks whether the server supports a named extension.
//
// The first return value is the extension data reported by the server
// (typically a version number).
func (c *Client) HasExtension(name string) (string, bool) {
	data, ok := c.ext[name]
	return data, ok
}
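
// For example, a sketch of feature-detecting the OpenSSH POSIX rename
// extension before choosing a rename strategy:
//
//	if _, ok := client.HasExtension("posix-rename@openssh.com"); ok {
//		err = client.PosixRename(oldname, newname)
//	} else {
//		err = client.Rename(oldname, newname)
//	}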

// Walk returns a new Walker rooted at root.
func (c *Client) Walk(root string) *fs.Walker {
	return fs.WalkFS(root, c)
}

// ReadDir reads the directory named by p
// and returns a list of directory entries.
func (c *Client) ReadDir(p string) ([]os.FileInfo, error) {
	return c.ReadDirContext(context.Background(), p)
}

// ReadDirContext reads the directory named by p
// and returns a list of directory entries.
// The passed context can be used to cancel the operation;
// the entries listed up to the cancellation are returned.
func (c *Client) ReadDirContext(ctx context.Context, p string) ([]os.FileInfo, error) {
	handle, err := c.opendir(ctx, p)
	if err != nil {
		return nil, err
	}
	defer c.close(handle) // this has to defer earlier than the lock below
	var entries []os.FileInfo
	var done = false
	for !done {
		id := c.nextID()
		typ, data, err1 := c.sendPacket(ctx, nil, &sshFxpReaddirPacket{
			ID:     id,
			Handle: handle,
		})
		if err1 != nil {
			err = err1
			done = true
			break
		}
		switch typ {
		case sshFxpName:
			sid, data := unmarshalUint32(data)
			if sid != id {
				return nil, &unexpectedIDErr{id, sid}
			}
			count, data := unmarshalUint32(data)
			for i := uint32(0); i < count; i++ {
				var filename string
				filename, data = unmarshalString(data)
				_, data = unmarshalString(data) // discard longname
				var attr *FileStat
				attr, data, err = unmarshalAttrs(data)
				if err != nil {
					return nil, err
				}
				if filename == "." || filename == ".." {
					continue
				}
				entries = append(entries, fileInfoFromStat(attr, path.Base(filename)))
			}
		case sshFxpStatus:
			// TODO(dfc) scope warning!
			err = normaliseError(unmarshalStatus(id, data))
			done = true
		default:
			return nil, unimplementedPacketErr(typ)
		}
	}
	if err == io.EOF {
		err = nil
	}
	return entries, err
}
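
// A usage sketch with a deadline; on cancellation the entries listed so far
// are returned along with an error:
//
//	ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
//	defer cancel()
//
//	entries, err := client.ReadDirContext(ctx, "/remote/dir")
//	for _, fi := range entries {
//		fmt.Println(fi.Name(), fi.Size())
//	}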

func (c *Client) opendir(ctx context.Context, path string) (string, error) {
	id := c.nextID()
	typ, data, err := c.sendPacket(ctx, nil, &sshFxpOpendirPacket{
		ID:   id,
		Path: path,
	})
	if err != nil {
		return "", err
	}
	switch typ {
	case sshFxpHandle:
		sid, data := unmarshalUint32(data)
		if sid != id {
			return "", &unexpectedIDErr{id, sid}
		}
		handle, _ := unmarshalString(data)
		return handle, nil
	case sshFxpStatus:
		return "", normaliseError(unmarshalStatus(id, data))
	default:
		return "", unimplementedPacketErr(typ)
	}
}

// Stat returns a FileInfo structure describing the file specified by path 'p'.
// If 'p' is a symbolic link, the returned FileInfo structure describes the referent file.
func (c *Client) Stat(p string) (os.FileInfo, error) {
	fs, err := c.stat(p)
	if err != nil {
		return nil, err
	}
	return fileInfoFromStat(fs, path.Base(p)), nil
}

// Lstat returns a FileInfo structure describing the file specified by path 'p'.
// If 'p' is a symbolic link, the returned FileInfo structure describes the symbolic link.
func (c *Client) Lstat(p string) (os.FileInfo, error) {
	id := c.nextID()
	typ, data, err := c.sendPacket(context.Background(), nil, &sshFxpLstatPacket{
		ID:   id,
		Path: p,
	})
	if err != nil {
		return nil, err
	}
	switch typ {
	case sshFxpAttrs:
		sid, data := unmarshalUint32(data)
		if sid != id {
			return nil, &unexpectedIDErr{id, sid}
		}
		attr, _, err := unmarshalAttrs(data)
		if err != nil {
			// avoid returning a valid value from fileInfoFromStat if err != nil.
			return nil, err
		}
		return fileInfoFromStat(attr, path.Base(p)), nil
	case sshFxpStatus:
		return nil, normaliseError(unmarshalStatus(id, data))
	default:
		return nil, unimplementedPacketErr(typ)
	}
}

// ReadLink reads the target of a symbolic link.
func (c *Client) ReadLink(p string) (string, error) {
	id := c.nextID()
	typ, data, err := c.sendPacket(context.Background(), nil, &sshFxpReadlinkPacket{
		ID:   id,
		Path: p,
	})
	if err != nil {
		return "", err
	}
	switch typ {
	case sshFxpName:
		sid, data := unmarshalUint32(data)
		if sid != id {
			return "", &unexpectedIDErr{id, sid}
		}
		count, data := unmarshalUint32(data)
		if count != 1 {
			return "", unexpectedCount(1, count)
		}
		filename, _ := unmarshalString(data) // ignore dummy attributes
		return filename, nil
	case sshFxpStatus:
		return "", normaliseError(unmarshalStatus(id, data))
	default:
		return "", unimplementedPacketErr(typ)
	}
}

// Link creates a hard link at 'newname', pointing at the same inode as 'oldname'.
func (c *Client) Link(oldname, newname string) error {
	id := c.nextID()
	typ, data, err := c.sendPacket(context.Background(), nil, &sshFxpHardlinkPacket{
		ID:      id,
		Oldpath: oldname,
		Newpath: newname,
	})
	if err != nil {
		return err
	}
	switch typ {
	case sshFxpStatus:
		return normaliseError(unmarshalStatus(id, data))
	default:
		return unimplementedPacketErr(typ)
	}
}

// Symlink creates a symbolic link at 'newname', pointing at target 'oldname'.
func (c *Client) Symlink(oldname, newname string) error {
	id := c.nextID()
	typ, data, err := c.sendPacket(context.Background(), nil, &sshFxpSymlinkPacket{
		ID:         id,
		Linkpath:   newname,
		Targetpath: oldname,
	})
	if err != nil {
		return err
	}
	switch typ {
	case sshFxpStatus:
		return normaliseError(unmarshalStatus(id, data))
	default:
		return unimplementedPacketErr(typ)
	}
}

func (c *Client) fsetstat(handle string, flags uint32, attrs interface{}) error {
	id := c.nextID()
	typ, data, err := c.sendPacket(context.Background(), nil, &sshFxpFsetstatPacket{
		ID:     id,
		Handle: handle,
		Flags:  flags,
		Attrs:  attrs,
	})
	if err != nil {
		return err
	}
	switch typ {
	case sshFxpStatus:
		return normaliseError(unmarshalStatus(id, data))
	default:
		return unimplementedPacketErr(typ)
	}
}

// setstat is a convenience wrapper for changing various attributes of the named file.
func (c *Client) setstat(path string, flags uint32, attrs interface{}) error {
	id := c.nextID()
	typ, data, err := c.sendPacket(context.Background(), nil, &sshFxpSetstatPacket{
		ID:    id,
		Path:  path,
		Flags: flags,
		Attrs: attrs,
	})
	if err != nil {
		return err
	}
	switch typ {
	case sshFxpStatus:
		return normaliseError(unmarshalStatus(id, data))
	default:
		return unimplementedPacketErr(typ)
	}
}

// Chtimes changes the access and modification times of the named file.
func (c *Client) Chtimes(path string, atime time.Time, mtime time.Time) error {
	type times struct {
		Atime uint32
		Mtime uint32
	}
	attrs := times{uint32(atime.Unix()), uint32(mtime.Unix())}
	return c.setstat(path, sshFileXferAttrACmodTime, attrs)
}

// Chown changes the user and group owners of the named file.
func (c *Client) Chown(path string, uid, gid int) error {
	type owner struct {
		UID uint32
		GID uint32
	}
	attrs := owner{uint32(uid), uint32(gid)}
	return c.setstat(path, sshFileXferAttrUIDGID, attrs)
}

// Chmod changes the permissions of the named file.
//
// Chmod does not apply a umask, because even retrieving the umask is not
// possible in a portable way without causing a race condition. Callers
// should mask off umask bits, if desired.
func (c *Client) Chmod(path string, mode os.FileMode) error {
	return c.setstat(path, sshFileXferAttrPermissions, toChmodPerm(mode))
}

// Truncate sets the size of the named file. Although it may be safely assumed
// that if the size is less than its current size it will be truncated to fit,
// the SFTP protocol does not specify what behavior the server should take when
// setting a size greater than the current size.
func (c *Client) Truncate(path string, size int64) error {
	return c.setstat(path, sshFileXferAttrSize, uint64(size))
}

// SetExtendedData sets extended attributes of the named file. It uses the
// SSH_FILEXFER_ATTR_EXTENDED flag in the setstat request.
//
// This flag provides a general extension mechanism for vendor-specific extensions.
// Names of the attributes should be a string of the format "name@domain", where "domain"
// is a valid, registered domain name and "name" identifies the method. Server
// implementations SHOULD ignore extended data fields that they do not understand.
func (c *Client) SetExtendedData(path string, extended []StatExtended) error {
	attrs := &FileStat{
		Extended: extended,
	}
	return c.setstat(path, sshFileXferAttrExtended, attrs)
}

// Open opens the named file for reading. If successful, methods on the
// returned file can be used for reading; the associated file descriptor
// has mode O_RDONLY.
func (c *Client) Open(path string) (*File, error) {
	return c.open(path, toPflags(os.O_RDONLY))
}

// OpenFile is the generalized open call; most users will use Open or
// Create instead. It opens the named file with specified flag (O_RDONLY
// etc.). If successful, methods on the returned File can be used for I/O.
func (c *Client) OpenFile(path string, f int) (*File, error) {
	return c.open(path, toPflags(f))
}

func (c *Client) open(path string, pflags uint32) (*File, error) {
	id := c.nextID()
	typ, data, err := c.sendPacket(context.Background(), nil, &sshFxpOpenPacket{
		ID:     id,
		Path:   path,
		Pflags: pflags,
	})
	if err != nil {
		return nil, err
	}
	switch typ {
	case sshFxpHandle:
		sid, data := unmarshalUint32(data)
		if sid != id {
			return nil, &unexpectedIDErr{id, sid}
		}
		handle, _ := unmarshalString(data)
		return &File{c: c, path: path, handle: handle}, nil
	case sshFxpStatus:
		return nil, normaliseError(unmarshalStatus(id, data))
	default:
		return nil, unimplementedPacketErr(typ)
	}
}

// close closes a handle previously returned in the response
// to SSH_FXP_OPEN or SSH_FXP_OPENDIR. The handle becomes invalid
// immediately after this request has been sent.
func (c *Client) close(handle string) error {
	id := c.nextID()
	typ, data, err := c.sendPacket(context.Background(), nil, &sshFxpClosePacket{
		ID:     id,
		Handle: handle,
	})
	if err != nil {
		return err
	}
	switch typ {
	case sshFxpStatus:
		return normaliseError(unmarshalStatus(id, data))
	default:
		return unimplementedPacketErr(typ)
	}
}

func (c *Client) stat(path string) (*FileStat, error) {
	id := c.nextID()
	typ, data, err := c.sendPacket(context.Background(), nil, &sshFxpStatPacket{
		ID:   id,
		Path: path,
	})
	if err != nil {
		return nil, err
	}
	switch typ {
	case sshFxpAttrs:
		sid, data := unmarshalUint32(data)
		if sid != id {
			return nil, &unexpectedIDErr{id, sid}
		}
		attr, _, err := unmarshalAttrs(data)
		return attr, err
	case sshFxpStatus:
		return nil, normaliseError(unmarshalStatus(id, data))
	default:
		return nil, unimplementedPacketErr(typ)
	}
}

func (c *Client) fstat(handle string) (*FileStat, error) {
	id := c.nextID()
	typ, data, err := c.sendPacket(context.Background(), nil, &sshFxpFstatPacket{
		ID:     id,
		Handle: handle,
	})
	if err != nil {
		return nil, err
	}
	switch typ {
	case sshFxpAttrs:
		sid, data := unmarshalUint32(data)
		if sid != id {
			return nil, &unexpectedIDErr{id, sid}
		}
		attr, _, err := unmarshalAttrs(data)
		return attr, err
	case sshFxpStatus:
		return nil, normaliseError(unmarshalStatus(id, data))
	default:
		return nil, unimplementedPacketErr(typ)
	}
}

// StatVFS retrieves VFS statistics from a remote host.
//
// It implements the statvfs@openssh.com SSH_FXP_EXTENDED feature
// from http://www.opensource.apple.com/source/OpenSSH/OpenSSH-175/openssh/PROTOCOL?txt.
func (c *Client) StatVFS(path string) (*StatVFS, error) {
	// send the StatVFS packet to the server
	id := c.nextID()
	typ, data, err := c.sendPacket(context.Background(), nil, &sshFxpStatvfsPacket{
		ID:   id,
		Path: path,
	})
	if err != nil {
		return nil, err
	}

	switch typ {
	// server responded with valid data
	case sshFxpExtendedReply:
		var response StatVFS
		err = binary.Read(bytes.NewReader(data), binary.BigEndian, &response)
		if err != nil {
			return nil, errors.New("can not parse reply")
		}

		return &response, nil

	// the request failed
	case sshFxpStatus:
		return nil, normaliseError(unmarshalStatus(id, data))

	default:
		return nil, unimplementedPacketErr(typ)
	}
}
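
// A usage sketch, assuming the server supports statvfs@openssh.com and that
// StatVFS carries Frsize and Bavail fields following statvfs(2):
//
//	vfs, err := client.StatVFS("/remote")
//	if err != nil {
//		log.Fatal(err)
//	}
//	freeBytes := vfs.Frsize * vfs.Bavail // space available to unprivileged users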

// Join joins any number of path elements into a single path, adding a
// separating slash if necessary. The result is Cleaned; in particular, all
// empty strings are ignored.
func (c *Client) Join(elem ...string) string { return path.Join(elem...) }

// Remove removes the specified file or directory. An error will be returned if no
// file or directory with the specified path exists, or if the specified directory
// is not empty.
func (c *Client) Remove(path string) error {
	errF := c.removeFile(path)
	if errF == nil {
		return nil
	}

	errD := c.RemoveDirectory(path)
	if errD == nil {
		return nil
	}

	// Both failed: figure out which error to return.

	if errF, ok := errF.(*os.PathError); ok {
		// The only time it makes sense to compare errors is when both are `*os.PathError`.
		// We cannot test these directly with errF == errD, as that would be a pointer comparison.

		if errD, ok := errD.(*os.PathError); ok && errors.Is(errF.Err, errD.Err) {
			// If they are both pointers to PathError,
			// and the same underlying error, then return that.
			return errF
		}
	}

	fi, err := c.Stat(path)
	if err != nil {
		return err
	}

	if fi.IsDir() {
		return errD
	}

	return errF
}

func (c *Client) removeFile(path string) error {
	id := c.nextID()
	typ, data, err := c.sendPacket(context.Background(), nil, &sshFxpRemovePacket{
		ID:       id,
		Filename: path,
	})
	if err != nil {
		return err
	}
	switch typ {
	case sshFxpStatus:
		err = normaliseError(unmarshalStatus(id, data))
		if err == nil {
			return nil
		}
		return &os.PathError{
			Op:   "remove",
			Path: path,
			Err:  err,
		}
	default:
		return unimplementedPacketErr(typ)
	}
}

// RemoveDirectory removes a directory path.
func (c *Client) RemoveDirectory(path string) error {
	id := c.nextID()
	typ, data, err := c.sendPacket(context.Background(), nil, &sshFxpRmdirPacket{
		ID:   id,
		Path: path,
	})
	if err != nil {
		return err
	}
	switch typ {
	case sshFxpStatus:
		err = normaliseError(unmarshalStatus(id, data))
		if err == nil {
			return nil
		}
		return &os.PathError{
			Op:   "remove",
			Path: path,
			Err:  err,
		}
	default:
		return unimplementedPacketErr(typ)
	}
}

// Rename renames a file.
func (c *Client) Rename(oldname, newname string) error {
	id := c.nextID()
	typ, data, err := c.sendPacket(context.Background(), nil, &sshFxpRenamePacket{
		ID:      id,
		Oldpath: oldname,
		Newpath: newname,
	})
	if err != nil {
		return err
	}
	switch typ {
	case sshFxpStatus:
		return normaliseError(unmarshalStatus(id, data))
	default:
		return unimplementedPacketErr(typ)
	}
}

// PosixRename renames a file using the posix-rename@openssh.com extension,
// which will replace newname if it already exists.
func (c *Client) PosixRename(oldname, newname string) error {
	id := c.nextID()
	typ, data, err := c.sendPacket(context.Background(), nil, &sshFxpPosixRenamePacket{
		ID:      id,
		Oldpath: oldname,
		Newpath: newname,
	})
	if err != nil {
		return err
	}
	switch typ {
	case sshFxpStatus:
		return normaliseError(unmarshalStatus(id, data))
	default:
		return unimplementedPacketErr(typ)
	}
}

// RealPath can be used to have the server canonicalize any given path name to an absolute path.
//
// This is useful for converting path names containing ".." components,
// or relative pathnames without a leading slash, into absolute paths.
func (c *Client) RealPath(path string) (string, error) {
	id := c.nextID()
	typ, data, err := c.sendPacket(context.Background(), nil, &sshFxpRealpathPacket{
		ID:   id,
		Path: path,
	})
	if err != nil {
		return "", err
	}
	switch typ {
	case sshFxpName:
		sid, data := unmarshalUint32(data)
		if sid != id {
			return "", &unexpectedIDErr{id, sid}
		}
		count, data := unmarshalUint32(data)
		if count != 1 {
			return "", unexpectedCount(1, count)
		}
		filename, _ := unmarshalString(data) // ignore attributes
		return filename, nil
	case sshFxpStatus:
		return "", normaliseError(unmarshalStatus(id, data))
	default:
		return "", unimplementedPacketErr(typ)
	}
}

// Getwd returns the current working directory of the server. Operations
// involving relative paths will be based at this location.
func (c *Client) Getwd() (string, error) {
	return c.RealPath(".")
}

// Mkdir creates the specified directory. An error will be returned if a file or
// directory with the specified path already exists, or if the directory's
// parent directory does not exist (the method cannot create complete paths).
func (c *Client) Mkdir(path string) error {
	id := c.nextID()
	typ, data, err := c.sendPacket(context.Background(), nil, &sshFxpMkdirPacket{
		ID:   id,
		Path: path,
	})
	if err != nil {
		return err
	}
	switch typ {
	case sshFxpStatus:
		return normaliseError(unmarshalStatus(id, data))
	default:
		return unimplementedPacketErr(typ)
	}
}

// MkdirAll creates a directory named path, along with any necessary parents,
// and returns nil, or else returns an error.
// If path is already a directory, MkdirAll does nothing and returns nil.
// If, while making any directory, that path is found to already be a regular file, an error is returned.
func (c *Client) MkdirAll(path string) error {
	// Most of this code mimics https://golang.org/src/os/path.go?s=514:561#L13
	// Fast path: if we can tell whether path is a directory or file, stop with success or error.
	dir, err := c.Stat(path)
	if err == nil {
		if dir.IsDir() {
			return nil
		}
		return &os.PathError{Op: "mkdir", Path: path, Err: syscall.ENOTDIR}
	}

	// Slow path: make sure parent exists and then call Mkdir for path.
	i := len(path)
	for i > 0 && path[i-1] == '/' { // Skip trailing path separator.
		i--
	}

	j := i
	for j > 0 && path[j-1] != '/' { // Scan backward over element.
		j--
	}

	if j > 1 {
		// Create parent.
		err = c.MkdirAll(path[0 : j-1])
		if err != nil {
			return err
		}
	}

	// Parent now exists; invoke Mkdir and use its result.
	err = c.Mkdir(path)
	if err != nil {
		// Handle arguments like "foo/." by
		// double-checking that directory doesn't exist.
		dir, err1 := c.Lstat(path)
		if err1 == nil && dir.IsDir() {
			return nil
		}
		return err
	}
	return nil
}

// RemoveAll recursively deletes the file or directory at the given path,
// including any subdirectories and files it contains.
// An error will be returned if no file or directory with the specified path exists.
func (c *Client) RemoveAll(path string) error {
	// Get the file/directory information.
	fi, err := c.Stat(path)
	if err != nil {
		return err
	}

	if fi.IsDir() {
		// Delete files recursively in the directory.
		files, err := c.ReadDir(path)
		if err != nil {
			return err
		}

		for _, file := range files {
			if file.IsDir() {
				// Recursively delete subdirectories.
				err = c.RemoveAll(path + "/" + file.Name())
				if err != nil {
					return err
				}
			} else {
				// Delete individual files.
				err = c.Remove(path + "/" + file.Name())
				if err != nil {
					return err
				}
			}
		}
	}

	return c.Remove(path)
}

// File represents a remote file.
type File struct {
	c    *Client
	path string

	mu     sync.RWMutex
	handle string
	offset int64 // current offset within remote file
}

// Close closes the File, rendering it unusable for I/O. It returns an
// error, if any.
func (f *File) Close() error {
	f.mu.Lock()
	defer f.mu.Unlock()

	if f.handle == "" {
		return os.ErrClosed
	}

	// The design principle here is that when `openssh-portable/sftp-server.c` is doing `handle_close`,
	// it will unconditionally mark the handle as unused,
	// so we need to also unconditionally mark this handle as invalid.
	// By invalidating our local copy of the handle,
	// we ensure that there cannot be any erroneous use-after-close requests sent after Close.

	handle := f.handle
	f.handle = ""

	return f.c.close(handle)
}

// Name returns the name of the file as presented to Open or Create.
func (f *File) Name() string {
	return f.path
}

// Read reads up to len(b) bytes from the File. It returns the number of bytes
// read and an error, if any. Read follows io.Reader semantics, so when Read
// encounters an error or EOF condition after successfully reading n > 0 bytes,
// it returns the number of bytes read.
//
// To maximise throughput for transferring the entire file (especially
// over high latency links) it is recommended to use WriteTo rather
// than calling Read multiple times. io.Copy will do this
// automatically.
func (f *File) Read(b []byte) (int, error) {
	f.mu.Lock()
	defer f.mu.Unlock()

	n, err := f.readAt(b, f.offset)
	f.offset += int64(n)
	return n, err
}

// readChunkAt attempts to read the entire length of the buffer from the file starting at the offset.
// It will continue progressively reading into the buffer until it fills the whole buffer, or an error occurs.
func (f *File) readChunkAt(ch chan result, b []byte, off int64) (n int, err error) {
	for err == nil && n < len(b) {
		id := f.c.nextID()
		typ, data, err := f.c.sendPacket(context.Background(), ch, &sshFxpReadPacket{
			ID:     id,
			Handle: f.handle,
			Offset: uint64(off) + uint64(n),
			Len:    uint32(len(b) - n),
		})
		if err != nil {
			return n, err
		}

		switch typ {
		case sshFxpStatus:
			return n, normaliseError(unmarshalStatus(id, data))

		case sshFxpData:
			sid, data := unmarshalUint32(data)
			if id != sid {
				return n, &unexpectedIDErr{id, sid}
			}

			l, data := unmarshalUint32(data)
			n += copy(b[n:], data[:l])

		default:
			return n, unimplementedPacketErr(typ)
		}
	}

	return
}

func (f *File) readAtSequential(b []byte, off int64) (read int, err error) {
	for read < len(b) {
		rb := b[read:]
		if len(rb) > f.c.maxPacket {
			rb = rb[:f.c.maxPacket]
		}
		n, err := f.readChunkAt(nil, rb, off+int64(read))
		if n < 0 {
			panic("sftp.File: returned negative count from readChunkAt")
		}
		if n > 0 {
			read += n
		}
		if err != nil {
			return read, err
		}
	}
	return read, nil
}

// ReadAt reads up to len(b) bytes from the File at the given offset `off`. It returns
// the number of bytes read and an error, if any. ReadAt follows io.ReaderAt semantics,
// so the file offset is not altered during the read.
func (f *File) ReadAt(b []byte, off int64) (int, error) {
	f.mu.RLock()
	defer f.mu.RUnlock()

	return f.readAt(b, off)
}

// readAt must be called while holding either the Read or Write mutex in File.
// This code is concurrent safe with itself, but not with Close.
func (f *File) readAt(b []byte, off int64) (int, error) {
	if f.handle == "" {
		return 0, os.ErrClosed
	}

	if len(b) <= f.c.maxPacket {
		// This should be able to be serviced with one or two requests.
		// So, just do it directly.
		return f.readChunkAt(nil, b, off)
	}

	if f.c.disableConcurrentReads {
		return f.readAtSequential(b, off)
	}

	// Split the read into multiple maxPacket-sized concurrent reads bounded by maxConcurrentRequests.
	// This allows reads with a suitably large buffer to transfer data at a much faster rate
	// by overlapping round trip times.

	cancel := make(chan struct{})

	concurrency := len(b)/f.c.maxPacket + 1
	if concurrency > f.c.maxConcurrentRequests || concurrency < 1 {
		concurrency = f.c.maxConcurrentRequests
	}

	resPool := newResChanPool(concurrency)

	type work struct {
		id  uint32
		res chan result

		b   []byte
		off int64
	}
	workCh := make(chan work)

	// Slice: cut up the Read into any number of buffers of length <= f.c.maxPacket, and at appropriate offsets.
	go func() {
		defer close(workCh)

		b := b
		offset := off
		chunkSize := f.c.maxPacket

		for len(b) > 0 {
			rb := b
			if len(rb) > chunkSize {
				rb = rb[:chunkSize]
			}

			id := f.c.nextID()
			res := resPool.Get()

			f.c.dispatchRequest(res, &sshFxpReadPacket{
				ID:     id,
				Handle: f.handle,
				Offset: uint64(offset),
				Len:    uint32(len(rb)),
			})

			select {
			case workCh <- work{id, res, rb, offset}:
			case <-cancel:
				return
			}

			offset += int64(len(rb))
			b = b[len(rb):]
		}
	}()

	type rErr struct {
		off int64
		err error
	}
	errCh := make(chan rErr)

	var wg sync.WaitGroup
	wg.Add(concurrency)
	for i := 0; i < concurrency; i++ {
		// Map_i: each worker gets work, and then performs the Read into its buffer from its respective offset.
		go func() {
			defer wg.Done()

			for packet := range workCh {
				var n int

				s := <-packet.res
				resPool.Put(packet.res)

				err := s.err
				if err == nil {
					switch s.typ {
					case sshFxpStatus:
						err = normaliseError(unmarshalStatus(packet.id, s.data))

					case sshFxpData:
						sid, data := unmarshalUint32(s.data)
						if packet.id != sid {
							err = &unexpectedIDErr{packet.id, sid}

						} else {
							l, data := unmarshalUint32(data)
							n = copy(packet.b, data[:l])

							// For normal disk files, it is guaranteed that this will read
							// the specified number of bytes, or up to end of file.
							// This implies, if we have a short read, that means EOF.
							if n < len(packet.b) {
								err = io.EOF
							}
						}

					default:
						err = unimplementedPacketErr(s.typ)
					}
				}

				if err != nil {
					// return the offset as the start + how much we read before the error.
					errCh <- rErr{packet.off + int64(n), err}

					// DO NOT return.
					// We want to ensure that workCh is drained before wg.Wait returns.
				}
			}
		}()
	}

	// Wait for long tail, before closing results.
	go func() {
		wg.Wait()
		close(errCh)
	}()

	// Reduce: collect all the results into a relevant return: the earliest offset to return an error.
	firstErr := rErr{math.MaxInt64, nil}
	for rErr := range errCh {
		if rErr.off <= firstErr.off {
			firstErr = rErr
		}

		select {
		case <-cancel:
		default:
			// stop any more work from being distributed. (Just in case.)
			close(cancel)
		}
	}

	if firstErr.err != nil {
		// firstErr.err != nil if and only if firstErr.off > our starting offset.
		return int(firstErr.off - off), firstErr.err
	}

	// As per spec for io.ReaderAt, we return nil error if and only if we read everything.
	return len(b), nil
}

// writeToSequential implements WriteTo, but works sequentially with no parallelism.
func (f *File) writeToSequential(w io.Writer) (written int64, err error) {
	b := make([]byte, f.c.maxPacket)
	ch := make(chan result, 1) // reusable channel

	for {
		n, err := f.readChunkAt(ch, b, f.offset)
		if n < 0 {
			panic("sftp.File: returned negative count from readChunkAt")
		}

		if n > 0 {
			f.offset += int64(n)

			m, err := w.Write(b[:n])
			written += int64(m)

			if err != nil {
				return written, err
			}
		}

		if err != nil {
			if err == io.EOF {
				return written, nil // return nil explicitly.
			}

			return written, err
		}
	}
}

// WriteTo writes the file to the given Writer.
// The return value is the number of bytes written.
// Any error encountered during the write is also returned.
//
// This method is preferred over calling Read multiple times
// to maximise throughput for transferring the entire file,
// especially over high latency links.
func (f *File) WriteTo(w io.Writer) (written int64, err error) {
	f.mu.Lock()
	defer f.mu.Unlock()

	if f.handle == "" {
		return 0, os.ErrClosed
	}

	if f.c.disableConcurrentReads {
		return f.writeToSequential(w)
	}

	// For concurrency, we want to guess how many concurrent workers we should use.
	var fileStat *FileStat
	if f.c.useFstat {
		fileStat, err = f.c.fstat(f.handle)
	} else {
		fileStat, err = f.c.stat(f.path)
	}
	if err != nil {
		return 0, err
	}

	fileSize := fileStat.Size
	if fileSize <= uint64(f.c.maxPacket) || !isRegular(fileStat.Mode) {
		// only regular files are guaranteed to return (full read) xor (partial read, next error)
		return f.writeToSequential(w)
	}

	concurrency64 := fileSize/uint64(f.c.maxPacket) + 1 // a bad guess, but better than no guess
	if concurrency64 > uint64(f.c.maxConcurrentRequests) || concurrency64 < 1 {
		concurrency64 = uint64(f.c.maxConcurrentRequests)
	}
	// Now that concurrency64 is saturated to an int value, we know this assignment cannot possibly overflow.
	concurrency := int(concurrency64)

	chunkSize := f.c.maxPacket
	pool := newBufPool(concurrency, chunkSize)
	resPool := newResChanPool(concurrency)

	cancel := make(chan struct{})
	var wg sync.WaitGroup
	defer func() {
		// Once the writing Reduce phase has ended, all the feed work needs to unconditionally stop.
		close(cancel)

		// We want to wait until all outstanding goroutines with an `f` or `f.c` reference have completed.
		// Just to be sure we don’t orphan any goroutines, or leave any hanging references.
		wg.Wait()
	}()

	type writeWork struct {
		b   []byte
		off int64
		err error

		next chan writeWork
	}
	writeCh := make(chan writeWork)

	type readWork struct {
		id  uint32
		res chan result
		off int64

		cur, next chan writeWork
	}
	readCh := make(chan readWork)

	// Slice: hand out chunks of work on demand, with a `cur` and `next` channel built-in for sequencing.
	go func() {
		defer close(readCh)

		off := f.offset

		cur := writeCh
		for {
			id := f.c.nextID()
			res := resPool.Get()

			next := make(chan writeWork)
			readWork := readWork{
				id:  id,
				res: res,
				off: off,

				cur:  cur,
				next: next,
			}

			f.c.dispatchRequest(res, &sshFxpReadPacket{
				ID:     id,
				Handle: f.handle,
				Offset: uint64(off),
				Len:    uint32(chunkSize),
			})

			select {
			case readCh <- readWork:
			case <-cancel:
				return
			}

			off += int64(chunkSize)
			cur = next
		}
	}()

	wg.Add(concurrency)
	for i := 0; i < concurrency; i++ {
		// Map_i: each worker gets readWork, and does the Read into a buffer at the given offset.
		go func() {
			defer wg.Done()

			for readWork := range readCh {
				var b []byte
				var n int

				s := <-readWork.res
				resPool.Put(readWork.res)

				err := s.err
				if err == nil {
					switch s.typ {
					case sshFxpStatus:
						err = normaliseError(unmarshalStatus(readWork.id, s.data))

					case sshFxpData:
						sid, data := unmarshalUint32(s.data)
						if readWork.id != sid {
							err = &unexpectedIDErr{readWork.id, sid}

						} else {
							l, data := unmarshalUint32(data)
							b = pool.Get()[:l]
							n = copy(b, data[:l])
							b = b[:n]
						}

					default:
						err = unimplementedPacketErr(s.typ)
					}
				}

				writeWork := writeWork{
					b:   b,
					off: readWork.off,
					err: err,

					next: readWork.next,
				}

				select {
				case readWork.cur <- writeWork:
				case <-cancel:
				}

				// DO NOT return.
				// We want to ensure that readCh is drained before wg.Wait returns.
			}
		}()
	}

	// Reduce: serialize the results from the reads into sequential writes.
	cur := writeCh
	for {
		packet, ok := <-cur
		if !ok {
			return written, errors.New("sftp.File.WriteTo: unexpectedly closed channel")
		}

		// Because writes are serialized, this will always be the last successfully read byte.
		f.offset = packet.off + int64(len(packet.b))

		if len(packet.b) > 0 {
			n, err := w.Write(packet.b)
			written += int64(n)
			if err != nil {
				return written, err
			}
		}

		if packet.err != nil {
			if packet.err == io.EOF {
				return written, nil
			}

			return written, packet.err
		}

		pool.Put(packet.b)
		cur = packet.next
	}
}
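
// A typical download sketch (paths are hypothetical; io.Copy delegates to
// WriteTo here, so the transfer can use concurrent reads):
//
//	src, err := client.Open("/remote/file")
//	if err != nil {
//		log.Fatal(err)
//	}
//	defer src.Close()
//
//	dst, err := os.Create("local-file")
//	if err != nil {
//		log.Fatal(err)
//	}
//	defer dst.Close()
//
//	if _, err := io.Copy(dst, src); err != nil {
//		log.Fatal(err)
//	}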

// Stat returns the FileInfo structure describing the file,
// and an error, if any occurred while retrieving it.
func (f *File) Stat() (os.FileInfo, error) {
	f.mu.RLock()
	defer f.mu.RUnlock()

	if f.handle == "" {
		return nil, os.ErrClosed
	}

	return f.stat()
}

func (f *File) stat() (os.FileInfo, error) {
	fs, err := f.c.fstat(f.handle)
	if err != nil {
		return nil, err
	}
	return fileInfoFromStat(fs, path.Base(f.path)), nil
}

// Write writes len(b) bytes to the File. It returns the number of bytes
// written and an error, if any. Write returns a non-nil error when n !=
// len(b).
//
// To maximise throughput for transferring the entire file (especially
// over high latency links) it is recommended to use ReadFrom rather
// than calling Write multiple times. io.Copy will do this
// automatically.
func (f *File) Write(b []byte) (int, error) {
	f.mu.Lock()
	defer f.mu.Unlock()

	if f.handle == "" {
		return 0, os.ErrClosed
	}

	n, err := f.writeAt(b, f.offset)
	f.offset += int64(n)
	return n, err
}

func (f *File) writeChunkAt(ch chan result, b []byte, off int64) (int, error) {
	typ, data, err := f.c.sendPacket(context.Background(), ch, &sshFxpWritePacket{
		ID:     f.c.nextID(),
		Handle: f.handle,
		Offset: uint64(off),
		Length: uint32(len(b)),
		Data:   b,
	})
	if err != nil {
		return 0, err
	}

	switch typ {
	case sshFxpStatus:
		id, _ := unmarshalUint32(data)
		err := normaliseError(unmarshalStatus(id, data))
		if err != nil {
			return 0, err
		}

	default:
		return 0, unimplementedPacketErr(typ)
	}

	return len(b), nil
}

// writeAtConcurrent implements WriterAt, but works concurrently rather than sequentially.
func (f *File) writeAtConcurrent(b []byte, off int64) (int, error) {
	// Split the write into multiple maxPacket sized concurrent writes
	// bounded by maxConcurrentRequests. This allows writes with a suitably
	// large buffer to transfer data at a much faster rate due to
	// overlapping round trip times.

	cancel := make(chan struct{})

	type work struct {
		id  uint32
		res chan result

		off int64
	}
	workCh := make(chan work)

	concurrency := len(b)/f.c.maxPacket + 1
	if concurrency > f.c.maxConcurrentRequests || concurrency < 1 {
		concurrency = f.c.maxConcurrentRequests
	}

	pool := newResChanPool(concurrency)

	// Slice: cut up the Write into any number of buffers of length <= f.c.maxPacket, and at appropriate offsets.
	go func() {
		defer close(workCh)

		var read int
		chunkSize := f.c.maxPacket

		for read < len(b) {
			wb := b[read:]
			if len(wb) > chunkSize {
				wb = wb[:chunkSize]
			}

			id := f.c.nextID()
			res := pool.Get()
			off := off + int64(read)

			f.c.dispatchRequest(res, &sshFxpWritePacket{
				ID:     id,
				Handle: f.handle,
				Offset: uint64(off),
				Length: uint32(len(wb)),
				Data:   wb,
			})

			select {
			case workCh <- work{id, res, off}:
			case <-cancel:
				return
			}

			read += len(wb)
		}
	}()

	type wErr struct {
		off int64
		err error
	}
	errCh := make(chan wErr)

	var wg sync.WaitGroup
	wg.Add(concurrency)
	for i := 0; i < concurrency; i++ {
		// Map_i: each worker gets work, and does the Write from each buffer to its respective offset.
		go func() {
			defer wg.Done()

			for work := range workCh {
				s := <-work.res
				pool.Put(work.res)

				err := s.err
				if err == nil {
					switch s.typ {
					case sshFxpStatus:
						err = normaliseError(unmarshalStatus(work.id, s.data))
					default:
						err = unimplementedPacketErr(s.typ)
					}
				}

				if err != nil {
					errCh <- wErr{work.off, err}
				}
			}
		}()
	}

	// Wait for long tail, before closing results.
	go func() {
		wg.Wait()
		close(errCh)
	}()

	// Reduce: collect all the results into a relevant return: the earliest offset to return an error.
	firstErr := wErr{math.MaxInt64, nil}
	for wErr := range errCh {
		if wErr.off <= firstErr.off {
			firstErr = wErr
		}

		select {
		case <-cancel:
		default:
			// stop any more work from being distributed. (Just in case.)
			close(cancel)
		}
	}

	if firstErr.err != nil {
		// firstErr.err != nil if and only if firstErr.off >= our starting offset.
		return int(firstErr.off - off), firstErr.err
	}

	return len(b), nil
}

// WriteAt writes up to len(b) bytes to the File at the given offset `off`. It returns
// the number of bytes written and an error, if any. WriteAt follows io.WriterAt semantics,
// so the file offset is not altered during the write.
func (f *File) WriteAt(b []byte, off int64) (written int, err error) {
	f.mu.RLock()
	defer f.mu.RUnlock()

	if f.handle == "" {
		return 0, os.ErrClosed
	}

	return f.writeAt(b, off)
}

// writeAt must be called while holding either the Read or Write mutex in File.
// This code is concurrent safe with itself, but not with Close.
func (f *File) writeAt(b []byte, off int64) (written int, err error) {
	if len(b) <= f.c.maxPacket {
		// We can do this in one write.
		return f.writeChunkAt(nil, b, off)
	}

	if f.c.useConcurrentWrites {
		return f.writeAtConcurrent(b, off)
	}

	ch := make(chan result, 1) // reusable channel

	chunkSize := f.c.maxPacket

	for written < len(b) {
		wb := b[written:]
		if len(wb) > chunkSize {
			wb = wb[:chunkSize]
		}

		n, err := f.writeChunkAt(ch, wb, off+int64(written))
		if n > 0 {
			written += n
		}

		if err != nil {
			return written, err
		}
	}

	return len(b), nil
}

// ReadFromWithConcurrency implements ReaderFrom,
// but uses the given concurrency to issue multiple requests at the same time.
//
// Giving a concurrency of less than one will default to the Client’s max concurrency.
//
// Otherwise, the given concurrency will be capped by the Client's max concurrency.
//
// When one needs to guarantee concurrent reads/writes, this method is preferred
// over ReadFrom.
func (f *File) ReadFromWithConcurrency(r io.Reader, concurrency int) (read int64, err error) {
	f.mu.Lock()
	defer f.mu.Unlock()

	return f.readFromWithConcurrency(r, concurrency)
}

func (f *File) readFromWithConcurrency(r io.Reader, concurrency int) (read int64, err error) {
	if f.handle == "" {
		return 0, os.ErrClosed
	}

	// Split the write into multiple maxPacket sized concurrent writes.
	// This allows writes with a suitably large reader
	// to transfer data at a much faster rate due to overlapping round trip times.

	cancel := make(chan struct{})

	type work struct {
		id  uint32
		res chan result

		off int64
	}
	workCh := make(chan work)

	type rwErr struct {
		off int64
		err error
	}
	errCh := make(chan rwErr)

	if concurrency > f.c.maxConcurrentRequests || concurrency < 1 {
		concurrency = f.c.maxConcurrentRequests
	}

	pool := newResChanPool(concurrency)

	// Slice: cut up the Read into any number of buffers of length <= f.c.maxPacket, and at appropriate offsets.
	go func() {
		defer close(workCh)

		b := make([]byte, f.c.maxPacket)
		off := f.offset

		for {
			// Fill the entire buffer.
			n, err := io.ReadFull(r, b)

			if n > 0 {
				read += int64(n)

				id := f.c.nextID()
				res := pool.Get()

				f.c.dispatchRequest(res, &sshFxpWritePacket{
					ID:     id,
					Handle: f.handle,
					Offset: uint64(off),
					Length: uint32(n),
					Data:   b[:n],
				})

				select {
				case workCh <- work{id, res, off}:
				case <-cancel:
					return
				}

				off += int64(n)
			}

			if err != nil {
				if !errors.Is(err, io.EOF) && !errors.Is(err, io.ErrUnexpectedEOF) {
					errCh <- rwErr{off, err}
				}
				return
			}
		}
	}()

	var wg sync.WaitGroup
	wg.Add(concurrency)
	for i := 0; i < concurrency; i++ {
		// Map_i: each worker gets work, and does the Write from each buffer to its respective offset.
		go func() {
			defer wg.Done()

			for work := range workCh {
				s := <-work.res
				pool.Put(work.res)

				err := s.err
				if err == nil {
					switch s.typ {
					case sshFxpStatus:
						err = normaliseError(unmarshalStatus(work.id, s.data))
					default:
						err = unimplementedPacketErr(s.typ)
					}
				}

				if err != nil {
					errCh <- rwErr{work.off, err}

					// DO NOT return.
					// We want to ensure that workCh is drained before wg.Wait returns.
				}
			}
		}()
	}

	// Wait for long tail, before closing results.
	go func() {
		wg.Wait()
		close(errCh)
	}()

	// Reduce: collect all the results into a relevant return: the earliest offset to return an error.
	firstErr := rwErr{math.MaxInt64, nil}
	for rwErr := range errCh {
		if rwErr.off <= firstErr.off {
			firstErr = rwErr
		}

		select {
		case <-cancel:
		default:
			// stop any more work from being distributed.
			close(cancel)
		}
	}

	if firstErr.err != nil {
		// firstErr.err != nil if and only if firstErr.off is a valid offset.
		//
		// firstErr.off will then be the lesser of:
		// * the offset of the first error from writing,
		// * the last successfully read offset.
		//
		// This could be less than the last successfully written offset,
		// which is the whole reason for the UseConcurrentWrites() ClientOption.
		//
		// Callers are responsible for truncating any SFTP files to a safe length.
		f.offset = firstErr.off

		// ReadFrom is defined to return the read bytes, regardless of any writer errors.
		return read, firstErr.err
	}

	f.offset += read
	return read, nil
}

// ReadFrom reads data from r until EOF and writes it to the file. The return
// value is the number of bytes read. Any error except io.EOF encountered
// during the read is also returned.
//
// This method is preferred over calling Write multiple times
// to maximise throughput for transferring the entire file,
// especially over high-latency links.
//
// To ensure concurrent writes, the given r needs to implement one of
// the following receiver methods:
//
//	Len() int
//	Size() int64
//	Stat() (os.FileInfo, error)
//
// or be an instance of [io.LimitedReader] to determine the number of possible
// concurrent requests. Otherwise, reads/writes are performed sequentially.
// ReadFromWithConcurrency can be used explicitly to guarantee concurrent
// processing of the reader.
func (f *File) ReadFrom(r io.Reader) (int64, error) {
	f.mu.Lock()
	defer f.mu.Unlock()

	if f.handle == "" {
		return 0, os.ErrClosed
	}

	if f.c.useConcurrentWrites {
		var remain int64
		switch r := r.(type) {
		case interface{ Len() int }:
			remain = int64(r.Len())

		case interface{ Size() int64 }:
			remain = r.Size()

		case *io.LimitedReader:
			remain = r.N

		case interface{ Stat() (os.FileInfo, error) }:
			info, err := r.Stat()
			if err == nil {
				remain = info.Size()
			}
		}

		if remain < 0 {
			// A negative size gives us no usable estimate,
			// so fall back to the default maximum concurrency.
			return f.readFromWithConcurrency(r, f.c.maxConcurrentRequests)
		}

		if remain > int64(f.c.maxPacket) {
			// Otherwise, only use concurrency if the transfer spans at least two packets.

			// This is the best reasonable guess we can make.
			concurrency64 := remain/int64(f.c.maxPacket) + 1

			// We need to cap this value to fit in an `int` to avoid overflow on 32-bit machines.
			// So, we may as well pre-cap it to `f.c.maxConcurrentRequests`.
			if concurrency64 > int64(f.c.maxConcurrentRequests) {
				concurrency64 = int64(f.c.maxConcurrentRequests)
			}

			return f.readFromWithConcurrency(r, int(concurrency64))
		}
	}

	ch := make(chan result, 1) // reusable channel

	b := make([]byte, f.c.maxPacket)

	var read int64
	for {
		// Fill the entire buffer.
		n, err := io.ReadFull(r, b)
		if n < 0 {
			panic("sftp.File: reader returned negative count from Read")
		}

		if n > 0 {
			read += int64(n)

			m, err2 := f.writeChunkAt(ch, b[:n], f.offset)
			f.offset += int64(m)

			if err == nil {
				err = err2
			}
		}

		if err != nil {
			if errors.Is(err, io.EOF) || errors.Is(err, io.ErrUnexpectedEOF) {
				return read, nil // return nil explicitly.
			}

			return read, err
		}
	}
}
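
// A usage sketch, assuming client is a connected *Client and size holds the
// number of bytes to copy; error handling is abbreviated. An *os.File
// implements Stat, so ReadFrom can size the transfer and write concurrently;
// for a reader of unknown type but known length, wrapping it in an
// io.LimitedReader has the same effect:
//
//	src, _ := os.Open("local.dat")
//	dst, _ := client.Create("remote.dat")
//
//	if _, err := dst.ReadFrom(src); err != nil { // concurrent: *os.File has Stat()
//		return err
//	}
//
//	// Alternatively, for an arbitrary reader of known length:
//	if _, err := dst.ReadFrom(&io.LimitedReader{R: src, N: size}); err != nil {
//		return err
//	}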

// Seek implements io.Seeker by setting the client offset for the next Read or
// Write. It returns the new offset. Seeking to a negative offset returns
// os.ErrInvalid; seeking past the end of the file is undefined. Seeking
// relative to the end calls Stat.
func (f *File) Seek(offset int64, whence int) (int64, error) {
	f.mu.Lock()
	defer f.mu.Unlock()

	if f.handle == "" {
		return 0, os.ErrClosed
	}

	switch whence {
	case io.SeekStart:
	case io.SeekCurrent:
		offset += f.offset
	case io.SeekEnd:
		fi, err := f.stat()
		if err != nil {
			return f.offset, err
		}
		offset += fi.Size()
	default:
		return f.offset, unimplementedSeekWhence(whence)
	}

	if offset < 0 {
		return f.offset, os.ErrInvalid
	}

	f.offset = offset
	return f.offset, nil
}
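
// A short sketch, assuming f is an open *File: appending by seeking relative
// to the end. Note that io.SeekEnd costs an extra Stat round-trip to learn
// the current file size.
//
//	if _, err := f.Seek(0, io.SeekEnd); err != nil {
//		return err
//	}
//	if _, err := f.Write([]byte("appended")); err != nil {
//		return err
//	}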

// Chown changes the uid/gid of the current file.
func (f *File) Chown(uid, gid int) error {
	f.mu.RLock()
	defer f.mu.RUnlock()

	if f.handle == "" {
		return os.ErrClosed
	}

	return f.c.fsetstat(f.handle, sshFileXferAttrUIDGID, &FileStat{
		UID: uint32(uid),
		GID: uint32(gid),
	})
}

// Chmod changes the permissions of the current file.
//
// See Client.Chmod for details.
func (f *File) Chmod(mode os.FileMode) error {
	f.mu.RLock()
	defer f.mu.RUnlock()

	if f.handle == "" {
		return os.ErrClosed
	}

	return f.c.fsetstat(f.handle, sshFileXferAttrPermissions, toChmodPerm(mode))
}
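
// A combined sketch, assuming f is an open *File; the uid/gid values are
// hypothetical. Because toChmodPerm (below) preserves setuid, setgid and
// sticky, modes such as os.ModeSetuid|0o755 survive the round trip.
//
//	if err := f.Chown(1000, 1000); err != nil {
//		return err
//	}
//	if err := f.Chmod(0o644); err != nil {
//		return err
//	}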

// SetExtendedData sets extended attributes of the current file. It uses the
// SSH_FILEXFER_ATTR_EXTENDED flag in the setstat request.
//
// This flag provides a general extension mechanism for vendor-specific extensions.
// Each attribute name should be a string of the form "name@domain", where "domain"
// is a valid, registered domain name and "name" identifies the method. Server
// implementations SHOULD ignore extended data fields that they do not understand.
func (f *File) SetExtendedData(path string, extended []StatExtended) error {
	f.mu.RLock()
	defer f.mu.RUnlock()

	if f.handle == "" {
		return os.ErrClosed
	}

	attrs := &FileStat{
		Extended: extended,
	}

	return f.c.fsetstat(f.handle, sshFileXferAttrExtended, attrs)
}
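
// A sketch, assuming f is an open *File; the extension name
// "example@example.com" is hypothetical, and a server will only act on
// names it recognises:
//
//	if err := f.SetExtendedData(f.Name(), []StatExtended{
//		{ExtType: "example@example.com", ExtData: "value"},
//	}); err != nil {
//		return err
//	}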

// Truncate sets the size of the current file. Although it may be safely assumed
// that a size smaller than the file's current size truncates the file to fit,
// the SFTP protocol does not specify how the server should behave when the
// requested size is greater than the current size.
// We send a SSH_FXP_FSETSTAT here, since we have a file handle.
func (f *File) Truncate(size int64) error {
	f.mu.RLock()
	defer f.mu.RUnlock()

	if f.handle == "" {
		return os.ErrClosed
	}

	return f.c.fsetstat(f.handle, sshFileXferAttrSize, uint64(size))
}

// Sync requests a flush of the contents of a File to stable storage.
//
// Sync requires the server to support the fsync@openssh.com extension.
func (f *File) Sync() error {
	f.mu.Lock()
	defer f.mu.Unlock()

	if f.handle == "" {
		return os.ErrClosed
	}

	if data, ok := f.c.HasExtension(openssh.ExtensionFSync().Name); !ok || data != "1" {
		return &StatusError{
			Code: sshFxOPUnsupported,
			msg:  "fsync not supported",
		}
	}

	id := f.c.nextID()
	typ, data, err := f.c.sendPacket(context.Background(), nil, &sshFxpFsyncPacket{
		ID:     id,
		Handle: f.handle,
	})

	switch {
	case err != nil:
		return err
	case typ == sshFxpStatus:
		return normaliseError(unmarshalStatus(id, data))
	default:
		return &unexpectedPacketErr{want: sshFxpStatus, got: typ}
	}
}
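
// A usage sketch, assuming f is an open *File: Sync returns a *StatusError
// with Code sshFxOPUnsupported when the server does not advertise
// fsync@openssh.com, which a caller may prefer to treat as non-fatal:
//
//	if err := f.Sync(); err != nil {
//		var sErr *StatusError
//		if !errors.As(err, &sErr) || sErr.Code != sshFxOPUnsupported {
//			return err
//		}
//		// server cannot fsync; continue without a flush
//	}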

// normaliseError normalises an error into a more standard form that can be
// checked against stdlib errors like io.EOF or os.ErrNotExist.
func normaliseError(err error) error {
	switch err := err.(type) {
	case *StatusError:
		switch err.Code {
		case sshFxEOF:
			return io.EOF
		case sshFxNoSuchFile:
			return os.ErrNotExist
		case sshFxPermissionDenied:
			return os.ErrPermission
		case sshFxOk:
			return nil
		default:
			return err
		}
	default:
		return err
	}
}
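
// Because errors are normalised, callers can match against stdlib sentinels
// instead of raw SFTP status codes. A sketch, assuming client is a
// connected *Client:
//
//	if _, err := client.Stat("/no/such/path"); errors.Is(err, os.ErrNotExist) {
//		// the path is missing; create it, skip it, etc.
//	}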

// toPflags converts the flags passed to OpenFile into ssh flags.
// Unsupported flags are ignored.
func toPflags(f int) uint32 {
	var out uint32
	switch f & (os.O_RDONLY | os.O_WRONLY | os.O_RDWR) {
	case os.O_RDONLY:
		out |= sshFxfRead
	case os.O_WRONLY:
		out |= sshFxfWrite
	case os.O_RDWR:
		out |= sshFxfRead | sshFxfWrite
	}
	if f&os.O_APPEND == os.O_APPEND {
		out |= sshFxfAppend
	}
	if f&os.O_CREATE == os.O_CREATE {
		out |= sshFxfCreat
	}
	if f&os.O_TRUNC == os.O_TRUNC {
		out |= sshFxfTrunc
	}
	if f&os.O_EXCL == os.O_EXCL {
		out |= sshFxfExcl
	}
	return out
}
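
// For example, the common create-and-truncate combination maps as follows
// (an illustrative identity, not an exhaustive table):
//
//	toPflags(os.O_WRONLY | os.O_CREATE | os.O_TRUNC)
//	// == sshFxfWrite | sshFxfCreat | sshFxfTrunc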

// toChmodPerm converts Go permission bits to POSIX permission bits.
//
// This differs from fromFileMode in that we preserve the POSIX versions of
// setuid, setgid and sticky in m, because we've historically supported those
// bits, and we mask off any non-permission bits.
func toChmodPerm(m os.FileMode) (perm uint32) {
	const mask = os.ModePerm | os.FileMode(s_ISUID|s_ISGID|s_ISVTX)
	perm = uint32(m & mask)

	if m&os.ModeSetuid != 0 {
		perm |= s_ISUID
	}
	if m&os.ModeSetgid != 0 {
		perm |= s_ISGID
	}
	if m&os.ModeSticky != 0 {
		perm |= s_ISVTX
	}

	return perm
}
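
// For example, a setuid bit is preserved whether it arrives as the Go flag
// or as the POSIX bit (illustrative identities):
//
//	toChmodPerm(os.ModeSetuid | 0o755) // == 0o4755
//	toChmodPerm(os.FileMode(0o4755))   // == 0o4755
//
// whereas non-permission bits such as os.ModeDir are masked off.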