フレクトのクラウドblog re:newal

http://blog.flect.co.jp/cloud/からさらに引っ越しています

AWS Step Function をユニットテストするには

エンジニアの佐藤です。こんにちは。今回は「AWS Step Function のユニットテスト」を構想し、実装までに直面した課題と私が考えた解決方法についてお話ししたいと思います。

AWS Step Functionとは

公式ページに書かれてますが、 Step Functions は様々なサービスをつなげて「サーバーレス・ワークフロー」を編成する仕掛けです。 AWS には古来から様々なワークフロー支援サービスがありますが、その中では最も新しいものです。

ことの始まり

筆者はある日、とある業務システムをどうやって実装したものかと考えていました。AWSでやってくれという話でしたので、AWSの各種サービスを比較検討し、最終的にStep Functionsを選択しました。重視したポイントは以下のようなものです。

  1. 拡張性が高い。
  2. 高可用性設計。
  3. ワークフローの進捗が視覚的に確認できる。
  4. ワークフロー定義が読みやすい。

今回の業務システムには短時間で完了する軽いタスクと、リソースも時間も必要な重いタスクの2種類があり、筆者は前者をLambda Functionに、後者をECS(Fargate)で実行することにしました。

Step Functionのユニットテスト・・・って?

ところで昨今のシステム開発は、テストコードを記述してユニットテストを行うのが一般的です。筆者はこのStep Functionsで作成されるワークフロー設定(ステートマシン)についてのユニットテストを構想しました。ところが、考えていくうちに、これが結構深いタスクであることに、徐々に気がついてきました。一般的な話ですが、システムを人間の体に例えると、手足に相当する機能のユニットテストについては、準備作業はそれほど難しくありません。一方で中枢的な機能のテストのための準備作業は大きくなりがちです。動作が確実な仮の末端部品を接続してあげる必要があるからです。「ワークフロー管理」機能は、この観点から言うとシステムの最も中枢的な部分であり、その単体テストのためには仮の末端部品の接続が必要ということになります。今回で言えば、Lambda FunctionとECSに仮のモジュールを入れてあげる必要がある。

Lambda FunctionとECSの仮のモジュール、と構想したところで、筆者は更に寒気を覚えました。よく考えてみるとこの2つ、機能コードを配置するまでの作業がそれなりに手間なのです。例えばLambda Functionの場合、以下のようなステップがあります。

  1. ライブラリを含めた機能コードのアーカイブ
  2. アーカイブのS3への配置
  3. IAMの設定
  4. Lambda Functionの設定

ECSの場合はこうなります。

  1. 機能コードを含むDockerコンテナのビルド
  2. ECRへコンテナを配置
  3. IAMの設定
  4. タスク定義の作成

この「Lambda Functionの設定」と「タスク定義の作成」には、環境変数などの設定も含まれます。テスト用の仮のモジュールなどと言いながら、一式全部必要なわけです。ステートマシンのテストをするために「システムの設計方針に従って各種の設定を発行・注入する仕掛け」の開発が必要になる・・・そう気がついた筆者は、とんでもないことを始めてしまったのではないかと、暗い気持ちになりました。中でも筆者が負担に感じたのがIAMです。Lambda FunctionもECSも機能コードの実行環境であり、実行の際に「インフラ的にできること(実行権限)」の範囲をIAMロールとして定義する必要があります。しかしこれがまた、慎重さと地道な努力を要する作業なのです。「とりあえずAdmin!」とやっつけたい気持ちはやまやまですが、これは悪魔の囁き。権限設定はミニマムからスタートしないと、権限のダイエットはとても大変なのです。

なお、Lambda FunctionとStep Functionsについては、AWSがローカルランタイムを配布していますが、今回は利用を見送りました。ローカル版のFargateはありませんし、ローカル環境固有の設定負担や互換性問題もあるでしょう。それなら最初から本物を使ったほうがいい、ユニットテスト用のAWSアカウントを短時間使うだけならそんなに経費もかからない、と判断しました。

各種設定の発行・注入する仕掛け・・・って?

で、その仕掛けをどうしたのかというと、この点は大変悩んだところでした。

筆者が最初に検討したのはAWS Cloud Formationでした。しかし調べていくと、これはこれで良くないところがあると思いました。まず引っかかったのが、テンプレートの入出力がいずれもリストとなっており、階層的にできない点です。また、最終的にJSONYAML形式のテンプレートにする必要があり、いくつか見て回った範囲ではこのテンプレートの調整負担が大きく見え、Colud Formationは見送りました。

