Skip to content

Commit

Permalink
Add persistent schedule using redis jobstore
Browse files Browse the repository at this point in the history
  • Loading branch information
codingjoe committed May 23, 2023
1 parent adee99a commit d991d27
Show file tree
Hide file tree
Showing 7 changed files with 53 additions and 41 deletions.
11 changes: 8 additions & 3 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -38,10 +38,15 @@ Finally, you lauch the scheduler in a separate process:
python3 manage.py crontab
```

### Setup Redis as a lock backend (optional)
### Setup Redis as a lock backend and persistent schedule (optional)

If you use Redis as a broker, you can use Redis as a lock backend as well.
The lock backend is used to prevent multiple instances of the scheduler from running at the same time.
The lock backend is used to prevent multiple instances of the scheduler
from running at the same time. This is important if you have multiple
instances of your application running.

You can also use Redis as a persistent schedule backend. This is useful
if you want to keep the schedule between restarts of the scheduler.

```python
# settings.py
Expand Down Expand Up @@ -88,4 +93,4 @@ options:

[dramatiq]: https://dramatiq.io/
[apscheduler]: https://apscheduler.readthedocs.io/en/stable/
[sentry]: https://docs.sentry.io/product/crons/
[sentry]: https://docs.sentry.io/product/crons/
4 changes: 3 additions & 1 deletion dramatiq_crontab/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,8 @@
from django.utils import timezone

from . import _version
from .conf import get_settings
from .utils import jobstores

try:
from sentry_sdk.crons import monitor
Expand All @@ -18,7 +20,7 @@
__all__ = ["cron", "scheduler"]


scheduler = BlockingScheduler()
scheduler = BlockingScheduler(jobstores=jobstores)


def cron(schedule):
Expand Down
32 changes: 8 additions & 24 deletions dramatiq_crontab/management/commands/crontab.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,33 +4,21 @@
from django.apps import apps
from django.core.management import BaseCommand

try:
import redis
from redis.exceptions import LockError
except ImportError:
redis = None
LockError = Exception
from ...utils import LockError, lock

try:
from sentry_sdk import capture_exception
except ImportError:
capture_exception = lambda e: None

from ... import scheduler
from ...conf import get_settings


def kill_softly(signum, frame):
"""Raise a KeyboardInterrupt to stop the scheduler and release the lock."""
raise KeyboardInterrupt("Received SIGTERM!")


def get_redis_client():
redis_url = get_settings().REDIS_URL
if redis and redis_url:
return redis.Redis.from_url(redis_url)


class Command(BaseCommand):
"""Run dramatiq task scheduler for all tasks with the `cron` decorator."""

Expand All @@ -54,17 +42,13 @@ def handle(self, *args, **options):
if not options["no_heartbeat"]:
importlib.import_module("dramatiq_crontab.tasks")
self.stdout.write("Scheduling heartbeat.")
redis = get_redis_client()
if redis:
try:
# Lock scheduler to prevent multiple instances from running.
with redis.lock("dramatiq-scheduler", blocking_timeout=0):
self.launch_scheduler()
except LockError as e:
capture_exception(e)
self.stderr.write("Another scheduler is already running.")
else:
self.launch_scheduler()
try:
# Lock scheduler to prevent multiple instances from running.
with lock:
self.launch_scheduler()
except LockError as e:
capture_exception(e)
self.stderr.write("Another scheduler is already running.")

def launch_scheduler(self):
signal.signal(signal.SIGTERM, kill_softly)
Expand Down
30 changes: 30 additions & 0 deletions dramatiq_crontab/utils.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
from contextlib import contextmanager

from dramatiq_crontab.conf import get_settings

if redis_url := get_settings().REDIS_URL:
import redis
from apscheduler.jobstores.redis import RedisJobStore
from redis.exceptions import LockError # noqa

redis_client = redis.Redis.from_url(redis_url)
lock = redis_client.lock("dramatiq-scheduler", blocking_timeout=0)

jobstores = {
"default": RedisJobStore(
connection_pool=redis.ConnectionPool.from_url(get_settings().REDIS_URL)
)
}


else:

@contextmanager
def lock():
"""Dummy lock context manager."""
yield

LockError = Exception
jobstores = {}

__all__ = ["LockError", "lock", "jobstores"]
Empty file added tests/conftest.py
Empty file.
15 changes: 3 additions & 12 deletions tests/test_commands.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
import pytest
from django.core.management import call_command

import dramatiq_crontab.utils
from dramatiq_crontab.management.commands import crontab


Expand All @@ -12,16 +13,6 @@ def test_kill_softly():
crontab.kill_softly(None, None)


def test_get_redis_client__none(settings):
settings.DRAMATIQ_CRONTAB = {}
assert crontab.get_redis_client() is None


@pytest.mark.skipif(crontab.redis is None, reason="redis is not installed")
def test_get_redis_client():
assert crontab.get_redis_client() is not None


class TestCrontab:
@pytest.fixture()
def patch_launch(self, monkeypatch):
Expand All @@ -48,9 +39,9 @@ def test_no_heartbeat(self, patch_launch):
assert "Loaded tasks from tests.testapp." in stdout.getvalue()
assert "Scheduling heartbeat." not in stdout.getvalue()

@pytest.mark.skipif(crontab.redis is None, reason="redis is not installed")
def test_locked(self):
with crontab.get_redis_client().lock("dramatiq-scheduler", blocking_timeout=0):
pytest.importorskip("redis", reason="redis is not installed")
with dramatiq_crontab.utils.lock:
with io.StringIO() as stderr:
call_command("crontab", stderr=stderr)
assert "Another scheduler is already running." in stderr.getvalue()
Expand Down
2 changes: 1 addition & 1 deletion tests/testapp/settings.py
Original file line number Diff line number Diff line change
Expand Up @@ -85,7 +85,7 @@
}
}

DRAMATIQ_CRONTAB = {"REDIS_URL": os.getenv("REDIS_URL", "redis://localhost:6379/0")}
DRAMATIQ_CRONTAB = {"REDIS_URL": os.getenv("REDIS_URL")}

dramatiq.set_broker(StubBroker())

Expand Down

0 comments on commit d991d27

Please sign in to comment.