ficapy
10/21/2014 - 5:20 PM

Celery task bug in a SQLAlchemy event

Celery task bug in a SQLAlchemy event

import unittest

from celery import Celery
from flask import Flask
from flask.ext.sqlalchemy import SQLAlchemy, event

# Celery factory based on the pattern shown in the Flask documentation,
# extended so that a task reuses an application context that is already active.
def make_celery(app):
    """Create a Celery instance whose tasks run inside ``app``'s context.

    The broker URL is read from ``app.config['CELERY_BROKER_URL']`` and the
    whole Flask config mapping is copied into the Celery config.

    :param app: the Flask application the tasks belong to.
    :returns: the configured ``Celery`` instance, with its ``Task`` base
        class replaced by ``ContextTask``.
    """
    # Imported locally so this function does not rely on the file-level
    # ``from flask import ...`` line being extended.
    from flask import current_app, has_app_context

    celery = Celery(app.import_name, broker=app.config['CELERY_BROKER_URL'])
    celery.conf.update(app.config)
    TaskBase = celery.Task

    class ContextTask(TaskBase):
        """Abstract task base that guarantees an application context."""

        abstract = True

        def __call__(self, *args, **kwargs):
            # If the caller is already inside this app's context (e.g. a task
            # executed in-process), run in place.  Pushing a nested context
            # would, when it is popped, run the app's teardown_appcontext
            # handlers in the middle of the caller's work.
            # NOTE(review): Flask-SQLAlchemy presumably registers such a
            # handler that removes the session -- confirm for the pinned
            # version.
            if has_app_context() and current_app._get_current_object() is app:
                return TaskBase.__call__(self, *args, **kwargs)
            # No context for this app is active: push one for the duration
            # of the task, exactly as before.
            with app.app_context():
                return TaskBase.__call__(self, *args, **kwargs)

    celery.Task = ContextTask
    return celery

# Application bootstrap: Flask app, its settings, then the Celery and
# SQLAlchemy objects bound to it (in that order).
app = Flask(__name__)
app.config.update({
    # No broker / result backend is configured for this reproduction.
    'CELERY_BROKER_URL': None,
    'CELERY_RESULT_BACKEND': None,
    # Eager mode (see the Celery configuration reference).
    'CELERY_ALWAYS_EAGER': True,
    # SQLite URL without a file path.
    'SQLALCHEMY_DATABASE_URI': 'sqlite://',
})

celery = make_celery(app)
db = SQLAlchemy(app)


# We need a model to attach the event to
class User(db.Model):
    """Minimal model that serves as the target of the ``after_delete`` listener."""
    # Integer primary key.
    id = db.Column(db.Integer, primary_key=True)
    # Plain integer column; the tests in this file store 1 and 2 in it.
    value = db.Column(db.Integer)

# Here is the event. Triggering a delete on the model
# instance will call the task.
@event.listens_for(User, "after_delete")
def after_delete(mapper, connection, target):
    """Listener for the ``after_delete`` event on ``User``; calls ``my_task``.

    None of the three parameters is used by the body; ``connection`` is only
    referred to in the observations recorded below.
    """
    # Observed right here, before the task is called:
    # >>> connection.closed
    # False
    my_task.delay()
    # Observed right after the call (this is the reported bug):
    # >>> connection.closed
    # True

# Deliberately empty task, so its body cannot be to blame
@celery.task()
def my_task():
    """Do nothing; exists only so that there is a task to call."""
    return None


class CeleryTestCase(unittest.TestCase):
    """Call ``my_task`` once directly and once via the ``after_delete`` event."""

    def setUp(self):
        # Create all tables before each test.
        db.create_all()

    def tearDown(self):
        # Discard the current session, then drop all tables.
        db.session.remove()
        db.drop_all()

    def test_delete_task_outside_event(self):
        # The task is called directly, between two commits on the same row.
        record = User(value=1)
        session = db.session
        session.add(record)
        session.commit()

        my_task.delay()

        record.value = 2
        session.add(record)
        session.commit()

    def test_delete_task_in_event(self):
        # The task is called by the after_delete listener while the delete
        # is being committed.
        record = User(value=1)
        session = db.session
        session.add(record)
        session.commit()

        session.delete(record)
        session.commit()


# Run the test suite when this file is executed as a script.
if __name__ == "__main__":
    unittest.main()
Flask==0.10.1
Flask-SQLAlchemy==2.0
Jinja2==2.7.3
MarkupSafe==0.23
SQLAlchemy==0.9.8
Werkzeug==0.9.6
amqp==1.4.6
anyjson==0.3.3
argparse==1.2.1
billiard==3.3.0.18
celery==3.1.16
itsdangerous==0.24
kombu==3.0.23
pytz==2014.7
wsgiref==0.1.2

Celery Bug

When a Celery task is run inside a SQLAlchemy event during a unit test, the connection gets destroyed somehow.

I attached the requirements needed to reproduce the bug:

git clone https://gist.github.com/5762792fc1d628843697.git
cd 5762792fc1d628843697
virtualenv venv
. venv/bin/activate
pip install -r requirements.txt
python test.py

There is a bounty open for this bug on Stack Overflow.