AWS CDK を使用してタスクを並列実行するステートマシンを構築する

AWS CDK を使用してタスクを並列実行するステートマシンを構築する

はじめに

AWS CDKを使用し、待機状態や並列タスクの実行、エラーハンドリングなどの機能を含むステートマシンを作成する方法を本記事で解説します。本記事では TypeScript を使用しています。

環境

本記事では以下のバージョンの AWS CDK を使用しています。

$ cdk version
2.80.0 (build bbdb16a)

ステートマシンの作成

ステートマシンの定義には以下の機能を含めます。

  • Waitステートを使用して実行時に入力された日時まで待機する

  • Parallelステートを使用して複数のタスクを並列実行する

  • 並列実行されるタスクのうち1つでも失敗すると他のタスクも停止しますが、これを防ぐためにエラーハンドリングを追加し、失敗した場合でもステートマシンが正常終了するようにします

  • しかし、並列タスクが失敗した場合にステートマシンが成功状態にならないように、エラーをキャッチした際に特定の文字列を出力し、それを基に後続のステートで分岐を行います

CDK のコード

以下に CDK のコードを示します。

import * as cdk from 'aws-cdk-lib';
import * as stepfunctions from 'aws-cdk-lib/aws-stepfunctions';
import * as stepfunctions_tasks from 'aws-cdk-lib/aws-stepfunctions-tasks';

export class StepfunctionsDemoStack extends cdk.Stack {
 constructor(scope: cdk.App, id: string, props?: cdk.StackProps) {
 super(scope, id, props);

 const success = this.defineSuccessState('success');
 const failure = this.defineFailureState('Fail');
 const waitTask = this.defineWaitState('wait');

 const childStateMachine1 = this.defineChildStateMachine('childStateMachine1', success);
 const childStateMachine2 = this.defineChildStateMachine('childStateMachine2', failure);

 const task1 = this.defineChildTask('childTask1', childStateMachine1);
 const task2 = this.defineChildTask('childTask2', childStateMachine2);

 const catchErrorState1 = this.defineCatchErrorState('catchErrorState1', 'task1');
 const catchErrorState2 = this.defineCatchErrorState('catchErrorState2', 'task2');

 const parallel = this.defineParallelState('All jobs', [task1, catchErrorState1], [task2, catchErrorState2]);

 const getResult = this.defineGetResultState('getResults');
 const parentSuccess = this.defineSuccessState('parentSuccess');
 const parentFailure = this.defineFailureState('parentFailure');
 const checkResult = this.defineChoiceState('checkResult', parentFailure, parentSuccess);

 this.defineParentStateMachine('parentStateMachine', waitTask, parallel, getResult, checkResult);
 }

 private defineSuccessState(id: string) {
 return new stepfunctions.Succeed(this, id);
 }

 private defineFailureState(id: string) {
 return new stepfunctions.Fail(this, id, {
 error: 'WorkflowFailure',
 cause: "Something went wrong",
 });
 }

 private defineWaitState(id: string) {
 return new stepfunctions.Wait(this, id, {
 time: stepfunctions.WaitTime.timestampPath('$.waitSeconds'),
 });
 }

 private defineChildStateMachine(id: string, definition: stepfunctions.IChainable) {
 return new stepfunctions.StateMachine(this, id, {
 stateMachineName: id,
 definition: definition,
 });
 }

 private defineChildTask(id: string, stateMachine: stepfunctions.IStateMachine) {
 return new stepfunctions_tasks.StepFunctionsStartExecution(this, id, {
 stateMachine: stateMachine,
 integrationPattern: stepfunctions.IntegrationPattern.RUN_JOB,
 input: stepfunctions.TaskInput.fromObject({
 token: stepfunctions.JsonPath.taskToken,
 foo: 'bar',
 }),
 });
 }

 private defineCatchErrorState(id: string, error: string) {
 return new stepfunctions.Pass(this, id, {
 result: stepfunctions.Result.fromObject({ error: error }),
 });
 }

