Amazon Managed Workflows for Apache Airflow (MWAA) でのAiflow v2系への移行記録

こんにちは。技術部の池田です。

この記事では、Amazon Managed Workflows for Apache Airflow (MWAA) のAirflow v2系への移行を行いましたので、その時の話をしたいとお思います。 内容としては、主に以下となります。

  • MWAA では v1.10.12の環境を直接 v2.0.2に移行できないぞ!
  • ローカルでDAGのテストをしていると互換性チェックは楽だぞ!
  • 新しいv2.0.2の環境を作るときには、最初はrequirements.txtやDAGを空っぽにしておくと良いぞ!
  • ConnectionsとVariablesの新環境への移植は頑張るんだぞ!!!

背景

MWAAでもAirflow v2系のサポートが始まりました。
先日、Airflowの勉強会に参加して初めて知ったのですが、Airflow v1系は2021/06月末でEOLです。
早く移行をしなくては!と思ってる皆様もいると思いますので、今回MWAA環境におけるAirflow v2系への移行の記録を記事にしたいと思います。 GCPのCloud Composerの方の移行に関しては、勉強会で手厚く紹介されていましたので、このリンクも合わせて載せておきます。

aws.amazon.com finatext.connpass.com

Airflow v2系移行 on MWAA

早速ですが、こちらを御覧ください。

f:id:ikeda-masashi:20210625153231p:plain

こちらは、既存のAirflow v1.10.12のMWAA環境をプルダウンメニューからv2.0.2に変更して保存すると出てくるエラーメッセージです。
MWAAの場合、既存環境を直接v2.0.2に移行することができません。ですので、もう一つv2.0.2の環境を作成して移植するという作業が必要になります。

DAGの互換性

さて、実際の移植にあたり、はじめにDAGの互換性を確認します。
今回の場合、予めローカルでDAGのテストをしていたので、ローカルテストの環境をv2.0.2にバージョンアップし、テストが通るまでDAGを修正していきました。
(先日の勉強会で知ったのですが、v1.10.15のバージョンにはAirflow v2系への移行チェックスクリプトがあるので、ローカル環境がある場合は一旦v1.10.15を経由するのも良いかもしれません)

ローカルテスト自体は、英語の記事ですが、Testing in Airflow Part 1 — DAG Validation Tests, DAG Definition Tests and Unit Tests という記事がとても参考になります。

DAGの互換性を確認するだけのテストでしたら、雑ですが以下のようになると思います。

docker-compose.yml

version: "3.6"

volumes:
  pypkg:

services:
  airflow:
    image: apache/airflow:2.0.2-python3.7
    volumes:
      -  ./:/opt/airflow/
      - pypkg:/home/airflow/.local/

tests/test_dag_integrity.py

import unittest
from unittest import mock
from airflow.models import DagBag


class TestDagIntegrity(unittest.TestCase):

    LOAD_SECOND_THRESHOLD = 2

    def setUp(self):
        self.patcher = mock.patch('airflow.models.variable.Variable.get')
        self.mock_variable = self.patcher.start()

        def variable_side_effect_func(*args):
            key = args[0]
            if key == "slack_token":
                return "dummy"
            # 使っているVariableのダミーをMockで返す
            if len(args) == 2:
                return args[1]
            return None

        self.mock_variable.side_effect = variable_side_effect_func
        self.dagbag = DagBag()

    def tearDown(self):
        self.patcher.stop()

    def test_import_dags(self):
        self.assertFalse(
            len(self.dagbag.import_errors),
            'DAG import failures. Errors: {}'.format(
                self.dagbag.import_errors
            )
        )


if __name__ == '__main__':
    suite = unittest.TestLoader().loadTestsFromTestCase(TestDagIntegrity)
    unittest.TextTestRunner(verbosity=2).run(suite)
$ tree -L 1
.
├── airflow.cfg
├── dags
├── docker-compose.yml
├── requirements.txt
├── script
├── tests
└── unittests.cfg              

$ docker-compose run --rm airflow bash -c "airflow db init && pip install --user -r ./requirements.txt"
$ docker-compose run --rm airflow bash -c 'python -m unittest discover -v tests "test_*.py"'

