Today I Learned
날짜
2025년 1월 21일 화요일
내용
Step Functions에서 Choice로 분기처리하기
웹훅 개선 작업을 진행중이다. 지난주에 말했듯 Step Functions로 계획중인데 오늘 헤매던 곳을 뚫어냈다. 기존과 마찬가지로 웹훅이 서버로 전송되면 SQS 큐에 메시지를 담는다. 매핑된 람다는 메시지가 하나 도착하면 꺼내서 Step Functions에서 정의한 상태 머신을 호출해서 로직을 진행한다. 이 상태머신은 이 메시지의 종류(주문, 혜택, 고객 등)에 따라 알맞은 람다를 호출해야 한다. 그림으로 보자면
이런 형태다. 저 노드 하나하나가 state라는 개념이다. 저기서 저 DetermineTopic이 Choice State인데, 정의한 조건에 따라 알맞은 람다를 호출해줘야 한다. 그 상단에서 최초로 호출되는 웹훅 데이터 디코딩 람다부터 살펴보자
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
import json
import base64
def lambda_handler(event, context):
try:
first_record = event[0]
body_json_string = first_record.get('body')
if not body_json_string:
raise ValueError("No 'body' field found in the first record.")
# 'body' 문자열을 JSON으로 파싱
body_dict = json.loads(body_json_string)
# 'input' 필드에서 Base64 인코딩된 데이터 가져오기
encoded_input = body_dict.get('input')
if not encoded_input:
raise ValueError("No 'input' field found in the body.")
# Base64 디코딩
decoded_bytes = base64.b64decode(encoded_input)
decoded_str = decoded_bytes.decode('utf-8')
# 디코딩된 문자열을 JSON으로 파싱
data_dict = json.loads(decoded_str)
return data_dict
except Exception as e:
print(f"Error in DecodeInputFunction: {e}")
raise e
상태머신이 호출될 때 쿼리 파라미터로 문자열로 인코딩된 json이 같이 들어오는데 이걸 다시 dict로 바꿔준다. 이 과정이 없으려면 상태머신을 호출할때 path parameter로 별도로 필요한 값을 제공하던가 해야하는데, 이 과정이 굳이 필요한가 싶었다. 일반적인 Step Functions는 상태 전이에 따라 값이 부과되기 때문에 이럴 경우 돈낭비겠지만 express workflows에선 오로지 호출 시간에 따라 가격이 책정되어서 다를바가 없다. 이 과정이 추가된다고 1초 이상 더 지연될리도 만무하고…
마지막에 보면 알겠지만 데이터를 반환하는데 기억해두자.
이제 이 Choice state 정의를 코드로 살펴보면
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
"DetermineTopic": {
"Type": "Choice",
"Choices": [
{
"Variable": "$.webhook_type",
"StringEquals": "Order",
"Comment": "주문 웹훅",
"Next": "주문 처리 람다"
},
{
"Variable": "$.webhook_type",
"StringEquals": "Product",
"Comment": "상품 웹훅",
"Next": "상품 처리 람다"
},
{
"Variable": "$.webhook_type",
"StringEquals": "ECommercePlatformBenefit",
"Comment": "혜택 웹훅",
"Next": "혜택 처리 람다"
},
{
"Variable": "$.webhook_type",
"StringEquals": "Category",
"Comment": "카테고리 웹훅",
"Next": "\b카테고리 처리 람다"
},
{
"Variable": "$.webhook_type",
"StringEquals": "Customer",
"Next": "고객 처리 람다"
}
],
"Default": "DLQ에 저장"
},
이렇게 되어있다. “Variable” 값이 “StringEquals” 랑 같으면 “Next”로 이동시킬거다! 라는 의미다. 저 $ 는 이전 state에서 넘어온 값을 의미한다. 위에 보이는 정의는 전체 정의에서 Choice state 에 해당하는 부분만 가져온건데 조금 위로 가서 웹훅 데이터 디코딩 state 에 관한 정의를 보면
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
"웹훅 데이터 디코딩": {
"Type": "Task",
"Resource": "arn:aws:states:::lambda:invoke",
"OutputPath": "$.Payload",
"Parameters": {
"Payload.$": "$",
"FunctionName": "arn:aws:lambda:ap-northeast-2:341465539168:function:WebhookUpgrade_EventNameDecode:$LATEST"
},
"Retry": [
{
"ErrorEquals": [
"Lambda.ServiceException",
"Lambda.AWSLambdaException",
"Lambda.SdkClientException",
"Lambda.TooManyRequestsException"
],
"IntervalSeconds": 1,
"MaxAttempts": 3,
"BackoffRate": 2,
"JitterStrategy": "FULL"
}
],
"Next": "DetermineTopic",
"Comment": "데이터 디코딩으로 웹훅 종류(event_name) 파악"
},
이런식으로 되어있다. 위에서 4번째 줄에 "OutputPath": "$.Payload",
가 중요한데, 그 다음 상태로 전이될때 어떤 값을 주냐를 결정한다는 의미다. 만약 "OutputPath": "$",
로 되어있으면 위 람다 함수에서 반환한 데이터에다가 이전 호출된 state의 실행 정보도 같이 반환된다. 그러면 저 Choice State가 참조해야할 값을 파싱해서 지정해줘야 하기 때문에 여러모로 골치아프다. 이 설정때문에 choice state가 참조할 것을 찾지 못해 하루종일 고생했다.
이렇게 잘 정의해서 실행해보면
잘 실행된다!
회고
새로운 걸 해보는 것만큼 재밌는게 없다.