今天介绍通过celery实现一个异步任务。有这样一个需求,前端发起一个查询的请求,但是发起查询后,查询可能不会立即返回结果。这时候,发起查询后,后端可以把这次查询当作一个task,并立即返回一个能唯一表明该task的值,如taskID(用户后面可以通过这个taskID 随时查看结果),用户收到这个taskID后,可以转去处理其他任务,而不必一直等待查询结果。后端API调用celery来处理这个task,并将结果值保存在一个csv文件中,后面用户通过taskID 查询时返回结果。
def application(environ,start_response): """部分代码省略""" query_string = environ['QUERY_STRING'] serviceGroupName = "" for getParam in query_string.split("&"): params = getParam.split("=") resultInfo = "" if params[0] == "type": alertType = params[1] elif params[0] == "projectName": projectName = params[1] elif params[0] == "serviceGroupName": serviceGroupName = params[1] else: resultInfo = error_info(-1, "GET参数只能为type=<?>&projectName=<?>&serviceGroupName=<?>;必须指定三个参数", {}) return [resultInfo] taskId = 1 result_file_name = '/var/www/dba_api/api/test/'+ str(taskId) + '.csv' contentInfo = json.dumps({"taskId":1,"opType":"continue","serviceGroupName":serviceGroupName,"dbHost":dbHost,"dbPasswd":dbPasswd,"dbUser":dbUser,"dbPort":dbPort}) result = getServiceInfo.apply_async((contentInfo,),queue="getServiceInfo") taskInfo = "任务已经创建,详情请查看:http://10.4.34.254/api/task"% (taskId) return [resultInfo]
getServiceInfo.apply_async((contentInfo,),queue=”getServiceInfo”),重点是这一行,apply_async()方法会返回一个AsyncResult实例,通过这个实例可以跟踪任务状态轨迹。
要使用此功能,需要提供结果后台(result backend),这样才有地方存储任务状态等信息。其中,getServiceInfo是自定义的一个task,后续会介绍到,contentInfo是传递的一个参数,queue是指定队列名称。
上面这个函数的原型如下:
task.apply_async(args[, kwargs[, …]])
其中 args 和 kwargs 分别是 task 接收的参数,当然它也接受额外的参数对任务进行控制。
在 Celery 中执行任务的方法一共有三种:
1. delay, 用来进行最简单便捷的任务执行(delay在第3小节的测试中使用过,它可以看作是apply_async的一个快捷方式);
2. apply_async, 对于任务的执行附加额外的参数,对任务进行控制;
3. app.send_task, 可以执行未在 Celery 中进行注册的任务。
celery文件配置
在python的库存放路径中(一般是/usr/lib/python2.6/site-packages),创建一个文件夹proj,进入proj目录,创建三个文件,init,将proj声明一个python包,celepy,其内容如下:
#_*_ coding:utf-8 _*_ from __future__ import absolute_import from celery import Celery app = Celery("proj", broker="amqp://user:password@localhost//", backend="amqp", include=["proj.tasks"] ) app.conf.update( CELERY_ROUTES={ "proj.tasks.getServerInfo":{"queue":"getServerInfo"}, } ) if __name__=="__main__": app.start()
这里我们定义了模块名称proj以及celery 路由。
还有一个文件,task.py
#_*_ coding:utf-8 _*_i from __future__ import absolute_import from proj.celery import app import random import simplejson as json import types import time import MySQLdb import urllib2 import ConfigParser as cparser import hmac import hashlib import base64 @app.task def getServiceInfo(contentInfo): contentInfo = json.loads(contentInfo) serviceGroupName = contentInfo['serviceGroupName'] dbHost = contentInfo['dbHost'] dbPort = int(contentInfo['dbPort']) dbUser = contentInfo['dbUser'] dbPasswd = contentInfo['dbPasswd'] msgLib = MessageLib.MessageLib() Sql = "Your SQL" #第三步:连接数据库,执行代码逻辑 try: db_connection = MySQLdb.connect(host=dbHost, port=dbPort, passwd=dbPasswd, db="cmdb", user=dbUser, connect_timeout=2, charset="utf8") cursor = db_connection.cursor() cursor.execute(getServiceGroupHostSql) row = cursor.fetchall() result = [] for line in row: ... result.append(tempMysqlHighInfo) resultInfo = msgLib.success_info(result) return resultInfo except Exception, e: raise errorInfo = "dbhost:%s, port:%s, error:%s" % (dbHost, dbPort, str(e)) #return getServiceGroupHostSql,errorInfo return msgLib.error_info(-1, errorInfo, {})
启动celery
celery -A proj worker -Q getServiceInfo -l debug -c 6
最后,写一个结果,专门获取查询结果的结果,传入的参数为taskID,部分代码如下:
def application(environ,start_response): status = '400 ERROR' response_headers = [('Content-type', 'application/json;charset=utf-8')] start_response(status, response_headers) status = '200 OK' response_headers = [('Content-type', 'application/json;charset=utf-8')] start_response(status, response_headers) if environ['REQUEST_METHOD'] != "GET": resultInfo = msgLib.error_info(-1, "http请求类型不是GET", {}) return [resultInfo] query_string = environ['QUERY_STRING'] serviceGroupName = "" for getParam in query_string.split("&"): params = getParam.split("=") resultInfo = "" if params[0] == "taskId": taskId = params[1] else: resultInfo = msgLib.error_info(-1, "GET参数无比指定taskId这个参数", {}) return [resultInfo] logging.info(query_string) result_file_name = '/var/www/dba_api/api/test/'+ str(taskId) + '.csv' result = [] try: with open (result_file_name,'rb') as fp: lines = csv.reader(fp) for line in lines : result.append(line) resultInfo = msgLib.success_info(result) return resultInfo except Exception, e: errorInfo = "some thing wrong" return msgLib.error_info(-1, errorInfo, {})
以上这篇通过celery异步处理一个查询任务的完整代码就是小编分享给大家的全部内容了,希望能给大家一个参考,也希望大家多多支持。
免责声明:本站资源来自互联网收集,仅供用于学习和交流,请遵循相关法律法规,本站一切资源不代表本站立场,如有侵权、后门、不妥请联系本站删除!
更新日志
- 凤飞飞《我们的主题曲》飞跃制作[正版原抓WAV+CUE]
- 刘嘉亮《亮情歌2》[WAV+CUE][1G]
- 红馆40·谭咏麟《歌者恋歌浓情30年演唱会》3CD[低速原抓WAV+CUE][1.8G]
- 刘纬武《睡眠宝宝竖琴童谣 吉卜力工作室 白噪音安抚》[320K/MP3][193.25MB]
- 【轻音乐】曼托凡尼乐团《精选辑》2CD.1998[FLAC+CUE整轨]
- 邝美云《心中有爱》1989年香港DMIJP版1MTO东芝首版[WAV+CUE]
- 群星《情叹-发烧女声DSD》天籁女声发烧碟[WAV+CUE]
- 刘纬武《睡眠宝宝竖琴童谣 吉卜力工作室 白噪音安抚》[FLAC/分轨][748.03MB]
- 理想混蛋《Origin Sessions》[320K/MP3][37.47MB]
- 公馆青少年《我其实一点都不酷》[320K/MP3][78.78MB]
- 群星《情叹-发烧男声DSD》最值得珍藏的完美男声[WAV+CUE]
- 群星《国韵飘香·贵妃醉酒HQCD黑胶王》2CD[WAV]
- 卫兰《DAUGHTER》【低速原抓WAV+CUE】
- 公馆青少年《我其实一点都不酷》[FLAC/分轨][398.22MB]
- ZWEI《迟暮的花 (Explicit)》[320K/MP3][57.16MB]