Python httpx + asyncio 的异步并发请求
前言:
httpx库的并发请求是基于 asyncio
的,asyncio
是 Python 标准库中用于编写 异步 I/O 程序 的模块。它基于 事件循环(event loop) 和 协程(coroutines) 实现,非常适合处理 I/O 密集型任务(如网络请求、数据库查询和文件读写等),而不是 CPU 密集型任务(如大量的数学运算、加密解密、图像处理等)。
⚠️ 注意:
asyncio
不是多线程或多进程,而是单线程内通过事件循环调度多个协程交替执行 —— 这叫“并发”,不是“并行”。
用法:
1、使用 asyncio.create_task()
并发执行
import asyncio


async def fetch(i):
    """Demo coroutine: log start, simulate 1 second of I/O, log completion."""
    print(f"Start {i}")
    await asyncio.sleep(1)
    print(f"Done {i}")


async def main():
    """Run two fetch tasks concurrently via asyncio.create_task()."""
    # create_task() schedules the coroutine on the event loop immediately,
    # so both tasks start running before either await below.
    task1 = asyncio.create_task(fetch(1))
    task2 = asyncio.create_task(fetch(2))
    # These two awaits only ensure the tasks have finished before main()
    # returns; the concurrency comes from create_task() above.
    await task1
    await task2


if __name__ == "__main__":
    # Guard so importing this module does not start the event loop.
    asyncio.run(main())
2、使用 asyncio.gather()
并发执行(推荐)
import asyncio


async def fetch(i):
    """Demo coroutine: log start, simulate 1 second of I/O, log completion."""
    print(f"Start {i}")
    await asyncio.sleep(1)
    print(f"Done {i}")


async def main():
    """Run three fetch coroutines concurrently with asyncio.gather()."""
    # gather() wraps each coroutine in a Task and waits for all of them;
    # the total runtime is ~1 second, not 3.
    await asyncio.gather(
        fetch(1),
        fetch(2),
        fetch(3),
    )


if __name__ == "__main__":
    # Guard so importing this module does not start the event loop.
    asyncio.run(main())
流程图解:
用法1,图解运行过程如下:
Event Loop 开始运行
│
├─ 调度 main()
│ │
│ ├─ 创建 task1 = fetch(1) → 加入事件循环
│ ├─ 创建 task2 = fetch(2) → 加入事件循环
│ │
│ ├─ await task1 → main() 暂停,等待 task1 完成
│ └─ await task2 → 等待 task2 完成(若已完成则直接通过)
│
├─ task1 开始执行
│ ├─ 打印 "Start 1"
│ └─ await asyncio.sleep(1) → 暂停,让出 CPU
│
├─ task2 开始执行
│ ├─ 打印 "Start 2"
│ └─ await asyncio.sleep(1) → 暂停,让出 CPU
│
└─ 约 1 秒后,sleep 完成
├─ task1 打印 "Done 1"
└─ task2 打印 "Done 2"
main() 完成 → Event Loop 停止
用法2,图解运行过程如下:
Event Loop 开始运行
│
├─ 调度 main()
│ │
│ ├─ 创建 fetch(1) 协程 → 加入事件循环
│ ├─ 创建 fetch(2) 协程 → 加入事件循环
│ ├─ 创建 fetch(3) 协程 → 加入事件循环
│ │
│ └─ 等待所有协程完成(await gather)
│
├─ fetch(1) 开始执行
│ ├─ 打印 "Start 1"
│ └─ await asyncio.sleep(1) → 暂停,让出 CPU
│
├─ fetch(2) 开始执行
│ ├─ 打印 "Start 2"
│ └─ await asyncio.sleep(1) → 暂停,让出 CPU
│
├─ fetch(3) 开始执行
│ ├─ 打印 "Start 3"
│ └─ await asyncio.sleep(1) → 暂停,让出 CPU
│
└─ 约 1 秒后,所有 sleep 完成
├─ fetch(1) 打印 "Done 1"
├─ fetch(2) 打印 "Done 2"
└─ fetch(3) 打印 "Done 3"
main() 完成 → Event Loop 停止
实操推演:
预期:对
getStudentInfo?id=1
接口进行10次高效并发操作。
下面会通过5个实例来逐步认识asyncio的能力和使用,最终会得到高效的并发实例。
实例1、使用for循环
简单使用for循环10次接口。
import random
import time
import httpx
import asyncio
import json
import logging
from concurrent.futures import ThreadPoolExecutor

