Pourquoi l'asynchrone ?
L'asynchrone permet d'exécuter plusieurs opérations I/O (réseau, fichiers, base de données) en parallèle sur un seul thread. Idéal pour :
- APIs web à fort trafic (FastAPI)
- Scraping web
- Appels à des services externes
- Chat bots et websockets
Les bases
Coroutine simple
import asyncio
async def hello():
print("Hello")
await asyncio.sleep(1) # Simule une opération I/O
print("World")
# Exécuter la coroutine
asyncio.run(hello())
Exécution parallèle avec gather
async def fetch_data(url: str) -> str:
print(f"Fetching {url}...")
await asyncio.sleep(2) # Simule un appel réseau
return f"Data from {url}"
async def main():
# Exécute les 3 appels en parallèle (2s total, pas 6s)
results = await asyncio.gather(
fetch_data("api.example.com/users"),
fetch_data("api.example.com/products"),
fetch_data("api.example.com/orders")
)
print(results)
asyncio.run(main())
Tasks pour plus de contrôle
async def main():
# Créer des tasks
task1 = asyncio.create_task(fetch_data("url1"))
task2 = asyncio.create_task(fetch_data("url2"))
# Faire autre chose pendant que les tasks s'exécutent
print("Tasks créées, en attente...")
# Attendre les résultats
result1 = await task1
result2 = await task2
asyncio.run(main())
Exemple réel : Client HTTP async
import aiohttp
import asyncio
async def fetch_url(session: aiohttp.ClientSession, url: str) -> dict:
async with session.get(url) as response:
return await response.json()
async def fetch_all_users(user_ids: list[int]) -> list[dict]:
async with aiohttp.ClientSession() as session:
tasks = [
fetch_url(session, f"https://api.example.com/users/{uid}")
for uid in user_ids
]
return await asyncio.gather(*tasks)
# Récupère 100 users en parallèle
users = asyncio.run(fetch_all_users(range(1, 101)))
Gestion des timeouts
async def fetch_with_timeout(url: str) -> str:
try:
async with asyncio.timeout(5): # Python 3.11+
return await fetch_data(url)
except asyncio.TimeoutError:
return "Timeout!"
# Alternative pour versions antérieures
result = await asyncio.wait_for(fetch_data(url), timeout=5.0)
Semaphores : limiter la concurrence
async def fetch_limited(semaphore: asyncio.Semaphore, url: str) -> str:
async with semaphore: # Max N requêtes simultanées
return await fetch_data(url)
async def main():
semaphore = asyncio.Semaphore(10) # Max 10 requêtes parallèles
urls = [f"https://api.example.com/item/{i}" for i in range(100)]
tasks = [fetch_limited(semaphore, url) for url in urls]
results = await asyncio.gather(*tasks)
asyncio.run(main())
Async avec FastAPI
from fastapi import FastAPI
import httpx
app = FastAPI()
@app.get("/aggregate")
async def aggregate_data():
async with httpx.AsyncClient() as client:
users, products = await asyncio.gather(
client.get("https://api.example.com/users"),
client.get("https://api.example.com/products")
)
return {
"users": users.json(),
"products": products.json()
}
Pièges courants
- Bloquer l'event loop : N'utilisez jamais
time.sleep(), utilisezawait asyncio.sleep() - Oublier await : Sans await, la coroutine ne s'exécute pas
- Mélanger sync et async : Utilisez
run_in_executorpour le code bloquant
# Pour du code bloquant dans un contexte async
import asyncio
from concurrent.futures import ThreadPoolExecutor
def blocking_io():
# Code synchrone bloquant
import time
time.sleep(2)
return "done"
async def main():
loop = asyncio.get_event_loop()
with ThreadPoolExecutor() as pool:
result = await loop.run_in_executor(pool, blocking_io)
print(result)
asyncio.run(main())
Conseil : L'async n'accélère pas le CPU-bound (calculs). Pour ça, utilisez multiprocessing. Async est pour l'I/O-bound (attente réseau, disque).
Conclusion
L'asynchrone en Python est puissant une fois maîtrisé. Commencez par des cas simples (appels HTTP parallèles), puis explorez les patterns avancés selon vos besoins.
Besoin d'aide sur vos projets Python async ? Contactez-moi.