Celery is an asynchronous task queue/job queue based on distributed message passing. It is focused on real-time operation, but supports scheduling as well.
The requirements are easy to install with pip, since we're using djkombu (the successor to ghettoq) instead of the more powerful but more complicated RabbitMQ:
sudo pip install django-celery
sudo pip install djkombu
In the simplest case, we can just add the apps and point them to use the current django db settings:
INSTALLED_APPS += ('djcelery', 'djkombu')
CARROT_BACKEND = "django"
CELERY_RESULT_BACKEND = "database"
CELERY_TRACK_STARTED = True
BROKER_BACKEND = "djkombu.transport.DatabaseTransport"
# Make sure to add any modules containing tasks other than app.tasks
CELERY_IMPORTS = ('myapp.other_tasks',)
After that, make sure to run python manage.py syncdb to create the necessary tables.
A task is just a normal Python function with a decorator:
from celery.decorators import task

@task(rate_limit='4/m')
def get_shps(name, **kwargs):
    logger = get_shps.get_logger(**kwargs)
    logger.info("Starting the task")
    polygons = some_long_process(name)
    return polygons
rate_limit sets the maximum frequency of task execution (in this case four tasks per minute, or '4/m').
Beware of 'unpicklable' objects being passed as task arguments; Celery serializes arguments before handing them to the broker.
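As an illustrative sketch (the file handle and the get_shps_safe helper are hypothetical, not part of this project): an unpicklable object like an open file handle fails at serialization time, while a plain value such as a primary key passes through cleanly, so the usual pattern is to send the identifier and re-load the object inside the task body.

```python
import pickle

# File handles are a classic unpicklable object; passing one to
# .delay() would blow up when the task arguments are serialized.
handle = open(__file__)
try:
    pickle.dumps(handle)
    picklable = True
except TypeError:
    picklable = False
handle.close()

print(picklable)  # False -- file objects cannot be pickled

# The safe pattern: pass a plain identifier (an int pickles fine)
# and re-fetch the heavyweight object inside the task.
def get_shps_safe(feature_pk):
    # feature = Feature.objects.get(pk=feature_pk)  # re-load inside the task
    return feature_pk

print(pickle.loads(pickle.dumps(42)))  # 42 -- a primary key round-trips cleanly
```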
A periodic task works like a cron job. It requires that you start the celeryd service with a 'heartbeat':
from celery.decorators import periodic_task
from datetime import timedelta

@periodic_task(run_every=timedelta(seconds=30))
def do_stuff(**kwargs):
    clean_up_temp_files()
    return True
You can call the task directly to run it synchronously:
x = get_shps('test')
Or you can run it asynchronously via the task queue:
ax = get_shps.delay('test')
The result of the async call can be monitored and the result retrieved when ready:
ax.status   # u'PENDING'
ax.status   # u'STARTED'
ax.status   # u'SUCCESS'
ax.ready()  # True or False
ax.result   # The polygon objects returned by the task
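Rather than checking status by hand, you can block until the result is ready. The helper below is a hand-rolled polling sketch, and FakeResult is a stand-in object so the example runs without a live broker; wait_for_result is not a Celery API, just an illustration of the ready()/result pattern shown above.

```python
import time

def wait_for_result(async_result, timeout=60, interval=0.5):
    """Poll an AsyncResult-like object until ready() or the timeout passes."""
    deadline = time.time() + timeout
    while time.time() < deadline:
        if async_result.ready():
            return async_result.result
        time.sleep(interval)
    raise RuntimeError("task did not finish within %s seconds" % timeout)

# Stand-in that "finishes" on the third poll, so the sketch is runnable:
class FakeResult(object):
    def __init__(self):
        self.polls = 0
    def ready(self):
        self.polls += 1
        return self.polls >= 3
    @property
    def result(self):
        return "polygons"

print(wait_for_result(FakeResult(), timeout=5, interval=0.01))  # polygons
```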
The way the task import statement is structured matters a great deal to Celery. An import strategy that works on one machine or platform may fail on another.
>>> from my_proj.my_app.tasks import add
>>> result = add.delay(2, 2)
>>> result.status
PENDING
>>> from my_proj.my_app import tasks
>>> result = tasks.add.delay(2, 2)
>>> result.status
SUCCESS
You can test your import strategy in the shell with a simple task such as add:
from celery.decorators import task

@task
def add(x, y):
    return x + y
If the task seems to register with Celery but never completes (the status stays PENDING and never changes), your import statement is not structured correctly for your platform. If result.status eventually returns STARTED or SUCCESS, the import is structured correctly and should be written that way in your code.
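The underlying reason is that Celery registers tasks by name, derived from the module path plus the function name. This sketch (the paths are hypothetical) shows how two import strategies can yield two different registered names, so the worker never matches the queued message and the status stays PENDING:

```python
# Celery identifies a task by its fully qualified name. If the caller
# sends a message under one module path but the worker registered the
# task under another, the two never match.
def task_name(module_path, func_name):
    return "%s.%s" % (module_path, func_name)

caller = task_name("my_proj.my_app.tasks", "add")   # how the caller imported it
worker = task_name("my_app.tasks", "add")           # how the worker registered it

print(caller)            # my_proj.my_app.tasks.add
print(worker)            # my_app.tasks.add
print(caller == worker)  # False -- a mismatch, so the message is never picked up
```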
The celeryd worker has to be running to execute the jobs. If the celeryd service is stopped for whatever reason, jobs can still get added to the queue but won't be run until the celeryd process is started again.
You can run it from the command line in a terminal:
python manage.py celeryd -v 2 -l DEBUG -c 1 -B -E
Note the -B flag, which turns on the 'heartbeat' for periodic tasks, and -c 1, which limits the worker to a single process.
For production environments, use an init.d script. An example is in the code repository at madrona/apache/celeryd; instructions are contained in the comments of that file.