Python asyncio 并发采集背压与取消传播调试实战:从卡死到优雅收敛

Python asyncio 并发采集背压与取消传播调试实战:从卡死到优雅收敛

技术主题:Python 编程语言
内容方向:具体功能的调试过程(并发采集中的背压与取消传播)

引言

在一次高并发采集任务中,我们遇到了“启动很快、跑一阵就卡住,CPU 占用低、任务不收敛”的诡异现象。初看像是网络不稳,深入分析后发现是典型的 asyncio 背压缺失与取消传播缺位:生产者无限制投喂、消费者被慢请求拖住、超时后任务遗留无法收敛。本文复盘完整调试过程,并给出可复用的代码骨架:有界队列 + 限流 + 超时 + 幂等重试 + 结构化并发 + 优雅关停。

一、问题现象与影响

  • 现象:
    • 运行 5-10 分钟后 QPS 降至个位数;
    • metrics 显示在途任务数持续增长,完成速率下降;
    • Ctrl+C 退出时阻塞数十秒,仍有 pending tasks;
    • 个别请求耗时超过 60s,堆栈聚焦在 TCP 读写等待。
  • 影响:
    • 吞吐下降 70%+,长尾延迟恶化,任务无法按窗口完成;
    • 频繁残留僵尸连接,影响下游服务限流与黑名单。

二、排查步骤

  1. 复现与最小化:构造 2% 慢接口 + 随机丢包,快速触发卡死;
  2. 指标对齐:记录提交速率、在途任务、完成速率、超时/重试计数;
  3. 采样堆栈:多次采样定位停滞点在网络等待与 gather 阻塞;
  4. 假设与验证:
    • 假设 A:无背压导致排队爆炸 → 加有界队列验证;
    • 假设 B:取消不传播导致僵尸任务 → 引入超时 + 取消级联验证;
    • 假设 C:连接池过大/过小 → 调整连接上限验证。

三、根因分析

  • 生产者使用 for … create_task 无上限投喂,缺乏背压;
  • 使用 asyncio.gather(…, return_exceptions=True) 扫尾,但对内部超时/取消未处理,导致任务泄漏;
  • aiohttp 默认连接池未限流,遇到慢端点时连接被耗尽,整体吞吐雪崩;
  • 信号处理缺失,无法优雅关停,导致退出时大量 pending。

四、改造思路(工程化范式)

  • 背压:
    • 有界 asyncio.Queue 实现生产者-消费者,队列满则生产者 await,形成自然背压;
    • 并发用 asyncio.Semaphore 或连接池上限保证“最大在途”。
  • 超时与重试:
    • 每个 I/O 加 asyncio.wait_for + 指数退避,设置重试上限与幂等键避免重复副作用;
  • 取消传播与结构化并发:
    • Python 3.11+ 使用 asyncio.TaskGroup 保证异常/取消向下游传播并收敛;
    • 3.10- 使用手动 task 集合和取消风暴屏蔽(asyncio.shield 仅在必要处)。
  • 连接池与限流:
    • 限制 TCP 连接总数与并发出站请求数,避免被慢端点拖死;
  • 优雅关停:
    • 捕获信号,先停止生产,再等待队列清空,超时后取消未完成消费者与在途任务。

五、关键代码(可直接套用)

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
# python 3.11+ 推荐方案:TaskGroup + 有界队列 + 超时/重试
import asyncio
import signal
import random
from typing import Optional
import aiohttp

MAX_CONN = 200
CONCURRENCY = 100
QUEUE_SIZE = 1000
REQUEST_TIMEOUT = 8
RETRY = 2

class GracefulExit(SystemExit):
pass