dagsの中にはMWAAが参照しているS3へ転送するDAG定義の.pyファイルがたくさん入っています。
このように、テストをするとv1.10.12環境で動いているDAGの場合、たくさんの警告やエラーが発生すると思います。

f:id:ikeda-masashi:20210625153235p:plain

この警告やエラーが無くなるまでDAGを修正していくのが第1段階です。

新しいv2.0.2の環境を作成し、DAGを移植していく

ローカルのテストでDAGの互換性を修正した後は、既存のv1.10.12環境とは別のv2.0.2環境を実際に作成します。 その際、DAGをアップロードするS3バケットとキープレフィックスの組み合わせは、既存のものとは別にしておくことをおすすめします。 (同時に並行して動かせるので)

さて、ここでv1.10.12環境で使っているrequirements.txtを、そのままv2.0.2環境で使用しようと考えました。

requirements.txt

# https://docs.aws.amazon.com/mwaa/latest/userguide/troubleshooting.html#broken-cython
cython == 0.29.22
pyarrow == 2.0.0
awswrangler == 2.4.0

#以下略

ところが、作成されたv2.0.2環境ではこうなります。

f:id:ikeda-masashi:20210625153224p:plain

既存環境で、aws関係やcython等のパッケージバージョンを諸事情で固定していると、v2.0.2の環境ではWebServerが起動できなくなったりします。
ですので、まず環境を立ち上げるためにrequirements.txtは指定せず、あるいは空の状態で立ち上げてみることをおすすめします。

f:id:ikeda-masashi:20210625153228p:plain

このように、新旧2環境立ち上がりました。稼働費も2倍かかってしまいますので、急いで移植していきます。 ここで、ローカルテストで互換性を確認した修正積みDAGとrequiements.txtをアップロードしていきます(Pluginもある場合はそちらも)。 そして、問題なくDAGが読み込めたら、VariablesとConnectionsを移植していきます。

この辺は面倒ですが、愚直にやっていくしかありません。 弊社の場合、RedshiftとGoogle Cloud PlatformのAPIしかAirflowから触れていませんでしたので、幸いにしても個数が少なく手で一つ一つ移植するのは不可能ではありませんでした。 (この背景には、サービスで使用しているAurora MySQLは、Redshift Federated Queryを使用していました。そのため、Airflowはほぼすべてのデータソースに対してRedshiftを経由してアクセスしています。)

MWAAのv2.0.2の環境ではProvider packageをrequiements.txt に記述してもConnections のUIに反映されないという現象があります。(AWSのサポートには報告済みです。) この辺の問題は、Provider packageのConnectionの実装をみてHTTPのConnectionとして、いい感じにextraを埋めながら登録しました。

Variablesの数やConnectionsの数が多くなってくると大変だと思います。 今後は、予めAmazon Secrets ManagerにVariablesとConnectionsのBackendを切り替えておくと、次のメジャーアップデートのときに楽になるかもしれません。
Secrets Manager backendを使用すると、MWAAが動いているAWSアカウントのSecrets ManagerにVariablesとConnectionsの実態があるので、環境間での共有が簡単になると思います。

docs.aws.amazon.com

後は1つずつDAGを実行していくだけ。

ここまでくれば、あとはDAGを1つずつ有効化して、実行していくだけです。

  1. 旧環境の特定のDAGをOFFにする。
  2. 新環境の同じDAGをONにする。
  3. エラーが出ないことを祈る。 (エラーが出た場合は、トラブルシューティングをする)

これの繰り返しです。 幸いにして、弊社の場合はDAGが一度失敗してもリトライしやすいように冪等に作っていたため、この作業で苦労することはありませんでした。

旧環境にお別れをして、移行作業はおしまいです。

f:id:ikeda-masashi:20210625153105p:plain
goodbye

おわりに

今回の移行をまとめると

  • MWAA では v1.10.12の環境を直接 v2.0.2に変更できないぞ!
  • ローカルでDAGのテストをしていると互換性チェックは楽だぞ!
  • 新しいv2.0.2の環境を作るときには、最初はrequirements.txtやDAGを空っぽにしておくと良いぞ!
  • ConnectionsとVariablesの新環境への移植は頑張るんだぞ!!!

となります。

個人的にはMWAAでは、どうやらテストや構築自動化が大事そうだと感じています。
弊社では、MWAAが出る前にECS Fargateを用いてアンマネージドなAirflow立てた経験から、pytestを用いたCI環境をある程度簡単にですが整備しておりました。 そのおかげで、すんなりと移行できたように感じています。

