ThreadPoolExecutor로 병렬 처리하기

2024년 12월 19일 목요일

Today I Learned

날짜

2024년 12월 19일 목요일

내용

시간 단축을 위해 병렬 처리하기

네이버에서 홍보해준 덕에 인스타그램 UGC 서비스를 많은 사람들이 이용하고 있다. 그만큼 저장되어있는 인스타그램 계정과 게시글, 댓글 데이터도 늘어났는데 현재 게시글은 50만개, 댓글은 35만개가 저장되어 있다. 많아진 만큼 업데이트에도 상당한 시간이 소요되고 있는데, 크론이 거의 하루동안 돌아가기도 한다. 로직은

  1. 계정 데이터(팔로우 수, 게시글 수 등) 업데이트
  2. 게시글 데이터를 가져와 새로 생성된 것 추가하기
  3. 기존에 저장된 데이터와 비교하여 삭제된 게시글 삭제하기
  4. 태그된 게시글을 가져와 새로 생성된 것 추가하기
  5. 기존에 저장된 데이터와 비교하여 삭제된 태그된 게시글 삭제하기

이다. 6백개의 계정 하나하나씩 이 모든 로직을 처리해야 한다. 당연히 오래걸릴터이니 이에 대한 개선이 필요했다. 각각을 람다함수로 처리하고 이 람다함수를 관리하는 걸 SQS가 하면 어떨까 생각했고 이에 대해 코드브루 시간에 태용님께 여쭤봤는데 병렬처리에 관한 아이디어와 코드를 주셨다. 당연히 훨씬 구현이 쉬웠고 변경점도 적었다. 각 인스타그램 계정은 데이터가 서로 독립적이라 이렇게 처리되도 문제가 될게 없었다! 그래서 코드는

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
def update_instagram_data_in_batches(
    db: Session,
    max_workers: int = 5,
):
    """
    병렬적으로 인스타그램 데이터를 업데이트하는 함수.
    """
    results = []
    instagram_accounts = get_all_selected_instagram_accounts(db=db)

    with ThreadPoolExecutor(max_workers=max_workers) as executor:
        futures = []

        for instagram_account in instagram_accounts:
            # 병렬 작업 제출
            future = executor.submit(
                update_instagram_account_data,
                instagram_account_id=instagram_account.id,
            )
            futures.append(future)

        # 처리 결과 수집
        for future in as_completed(futures):
            try:
                result = future.result()
                results.append(result)
            except Exception as e:
                logger.error(f"Batch processing failed: {e}")
                results.append({"result": False, "detail": str(e)})

    return {
        "result": True,
        "detail": results,
    }

기존대로 처리할 때와 바꾼 후를 테스트 해보기 위해 계정 7개를 처리해봤다.

  1. 직렬 96.92384648323059초
  2. 병렬 22.3초

무려 5분의 1로 감소!

회고

참 뿌듯하다.