From 6160425d09d38fd419f5da5ea96db3e8b549cca3 Mon Sep 17 00:00:00 2001 From: byteskeptical <40208858+byteskeptical@users.noreply.github.com> Date: Wed, 18 Oct 2023 16:26:48 +0000 Subject: [PATCH 1/3] adding workers attribute ThreadPool functions to limit the number of concurrent connections when forced to --- docs/changes.rst | 6 +++++- docs/conf.py | 4 ++-- setup.py | 2 +- sftpretty/__init__.py | 42 ++++++++++++++++++++++++++++++------------ 4 files changed, 38 insertions(+), 16 deletions(-) diff --git a/docs/changes.rst b/docs/changes.rst index 57add999..8c60e65a 100644 --- a/docs/changes.rst +++ b/docs/changes.rst @@ -1,7 +1,11 @@ Change Log ========== -1.1.1 (current, released 2023-08-28) +1.1.2 (current, released 2023-10-18) +------------------------------------ + * added support for setting max_workers on ThreadPool functions + +1.1.1 (released 2023-08-28) ------------------------------------ * added initial support for resuming existing transfers get() & put() * added max_concurrent_prefetch_requests parameter to get() family diff --git a/docs/conf.py b/docs/conf.py index 5880a9fb..a3eb1068 100644 --- a/docs/conf.py +++ b/docs/conf.py @@ -54,9 +54,9 @@ # built documents. # # The short X.Y version. -version = '1.1.1' +version = '1.1.2' # The full version, including alpha/beta/rc tags. -release = '1.1.1' +release = '1.1.2' # The language for content autogenerated by Sphinx. Refer to documentation # for a list of supported languages. 
diff --git a/setup.py b/setup.py index 8043277a..5db26384 100644 --- a/setup.py +++ b/setup.py @@ -31,7 +31,7 @@ packages=['sftpretty', ], platforms=['any'], url='https://github.com/byteskeptical/sftpretty', - version='1.1.1', + version='1.1.2', classifiers=[ 'Development Status :: 5 - Production/Stable', 'License :: OSI Approved :: BSD License', diff --git a/sftpretty/__init__.py b/sftpretty/__init__.py index bef419f9..3be0bacd 100644 --- a/sftpretty/__init__.py +++ b/sftpretty/__init__.py @@ -462,7 +462,7 @@ def _get(self, remotefile, localpath=None, callback=None, def get_d(self, remotedir, localdir, callback=None, max_concurrent_prefetch_requests=None, pattern=None, - prefetch=True, preserve_mtime=False, resume=False, + prefetch=True, preserve_mtime=False, resume=False, workers=None, exceptions=None, tries=None, backoff=2, delay=1, logger=getLogger(__name__), silent=False): '''Get the contents of remotedir and write to locadir. Non-recursive. @@ -484,6 +484,9 @@ def get_d(self, remotedir, localdir, callback=None, it's st_atime) :param bool resume: *Default: False* - Continue a previous transfer based on destination path matching. + :param int workers: *Default: None* - If None, defaults to number of + processors multiplied by 5. Set to less than or equal to allowed + concurrent connections on server. :param Exception exceptions: Exception(s) to check. May be a tuple of exceptions to check. 
IOError or IOError(errno.ECOMM) or (IOError,) or (ValueError, IOError(errno.ECOMM)) @@ -530,7 +533,8 @@ def get_d(self, remotedir, localdir, callback=None, if paths != []: thread_prefix = uuid4().hex - with ThreadPoolExecutor(thread_name_prefix=thread_prefix) as pool: + with ThreadPoolExecutor(max_workers=workers, + thread_name_prefix=thread_prefix) as pool: logger.debug(f'Thread Prefix: [{thread_prefix}]') threads = { pool.submit(self.get, remote, local, @@ -561,7 +565,7 @@ def get_d(self, remotedir, localdir, callback=None, def get_r(self, remotedir, localdir, callback=None, max_concurrent_prefetch_requests=None, pattern=None, - prefetch=True, preserve_mtime=False, resume=False, + prefetch=True, preserve_mtime=False, resume=False, workers=None, exceptions=None, tries=None, backoff=2, delay=1, logger=getLogger(__name__), silent=False): '''Recursively copy remotedir structure to localdir @@ -583,6 +587,9 @@ def get_r(self, remotedir, localdir, callback=None, it's st_atime) :param bool resume: *Default: False* - Continue a previous transfer based on destination path matching. + :param int workers: *Default: None* - If None, defaults to number of + processors multiplied by 5. Set to less than or equal to allowed + concurrent connections on server. :param Exception exceptions: Exception(s) to check. May be a tuple of exceptions to check. 
IOError or IOError(errno.ECOMM) or (IOError,) or (ValueError, IOError(errno.ECOMM)) @@ -618,8 +625,9 @@ def get_r(self, remotedir, localdir, callback=None, max_concurrent_prefetch_requests=max_concurrent_prefetch_requests, # noqa: E501 pattern=pattern, prefetch=prefetch, preserve_mtime=preserve_mtime, resume=resume, - exceptions=exceptions, tries=tries, backoff=backoff, - delay=delay, logger=logger, silent=silent) + workers=workers, exceptions=exceptions, tries=tries, + backoff=backoff, delay=delay, logger=logger, + silent=silent) def getfo(self, remotefile, flo, callback=None, max_concurrent_prefetch_requests=None, prefetch=True, @@ -771,8 +779,9 @@ def _put(self, localfile, remotepath=None, callback=None, resume=resume) def put_d(self, localdir, remotedir, callback=None, confirm=True, - preserve_mtime=False, resume=False, exceptions=None, tries=None, - backoff=2, delay=1, logger=getLogger(__name__), silent=False): + preserve_mtime=False, resume=False, workers=None, + exceptions=None, tries=None, backoff=2, delay=1, + logger=getLogger(__name__), silent=False): '''Copies a local directory's contents to a remotepath :param str localdir: The local directory to copy remotely. @@ -788,6 +797,9 @@ def put_d(self, localdir, remotedir, callback=None, confirm=True, it's st_atime) :param bool resume: *Default: False* - Continue a previous transfer based on destination path matching. + :param int workers: *Default: None* - If None, defaults to number of + processors multiplied by 5. Set to less than or equal to allowed + concurrent connections on server. :param Exception exceptions: Exception(s) to check. May be a tuple of exceptions to check. 
IOError or IOError(errno.ECOMM) or (IOError,) or (ValueError, IOError(errno.ECOMM)) @@ -824,7 +836,8 @@ def put_d(self, localdir, remotedir, callback=None, confirm=True, if paths != []: thread_prefix = uuid4().hex - with ThreadPoolExecutor(thread_name_prefix=thread_prefix) as pool: + with ThreadPoolExecutor(max_workers=workers, + thread_name_prefix=thread_prefix) as pool: logger.debug(f'Thread Prefix: [{thread_prefix}]') threads = { pool.submit(self.put, local, remote, @@ -852,8 +865,9 @@ def put_d(self, localdir, remotedir, callback=None, confirm=True, logger.info(f'No files found in directory [{localdir}]') def put_r(self, localdir, remotedir, callback=None, confirm=True, - preserve_mtime=False, resume=False, exceptions=None, tries=None, - backoff=2, delay=1, logger=getLogger(__name__), silent=False): + preserve_mtime=False, resume=False, workers=None, + exceptions=None, tries=None, backoff=2, delay=1, + logger=getLogger(__name__), silent=False): '''Recursively copies a local directory's contents to a remotepath :param str localdir: The local directory to copy remotely. @@ -869,6 +883,9 @@ def put_r(self, localdir, remotedir, callback=None, confirm=True, it's st_atime) :param bool resume: *Default: False* - Continue a previous transfer based on destination path matching. + :param int workers: *Default: None* - If None, defaults to number of + processors multiplied by 5. Set to less than or equal to allowed + concurrent connections on server. :param Exception exceptions: Exception(s) to check. May be a tuple of exceptions to check. 
IOError or IOError(errno.ECOMM) or (IOError,) or (ValueError, IOError(errno.ECOMM)) @@ -901,8 +918,9 @@ def put_r(self, localdir, remotedir, callback=None, confirm=True, for local, remote in tree[roots]: self.put_d(local, remote, callback=callback, confirm=confirm, preserve_mtime=preserve_mtime, resume=resume, - exceptions=exceptions, tries=tries, backoff=backoff, - delay=delay, logger=logger, silent=silent) + workers=workers, exceptions=exceptions, tries=tries, + backoff=backoff, delay=delay, logger=logger, + silent=silent) def putfo(self, flo, remotepath=None, file_size=None, callback=None, confirm=True, exceptions=None, tries=None, backoff=2, From 7a362df204c3a13465943b766109b80518940b9d Mon Sep 17 00:00:00 2001 From: byteskeptical <40208858+byteskeptical@users.noreply.github.com> Date: Thu, 19 Oct 2023 16:27:31 +0000 Subject: [PATCH 2/3] updating wording around default worker numbers to match upstream, adding example in README of new workers attribute, version bump --- README.rst | 6 ++++++ docs/authors.rst | 2 +- docs/contributing.rst | 1 + docs/cookbook.rst | 4 ++-- docs/index.rst | 11 +++++++++-- sftpretty/__init__.py | 8 ++++---- 6 files changed, 23 insertions(+), 9 deletions(-) diff --git a/README.rst b/README.rst index ac94ecfc..0d5d7fc1 100644 --- a/README.rst +++ b/README.rst @@ -61,6 +61,12 @@ Example SSHException), tries=6) + # Use public key authentication + with Connection('hostname', private_key='~/.ssh/id_ed25519') as sftp: + # Resume the download of a bigfile and save it to /mnt locally. 
+ sftp.get('bigfile', '/mnt', preserve_mtime=True, resume=True) + + # Use public key authentication with optional private key password with Connection('hostname', private_key='~/.ssh/id_ed25519', private_key_pass='secret') as sftp: diff --git a/docs/authors.rst b/docs/authors.rst index 01df3bfd..78b3d773 100644 --- a/docs/authors.rst +++ b/docs/authors.rst @@ -1,6 +1,6 @@ Authors ======= - Contributors of code, tests and documentation to the project who have agreed to have their work enjoined into the project and project license (BSD). +Contributors of code, tests and documentation to the project who have agreed to have their work enjoined into the project and project license (BSD). Acknowledgment diff --git a/docs/contributing.rst b/docs/contributing.rst index 69422f78..677d5ec1 100644 --- a/docs/contributing.rst +++ b/docs/contributing.rst @@ -20,6 +20,7 @@ Code a. Setup CI testing for your Fork. Currently testing is done on Github Actions but feel free to use the testing framework of your choosing. b. Testing features that concern chmod, chown on Windows is NOT supported. Testing compression has to be ran against a local compatible sshd and not the plugin as it does NOT support this test. c. You will need to setup an ssh daemon on your local machine and create a user: copy the contents of id_sftpretty.pub to the newly created user's authorized_keys file -- Tests that can only be run locally are skipped using the @skip_if_ci decorator so they don't fail when the test suite is run on the CI server. + #. Ensure that your name is added to the end of the :doc:`authors` file using the format Name (url), where the (url) portion is optional. #. Submit a Pull Request to the project. diff --git a/docs/cookbook.rst b/docs/cookbook.rst index 50743b31..5be884e8 100644 --- a/docs/cookbook.rst +++ b/docs/cookbook.rst @@ -272,7 +272,7 @@ local directory to a remote one via SFTP. 
:meth:`sftpretty.Connection.put_r` ---------------------------------- This method copies all files *and* directories from a local path to a remote -path. It creates directories, and happily succeeds even if the target +path. It creates directories, and happily succeeds even if the target directories already exist. .. code-block:: python @@ -388,7 +388,7 @@ SFTPAttribute.filename, instead of paramiko's arbitrary order. :meth:`sftpretty.Connection.mkdir` ---------------------------------- Just like :meth:`.chmod`, the mode is an integer representation of the octal -number to use. Just like the unix cmd, `chmod` you use 744 not 0744 or 0o744. +number to use. Just like the unix cmd, `chmod` you use 744 not 0744 or 0o744. .. code-block:: python diff --git a/docs/index.rst b/docs/index.rst index bec6aeff..00d84f6c 100644 --- a/docs/index.rst +++ b/docs/index.rst @@ -60,15 +60,22 @@ Example exceptions=(NoValidConnectionsError, socket.timeout, SSHException), tries=6) + + # Use public key authentication + with Connection('hostname', private_key='~/.ssh/id_ed25519') as sftp: + # Resume the download of a bigfile and save it to /mnt locally. + sftp.get('bigfile', '/mnt', preserve_mtime=True, resume=True) + + # Use public key authentication with optional private key password with Connection('hostname', private_key='~/.ssh/id_ed25519', private_key_pass='secret') as sftp: # Recursively download a remote_directory and save it to /tmp locally. # Don't confirm files, useful in a scenario where the server removes # the remote file immediately after download. Preserve remote mtime on - # local copy. + # local copy. Limit the concurrent connections to the server. sftp.get_r('remote_directory', '/tmp', confirm=False, - preserve_mtime=True) + preserve_mtime=True, workers=6) # Use OpenSSH format config for public key authentication. 
Configuration diff --git a/sftpretty/__init__.py b/sftpretty/__init__.py index 3be0bacd..0713c758 100644 --- a/sftpretty/__init__.py +++ b/sftpretty/__init__.py @@ -485,7 +485,7 @@ def get_d(self, remotedir, localdir, callback=None, :param bool resume: *Default: False* - Continue a previous transfer based on destination path matching. :param int workers: *Default: None* - If None, defaults to number of - processors multiplied by 5. Set to less than or equal to allowed + processors plus 4. Set to less than or equal to allowed concurrent connections on server. :param Exception exceptions: Exception(s) to check. May be a tuple of exceptions to check. IOError or IOError(errno.ECOMM) or (IOError,) @@ -588,7 +588,7 @@ def get_r(self, remotedir, localdir, callback=None, :param bool resume: *Default: False* - Continue a previous transfer based on destination path matching. :param int workers: *Default: None* - If None, defaults to number of - processors multiplied by 5. Set to less than or equal to allowed + processors plus 4. Set to less than or equal to allowed concurrent connections on server. :param Exception exceptions: Exception(s) to check. May be a tuple of exceptions to check. IOError or IOError(errno.ECOMM) or (IOError,) @@ -798,7 +798,7 @@ def put_d(self, localdir, remotedir, callback=None, confirm=True, :param bool resume: *Default: False* - Continue a previous transfer based on destination path matching. :param int workers: *Default: None* - If None, defaults to number of - processors multiplied by 5. Set to less than or equal to allowed + processors plus 4. Set to less than or equal to allowed concurrent connections on server. :param Exception exceptions: Exception(s) to check. May be a tuple of exceptions to check. IOError or IOError(errno.ECOMM) or (IOError,) @@ -884,7 +884,7 @@ def put_r(self, localdir, remotedir, callback=None, confirm=True, :param bool resume: *Default: False* - Continue a previous transfer based on destination path matching. 
:param int workers: *Default: None* - If None, defaults to number of - processors multiplied by 5. Set to less than or equal to allowed + processors plus 4. Set to less than or equal to allowed concurrent connections on server. :param Exception exceptions: Exception(s) to check. May be a tuple of exceptions to check. IOError or IOError(errno.ECOMM) or (IOError,) From 10f6ffccaa699fa073927fdff5312e9084a0cccc Mon Sep 17 00:00:00 2001 From: byteskeptical <40208858+byteskeptical@users.noreply.github.com> Date: Tue, 24 Oct 2023 13:46:31 +0000 Subject: [PATCH 3/3] adding documentation for new worker attribute, updating release date due to the procrastination of documentation --- README.rst | 6 +++--- docs/changes.rst | 2 +- docs/cookbook.rst | 24 ++++++++++++++++-------- docs/index.rst | 2 +- 4 files changed, 21 insertions(+), 13 deletions(-) diff --git a/README.rst b/README.rst index 0d5d7fc1..8246b552 100644 --- a/README.rst +++ b/README.rst @@ -61,7 +61,7 @@ Example SSHException), tries=6) - # Use public key authentication + # Use public key authentication with Connection('hostname', private_key='~/.ssh/id_ed25519') as sftp: # Resume the download of a bigfile and save it to /mnt locally. sftp.get('bigfile', '/mnt', preserve_mtime=True, resume=True) @@ -73,9 +73,9 @@ Example # Recursively download a remote_directory and save it to /tmp locally. # Don't confirm files, useful in a scenario where the server removes # the remote file immediately after download. Preserve remote mtime on - # local copy. + # local copy. Limit the thread pool connections to the server. sftp.get_r('remote_directory', '/tmp', confirm=False, - preserve_mtime=True) + preserve_mtime=True, workers=6) # Use OpenSSH format config for public key authentication. 
Configuration diff --git a/docs/changes.rst b/docs/changes.rst index 8c60e65a..8a8bc5ef 100644 --- a/docs/changes.rst +++ b/docs/changes.rst @@ -1,7 +1,7 @@ Change Log ========== -1.1.2 (current, released 2023-10-18) +1.1.2 (current, released 2023-10-24) ------------------------------------ * added support for setting max_workers on ThreadPool functions diff --git a/docs/cookbook.rst b/docs/cookbook.rst index 5be884e8..523decb1 100644 --- a/docs/cookbook.rst +++ b/docs/cookbook.rst @@ -217,23 +217,27 @@ destination path matching. :meth:`sftpretty.Connection.get_d` ---------------------------------- This sftpretty method is an abstraction above :meth:`.get` that allows you to -copy all the files in a remote directory to a local path. +copy all the files in a remote directory to a local path. This is a +multi-threaded function that can quickly surpass the remote server's concurrent +connection threshold for directories with many files. The workers attribute is +provided to allow for a more granular level of control of the thread pool only. .. code-block:: python # copy all files under public to a local path, preserving modification time - sftp.get_d('public', 'local-backup', preserve_mtime=True) + sftp.get_d('public', 'local-backup', preserve_mtime=True, workers=4) :meth:`sftpretty.Connection.get_r` ---------------------------------- This sftpretty method is an abstraction that recursively copies files *and* -directories from the remote to a local path. +directories from the remote to a local path. Advice about managing concurrent +connections from above still applies. .. code-block:: python # copy all files *and* directories under public to a local path - sftp.get_r('public', 'local-backup', preserve_mtime=True) + sftp.get_r('public', 'local-backup', preserve_mtime=True, workers=16) :meth:`sftpretty.Connection.put` @@ -260,26 +264,30 @@ destination path matching. 
:meth:`sftpretty.Connection.put_d` ---------------------------------- The opposite of :meth:`.get_d`, put_d allows you to copy the contents of a -local directory to a remote one via SFTP. +local directory to a remote one via SFTP. This is a multi-threaded function that +can quickly surpass the remote server's concurrent connection threshold. The +workers attribute is provided to allow for a more granular level of control of +the thread pool only. .. code-block:: python # copy files from images, to remote static/images directory, # preserving modification times on files - sftp.put_d('images', 'static/images', preserve_mtime=True) + sftp.put_d('images', 'static/images', preserve_mtime=True, workers=6) :meth:`sftpretty.Connection.put_r` ---------------------------------- This method copies all files *and* directories from a local path to a remote path. It creates directories, and happily succeeds even if the target -directories already exist. +directories already exist. Advice about managing concurrent connections from +above still applies. .. code-block:: python # recursively copy files + directories from local static, to remote static, # preserving modification times on directories and files - sftp.put_r('static', 'static', preserve_mtime=True) + sftp.put_r('static', 'static', preserve_mtime=True, workers=12) :meth:`sftpretty.Connection.cd` diff --git a/docs/index.rst b/docs/index.rst index 00d84f6c..54adbf04 100644 --- a/docs/index.rst +++ b/docs/index.rst @@ -73,7 +73,7 @@ Example # Recursively download a remote_directory and save it to /tmp locally. # Don't confirm files, useful in a scenario where the server removes # the remote file immediately after download. Preserve remote mtime on - # local copy. Limit the concurrent connections to the server. + # local copy. Limit the thread pool connections to the server. sftp.get_r('remote_directory', '/tmp', confirm=False, preserve_mtime=True, workers=6)