MWAAの環境は停止することができない(開始・停止ではなく作成・削除のみである)ことから、MWAAの思想としてすでにある環境を秘伝のAirflowにするのではなく、環境を簡単に作っては壊す感じの運用を想定しているように思います。 実行ログ自体もAirflowではなくCloudwatch Logsの管轄であることからも、なんとなくそのような感じがしています。 ですので、環境の作成をある程度自動化する準備をしておくのが良いと思います。

幸いにしてAirflowはCLIが整っていますし、環境の作成自体もTerraformやCloudFormationで簡単に作れます。ほとんどの作成作業はスクリプト化できると思いますので、今回のv2系への移行に合わせて整えるとよいと思います。

MWAAのWorkerのオートスケールが便利ですので、ぜひMWAAもお使いいただければと思います!

カヤックではデータパイプラインの整備に興味のあるエンジニアも募集しています

中途採用も募集しています

GitHub Actionsに「強い」AWSの権限を渡したい ~作戦3 - AssumeRole with Google ID Token ~

こんにちは。技術部の池田です。

この記事では、Github Actions上に「強い」AWSの権限を渡すために以下のことを行います。

  • App Runnerでお手軽にGoogle ID Token 取得するためのWeb Applicationを動かす。
  • Web Applicationから取得できるGoogle ID Tokenを信頼するIAM RoleにAssumeRoleする。
  • AssumeRoleによって得られた一時的な強い権限で、強い権限を要求する作業(Deploy, Terraform Apply)をGithub Actionsで行う。

これにより、Github Actions上にAWSのアクセスキーを置かずに、ある程度安全な方法でAWS上での強い権限を要求する操作を実行できます。 そのため、例えばGithub Repositoryに不正アクセスされてしまったとしても、AWSの本番環境まで被害の及ぶ可能性が低くなります。

背景

5/14(金)に3社合同のSRE勉強会があり、弊社の藤原が『GitHub Actionsに「強い」AWSの権限を渡したい』という内容で発表いたしました。

speakerdeck.com

この発表の中では、以下の2つの作戦を取り上げていました。

  • 作戦1 - AssumeRole with MFA
  • 作戦2 - CludShell exports credentials

皆様も『GitHub Actionsに「強い」AWSの権限を渡したい』ようでして、様々な反応があったようです。
後日、藤原氏は新たな作戦を某所から仕入れてきたようです。
その新たな作戦について、概念実証を行いましたので記事にしたいと思います。

作戦3 - AssumeRole with Google ID Token

AWSのAssumeRoleにはいくつか種類があります。

  • AssumeRole
  • AssumeRoleWithSAML
  • AssumeRoleWithWebIdentity

よく使うのは、一番最初の普通のAssumeRoleだと思います。
しかしながら、 SAML 2.0-based Federation によるAssumeRole=AssumeRoleWithSAML OIDCのID Token等のWebIdentityによるAssumeRole=AssumeRoleWithWebIdentity も実は存在します。

以前の記事にて、Google Colabからお手軽にAmazon Athenaにアクセスを行いたくなり 、AssumeRoleWithWebIdentiyを用いました。

techblog.kayac.com

このように、Google ID TokenによるAssumeRoleし、一時的なAWS上での権限を取得できます。
Google ID Token は有効期限が1時間となっているため、有効期限が過ぎた後でもう一度権限を取得しようとしても失敗します。 前の 作戦1 - AssumeRole with MFAのコンセプトは、MFAトークンが約1分の有効期限であり、有効期限の短いものを使って権限を取得するというものでした。
作戦1も作戦3もコンセプトは同様であり、Google ID TokenもMFAトークンも、アクセスキーよりは外部に流出してしまったときの危険度は低いです。

さらに、作戦1 では、MFAデバイスを複数人で共有する必要があるというデメリットが発生していました。
例えば、開発チームのメンバー変更が起きたとき、場合によってはMFAデバイスを新しく設定し、新たなチームメンバーに共有するという作業が必要になります。

今回の作戦では各個人のGoogle ID Tokenを使用することができます。 権限を取得できるチームメンバーを変更したい場合は、IAM Roleの信頼関係を編集すれば良いだけとなります。 そのため、MFAトークンよりも有効期限が長い代わりに保守性が向上するというメリットがあります。

