Python -- Asynchronous concurrent requests with httpx + asyncio

一朵云
2023-04-22

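Preface:

Concurrent requests with httpx's asynchronous client (httpx.AsyncClient) are driven here by asyncio, the Python standard-library module for writing asynchronous I/O programs. asyncio is built on an event loop and coroutines. It is well suited to I/O-bound work (network requests, database queries, file reads and writes), not to CPU-bound work (heavy math, encryption and decryption, image processing).

⚠️ Note: asyncio is neither multithreading nor multiprocessing. A single thread uses the event loop to schedule many coroutines, which take turns running. This is "concurrency", not "parallelism".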
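The single-thread point can be checked directly. The sketch below uses plain asyncio, with asyncio.sleep standing in for I/O (the worker names are made up). It records which OS thread runs each coroutine before and after it suspends. Every entry should report the same thread: the one that called asyncio.run().

```python
import asyncio
import threading


async def worker(name: str, seen: list) -> None:
    # Record the OS thread running this coroutine before it suspends...
    seen.append((name, threading.get_ident()))
    await asyncio.sleep(0.05)  # yields control to the event loop
    # ...and again after the event loop resumes it
    seen.append((name, threading.get_ident()))


async def main() -> list:
    seen: list = []
    await asyncio.gather(worker("a", seen), worker("b", seen), worker("c", seen))
    return seen


seen = asyncio.run(main())
thread_ids = {tid for _, tid in seen}
print(len(seen), len(thread_ids))  # 6 1
```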

Usage:

1. Running coroutines concurrently with asyncio.create_task()

import asyncio

async def fetch(i):
    print(f"Start {i}")
    await asyncio.sleep(1)
    print(f"Done {i}")

async def main():
    task1 = asyncio.create_task(fetch(1))
    task2 = asyncio.create_task(fetch(2))

    await task1  # These two awaits only make sure both tasks finish before main() returns
    await task2

asyncio.run(main())
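The sketch below compares two versions of main(). The first awaits the coroutines one by one. The second wraps them in create_task() first. It assumes asyncio.sleep(0.2) as a stand-in for a network call. The first version should take about twice as long, because create_task() schedules both coroutines before main() waits on either of them.

```python
import asyncio
import time


async def fetch(i: int) -> int:
    # asyncio.sleep stands in for network latency; no real request is sent
    await asyncio.sleep(0.2)
    return i


async def one_by_one() -> None:
    # Each await finishes before the next coroutine is even created
    await fetch(1)
    await fetch(2)


async def with_tasks() -> None:
    # Both coroutines are scheduled before main() waits on either of them
    task1 = asyncio.create_task(fetch(1))
    task2 = asyncio.create_task(fetch(2))
    await task1
    await task2


def timed(coro_fn) -> float:
    start = time.perf_counter()
    asyncio.run(coro_fn())
    return time.perf_counter() - start


t_seq = timed(one_by_one)
t_tasks = timed(with_tasks)
print(f"one by one: {t_seq:.1f}s, create_task: {t_tasks:.1f}s")
```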

2. Running coroutines concurrently with asyncio.gather() (recommended)

import asyncio

async def fetch(i):
    print(f"Start {i}")
    await asyncio.sleep(1)
    print(f"Done {i}")

async def main():
    await asyncio.gather(
        fetch(1),
        fetch(2),
        fetch(3),
    )

asyncio.run(main())
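Two gather() details matter later in this post. First, results come back in the order the awaitables were passed, not the order they finished. Second, return_exceptions=True (used in Example 5) places an exception in its result slot instead of raising it. Here is a small sketch with a made-up failing request:

```python
import asyncio


async def fetch(i: int, delay: float) -> str:
    await asyncio.sleep(delay)
    if i == 2:
        # Hypothetical failure, just to show how gather reports it
        raise ValueError(f"request {i} failed")
    return f"result {i}"


async def main():
    # fetch(3) finishes first, but results follow the argument order
    ok = await asyncio.gather(fetch(1, 0.03), fetch(3, 0.01))
    # With return_exceptions=True the ValueError is returned in its slot, not raised
    mixed = await asyncio.gather(
        fetch(1, 0.01), fetch(2, 0.01), fetch(3, 0.01), return_exceptions=True
    )
    return ok, mixed


ok, mixed = asyncio.run(main())
print(ok)                                  # ['result 1', 'result 3']
print([type(r).__name__ for r in mixed])   # ['str', 'ValueError', 'str']
```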

Flow diagrams:

Usage 1 runs as follows:

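Event Loop starts
│
├─ Schedule main()
│   │
│   ├─ Create task1 = fetch(1) → scheduled on the event loop
│   ├─ Create task2 = fetch(2) → scheduled on the event loop
│   │
│   ├─ await task1 → main() suspends until task1 finishes
│   └─ await task2 → waits for task2 (passes immediately if it is already done)
│
├─ task1 starts running
│   ├─ prints "Start 1"
│   └─ await asyncio.sleep(1) → suspends, yielding control to the event loop
│
├─ task2 starts running
│   ├─ prints "Start 2"
│   └─ await asyncio.sleep(1) → suspends, yielding control to the event loop
│
└─ About 1 second later, the sleeps finish
    ├─ task1 prints "Done 1"
    └─ task2 prints "Done 2"

main() finishes → Event Loop stops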

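Usage 2 runs as follows:

Event Loop starts
│
├─ Schedule main()
│   │
│   ├─ gather wraps fetch(1) in a Task → scheduled on the event loop
│   ├─ gather wraps fetch(2) in a Task → scheduled on the event loop
│   ├─ gather wraps fetch(3) in a Task → scheduled on the event loop
│   │
│   └─ main() suspends until all of them finish (await gather)
│
├─ fetch(1) starts running
│   ├─ prints "Start 1"
│   └─ await asyncio.sleep(1) → suspends, yielding control to the event loop
│
├─ fetch(2) starts running
│   ├─ prints "Start 2"
│   └─ await asyncio.sleep(1) → suspends, yielding control to the event loop
│
├─ fetch(3) starts running
│   ├─ prints "Start 3"
│   └─ await asyncio.sleep(1) → suspends, yielding control to the event loop
│
└─ About 1 second later, all sleeps finish
    ├─ fetch(1) prints "Done 1"
    ├─ fetch(2) prints "Done 2"
    └─ fetch(3) prints "Done 3"

main() finishes → Event Loop stops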
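The interleaving in the diagram can be checked by recording events instead of printing them. In this sketch the sleep is shortened to 0.1 s. All three "Start" entries are logged before any "Done" entry, which matches the order drawn above.

```python
import asyncio


async def fetch(i: int, log: list) -> None:
    log.append(f"Start {i}")
    await asyncio.sleep(0.1)  # shortened from 1 s for the demo
    log.append(f"Done {i}")


async def main() -> list:
    log: list = []
    await asyncio.gather(fetch(1, log), fetch(2, log), fetch(3, log))
    return log


log = asyncio.run(main())
print(log)
```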

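Hands-on walkthrough:

Goal: send 10 efficient concurrent requests to the getStudentInfo?id=1 endpoint.

The five examples below explore asyncio's capabilities step by step and end with an efficient concurrent version.

Example 1: a plain for loop

Call the endpoint 10 times in a simple for loop.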

import random
import time
import httpx
import asyncio
import json
import logging
from concurrent.futures import ThreadPoolExecutor

logging.basicConfig(level=logging.INFO, format='%(asctime)s [%(levelname)s] %(message)s')


class httpxMethods:

    @staticmethod
    def get_method(task_id):
        logging.info(f'[task {task_id}] start time: {time.strftime("%Y-%m-%d %H:%M:%S", time.localtime(time.time()))}')
        # Synchronous GET request: each call blocks until its response arrives
        getUrl = "http://127.0.0.1:8080/getStudentInfo"
        payload = {'id': '1'}
        res = httpx.get(getUrl, params=payload).json()
        json_str = json.dumps(res, indent=4, ensure_ascii=False)
        logging.info(f'[task {task_id}] end time: {time.strftime("%Y-%m-%d %H:%M:%S", time.localtime(time.time()))}')
        return json_str


if __name__ == "__main__":
    startTime = time.time()
    for i in range(1, 11):
        result = httpxMethods.get_method(i)
        # logging.info(result)
    endTime = time.time()
    logging.info(f'Total time: {endTime-startTime}')

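Client log:

[Screenshot: client log, demo0]

Server log:

[Screenshot: server log, demo0]

👀️ Summary:

The for loop takes about 9 seconds, which is slow. Also, the server receives the requests one after another rather than at the same moment, so this is not concurrency at all!

Example 2: introducing asyncio for asynchronous processing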

import random
import time
import httpx
import asyncio
import json
import logging
from concurrent.futures import ThreadPoolExecutor

logging.basicConfig(level=logging.INFO, format='%(asctime)s [%(levelname)s] %(message)s')

class httpxMethods:

    # Maximum number of concurrent requests
    MAX_CONCURRENCY = 10
    semaphore = asyncio.Semaphore(MAX_CONCURRENCY)
    # Custom thread pool size (to avoid exhausting resources); not used until Example 4
    executor = ThreadPoolExecutor(max_workers=10)


    @staticmethod
    async def async_get_method(client, task_id):
        logging.info(f'[task {task_id}] start time: {time.strftime("%Y-%m-%d %H:%M:%S", time.localtime(time.time()))}')

        async with httpxMethods.semaphore:

            url = "http://127.0.0.1:8080/getStudentInfo"
            payload = {'id': 1}

            try:
                # 1. Send the request
                response = await client.get(url, params=payload, timeout=10.0)
                # 2. Check the status code
                response.raise_for_status()
                # 3. Parse the JSON
                try:
                    data = response.json()  # Note: no await here; the body is already read, so json() is synchronous
                    logging.info(f'[task {task_id}] end time: {time.strftime("%Y-%m-%d %H:%M:%S", time.localtime(time.time()))}')
                    return data
                except json.JSONDecodeError:
                    return {"error": "Invalid JSON response"}

            except httpx.HTTPStatusError as e:
                return {"error": f"HTTP error: {e}"}
            except httpx.RequestError as e:
                return {"error": f"Request error: {e}"}

    @staticmethod
    async def run_concurrent_requests(num):

        async with httpx.AsyncClient(timeout=10.0) as client:
            tasks = [
                httpxMethods.async_get_method(client, i)
                for i in range(num)
            ]

            results = await asyncio.gather(*tasks)
            return json.dumps(results, indent=4, ensure_ascii=False)


if __name__ == "__main__":
    startTime = time.time()
    result = asyncio.run(httpxMethods.run_concurrent_requests(10))
    # logging.info(result)
    endTime = time.time()
    logging.info(f'Total time: {endTime-startTime}')


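Client log:

According to the log, all 10 requests complete in about 1 second in total.

[Screenshot: client log, demo1]

Server log:

The server receives all the requests at the same moment.

[Screenshot: server log, demo1]

👀️ Summary:

This demo is fairly simple. asyncio.gather() runs the coroutines concurrently as asyncio Tasks. The 10 requests drop from about 9 seconds with the for loop to about 1 second. That is a big efficiency gain, and the requests really are concurrent.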
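In Example 2, MAX_CONCURRENCY equals the number of tasks, so the semaphore never actually makes a task wait. The sketch below uses a hypothetical limit of 3 and a simulated request. It shows that once the limit is lower than the task count, at most three requests are ever in flight.

The sketch also creates the semaphore inside the running coroutine rather than at class level. On Python versions before 3.10, asyncio primitives created at import time could end up tied to a different event loop than the one asyncio.run() starts.

```python
import asyncio

MAX_CONCURRENCY = 3  # hypothetical limit, smaller than the task count


async def fake_request(sem: asyncio.Semaphore, state: dict) -> None:
    async with sem:
        state["in_flight"] += 1
        state["peak"] = max(state["peak"], state["in_flight"])
        await asyncio.sleep(0.05)  # simulated network I/O
        state["in_flight"] -= 1


async def main() -> dict:
    # Created inside the running event loop
    sem = asyncio.Semaphore(MAX_CONCURRENCY)
    state = {"in_flight": 0, "peak": 0}
    await asyncio.gather(*(fake_request(sem, state) for _ in range(10)))
    return state


state = asyncio.run(main())
print(state["peak"])  # 3
```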

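Example 3: a more realistic workflow that queries the database before each GET request

Real-world requests are rarely as simple as Example 2. A common scenario: before sending a request, we must query the database for key information to use as request parameters, and only then run the requests concurrently!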

import random
import time
import httpx
import asyncio
import json
import logging
from concurrent.futures import ThreadPoolExecutor

logging.basicConfig(level=logging.INFO, format='%(asctime)s [%(levelname)s] %(message)s')


class httpxMethods:

    # Maximum number of concurrent requests
    MAX_CONCURRENCY = 10
    semaphore = asyncio.Semaphore(MAX_CONCURRENCY)
    # Custom thread pool size (to avoid exhausting resources)
    executor = ThreadPoolExecutor(max_workers=10)

    @staticmethod
    async def get_data():
        # Simulate a slow synchronous operation such as a pymysql query.
        # time.sleep() blocks the whole event loop, even inside an async def.
        time.sleep(random.choice([1, 3, 5]))
        return 1

    @staticmethod
    async def async_get_method(client, task_id):

        logging.info(f'[task {task_id}] start time: {time.strftime("%Y-%m-%d %H:%M:%S", time.localtime(time.time()))}')

        async with httpxMethods.semaphore:

            url = "http://127.0.0.1:8080/getStudentInfo"
            payload = {'id': await httpxMethods.get_data()}

            try:
                # 1. Send the request
                response = await client.get(url, params=payload, timeout=10.0)
                # 2. Check the status code
                response.raise_for_status()
                # 3. Parse the JSON
                try:
                    data = response.json()  # Note: no await here; the body is already read, so json() is synchronous
                    logging.info(f'[task {task_id}] end time: {time.strftime("%Y-%m-%d %H:%M:%S", time.localtime(time.time()))}')
                    return data
                except json.JSONDecodeError:
                    return {"error": "Invalid JSON response"}

            except httpx.HTTPStatusError as e:
                return {"error": f"HTTP error: {e}"}
            except httpx.RequestError as e:
                return {"error": f"Request error: {e}"}

    @staticmethod
    async def run_concurrent_requests(num):

        async with httpx.AsyncClient(timeout=10.0) as client:
            tasks = [
                httpxMethods.async_get_method(client, i)
                for i in range(num)
            ]

            results = await asyncio.gather(*tasks)
            return json.dumps(results, indent=4, ensure_ascii=False)


if __name__ == "__main__":
    startTime = time.time()
    result = asyncio.run(httpxMethods.run_concurrent_requests(10))
    # logging.info(result)
    endTime = time.time()
    logging.info(f'Total time: {endTime-startTime}')


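Client log:

According to the log, adding get_data() to simulate a database query pushes the total time up to about 30 seconds.

[Screenshot: client log, demo2]

Server log:

Judging by the timestamps, the requests still arrive within the same second.

[Screenshot: server log, demo2]

Summary:

The client code is far too slow, which clearly defeats the purpose of asyncio. The server does see concurrent requests, but the run is not efficient at all!!!

The cause is the new get_data() method, which blocks the thread. It uses time.sleep() to simulate a database query, and time.sleep() is a synchronous, blocking call. asyncio relies on a single thread whose event loop switches between coroutines, so one blocking call stalls every coroutine and the ten sleeps run back to back.

The requests presumably still arrive together because each one can only make network progress when the event loop regains control. That happens only after all the blocking calls have finished.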
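The cost of a blocking call can be seen in isolation with the sketch below. It runs three coroutines that each call a synchronous 0.2 s function, a stand-in for a pymysql query. Called directly, the calls run back to back and take about 0.6 s in total. Offloaded with asyncio.to_thread(), they overlap and take about 0.2 s. asyncio.to_thread() needs Python 3.9+; it is a convenience wrapper that runs the function in the loop's default executor. This is a swapped-in alternative to the explicit thread pool used in Example 4.

```python
import asyncio
import time


def blocking_query() -> int:
    # Stand-in for a synchronous database call (assumption, no real DB)
    time.sleep(0.2)
    return 1


async def blocking_version() -> list:
    async def task() -> int:
        return blocking_query()  # runs on the event loop thread and freezes it
    return await asyncio.gather(*(task() for _ in range(3)))


async def offloaded_version() -> list:
    async def task() -> int:
        return await asyncio.to_thread(blocking_query)  # runs in a worker thread
    return await asyncio.gather(*(task() for _ in range(3)))


def timed(fn):
    start = time.perf_counter()
    result = asyncio.run(fn())
    return result, time.perf_counter() - start


r_block, t_block = timed(blocking_version)
r_thread, t_thread = timed(offloaded_version)
print(f"blocking: {t_block:.1f}s, to_thread: {t_thread:.1f}s")
```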

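Example 4: speeding things up with loop.run_in_executor()

To get out of Example 3's predicament, asyncio provides loop.run_in_executor(). It runs synchronous blocking code in a thread pool (here, the class-level ThreadPoolExecutor) so that it does not block the event loop!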
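One detail of this API: loop.run_in_executor(executor, func, *args) forwards positional arguments only. If a blocking function needs keyword arguments, wrap it in functools.partial. The get_data signature below, with student_no and delay parameters, is hypothetical; the post's version takes no arguments.

```python
import asyncio
import functools
import time
from concurrent.futures import ThreadPoolExecutor


def get_data(student_no: int, delay: float = 0.1) -> int:
    # Hypothetical synchronous helper that needs arguments
    time.sleep(delay)
    return student_no * 10


async def main(executor: ThreadPoolExecutor) -> list:
    loop = asyncio.get_running_loop()
    # Positional arguments can follow the callable directly...
    a = loop.run_in_executor(executor, get_data, 1)
    # ...but keyword arguments need functools.partial
    b = loop.run_in_executor(executor, functools.partial(get_data, 2, delay=0.05))
    return await asyncio.gather(a, b)


with ThreadPoolExecutor(max_workers=4) as pool:
    results = asyncio.run(main(pool))
print(results)  # [10, 20]
```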

import random
import time
import httpx
import asyncio
import json
import logging
from concurrent.futures import ThreadPoolExecutor


logging.basicConfig(level=logging.INFO, format='%(asctime)s [%(levelname)s] %(message)s')


class httpxMethods:

    # Maximum number of concurrent requests
    MAX_CONCURRENCY = 10
    semaphore = asyncio.Semaphore(MAX_CONCURRENCY)
    # Custom thread pool size (to avoid exhausting resources)
    executor = ThreadPoolExecutor(max_workers=10)

    @staticmethod
    def get_data():
        # Simulate a slow synchronous operation such as a pymysql query (now a plain def)
        time.sleep(random.choice([1, 3, 5]))
        return 1

    @staticmethod
    async def async_get_method(client, task_id):

        logging.info(f'[task {task_id}] start time: {time.strftime("%Y-%m-%d %H:%M:%S", time.localtime(time.time()))}')

        # loop.run_in_executor() runs get_data in the thread pool, so the blocking calls of different tasks overlap
        loop = asyncio.get_running_loop()
        id1 = await loop.run_in_executor(httpxMethods.executor, httpxMethods.get_data)

        async with httpxMethods.semaphore:

            url = "http://127.0.0.1:8080/getStudentInfo"
            payload = {'id': id1}

            try:
                # 1. Send the request
                response = await client.get(url, params=payload, timeout=10.0)
                # 2. Check the status code
                response.raise_for_status()
                # 3. Parse the JSON
                try:
                    data = response.json()  # Note: no await here; the body is already read, so json() is synchronous
                    logging.info(f'[task {task_id}] end time: {time.strftime("%Y-%m-%d %H:%M:%S", time.localtime(time.time()))}')
                    return data
                except json.JSONDecodeError:
                    return {"error": "Invalid JSON response"}

            except httpx.HTTPStatusError as e:
                return {"error": f"HTTP error: {e}"}
            except httpx.RequestError as e:
                return {"error": f"Request error: {e}"}

    @staticmethod
    async def run_concurrent_requests(num):

        async with httpx.AsyncClient(timeout=10.0) as client:
            tasks = [
                httpxMethods.async_get_method(client, i)
                for i in range(num)
            ]

            results = await asyncio.gather(*tasks)
            return json.dumps(results, indent=4, ensure_ascii=False)


if __name__ == "__main__":
    startTime = time.time()
    result = asyncio.run(httpxMethods.run_concurrent_requests(10))
    # logging.info(result)
    endTime = time.time()
    logging.info(f'Total time: {endTime-startTime}')


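Client log:

According to the log, with loop.run_in_executor() the client run time drops from about 30 seconds to about 6 seconds!!!

[Screenshot: client log, demo3]

Server log:

Unfortunately, the server log shows that the requests did not arrive concurrently!!!

[Screenshot: server log, demo3]

Summary:

The client got faster because of loop.run_in_executor(): it lets the get_data() calls run concurrently in worker threads!

The requests reach the server at different moments for the same reason. Because time.sleep(random.choice([1, 3, 5])) sleeps for a random number of seconds, each task may finish its preparation at a different time. As a result, the requests cannot all be sent at the same moment.

In short: the preparation logic ran concurrently, but the requests themselves were not sent concurrently!!!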
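A quick expected-value calculation helps explain the two timings. It assumes each get_data() call picks 1, 3 or 5 seconds uniformly at random.

- Example 3: the ten calls run back to back, so they add up to 10 × 3 = 30 s on average.
- Example 4: with 10 worker threads for 10 tasks, the calls overlap, so the slowest one sets the pace. With ten draws, at least one 5 s sleep is almost certain.

The sketch computes both figures with exact fractions. It models only the sleeps, not the HTTP requests.

```python
from fractions import Fraction

CHOICES = [1, 3, 5]  # possible sleep durations in get_data()
N = 10               # number of tasks

# Example 3: the sleeps run back to back, so their durations add up
expected_sequential = N * Fraction(sum(CHOICES), len(CHOICES))

# Example 4: the sleeps overlap, so the total is set by the slowest one.
# P(max <= c) = P(a single draw <= c) ** N
p_max_at_most = {
    c: Fraction(sum(1 for x in CHOICES if x <= c), len(CHOICES)) ** N
    for c in CHOICES
}
p_max_is_5 = 1 - p_max_at_most[3]

# E[max] = sum over c of c * P(max == c)
expected_max = sum(
    c * (p_max_at_most[c] - p_max_at_most.get(prev, 0))
    for prev, c in zip([None] + CHOICES[:-1], CHOICES)
)

print(float(expected_sequential))    # 30.0
print(f"{float(p_max_is_5):.3f}")    # 0.983
print(f"{float(expected_max):.2f}")  # 4.97
```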

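Example 5_1: achieving concurrency with a counter

This is method 1. A counter plus an asyncio.Event holds every task back until all of them have finished their preparation, then releases them together so the requests go out concurrently!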

import random
import time
import httpx
import asyncio
import logging
import json
from concurrent.futures import ThreadPoolExecutor

logging.basicConfig(level=logging.INFO, format='%(asctime)s [%(levelname)s] %(message)s')


class httpxMethods:
    MAX_CONCURRENCY = 10
    semaphore = asyncio.Semaphore(MAX_CONCURRENCY)
    executor = ThreadPoolExecutor(max_workers=10)

    @staticmethod
    def get_data():
        time.sleep(random.choice([1, 3, 5]))
        return 1

    @staticmethod
    async def async_get_method(client, task_id, ready_event, counter):

        logging.info(f'[task {task_id}] start time: {time.strftime("%Y-%m-%d %H:%M:%S", time.localtime(time.time()))}')
        # Run the synchronous blocking operation in the thread pool
        loop = asyncio.get_running_loop()
        student_id = await loop.run_in_executor(httpxMethods.executor, httpxMethods.get_data)

        # Counter: asyncio.Lock serializes updates between coroutines (it is not a thread lock).
        # The last task to finish its preparation calls ready_event.set() to release all tasks.
        async with counter['lock']:
            counter['completed'] += 1
            if counter['completed'] == counter['total']:
                ready_event.set()

        # Wait for the common signal
        await ready_event.wait()

        # Limit the number of concurrent requests
        async with httpxMethods.semaphore:
            url = "http://127.0.0.1:8080/getStudentInfo"
            try:
                response = await client.get(url, params={'id': student_id}, timeout=15.0)
                logging.info(f'[task {task_id}] end time: {time.strftime("%Y-%m-%d %H:%M:%S", time.localtime(time.time()))}')
                return response.json()
            except Exception as e:
                return {"error": str(e)}

    @staticmethod
    async def run_concurrent_requests(num_tasks):
        ready_event = asyncio.Event()
        counter = {'completed': 0, 'total': num_tasks, 'lock': asyncio.Lock()}

        async with httpx.AsyncClient(timeout=20.0) as client:
            tasks = [
                httpxMethods.async_get_method(client, i, ready_event, counter)
                for i in range(num_tasks)
            ]
            return await asyncio.gather(*tasks, return_exceptions=True)


if __name__ == "__main__":
    start_time = time.time()
    results = asyncio.run(httpxMethods.run_concurrent_requests(10))
    logging.info(results)
    logging.info(f"Total time: {time.time()-start_time:.2f}s")
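The counter-and-event pattern can be tested without a server. In the sketch below, asyncio.sleep with random durations stands in for get_data() in the thread pool. Each task records the moment it would send its request. Even though preparation times differ by up to 0.2 s, all ten release times should fall within a few milliseconds of each other.

```python
import asyncio
import random
import time


async def worker(ready_event: asyncio.Event, counter: dict, released: list) -> None:
    # Preparation of varying length (a stand-in for get_data in a thread pool)
    await asyncio.sleep(random.choice([0.05, 0.15, 0.25]))
    async with counter["lock"]:
        counter["completed"] += 1
        if counter["completed"] == counter["total"]:
            ready_event.set()  # the last task to finish opens the gate for everyone
    await ready_event.wait()
    # This is the moment the real code would send its HTTP request
    released.append(time.perf_counter())


async def main(n: int = 10) -> list:
    ready_event = asyncio.Event()
    counter = {"completed": 0, "total": n, "lock": asyncio.Lock()}
    released: list = []
    await asyncio.gather(*(worker(ready_event, counter, released) for _ in range(n)))
    return released


released = asyncio.run(main())
print(f"release spread: {max(released) - min(released):.4f}s")
```

One caveat: if any task raises before it increments the counter, the event is never set and the remaining tasks wait forever. Bounding the wait with asyncio.wait_for(ready_event.wait(), timeout=...) is one way to guard against that.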

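Example 5_2: achieving concurrency with asyncio.Barrier (recommended)

Recommended!!! Note that asyncio.Barrier requires Python 3.11 or later.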

import random
import time
import httpx
import asyncio
import logging
import json
from concurrent.futures import ThreadPoolExecutor

logging.basicConfig(level=logging.INFO, format='%(asctime)s [%(levelname)s] %(message)s')


class httpxMethods:
    MAX_CONCURRENCY = 10
    semaphore = asyncio.Semaphore(MAX_CONCURRENCY)
    executor = ThreadPoolExecutor(max_workers=10)

    @staticmethod
    def get_data():
        time.sleep(random.choice([1, 3, 5]))
        return 1

    @staticmethod
    async def async_get_method(client, task_id, barrier):

        logging.info(f'[task {task_id}] start time: {time.strftime("%Y-%m-%d %H:%M:%S", time.localtime(time.time()))}')
        # Run the synchronous blocking operation in the thread pool
        loop = asyncio.get_running_loop()
        student_id = await loop.run_in_executor(httpxMethods.executor, httpxMethods.get_data)

        # Every task waits here until all of them have finished their preparation
        await barrier.wait()

        # Limit the number of concurrent requests
        async with httpxMethods.semaphore:
            url = "http://127.0.0.1:8080/getStudentInfo"
            try:
                response = await client.get(url, params={'id': student_id}, timeout=15.0)
                logging.info(f'[task {task_id}] end time: {time.strftime("%Y-%m-%d %H:%M:%S", time.localtime(time.time()))}')
                return response.json()
            except Exception as e:
                return {"error": str(e)}

    @staticmethod
    async def run_concurrent_requests(num_tasks):
        # Create a barrier that releases only once all num_tasks tasks have arrived
        barrier = asyncio.Barrier(num_tasks)

        async with httpx.AsyncClient(timeout=20.0) as client:
            tasks = [
                httpxMethods.async_get_method(client, i, barrier)
                for i in range(num_tasks)
            ]
            return await asyncio.gather(*tasks, return_exceptions=True)


if __name__ == "__main__":
    start_time = time.time()
    results = asyncio.run(httpxMethods.run_concurrent_requests(10))
    logging.info(results)
    logging.info(f"Total time: {time.time()-start_time:.2f}s")

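Client log:

The run still takes about 6 seconds!

[Screenshot: client log, demo4]

Server log:

The server log shows that all the requests arrived at the same moment!!!

[Screenshot: server log, demo4]

Summary:

Both approaches in Example 5 achieve the final goal of efficient, truly concurrent requests! Barrier is the preferred one: it is simpler, more direct, and expresses the intent more clearly.

asyncio.Barrier(n) is a synchronization primitive. It makes a group of coroutines wait for each other to reach a certain point (the "barrier point") and then continue together.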

Supplement:

Java server code:

https://gitee.com/a_cloud/python_springboot_demo.git

Python client code:

https://gitee.com/a_cloud/self_study_python.git