logging.basicConfig(level=logging.INFO, format='%(asctime)s [%(levelname)s] %(message)s')


class httpxMethods:
    """Baseline version: plain synchronous requests, one after another."""

    @staticmethod
    def get_method(task_id):
        """Call getStudentInfo once, synchronously; return the pretty-printed JSON body."""
        started_at = time.strftime("%Y-%m-%d %H:%M:%S", time.localtime(time.time()))
        logging.info(f'[任务{task_id}]开始时间:{started_at}')
        # Synchronous GET -- each call blocks until the server answers.
        endpoint = "http://127.0.0.1:8080/getStudentInfo"
        query = {'id': '1'}
        body = httpx.get(endpoint, params=query).json()
        pretty = json.dumps(body, indent=4, ensure_ascii=False)
        finished_at = time.strftime("%Y-%m-%d %H:%M:%S", time.localtime(time.time()))
        logging.info(f'[任务{task_id}]结束时间:{finished_at}')
        return pretty


if __name__ == "__main__":
    t_start = time.time()
    # Ten sequential calls -- no concurrency at all.
    for task_no in range(1, 11):
        result = httpxMethods.get_method(task_no)
        # logging.info(result)
    t_end = time.time()
    logging.info(f'总共耗时:{t_end-t_start}')
客户端日志:
服务端日志:
👀️ 总结:
这个 for
循环操作耗时约9秒,比较费时间,并且服务端收到的请求不是一个时间点进来的,并不符合并发!
实例2、引入asyncio异步处理
import random
import time
import httpx
import asyncio
import json
import logging
from concurrent.futures import ThreadPoolExecutor

logging.basicConfig(level=logging.INFO, format='%(asctime)s [%(levelname)s] %(message)s')


class httpxMethods:
    """Concurrent GETs against getStudentInfo using httpx.AsyncClient + asyncio.gather()."""

    # Cap on simultaneous in-flight requests.
    MAX_CONCURRENCY = 10
    # NOTE: created at class-definition time, i.e. before asyncio.run() starts
    # the loop. Fine on Python 3.10+ (Semaphore binds to the running loop
    # lazily); would misbehave on 3.8/3.9.
    semaphore = asyncio.Semaphore(MAX_CONCURRENCY)
    # FIX: removed the unused `executor = ThreadPoolExecutor(max_workers=10)`
    # attribute -- nothing in this example submits work to it (dead code).

    @staticmethod
    async def async_get_method(client, task_id):
        """Perform one GET request; return parsed JSON or an {"error": ...} dict."""
        logging.info(f'[任务{task_id}]开始时间:{time.strftime("%Y-%m-%d %H:%M:%S", time.localtime(time.time()))}')
        async with httpxMethods.semaphore:
            url = "http://127.0.0.1:8080/getStudentInfo"
            payload = {'id': 1}
            try:
                # 1. Send the request.
                response = await client.get(url, params=payload, timeout=10.0)
                # 2. Raise for non-2xx status codes.
                response.raise_for_status()
                # 3. Parse the JSON body (response.json() is synchronous -- no await).
                try:
                    data = response.json()
                    logging.info(f'[任务{task_id}]结束时间:{time.strftime("%Y-%m-%d %H:%M:%S", time.localtime(time.time()))}')
                    return data
                except json.JSONDecodeError:
                    return {"error": "Invalid JSON response"}
            except httpx.HTTPStatusError as e:
                return {"error": f"HTTP error: {e}"}
            except httpx.RequestError as e:
                return {"error": f"Request error: {e}"}

    @staticmethod
    async def run_concurrent_requests(num):
        """Fire `num` requests concurrently over one shared AsyncClient; return pretty JSON."""
        async with httpx.AsyncClient(timeout=10.0) as client:
            tasks = [
                httpxMethods.async_get_method(client, i)
                for i in range(num)
            ]
            results = await asyncio.gather(*tasks)
            return json.dumps(results, indent=4, ensure_ascii=False)