どうやってGoogle ID Tokenを手に入れよう?

今回はGoogle Colabなどのお手軽にGoogle ID Tokenを取得できる環境がありません。
(毎回 Colabを立ち上げてコードを書いてもらうわけにもいきません。)

何かお手軽に認証とかできる環境建てられないかなぁ〜〜〜 と考えていたところに
『App Runner』というサービスです! なんと、コンテナイメージをECRにPushするだけでWeb Applicationを動かせます!

aws.amazon.com

App Runnerを動かすインスタンスロールにSystems Manager パラメーターストア へのアクセスを与えておけば、 OAuth Client ID/Secret をコンテナに含めなくて良さそうです。
また、パラメーターストアに関しては弊社の長田が便利なOSS ssmwrapを作っておりますので取り扱いやすいです。
ということで、サクッとGo言語でWeb Application『google-jwt-viewer』を書いてみました。

github.com github.com

このgoogle-jwt-viewerの使い方は簡単です。

  1. パラメータストアに /apprunner/CLIENT_ID, /apprunner/CLIENT_SECRET を設定します
    • CLIENT_ID : Google OAuth2 のクライアントID
    • CLIENT_SECRET : Google OAuth2 のクライアントシークレット
  2. App Runner の環境変数に、SSMWRAP_PATHS=/apprunner/ を指定して google-jwt-viewerを起動します。
    • リポジトリのルートで以下のようにしてECRにPushするようのが楽だと思います。
$ docker build -t $(ECR_REPOSITORY):latest -f docker/Dockerfile .
$ docker push $(ECR_REPOSITORY):latest

f:id:ikeda-masashi:20210611180031p:plain

こんな感じの寂しい画面で、ID TokenであるJSON Web Token(JWT) を取得できるようになりました。
次は、このWeb Tokenを信頼できるIAM Roleを定義してみましょう。

IAM Roleの作成

IAM Role自体の作成で大事なポイントは信頼関係の定義のところです。 

{
  "Version": "2012-10-17",
  "Statement": [
    {
      "Sid": "",
      "Effect": "Allow",
      "Principal": {
        "Federated": "accounts.google.com"
      },
      "Action": "sts:AssumeRoleWithWebIdentity",
      "Condition": {
        "StringEquals": {
          "accounts.google.com:aud": "<google-jwt-viewerで表示されているAudience>"
        },
        "ForAnyValue:StringEqualsIgnoreCase": {
          "accounts.google.com:email": [
            "***********@kayac.com",
            "<assume role できる人のemail address>"
          ]
        }
      }
    }
  ]
}

こんな感じです。
詳しくは以下のドキュメントを参照してください。 Google ID Token以外にもAmazon Cognito ID poolを用いた方法等もあります。 docs.amazonaws.cn

この信頼関係を持つIAM RoleでGithub Actionsに行わせたいことができる「強い」権限をつけます。 次は肝心のGithub ActionsのWorkflow定義です。

Github ActionsでID Tokenは取り扱い注意!

大まかには「作戦1 - AssumeRole with MFA」と同じようなWorkflowになるのですが、問題はID Tokenの取り扱いです。
MFAトークンのように有効期限がものすごく短いわけではないですし、一度きりしか使えないみたいな制約もありません。

LogからID Tokenが流出しないようにする。

有効期限が1時間とはいえ、有効期限が短いものの中では長い方です。 Github Actionsの実行Logで入力したID Tokenが見えてしまったら、有効期限が切れる1時間の間は使いたい放題です。 そこで、実行Log上では見えないようにするために、add-maskを利用します。

docs.github.com

echo "::add-mask::{value}" をワークフロー内で実行することで、それ以降に出力される値をマスクできます。 入力されたJWTに対してマスクを追加した例は以下のようになっています。

f:id:ikeda-masashi:20210614100827p:plain

AssumeRole自体はAWS CLIを使えば以下のようになります。

aws sts assume-role-with-web-identity \
    --role-arn ${{ secrets.IAM_ROLE_ARN }}  \
    --role-session-name github-actions-jwt \
    --duration-seconds 900 \
    --web-identity-token ${{ github.event.inputs.jwt }}

