celery使用有感
最近在搞任务下发系统,调用接口传入关键参数,进行爬虫爬取。
# 初始化
一开始是通过threading+subprocess+fastapi来实现的任务下发。
但是遇到很多的问题,虽然都解决了,但是感觉效果并不理想。
目前是通过sqlite3来存储下发任务的记录,然后利用fastapi搭建接口服务。通过接口接收关键词以及爬虫路径然后将任务存放到fastapi提供的BackgrounTasks
中,之后利用subprocess执行脚本。
这里说两个困扰我时间比较长的问题
- 无法批量执行任务
- 部分任务需要查询状态后才能重新执行
这里其实比较容易解决的是2,我们的接口程序关闭后,一些爬虫程序没有结束也跟着关闭了,会导致数据库中的状态没有改变,所以这里直接在程序启动前将所有的任务状态修改成结束/未开始即可。
然后无法批量执行任务这个其实是subprocess执行脚本的问题。
我这里其实一开始以为可以等待任务执行结束后,将返回的状态更新到数据库中。所以写了process.wait()
用来等待子进程结束,结果发现一直不会走到下一个进程。通过Chat GPT发现原来这里会阻塞进程,将process.wait()
去掉就可以了。
process = subprocess.Popen(
["python", script_path, script_args],
stdout=subprocess.PIPE, stderr=subprocess.PIPE,
text=True,
encoding='utf-8'
)
subprocess_pid = process.pid
process.wait()
# stdout_data, stderr_data = process.communicate()
# result = {
# "stdout": str(stdout_data),
# "stderr": str(stderr_data)
# }
2
3
4
5
6
7
8
9
10
11
12
13
但是这样会出现一个问题就是没办法获取到任务执行后的结果,所以我又增加了一个新的接口,通过pid来查询任务状态。
def get_process_status(pid):
try:
process = psutil.Process(pid)
return {"code": 0, "message": process.status(), "status": "进行中"}
except psutil.NoSuchProcess:
return {"code": 1, "message": "进程已结束/进程不存在/出现错误", "status": "结束"}
except Exception as e:
return {"code": 2, "message": f"未知错误: {str(e)}", "status": "错误"}
2
3
4
5
6
7
8
这样在执行任务的时候,就可以知道任务的实时情况了。
# 进化
我后来又有一个爬虫需要通过下发来完成,并且这个任务不是很急,所以准备优雅的搞一下。
通过fastapi+celery+flower来实现任务下发。
Celery
Celery是一个分布式任务队列系统,通常用于在分布式系统中执行后台作业和调度任务。Celery建立在消息传递的基础上,可以用于执行实时计算以及调度任务。
- 核心概念:
- Broker:消息代理,如RabbitMQ、Redis等。Celery客户端通过它发布任务,而Celery工作者从中取任务执行。
- Worker:执行任务的进程。可以在不同的机器或同一台机器上运行多个工作者。
- Result Backend:用于存储任务的结果。可以是RabbitMQ、Redis、数据库等。
- 特点:
- 支持多种消息代理。
- 支持多种结果存储方式。
- 自动处理失败和重试。
- 可以设置任务的优先级、定时任务、任务过期时间等。
- 应用场景:
- Web请求中执行耗时的操作,如电子邮件发送、大数据处理等。
- 定时任务,例如日常报告、定期数据更新等。
Flower
Flower是Celery的实时监控和Web管理工具。它为用户提供了一个图形界面,可以轻松监控任务队列的状态、工作进程、任务执行情况等。
- 核心特点:
- 实时监控Celery集群的状态。
- 查看任务的详细信息,如参数、状态、执行时间等。
- 动态管理工作者数量和类型。
- 通过Web界面远程控制和管理任务。
- 常见功能:
- 查看每个工作者正在执行的任务。
- 查看任务历史和统计信息。
- 终止运行中的任务。
- 调整工作者池的大小。
总的来说,Celery为开发者提供了在分布式系统中执行任务的能力,而Flower为其提供了监控和管理的工具,使得对Celery的管理更为便捷。
参考链接:
所以准备通过celery来实现任务调度。
并且其实修改代码还是比较容易的,毕竟爬虫代码中就有main()函数,直接在前面增加一些celery的代码即可
from celery import Celery
celery_app = Celery(config.celery_name)
celery_app.config_from_object('celery_config')
@celery_app.task
2
3
4
然后在接口代码中增加导入爬虫文件中的main函数以及celery_app
并修改接口调用,通过main.delay()
来下发任务
from tasks import main, celery_app
@app.post("/start-crawl/")
async def start_crawl(task: str, content_filter: str):
task_result = get_start.delay(task, content_filter)
return {"message": f"Crawling task for {task} started!", "task_id": task_result.id}
2
3
4
5
之后说一下采到的坑吧
- 全局变量问题
- 任务下发后不执行问题
我这里先遇到的2,搜了好多内容,最后发现其实应该是需要修改并发池,将其修改成eventlet
即可,当然修改成gevent
应该也可以,但是我没试。
修改好后就遇到了1,因为我将数据会写入到数据库中,所以很容易就发现了问题,比如说a网站的数据中居然有b网站的内容,但是这是两个完全不相干的网站,然后检查代码发现原来是全局变量的问题。
我这里的爬虫代码是通过读取一个列表中的链接来进行数据爬取的,并且会将新发现的链接写入这个列表。如果定义的是全局变量的话,就会导致不同的任务在执行的时候会串。将定义列表的代码写到main()
中即可解决。
不得不说,比我自己写的任务下发要容易多了,并且需要修改的地方也不是很多,只需要再简单写一个页面即可实现完美的任务下发了。
发现fastapi和eventlet有冲突,于是将fastapi更换成了flask,问题就解决了。
冲突是无法查询celery任务的状态
ps1:新的celery的配置文件变量名改为小写了,记得修改
ps2:flower就是一个查看celery状态的监控
ps3:windows上需要修改成gevent,eventlet好不好使没有试过
ps4:有时候flower上的celery worker会显示offline但是实际上还在运行