if __name__ == "__main__":
    startTime = time.time()
    result = asyncio.run(httpxMethods.run_concurrent_requests(10))
    # logging.info(result)
    endTime = time.time()
    logging.info(f'总共耗时:{endTime-startTime}')
客户端日志:
从日志上看,10次请求总耗时1秒左右完成。
服务端日志
服务端接收到的请求均是同一个时间点进来的。
👀️ 总结:
这段演示代码比较简单,是通过 asyncio.gather()
来进行 asyncio
异步并发多个协程 Task
,10次接口请求从原来 for
循环的9秒缩减到1秒左右,大大提高了效率,实现并发效果。
实例3、业务逻辑升级,get请求前需查询数据库
我们平常的接口请求基本不会像 实例2
这样简单,常常会遇到若干场景,如接口请求前,需要我们查询数据库,获得关键信息来作为接口的入参,再进行并发!
import random
import time
import httpx
import asyncio
import json
import logging
from concurrent.futures import ThreadPoolExecutor

logging.basicConfig(level=logging.INFO, format='%(asctime)s [%(levelname)s] %(message)s')


class httpxMethods:
    # Cap on simultaneous in-flight requests.
    MAX_CONCURRENCY = 10
    semaphore = asyncio.Semaphore(MAX_CONCURRENCY)
    # Custom thread-pool size (guards against resource exhaustion).
    # NOTE(review): unused in this example; it only comes into play in
    # the article's later run_in_executor() version.
    executor = ThreadPoolExecutor(max_workers=10)

    @staticmethod
    async def get_data():
        """Simulate a synchronous DB lookup (e.g. pymysql); returns the student id.

        DELIBERATE FLAW (the article's teaching point): time.sleep() is a
        blocking call, so despite the `async def` it stalls the single-threaded
        event loop for 1/3/5 seconds and serializes every task.
        """
        # Simulate a slow synchronous operation, e.g. a pymysql query.
        time.sleep(random.choice([1, 3, 5]))
        return 1

    @staticmethod
    async def async_get_method(client, task_id):
        """Resolve the id (blocking the loop!), then GET getStudentInfo with it."""
        logging.info(f'[任务{task_id}]开始时间:{time.strftime("%Y-%m-%d %H:%M:%S", time.localtime(time.time()))}')
        async with httpxMethods.semaphore:
            url = "http://127.0.0.1:8080/getStudentInfo"
            # The await below still executes get_data()'s body on the
            # event-loop thread, so all tasks run one after another.
            payload = {'id': await httpxMethods.get_data()}
            try:
                # 1. Send the request.
                response = await client.get(url, params=payload, timeout=10.0)
                # 2. Check the status code.
                response.raise_for_status()
                # 3. Parse the JSON body (no await needed -- json() is synchronous).
                try:
                    data = response.json()
                    logging.info(f'[任务{task_id}]结束时间:{time.strftime("%Y-%m-%d %H:%M:%S", time.localtime(time.time()))}')
                    return data
                except json.JSONDecodeError:
                    return {"error": "Invalid JSON response"}
            except httpx.HTTPStatusError as e:
                return {"error": f"HTTP error: {e}"}
            except httpx.RequestError as e:
                return {"error": f"Request error: {e}"}

    @staticmethod
    async def run_concurrent_requests(num):
        """Run `num` request tasks under one shared AsyncClient via gather()."""
        async with httpx.AsyncClient(timeout=10.0) as client:
            tasks = [
                httpxMethods.async_get_method(client, i)
                for i in range(num)
            ]
            results = await asyncio.gather(*tasks)
            return json.dumps(results, indent=4, ensure_ascii=False)