次に検討したのがCDKでした。re:Invent18で発表されたもので、基本的にNodeJSやPythonなどのプログラム言語でCloud Formationのテンプレートを効率的に作成する仕掛けです。しかしこちらはStep Functionsの実装がまだプレビュー状態で、またステートマシンの定義方法が独特で良くないと思いました。また、ランダムな付加文字列の入ったリソース名となる点も、やり込むと散らかって良くないだろうなと思えました。

筆者が最終的に選択したのは、以下のような方式でした。

  1. まずプロジェクト名に連なる一連の名前体系を決め、これを環境変数で設定する。
  2. Python言語でdeploy.pyとundeploy.pyを記述し、ここからboto3 SDKを用いて直接リソースの作成と削除を行う。リソース定義は、これらのソースコードに記述する。
  3. ユニットテストはdeploy.py実行 -> テスト -> undeploy.py実行のフローで行う。

幸いなことに今回のシステムの開発言語はPythonでした。そしてAWSリソースの定義は、たいていJSONで書かれており、Python言語のdictionaryとほとんど同義(しかもコメントも挿入できる)。そして複数のリソース定義を体系的に作っていくにはプログラムフローが効率的で、デプロイ自体もその延長でSDKを呼び出してやってしまえば、小さくまとまらないかと目論んだわけです。

最初に「プロジェクト名に連なる一連の名前体系を決める」ですが、今回構想した「Step FunctionsステートマシンがLambda FunctionとECSコンテナを実行する」実装の場合、以下のようになりました。

export AWS_ACCOUNT_ID=`aws sts get-caller-identity --output json | jq -r -j ".Account"`
export AWS_DEFAULT_REGION=`aws configure get default.region | tr -d \\n`
export PROJECT_NAME=project01

export LAMBDA_FUNCTION_NAME="${PROJECT_NAME}_lambda"
export LAMBDA_FUNCTION_ARCHIVE_NAME="${FUNCTION_NAME}.zip"
export S3_LAMBDA_ARCHIVE_PATH="${PROJECT_NAME}/${FUNCTION_ARCHIVE_NAME}"
export BUCKET_NAME=s3_bucket_name12345s
export LAMBDA_POLICY_NAME="${PROJECT_NAME}LambdaPolicy"
export LAMBDA_ROLE_NAME="${PROJECT_NAME}LambdaRole"
export LAMBDA_FUNCTION_ARN="arn:aws:lambda:${AWS_DEFAULT_REGION}:${AWS_ACCOUNT_ID}:function:${LAMBDA_FUNCTION_NAME}:prd"

export ECR_REPOSITORY_NAME=$PROJECT_NAME
export ECS_CLUSTER_NAME=$PROJECT_NAME
export ECS_CONTAINER_TAG="${AWS_ACCOUNT_ID}.dkr.ecr.${AWS_DEFAULT_REGION}.amazonaws.com/${ECR_REPOSITORY_NAME}:latest"
export ECS_TASK_ROLE_NAME="${PROJECT_NAME}TaskRole"
export ECS_TASK_POLICY_NAME="${PROJECT_NAME}TaskPolicy"
export ECS_TASK_EXECUTION_ROLE_NAME="${PROJECT_NAME}TaskExecutionRole"
export ECS_TASK_EXECUTION_POLICY_NAME="${PROJECT_NAME}TaskExecutionPolicy"
export ECS_TASK_DEFINITION_NAME="${PROJECT_NAME}Task"
export ECS_SUBNETS="subnet-12345678"

export SFN_NAME=$PROJECT_NAME
export SFN_ROLE_NAME="${PROJECT_NAME}SfnRole"
export SFN_POLICY_NAME="${PROJECT_NAME}SfnPolicy"

もういきなりお腹いっぱいな感じがありますが、ここに列挙したどれ一つ不要なものはありません。機能コードをサーバーレスに埋めていくとは、こういう手間を伴うものなのです。

デプロイコードとユニットテスト

そしてここからLambda Function、ECSコンテナ、そしてStep Functionsステートマシンをデプロイするコードを書いていくわけですが、ステートマシンをデプロイする部分をご紹介すると、以下のようになりました。

