AWS Step Functions から別のサービスを直接統合するときに「最適化された統合 (Optimized integrations)」と「AWS SDK 統合 (AWS SDK integrations)」という選択肢がある.例えば AWS Step Functions から Amazon SageMaker Processing を実行する場合,AWS Step Functions 側で実行完了を待つ必要があることが多く,最適化された統合であれば .sync
がサポートされているため,AWS Step Functions の Resource
に arn:aws:states:::sagemaker:createProcessingJob.sync
と指定すれば簡単に解決できる👏
AWS CDK を使うと
実は AWS CDK では一部の最適化された統合はサポートされてなく,例えば Amazon SageMaker だと CreateHyperParameterTuningJob
/ CreateLabelingJob
/ CreateProcessingJob
は現状 aws_stepfunctions_tasks
に実装されていなかった.
Amazon SageMaker Processing (CreateProcessingJob
) に関しては issue も出ていた💡
よって,現状では AWS CDK の aws_stepfunctions_tasks.CallAwsService
で AWS SDK 統合を使う必要があるけど,AWS SDK 統合だと別のサービスを呼び出したら終了になってしまうという課題も残る.結果的に .waitForTaskToken
を活用したり,AWS Lambda 関数を独自実装して Amazon SageMaker Processing の DescribeProcessingJob
API で ProcessingJobStatus
をチェックしたりという工夫が必要になってしまう💨
aws_stepfunctions.CustomState
を使う
少し前置きが長くなったけど,AWS CDK の aws_stepfunctions_tasks
でサポートされてなく .sync
を実現したい場合に aws_stepfunctions.CustomState
が使える❗️CustomState
を使えば Amazon States Language (ASL) のまま AWS CDK の実装に組み込める.今回は Amazon SageMaker Processing を例に検証したことをまとめておく.基本的に他のアクションでも同じように実現できるはず〜 \( 'ω')/
1. Before: aws_stepfunctions_tasks.CallAwsService
まずは aws_stepfunctions_tasks.CallAwsService
を使って AWS Step Functions と Amazon SageMaker Processing の「AWS SDK 統合」を実装するサンプルを紹介する.AWS CDK で作るリソースは Amazon SageMaker Processing で動かすコンテナイメージを管理する Amazon ECR 関連と AWS Step Functions 関連で,あとは細かく IAM Role なども必要になってくる.ちなみに Amazon SageMaker Processing ではシンプルに hello-world イメージを動かすため,ProcessingInputs
や ProcessingOutputConfig
などの設定は省略している😃
ポイントは aws_stepfunctions_tasks.CallAwsService
で service: 'sagemaker'
と action: 'createProcessingJob'
を設定しているところ.とにかく Amazon SageMaker Processing を実行するだけなら簡単.ちなみに ProcessingJobName
は重複できない仕様になっているため,AWS Step Functions の組み込み関数 States.Format
と States.UUID
を組み合わせて動的に生成するようにした👌組み込み関数便利〜
import { Stack, StackProps, aws_ecr, aws_iam, aws_stepfunctions, aws_stepfunctions_tasks, } from 'aws-cdk-lib' import * as ecrdeploy from 'cdk-ecr-deployment' import { Construct } from 'constructs' export class SandboxCdkSageMakerProcessingStack extends Stack { constructor(scope: Construct, id: string, props?: StackProps) { super(scope, id, props) const repository = new aws_ecr.Repository(this, 'HelloWorldRepository', { repositoryName: 'hello-world', }) new ecrdeploy.ECRDeployment(this, 'HelloWorldRepositoryDeployment', { src: new ecrdeploy.DockerImageName('hello-world'), dest: new ecrdeploy.DockerImageName(repository.repositoryUriForTag('latest')), }) const sageMakerRole = new aws_iam.Role(this, 'SageMakerRole', { roleName: 'sandbox-sagemaker-role', assumedBy: new aws_iam.ServicePrincipal('sagemaker.amazonaws.com') }) repository.grantPull(sageMakerRole) const helloWorldProcessingJob = new aws_stepfunctions_tasks.CallAwsService(this, 'HelloWorldProcessingJob', { service: 'sagemaker', action: 'createProcessingJob', parameters: { 'ProcessingJobName.$': `States.Format('hello-world-{}', States.UUID())`, 'RoleArn': sageMakerRole.roleArn, 'ProcessingResources': { 'ClusterConfig': { 'InstanceCount': 1, 'InstanceType': 'ml.t3.medium', 'VolumeSizeInGB': 1, } }, 'AppSpecification': { 'ImageUri': '000000000000.dkr.ecr.ap-northeast-1.amazonaws.com/hello-world' }, 'StoppingCondition': { 'MaxRuntimeInSeconds': 600 } }, iamResources: ['*'], additionalIamStatements: [ new aws_iam.PolicyStatement({ actions: ['iam:PassRole'], resources: ['*'], }) ] }) new aws_stepfunctions.StateMachine(this, 'SandboxStateMachine', { stateMachineName: 'sandbox', definitionBody: aws_stepfunctions.DefinitionBody.fromChainable(helloWorldProcessingJob), }) } }
AWS CDK をデプロイして AWS Step Functions を実行すると,AWS Step Functions はすぐに終了して Amazon SageMaker Processing は裏で動いていた👀
2. After: aws_stepfunctions.CustomState
今度は aws_stepfunctions.CustomState
を使って AWS Step Functions と Amazon SageMaker Processing の「最適化された統合」を実装するサンプルを紹介する.
実装は大きく変化せず,大きく2つのポイントがある.まず1つ目は aws_stepfunctions.CustomState
で Type: 'Task'
と Resource: 'arn:aws:states:::sagemaker:createProcessingJob.sync'
を設定しているところ.Amazon States Language (ASL) として表現できるため,Amazon SageMaker Processing を .sync
で実行できる.ちなみに AWS SDK 統合だと arn:aws:states:::aws-sdk:sagemaker:createProcessingJob
という ARN になる💡
2つ目は AWS Step Functions に設定した IAM Role に別途ポリシーを追加する必要があるところ.aws_stepfunctions_tasks.CallAwsService
だと iamResources
/ iamAction
/ additionalIamStatements
あたりを設定すれば自動的にポリシーが追加される仕組みになっている.ちなみに今回の例はポリシーを少し雑に設定しているため,最小権限の原則に沿って狭めてもらえると良いかと🙏
import { Stack, StackProps, aws_ecr, aws_iam, aws_stepfunctions, } from 'aws-cdk-lib' import * as ecrdeploy from 'cdk-ecr-deployment' import { Construct } from 'constructs' export class SandboxCdkSageMakerProcessingStack extends Stack { constructor(scope: Construct, id: string, props?: StackProps) { super(scope, id, props) const repository = new aws_ecr.Repository(this, 'HelloWorldRepository', { repositoryName: 'hello-world', }) new ecrdeploy.ECRDeployment(this, 'HelloWorldRepositoryDeployment', { src: new ecrdeploy.DockerImageName('hello-world'), dest: new ecrdeploy.DockerImageName(repository.repositoryUriForTag('latest')), }) const sageMakerRole = new aws_iam.Role(this, 'SageMakerRole', { roleName: 'sandbox-sagemaker-role', assumedBy: new aws_iam.ServicePrincipal('sagemaker.amazonaws.com') }) repository.grantPull(sageMakerRole) const helloWorldProcessingJob = new aws_stepfunctions.CustomState(this, 'HelloWorldProcessingJobCustom', { stateJson: { Type: 'Task', Resource: 'arn:aws:states:::sagemaker:createProcessingJob.sync', Parameters: { 'ProcessingJobName.$': `States.Format('hello-world-{}', States.UUID())`, 'RoleArn': sageMakerRole.roleArn, 'ProcessingResources': { 'ClusterConfig': { 'InstanceCount': 1, 'InstanceType': 'ml.t3.medium', 'VolumeSizeInGB': 1, } }, 'AppSpecification': { 'ImageUri': '000000000000.dkr.ecr.ap-northeast-1.amazonaws.com/hello-world' }, 'StoppingCondition': { 'MaxRuntimeInSeconds': 600 } }, } }) new aws_stepfunctions.StateMachine(this, 'SandboxStateMachine', { stateMachineName: 'sandbox', definitionBody: aws_stepfunctions.DefinitionBody.fromChainable(helloWorldProcessingJob), }).addToRolePolicy( new aws_iam.PolicyStatement( { actions: [ 'events:DescribeRule', 'events:PutRule', 'events:PutTargets', 'iam:PassRole', 'sagemaker:AddTags', 'sagemaker:CreateProcessingJob', ], resources: [ '*' ], } )) } }
AWS CDK をデプロイして AWS Step Functions を実行すると,今度は Amazon SageMaker Processing の実行完了まで待てるようになった❗️やったー👏
Amazon SageMaker Processing Job を2回実行したログも載せておく📝