목차
들어가며
3편에서 FastAPI에서 비동기를 활용하는 방법을 학습했다. 이번 편에서는 비동기 프로그래밍의 핵심인 여러 작업을 동시에 처리하는 방법을 깊이 있게 알아보자. asyncio.gather(), asyncio.create_task() 등의 고급 기법과 이벤트 루프의 동작 원리를 이해하면 진정한 비동기 마스터가 될 수 있다.
이벤트 루프 이해하기
이벤트 루프란 무엇인가?
이벤트 루프는 비동기 프로그램의 심장이다. 여러 작업을 번갈아가며 실행하여 마치 동시에 처리되는 것처럼 보이게 만든다.
import asyncio
async def 작업(이름, 단계수):
for i in range(단계수):
print(f"{이름} - 단계 {i+1}")
await asyncio.sleep(0.5) # await 키워드를 만나면 제어권 양보
print(f"{이름} 완료!")
async def main():
# 3개 작업이 번갈아가며 실행됨
await asyncio.gather(
작업("작업A", 3),
작업("작업B", 2),
작업("작업C", 4)
)
asyncio.run(main())
출력 결과:
작업A - 단계 1
작업B - 단계 1
작업C - 단계 1
작업A - 단계 2
작업B - 단계 2
작업C - 단계 2
작업A - 단계 3
작업B 완료!
작업C - 단계 3
작업A 완료!
작업C - 단계 4
작업C 완료!
핵심 포인트: 이벤트 루프는 await 키워드를 만날 때마다 현재 작업을 일시 중단하고 다른 작업으로 넘어간다. asyncio.sleep(0.5)가 완료되면 해당 작업으로 다시 돌아와서 다음 줄부터 계속 실행한다.
asyncio.gather(): 여러 작업 한 번에 처리
기본 사용법
asyncio.gather()는 여러 비동기 작업을 동시에 실행하고, 모든 작업이 완료될 때까지 기다린다. 이 작업들은 백그라운드가 아닌 포그라운드에서 실행된다.
import aiohttp
import asyncio
async def fetch_user_data(session, user_id):
url = f"https://jsonplaceholder.typicode.com/users/{user_id}"
async with session.get(url) as response:
return await response.json()
async def get_multiple_users():
async with aiohttp.ClientSession() as session:
# gather() 호출 시점에 모든 코루틴이 즉시 시작됨
users = await asyncio.gather(
fetch_user_data(session, 1),
fetch_user_data(session, 2),
fetch_user_data(session, 3)
)
return users
# FastAPI에서 사용 예시
@app.get("/users/multiple")
async def get_users_endpoint():
users = await get_multiple_users()
return {"users": users, "count": len(users)}
gather()의 실행 방식
gather()는 전달받은 모든 코루틴을 즉시 Task로 변환하여 실행 시작한다. 이는 백그라운드 실행이 아니라 현재 이벤트 루프에서 포그라운드로 동시 실행하는 것이다.
async def 실행방식_확인():
print("gather() 호출 전")
# 이 순간 3개 코루틴이 모두 즉시 시작됨
results = await asyncio.gather(
작업("A", 2),
작업("B", 1),
작업("C", 3)
)
print("gather() 완료 후")
return results
async def 작업(이름, 단계수):
print(f"{이름} 시작")
for i in range(단계수):
await asyncio.sleep(0.5)
print(f"{이름} - 단계 {i+1}")
print(f"{이름} 완료")
return f"{이름} 결과"
# 실행하면 "gather() 호출 전" 출력 후 즉시 모든 작업이 시작됨
gather()의 특징
- 순서 보장: 결과는 요청한 순서대로 반환된다
- 모든 작업 대기: 가장 느린 작업까지 모두 기다린다
- 하나라도 실패하면 전체 실패: 기본적으로 예외가 발생하면 다른 작업도 취소된다
async def 성공_작업():
await asyncio.sleep(1)
return "성공"
async def 실패_작업():
await asyncio.sleep(0.5)
raise Exception("실패!")
async def gather_예외처리():
try:
# 하나가 실패하면 전체 실패
results = await asyncio.gather(
성공_작업(),
실패_작업(),
성공_작업()
)
except Exception as e:
print(f"전체 실패: {e}")
return None
# 모든 작업이 성공해야만 여기 도달
return results
# 예외를 무시하고 계속 진행하려면
async def gather_예외무시():
results = await asyncio.gather(
성공_작업(),
실패_작업(),
성공_작업(),
return_exceptions=True # 예외도 결과로 반환
)
# [성공, Exception('실패!'), 성공] 형태로 반환
return results
asyncio.create_task(): 작업을 백그라운드에서 실행
Task 객체란 무엇인가?
Task 객체는 이벤트 루프에서 실행되는 코루틴의 래퍼다. 코루틴을 Task로 변환하면 다음과 같은 특징을 갖는다:
- 즉시 실행 시작: Task 생성과 동시에 이벤트 루프에서 실행이 시작됨
- 백그라운드 실행: 다른 코드가 실행되는 동안 병렬로 진행됨
- 상태 추적 가능: 실행 중, 완료, 취소 등의 상태를 확인할 수 있음
- 결과 저장: 완료 후 결과를 저장하고 언제든 접근 가능
- 취소 가능: 실행 중인 Task를 중도에 취소할 수 있음
import asyncio
async def 백그라운드_작업(작업명, 소요시간):
print(f"{작업명} 백그라운드에서 시작")
await asyncio.sleep(소요시간)
print(f"{작업명} 완료")
return f"{작업명} 결과"
async def task_특징_확인():
print("=== Task 객체 생성과 특징 ===")
# Task 생성 - 즉시 실행 시작
task1 = asyncio.create_task(백그라운드_작업("작업A", 2))
task2 = asyncio.create_task(백그라운드_작업("작업B", 1))
print(f"Task1 상태: {task1.done()}") # False (아직 실행 중)
print(f"Task2 상태: {task2.done()}") # False (아직 실행 중)
print("다른 작업 수행 중...")
await asyncio.sleep(0.5)
print(f"0.5초 후 Task2 상태: {task2.done()}") # 아마 False (1초 작업이므로)
# Task 결과 기다리기
result1 = await task1 # Task가 완료되면 결과 반환
result2 = await task2
print(f"Task1 완료 상태: {task1.done()}") # True
print(f"Task2 완료 상태: {task2.done()}") # True
print(f"저장된 결과1: {task1.result()}") # 완료 후 언제든 결과 접근 가능
print(f"저장된 결과2: {task2.result()}")
return [result1, result2]
asyncio.run(task_특징_확인())
결과
=== Task 객체 생성과 특징 ===
Task1 상태: False
Task2 상태: False
다른 작업 수행 중...
작업A 백그라운드에서 시작
작업B 백그라운드에서 시작
0.5초 후 Task2 상태: False
작업B 완료
작업A 완료
Task1 완료 상태: True
Task2 완료 상태: True
저장된 결과1: 작업A 결과
저장된 결과2: 작업B 결과
코루틴 vs Task 객체 비교
코루틴과 Task 객체는 비동기 프로그래밍에서 서로 다른 역할을 한다:
코루틴 (Coroutine)
- 지연 실행: async def 함수를 호출해도 즉시 실행되지 않음
- 수동 실행: await를 만날 때까지 대기 상태
- 일회성: 한 번 실행되면 재사용 불가
- 상태 없음: 실행 상태나 결과를 추적할 수 없음
Task 객체
- 즉시 실행: 생성과 동시에 이벤트 루프에서 실행 시작
- 자동 실행: 백그라운드에서 자동으로 진행
- 재사용 가능: 완료 후에도 결과에 계속 접근 가능
- 상태 추적: 실행 중/완료/취소 등 상태 확인 가능
import asyncio
async def 백그라운드_작업(작업명, 소요시간):
print(f"{작업명} 백그라운드에서 시작")
await asyncio.sleep(소요시간)
print(f"{작업명} 완료")
return f"{작업명} 결과"
async def 비교_예시():
print("=== 코루틴 직접 사용 ===")
# 코루틴 객체 생성 (아직 실행되지 않음)
코루틴_객체 = 백그라운드_작업("코루틴", 1)
print(f"코루틴 타입: {type(코루틴_객체)}") # <class 'coroutine'>
print("다른 작업 중...")
await asyncio.sleep(0.5) # 코루틴은 아직 실행되지 않음
# 이제 실행
결과 = await 코루틴_객체
print(f"코루틴 결과: {결과}")
print("\n=== Task 객체 사용 ===")
# Task 생성과 동시에 실행 시작
task = asyncio.create_task(백그라운드_작업("태스크", 1))
print(f"Task 타입: {type(task)}") # <class '_asyncio.Task'>
print("다른 작업 중...")
await asyncio.sleep(0.5) # Task는 이미 실행 중
# 결과 받기
결과 = await task
print(f"Task 결과: {결과}")
asyncio.run(비교_예시())
결과
=== 코루틴 직접 사용 ===
코루틴 타입: <class 'coroutine'>
다른 작업 중...
코루틴 백그라운드에서 시작
코루틴 완료
코루틴 결과: 코루틴 결과
=== Task 객체 사용 ===
Task 타입: <class '_asyncio.Task'>
다른 작업 중...
태스크 백그라운드에서 시작
태스크 완료
Task 결과: 태스크 결과
핵심 차이점:
- 코루틴: 레시피 카드처럼 실행 '방법'만 담고 있음
- Task: 실제로 요리가 진행 중인 상태, 언제든 진행 상황 확인 가능
Task 객체의 실행 타이밍
asyncio.create_task()는 코루틴을 Task 객체로 변환하여 이벤트 루프에서 즉시 실행을 시작한다.
async def 백그라운드_작업(작업명, 소요시간):
print(f"{작업명} 백그라운드에서 시작")
await asyncio.sleep(소요시간)
print(f"{작업명} 완료")
return f"{작업명} 결과"
async def task_예시():
print("=== Task 생성과 동시에 실행 시작 ===")
# Task 생성과 동시에 백그라운드에서 실행 시작
task1 = asyncio.create_task(백그라운드_작업("작업A", 2))
task2 = asyncio.create_task(백그라운드_작업("작업B", 1))
print("다른 작업 수행 중...")
await asyncio.sleep(0.5)
print("아직도 다른 작업 중...")
# 이제 결과를 기다림
result1 = await task1
result2 = await task2
return [result1, result2]
asyncio.run(task_예시())
출력 결과:
=== Task 생성과 동시에 실행 시작 ===
작업A 백그라운드에서 시작
작업B 백그라운드에서 시작
다른 작업 수행 중...
작업B 완료
아직도 다른 작업 중...
작업A 완료
gather() vs create_task() 비교
두 방식은 모두 내부적으로 Task 객체를 생성하지만, 여러 측면에서 차이가 있다:
1. Task 노출 방식
- gather(): Task를 내부적으로 생성하지만 개발자에게 숨김
- create_task(): Task 객체를 개발자에게 직접 반환
2. 실행 시작 시점
- gather(): 호출 시점에 모든 코루틴이 즉시 시작
- create_task(): 생성 시점에 개별적으로 시작
3. 제어 수준
- gather(): 간단하지만 제어 불가 (상태 확인, 취소 등 불가)
- create_task(): 복잡하지만 세밀한 제어 가능
import asyncio
import time
async def 백그라운드_작업(작업명, 소요시간):
print(f"{작업명} 백그라운드에서 시작")
await asyncio.sleep(소요시간)
print(f"{작업명} 완료")
return f"{작업명} 결과"
async def 완전비교_예시():
print("=== gather() 방식 ===")
시작시간 = time.time()
# 호출과 동시에 모든 작업 시작, Task 접근 불가
results1 = await asyncio.gather(
백그라운드_작업("G1", 1),
백그라운드_작업("G2", 2),
백그라운드_작업("G3", 1)
)
gather_시간 = time.time() - 시작시간
print(f"gather 결과: {results1}")
print(f"gather 총 시간: {gather_시간:.1f}초\n")
print("=== create_task() 방식 ===")
시작시간 = time.time()
# 각 Task 생성 시점에 개별적으로 시작
task1 = asyncio.create_task(백그라운드_작업("T1", 1))
task2 = asyncio.create_task(백그라운드_작업("T2", 2))
task3 = asyncio.create_task(백그라운드_작업("T3", 1))
# Task 제어 가능
print(f"Task1 상태: {task1.done()}, {task1.get_name()}")
print(f"Task2 상태: {task2.done()}, {task2.get_name()}")
print(f"Task3 상태: {task3.done()}, {task3.get_name()}")
# 중간에 다른 작업 수행 가능
await asyncio.sleep(0.5)
print("중간 작업 완료")
# 완료된 Task만 먼저 확인 가능
if task1.done():
print(f"Task1 이미 완료: {task1.result()}")
# 나머지 Task들 기다리기
results2 = await asyncio.gather(task1, task2, task3)
task_시간 = time.time() - 시작시간
print(f"create_task 결과: {results2}")
print(f"create_task 총 시간: {task_시간:.1f}초")
asyncio.run(완전비교_예시())
결과
=== gather() 방식 ===
G1 백그라운드에서 시작
G2 백그라운드에서 시작
G3 백그라운드에서 시작
G1 완료
G3 완료
G2 완료
gather 결과: ['G1 결과', 'G2 결과', 'G3 결과']
gather 총 시간: 2.0초
=== create_task() 방식 ===
Task1 상태: False, Task-5
Task2 상태: False, Task-6
Task3 상태: False, Task-7
T1 백그라운드에서 시작
T2 백그라운드에서 시작
T3 백그라운드에서 시작
중간 작업 완료
T1 완료
T3 완료
T2 완료
create_task 결과: ['T1 결과', 'T2 결과', 'T3 결과']
create_task 총 시간: 2.0초
4. 사용 시기 가이드
gather() 사용:
- 간단한 병렬 처리가 목적
- Task 제어가 필요 없음
- 모든 작업이 완료되기만 하면 됨
create_task() 사용:
- Task 상태 모니터링 필요
- 중간에 Task 취소 가능성
- 작업 완료 순서에 따른 처리 필요
- 더 복잡한 비동기 패턴 구현
실무에서 자주 사용하는 패턴들
1. 조건부 작업 처리
@app.get("/user/{user_id}/dashboard")
async def get_user_dashboard(user_id: int, include_analytics: bool = False):
# 기본 작업들은 항상 실행
basic_tasks = [
asyncio.create_task(get_user_profile(user_id)),
asyncio.create_task(get_user_notifications(user_id)),
asyncio.create_task(get_user_settings(user_id))
]
# 조건부 작업 추가
if include_analytics:
basic_tasks.append(asyncio.create_task(get_user_analytics(user_id)))
# 모든 필요한 작업 동시 실행
results = await asyncio.gather(*basic_tasks)
dashboard = {
"profile": results[0],
"notifications": results[1],
"settings": results[2]
}
if include_analytics:
dashboard["analytics"] = results[3]
return dashboard
async def get_user_profile(user_id: int):
await asyncio.sleep(0.3)
return {"user_id": user_id, "name": "홍길동"}
async def get_user_notifications(user_id: int):
await asyncio.sleep(0.2)
return [{"id": 1, "message": "새 메시지"}]
async def get_user_settings(user_id: int):
await asyncio.sleep(0.1)
return {"theme": "dark", "language": "ko"}
async def get_user_analytics(user_id: int):
await asyncio.sleep(0.8) # 분석 데이터는 시간이 오래 걸림
return {"views": 1234, "clicks": 567}
2. 타임아웃과 함께 사용
async def fetch_with_timeout(url: str, timeout: float = 5.0):
"""타임아웃이 있는 외부 API 호출"""
try:
async with aiohttp.ClientSession() as session:
result = await asyncio.wait_for(
session.get(url),
timeout=timeout
)
return await result.json()
except asyncio.TimeoutError:
return {"error": "타임아웃"}
except Exception as e:
return {"error": str(e)}
@app.get("/external-data")
async def get_external_data():
# 여러 외부 API를 동시 호출, 각각 다른 타임아웃
tasks = [
asyncio.create_task(fetch_with_timeout("https://api1.com/data", 3.0)),
asyncio.create_task(fetch_with_timeout("https://api2.com/data", 5.0)),
asyncio.create_task(fetch_with_timeout("https://api3.com/data", 2.0))
]
results = await asyncio.gather(*tasks, return_exceptions=True)
return {
"api1": results[0],
"api2": results[1],
"api3": results[2]
}
3. 동적 작업 생성
@app.post("/process-batch")
async def process_batch(user_ids: list[int]):
"""동적으로 생성되는 여러 사용자 처리"""
# 사용자 ID 목록에 따라 동적으로 Task 생성
tasks = []
for user_id in user_ids:
task = asyncio.create_task(process_single_user(user_id))
tasks.append(task)
# 모든 사용자 처리 완료까지 대기
results = await asyncio.gather(*tasks, return_exceptions=True)
# 성공/실패 분류
successful = []
failed = []
for i, result in enumerate(results):
if isinstance(result, Exception):
failed.append({"user_id": user_ids[i], "error": str(result)})
else:
successful.append(result)
return {
"successful_count": len(successful),
"failed_count": len(failed),
"successful": successful,
"failed": failed
}
async def process_single_user(user_id: int):
"""개별 사용자 처리 (실패 가능성 있음)"""
await asyncio.sleep(0.5) # 처리 시간 시뮬레이션
# 10% 확률로 실패
import random
if random.random() < 0.1:
raise Exception(f"사용자 {user_id} 처리 실패")
return {"user_id": user_id, "status": "processed"}
주의사항과 베스트 프랙티스
1. 너무 많은 동시 작업은 금물
# 나쁜 예시 - 1000개 동시 실행
async def bad_concurrent_example():
tasks = []
for i in range(1000):
tasks.append(asyncio.create_task(heavy_task(i)))
return await asyncio.gather(*tasks) # 메모리 부족, 성능 저하
# 좋은 예시 - 배치 단위로 처리
async def good_concurrent_example():
batch_size = 10
all_results = []
for i in range(0, 1000, batch_size):
batch_tasks = []
for j in range(i, min(i + batch_size, 1000)):
batch_tasks.append(asyncio.create_task(heavy_task(j)))
batch_results = await asyncio.gather(*batch_tasks)
all_results.extend(batch_results)
return all_results
2. 메모리 누수 방지
async def prevent_memory_leak():
"""Task 정리를 통한 메모리 누수 방지"""
tasks = []
try:
for i in range(10):
task = asyncio.create_task(some_task(i))
tasks.append(task)
results = await asyncio.gather(*tasks)
return results
except Exception as e:
# 예외 발생 시 남은 Task들 정리
for task in tasks:
if not task.done():
task.cancel()
# 취소된 Task들이 완전히 정리될 때까지 대기
await asyncio.gather(*tasks, return_exceptions=True)
raise e
마무리
여러 비동기 작업을 동시에 처리하는 것은 비동기 프로그래밍의 핵심이다. asyncio.gather()로 간단한 동시 실행을, asyncio.create_task()로 더 세밀한 제어를 할 수 있다. 이벤트 루프가 이 모든 작업들을 효율적으로 관리해주므로, 개발자는 비즈니스 로직에 집중할 수 있다.
다음 편에서는 비동기와 데이터베이스 연동, 그리고 실제 프로덕션 환경에서 고려해야 할 사항들을 알아보겠다.
다음 편 예고
5편에서는 다음 내용을 다룰 예정이다:
- 비동기 컨텍스트 매니저 활용
- async with, async for, 비동기 제너레이터
'Python > 문법' 카테고리의 다른 글
[Python] 비동기(async) 프로그래밍 5 (0) | 2025.05.23 |
---|---|
[Python] 비동기(async) 프로그래밍 3 (0) | 2025.05.22 |
[Python] 비동기(async) 프로그래밍 2 (0) | 2025.05.22 |
[Python] 비동기(async) 프로그래밍 1 (0) | 2025.05.22 |
[Python] Enum 정리 (0) | 2025.04.12 |