Celery + SQLAlchemy architecture demo

task1.py:
from base_celery import app
from model import db_session as session, Demo
from random import randint
@app.task(default_retry_delay=1, max_retries=1)
def add1():
    try:
        session.add(Demo(id=randint(1, 100000000000000), task=1))
        session.commit()
    except Exception as e:
        # A failed commit leaves the session in an invalid state; roll back
        # before scheduling the retry.
        session.rollback()
        raise add1.retry(exc=e)
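All tasks running in the same worker share the scoped session imported above, so it is common to clear it when a task finishes. A minimal sketch of that cleanup; task_postrun is Celery's signal for this, but the helper itself is an assumption, not part of the original demo:

from celery.signals import task_postrun

from model import db_session

@task_postrun.connect
def remove_session(**kwargs):
    # scoped_session.remove() closes the current session and returns its
    # connection to the pool, so the next task starts with a clean session.
    db_session.remove()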

model.py:
#!/usr/bin/env python
# -*- coding: utf-8 -*-
# Author: Ficapy
# Create: '12/11/16'
from sqlalchemy import create_engine, Column, BIGINT
from sqlalchemy.orm import sessionmaker, scoped_session
from sqlalchemy.ext.declarative import declarative_base
sqlalchemy_db = 'postgresql+psycopg2://user:pwd@127.0.0.1/db_name'
Base = declarative_base()
class Demo(Base):
    __tablename__ = 'demo'
    id = Column(BIGINT, primary_key=True)
    task = Column(BIGINT)
uline_engine = create_engine(sqlalchemy_db, pool_recycle=3600, echo=False)
Base.metadata.bind = uline_engine
Base.metadata.create_all(checkfirst=True)
# scoped_session: each thread gets its own session behind a single module-level handle.
db_session = scoped_session(sessionmaker(bind=uline_engine))
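For reference, the same module works outside Celery as well; a quick usage sketch, assuming the connection string above points at a reachable database:

from model import db_session, Demo

db_session.add(Demo(id=1, task=0))
db_session.commit()
print(db_session.query(Demo).filter_by(task=0).count())
db_session.remove()  # hand the connection back to the pool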

celeryconfig.py:
#!/usr/bin/env python
# -*- coding: utf-8 -*-
# Author: Ficapy
# Create: '12/11/16'
# http://docs.celeryproject.org/en/latest/userguide/configuration.html
BROKER_URL = 'amqp://guest:guest@localhost:5672//'
CELERY_RESULT_BACKEND = 'redis://localhost:6379/10'
CELERY_ACCEPT_CONTENT = ['json']
CELERY_TASK_SERIALIZER = 'json'
CELERY_RESULT_SERIALIZER = 'json'
CELERYD_CONCURRENCY = 10
CELERYD_MAX_TASKS_PER_CHILD = 100  # each worker child process exits and is replaced after executing 100 tasks
BROKER_CONNECTION_MAX_RETRIES = 5
CELERY_IMPORTS = (
    'task1',
)
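The uppercase names above are the old Celery 3.x style; newer Celery versions prefer lowercase setting names. A rough equivalent of this file in the new style (my mapping, not part of the original demo):

broker_url = 'amqp://guest:guest@localhost:5672//'
result_backend = 'redis://localhost:6379/10'
accept_content = ['json']
task_serializer = 'json'
result_serializer = 'json'
worker_concurrency = 10
worker_max_tasks_per_child = 100
broker_connection_max_retries = 5
imports = ('task1',)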

call.py:
#!/usr/bin/env python
# -*- coding: utf-8 -*-
# Author: Ficapy
# Create: '12/11/16'
import time

from task1 import add1
from model import db_session, Demo

# Queue 10000 tasks, then poll once a second until all of them have succeeded.
task1_list = [add1.delay() for _ in range(10000)]

while True:
    print(db_session.query(Demo).count())
    time.sleep(1)
    print([r.status for r in task1_list])
    print(sum(r.status != 'SUCCESS' for r in task1_list))
    if all(r.status == 'SUCCESS' for r in task1_list):
        break
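Instead of busy-polling the status attribute, celery.result.ResultSet can wait on the whole batch. A sketch under the same configuration (result backend enabled in celeryconfig.py); this is an alternative, not what the original call.py does:

from celery.result import ResultSet

from task1 import add1

results = ResultSet([add1.delay() for _ in range(10000)])
results.join(propagate=False)  # block until every task has a result
print(results.completed_count(), 'tasks finished')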

base_celery.py:
#!/usr/bin/env python
# -*- coding: utf-8 -*-
# Author: Ficapy
# Create: '12/11/16'
import celeryconfig
from celery import Celery
from celery.signals import worker_init
from sqlalchemy import create_engine
from model import db_session, sqlalchemy_db
app = Celery('Demo')
app.config_from_object(celeryconfig)
@worker_init.connect
def initialize_session(**kwargs):
    # When the worker starts, rebind the scoped session to an engine created
    # inside the worker process instead of the one built at import time.
    some_engine = create_engine(sqlalchemy_db)
    db_session.configure(bind=some_engine)
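With the default prefork pool the child processes are forked after worker_init fires, so a common variant (not part of the original demo) recreates the engine once per child via the worker_process_init signal; a sketch:

from celery.signals import worker_process_init
from sqlalchemy import create_engine

from model import db_session, sqlalchemy_db

@worker_process_init.connect
def init_worker_process(**kwargs):
    # Every forked child builds its own engine and connection pool, so pooled
    # connections are never shared across processes.
    engine = create_engine(sqlalchemy_db, pool_recycle=3600)
    db_session.configure(bind=engine)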
Start the worker:
celery -A base_celery worker --loglevel=info

Stop the worker (SIGKILL: any tasks still executing are lost and will not be restarted):
pkill -9 -f 'celery worker'

Queue the tasks:
python call.py