 private defineParallelState(id: string, task1: [stepfunctions.TaskStateBase, stepfunctions.IChainable], task2: [stepfunctions.TaskStateBase, stepfunctions.IChainable]) {
 return new stepfunctions.Parallel(this, id)
 .branch(task1[0].addCatch(task1[1], { errors: ['States.ALL'] }))
 .branch(task2[0].addCatch(task2[1], { errors: ['States.ALL'] }));
 }

 private defineGetResultState(id: string) {
 return new stepfunctions.Pass(this, id, {
 parameters: { 'result.$': 'States.JsonToString($)' }
 });
 }

 private defineChoiceState(id: string, failureState: stepfunctions.IChainable, successState: stepfunctions.IChainable) {
 return new stepfunctions.Choice(this, id)
 .when(stepfunctions.Condition.stringMatches('$.result', '*error*'), failureState)
 .otherwise(successState);
 }

 private defineParentStateMachine(id: string, waitState: stepfunctions.INextable, parallelState: stepfunctions.IChainable, getResultState: stepfunctions.IChainable, choiceState: stepfunctions.IChainable) {
 return new stepfunctions.StateMachine(this, id, {
 stateMachineName: id,
 definition: waitState.next(parallelState).next(getResultState).next(choiceState)
 });
 }
}

作成されたステートマシンの定義

CDK によって実際に作成されたステートマシンの定義はこちらです。

{
 "StartAt": "wait",
 "States": {
 "wait": {
 "Type": "Wait",
 "TimestampPath": "$.waitSeconds",
 "Next": "All jobs"
 },
 "All jobs": {
 "Type": "Parallel",
 "Next": "getResults",
 "Branches": [
 {
 "StartAt": "childTask1",
 "States": {
 "childTask1": {
 "End": true,
 "Catch": [
 {
 "ErrorEquals": [
 "States.ALL"
 ],
 "Next": "catchErrorState1"
 }
 ],
 "Type": "Task",
 "Resource": "arn:aws:states:::states:startExecution.sync:2",
 "Parameters": {
 "Input": {
 "token.$": "$$.Task.Token",
 "foo": "bar"
 },
 "StateMachineArn": "arn:aws:states:ap-northeast-1:012345678901:stateMachine:childStateMachine1"
 }
 },
 "catchErrorState1": {
 "Type": "Pass",
 "Result": {
 "error": "task1"
 },
 "End": true
 }
 }
 },
 {
 "StartAt": "childTask2",
 "States": {
 "childTask2": {
 "End": true,
 "Catch": [
 {
 "ErrorEquals": [
 "States.ALL"
 ],
 "Next": "catchErrorState2"
 }
 ],
 "Type": "Task",
 "Resource": "arn:aws:states:::states:startExecution.sync:2",
 "Parameters": {
 "Input": {
 "token.$": "$$.Task.Token",
 "foo": "bar"
 },
 "StateMachineArn": "arn:aws:states:ap-northeast-1:012345678901:stateMachine:childStateMachine2"
 }
 },
 "catchErrorState2": {
 "Type": "Pass",
 "Result": {
 "error": "task2"
 },
 "End": true
 }
 }
 }
 ]
 },
 "getResults": {
 "Type": "Pass",
 "Parameters": {
 "result.$": "States.JsonToString($)"
 },
 "Next": "checkResult"
 },
 "checkResult": {
 "Type": "Choice",
 "Choices": [
 {
 "Variable": "$.result",
 "StringMatches": "*error*",
 "Next": "parentFailure"
 }
 ],
 "Default": "parentSuccess"
 },
 "parentSuccess": {
 "Type": "Succeed"
 },
 "parentFailure": {
 "Type": "Fail",
 "Error": "WorkflowFailure",
 "Cause": "Something went wrong"
 }
 }
}

おわりに

本記事では、AWS CDKを使用して、複雑なステートマシンを簡単に作成する方法を説明しました。特定時刻までの待機、各タスクの並列実行、エラーハンドリング、タスクの出力に基づく条件分岐など、実際の業務で頻繁に必要とされる機能を網羅しています。 この記事がどなたかの参考になれば幸いです。

参考