AWS 람다함수의 재요청 방지

2024년 7월 25일 목요일

Today I Learned

날짜

2024년 7월 25일 목요일

내용

람다의 재요청 방지

데이터 임포트를 요청하면 boto3로 AWS Lambda를 호출하도록 구현했다.

1
2
3
4
5
6
7
8
9
10
11
12
13
def call_lambda_function(function_name: str, payload: dict):
    """AWS Lambda 함수 호출
    InvokeType: RequestResponse(동기) | Event(비동기) | DryRun(인증)
    """
    try:
        response = lambda_client.invoke(
            FunctionName=function_name,
            InvocationType="RequestResponse",
            Payload=payload,  # 전송할 데이터
        )
        return response
    except Exception as e:
        logging.exception(str(e))

맨 처음에는 RequestResponse로 보냈었다. 그런데 디버깅 과정에서 요청이 실패할때마다 1분 간격으로 재시도가 발생해서, 데이터가 꼬이고 쓸데없는 로그가 많이 남아 힘들었다. 그래서 비동기 호출인 Event로 바꾸었다. 근데 비동기 함수라 로직이 원하는대로 실행되지 않았다.. 알아보니 RequestResponse 호출을 했을 때 요청이 실패하면 AWS에서 자동으로 3회 정도 재시도를 한다고 한다. Lambda 클라이언트를 설정할 때 config를 설정해서 이를 방지 할 수 있었다.

1
2
3
4
5
6
7
8
9
from botocore.config import Config

lambda_client = boto3.client(
    "lambda",
    config=Config(retries={"max_attempts": 0}), # 재시도 재한
    region_name=region,
    aws_access_key_id=AWS_ACCESS_KEY_ID,
    aws_secret_access_key=AWS_SECRET_ACCESS_KEY,
)

에러처리

꽤 길고 복잡한 로직이다보니 클라이언트의 요청에 즉각적으로 응답할 수 없었다. 그래서 우선 True를 반환하고 Lambda를 호출한다. 프론트에선 True를 반환받으면 10초에 한번씩 임포팅 상태를 추적한다. 만약 임포팅 과정에서 에러가 발생하거나 다른 이유로 중단하면 실패했음을 반환해주어야 하고, 이건 임포트 로그에 반영되어 저장되야 한다.

결국 곳곳에, 함수마다 try-except 문을 설치했다

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
def create_spreadsheet_file(
        self, spreadsheet_id: str, raw_data_format_type: RawDataFormat
    ):
        """스프레드시트 내 시트파일을 생성합니다.
        이미 동일한 제목의 시트파일이 존재할 경우 해당 시트파일 ID로 설정합니다."""
        try:
            already_exist = False
            creds = self.make_google_credentials()
            service = build("sheets", "v4", credentials=creds)
            title = self.create_spreadsheet_file_title(
                raw_data_format_type=raw_data_format_type
            )

            batch_update_spreadsheet_request_body = {
                "requests": [{"addSheet": {"properties": {"title": title}}}]
            }
            sheetfile_id = ""
        except Exception as e:
            print(f"error: {e}")

        try:
            res = (
                service.spreadsheets()
                .batchUpdate(
                    spreadsheetId=spreadsheet_id,
                    body=batch_update_spreadsheet_request_body,
                )
                .execute()
            )
            sheetfile_id = res["replies"][0]["addSheet"]["properties"]["sheetId"]
        except Exception as e:
            error_message = str(e)
            if "이미 있습니다" in error_message:
                sheetfile_id = self.get_sheetfile_id(spreadsheet_id, title)
                already_exist = True

        try:
            if raw_data_format_type == RawDataFormat.NSA_GTD.value:
                row_length = 11
            elif raw_data_format_type == RawDataFormat.NSA_CTD.value:
                row_length = 9
            elif raw_data_format_type == RawDataFormat.NSA_TD.value:
                row_length = 13
            elif raw_data_format_type == RawDataFormat.NSA_T.value:
                row_length = 11

            delete_row_request = {
                "deleteDimension": {
                    "range": {
                        "sheetId": sheetfile_id,
                        "dimension": "COLUMNS",
                        "startIndex": row_length,
                        "endIndex": 26,  # Z열까지 숨긴다고 가정 (0부터 시작하는 인덱스)
                    }
                }
            }

            merge_first_row_request = {
                "mergeCells": {
                    "range": {
                        "sheetId": sheetfile_id,
                        "startRowIndex": 0,
                        "endRowIndex": 1,
                        "startColumnIndex": 0,
                        "endColumnIndex": row_length,
                    },
                    "mergeType": "MERGE_ALL",
                }
            }

            insert_text_request = {
                "updateCells": {
                    "range": {
                        "sheetId": sheetfile_id,
                        "startRowIndex": 0,
                        "endRowIndex": 1,
                        "startColumnIndex": 0,
                        "endColumnIndex": row_length,
                    },
                    "rows": [
                        {
                            "values": [
                                {
                                    "userEnteredValue": {
                                        "stringValue": "본 raw시트의 명칭이나 열 위치를 수정하지마세요. 수정하게 되면 데이터가 갱신될 때 데이터 참조 위치가 바뀌어 보고서에 데이터오류가 발생합니다."
                                    }
                                }
                            ]
                        }
                    ],
                    "fields": "userEnteredValue",
                }
            }

            add_column_request = {
                "appendDimension": {
                    "sheetId": sheetfile_id,
                    "dimension": "ROWS",
                    "length": 200000,
                }
            }

            all_requests = []
            if not already_exist:
                all_requests.append(delete_row_request)

            all_requests.append(merge_first_row_request)
            all_requests.append(insert_text_request)
            all_requests.append(add_column_request)

            body = {
                "requests": all_requests,
            }
            service.spreadsheets().batchUpdate(
                spreadsheetId=spreadsheet_id, body=body
            ).execute()
        except HttpError as e:
            error_message = str(e)
            if "초과" in error_message:
                # 최대 셀 개수 초과시트 지우고 재생성 로직필요
                return {"error": "최대 셀 개수 초과"}

            else:
                return {"error": error_message}
        return str(sheetfile_id)

상당히 더러운데… 더 좋은 방법을 생각해봐야겠다.

회고

될듯 안될듯 될듯 안될듯 될듯 안될듯 안된다.