Skip to content

Commit

Permalink
Merge pull request #42 from byteskeptical/worker_cap
Browse files Browse the repository at this point in the history
Max Worker Rising
  • Loading branch information
byteskeptical committed Oct 24, 2023
2 parents 4647e08 + 10f6ffc commit 49954cc
Show file tree
Hide file tree
Showing 9 changed files with 75 additions and 31 deletions.
10 changes: 8 additions & 2 deletions README.rst
Original file line number Diff line number Diff line change
Expand Up @@ -61,15 +61,21 @@ Example
SSHException), tries=6)
# Use public key authentication
with Connection('hostname', private_key='~/.ssh/id_ed25519') as sftp:
# Resume the download of a bigfile and save it to /mnt locally.
sftp.get('bigfile', '/mnt', preserve_mtime=True, resume=True)
# Use public key authentication with optional private key password
with Connection('hostname', private_key='~/.ssh/id_ed25519',
private_key_pass='secret') as sftp:
# Recursively download a remote_directory and save it to /tmp locally.
# Don't confirm files, useful in a scenario where the server removes
# the remote file immediately after download. Preserve remote mtime on
# local copy.
# local copy. Limit the thread pool connections to the server.
sftp.get_r('remote_directory', '/tmp', confirm=False,
preserve_mtime=True)
preserve_mtime=True, workers=6)
# Use OpenSSH format config for public key authentication. Configuration
Expand Down
2 changes: 1 addition & 1 deletion docs/authors.rst
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
Authors
=======
Contributors of code, tests and documentation to the project who have agreed to have their work enjoined into the project and project license (BSD).
Contributors of code, tests and documentation to the project who have agreed to have their work enjoined into the project and project license (BSD).


Acknowledgment
Expand Down
6 changes: 5 additions & 1 deletion docs/changes.rst
Original file line number Diff line number Diff line change
@@ -1,7 +1,11 @@
Change Log
==========

1.1.1 (current, released 2023-08-28)
1.1.2 (current, released 2023-10-24)
------------------------------------
* added support for setting max_workers on ThreadPool functions

1.1.1 (released 2023-08-28)
------------------------------------
* added initial support for resuming existing transfers get() & put()
* added max_concurrent_prefetch_requests parameter to get() family
Expand Down
4 changes: 2 additions & 2 deletions docs/conf.py
Original file line number Diff line number Diff line change
Expand Up @@ -54,9 +54,9 @@
# built documents.
#
# The short X.Y version.
version = '1.1.1'
version = '1.1.2'
# The full version, including alpha/beta/rc tags.
release = '1.1.1'
release = '1.1.2'

# The language for content autogenerated by Sphinx. Refer to documentation
# for a list of supported languages.
Expand Down
1 change: 1 addition & 0 deletions docs/contributing.rst
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ Code
a. Setup CI testing for your Fork. Currently testing is done on Github Actions but feel free to use the testing framework of your choosing.
b. Testing features that concern chmod, chown on Windows is NOT supported. Testing compression has to be ran against a local compatible sshd and not the plugin as it does NOT support this test.
c. You will need to setup an ssh daemon on your local machine and create a user: copy the contents of id_sftpretty.pub to the newly created user's authorized_keys file -- Tests that can only be run locally are skipped using the @skip_if_ci decorator so they don't fail when the test suite is run on the CI server.

#. Ensure that your name is added to the end of the :doc:`authors` file using the format Name <[email protected]> (url), where the (url) portion is optional.
#. Submit a Pull Request to the project.

Expand Down
28 changes: 18 additions & 10 deletions docs/cookbook.rst
Original file line number Diff line number Diff line change
Expand Up @@ -217,23 +217,27 @@ destination path matching.
:meth:`sftpretty.Connection.get_d`
----------------------------------
This sftpretty method is an abstraction above :meth:`.get` that allows you to
copy all the files in a remote directory to a local path.
copy all the files in a remote directory to a local path. This is a
multi-threaded function that can quickly surpass the remote server's concurrent
connection threshold for directories with many files. The workers attribute is
provided to allow for a more granular level of control of the thread pool only.

.. code-block:: python
# copy all files under public to a local path, preserving modification time
sftp.get_d('public', 'local-backup', preserve_mtime=True)
sftp.get_d('public', 'local-backup', preserve_mtime=True, workers=4)
:meth:`sftpretty.Connection.get_r`
----------------------------------
This sftpretty method is an abstraction that recursively copies files *and*
directories from the remote to a local path.
directories from the remote to a local path. Advice about managing concurrent
connections from above still applies.

.. code-block:: python
# copy all files *and* directories under public to a local path
sftp.get_r('public', 'local-backup', preserve_mtime=True)
sftp.get_r('public', 'local-backup', preserve_mtime=True, workers=16)
:meth:`sftpretty.Connection.put`
Expand All @@ -260,26 +264,30 @@ destination path matching.
:meth:`sftpretty.Connection.put_d`
----------------------------------
The opposite of :meth:`.get_d`, put_d allows you to copy the contents of a
local directory to a remote one via SFTP.
local directory to a remote one via SFTP. This is multi-threaded function that
can quickly surpass the remote server's concurrent connection threshold. The
workers attribute is provided to allow for a more granular level of control of
the thread pool only.