def deploy():
    print('deploying...')

    # (中略)

    containerDef = {
        'CLUSTER_ARN': f"arn:aws:ecs:{AWS_DEFAULT_REGION}:{AWS_ACCOUNT_ID}:cluster/{ECS_CLUSTER_NAME}",
        'TASK_DEFINITION_NAME': ECS_TASK_DEFINITION_NAME,
        'CONTAINER_TAG': ECS_CONTAINER_TAG,
        'ENVIRONMENTS': [
            {
                "Name":"TASK_TOKEN",
                "Value.$":"$$.Task.Token"
                },
            {
                "Name":"DATA",
                "Value.$":'$'
            }
        ]
    } # end of containerDef

    sm = get_steps(
        lambdaFunctionArn=LAMBDA_FUNCTION_ARN,
        containerDefinition=containerDef
    )

    sfnClient = boto3.client('stepfunctions')
    print('creating state machine...')
    # print(json.dumps(sm, indent=4))
    res = sfnClient.create_state_machine(
        name=SFN_NAME,
        definition=json.dumps(sm, indent=4),
        roleArn=f'arn:aws:iam::{AWS_ACCOUNT_ID}:role/{SFN_ROLE_NAME}',
        tags=[TAG_sl]
    )
# end of deploy

Step Functionsのステートマシン自体は、JSON文書として記述されます。ここへステートマシンから実行するLambda FunctionやECSのタスク情報を含めていくわけですが、これをステートマシン定義取得関数get_stepsの引数で指定するようにしておきました。 こうすれば、ユニットテストからこのデプロイコードが呼び出されたときに、本番とは違う仮のLambda FunctionやECSタスク情報を設定することができるだろうとの目論見です。

get_steps自体は以下のような実装になりました。

def get_steps(
    lambdaFunctionArn=None,
    containerDefinition=None
):
    SFN_NAME = os.environ['SFN_NAME']

    sm = {
        "Comment": SFN_NAME,
        "StartAt": "STEP1",
        "TimeoutSeconds": 300,
        "States": {
            "STEP1": {
                "Type": "Task",
                "Resource": lambdaFunctionArn,
                "TimeoutSeconds": 10,
                "InputPath": "$.input",
                "ResultPath": "$.res1",
                "OutputPath": "$",
                "Next": "STEP2",
                "Catch": [ {
                    "ErrorEquals": [ "States.ALL" ],
                    "Next": "FAILED"
                }]
            }, # end of STEP1
            "STEP2": {
                "Type": "Task",
                "Resource": "arn:aws:states:::ecs:runTask.waitForTaskToken",
                "TimeoutSeconds": 300,
                "Parameters": __create_ecs_parameters(containerDefinition),
                "InputPath": "$.res1",
                "ResultPath": "$.res2",
                "OutputPath": "$",
                "Next": "SUCCEEDED",
                "Catch": [ {
                    "ErrorEquals": [ "States.ALL" ],
                    "Next": "FAILED"
                }]
            }, # end of STEP2
            "SUCCEEDED": {
                "Type": "Succeed"
            },
            "FAILED": {
                "Type": "Fail"
            }
        } # end of States
    } # end of sm

    return sm
# end of get_steps

これはステートマシンのユニットテストの方法論の開発のためのテストのために仮組みしたものですが、これが下の図のようなステートマシンになります。

f:id:masashi-sato-flect:20191209141151p:plain
ステートマシン

予想外に苦労したのが __create_ecs_parameters の部分です。ステートマシンからECSを呼び出すにはタスク定義のARNを指定する必要がありますが、このタスク定義は更新の度に「project01Task:77」のように末尾の序数が加算されていき、この数字を指定する手段はありません。つまり目的のコンテナが指定されているタスク定義を(たいていは最新のタスク定義でしょうが)掘り出すしかないようなのです。これを掘り出してステートマシンのコンテナ定義を作成するのが、下記のような内容の __create_ecs_parameters 関数になります。