if __name__ == "__main__":
    startTime = time.time()
    result = asyncio.run(httpxMethods.run_concurrent_requests(10))
    # logging.info(result)
    endTime = time.time()
    logging.info(f'总共耗时:{endTime-startTime}')
客户端日志:
从日志上看,增加了模拟数据库查询操作的get_data()后,耗时又增加到了30秒左右。
服务端日志:
从日志的打印时间上看,接口请求依旧还是同一秒进来的。
总结:
客户端代码的运行效率太慢了,明显不符合asyncio的异步特性,虽然从服务端上看是并发,但并不高效呀!!!不高效的原因:就是新增的 get_data()
方法卡线程了,这里用到“time.sleep()
”模拟数据库查询,属于是同步阻塞函数,而 asyncio
是单线程内通过事件循环调度多个协程交替执行来提高效率的,冲突了。
实例4、引入loop.run_in_executor()增效
针对实例3的窘境,asyncio
提供了loop.run_in_executor()方法,可以将同步阻塞的代码放到这个线程池中运行,避免阻塞事件循环!
import random
import time
import httpx
import asyncio
import json
import logging
from concurrent.futures import ThreadPoolExecutor

logging.basicConfig(level=logging.INFO, format='%(asctime)s [%(levelname)s] %(message)s')


class httpxMethods:
    """Offload the blocking get_data() to a thread pool, then fire concurrent GETs."""

    # Cap on simultaneous in-flight requests.
    MAX_CONCURRENCY = 10
    semaphore = asyncio.Semaphore(MAX_CONCURRENCY)
    # Dedicated thread pool for blocking calls (bounded to avoid resource exhaustion).
    executor = ThreadPoolExecutor(max_workers=10)

    @staticmethod
    def get_data():
        """Simulate a blocking DB lookup (e.g. pymysql); returns the student id."""
        time.sleep(random.choice([1, 3, 5]))
        return 1

    @staticmethod
    async def async_get_method(client, task_id):
        """Resolve the id on the thread pool, then GET getStudentInfo with it."""
        logging.info(f'[任务{task_id}]开始时间:{time.strftime("%Y-%m-%d %H:%M:%S", time.localtime(time.time()))}')
        # run_in_executor() moves the blocking get_data() onto the thread pool,
        # keeping the event loop free to schedule the other tasks.
        # FIX: use get_running_loop() -- calling get_event_loop() from inside a
        # coroutine is deprecated since Python 3.10.
        loop = asyncio.get_running_loop()
        id1 = await loop.run_in_executor(httpxMethods.executor, httpxMethods.get_data)
        async with httpxMethods.semaphore:
            url = "http://127.0.0.1:8080/getStudentInfo"
            payload = {'id': id1}
            try:
                # 1. Send the request.
                response = await client.get(url, params=payload, timeout=10.0)
                # 2. Check the status code.
                response.raise_for_status()
                # 3. Parse the JSON body (json() is synchronous -- no await).
                try:
                    data = response.json()
                    logging.info(f'[任务{task_id}]结束时间:{time.strftime("%Y-%m-%d %H:%M:%S", time.localtime(time.time()))}')
                    return data
                except json.JSONDecodeError:
                    return {"error": "Invalid JSON response"}
            except httpx.HTTPStatusError as e:
                return {"error": f"HTTP error: {e}"}
            except httpx.RequestError as e:
                return {"error": f"Request error: {e}"}

    @staticmethod
    async def run_concurrent_requests(num):
        """Run `num` request tasks concurrently over one shared AsyncClient."""
        async with httpx.AsyncClient(timeout=10.0) as client:
            tasks = [
                httpxMethods.async_get_method(client, i)
                for i in range(num)
            ]
            results = await asyncio.gather(*tasks)
            return json.dumps(results, indent=4, ensure_ascii=False)


