Skip to content

Commit

Permalink
Initial commit.
Browse files Browse the repository at this point in the history
  • Loading branch information
danijar committed Aug 27, 2023
0 parents commit b8a70e9
Show file tree
Hide file tree
Showing 17 changed files with 1,781 additions and 0 deletions.
25 changes: 25 additions & 0 deletions .github/workflows/tests.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
name: embodied
on:
push: { branches: [main] }
pull_request: { branches: [main] }
jobs:
build:
runs-on: ubuntu-latest
strategy:
fail-fast: False
matrix:
python-version: ["3.10", "3.11"]
steps:
- uses: actions/checkout@v2
- name: Install Python
uses: actions/setup-python@v2
with:
python-version: ${{ matrix.python-version }}
- name: Install dependencies
run: |
python -m pip install --upgrade pip
python -m pip install pytest
python -m pip install -e .
- name: Run tests
run: |
python -m pytest
8 changes: 8 additions & 0 deletions .gitignore
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
.pytest_cache
.git
dist
__pycache__/
*.py[cod]
*.egg-info
MUJOCO_LOG.TXT
;
19 changes: 19 additions & 0 deletions LICENSE
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
Copyright 2023 Danijar Hafner

Permission is hereby granted, free of charge, to any person obtaining a copy of
this software and associated documentation files (the "Software"), to deal in
the Software without restriction, including without limitation the rights to
use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies
of the Software, and to permit persons to whom the Software is furnished to do
so, subject to the following conditions:

The above copyright notice and this permission notice shall be included in all
copies or substantial portions of the Software.

THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
SOFTWARE.
84 changes: 84 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,84 @@
[![PyPI](https://img.shields.io/pypi/v/zerofun.svg)](https://pypi.python.org/pypi/zerofun/#history)

# zerofun

Remote function calls for array data using [ZMQ](https://zeromq.org/).

## Overview

Zerofun provides a `Server` that you can bind functions to and a `Client` that
can call the messages and receive their results. The function inputs and
results are both flat **dicts of Numpy arrays**. The data is sent efficiently
without serialization to maximize throughput.

## Installation

```sh
pip install zerofun
```

## Example

This example runs the server and client in the same Python program using
subprocesses, but they could also be separate Python scripts running on
different machines.

```python
def server():
import zerofun
server = zerofun.Server('tcp://*:2222')
server.bind('add', lambda data: {'result': data['foo'] + data['bar']})
server.bind('msg', lambda data: print('Message from client:', data['msg']))
server.run()

def client():
import zerofun
client = zerofun.Client('tcp://localhost:2222')
client.connect()
future = client.add({'foo': 1, 'bar': 1})
result = future.result()
print(result) # {'result': 2}
client.msg({'msg': 'Hello World'})

if __name__ == '__main__':
import zerofun
server_proc = zerofun.Process(server, start=True)
client_proc = zerofun.Process(client, start=True)
client_proc.join()
server_proc.terminate()
```

## Features

Several productivity and performance features are available:

- **Request batching:** The server can batch requests together so that the user
function receives a dict of stacked arrays and the function result will be
split and sent back to the corresponding clients.
- **Multithreading:** Servers can use a thread pool to process multiple
requests in parallel. Optionally, each function can also request its own
thread pool to allow functions to block (e.g. for rate limiting) without
blocking other functions.
- **Async clients:** Clients can send multiple overlapping requests and wait
on the results when needed using `Future` objects. The maximum number of
inflight requests can be limited to avoid requests building up when the
server is slower than the client.
- **Error handling:** Exceptions raised in server functions are reported to the
client and raised in `future.result()` or, if the user did not store the
future object, on the next request. Worker exception can also be reraised in
the server application using `server.check()`.
- **Heartbeating:** Clients can send ping requests when they have not received
a result from the server for a while, allowing to wait for results that take
a long time to compute without assuming connection loss.
- **Concurrency:** `Thread` and `Process` implementations with exception
forwarding that can be forcefully terminated by the parent, which Python
threads do not natively support. Stoppable threads and processes are also
available for coorperative shutdown.
- **GIL load reduction:** The `ProcServer` behaves just like the normal
`Server` but uses a background process to batch requests and fan out results,
substantially reducing GIL load for the server workers in the main process.

## Questions

Please open a [GitHub issue](https://github.com/danijar/zerofun/issues) for
each question. Over time, we will add common questions to the README.
22 changes: 22 additions & 0 deletions example.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
def server():
import zerofun
server = zerofun.Server('tcp://*:2222')
server.bind('add', lambda data: {'result': data['foo'] + data['bar']})
server.bind('msg', lambda data: print('Message from client:', data['msg']))
server.run()

def client():
import zerofun
client = zerofun.Client('tcp://localhost:2222')
client.connect()
future = client.add({'foo': 1, 'bar': 1})
result = future.result()
print(result) # {'result': 2}
client.msg({'msg': 'Hello World'})

if __name__ == '__main__':
import zerofun
server_proc = zerofun.Process(server, start=True)
client_proc = zerofun.Process(client, start=True)
client_proc.join()
server_proc.terminate()
24 changes: 24 additions & 0 deletions setup.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
import pathlib
import re
import setuptools

text = (pathlib.Path(__file__).parent / 'zerofun/__init__.py').read_text()
version = re.search(r"__version__ = '(.*)'", text).group(1)

setuptools.setup(
name='zerofun',
version=version,
author='Danijar Hafner',
author_email='[email protected]',
description='Remote function calls for array data using ZMQ',
url='http://github.com/danijar/zerofun',
long_description=pathlib.Path('README.md').read_text(),
long_description_content_type='text/markdown',
packages=setuptools.find_packages(),
install_requires=['numpy', 'msgpack', 'pyzmq', 'cloudpickle'],
classifiers=[
'Intended Audience :: Science/Research',
'License :: OSI Approved :: MIT License',
'Programming Language :: Python :: 3',
],
)
61 changes: 61 additions & 0 deletions tests/test_process.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,61 @@
import multiprocessing as mp
import pathlib
import sys
import time
import traceback

sys.path.append(str(pathlib.Path(__file__).parent.parent))

import pytest
import zerofun


class TestProcess:

def test_terminate(self):
def fn():
while True:
time.sleep(0.01)
worker = zerofun.Process(fn, start=True)
worker.terminate()
worker.join()

def test_stop(self):
def fn(context, q):
q.put('start')
while context.running:
time.sleep(0.01)
q.put('stop')
q = mp.get_context().SimpleQueue()
worker = zerofun.StoppableProcess(fn, q)
worker.start()
worker.stop()
assert q.get() == 'start'
assert q.get() == 'stop'

def test_exitcode(self):
worker = zerofun.Process(lambda: None)
assert worker.exitcode is None
worker.start()
worker.join()
assert worker.exitcode == 0

def test_exception(self):
def fn1234(q):
q.put(42)
raise KeyError('foo')
q = mp.get_context().SimpleQueue()
worker = zerofun.Process(fn1234, q, start=True)
q.get()
time.sleep(0.2)
assert not worker.alive
assert worker.exitcode == 1
with pytest.raises(KeyError) as info:
worker.check()
assert repr(info.value) == "KeyError('foo')"
tb = ''.join(traceback.format_exception(info.value))
assert "KeyError: 'foo'" in tb
if sys.version_info.minor >= 11:
assert 'Traceback' in tb
assert ' File ' in tb
assert 'fn1234' in tb
Loading

0 comments on commit b8a70e9

Please sign in to comment.