def __create_ecs_parameters(container_definitions):

    ECS_SUBNETS          = os.environ['ECS_SUBNETS'].split(',')
    TASK_DEFINITION_NAME = container_definitions['TASK_DEFINITION_NAME']
    CONTAINER_TAG        = container_definitions['CONTAINER_TAG']
    
    ecsClient = boto3.client('ecs')

    lTd = []
    nextToken = None
    while True:
        params = {
            'familyPrefix': TASK_DEFINITION_NAME,
            'status':       'ACTIVE',
            'sort':         'DESC'
        }
        if nextToken != None:
            params['nextToken'] = nextToken
        res = ecsClient.list_task_definitions(**params)

        if 'nextToken' in res:
            nextToken = res['nextToken']
        lTd.extend(res['taskDefinitionArns'])
        if nextToken is None:
            break
    # end of while (true)

    tdArn = ''
    for td in lTd:
        tdDesc = ecsClient.describe_task_definition(
            taskDefinition=td
        )
        image = tdDesc['taskDefinition']['containerDefinitions'][0]['image']
        # print(image)
        if image == CONTAINER_TAG:
            tdArn = td
            break
    # end of for (td)
    if tdArn == '':
        raise Exception(f'no task definition {TASK_DEFINITION_NAME} found for container {CONTAINER_TAG}')

    ret = {
        'LaunchType': 'FARGATE',
        'Cluster': container_definitions['CLUSTER_ARN'],
        'TaskDefinition': tdArn,
        'Overrides': {
            'ContainerOverrides': [{
                'Name': TASK_DEFINITION_NAME,
                'Environment': container_definitions['ENVIRONMENTS']
            }] # end of ContainerOverrides
        }, # end of Overrides
        'NetworkConfiguration': {
            'AwsvpcConfiguration': {
                'Subnets': ECS_SUBNETS,
                'AssignPublicIp': 'ENABLED'
            }
        } # end of NetrowkConfiguration
    }
    return ret
# end of __create_ecs_parameters

このような工夫を経て、ようやく当初の目的の「ステートマシンのユニットテスト」に到達しました。テストコード全体は以下のようになりました。

class TestSfn(unittest.TestCase):

    @classmethod
    def setUpClass(cls):
        deploy(test=True)
    # end of setUpClass

    @classmethod
    def tearDownClass(cls):
        undeploy()
    # end of tearDownClass   
    
    def test_sfn(self):
        
        AWS_ACCOUNT_ID     = os.environ['AWS_ACCOUNT_ID']
        AWS_DEFAULT_REGION = os.environ['AWS_DEFAULT_REGION']
        SFN_NAME           = os.environ['SFN_NAME']
        
        sfnClient = boto3.client('stepfunctions')
        
        execName = SFN_NAME + '_' + str(datetime.now().timestamp())
        input = {
            'input': {"Number1":10,"Number2":5}
        }
        
        res = sfnClient.start_execution(
            stateMachineArn=f'arn:aws:states:{AWS_DEFAULT_REGION}:{AWS_ACCOUNT_ID}:stateMachine:{SFN_NAME}',
            name=execName,
            input=json.dumps(input)
        )
        execArn = res['executionArn']
        res = {}
        while True:
            res = sfnClient.describe_execution(
                executionArn=execArn
            )
            status = res['status']
            if status != 'RUNNING':
                print(res)
                break
            time.sleep(5)
        # end of while True
        output = json.loads(res['output'])
        self.assertEqual(output['res2']['Difference'], 5)
    # end of test_sfn
# end of class TestClass01

このテストコードを実行することで、当初の目論見であったステートマシンのユニットテストが実行できるようになりました。テスト用のLambda Functionやコンテナと組み合わせれば、複雑な状態管理の作り込みと繰り返しの動作確認が可能になったと思います。

ふり返って、どうか

正直なところ、なかなか複雑な気持ちです。ステート管理・高可用性・柔軟かつ効率的なリソースアロケーションを目標として計画した目論見でしたが、結局CI/CDみたいなことをやることになってしまいました。Step Functionを本格的に業務活用しようとすると、自動デプロイの仕掛けを何かしら用意する必要があると感じました。設計目標はクリアできそうですが、けっこう大変でした。

特に設定が煩雑になったのはECSを呼び出す部分です。この点については以下のような支援機能がStep Functionに用意されていると、より開発作業が効率的になるのではないかと思いました。

  • ステートマシンとECSの入出力の高機能化。ステートマシンの入力値をJSONのまま受け渡しできるAPIの追加など。(現状では環境変数文字列にする必要がある。)
  • 個別のステートマシンからはECRのコンテナのみを指定して実行できるように簡略化。ステートマシン全体に既定のコンテナクラスタやタスク定義を設定する機能など。

最後までお読みいただきありがとうございました。