반응형

들어가며

3편에서 FastAPI에서 비동기를 활용하는 방법을 학습했다. 이번 편에서는 비동기 프로그래밍의 핵심인 여러 작업을 동시에 처리하는 방법을 깊이 있게 알아보자. asyncio.gather(), asyncio.create_task() 등의 고급 기법과 이벤트 루프의 동작 원리를 이해하면 진정한 비동기 마스터가 될 수 있다.

 

이벤트 루프 이해하기

이벤트 루프란 무엇인가?

이벤트 루프는 비동기 프로그램의 심장이다. 여러 작업을 번갈아가며 실행하여 마치 동시에 처리되는 것처럼 보이게 만든다.

import asyncio

async def 작업(이름, 단계수):
    for i in range(단계수):
        print(f"{이름} - 단계 {i+1}")
        await asyncio.sleep(0.5)  # await 키워드를 만나면 제어권 양보
    print(f"{이름} 완료!")

async def main():
    # 3개 작업이 번갈아가며 실행됨
    await asyncio.gather(
        작업("작업A", 3),
        작업("작업B", 2),
        작업("작업C", 4)
    )

asyncio.run(main())

출력 결과:

작업A - 단계 1
작업B - 단계 1
작업C - 단계 1
작업A - 단계 2
작업B - 단계 2
작업C - 단계 2
작업A - 단계 3
작업B 완료!
작업C - 단계 3
작업A 완료!
작업C - 단계 4
작업C 완료!

핵심 포인트: 이벤트 루프는 await 키워드를 만날 때마다 현재 작업을 일시 중단하고 다른 작업으로 넘어간다. asyncio.sleep(0.5)가 완료되면 해당 작업으로 다시 돌아와서 다음 줄부터 계속 실행한다.

 

asyncio.gather(): 여러 작업 한 번에 처리

기본 사용법

asyncio.gather()는 여러 비동기 작업을 동시에 실행하고, 모든 작업이 완료될 때까지 기다린다. 이 작업들은 백그라운드가 아닌 포그라운드에서 실행된다.

import aiohttp
import asyncio

async def fetch_user_data(session, user_id):
    url = f"https://jsonplaceholder.typicode.com/users/{user_id}"
    async with session.get(url) as response:
        return await response.json()

async def get_multiple_users():
    async with aiohttp.ClientSession() as session:
        # gather() 호출 시점에 모든 코루틴이 즉시 시작됨
        users = await asyncio.gather(
            fetch_user_data(session, 1),
            fetch_user_data(session, 2),
            fetch_user_data(session, 3)
        )
        
        return users

# FastAPI에서 사용 예시
@app.get("/users/multiple")
async def get_users_endpoint():
    users = await get_multiple_users()
    return {"users": users, "count": len(users)}

gather()의 실행 방식

gather()는 전달받은 모든 코루틴을 즉시 Task로 변환하여 실행 시작한다. 이는 백그라운드 실행이 아니라 현재 이벤트 루프에서 포그라운드로 동시 실행하는 것이다.

async def 실행방식_확인():
    print("gather() 호출 전")
    
    # 이 순간 3개 코루틴이 모두 즉시 시작됨
    results = await asyncio.gather(
        작업("A", 2),
        작업("B", 1), 
        작업("C", 3)
    )
    
    print("gather() 완료 후")
    return results

async def 작업(이름, 단계수):
    print(f"{이름} 시작")
    for i in range(단계수):
        await asyncio.sleep(0.5)
        print(f"{이름} - 단계 {i+1}")
    print(f"{이름} 완료")
    return f"{이름} 결과"

# 실행하면 "gather() 호출 전" 출력 후 즉시 모든 작업이 시작됨

gather()의 특징

  1. 순서 보장: 결과는 요청한 순서대로 반환된다
  2. 모든 작업 대기: 가장 느린 작업까지 모두 기다린다
  3. 하나라도 실패하면 전체 실패: 기본적으로 예외가 발생하면 다른 작업도 취소된다
async def 성공_작업():
    await asyncio.sleep(1)
    return "성공"

async def 실패_작업():
    await asyncio.sleep(0.5)
    raise Exception("실패!")