if __name__ == "__main__":
    startTime = time.time()
    result = asyncio.run(httpxMethods.run_concurrent_requests(10))
    # logging.info(result)
    endTime = time.time()
    logging.info(f'总共耗时:{endTime-startTime}')
客户端日志:
从日志上看,增加了 loop.run_in_executor()
后,客户端代码耗时从30秒缩减到了6秒左右!!!
服务端日志:
很可惜,服务端的日志上看,接口并不是并发请求进来的!!!
总结:
客户端提高了效率的原因是 loop.run_in_executor()
,它让 get_data()
并发执行了!
服务端接收的接口不是同一时间点进来的原因,也是因为 loop.run_in_executor()
,基于 time.sleep(random.choice([1, 3, 5]))
随机睡几秒,那么各个任务就可能在不同的时间点完成准备工作,最终导致请求不能在同一时间点发送出去。
总而言之,就是并发执行了逻辑代码,但没有并发发起接口请求!!!
实例5_1、使用计数器来实现并发
这里演示方法1,通过计数器来实现接口并发!
import random
import time
import httpx
import asyncio
import logging
import json
from concurrent.futures import ThreadPoolExecutor

logging.basicConfig(level=logging.INFO, format='%(asctime)s [%(levelname)s] %(message)s')


class httpxMethods:
    """Synchronize all tasks with an Event + counter so the GETs fire at the same instant."""

    MAX_CONCURRENCY = 10
    semaphore = asyncio.Semaphore(MAX_CONCURRENCY)
    executor = ThreadPoolExecutor(max_workers=10)

    @staticmethod
    def get_data():
        """Simulate a blocking DB lookup; returns the student id."""
        time.sleep(random.choice([1, 3, 5]))
        return 1

    @staticmethod
    async def async_get_method(client, task_id, ready_event, counter):
        """Do the blocking prep on the pool, wait until ALL tasks are ready, then GET."""
        logging.info(f'[任务{task_id}]开始时间:{time.strftime("%Y-%m-%d %H:%M:%S", time.localtime(time.time()))}')
        # Run the blocking preparation step on the thread pool.
        # FIX: get_running_loop() is the correct call inside a coroutine;
        # get_event_loop() here is deprecated since Python 3.10.
        loop = asyncio.get_running_loop()
        student_id = await loop.run_in_executor(httpxMethods.executor, httpxMethods.get_data)
        # Update the shared counter under the lock; the last task to finish
        # its preparation sets the event, releasing everyone at once.
        async with counter['lock']:
            counter['completed'] += 1
            if counter['completed'] == counter['total']:
                ready_event.set()
        # Block here until the common "go" signal fires.
        await ready_event.wait()
        # Bound the number of simultaneous requests.
        async with httpxMethods.semaphore:
            url = "http://127.0.0.1:8080/getStudentInfo"
            try:
                response = await client.get(url, params={'id': student_id}, timeout=15.0)
                logging.info(f'[任务{task_id}]结束时间:{time.strftime("%Y-%m-%d %H:%M:%S", time.localtime(time.time()))}')
                return response.json()
            except Exception as e:
                return {"error": str(e)}

    @staticmethod
    async def run_concurrent_requests(num_tasks):
        """Launch num_tasks synchronized request tasks and gather their results."""
        ready_event = asyncio.Event()
        counter = {'completed': 0, 'total': num_tasks, 'lock': asyncio.Lock()}
        async with httpx.AsyncClient(timeout=20.0) as client:
            tasks = [
                httpxMethods.async_get_method(client, i, ready_event, counter)
                for i in range(num_tasks)
            ]
            return await asyncio.gather(*tasks, return_exceptions=True)


