How does the worker run?

django-streams has a custom command to run the worker. It takes care of running the streams and manages graceful shutdown:

```
python manage.py worker
```
The custom django command does two things:

- Subscribes to signal.SIGINT and signal.SIGTERM to stop the worker
- Starts the engine
```python
# django_streams/management/commands/worker.py
import logging
import signal

from django.core.management.base import BaseCommand

from django_streams.factories import create_engine

logger = logging.getLogger(__name__)


class Command(BaseCommand):
    help = "Start kafka consumers"

    def handle(self, *args, **options):
        # StreamEngine is a Singleton, so it will return the same instance
        # that the user has defined in the custom django app.
        engine = create_engine()
        logger.info(f"Starting Engine with streams {engine}")

        # Listen for signals from the main thread
        signal.signal(signal.SIGINT, engine.sync_stop)  # IMPORTANT
        signal.signal(signal.SIGTERM, engine.sync_stop)  # IMPORTANT

        # start the worker
        engine.sync_start()
```
Signals

When you write a custom command in django you are in a sync context, but coroutines must run in an async context.
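This is why the engine exposes sync entry points. A minimal sketch of how such sync wrappers can bridge into an async engine (the `Engine` class and its method bodies below are hypothetical, not the actual django-streams implementation; the thread only simulates an external stop request):

```python
import asyncio
import threading
import time


class Engine:
    """Hypothetical sketch of an async engine with sync wrappers."""

    def __init__(self):
        self._loop = None
        self._stopped = None

    async def start(self):
        # Stand-in for running the streams: wait until stop is requested.
        self._stopped = asyncio.Event()
        self._loop = asyncio.get_running_loop()
        await self._stopped.wait()

    def sync_start(self):
        # Bridge from the sync management command into the async context.
        asyncio.run(self.start())

    def sync_stop(self, signum, frame):
        # Matches the signal-handler signature (signum, frame). Handlers
        # are sync, so hand the shutdown to the event loop thread-safely.
        if self._loop is not None:
            self._loop.call_soon_threadsafe(self._stopped.set)


engine = Engine()
worker = threading.Thread(target=engine.sync_start)
worker.start()
time.sleep(0.2)               # let the event loop come up
engine.sync_stop(None, None)  # what the signal handler would do
worker.join(timeout=2)
print(worker.is_alive())  # → False
```

In the real command `sync_start()` blocks the main thread and the signal handler interrupts it; the thread here only stands in for that external trigger.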
When the command/program finishes, the main thread finishes but the secondary thread does not, unless the worker is subscribed to signal.SIGINT and signal.SIGTERM. Subscribing to the signals guarantees that:
- The kafka consumers are stopped in a proper way
- Graceful shutdown
- If you are using kubernetes, the pod will be terminated faster
So, the worker has to subscribe to the signals:

```python
def handle(self, *args, **options):
    # Listen for signals from the main thread
    signal.signal(signal.SIGINT, engine.sync_stop)  # IMPORTANT
    signal.signal(signal.SIGTERM, engine.sync_stop)  # IMPORTANT

    # start the engine
    engine.sync_start()
```
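The effect of that subscription can be seen in isolation. A minimal sketch (POSIX only, no django involved) where the process signals itself and the registered handler runs instead of the process being killed mid-work:

```python
import os
import signal

stopped = False


def sync_stop(signum, frame):
    # Same shape as engine.sync_stop: a plain sync handler that lets
    # the program wind down instead of dying mid-work.
    global stopped
    stopped = True


signal.signal(signal.SIGTERM, sync_stop)

# Simulate `kubectl delete pod` (or Ctrl-C with SIGINT) by
# signalling our own process.
os.kill(os.getpid(), signal.SIGTERM)

print(stopped)  # → True
```

This is exactly why kubernetes terminates the pod faster: the handler reacts to SIGTERM immediately, so kubernetes never has to wait out the grace period and follow up with SIGKILL.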