async def gather_예외처리():
    try:
        # 하나가 실패하면 전체 실패
        results = await asyncio.gather(
            성공_작업(),
            실패_작업(),
            성공_작업()
        )
    except Exception as e:
        print(f"전체 실패: {e}")
        return None
    
    # 모든 작업이 성공해야만 여기 도달
    return results

# 예외를 무시하고 계속 진행하려면
async def gather_예외무시():
    results = await asyncio.gather(
        성공_작업(),
        실패_작업(),
        성공_작업(),
        return_exceptions=True  # 예외도 결과로 반환
    )
    
    # [성공, Exception('실패!'), 성공] 형태로 반환
    return results

 

asyncio.create_task(): 작업을 백그라운드에서 실행

Task 객체란 무엇인가?

Task 객체는 이벤트 루프에서 실행되는 코루틴의 래퍼다. 코루틴을 Task로 변환하면 다음과 같은 특징을 갖는다:

  1. 즉시 실행 시작: Task 생성과 동시에 이벤트 루프에서 실행이 시작됨
  2. 백그라운드 실행: 다른 코드가 실행되는 동안 병렬로 진행됨
  3. 상태 추적 가능: 실행 중, 완료, 취소 등의 상태를 확인할 수 있음
  4. 결과 저장: 완료 후 결과를 저장하고 언제든 접근 가능
  5. 취소 가능: 실행 중인 Task를 중도에 취소할 수 있음
import asyncio

async def 백그라운드_작업(작업명, 소요시간):
    print(f"{작업명} 백그라운드에서 시작")
    await asyncio.sleep(소요시간)
    print(f"{작업명} 완료")
    return f"{작업명} 결과"

async def task_특징_확인():
    print("=== Task 객체 생성과 특징 ===")
    
    # Task 생성 - 즉시 실행 시작
    task1 = asyncio.create_task(백그라운드_작업("작업A", 2))
    task2 = asyncio.create_task(백그라운드_작업("작업B", 1))
    
    print(f"Task1 상태: {task1.done()}")  # False (아직 실행 중)
    print(f"Task2 상태: {task2.done()}")  # False (아직 실행 중)
    
    print("다른 작업 수행 중...")
    await asyncio.sleep(0.5)
    
    print(f"0.5초 후 Task2 상태: {task2.done()}")  # 아마 False (1초 작업이므로)
    
    # Task 결과 기다리기
    result1 = await task1  # Task가 완료되면 결과 반환
    result2 = await task2
    
    print(f"Task1 완료 상태: {task1.done()}")  # True
    print(f"Task2 완료 상태: {task2.done()}")  # True
    print(f"저장된 결과1: {task1.result()}")  # 완료 후 언제든 결과 접근 가능
    print(f"저장된 결과2: {task2.result()}")
    
    return [result1, result2]

asyncio.run(task_특징_확인())

 결과

=== Task 객체 생성과 특징 ===
Task1 상태: False
Task2 상태: False
다른 작업 수행 중...
작업A 백그라운드에서 시작
작업B 백그라운드에서 시작
0.5초 후 Task2 상태: False
작업B 완료
작업A 완료
Task1 완료 상태: True
Task2 완료 상태: True
저장된 결과1: 작업A 결과
저장된 결과2: 작업B 결과

 

코루틴 vs Task 객체 비교

코루틴과 Task 객체는 비동기 프로그래밍에서 서로 다른 역할을 한다:

코루틴 (Coroutine)

  • 지연 실행: async def 함수를 호출해도 즉시 실행되지 않음
  • 수동 실행: await를 만날 때까지 대기 상태
  • 일회성: 한 번 실행되면 재사용 불가
  • 상태 없음: 실행 상태나 결과를 추적할 수 없음

Task 객체

  • 즉시 실행: 생성과 동시에 이벤트 루프에서 실행 시작
  • 자동 실행: 백그라운드에서 자동으로 진행
  • 재사용 가능: 완료 후에도 결과에 계속 접근 가능
  • 상태 추적: 실행 중/완료/취소 등 상태 확인 가능
import asyncio

async def 백그라운드_작업(작업명, 소요시간):
    print(f"{작업명} 백그라운드에서 시작")
    await asyncio.sleep(소요시간)
    print(f"{작업명} 완료")
    return f"{작업명} 결과"

