A Glicko-2 based round-robin ranking system designed to test C++ battleship submissions battleship.dunkirk.sh
at main 59 kB view raw
1package sftp 2 3import ( 4 "bytes" 5 "context" 6 "encoding/binary" 7 "errors" 8 "fmt" 9 "io" 10 "math" 11 "os" 12 "path" 13 "sync" 14 "sync/atomic" 15 "syscall" 16 "time" 17 18 "github.com/kr/fs" 19 "golang.org/x/crypto/ssh" 20 21 "github.com/pkg/sftp/internal/encoding/ssh/filexfer/openssh" 22) 23 24var ( 25 // ErrInternalInconsistency indicates the packets sent and the data queued to be 26 // written to the file don't match up. It is an unusual error and usually is 27 // caused by bad behavior server side or connection issues. The error is 28 // limited in scope to the call where it happened, the client object is still 29 // OK to use as long as the connection is still open. 30 ErrInternalInconsistency = errors.New("internal inconsistency") 31 // InternalInconsistency alias for ErrInternalInconsistency. 32 // 33 // Deprecated: please use ErrInternalInconsistency 34 InternalInconsistency = ErrInternalInconsistency 35) 36 37// A ClientOption is a function which applies configuration to a Client. 38type ClientOption func(*Client) error 39 40// MaxPacketChecked sets the maximum size of the payload, measured in bytes. 41// This option only accepts sizes servers should support, ie. <= 32768 bytes. 42// 43// If you get the error "failed to send packet header: EOF" when copying a 44// large file, try lowering this number. 45// 46// The default packet size is 32768 bytes. 47func MaxPacketChecked(size int) ClientOption { 48 return func(c *Client) error { 49 if size < 1 { 50 return errors.New("size must be greater or equal to 1") 51 } 52 if size > 32768 { 53 return errors.New("sizes larger than 32KB might not work with all servers") 54 } 55 c.maxPacket = size 56 return nil 57 } 58} 59 60// MaxPacketUnchecked sets the maximum size of the payload, measured in bytes. 61// It accepts sizes larger than the 32768 bytes all servers should support. 
62// Only use a setting higher than 32768 if your application always connects to 63// the same server or after sufficiently broad testing. 64// 65// If you get the error "failed to send packet header: EOF" when copying a 66// large file, try lowering this number. 67// 68// The default packet size is 32768 bytes. 69func MaxPacketUnchecked(size int) ClientOption { 70 return func(c *Client) error { 71 if size < 1 { 72 return errors.New("size must be greater or equal to 1") 73 } 74 c.maxPacket = size 75 return nil 76 } 77} 78 79// MaxPacket sets the maximum size of the payload, measured in bytes. 80// This option only accepts sizes servers should support, ie. <= 32768 bytes. 81// This is a synonym for MaxPacketChecked that provides backward compatibility. 82// 83// If you get the error "failed to send packet header: EOF" when copying a 84// large file, try lowering this number. 85// 86// The default packet size is 32768 bytes. 87func MaxPacket(size int) ClientOption { 88 return MaxPacketChecked(size) 89} 90 91// MaxConcurrentRequestsPerFile sets the maximum concurrent requests allowed for a single file. 92// 93// The default maximum concurrent requests is 64. 94func MaxConcurrentRequestsPerFile(n int) ClientOption { 95 return func(c *Client) error { 96 if n < 1 { 97 return errors.New("n must be greater or equal to 1") 98 } 99 c.maxConcurrentRequests = n 100 return nil 101 } 102} 103 104// UseConcurrentWrites allows the Client to perform concurrent Writes. 105// 106// Using concurrency while doing writes, requires special consideration. 107// A write to a later offset in a file after an error, 108// could end up with a file length longer than what was successfully written. 109// 110// When using this option, if you receive an error during `io.Copy` or `io.WriteTo`, 111// you may need to `Truncate` the target Writer to avoid “holes” in the data written. 
112func UseConcurrentWrites(value bool) ClientOption { 113 return func(c *Client) error { 114 c.useConcurrentWrites = value 115 return nil 116 } 117} 118 119// UseConcurrentReads allows the Client to perform concurrent Reads. 120// 121// Concurrent reads are generally safe to use and not using them will degrade 122// performance, so this option is enabled by default. 123// 124// When enabled, WriteTo will use Stat/Fstat to get the file size and determines 125// how many concurrent workers to use. 126// Some "read once" servers will delete the file if they receive a stat call on an 127// open file and then the download will fail. 128// Disabling concurrent reads you will be able to download files from these servers. 129// If concurrent reads are disabled, the UseFstat option is ignored. 130func UseConcurrentReads(value bool) ClientOption { 131 return func(c *Client) error { 132 c.disableConcurrentReads = !value 133 return nil 134 } 135} 136 137// UseFstat sets whether to use Fstat or Stat when File.WriteTo is called 138// (usually when copying files). 139// Some servers limit the amount of open files and calling Stat after opening 140// the file will throw an error From the server. Setting this flag will call 141// Fstat instead of Stat which is suppose to be called on an open file handle. 142// 143// It has been found that that with IBM Sterling SFTP servers which have 144// "extractability" level set to 1 which means only 1 file can be opened at 145// any given time. 146// 147// If the server you are working with still has an issue with both Stat and 148// Fstat calls you can always open a file and read it until the end. 149// 150// Another reason to read the file until its end and Fstat doesn't work is 151// that in some servers, reading a full file will automatically delete the 152// file as some of these mainframes map the file to a message in a queue. 153// Once the file has been read it will get deleted. 
154func UseFstat(value bool) ClientOption { 155 return func(c *Client) error { 156 c.useFstat = value 157 return nil 158 } 159} 160 161// CopyStderrTo specifies a writer to which the standard error of the remote sftp-server command should be written. 162// 163// The writer passed in will not be automatically closed. 164// It is the responsibility of the caller to coordinate closure of any writers. 165func CopyStderrTo(wr io.Writer) ClientOption { 166 return func(c *Client) error { 167 c.stderrTo = wr 168 return nil 169 } 170} 171 172// Client represents an SFTP session on a *ssh.ClientConn SSH connection. 173// Multiple Clients can be active on a single SSH connection, and a Client 174// may be called concurrently from multiple Goroutines. 175// 176// Client implements the github.com/kr/fs.FileSystem interface. 177type Client struct { 178 clientConn 179 180 stderrTo io.Writer 181 182 ext map[string]string // Extensions (name -> data). 183 184 maxPacket int // max packet size read or written. 185 maxConcurrentRequests int 186 nextid uint32 187 188 // write concurrency is… error prone. 189 // Default behavior should be to not use it. 190 useConcurrentWrites bool 191 useFstat bool 192 disableConcurrentReads bool 193} 194 195// NewClient creates a new SFTP client on conn, using zero or more option 196// functions. 197func NewClient(conn *ssh.Client, opts ...ClientOption) (*Client, error) { 198 s, err := conn.NewSession() 199 if err != nil { 200 return nil, err 201 } 202 203 pw, err := s.StdinPipe() 204 if err != nil { 205 return nil, err 206 } 207 pr, err := s.StdoutPipe() 208 if err != nil { 209 return nil, err 210 } 211 perr, err := s.StderrPipe() 212 if err != nil { 213 return nil, err 214 } 215 216 if err := s.RequestSubsystem("sftp"); err != nil { 217 return nil, err 218 } 219 220 return newClientPipe(pr, perr, pw, s.Wait, opts...) 221} 222 223// NewClientPipe creates a new SFTP client given a Reader and a WriteCloser. 
224// This can be used for connecting to an SFTP server over TCP/TLS or by using 225// the system's ssh client program (e.g. via exec.Command). 226func NewClientPipe(rd io.Reader, wr io.WriteCloser, opts ...ClientOption) (*Client, error) { 227 return newClientPipe(rd, nil, wr, nil, opts...) 228} 229 230func newClientPipe(rd, stderr io.Reader, wr io.WriteCloser, wait func() error, opts ...ClientOption) (*Client, error) { 231 c := &Client{ 232 clientConn: clientConn{ 233 conn: conn{ 234 Reader: rd, 235 WriteCloser: wr, 236 }, 237 inflight: make(map[uint32]chan<- result), 238 closed: make(chan struct{}), 239 wait: wait, 240 }, 241 242 ext: make(map[string]string), 243 244 maxPacket: 1 << 15, 245 maxConcurrentRequests: 64, 246 } 247 248 for _, opt := range opts { 249 if err := opt(c); err != nil { 250 wr.Close() 251 return nil, err 252 } 253 } 254 255 if stderr != nil { 256 wr := io.Discard 257 if c.stderrTo != nil { 258 wr = c.stderrTo 259 } 260 261 go func() { 262 // DO NOT close the writer! 263 // Programs may pass in `os.Stderr` to write the remote stderr to, 264 // and the program may continue after disconnect by reconnecting. 265 // But if we've closed their stderr, then we just messed everything up. 266 267 if _, err := io.Copy(wr, stderr); err != nil { 268 debug("error copying stderr: %v", err) 269 } 270 }() 271 } 272 273 if err := c.sendInit(); err != nil { 274 wr.Close() 275 return nil, fmt.Errorf("error sending init packet to server: %w", err) 276 } 277 278 if err := c.recvVersion(); err != nil { 279 wr.Close() 280 return nil, fmt.Errorf("error receiving version packet from server: %w", err) 281 } 282 283 c.clientConn.wg.Add(1) 284 go func() { 285 defer c.clientConn.wg.Done() 286 287 if err := c.clientConn.recv(); err != nil { 288 c.clientConn.broadcastErr(err) 289 } 290 }() 291 292 return c, nil 293} 294 295// Create creates the named file mode 0666 (before umask), truncating it if it 296// already exists. 
If successful, methods on the returned File can be used for 297// I/O; the associated file descriptor has mode O_RDWR. If you need more 298// control over the flags/mode used to open the file see client.OpenFile. 299// 300// Note that some SFTP servers (eg. AWS Transfer) do not support opening files 301// read/write at the same time. For those services you will need to use 302// `client.OpenFile(os.O_WRONLY|os.O_CREATE|os.O_TRUNC)`. 303func (c *Client) Create(path string) (*File, error) { 304 return c.open(path, toPflags(os.O_RDWR|os.O_CREATE|os.O_TRUNC)) 305} 306 307const sftpProtocolVersion = 3 // https://filezilla-project.org/specs/draft-ietf-secsh-filexfer-02.txt 308 309func (c *Client) sendInit() error { 310 return c.clientConn.conn.sendPacket(&sshFxInitPacket{ 311 Version: sftpProtocolVersion, // https://filezilla-project.org/specs/draft-ietf-secsh-filexfer-02.txt 312 }) 313} 314 315// returns the next value of c.nextid 316func (c *Client) nextID() uint32 { 317 return atomic.AddUint32(&c.nextid, 1) 318} 319 320func (c *Client) recvVersion() error { 321 typ, data, err := c.recvPacket(0) 322 if err != nil { 323 if err == io.EOF { 324 return fmt.Errorf("server unexpectedly closed connection: %w", io.ErrUnexpectedEOF) 325 } 326 327 return err 328 } 329 330 if typ != sshFxpVersion { 331 return &unexpectedPacketErr{sshFxpVersion, typ} 332 } 333 334 version, data, err := unmarshalUint32Safe(data) 335 if err != nil { 336 return err 337 } 338 339 if version != sftpProtocolVersion { 340 return &unexpectedVersionErr{sftpProtocolVersion, version} 341 } 342 343 for len(data) > 0 { 344 var ext extensionPair 345 ext, data, err = unmarshalExtensionPair(data) 346 if err != nil { 347 return err 348 } 349 c.ext[ext.Name] = ext.Data 350 } 351 352 return nil 353} 354 355// HasExtension checks whether the server supports a named extension. 356// 357// The first return value is the extension data reported by the server 358// (typically a version number). 
359func (c *Client) HasExtension(name string) (string, bool) { 360 data, ok := c.ext[name] 361 return data, ok 362} 363 364// Walk returns a new Walker rooted at root. 365func (c *Client) Walk(root string) *fs.Walker { 366 return fs.WalkFS(root, c) 367} 368 369// ReadDir reads the directory named by p 370// and returns a list of directory entries. 371func (c *Client) ReadDir(p string) ([]os.FileInfo, error) { 372 return c.ReadDirContext(context.Background(), p) 373} 374 375// ReadDirContext reads the directory named by p 376// and returns a list of directory entries. 377// The passed context can be used to cancel the operation 378// returning all entries listed up to the cancellation. 379func (c *Client) ReadDirContext(ctx context.Context, p string) ([]os.FileInfo, error) { 380 handle, err := c.opendir(ctx, p) 381 if err != nil { 382 return nil, err 383 } 384 defer c.close(handle) // this has to defer earlier than the lock below 385 var entries []os.FileInfo 386 var done = false 387 for !done { 388 id := c.nextID() 389 typ, data, err1 := c.sendPacket(ctx, nil, &sshFxpReaddirPacket{ 390 ID: id, 391 Handle: handle, 392 }) 393 if err1 != nil { 394 err = err1 395 done = true 396 break 397 } 398 switch typ { 399 case sshFxpName: 400 sid, data := unmarshalUint32(data) 401 if sid != id { 402 return nil, &unexpectedIDErr{id, sid} 403 } 404 count, data := unmarshalUint32(data) 405 for i := uint32(0); i < count; i++ { 406 var filename string 407 filename, data = unmarshalString(data) 408 _, data = unmarshalString(data) // discard longname 409 var attr *FileStat 410 attr, data, err = unmarshalAttrs(data) 411 if err != nil { 412 return nil, err 413 } 414 if filename == "." || filename == ".." { 415 continue 416 } 417 entries = append(entries, fileInfoFromStat(attr, path.Base(filename))) 418 } 419 case sshFxpStatus: 420 // TODO(dfc) scope warning! 
421 err = normaliseError(unmarshalStatus(id, data)) 422 done = true 423 default: 424 return nil, unimplementedPacketErr(typ) 425 } 426 } 427 if err == io.EOF { 428 err = nil 429 } 430 return entries, err 431} 432 433func (c *Client) opendir(ctx context.Context, path string) (string, error) { 434 id := c.nextID() 435 typ, data, err := c.sendPacket(ctx, nil, &sshFxpOpendirPacket{ 436 ID: id, 437 Path: path, 438 }) 439 if err != nil { 440 return "", err 441 } 442 switch typ { 443 case sshFxpHandle: 444 sid, data := unmarshalUint32(data) 445 if sid != id { 446 return "", &unexpectedIDErr{id, sid} 447 } 448 handle, _ := unmarshalString(data) 449 return handle, nil 450 case sshFxpStatus: 451 return "", normaliseError(unmarshalStatus(id, data)) 452 default: 453 return "", unimplementedPacketErr(typ) 454 } 455} 456 457// Stat returns a FileInfo structure describing the file specified by path 'p'. 458// If 'p' is a symbolic link, the returned FileInfo structure describes the referent file. 459func (c *Client) Stat(p string) (os.FileInfo, error) { 460 fs, err := c.stat(p) 461 if err != nil { 462 return nil, err 463 } 464 return fileInfoFromStat(fs, path.Base(p)), nil 465} 466 467// Lstat returns a FileInfo structure describing the file specified by path 'p'. 468// If 'p' is a symbolic link, the returned FileInfo structure describes the symbolic link. 469func (c *Client) Lstat(p string) (os.FileInfo, error) { 470 id := c.nextID() 471 typ, data, err := c.sendPacket(context.Background(), nil, &sshFxpLstatPacket{ 472 ID: id, 473 Path: p, 474 }) 475 if err != nil { 476 return nil, err 477 } 478 switch typ { 479 case sshFxpAttrs: 480 sid, data := unmarshalUint32(data) 481 if sid != id { 482 return nil, &unexpectedIDErr{id, sid} 483 } 484 attr, _, err := unmarshalAttrs(data) 485 if err != nil { 486 // avoid returning a valid value from fileInfoFromStats if err != nil. 
487 return nil, err 488 } 489 return fileInfoFromStat(attr, path.Base(p)), nil 490 case sshFxpStatus: 491 return nil, normaliseError(unmarshalStatus(id, data)) 492 default: 493 return nil, unimplementedPacketErr(typ) 494 } 495} 496 497// ReadLink reads the target of a symbolic link. 498func (c *Client) ReadLink(p string) (string, error) { 499 id := c.nextID() 500 typ, data, err := c.sendPacket(context.Background(), nil, &sshFxpReadlinkPacket{ 501 ID: id, 502 Path: p, 503 }) 504 if err != nil { 505 return "", err 506 } 507 switch typ { 508 case sshFxpName: 509 sid, data := unmarshalUint32(data) 510 if sid != id { 511 return "", &unexpectedIDErr{id, sid} 512 } 513 count, data := unmarshalUint32(data) 514 if count != 1 { 515 return "", unexpectedCount(1, count) 516 } 517 filename, _ := unmarshalString(data) // ignore dummy attributes 518 return filename, nil 519 case sshFxpStatus: 520 return "", normaliseError(unmarshalStatus(id, data)) 521 default: 522 return "", unimplementedPacketErr(typ) 523 } 524} 525 526// Link creates a hard link at 'newname', pointing at the same inode as 'oldname' 527func (c *Client) Link(oldname, newname string) error { 528 id := c.nextID() 529 typ, data, err := c.sendPacket(context.Background(), nil, &sshFxpHardlinkPacket{ 530 ID: id, 531 Oldpath: oldname, 532 Newpath: newname, 533 }) 534 if err != nil { 535 return err 536 } 537 switch typ { 538 case sshFxpStatus: 539 return normaliseError(unmarshalStatus(id, data)) 540 default: 541 return unimplementedPacketErr(typ) 542 } 543} 544 545// Symlink creates a symbolic link at 'newname', pointing at target 'oldname' 546func (c *Client) Symlink(oldname, newname string) error { 547 id := c.nextID() 548 typ, data, err := c.sendPacket(context.Background(), nil, &sshFxpSymlinkPacket{ 549 ID: id, 550 Linkpath: newname, 551 Targetpath: oldname, 552 }) 553 if err != nil { 554 return err 555 } 556 switch typ { 557 case sshFxpStatus: 558 return normaliseError(unmarshalStatus(id, data)) 559 default: 560 
return unimplementedPacketErr(typ) 561 } 562} 563 564func (c *Client) fsetstat(handle string, flags uint32, attrs interface{}) error { 565 id := c.nextID() 566 typ, data, err := c.sendPacket(context.Background(), nil, &sshFxpFsetstatPacket{ 567 ID: id, 568 Handle: handle, 569 Flags: flags, 570 Attrs: attrs, 571 }) 572 if err != nil { 573 return err 574 } 575 switch typ { 576 case sshFxpStatus: 577 return normaliseError(unmarshalStatus(id, data)) 578 default: 579 return unimplementedPacketErr(typ) 580 } 581} 582 583// setstat is a convience wrapper to allow for changing of various parts of the file descriptor. 584func (c *Client) setstat(path string, flags uint32, attrs interface{}) error { 585 id := c.nextID() 586 typ, data, err := c.sendPacket(context.Background(), nil, &sshFxpSetstatPacket{ 587 ID: id, 588 Path: path, 589 Flags: flags, 590 Attrs: attrs, 591 }) 592 if err != nil { 593 return err 594 } 595 switch typ { 596 case sshFxpStatus: 597 return normaliseError(unmarshalStatus(id, data)) 598 default: 599 return unimplementedPacketErr(typ) 600 } 601} 602 603// Chtimes changes the access and modification times of the named file. 604func (c *Client) Chtimes(path string, atime time.Time, mtime time.Time) error { 605 type times struct { 606 Atime uint32 607 Mtime uint32 608 } 609 attrs := times{uint32(atime.Unix()), uint32(mtime.Unix())} 610 return c.setstat(path, sshFileXferAttrACmodTime, attrs) 611} 612 613// Chown changes the user and group owners of the named file. 614func (c *Client) Chown(path string, uid, gid int) error { 615 type owner struct { 616 UID uint32 617 GID uint32 618 } 619 attrs := owner{uint32(uid), uint32(gid)} 620 return c.setstat(path, sshFileXferAttrUIDGID, attrs) 621} 622 623// Chmod changes the permissions of the named file. 624// 625// Chmod does not apply a umask, because even retrieving the umask is not 626// possible in a portable way without causing a race condition. Callers 627// should mask off umask bits, if desired. 
628func (c *Client) Chmod(path string, mode os.FileMode) error { 629 return c.setstat(path, sshFileXferAttrPermissions, toChmodPerm(mode)) 630} 631 632// Truncate sets the size of the named file. Although it may be safely assumed 633// that if the size is less than its current size it will be truncated to fit, 634// the SFTP protocol does not specify what behavior the server should do when setting 635// size greater than the current size. 636func (c *Client) Truncate(path string, size int64) error { 637 return c.setstat(path, sshFileXferAttrSize, uint64(size)) 638} 639 640// SetExtendedData sets extended attributes of the named file. It uses the 641// SSH_FILEXFER_ATTR_EXTENDED flag in the setstat request. 642// 643// This flag provides a general extension mechanism for vendor-specific extensions. 644// Names of the attributes should be a string of the format "name@domain", where "domain" 645// is a valid, registered domain name and "name" identifies the method. Server 646// implementations SHOULD ignore extended data fields that they do not understand. 647func (c *Client) SetExtendedData(path string, extended []StatExtended) error { 648 attrs := &FileStat{ 649 Extended: extended, 650 } 651 return c.setstat(path, sshFileXferAttrExtended, attrs) 652} 653 654// Open opens the named file for reading. If successful, methods on the 655// returned file can be used for reading; the associated file descriptor 656// has mode O_RDONLY. 657func (c *Client) Open(path string) (*File, error) { 658 return c.open(path, toPflags(os.O_RDONLY)) 659} 660 661// OpenFile is the generalized open call; most users will use Open or 662// Create instead. It opens the named file with specified flag (O_RDONLY 663// etc.). If successful, methods on the returned File can be used for I/O. 
664func (c *Client) OpenFile(path string, f int) (*File, error) { 665 return c.open(path, toPflags(f)) 666} 667 668func (c *Client) open(path string, pflags uint32) (*File, error) { 669 id := c.nextID() 670 typ, data, err := c.sendPacket(context.Background(), nil, &sshFxpOpenPacket{ 671 ID: id, 672 Path: path, 673 Pflags: pflags, 674 }) 675 if err != nil { 676 return nil, err 677 } 678 switch typ { 679 case sshFxpHandle: 680 sid, data := unmarshalUint32(data) 681 if sid != id { 682 return nil, &unexpectedIDErr{id, sid} 683 } 684 handle, _ := unmarshalString(data) 685 return &File{c: c, path: path, handle: handle}, nil 686 case sshFxpStatus: 687 return nil, normaliseError(unmarshalStatus(id, data)) 688 default: 689 return nil, unimplementedPacketErr(typ) 690 } 691} 692 693// close closes a handle handle previously returned in the response 694// to SSH_FXP_OPEN or SSH_FXP_OPENDIR. The handle becomes invalid 695// immediately after this request has been sent. 696func (c *Client) close(handle string) error { 697 id := c.nextID() 698 typ, data, err := c.sendPacket(context.Background(), nil, &sshFxpClosePacket{ 699 ID: id, 700 Handle: handle, 701 }) 702 if err != nil { 703 return err 704 } 705 switch typ { 706 case sshFxpStatus: 707 return normaliseError(unmarshalStatus(id, data)) 708 default: 709 return unimplementedPacketErr(typ) 710 } 711} 712 713func (c *Client) stat(path string) (*FileStat, error) { 714 id := c.nextID() 715 typ, data, err := c.sendPacket(context.Background(), nil, &sshFxpStatPacket{ 716 ID: id, 717 Path: path, 718 }) 719 if err != nil { 720 return nil, err 721 } 722 switch typ { 723 case sshFxpAttrs: 724 sid, data := unmarshalUint32(data) 725 if sid != id { 726 return nil, &unexpectedIDErr{id, sid} 727 } 728 attr, _, err := unmarshalAttrs(data) 729 return attr, err 730 case sshFxpStatus: 731 return nil, normaliseError(unmarshalStatus(id, data)) 732 default: 733 return nil, unimplementedPacketErr(typ) 734 } 735} 736 737func (c *Client) fstat(handle 
string) (*FileStat, error) { 738 id := c.nextID() 739 typ, data, err := c.sendPacket(context.Background(), nil, &sshFxpFstatPacket{ 740 ID: id, 741 Handle: handle, 742 }) 743 if err != nil { 744 return nil, err 745 } 746 switch typ { 747 case sshFxpAttrs: 748 sid, data := unmarshalUint32(data) 749 if sid != id { 750 return nil, &unexpectedIDErr{id, sid} 751 } 752 attr, _, err := unmarshalAttrs(data) 753 return attr, err 754 case sshFxpStatus: 755 return nil, normaliseError(unmarshalStatus(id, data)) 756 default: 757 return nil, unimplementedPacketErr(typ) 758 } 759} 760 761// StatVFS retrieves VFS statistics from a remote host. 762// 763// It implements the statvfs@openssh.com SSH_FXP_EXTENDED feature 764// from http://www.opensource.apple.com/source/OpenSSH/OpenSSH-175/openssh/PROTOCOL?txt. 765func (c *Client) StatVFS(path string) (*StatVFS, error) { 766 // send the StatVFS packet to the server 767 id := c.nextID() 768 typ, data, err := c.sendPacket(context.Background(), nil, &sshFxpStatvfsPacket{ 769 ID: id, 770 Path: path, 771 }) 772 if err != nil { 773 return nil, err 774 } 775 776 switch typ { 777 // server responded with valid data 778 case sshFxpExtendedReply: 779 var response StatVFS 780 err = binary.Read(bytes.NewReader(data), binary.BigEndian, &response) 781 if err != nil { 782 return nil, errors.New("can not parse reply") 783 } 784 785 return &response, nil 786 787 // the resquest failed 788 case sshFxpStatus: 789 return nil, normaliseError(unmarshalStatus(id, data)) 790 791 default: 792 return nil, unimplementedPacketErr(typ) 793 } 794} 795 796// Join joins any number of path elements into a single path, adding a 797// separating slash if necessary. The result is Cleaned; in particular, all 798// empty strings are ignored. 799func (c *Client) Join(elem ...string) string { return path.Join(elem...) } 800 801// Remove removes the specified file or directory. 
An error will be returned if no 802// file or directory with the specified path exists, or if the specified directory 803// is not empty. 804func (c *Client) Remove(path string) error { 805 errF := c.removeFile(path) 806 if errF == nil { 807 return nil 808 } 809 810 errD := c.RemoveDirectory(path) 811 if errD == nil { 812 return nil 813 } 814 815 // Both failed: figure out which error to return. 816 817 if errF, ok := errF.(*os.PathError); ok { 818 // The only time it makes sense to compare errors, is when both are `*os.PathError`. 819 // We cannot test these directly with errF == errD, as that would be a pointer comparison. 820 821 if errD, ok := errD.(*os.PathError); ok && errors.Is(errF.Err, errD.Err) { 822 // If they are both pointers to PathError, 823 // and the same underlying error, then return that. 824 return errF 825 } 826 } 827 828 fi, err := c.Stat(path) 829 if err != nil { 830 return err 831 } 832 833 if fi.IsDir() { 834 return errD 835 } 836 837 return errF 838} 839 840func (c *Client) removeFile(path string) error { 841 id := c.nextID() 842 typ, data, err := c.sendPacket(context.Background(), nil, &sshFxpRemovePacket{ 843 ID: id, 844 Filename: path, 845 }) 846 if err != nil { 847 return err 848 } 849 switch typ { 850 case sshFxpStatus: 851 err = normaliseError(unmarshalStatus(id, data)) 852 if err == nil { 853 return nil 854 } 855 return &os.PathError{ 856 Op: "remove", 857 Path: path, 858 Err: err, 859 } 860 default: 861 return unimplementedPacketErr(typ) 862 } 863} 864 865// RemoveDirectory removes a directory path. 
866func (c *Client) RemoveDirectory(path string) error { 867 id := c.nextID() 868 typ, data, err := c.sendPacket(context.Background(), nil, &sshFxpRmdirPacket{ 869 ID: id, 870 Path: path, 871 }) 872 if err != nil { 873 return err 874 } 875 switch typ { 876 case sshFxpStatus: 877 err = normaliseError(unmarshalStatus(id, data)) 878 if err == nil { 879 return nil 880 } 881 return &os.PathError{ 882 Op: "remove", 883 Path: path, 884 Err: err, 885 } 886 default: 887 return unimplementedPacketErr(typ) 888 } 889} 890 891// Rename renames a file. 892func (c *Client) Rename(oldname, newname string) error { 893 id := c.nextID() 894 typ, data, err := c.sendPacket(context.Background(), nil, &sshFxpRenamePacket{ 895 ID: id, 896 Oldpath: oldname, 897 Newpath: newname, 898 }) 899 if err != nil { 900 return err 901 } 902 switch typ { 903 case sshFxpStatus: 904 return normaliseError(unmarshalStatus(id, data)) 905 default: 906 return unimplementedPacketErr(typ) 907 } 908} 909 910// PosixRename renames a file using the posix-rename@openssh.com extension 911// which will replace newname if it already exists. 912func (c *Client) PosixRename(oldname, newname string) error { 913 id := c.nextID() 914 typ, data, err := c.sendPacket(context.Background(), nil, &sshFxpPosixRenamePacket{ 915 ID: id, 916 Oldpath: oldname, 917 Newpath: newname, 918 }) 919 if err != nil { 920 return err 921 } 922 switch typ { 923 case sshFxpStatus: 924 return normaliseError(unmarshalStatus(id, data)) 925 default: 926 return unimplementedPacketErr(typ) 927 } 928} 929 930// RealPath can be used to have the server canonicalize any given path name to an absolute path. 931// 932// This is useful for converting path names containing ".." components, 933// or relative pathnames without a leading slash into absolute paths. 
934func (c *Client) RealPath(path string) (string, error) { 935 id := c.nextID() 936 typ, data, err := c.sendPacket(context.Background(), nil, &sshFxpRealpathPacket{ 937 ID: id, 938 Path: path, 939 }) 940 if err != nil { 941 return "", err 942 } 943 switch typ { 944 case sshFxpName: 945 sid, data := unmarshalUint32(data) 946 if sid != id { 947 return "", &unexpectedIDErr{id, sid} 948 } 949 count, data := unmarshalUint32(data) 950 if count != 1 { 951 return "", unexpectedCount(1, count) 952 } 953 filename, _ := unmarshalString(data) // ignore attributes 954 return filename, nil 955 case sshFxpStatus: 956 return "", normaliseError(unmarshalStatus(id, data)) 957 default: 958 return "", unimplementedPacketErr(typ) 959 } 960} 961 962// Getwd returns the current working directory of the server. Operations 963// involving relative paths will be based at this location. 964func (c *Client) Getwd() (string, error) { 965 return c.RealPath(".") 966} 967 968// Mkdir creates the specified directory. An error will be returned if a file or 969// directory with the specified path already exists, or if the directory's 970// parent folder does not exist (the method cannot create complete paths). 971func (c *Client) Mkdir(path string) error { 972 id := c.nextID() 973 typ, data, err := c.sendPacket(context.Background(), nil, &sshFxpMkdirPacket{ 974 ID: id, 975 Path: path, 976 }) 977 if err != nil { 978 return err 979 } 980 switch typ { 981 case sshFxpStatus: 982 return normaliseError(unmarshalStatus(id, data)) 983 default: 984 return unimplementedPacketErr(typ) 985 } 986} 987 988// MkdirAll creates a directory named path, along with any necessary parents, 989// and returns nil, or else returns an error. 990// If path is already a directory, MkdirAll does nothing and returns nil. 991// If, while making any directory, that path is found to already be a regular file, an error is returned. 
992func (c *Client) MkdirAll(path string) error { 993 // Most of this code mimics https://golang.org/src/os/path.go?s=514:561#L13 994 // Fast path: if we can tell whether path is a directory or file, stop with success or error. 995 dir, err := c.Stat(path) 996 if err == nil { 997 if dir.IsDir() { 998 return nil 999 } 1000 return &os.PathError{Op: "mkdir", Path: path, Err: syscall.ENOTDIR} 1001 } 1002 1003 // Slow path: make sure parent exists and then call Mkdir for path. 1004 i := len(path) 1005 for i > 0 && path[i-1] == '/' { // Skip trailing path separator. 1006 i-- 1007 } 1008 1009 j := i 1010 for j > 0 && path[j-1] != '/' { // Scan backward over element. 1011 j-- 1012 } 1013 1014 if j > 1 { 1015 // Create parent 1016 err = c.MkdirAll(path[0 : j-1]) 1017 if err != nil { 1018 return err 1019 } 1020 } 1021 1022 // Parent now exists; invoke Mkdir and use its result. 1023 err = c.Mkdir(path) 1024 if err != nil { 1025 // Handle arguments like "foo/." by 1026 // double-checking that directory doesn't exist. 1027 dir, err1 := c.Lstat(path) 1028 if err1 == nil && dir.IsDir() { 1029 return nil 1030 } 1031 return err 1032 } 1033 return nil 1034} 1035 1036// RemoveAll delete files recursively in the directory and Recursively delete subdirectories. 
1037// An error will be returned if no file or directory with the specified path exists 1038func (c *Client) RemoveAll(path string) error { 1039 1040 // Get the file/directory information 1041 fi, err := c.Stat(path) 1042 if err != nil { 1043 return err 1044 } 1045 1046 if fi.IsDir() { 1047 // Delete files recursively in the directory 1048 files, err := c.ReadDir(path) 1049 if err != nil { 1050 return err 1051 } 1052 1053 for _, file := range files { 1054 if file.IsDir() { 1055 // Recursively delete subdirectories 1056 err = c.RemoveAll(path + "/" + file.Name()) 1057 if err != nil { 1058 return err 1059 } 1060 } else { 1061 // Delete individual files 1062 err = c.Remove(path + "/" + file.Name()) 1063 if err != nil { 1064 return err 1065 } 1066 } 1067 } 1068 1069 } 1070 1071 return c.Remove(path) 1072 1073} 1074 1075// File represents a remote file. 1076type File struct { 1077 c *Client 1078 path string 1079 1080 mu sync.RWMutex 1081 handle string 1082 offset int64 // current offset within remote file 1083} 1084 1085// Close closes the File, rendering it unusable for I/O. It returns an 1086// error, if any. 1087func (f *File) Close() error { 1088 f.mu.Lock() 1089 defer f.mu.Unlock() 1090 1091 if f.handle == "" { 1092 return os.ErrClosed 1093 } 1094 1095 // The design principle here is that when `openssh-portable/sftp-server.c` is doing `handle_close`, 1096 // it will unconditionally mark the handle as unused, 1097 // so we need to also unconditionally mark this handle as invalid. 1098 // By invalidating our local copy of the handle, 1099 // we ensure that there cannot be any erroneous use-after-close requests sent after Close. 1100 1101 handle := f.handle 1102 f.handle = "" 1103 1104 return f.c.close(handle) 1105} 1106 1107// Name returns the name of the file as presented to Open or Create. 1108func (f *File) Name() string { 1109 return f.path 1110} 1111 1112// Read reads up to len(b) bytes from the File. 
// It returns the number of bytes
// read and an error, if any. Read follows io.Reader semantics, so when Read
// encounters an error or EOF condition after successfully reading n > 0 bytes,
// it returns the number of bytes read.
//
// To maximise throughput for transferring the entire file (especially
// over high latency links) it is recommended to use WriteTo rather
// than calling Read multiple times. io.Copy will do this
// automatically.
func (f *File) Read(b []byte) (int, error) {
	// The write lock is taken because f.offset is advanced below.
	f.mu.Lock()
	defer f.mu.Unlock()

	n, err := f.readAt(b, f.offset)
	f.offset += int64(n)
	return n, err
}

// readChunkAt attempts to read the whole entire length of the buffer from the file starting at the offset.
// It will continue progressively reading into the buffer until it fills the whole buffer, or an error occurs.
//
// ch is passed through to sendPacket unchanged; callers in this file pass
// either nil or a reusable channel with a buffer of one.
//
// It returns the number of bytes copied into b, together with the error that
// stopped the read, if any.
func (f *File) readChunkAt(ch chan result, b []byte, off int64) (n int, err error) {
	// The err declared inside the loop shadows the named result, so the
	// `err == nil` half of this condition always holds; every failure leaves
	// the loop through an explicit return.
	for err == nil && n < len(b) {
		id := f.c.nextID()
		typ, data, err := f.c.sendPacket(context.Background(), ch, &sshFxpReadPacket{
			ID:     id,
			Handle: f.handle,
			Offset: uint64(off) + uint64(n),
			Len:    uint32(len(b) - n),
		})
		if err != nil {
			return n, err
		}

		switch typ {
		case sshFxpStatus:
			// A status reply ends the read; normaliseError turns it into the final error.
			return n, normaliseError(unmarshalStatus(id, data))

		case sshFxpData:
			sid, data := unmarshalUint32(data)
			if id != sid {
				return n, &unexpectedIDErr{id, sid}
			}

			// NOTE(review): l comes straight from the reply; data[:l] is not
			// bounds-checked here, and a reply with l == 0 leaves n unchanged,
			// so the same request would be issued again. Confirm upstream
			// validation of the packet before relying on this.
			l, data := unmarshalUint32(data)
			n += copy(b[n:], data[:l])

		default:
			return n, unimplementedPacketErr(typ)
		}
	}

	return
}

// readAtSequential fills b from the file starting at off, one request of at
// most f.c.maxPacket bytes at a time, each issued only after the previous one
// has completed.
//
// It returns the number of bytes read, and nil only if b was filled completely.
func (f *File) readAtSequential(b []byte, off int64) (read int, err error) {
	for read < len(b) {
		rb := b[read:]
		if len(rb) > f.c.maxPacket {
			rb = rb[:f.c.maxPacket]
		}
		n, err := f.readChunkAt(nil, rb, off+int64(read))
		if n < 0 {
			panic("sftp.File: returned negative count from readChunkAt")
		}
		if n > 0 {
			read += n
		}
		if err != nil {
			return read, err
		}
	}
	return read, nil
}

// ReadAt reads up to len(b) byte from the File at a given offset `off`. It returns
// the number of bytes read and an error, if any. ReadAt follows io.ReaderAt semantics,
// so the file offset is not altered during the read.
func (f *File) ReadAt(b []byte, off int64) (int, error) {
	f.mu.RLock()
	defer f.mu.RUnlock()

	return f.readAt(b, off)
}

// readAt must be called while holding either the Read or Write mutex in File.
// This code is concurrent safe with itself, but not with Close.
//
// Buffers of at most f.c.maxPacket bytes are served by readChunkAt directly.
// Larger buffers are read sequentially when f.c.disableConcurrentReads is set,
// and otherwise by a slice/map/reduce pipeline of goroutines.
//
// It returns os.ErrClosed if the handle has been cleared.
func (f *File) readAt(b []byte, off int64) (int, error) {
	if f.handle == "" {
		return 0, os.ErrClosed
	}

	if len(b) <= f.c.maxPacket {
		// This should be able to be serviced with 1/2 requests.
		// So, just do it directly.
		return f.readChunkAt(nil, b, off)
	}

	if f.c.disableConcurrentReads {
		return f.readAtSequential(b, off)
	}

	// Split the read into multiple maxPacket-sized concurrent reads bounded by maxConcurrentRequests.
	// This allows reads with a suitably large buffer to transfer data at a much faster rate
	// by overlapping round trip times.

	// cancel is closed by the reduce loop on the first error, to stop the slicer.
	cancel := make(chan struct{})

	concurrency := len(b)/f.c.maxPacket + 1
	if concurrency > f.c.maxConcurrentRequests || concurrency < 1 {
		concurrency = f.c.maxConcurrentRequests
	}

	resPool := newResChanPool(concurrency)

	// work describes one dispatched read request and where its data belongs.
	type work struct {
		id  uint32
		res chan result

		b   []byte
		off int64
	}
	workCh := make(chan work)

	// Slice: cut up the Read into any number of buffers of length <= f.c.maxPacket, and at appropriate offsets.
	go func() {
		defer close(workCh)

		b := b
		offset := off
		chunkSize := f.c.maxPacket

		for len(b) > 0 {
			rb := b
			if len(rb) > chunkSize {
				rb = rb[:chunkSize]
			}

			id := f.c.nextID()
			res := resPool.Get()

			// The request is dispatched before the work item is handed over,
			// so requests go out in offset order regardless of which worker
			// ends up collecting each reply.
			f.c.dispatchRequest(res, &sshFxpReadPacket{
				ID:     id,
				Handle: f.handle,
				Offset: uint64(offset),
				Len:    uint32(len(rb)),
			})

			select {
			case workCh <- work{id, res, rb, offset}:
			case <-cancel:
				return
			}

			offset += int64(len(rb))
			b = b[len(rb):]
		}
	}()

	// rErr pairs an error with the file offset at which it occurred.
	type rErr struct {
		off int64
		err error
	}
	errCh := make(chan rErr)

	var wg sync.WaitGroup
	wg.Add(concurrency)
	for i := 0; i < concurrency; i++ {
		// Map_i: each worker gets work, and then performs the Read into its buffer from its respective offset.
		go func() {
			defer wg.Done()

			for packet := range workCh {
				var n int

				s := <-packet.res
				resPool.Put(packet.res)

				err := s.err
				if err == nil {
					switch s.typ {
					case sshFxpStatus:
						err = normaliseError(unmarshalStatus(packet.id, s.data))

					case sshFxpData:
						sid, data := unmarshalUint32(s.data)
						if packet.id != sid {
							err = &unexpectedIDErr{packet.id, sid}

						} else {
							l, data := unmarshalUint32(data)
							n = copy(packet.b, data[:l])

							// For normal disk files, it is guaranteed that this will read
							// the specified number of bytes, or up to end of file.
							// This implies, if we have a short read, that means EOF.
							if n < len(packet.b) {
								err = io.EOF
							}
						}

					default:
						err = unimplementedPacketErr(s.typ)
					}
				}

				if err != nil {
					// return the offset as the start + how much we read before the error.
					errCh <- rErr{packet.off + int64(n), err}

					// DO NOT return.
					// We want to ensure that workCh is drained before wg.Wait returns.
				}
			}
		}()
	}

	// Wait for long tail, before closing results.
	go func() {
		wg.Wait()
		close(errCh)
	}()

	// Reduce: collect all the results into a relevant return: the earliest offset to return an error.
	firstErr := rErr{math.MaxInt64, nil}
	for rErr := range errCh {
		if rErr.off <= firstErr.off {
			firstErr = rErr
		}

		// Close cancel exactly once: the receive succeeds only after it has been closed.
		select {
		case <-cancel:
		default:
			// stop any more work from being distributed. (Just in case.)
			close(cancel)
		}
	}

	if firstErr.err != nil {
		// firstErr.err != nil if and only if firstErr.off >= our starting offset.
		return int(firstErr.off - off), firstErr.err
	}

	// As per spec for io.ReaderAt, we return nil error if and only if we read everything.
	return len(b), nil
}

// writeToSequential implements WriteTo, but works sequentially with no parallelism.
//
// It reads chunks of up to f.c.maxPacket bytes from f.offset, writes each to w,
// and advances f.offset by the number of bytes read. An io.EOF from the read
// side ends the copy with a nil error.
func (f *File) writeToSequential(w io.Writer) (written int64, err error) {
	b := make([]byte, f.c.maxPacket)
	ch := make(chan result, 1) // reusable channel

	for {
		n, err := f.readChunkAt(ch, b, f.offset)
		if n < 0 {
			panic("sftp.File: returned negative count from readChunkAt")
		}

		if n > 0 {
			f.offset += int64(n)

			// This err is scoped to the write; the read error above is examined afterwards.
			m, err := w.Write(b[:n])
			written += int64(m)

			if err != nil {
				return written, err
			}
		}

		if err != nil {
			if err == io.EOF {
				return written, nil // return nil explicitly.
			}

			return written, err
		}
	}
}

// WriteTo writes the file to the given Writer.
// The return value is the number of bytes written.
// Any error encountered during the write is also returned.
//
// This method is preferred over calling Read multiple times
// to maximise throughput for transferring the entire file,
// especially over high latency links.
//
// Reading starts at the File's current offset, and the offset is advanced as
// data is handed to w. WriteTo returns os.ErrClosed if the File is closed.
func (f *File) WriteTo(w io.Writer) (written int64, err error) {
	f.mu.Lock()
	defer f.mu.Unlock()

	if f.handle == "" {
		return 0, os.ErrClosed
	}

	if f.c.disableConcurrentReads {
		return f.writeToSequential(w)
	}

	// For concurrency, we want to guess how many concurrent workers we should use.
	var fileStat *FileStat
	if f.c.useFstat {
		fileStat, err = f.c.fstat(f.handle)
	} else {
		fileStat, err = f.c.stat(f.path)
	}
	if err != nil {
		return 0, err
	}

	fileSize := fileStat.Size
	if fileSize <= uint64(f.c.maxPacket) || !isRegular(fileStat.Mode) {
		// only regular files are guaranteed to return (full read) xor (partial read, next error)
		return f.writeToSequential(w)
	}

	concurrency64 := fileSize/uint64(f.c.maxPacket) + 1 // a bad guess, but better than no guess
	if concurrency64 > uint64(f.c.maxConcurrentRequests) || concurrency64 < 1 {
		concurrency64 = uint64(f.c.maxConcurrentRequests)
	}
	// Now that concurrency64 is saturated to an int value, we know this assignment cannot possibly overflow.
	concurrency := int(concurrency64)

	chunkSize := f.c.maxPacket
	pool := newBufPool(concurrency, chunkSize)
	resPool := newResChanPool(concurrency)

	cancel := make(chan struct{})
	var wg sync.WaitGroup
	defer func() {
		// Once the writing Reduce phase has ended, all the feed work needs to unconditionally stop.
		close(cancel)

		// We want to wait until all outstanding goroutines with an `f` or `f.c` reference have completed.
		// Just to be sure we don't orphan any goroutines or leave any hanging references.
		wg.Wait()
	}()

	// writeWork is one chunk ready to be written, plus the channel on which
	// the chunk that follows it will arrive.
	type writeWork struct {
		b   []byte
		off int64
		err error

		next chan writeWork
	}
	writeCh := make(chan writeWork)

	// readWork is one dispatched read request: cur is where its result must be
	// delivered, next is where the following chunk's result will be delivered.
	type readWork struct {
		id  uint32
		res chan result
		off int64

		cur, next chan writeWork
	}
	readCh := make(chan readWork)

	// Slice: hand out chunks of work on demand, with a `cur` and `next` channel built-in for sequencing.
	go func() {
		defer close(readCh)

		off := f.offset

		cur := writeCh
		// This loop has no end condition of its own: it keeps issuing reads
		// until cancel is closed by the deferred cleanup above.
		for {
			id := f.c.nextID()
			res := resPool.Get()

			next := make(chan writeWork)
			readWork := readWork{
				id:  id,
				res: res,
				off: off,

				cur:  cur,
				next: next,
			}

			f.c.dispatchRequest(res, &sshFxpReadPacket{
				ID:     id,
				Handle: f.handle,
				Offset: uint64(off),
				Len:    uint32(chunkSize),
			})

			select {
			case readCh <- readWork:
			case <-cancel:
				return
			}

			off += int64(chunkSize)
			cur = next
		}
	}()

	wg.Add(concurrency)
	for i := 0; i < concurrency; i++ {
		// Map_i: each worker gets readWork, and does the Read into a buffer at the given offset.
		go func() {
			defer wg.Done()

			for readWork := range readCh {
				var b []byte
				var n int

				s := <-readWork.res
				resPool.Put(readWork.res)

				err := s.err
				if err == nil {
					switch s.typ {
					case sshFxpStatus:
						err = normaliseError(unmarshalStatus(readWork.id, s.data))

					case sshFxpData:
						sid, data := unmarshalUint32(s.data)
						if readWork.id != sid {
							err = &unexpectedIDErr{readWork.id, sid}

						} else {
							l, data := unmarshalUint32(data)
							b = pool.Get()[:l]
							n = copy(b, data[:l])
							b = b[:n]
						}

					default:
						err = unimplementedPacketErr(s.typ)
					}
				}

				writeWork := writeWork{
					b:   b,
					off: readWork.off,
					err: err,

					next: readWork.next,
				}

				// Deliver on this chunk's own channel, so the reduce loop
				// receives chunks strictly in offset order.
				select {
				case readWork.cur <- writeWork:
				case <-cancel:
				}

				// DO NOT return.
				// We want to ensure that readCh is drained before wg.Wait returns.
			}
		}()
	}

	// Reduce: serialize the results from the reads into sequential writes.
	cur := writeCh
	for {
		packet, ok := <-cur
		if !ok {
			return written, errors.New("sftp.File.WriteTo: unexpectedly closed channel")
		}

		// Because writes are serialized, this will always be the last successfully read byte.
		f.offset = packet.off + int64(len(packet.b))

		if len(packet.b) > 0 {
			n, err := w.Write(packet.b)
			written += int64(n)
			if err != nil {
				return written, err
			}
		}

		if packet.err != nil {
			if packet.err == io.EOF {
				return written, nil
			}

			return written, packet.err
		}

		pool.Put(packet.b)
		cur = packet.next
	}
}

// Stat returns the FileInfo structure describing file, or an error, if any.
1594func (f *File) Stat() (os.FileInfo, error) { 1595 f.mu.RLock() 1596 defer f.mu.RUnlock() 1597 1598 if f.handle == "" { 1599 return nil, os.ErrClosed 1600 } 1601 1602 return f.stat() 1603} 1604 1605func (f *File) stat() (os.FileInfo, error) { 1606 fs, err := f.c.fstat(f.handle) 1607 if err != nil { 1608 return nil, err 1609 } 1610 return fileInfoFromStat(fs, path.Base(f.path)), nil 1611} 1612 1613// Write writes len(b) bytes to the File. It returns the number of bytes 1614// written and an error, if any. Write returns a non-nil error when n != 1615// len(b). 1616// 1617// To maximise throughput for transferring the entire file (especially 1618// over high latency links) it is recommended to use ReadFrom rather 1619// than calling Write multiple times. io.Copy will do this 1620// automatically. 1621func (f *File) Write(b []byte) (int, error) { 1622 f.mu.Lock() 1623 defer f.mu.Unlock() 1624 1625 if f.handle == "" { 1626 return 0, os.ErrClosed 1627 } 1628 1629 n, err := f.writeAt(b, f.offset) 1630 f.offset += int64(n) 1631 return n, err 1632} 1633 1634func (f *File) writeChunkAt(ch chan result, b []byte, off int64) (int, error) { 1635 typ, data, err := f.c.sendPacket(context.Background(), ch, &sshFxpWritePacket{ 1636 ID: f.c.nextID(), 1637 Handle: f.handle, 1638 Offset: uint64(off), 1639 Length: uint32(len(b)), 1640 Data: b, 1641 }) 1642 if err != nil { 1643 return 0, err 1644 } 1645 1646 switch typ { 1647 case sshFxpStatus: 1648 id, _ := unmarshalUint32(data) 1649 err := normaliseError(unmarshalStatus(id, data)) 1650 if err != nil { 1651 return 0, err 1652 } 1653 1654 default: 1655 return 0, unimplementedPacketErr(typ) 1656 } 1657 1658 return len(b), nil 1659} 1660 1661// writeAtConcurrent implements WriterAt, but works concurrently rather than sequentially. 1662func (f *File) writeAtConcurrent(b []byte, off int64) (int, error) { 1663 // Split the write into multiple maxPacket sized concurrent writes 1664 // bounded by maxConcurrentRequests. 
This allows writes with a suitably 1665 // large buffer to transfer data at a much faster rate due to 1666 // overlapping round trip times. 1667 1668 cancel := make(chan struct{}) 1669 1670 type work struct { 1671 id uint32 1672 res chan result 1673 1674 off int64 1675 } 1676 workCh := make(chan work) 1677 1678 concurrency := len(b)/f.c.maxPacket + 1 1679 if concurrency > f.c.maxConcurrentRequests || concurrency < 1 { 1680 concurrency = f.c.maxConcurrentRequests 1681 } 1682 1683 pool := newResChanPool(concurrency) 1684 1685 // Slice: cut up the Read into any number of buffers of length <= f.c.maxPacket, and at appropriate offsets. 1686 go func() { 1687 defer close(workCh) 1688 1689 var read int 1690 chunkSize := f.c.maxPacket 1691 1692 for read < len(b) { 1693 wb := b[read:] 1694 if len(wb) > chunkSize { 1695 wb = wb[:chunkSize] 1696 } 1697 1698 id := f.c.nextID() 1699 res := pool.Get() 1700 off := off + int64(read) 1701 1702 f.c.dispatchRequest(res, &sshFxpWritePacket{ 1703 ID: id, 1704 Handle: f.handle, 1705 Offset: uint64(off), 1706 Length: uint32(len(wb)), 1707 Data: wb, 1708 }) 1709 1710 select { 1711 case workCh <- work{id, res, off}: 1712 case <-cancel: 1713 return 1714 } 1715 1716 read += len(wb) 1717 } 1718 }() 1719 1720 type wErr struct { 1721 off int64 1722 err error 1723 } 1724 errCh := make(chan wErr) 1725 1726 var wg sync.WaitGroup 1727 wg.Add(concurrency) 1728 for i := 0; i < concurrency; i++ { 1729 // Map_i: each worker gets work, and does the Write from each buffer to its respective offset. 
1730 go func() { 1731 defer wg.Done() 1732 1733 for work := range workCh { 1734 s := <-work.res 1735 pool.Put(work.res) 1736 1737 err := s.err 1738 if err == nil { 1739 switch s.typ { 1740 case sshFxpStatus: 1741 err = normaliseError(unmarshalStatus(work.id, s.data)) 1742 default: 1743 err = unimplementedPacketErr(s.typ) 1744 } 1745 } 1746 1747 if err != nil { 1748 errCh <- wErr{work.off, err} 1749 } 1750 } 1751 }() 1752 } 1753 1754 // Wait for long tail, before closing results. 1755 go func() { 1756 wg.Wait() 1757 close(errCh) 1758 }() 1759 1760 // Reduce: collect all the results into a relevant return: the earliest offset to return an error. 1761 firstErr := wErr{math.MaxInt64, nil} 1762 for wErr := range errCh { 1763 if wErr.off <= firstErr.off { 1764 firstErr = wErr 1765 } 1766 1767 select { 1768 case <-cancel: 1769 default: 1770 // stop any more work from being distributed. (Just in case.) 1771 close(cancel) 1772 } 1773 } 1774 1775 if firstErr.err != nil { 1776 // firstErr.err != nil if and only if firstErr.off >= our starting offset. 1777 return int(firstErr.off - off), firstErr.err 1778 } 1779 1780 return len(b), nil 1781} 1782 1783// WriteAt writes up to len(b) byte to the File at a given offset `off`. It returns 1784// the number of bytes written and an error, if any. WriteAt follows io.WriterAt semantics, 1785// so the file offset is not altered during the write. 1786func (f *File) WriteAt(b []byte, off int64) (written int, err error) { 1787 f.mu.RLock() 1788 defer f.mu.RUnlock() 1789 1790 if f.handle == "" { 1791 return 0, os.ErrClosed 1792 } 1793 1794 return f.writeAt(b, off) 1795} 1796 1797// writeAt must be called while holding either the Read or Write mutex in File. 1798// This code is concurrent safe with itself, but not with Close. 1799func (f *File) writeAt(b []byte, off int64) (written int, err error) { 1800 if len(b) <= f.c.maxPacket { 1801 // We can do this in one write. 
1802 return f.writeChunkAt(nil, b, off) 1803 } 1804 1805 if f.c.useConcurrentWrites { 1806 return f.writeAtConcurrent(b, off) 1807 } 1808 1809 ch := make(chan result, 1) // reusable channel 1810 1811 chunkSize := f.c.maxPacket 1812 1813 for written < len(b) { 1814 wb := b[written:] 1815 if len(wb) > chunkSize { 1816 wb = wb[:chunkSize] 1817 } 1818 1819 n, err := f.writeChunkAt(ch, wb, off+int64(written)) 1820 if n > 0 { 1821 written += n 1822 } 1823 1824 if err != nil { 1825 return written, err 1826 } 1827 } 1828 1829 return len(b), nil 1830} 1831 1832// ReadFromWithConcurrency implements ReaderFrom, 1833// but uses the given concurrency to issue multiple requests at the same time. 1834// 1835// Giving a concurrency of less than one will default to the Client’s max concurrency. 1836// 1837// Otherwise, the given concurrency will be capped by the Client's max concurrency. 1838// 1839// When one needs to guarantee concurrent reads/writes, this method is preferred 1840// over ReadFrom. 1841func (f *File) ReadFromWithConcurrency(r io.Reader, concurrency int) (read int64, err error) { 1842 f.mu.Lock() 1843 defer f.mu.Unlock() 1844 1845 return f.readFromWithConcurrency(r, concurrency) 1846} 1847 1848func (f *File) readFromWithConcurrency(r io.Reader, concurrency int) (read int64, err error) { 1849 if f.handle == "" { 1850 return 0, os.ErrClosed 1851 } 1852 1853 // Split the write into multiple maxPacket sized concurrent writes. 1854 // This allows writes with a suitably large reader 1855 // to transfer data at a much faster rate due to overlapping round trip times. 
1856 1857 cancel := make(chan struct{}) 1858 1859 type work struct { 1860 id uint32 1861 res chan result 1862 1863 off int64 1864 } 1865 workCh := make(chan work) 1866 1867 type rwErr struct { 1868 off int64 1869 err error 1870 } 1871 errCh := make(chan rwErr) 1872 1873 if concurrency > f.c.maxConcurrentRequests || concurrency < 1 { 1874 concurrency = f.c.maxConcurrentRequests 1875 } 1876 1877 pool := newResChanPool(concurrency) 1878 1879 // Slice: cut up the Read into any number of buffers of length <= f.c.maxPacket, and at appropriate offsets. 1880 go func() { 1881 defer close(workCh) 1882 1883 b := make([]byte, f.c.maxPacket) 1884 off := f.offset 1885 1886 for { 1887 // Fill the entire buffer. 1888 n, err := io.ReadFull(r, b) 1889 1890 if n > 0 { 1891 read += int64(n) 1892 1893 id := f.c.nextID() 1894 res := pool.Get() 1895 1896 f.c.dispatchRequest(res, &sshFxpWritePacket{ 1897 ID: id, 1898 Handle: f.handle, 1899 Offset: uint64(off), 1900 Length: uint32(n), 1901 Data: b[:n], 1902 }) 1903 1904 select { 1905 case workCh <- work{id, res, off}: 1906 case <-cancel: 1907 return 1908 } 1909 1910 off += int64(n) 1911 } 1912 1913 if err != nil { 1914 if !errors.Is(err, io.EOF) && !errors.Is(err, io.ErrUnexpectedEOF) { 1915 errCh <- rwErr{off, err} 1916 } 1917 return 1918 } 1919 } 1920 }() 1921 1922 var wg sync.WaitGroup 1923 wg.Add(concurrency) 1924 for i := 0; i < concurrency; i++ { 1925 // Map_i: each worker gets work, and does the Write from each buffer to its respective offset. 1926 go func() { 1927 defer wg.Done() 1928 1929 for work := range workCh { 1930 s := <-work.res 1931 pool.Put(work.res) 1932 1933 err := s.err 1934 if err == nil { 1935 switch s.typ { 1936 case sshFxpStatus: 1937 err = normaliseError(unmarshalStatus(work.id, s.data)) 1938 default: 1939 err = unimplementedPacketErr(s.typ) 1940 } 1941 } 1942 1943 if err != nil { 1944 errCh <- rwErr{work.off, err} 1945 1946 // DO NOT return. 1947 // We want to ensure that workCh is drained before wg.Wait returns. 
1948 } 1949 } 1950 }() 1951 } 1952 1953 // Wait for long tail, before closing results. 1954 go func() { 1955 wg.Wait() 1956 close(errCh) 1957 }() 1958 1959 // Reduce: Collect all the results into a relevant return: the earliest offset to return an error. 1960 firstErr := rwErr{math.MaxInt64, nil} 1961 for rwErr := range errCh { 1962 if rwErr.off <= firstErr.off { 1963 firstErr = rwErr 1964 } 1965 1966 select { 1967 case <-cancel: 1968 default: 1969 // stop any more work from being distributed. 1970 close(cancel) 1971 } 1972 } 1973 1974 if firstErr.err != nil { 1975 // firstErr.err != nil if and only if firstErr.off is a valid offset. 1976 // 1977 // firstErr.off will then be the lesser of: 1978 // * the offset of the first error from writing, 1979 // * the last successfully read offset. 1980 // 1981 // This could be less than the last successfully written offset, 1982 // which is the whole reason for the UseConcurrentWrites() ClientOption. 1983 // 1984 // Callers are responsible for truncating any SFTP files to a safe length. 1985 f.offset = firstErr.off 1986 1987 // ReadFrom is defined to return the read bytes, regardless of any writer errors. 1988 return read, firstErr.err 1989 } 1990 1991 f.offset += read 1992 return read, nil 1993} 1994 1995// ReadFrom reads data from r until EOF and writes it to the file. The return 1996// value is the number of bytes read. Any error except io.EOF encountered 1997// during the read is also returned. 1998// 1999// This method is preferred over calling Write multiple times 2000// to maximise throughput for transferring the entire file, 2001// especially over high-latency links. 2002// 2003// To ensure concurrent writes, the given r needs to implement one of 2004// the following receiver methods: 2005// 2006// Len() int 2007// Size() int64 2008// Stat() (os.FileInfo, error) 2009// 2010// or be an instance of [io.LimitedReader] to determine the number of possible 2011// concurrent requests. 
Otherwise, reads/writes are performed sequentially. 2012// ReadFromWithConcurrency can be used explicitly to guarantee concurrent 2013// processing of the reader. 2014func (f *File) ReadFrom(r io.Reader) (int64, error) { 2015 f.mu.Lock() 2016 defer f.mu.Unlock() 2017 2018 if f.handle == "" { 2019 return 0, os.ErrClosed 2020 } 2021 2022 if f.c.useConcurrentWrites { 2023 var remain int64 2024 switch r := r.(type) { 2025 case interface{ Len() int }: 2026 remain = int64(r.Len()) 2027 2028 case interface{ Size() int64 }: 2029 remain = r.Size() 2030 2031 case *io.LimitedReader: 2032 remain = r.N 2033 2034 case interface{ Stat() (os.FileInfo, error) }: 2035 info, err := r.Stat() 2036 if err == nil { 2037 remain = info.Size() 2038 } 2039 } 2040 2041 if remain < 0 { 2042 // We can strongly assert that we want default max concurrency here. 2043 return f.readFromWithConcurrency(r, f.c.maxConcurrentRequests) 2044 } 2045 2046 if remain > int64(f.c.maxPacket) { 2047 // Otherwise, only use concurrency, if it would be at least two packets. 2048 2049 // This is the best reasonable guess we can make. 2050 concurrency64 := remain/int64(f.c.maxPacket) + 1 2051 2052 // We need to cap this value to an `int` size value to avoid overflow on 32-bit machines. 2053 // So, we may as well pre-cap it to `f.c.maxConcurrentRequests`. 2054 if concurrency64 > int64(f.c.maxConcurrentRequests) { 2055 concurrency64 = int64(f.c.maxConcurrentRequests) 2056 } 2057 2058 return f.readFromWithConcurrency(r, int(concurrency64)) 2059 } 2060 } 2061 2062 ch := make(chan result, 1) // reusable channel 2063 2064 b := make([]byte, f.c.maxPacket) 2065 2066 var read int64 2067 for { 2068 // Fill the entire buffer. 
2069 n, err := io.ReadFull(r, b) 2070 if n < 0 { 2071 panic("sftp.File: reader returned negative count from Read") 2072 } 2073 2074 if n > 0 { 2075 read += int64(n) 2076 2077 m, err2 := f.writeChunkAt(ch, b[:n], f.offset) 2078 f.offset += int64(m) 2079 2080 if err == nil { 2081 err = err2 2082 } 2083 } 2084 2085 if err != nil { 2086 if errors.Is(err, io.EOF) || errors.Is(err, io.ErrUnexpectedEOF) { 2087 return read, nil // return nil explicitly. 2088 } 2089 2090 return read, err 2091 } 2092 } 2093} 2094 2095// Seek implements io.Seeker by setting the client offset for the next Read or 2096// Write. It returns the next offset read. Seeking before or after the end of 2097// the file is undefined. Seeking relative to the end calls Stat. 2098func (f *File) Seek(offset int64, whence int) (int64, error) { 2099 f.mu.Lock() 2100 defer f.mu.Unlock() 2101 2102 if f.handle == "" { 2103 return 0, os.ErrClosed 2104 } 2105 2106 switch whence { 2107 case io.SeekStart: 2108 case io.SeekCurrent: 2109 offset += f.offset 2110 case io.SeekEnd: 2111 fi, err := f.stat() 2112 if err != nil { 2113 return f.offset, err 2114 } 2115 offset += fi.Size() 2116 default: 2117 return f.offset, unimplementedSeekWhence(whence) 2118 } 2119 2120 if offset < 0 { 2121 return f.offset, os.ErrInvalid 2122 } 2123 2124 f.offset = offset 2125 return f.offset, nil 2126} 2127 2128// Chown changes the uid/gid of the current file. 2129func (f *File) Chown(uid, gid int) error { 2130 f.mu.RLock() 2131 defer f.mu.RUnlock() 2132 2133 if f.handle == "" { 2134 return os.ErrClosed 2135 } 2136 2137 return f.c.fsetstat(f.handle, sshFileXferAttrUIDGID, &FileStat{ 2138 UID: uint32(uid), 2139 GID: uint32(gid), 2140 }) 2141} 2142 2143// Chmod changes the permissions of the current file. 2144// 2145// See Client.Chmod for details. 
2146func (f *File) Chmod(mode os.FileMode) error { 2147 f.mu.RLock() 2148 defer f.mu.RUnlock() 2149 2150 if f.handle == "" { 2151 return os.ErrClosed 2152 } 2153 2154 return f.c.fsetstat(f.handle, sshFileXferAttrPermissions, toChmodPerm(mode)) 2155} 2156 2157// SetExtendedData sets extended attributes of the current file. It uses the 2158// SSH_FILEXFER_ATTR_EXTENDED flag in the setstat request. 2159// 2160// This flag provides a general extension mechanism for vendor-specific extensions. 2161// Names of the attributes should be a string of the format "name@domain", where "domain" 2162// is a valid, registered domain name and "name" identifies the method. Server 2163// implementations SHOULD ignore extended data fields that they do not understand. 2164func (f *File) SetExtendedData(path string, extended []StatExtended) error { 2165 f.mu.RLock() 2166 defer f.mu.RUnlock() 2167 2168 if f.handle == "" { 2169 return os.ErrClosed 2170 } 2171 2172 attrs := &FileStat{ 2173 Extended: extended, 2174 } 2175 2176 return f.c.fsetstat(f.handle, sshFileXferAttrExtended, attrs) 2177} 2178 2179// Truncate sets the size of the current file. Although it may be safely assumed 2180// that if the size is less than its current size it will be truncated to fit, 2181// the SFTP protocol does not specify what behavior the server should do when setting 2182// size greater than the current size. 2183// We send a SSH_FXP_FSETSTAT here since we have a file handle 2184func (f *File) Truncate(size int64) error { 2185 f.mu.RLock() 2186 defer f.mu.RUnlock() 2187 2188 if f.handle == "" { 2189 return os.ErrClosed 2190 } 2191 2192 return f.c.fsetstat(f.handle, sshFileXferAttrSize, uint64(size)) 2193} 2194 2195// Sync requests a flush of the contents of a File to stable storage. 2196// 2197// Sync requires the server to support the fsync@openssh.com extension. 
2198func (f *File) Sync() error { 2199 f.mu.Lock() 2200 defer f.mu.Unlock() 2201 2202 if f.handle == "" { 2203 return os.ErrClosed 2204 } 2205 2206 if data, ok := f.c.HasExtension(openssh.ExtensionFSync().Name); !ok || data != "1" { 2207 return &StatusError{ 2208 Code: sshFxOPUnsupported, 2209 msg: "fsync not supported", 2210 } 2211 } 2212 2213 id := f.c.nextID() 2214 typ, data, err := f.c.sendPacket(context.Background(), nil, &sshFxpFsyncPacket{ 2215 ID: id, 2216 Handle: f.handle, 2217 }) 2218 2219 switch { 2220 case err != nil: 2221 return err 2222 case typ == sshFxpStatus: 2223 return normaliseError(unmarshalStatus(id, data)) 2224 default: 2225 return &unexpectedPacketErr{want: sshFxpStatus, got: typ} 2226 } 2227} 2228 2229// normaliseError normalises an error into a more standard form that can be 2230// checked against stdlib errors like io.EOF or os.ErrNotExist. 2231func normaliseError(err error) error { 2232 switch err := err.(type) { 2233 case *StatusError: 2234 switch err.Code { 2235 case sshFxEOF: 2236 return io.EOF 2237 case sshFxNoSuchFile: 2238 return os.ErrNotExist 2239 case sshFxPermissionDenied: 2240 return os.ErrPermission 2241 case sshFxOk: 2242 return nil 2243 default: 2244 return err 2245 } 2246 default: 2247 return err 2248 } 2249} 2250 2251// flags converts the flags passed to OpenFile into ssh flags. 2252// Unsupported flags are ignored. 
2253func toPflags(f int) uint32 { 2254 var out uint32 2255 switch f & (os.O_RDONLY | os.O_WRONLY | os.O_RDWR) { 2256 case os.O_RDONLY: 2257 out |= sshFxfRead 2258 case os.O_WRONLY: 2259 out |= sshFxfWrite 2260 case os.O_RDWR: 2261 out |= sshFxfRead | sshFxfWrite 2262 } 2263 if f&os.O_APPEND == os.O_APPEND { 2264 out |= sshFxfAppend 2265 } 2266 if f&os.O_CREATE == os.O_CREATE { 2267 out |= sshFxfCreat 2268 } 2269 if f&os.O_TRUNC == os.O_TRUNC { 2270 out |= sshFxfTrunc 2271 } 2272 if f&os.O_EXCL == os.O_EXCL { 2273 out |= sshFxfExcl 2274 } 2275 return out 2276} 2277 2278// toChmodPerm converts Go permission bits to POSIX permission bits. 2279// 2280// This differs from fromFileMode in that we preserve the POSIX versions of 2281// setuid, setgid and sticky in m, because we've historically supported those 2282// bits, and we mask off any non-permission bits. 2283func toChmodPerm(m os.FileMode) (perm uint32) { 2284 const mask = os.ModePerm | os.FileMode(s_ISUID|s_ISGID|s_ISVTX) 2285 perm = uint32(m & mask) 2286 2287 if m&os.ModeSetuid != 0 { 2288 perm |= s_ISUID 2289 } 2290 if m&os.ModeSetgid != 0 { 2291 perm |= s_ISGID 2292 } 2293 if m&os.ModeSticky != 0 { 2294 perm |= s_ISVTX 2295 } 2296 2297 return perm 2298}