.. code-block:: python
# copy files from images, to remote static/images directory,
# preserving modification times on files
sftp.put_d('images', 'static/images', preserve_mtime=True)
sftp.put_d('images', 'static/images', preserve_mtime=True, workers=6)
:meth:`sftpretty.Connection.put_r`
----------------------------------
This method copies all files *and* directories from a local path to a remote
path. It creates directories, and happily succeeds even if the target
directories already exist.
path. It creates directories, and happily succeeds even if the target
directories already exist. Advice about managing concurrent connections from
above still applies.

.. code-block:: python
# recursively copy files + directories from local static, to remote static,
# preserving modification times on directories and files
sftp.put_r('static', 'static', preserve_mtime=True)
sftp.put_r('static', 'static', preserve_mtime=True, workers=12)
:meth:`sftpretty.Connection.cd`
Expand Down Expand Up @@ -388,7 +396,7 @@ SFTPAttribute.filename, instead of paramiko's arbitrary order.
:meth:`sftpretty.Connection.mkdir`
----------------------------------
Just like :meth:`.chmod`, the mode is an integer representation of the octal
number to use. Just like the unix cmd, `chmod` you use 744 not 0744 or 0o744.
number to use. Just like the unix cmd, `chmod` you use 744 not 0744 or 0o744.