async def 비교_예시():
    print("=== 코루틴 직접 사용 ===")
    # 코루틴 객체 생성 (아직 실행되지 않음)
    코루틴_객체 = 백그라운드_작업("코루틴", 1)
    print(f"코루틴 타입: {type(코루틴_객체)}")  # <class 'coroutine'>
    
    print("다른 작업 중...")
    await asyncio.sleep(0.5)  # 코루틴은 아직 실행되지 않음
    
    # 이제 실행
    결과 = await 코루틴_객체
    print(f"코루틴 결과: {결과}")
    
    print("\n=== Task 객체 사용 ===")
    # Task 생성과 동시에 실행 시작
    task = asyncio.create_task(백그라운드_작업("태스크", 1))
    print(f"Task 타입: {type(task)}")  # <class '_asyncio.Task'>
    
    print("다른 작업 중...")
    await asyncio.sleep(0.5)  # Task는 이미 실행 중
    
    # 결과 받기
    결과 = await task
    print(f"Task 결과: {결과}")

asyncio.run(비교_예시())

결과

=== 코루틴 직접 사용 ===
코루틴 타입: <class 'coroutine'>
다른 작업 중...
코루틴 백그라운드에서 시작
코루틴 완료
코루틴 결과: 코루틴 결과

=== Task 객체 사용 ===
Task 타입: <class '_asyncio.Task'>
다른 작업 중...
태스크 백그라운드에서 시작
태스크 완료
Task 결과: 태스크 결과

핵심 차이점:

  • 코루틴: 레시피 카드처럼 실행 '방법'만 담고 있음
  • Task: 실제로 요리가 진행 중인 상태, 언제든 진행 상황 확인 가능

 

Task 객체의 실행 타이밍

asyncio.create_task()는 코루틴을 Task 객체로 변환하여 이벤트 루프에서 즉시 실행을 시작한다.

async def 백그라운드_작업(작업명, 소요시간):
    print(f"{작업명} 백그라운드에서 시작")
    await asyncio.sleep(소요시간)
    print(f"{작업명} 완료")
    return f"{작업명} 결과"

async def task_예시():
    print("=== Task 생성과 동시에 실행 시작 ===")
    
    # Task 생성과 동시에 백그라운드에서 실행 시작
    task1 = asyncio.create_task(백그라운드_작업("작업A", 2))
    task2 = asyncio.create_task(백그라운드_작업("작업B", 1))
    
    print("다른 작업 수행 중...")
    await asyncio.sleep(0.5)
    print("아직도 다른 작업 중...")
    
    # 이제 결과를 기다림
    result1 = await task1
    result2 = await task2
    
    return [result1, result2]

asyncio.run(task_예시())

출력 결과:

=== Task 생성과 동시에 실행 시작 ===
작업A 백그라운드에서 시작
작업B 백그라운드에서 시작
다른 작업 수행 중...
작업B 완료
아직도 다른 작업 중...
작업A 완료

 

gather() vs create_task() 비교

두 방식은 모두 내부적으로 Task 객체를 생성하지만, 여러 측면에서 차이가 있다:

1. Task 노출 방식

  • gather(): Task를 내부적으로 생성하지만 개발자에게 숨김
  • create_task(): Task 객체를 개발자에게 직접 반환

2. 실행 시작 시점

  • gather(): 호출 시점에 모든 코루틴이 즉시 시작
  • create_task(): 생성 시점에 개별적으로 시작

3. 제어 수준

  • gather(): 간단하지만 제어 불가 (상태 확인, 취소 등 불가)
  • create_task(): 복잡하지만 세밀한 제어 가능
import asyncio
import time

async def 백그라운드_작업(작업명, 소요시간):
    print(f"{작업명} 백그라운드에서 시작")
    await asyncio.sleep(소요시간)
    print(f"{작업명} 완료")
    return f"{작업명} 결과"