if __name__ == "__main__":
    start_time = time.time()
    results = asyncio.run(httpxMethods.run_concurrent_requests(10))
    logging.info(results)
    logging.info(f"总耗时: {time.time()-start_time: .2f}秒")
实例5_2、使用 asyncio.Barrier
来实现接口并发(推荐)
推荐!!!
import random
import time
import httpx
import asyncio
import logging
import json
from concurrent.futures import ThreadPoolExecutor

logging.basicConfig(level=logging.INFO, format='%(asctime)s [%(levelname)s] %(message)s')


class httpxMethods:
    """Synchronize all tasks with asyncio.Barrier so the GETs fire at the same instant."""

    MAX_CONCURRENCY = 10
    semaphore = asyncio.Semaphore(MAX_CONCURRENCY)
    executor = ThreadPoolExecutor(max_workers=10)

    @staticmethod
    def get_data():
        """Simulate a blocking DB lookup; returns the student id."""
        time.sleep(random.choice([1, 3, 5]))
        return 1

    @staticmethod
    async def async_get_method(client, task_id, barrier):
        """Do the blocking prep on the pool, rendezvous at the barrier, then GET."""
        logging.info(f'[任务{task_id}]开始时间:{time.strftime("%Y-%m-%d %H:%M:%S", time.localtime(time.time()))}')
        # Run the blocking preparation step on the thread pool.
        # FIX: get_running_loop() is the correct call inside a coroutine;
        # get_event_loop() here is deprecated since Python 3.10.
        loop = asyncio.get_running_loop()
        student_id = await loop.run_in_executor(httpxMethods.executor, httpxMethods.get_data)
        # Every task parks here until all of them have finished their prep,
        # then the barrier releases them together.
        await barrier.wait()
        # Bound the number of simultaneous requests.
        async with httpxMethods.semaphore:
            url = "http://127.0.0.1:8080/getStudentInfo"
            try:
                response = await client.get(url, params={'id': student_id}, timeout=15.0)
                logging.info(f'[任务{task_id}]结束时间:{time.strftime("%Y-%m-%d %H:%M:%S", time.localtime(time.time()))}')
                return response.json()
            except Exception as e:
                return {"error": str(e)}

    @staticmethod
    async def run_concurrent_requests(num_tasks):
        """Launch num_tasks barrier-synchronized request tasks and gather their results."""
        # The barrier releases only once num_tasks tasks have reached it.
        # asyncio.Barrier requires Python 3.11+.
        barrier = asyncio.Barrier(num_tasks)
        # FIX: removed the dead `counter` dict -- it was a leftover from the
        # Event+counter variant and is never read here.
        async with httpx.AsyncClient(timeout=20.0) as client:
            tasks = [
                httpxMethods.async_get_method(client, i, barrier)
                for i in range(num_tasks)
            ]
            return await asyncio.gather(*tasks, return_exceptions=True)


if __name__ == "__main__":
    start_time = time.time()
    results = asyncio.run(httpxMethods.run_concurrent_requests(10))
    logging.info(results)
    logging.info(f"总耗时: {time.time()-start_time: .2f}秒")
客户端日志:
代码运行耗时依旧是6秒左右!
服务端日志:
服务端日志上看,接口都是同一个时间点请求进来的!!!
总结:
实例5的2种方式都能实现最终高效并发的目的!更推荐使用 Barrier
更简洁、更直观、更符合语义。
asyncio.Barrier(n)
是一个同步原语,用于让一组协程都等待彼此到达某个点(称为“屏障点”),然后一起继续执行。
补充:
java服务端代码:
https://gitee.com/a_cloud/python_springboot_demo.git
python客户端代码:
https://gitee.com/a_cloud/self_study_python.git
评论区