async def fetch(session: aiohttp.ClientSession, url: str, *, attempt: int = 0) -> str:
delay = min(0.5 * (2 ** attempt) + random.random() * 0.2, 3.0)
try:
async def _do():
async with session.get(url) as resp:
resp.raise_for_status()
return await resp.text()
return await asyncio.wait_for(_do(), timeout=REQUEST_TIMEOUT)
except (aiohttp.ClientError, asyncio.TimeoutError) as e:
if attempt < RETRY:
await asyncio.sleep(delay)
return await fetch(session, url, attempt=attempt + 1)
raise e

async def worker(name: int, q: asyncio.Queue, sem: asyncio.Semaphore, session: aiohttp.ClientSession):
while True:
try:
url = await q.get()
async with sem: # 并发上限
try:
data = await fetch(session, url)
# TODO: 处理结果,注意幂等
finally:
q.task_done()
except asyncio.CancelledError:
# 收尾逻辑(如 flush 缓冲)
raise

async def produce(urls, q: asyncio.Queue):
for u in urls:
await q.put(u) # 有界队列,天然背压

async def main(urls):
stop = asyncio.Event()

def _signal_handler():
stop.set()

loop = asyncio.get_running_loop()
for sig in (signal.SIGINT, signal.SIGTERM):
try:
loop.add_signal_handler(sig, _signal_handler)
except NotImplementedError:
pass

conn = aiohttp.TCPConnector(limit=MAX_CONN, enable_cleanup_closed=True)
timeout = aiohttp.ClientTimeout(total=None, sock_connect=10, sock_read=REQUEST_TIMEOUT + 2)

q: asyncio.Queue[str] = asyncio.Queue(maxsize=QUEUE_SIZE)
sem = asyncio.Semaphore(CONCURRENCY)

async with aiohttp.ClientSession(connector=conn, timeout=timeout) as session:
async with asyncio.TaskGroup() as tg:
tg.create_task(produce(urls, q))
for i in range(CONCURRENCY):
tg.create_task(worker(i, q, sem, session))

# 关停协程
async def shutdown_watcher():
await stop.wait()
# 停止生产:不再 put,新任务不进入队列
# 等待已有任务在一个软超时内完成
try:
await asyncio.wait_for(q.join(), timeout=15)
except asyncio.TimeoutError:
pass
# 取消所有剩余工作者
tg._tasks # type: ignore[attr-defined]
for t in list(tg._tasks): # 访问 TaskGroup 内部任务集合(3.11 无公开 API,小心升级)
if t is not asyncio.current_task():
t.cancel()

tg.create_task(shutdown_watcher())

if __name__ == "__main__":
# 示例 URL
urls = [f"https://httpbin.org/delay/{random.choice([0,0,0,1,2])}" for _ in range(5000)]
asyncio.run(main(urls))

3.10- 版本可以用手动任务集管理与统一取消,关键点保持一致:有界队列、限流、超时/重试、优雅关停。

六、验证与指标

  • 吞吐与延迟:记录提交速率、完成速率、在途任务、P95/P99 延迟;
  • 稳定性:重试次数、超时比例、取消任务数、连接池占用率;
  • 收敛性:收到 SIGINT 后到“全部任务完成或被取消”的时间;
  • 观察期建议至少 1 小时,覆盖慢端点与网络抖动窗口。

七、常见坑与对策

  • gather 全包 return_exceptions=True 掩盖异常 → 使用 TaskGroup 或逐任务 await;
  • 无界 create_task 洪泛 → 有界队列/信号量;
  • 只设总超时,不设读超时 → 分层超时(连接/读/总);
  • 取消风暴 → 仅在关键区段 shield,避免吞掉取消;
  • 重试无幂等 → 给请求打幂等键,或在落库侧去重。

总结

这次事故的根因不是“网络不稳定”,而是并发工程缺少背压与取消传播。引入有界队列+限流、分层超时与幂等重试、结构化并发与优雅关停后,系统能在压力变化与异常情况下稳定收敛。以上骨架可直接迁移至你的采集、调用聚合或批处理任务中,先把“在途任务是有上限的”这一点落实,再谈吞吐优化。