.. code-block:: python
Expand Down
11 changes: 9 additions & 2 deletions docs/index.rst
Original file line number Diff line number Diff line change
Expand Up @@ -60,15 +60,22 @@ Example
exceptions=(NoValidConnectionsError, socket.timeout,
SSHException), tries=6)
# Use public key authentication
with Connection('hostname', private_key='~/.ssh/id_ed25519') as sftp:
# Resume the download of a bigfile and save it to /mnt locally.
sftp.get('bigfile', '/mnt', preserve_mtime=True, resume=True)
# Use public key authentication with optional private key password
with Connection('hostname', private_key='~/.ssh/id_ed25519',
private_key_pass='secret') as sftp:
# Recursively download a remote_directory and save it to /tmp locally.
# Don't confirm files, useful in a scenario where the server removes
# the remote file immediately after download. Preserve remote mtime on
# local copy.
# local copy. Limit the thread pool connections to the server.
sftp.get_r('remote_directory', '/tmp', confirm=False,
preserve_mtime=True)
preserve_mtime=True, workers=6)
# Use OpenSSH format config for public key authentication. Configuration
Expand Down
2 changes: 1 addition & 1 deletion setup.py
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@
packages=['sftpretty', ],
platforms=['any'],
url='https://github.com/byteskeptical/sftpretty',
version='1.1.1',
version='1.1.2',
classifiers=[
'Development Status :: 5 - Production/Stable',
'License :: OSI Approved :: BSD License',
Expand Down
42 changes: 30 additions & 12 deletions sftpretty/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -462,7 +462,7 @@ def _get(self, remotefile, localpath=None, callback=None,

def get_d(self, remotedir, localdir, callback=None,
max_concurrent_prefetch_requests=None, pattern=None,
prefetch=True, preserve_mtime=False, resume=False,
prefetch=True, preserve_mtime=False, resume=False, workers=None,
exceptions=None, tries=None, backoff=2, delay=1,
logger=getLogger(__name__), silent=False):
'''Get the contents of remotedir and write to locadir. Non-recursive.
Expand All @@ -484,6 +484,9 @@ def get_d(self, remotedir, localdir, callback=None,
it's st_atime)
:param bool resume: *Default: False* - Continue a previous transfer
based on destination path matching.
:param int workers: *Default: None* - If None, defaults to number of
processors plus 4. Set to less than or equal to allowed
concurrent connections on server.
:param Exception exceptions: Exception(s) to check. May be a tuple of
exceptions to check. IOError or IOError(errno.ECOMM) or (IOError,)
or (ValueError, IOError(errno.ECOMM))
Expand Down Expand Up @@ -530,7 +533,8 @@ def get_d(self, remotedir, localdir, callback=None,

if paths != []:
thread_prefix = uuid4().hex
with ThreadPoolExecutor(thread_name_prefix=thread_prefix) as pool:
with ThreadPoolExecutor(max_workers=workers,
thread_name_prefix=thread_prefix) as pool:
logger.debug(f'Thread Prefix: [{thread_prefix}]')
threads = {
pool.submit(self.get, remote, local,
Expand Down Expand Up @@ -561,7 +565,7 @@ def get_d(self, remotedir, localdir, callback=None,

def get_r(self, remotedir, localdir, callback=None,
max_concurrent_prefetch_requests=None, pattern=None,
prefetch=True, preserve_mtime=False, resume=False,
prefetch=True, preserve_mtime=False, resume=False, workers=None,
exceptions=None, tries=None, backoff=2, delay=1,
logger=getLogger(__name__), silent=False):
'''Recursively copy remotedir structure to localdir
Expand All @@ -583,6 +587,9 @@ def get_r(self, remotedir, localdir, callback=None,
it's st_atime)
:param bool resume: *Default: False* - Continue a previous transfer
based on destination path matching.
:param int workers: *Default: None* - If None, defaults to number of
processors plus 4. Set to less than or equal to allowed
concurrent connections on server.
:param Exception exceptions: Exception(s) to check. May be a tuple of
exceptions to check. IOError or IOError(errno.ECOMM) or (IOError,)
or (ValueError, IOError(errno.ECOMM))
Expand Down Expand Up @@ -618,8 +625,9 @@ def get_r(self, remotedir, localdir, callback=None,
max_concurrent_prefetch_requests=max_concurrent_prefetch_requests, # noqa: E501
pattern=pattern, prefetch=prefetch,
preserve_mtime=preserve_mtime, resume=resume,
exceptions=exceptions, tries=tries, backoff=backoff,
delay=delay, logger=logger, silent=silent)
workers=workers, exceptions=exceptions, tries=tries,
backoff=backoff, delay=delay, logger=logger,
silent=silent)

def getfo(self, remotefile, flo, callback=None,
max_concurrent_prefetch_requests=None, prefetch=True,
Expand Down Expand Up @@ -771,8 +779,9 @@ def _put(self, localfile, remotepath=None, callback=None,
resume=resume)

def put_d(self, localdir, remotedir, callback=None, confirm=True,
preserve_mtime=False, resume=False, exceptions=None, tries=None,
backoff=2, delay=1, logger=getLogger(__name__), silent=False):
preserve_mtime=False, resume=False, workers=None,
exceptions=None, tries=None, backoff=2, delay=1,
logger=getLogger(__name__), silent=False):
'''Copies a local directory's contents to a remotepath
:param str localdir: The local directory to copy remotely.
Expand All @@ -788,6 +797,9 @@ def put_d(self, localdir, remotedir, callback=None, confirm=True,
it's st_atime)
:param bool resume: *Default: False* - Continue a previous transfer
based on destination path matching.
:param int workers: *Default: None* - If None, defaults to number of
processors plus 4. Set to less than or equal to allowed
concurrent connections on server.
:param Exception exceptions: Exception(s) to check. May be a tuple of
exceptions to check. IOError or IOError(errno.ECOMM) or (IOError,)
or (ValueError, IOError(errno.ECOMM))
Expand Down Expand Up @@ -824,7 +836,8 @@ def put_d(self, localdir, remotedir, callback=None, confirm=True,

if paths != []:
thread_prefix = uuid4().hex
with ThreadPoolExecutor(thread_name_prefix=thread_prefix) as pool:
with ThreadPoolExecutor(max_workers=workers,
thread_name_prefix=thread_prefix) as pool:
logger.debug(f'Thread Prefix: [{thread_prefix}]')
threads = {
pool.submit(self.put, local, remote,
Expand Down Expand Up @@ -852,8 +865,9 @@ def put_d(self, localdir, remotedir, callback=None, confirm=True,
logger.info(f'No files found in directory [{localdir}]')

def put_r(self, localdir, remotedir, callback=None, confirm=True,
preserve_mtime=False, resume=False, exceptions=None, tries=None,
backoff=2, delay=1, logger=getLogger(__name__), silent=False):
preserve_mtime=False, resume=False, workers=None,
exceptions=None, tries=None, backoff=2, delay=1,
logger=getLogger(__name__), silent=False):
'''Recursively copies a local directory's contents to a remotepath
:param str localdir: The local directory to copy remotely.
Expand All @@ -869,6 +883,9 @@ def put_r(self, localdir, remotedir, callback=None, confirm=True,
it's st_atime)
:param bool resume: *Default: False* - Continue a previous transfer
based on destination path matching.
:param int workers: *Default: None* - If None, defaults to number of
processors plus 4. Set to less than or equal to allowed
concurrent connections on server.
:param Exception exceptions: Exception(s) to check. May be a tuple of
exceptions to check. IOError or IOError(errno.ECOMM) or (IOError,)
or (ValueError, IOError(errno.ECOMM))
Expand Down Expand Up @@ -901,8 +918,9 @@ def put_r(self, localdir, remotedir, callback=None, confirm=True,
for local, remote in tree[roots]:
self.put_d(local, remote, callback=callback, confirm=confirm,
preserve_mtime=preserve_mtime, resume=resume,
exceptions=exceptions, tries=tries, backoff=backoff,
delay=delay, logger=logger, silent=silent)
workers=workers, exceptions=exceptions, tries=tries,
backoff=backoff, delay=delay, logger=logger,
silent=silent)

def putfo(self, flo, remotepath=None, file_size=None, callback=None,
confirm=True, exceptions=None, tries=None, backoff=2,
Expand Down

0 comments on commit 49954cc

Please sign in to comment.