GithubのRepositoryのSecretsにIAM RoleのARNを設定しておけば自動的にマスクされるので、AssumeRole先もログ上では隠せます。

同じID Tokenによる再実行を防ぎたい。

作戦1のMFAトークンを使った場合は、副次的な効果でうっかり「Re-run jobs」での再実行を防ぐことが可能というメリットが有りました。
今回のID Tokenでは有効期限が長いので、30分後に別の誰かがそのID Tokenを使って再実行するということもできてしまいます。 再実行できないという特性を、作戦3でも行う必要があります。

再実行防止に関しては、Git Tagをつかうことで解決できました。 ID Tokenに由来したGit Tagを生成し、Github ActionsのWorkflow内でtagを設定します。 Workflowの最後でそのタグをRemoteにPushします。

一度終わった実行に関して「Re-run jobs」と雑に押しても、すでにTagが存在していますので、Workflowが失敗します。 DeployやTerraform Apply等を行っているのであれば、自動的にTagが設定されるので、いつ、どのCommit時点で反映したのかもわかりやすいです。

今回は、ID Token のペイロードにnonceがあるので、それを使うことにしました。
ID Tokenごとに同じであれば良いので、単純にSHAハッシュを取るのでも良いとは思います。

Workflow 全貌

最終的なWorkflowは以下のようになります。この例はTerraform Applyすることを想定しています。

name: terraform-apply-with-jwt
on:
  workflow_dispatch:
    inputs:
      id_token:
        description: 'JWT'
        required: true