async def 완전비교_예시():
    print("=== gather() 방식 ===")
    시작시간 = time.time()

    # 호출과 동시에 모든 작업 시작, Task 접근 불가
    results1 = await asyncio.gather(
        백그라운드_작업("G1", 1),
        백그라운드_작업("G2", 2),
        백그라운드_작업("G3", 1)
    )

    gather_시간 = time.time() - 시작시간
    print(f"gather 결과: {results1}")
    print(f"gather 총 시간: {gather_시간:.1f}초\n")

    print("=== create_task() 방식 ===")
    시작시간 = time.time()

    # 각 Task 생성 시점에 개별적으로 시작
    task1 = asyncio.create_task(백그라운드_작업("T1", 1))
    task2 = asyncio.create_task(백그라운드_작업("T2", 2))
    task3 = asyncio.create_task(백그라운드_작업("T3", 1))

    # Task 제어 가능
    print(f"Task1 상태: {task1.done()}, {task1.get_name()}")
    print(f"Task2 상태: {task2.done()}, {task2.get_name()}")
    print(f"Task3 상태: {task3.done()}, {task3.get_name()}")

    # 중간에 다른 작업 수행 가능
    await asyncio.sleep(0.5)
    print("중간 작업 완료")

    # 완료된 Task만 먼저 확인 가능
    if task1.done():
        print(f"Task1 이미 완료: {task1.result()}")

    # 나머지 Task들 기다리기
    results2 = await asyncio.gather(task1, task2, task3)

    task_시간 = time.time() - 시작시간
    print(f"create_task 결과: {results2}")
    print(f"create_task 총 시간: {task_시간:.1f}초")

asyncio.run(완전비교_예시())

결과

=== gather() 방식 ===
G1 백그라운드에서 시작
G2 백그라운드에서 시작
G3 백그라운드에서 시작
G1 완료
G3 완료
G2 완료
gather 결과: ['G1 결과', 'G2 결과', 'G3 결과']
gather 총 시간: 2.0초

=== create_task() 방식 ===
Task1 상태: False, Task-5
Task2 상태: False, Task-6
Task3 상태: False, Task-7
T1 백그라운드에서 시작
T2 백그라운드에서 시작
T3 백그라운드에서 시작
중간 작업 완료
T1 완료
T3 완료
T2 완료
create_task 결과: ['T1 결과', 'T2 결과', 'T3 결과']
create_task 총 시간: 2.0초

4. 사용 시기 가이드

gather() 사용:

  • 간단한 병렬 처리가 목적
  • Task 제어가 필요 없음
  • 모든 작업이 완료되기만 하면 됨

create_task() 사용:

  • Task 상태 모니터링 필요
  • 중간에 Task 취소 가능성
  • 작업 완료 순서에 따른 처리 필요
  • 더 복잡한 비동기 패턴 구현

 

실무에서 자주 사용하는 패턴들

1. 조건부 작업 처리

@app.get("/user/{user_id}/dashboard")
async def get_user_dashboard(user_id: int, include_analytics: bool = False):
    # 기본 작업들은 항상 실행
    basic_tasks = [
        asyncio.create_task(get_user_profile(user_id)),
        asyncio.create_task(get_user_notifications(user_id)),
        asyncio.create_task(get_user_settings(user_id))
    ]
    
    # 조건부 작업 추가
    if include_analytics:
        basic_tasks.append(asyncio.create_task(get_user_analytics(user_id)))
    
    # 모든 필요한 작업 동시 실행
    results = await asyncio.gather(*basic_tasks)
    
    dashboard = {
        "profile": results[0],
        "notifications": results[1],
        "settings": results[2]
    }
    
    if include_analytics:
        dashboard["analytics"] = results[3]
    
    return dashboard

async def get_user_profile(user_id: int):
    await asyncio.sleep(0.3)
    return {"user_id": user_id, "name": "홍길동"}

async def get_user_notifications(user_id: int):
    await asyncio.sleep(0.2)
    return [{"id": 1, "message": "새 메시지"}]

async def get_user_settings(user_id: int):
    await asyncio.sleep(0.1)
    return {"theme": "dark", "language": "ko"}

async def get_user_analytics(user_id: int):
    await asyncio.sleep(0.8)  # 분석 데이터는 시간이 오래 걸림
    return {"views": 1234, "clicks": 567}

2. 타임아웃과 함께 사용

async def fetch_with_timeout(url: str, timeout: float = 5.0):
    """타임아웃이 있는 외부 API 호출"""
    try:
        async with aiohttp.ClientSession() as session:
            result = await asyncio.wait_for(
                session.get(url), 
                timeout=timeout
            )
            return await result.json()
    except asyncio.TimeoutError:
        return {"error": "타임아웃"}
    except Exception as e:
        return {"error": str(e)}

