ficapy
12/12/2016 - 6:43 AM

celery+sqlalchemy architecture demo

celery+sqlalchemy architecture demo

from base_celery import app
from model import db_session as session, Demo
from random import randint

@app.task(default_retry_delay=1, max_retries=1)
def add1():
    """Insert one ``Demo`` row with a random id and ``task=1``.

    On any error the task asks Celery to retry it (at most once, after a
    one-second delay, per the decorator arguments).

    Raises:
        celery.exceptions.Retry: raised via ``add1.retry`` when the insert or
            commit fails, so the worker reschedules the task.
    """
    try:
        session.add(Demo(id=randint(1, 100000000000000), task=1))
        session.commit()
    except Exception as e:
        # A failed flush/commit leaves the session in a failed transaction;
        # without a rollback every later use of this (process-wide, scoped)
        # session would raise as well, including the retry itself.
        session.rollback()
        raise add1.retry(exc=e)
    finally:
        # Discard the scoped session so its connection goes back to the pool
        # and the next task starts from a clean session.
        session.remove()
#!/usr/bin/env python
# -*- coding: utf-8 -*-
# Author: Ficapy
# Create: '12/11/16'

from sqlalchemy import create_engine, Column, BIGINT
from sqlalchemy.orm import sessionmaker, scoped_session
from sqlalchemy.ext.declarative import declarative_base

sqlalchemy_db = 'postgresql+psycopg2://user:pwd@127.0.0.1/db_name'

Base = declarative_base()


class Demo(Base):
    """ORM model mapped to the ``demo`` table."""
    __tablename__ = 'demo'
    # Primary key; not generated here, the inserting code supplies the value.
    id = Column(BIGINT, primary_key=True)
    # Integer tag stored with the row (the add1 task writes 1).
    task = Column(BIGINT)


# Engine used by this module; pooled connections are recycled after 3600 s.
uline_engine = create_engine(sqlalchemy_db, pool_recycle=3600, echo=False)
# Kept for backward compatibility with code that relies on bound metadata.
Base.metadata.bind = uline_engine
# Create missing tables at import time.  The engine is passed explicitly
# because implicit "bound metadata" execution is deprecated in SQLAlchemy 1.4
# and removed in 2.0, where create_all() requires a bind argument.
Base.metadata.create_all(bind=uline_engine, checkfirst=True)
# Scoped session registry shared by the tasks and the caller script.
db_session = scoped_session(sessionmaker(bind=uline_engine))
#!/usr/bin/env python
#-*- coding: utf-8 -*-
# Author: Ficapy
# Create: '12/11/16'


# http://docs.celeryproject.org/en/latest/userguide/configuration.html

# Message broker (AMQP on localhost) and result backend (Redis database 10).
BROKER_URL = 'amqp://guest:guest@localhost:5672//'
CELERY_RESULT_BACKEND = 'redis://localhost:6379/10'

# Task messages and results are exchanged as JSON only.
CELERY_ACCEPT_CONTENT = ['json']
CELERY_TASK_SERIALIZER = 'json'
CELERY_RESULT_SERIALIZER = 'json'
CELERYD_CONCURRENCY = 10
CELERYD_MAX_TASKS_PER_CHILD = 100  # exit automatically after executing 100 tasks

BROKER_CONNECTION_MAX_RETRIES = 5


# Task modules listed for Celery to import.
CELERY_IMPORTS = (
    'task1',
)
#!/usr/bin/env python
#-*- coding: utf-8 -*-
# Author: Ficapy
# Create: '12/11/16'


import time

from task1 import add1
from model import db_session, Demo

# Celery states after which a task's status no longer changes.  Waiting for
# 'SUCCESS' alone would loop forever as soon as one task fails for good.
FINISHED_STATES = frozenset(('SUCCESS', 'FAILURE', 'REVOKED'))

# Queue 10000 insert tasks and keep their AsyncResult handles for polling.
task1_list = [add1.delay() for _ in range(10000)]

# Poll once per second: print the row count, every task status and the number
# of tasks that have not succeeded, until no task can change state any more.
while True:
    print(db_session.query(Demo).count())
    time.sleep(1)
    # Read each status once per pass; every .status access asks the result
    # backend, so the snapshot keeps the three reports below consistent.
    statuses = [result.status for result in task1_list]
    print(statuses)
    print(sum(status != 'SUCCESS' for status in statuses))
    if all(status in FINISHED_STATES for status in statuses):
        break
#!/usr/bin/env python
# -*- coding: utf-8 -*-
# Author: Ficapy
# Create: '12/11/16'

import celeryconfig
from celery import Celery
from celery.signals import worker_init
from sqlalchemy import create_engine
from model import db_session, sqlalchemy_db

# Celery application named 'Demo'; the task module decorates its tasks with it.
app = Celery('Demo')
# Load broker, backend, serializer and import settings from celeryconfig.py.
app.config_from_object(celeryconfig)


@worker_init.connect
def initialize_session(**kwargs):
    """Rebind the shared scoped session to a newly created engine.

    Connected to Celery's ``worker_init`` signal.  The keyword arguments the
    signal passes are accepted but not used.
    """
    db_session.configure(bind=create_engine(sqlalchemy_db))
  1. 启动celery

    celery -A base_celery worker --loglevel=info 
    
  2. 关闭celery(所有任务将丢失不会重启)

    pkill -9 -f 'celery worker'
    
  3. 调用任务

    python call.py