jobs:
  manual:
    runs-on: ubuntu-latest
    steps:
      - uses: actions/checkout@v2
      - name: set Git Terrform Apply Tag
        run: |
          set -e
          echo "::add-mask::${{ github.event.inputs.id_token }}"
          ID_TOKEN=${{ github.event.inputs.id_token }}
          PART=(${ID_TOKEN//./ })
          PAYLOAD=${PART[1]}
          PAYLOAD_LEN=$((${#PAYLOAD} % 4))
          if [ $PAYLOAD_LEN -eq 2 ]; then
            PAYLOAD="$PAYLOAD"'=='
          elif [ $PAYLOAD_LEN -eq 3 ]; then
            PAYLOAD="$PAYLOAD"'='
          fi
          NONCE=$( echo $PAYLOAD | base64 -d | jq -r '.nonce')
          EMAIL=$( echo $PAYLOAD | base64 -d | jq -r '.email')
          git config --local user.email $EMAIL
          TFAPPLY_GIT_TAG=terraform-apply_$NONCE
          git pull origin --tags
          git tag $TFAPPLY_GIT_TAG
          echo TFAPPLY_GIT_TAG=$TFAPPLY_GIT_TAG >> $GITHUB_ENV
          echo "terraform apply by $EMAIL"
        shell: bash
      - name: Assume Role With Web Identity
        run: |
          set -eu
          OUTPUT=$(aws sts assume-role-with-web-identity \
            --role-arn ${{ secrets.IAM_ROLE_ARN }}  \
            --role-session-name github-actions-jwt \
            --duration-seconds 900 \
            --web-identity-token ${{ github.event.inputs.jwt }} \
          )
          AWS_ACCESS_KEY_ID=$(echo $OUTPUT | jq -r '.Credentials.AccessKeyId')
          echo "::add-mask::$AWS_ACCESS_KEY_ID"
          echo AWS_ACCESS_KEY_ID=$AWS_ACCESS_KEY_ID >> $GITHUB_ENV
          AWS_SECRET_ACCESS_KEY=$(echo $OUTPUT | jq -r '.Credentials.SecretAccessKey')
          echo "::add-mask::$AWS_SECRET_ACCESS_KEY"
          echo AWS_SECRET_ACCESS_KEY=$AWS_SECRET_ACCESS_KEY >> $GITHUB_ENV
          AWS_SESSION_TOKEN=$(echo $OUTPUT | jq -r '.Credentials.SessionToken')
          echo "::add-mask::$AWS_SESSION_TOKEN"
          echo AWS_SESSION_TOKEN=$AWS_SESSION_TOKEN >> $GITHUB_ENV
        shell: bash
        env:
          AWS_DEFAULT_REGION: ap-northeast-1
          AWS_EC2_METADATA_DISABLED: true
      - name: terraform apply
        run: |
          make terraform/apply
        env:
          AWS_DEFAULT_REGION: ap-northeast-1
          AWS_EC2_METADATA_DISABLED: true
      - name: Git Terrform Apply Tag Push
        run: |
          git push origin $TFAPPLY_GIT_TAG
        shell: bash

これまでの、作戦と比較して「いいところ」と「わるいところ」は以下のようになります。

  • いいところ:
    • 作戦1 - AssumeRole with MFAと違ってタイムアタック要素がない。
    • MFAの設定等の複数人で共有するものはない。
    • 作戦2 - CludShell exports credentialsと違い関係ない外部のサービスにアクセスキーなどが通過しない。
    • そもそも、Repositoryにアクセスキーを設定する必要がない。
  • わるいところ:
    • Google ID Tokenの有効期限が1時間と少し長い(ユースケースに対して)

Google ID Tokenの有効期限が長めであることに対しては、もっと短い5分の有効期限のID Tokenを発行できるAWS Cognitoを使うという手もあります。 しかし、AWS CognitoのID Pool周りの設定が少しややこしいので、お手軽さではGoogle ID Tokenに軍配が上がります。 AWS Cognitoを使ってAssumeRoleWithWebIdentityをする場合は、クラスメソッド株式会社様の記事を参考に実装していただければと思います。 dev.classmethod.jp

おわりに

この記事では、Github Actions上に「強い」AWSの権限を渡すために以下のことを行いました。

  • App Runnerでお手軽にGoogle ID Token 取得するためのWeb Applicationを動かす。
  • Web Applicationから取得できるGoogle ID Tokenを信頼するIAM RoleにAssumeRoleする。
  • AssumeRoleによって得られた一時的な強い権限で、強い権限を要求する作業(Deploy, Terraform Apply)をGithub Actionsで行う

CI/CD周りの悩み解決の一助になれば幸いです。

カヤックではCI/CDで強い権限を取り扱えるエンジニアも募集しています

中途採用も募集しています

2021/06/15 追記

記事公開の後日、弊社の藤原宛にお返事がありました。

なんと!もっとスッキリできるとのことです。 また、その他にも多重起動対策を加えられたため、改良版を追加で記載します。 改良点としては以下の3点です。

  • AssumeRoleWithWebIdentityをAWS SDKにおまかせすることで、記述がスッキリ
  • Git Tagの生成を単純にJWTのhashにすることで、記述がスッキリ
  • 慌てて急いでボタンを連打してしまい、多重起動になってしまっても失敗になるように!
name: terraform-apply-with-jwt
on:
  workflow_dispatch:
    inputs:
      id_token:
        description: 'JWT'
        required: true

jobs:
  manual:
    runs-on: ubuntu-latest
    env:
      AWS_DEFAULT_REGION: ap-northeast-1
      AWS_EC2_METADATA_DISABLED: true
    steps:
      - uses: actions/checkout@v2
      - name: set Git Terrform Apply Tag
        run: |
          echo "::add-mask::${{ github.event.inputs.jwt }}"
          TFAPPLY_GIT_TAG=deploy-$(echo -n {{ github.event.inputs.jwt }} | shasum | cut -d " " -f1)
          git config --local user.email "action@github.com"
          git config --local user.name "GitHub Action"
          git tag $TFAPPLY_GIT_TAG
          git push origin $TFAPPLY_GIT_TAG
        shell: bash
      - name: Assume Role With Web Identity
        run: |
          set -eu
          TEMP_DIR=$(mktemp -d)
          echo ${{ github.event.inputs.jwt }} > $TEMP_DIR/jwt
          echo AWS_ROLE_ARN=${{ secrets.IAM_ROLE_ARN }}  >> $GITHUB_ENV
          echo AWS_WEB_IDENTITY_TOKEN_FILE=$TEMP_DIR/jwt >> $GITHUB_ENV
          echo AWS_ROLE_SESSION_NAME=github-actions-jwt >> $GITHUB_ENV
        shell: bash
      - name: terraform apply
        run: |
          make terraform/apply
        env:
          AWS_DEFAULT_REGION: ap-northeast-1
          AWS_EC2_METADATA_DISABLED: true

とてもコンパクトに仕上がりました。 アドバイスいただきありがとうございました。