@app.get("/external-data")
async def get_external_data():
    # 여러 외부 API를 동시 호출, 각각 다른 타임아웃
    tasks = [
        asyncio.create_task(fetch_with_timeout("https://api1.com/data", 3.0)),
        asyncio.create_task(fetch_with_timeout("https://api2.com/data", 5.0)),
        asyncio.create_task(fetch_with_timeout("https://api3.com/data", 2.0))
    ]
    
    results = await asyncio.gather(*tasks, return_exceptions=True)
    
    return {
        "api1": results[0],
        "api2": results[1], 
        "api3": results[2]
    }

3. 동적 작업 생성

@app.post("/process-batch")
async def process_batch(user_ids: list[int]):
    """동적으로 생성되는 여러 사용자 처리"""
    
    # 사용자 ID 목록에 따라 동적으로 Task 생성
    tasks = []
    for user_id in user_ids:
        task = asyncio.create_task(process_single_user(user_id))
        tasks.append(task)
    
    # 모든 사용자 처리 완료까지 대기
    results = await asyncio.gather(*tasks, return_exceptions=True)
    
    # 성공/실패 분류
    successful = []
    failed = []
    
    for i, result in enumerate(results):
        if isinstance(result, Exception):
            failed.append({"user_id": user_ids[i], "error": str(result)})
        else:
            successful.append(result)
    
    return {
        "successful_count": len(successful),
        "failed_count": len(failed),
        "successful": successful,
        "failed": failed
    }

async def process_single_user(user_id: int):
    """개별 사용자 처리 (실패 가능성 있음)"""
    await asyncio.sleep(0.5)  # 처리 시간 시뮬레이션
    
    # 10% 확률로 실패
    import random
    if random.random() < 0.1:
        raise Exception(f"사용자 {user_id} 처리 실패")
    
    return {"user_id": user_id, "status": "processed"}

 

주의사항과 베스트 프랙티스

1. 너무 많은 동시 작업은 금물

# 나쁜 예시 - 1000개 동시 실행
async def bad_concurrent_example():
    tasks = []
    for i in range(1000):
        tasks.append(asyncio.create_task(heavy_task(i)))
    
    return await asyncio.gather(*tasks)  # 메모리 부족, 성능 저하

# 좋은 예시 - 배치 단위로 처리
async def good_concurrent_example():
    batch_size = 10
    all_results = []
    
    for i in range(0, 1000, batch_size):
        batch_tasks = []
        for j in range(i, min(i + batch_size, 1000)):
            batch_tasks.append(asyncio.create_task(heavy_task(j)))
        
        batch_results = await asyncio.gather(*batch_tasks)
        all_results.extend(batch_results)
    
    return all_results

2. 메모리 누수 방지

async def prevent_memory_leak():
    """Task 정리를 통한 메모리 누수 방지"""
    tasks = []
    
    try:
        for i in range(10):
            task = asyncio.create_task(some_task(i))
            tasks.append(task)
        
        results = await asyncio.gather(*tasks)
        return results
    
    except Exception as e:
        # 예외 발생 시 남은 Task들 정리
        for task in tasks:
            if not task.done():
                task.cancel()
        
        # 취소된 Task들이 완전히 정리될 때까지 대기
        await asyncio.gather(*tasks, return_exceptions=True)
        
        raise e

 

마무리

여러 비동기 작업을 동시에 처리하는 것은 비동기 프로그래밍의 핵심이다. asyncio.gather()로 간단한 동시 실행을, asyncio.create_task()로 더 세밀한 제어를 할 수 있다. 이벤트 루프가 이 모든 작업들을 효율적으로 관리해주므로, 개발자는 비즈니스 로직에 집중할 수 있다.

다음 편에서는 비동기와 데이터베이스 연동, 그리고 실제 프로덕션 환경에서 고려해야 할 사항들을 알아보겠다.

 

다음 편 예고

5편에서는 다음 내용을 다룰 예정이다:

  • 비동기 컨텍스트 매니저 활용
  • async with, async for, 비동기 제너레이터
반응형

+ Recent posts