celery使用有感

最近在搞任务下发系统,调用接口传入关键参数,进行爬虫爬取。

初始化

一开始是通过threading+subprocess+fastapi来实现的任务下发。

但是遇到很多的问题,虽然都解决了,但是感觉效果并不理想。

目前是通过sqlite3来存储下发任务的记录,然后利用fastapi搭建接口服务。通过接口接收关键词以及爬虫路径然后将任务存放到fastapi提供的BackgrounTasks中,之后利用subprocess执行脚本。

这里说两个困扰我时间比较长的问题

  1. 无法批量执行任务
  2. 部分任务需要查询状态后才能重新执行

这里其实比较容易解决的是2,我们的接口程序关闭后,一些爬虫程序没有结束也跟着关闭了,会导致数据库中的状态没有改变,所以这里直接在程序启动前将所有的任务状态修改成结束/未开始即可。

然后无法批量执行任务这个其实是subprocess执行脚本的问题。

我这里其实一开始以为可以等待任务执行结束后,将返回的状态更新到数据库中。所以写了process.wait()用来等待子进程结束,结果发现一直不会走到下一个进程。通过Chat GPT发现原来这里会阻塞进程,将process.wait()去掉就可以了。

1
2
3
4
5
6
7
8
9
10
11
12
13
process = subprocess.Popen(
["python", script_path, script_args],
stdout=subprocess.PIPE, stderr=subprocess.PIPE,
text=True,
encoding='utf-8'
)
subprocess_pid = process.pid
process.wait()
# stdout_data, stderr_data = process.communicate()
# result = {
# "stdout": str(stdout_data),
# "stderr": str(stderr_data)
# }

但是这样会出现一个问题就是没办法获取到任务执行后的结果,所以我又增加了一个新的接口,通过pid来查询任务状态。

1
2
3
4
5
6
7
8
def get_process_status(pid):
try:
process = psutil.Process(pid)
return {"code": 0, "message": process.status(), "status": "进行中"}
except psutil.NoSuchProcess:
return {"code": 1, "message": "进程已结束/进程不存在/出现错误", "status": "结束"}
except Exception as e:
return {"code": 2, "message": f"未知错误: {str(e)}", "status": "错误"}

这样在执行任务的时候,就可以知道任务的实时情况了。

进化

我后来又有一个爬虫需要通过下发来完成,并且这个任务不是很急,所以准备优雅的搞一下。

通过fastapi+celery+flower来实现任务下发。

Celery

Celery是一个分布式任务队列系统,通常用于在分布式系统中执行后台作业和调度任务。Celery建立在消息传递的基础上,可以用于执行实时计算以及调度任务。

  1. 核心概念
    • Broker:消息代理,如RabbitMQ、Redis等。Celery客户端通过它发布任务,而Celery工作者从中取任务执行。
    • Worker:执行任务的进程。可以在不同的机器或同一台机器上运行多个工作者。
    • Result Backend:用于存储任务的结果。可以是RabbitMQ、Redis、数据库等。
  2. 特点
    • 支持多种消息代理。
    • 支持多种结果存储方式。
    • 自动处理失败和重试。
    • 可以设置任务的优先级、定时任务、任务过期时间等。
  3. 应用场景
    • Web请求中执行耗时的操作,如电子邮件发送、大数据处理等。
    • 定时任务,例如日常报告、定期数据更新等。

Flower

Flower是Celery的实时监控和Web管理工具。它为用户提供了一个图形界面,可以轻松监控任务队列的状态、工作进程、任务执行情况等。

  1. 核心特点
    • 实时监控Celery集群的状态。
    • 查看任务的详细信息,如参数、状态、执行时间等。
    • 动态管理工作者数量和类型。
    • 通过Web界面远程控制和管理任务。
  2. 常见功能
    • 查看每个工作者正在执行的任务。
    • 查看任务历史和统计信息。
    • 终止运行中的任务。
    • 调整工作者池的大小。

总的来说,Celery为开发者提供了在分布式系统中执行任务的能力,而Flower为其提供了监控和管理的工具,使得对Celery的管理更为便捷。

参考链接:

所以准备通过celery来实现任务调度。

并且其实修改代码还是比较容易的,毕竟爬虫代码中就有main()函数,直接在前面增加一些celery的代码即可

1
2
3
4
from celery import Celery
celery_app = Celery(config.celery_name)
celery_app.config_from_object('celery_config')
@celery_app.task

然后在接口代码中增加导入爬虫文件中的main函数以及celery_app并修改接口调用,通过main.delay()来下发任务

1
2
3
4
5
from tasks import main, celery_app
@app.post("/start-crawl/")
async def start_crawl(task: str, content_filter: str):
task_result = get_start.delay(task, content_filter)
return {"message": f"Crawling task for {task} started!", "task_id": task_result.id}

之后说一下采到的坑吧

  1. 全局变量问题
  2. 任务下发后不执行问题

我这里先遇到的2,搜了好多内容,最后发现其实应该是需要修改并发池,将其修改成eventlet即可,当然修改成gevent应该也可以,但是我没试。

修改好后就遇到了1,因为我将数据会写入到数据库中,所以很容易就发现了问题,比如说a网站的数据中居然有b网站的内容,但是这是两个完全不相干的网站,然后检查代码发现原来是全局变量的问题。

我这里的爬虫代码是通过读取一个列表中的链接来进行数据爬取的,并且会将新发现的链接写入这个列表。如果定义的是全局变量的话,就会导致不同的任务在执行的时候会串。将定义列表的代码写到main()中即可解决。

不得不说,比我自己写的任务下发要容易多了,并且需要修改的地方也不是很多,只需要再简单写一个页面即可实现完美的任务下发了。

发现fastapi和eventlet有冲突,于是将fastapi更换成了flask,问题就解决了。

冲突是无法查询celery任务的状态

ps1:新的celery的配置文件变量名改为小写了,记得修改

ps2:flower就是一个查看celery状态的监控

ps3:windows上需要修改成gevent,eventlet好不好使没有试过

ps4:有时候flower上的celery worker会显示offline但是实际上还在运行

亚马逊采集调研

亚马逊同样的请求用requests发现无法获取到数据,通过测试发现可能是因为TLS指纹的问题,通过使用curl_cffi库进行请求发现可以正常获取数据

验证码每次请求都会变解决方法

如果利用模拟浏览器等方式通过验证码会遇到验证码每次请求都会变,那么这样就不能直接请求获取到验证码数据了。

所以这里其实直接通过js下载图片就行了。

1
2
3
4
5
let c = document.createElement('canvas');let ctx = c.getContext('2d');
let img = document.querySelector('.mac_verify_img') // 获取验证码元素
c.height=img.naturalHeight;c.width=img.naturalWidth;
ctx.drawImage(img, 0, 0,img.naturalWidth, img.naturalHeight);
let base64String = c.toDataURL();

这里的base64String就是当前验证码的base64地址了

image-20230324104359504