Amazon Managed Workflows for Apache Airflow (MWAA) でのAiflow v2系への移行記録

こんにちは。技術部の池田です。

この記事では、Amazon Managed Workflows for Apache Airflow (MWAA) のAirflow v2系への移行を行いましたので、その時の話をしたいとお思います。 内容としては、主に以下となります。

  • MWAA では v1.10.12の環境を直接 v2.0.2に移行できないぞ!
  • ローカルでDAGのテストをしていると互換性チェックは楽だぞ!
  • 新しいv2.0.2の環境を作るときには、最初はrequirements.txtやDAGを空っぽにしておくと良いぞ!
  • ConnectionsとVariablesの新環境への移植は頑張るんだぞ!!!

背景

MWAAでもAirflow v2系のサポートが始まりました。
先日、Airflowの勉強会に参加して初めて知ったのですが、Airflow v1系は2021/06月末でEOLです。
早く移行をしなくては!と思ってる皆様もいると思いますので、今回MWAA環境におけるAirflow v2系への移行の記録を記事にしたいと思います。 GCPのCloud Composerの方の移行に関しては、勉強会で手厚く紹介されていましたので、このリンクも合わせて載せておきます。

aws.amazon.com finatext.connpass.com

Airflow v2系移行 on MWAA

早速ですが、こちらを御覧ください。

f:id:ikeda-masashi:20210625153231p:plain

こちらは、既存のAirflow v1.10.12のMWAA環境をプルダウンメニューからv2.0.2に変更して保存すると出てくるエラーメッセージです。
MWAAの場合、既存環境を直接v2.0.2に移行することができません。ですので、もう一つv2.0.2の環境を作成して移植するという作業が必要になります。

DAGの互換性

さて、実際の移植にあたり、はじめにDAGの互換性を確認します。
今回の場合、予めローカルでDAGのテストをしていたので、ローカルテストの環境をv2.0.2にバージョンアップし、テストが通るまでDAGを修正していきました。
(先日の勉強会で知ったのですが、v1.10.15のバージョンにはAirflow v2系への移行チェックスクリプトがあるので、ローカル環境がある場合は一旦v1.10.15を経由するのも良いかもしれません)

ローカルテスト自体は、英語の記事ですが、Testing in Airflow Part 1 — DAG Validation Tests, DAG Definition Tests and Unit Tests という記事がとても参考になります。

DAGの互換性を確認するだけのテストでしたら、雑ですが以下のようになると思います。

docker-compose.yml

version: "3.6"

volumes:
  pypkg:

services:
  airflow:
    image: apache/airflow:2.0.2-python3.7
    volumes:
      -  ./:/opt/airflow/
      - pypkg:/home/airflow/.local/

tests/test_dag_integrity.py

import unittest
from unittest import mock
from airflow.models import DagBag


class TestDagIntegrity(unittest.TestCase):

    LOAD_SECOND_THRESHOLD = 2

    def setUp(self):
        self.patcher = mock.patch('airflow.models.variable.Variable.get')
        self.mock_variable = self.patcher.start()

        def variable_side_effect_func(*args):
            key = args[0]
            if key == "slack_token":
                return "dummy"
            # 使っているVariableのダミーをMockで返す
            if len(args) == 2:
                return args[1]
            return None

        self.mock_variable.side_effect = variable_side_effect_func
        self.dagbag = DagBag()

    def tearDown(self):
        self.patcher.stop()

    def test_import_dags(self):
        self.assertFalse(
            len(self.dagbag.import_errors),
            'DAG import failures. Errors: {}'.format(
                self.dagbag.import_errors
            )
        )


if __name__ == '__main__':
    suite = unittest.TestLoader().loadTestsFromTestCase(TestDagIntegrity)
    unittest.TextTestRunner(verbosity=2).run(suite)
$ tree -L 1
.
├── airflow.cfg
├── dags
├── docker-compose.yml
├── requirements.txt
├── script
├── tests
└── unittests.cfg              

$ docker-compose run --rm airflow bash -c "airflow db init && pip install --user -r ./requirements.txt"
$ docker-compose run --rm airflow bash -c 'python -m unittest discover -v tests "test_*.py"'

dagsの中にはMWAAが参照しているS3へ転送するDAG定義の.pyファイルがたくさん入っています。
このように、テストをするとv1.10.12環境で動いているDAGの場合、たくさんの警告やエラーが発生すると思います。

f:id:ikeda-masashi:20210625153235p:plain

この警告やエラーが無くなるまでDAGを修正していくのが第1段階です。

新しいv2.0.2の環境を作成し、DAGを移植していく

ローカルのテストでDAGの互換性を修正した後は、既存のv1.10.12環境とは別のv2.0.2環境を実際に作成します。 その際、DAGをアップロードするS3バケットとキープレフィックスの組み合わせは、既存のものとは別にしておくことをおすすめします。 (同時に並行して動かせるので)

さて、ここでv1.10.12環境で使っているrequirements.txtを、そのままv2.0.2環境で使用しようと考えました。

requirements.txt

# https://docs.aws.amazon.com/mwaa/latest/userguide/troubleshooting.html#broken-cython
cython == 0.29.22
pyarrow == 2.0.0
awswrangler == 2.4.0

#以下略

ところが、作成されたv2.0.2環境ではこうなります。

f:id:ikeda-masashi:20210625153224p:plain

既存環境で、aws関係やcython等のパッケージバージョンを諸事情で固定していると、v2.0.2の環境ではWebServerが起動できなくなったりします。
ですので、まず環境を立ち上げるためにrequirements.txtは指定せず、あるいは空の状態で立ち上げてみることをおすすめします。

f:id:ikeda-masashi:20210625153228p:plain

このように、新旧2環境立ち上がりました。稼働費も2倍かかってしまいますので、急いで移植していきます。 ここで、ローカルテストで互換性を確認した修正積みDAGとrequiements.txtをアップロードしていきます(Pluginもある場合はそちらも)。 そして、問題なくDAGが読み込めたら、VariablesとConnectionsを移植していきます。

この辺は面倒ですが、愚直にやっていくしかありません。 弊社の場合、RedshiftとGoogle Cloud PlatformのAPIしかAirflowから触れていませんでしたので、幸いにしても個数が少なく手で一つ一つ移植するのは不可能ではありませんでした。 (この背景には、サービスで使用しているAurora MySQLは、Redshift Federated Queryを使用していました。そのため、Airflowはほぼすべてのデータソースに対してRedshiftを経由してアクセスしています。)

MWAAのv2.0.2の環境ではProvider packageをrequiements.txt に記述してもConnections のUIに反映されないという現象があります。(AWSのサポートには報告済みです。) この辺の問題は、Provider packageのConnectionの実装をみてHTTPのConnectionとして、いい感じにextraを埋めながら登録しました。

Variablesの数やConnectionsの数が多くなってくると大変だと思います。 今後は、予めAmazon Secrets ManagerにVariablesとConnectionsのBackendを切り替えておくと、次のメジャーアップデートのときに楽になるかもしれません。
Secrets Manager backendを使用すると、MWAAが動いているAWSアカウントのSecrets ManagerにVariablesとConnectionsの実態があるので、環境間での共有が簡単になると思います。

docs.aws.amazon.com

後は1つずつDAGを実行していくだけ。

ここまでくれば、あとはDAGを1つずつ有効化して、実行していくだけです。

  1. 旧環境の特定のDAGをOFFにする。
  2. 新環境の同じDAGをONにする。
  3. エラーが出ないことを祈る。 (エラーが出た場合は、トラブルシューティングをする)

これの繰り返しです。 幸いにして、弊社の場合はDAGが一度失敗してもリトライしやすいように冪等に作っていたため、この作業で苦労することはありませんでした。

旧環境にお別れをして、移行作業はおしまいです。

f:id:ikeda-masashi:20210625153105p:plain
goodbye

おわりに

今回の移行をまとめると

  • MWAA では v1.10.12の環境を直接 v2.0.2に変更できないぞ!
  • ローカルでDAGのテストをしていると互換性チェックは楽だぞ!
  • 新しいv2.0.2の環境を作るときには、最初はrequirements.txtやDAGを空っぽにしておくと良いぞ!
  • ConnectionsとVariablesの新環境への移植は頑張るんだぞ!!!

となります。

個人的にはMWAAでは、どうやらテストや構築自動化が大事そうだと感じています。
弊社では、MWAAが出る前にECS Fargateを用いてアンマネージドなAirflow立てた経験から、pytestを用いたCI環境をある程度簡単にですが整備しておりました。 そのおかげで、すんなりと移行できたように感じています。

MWAAの環境は停止することができない(開始・停止ではなく作成・削除のみである)ことから、MWAAの思想としてすでにある環境を秘伝のAirflowにするのではなく、環境を簡単に作っては壊す感じの運用を想定しているように思います。 実行ログ自体もAirflowではなくCloudwatch Logsの管轄であることからも、なんとなくそのような感じがしています。 ですので、環境の作成をある程度自動化する準備をしておくのが良いと思います。

幸いにしてAirflowはCLIが整っていますし、環境の作成自体もTerraformやCloudFormationで簡単に作れます。ほとんどの作成作業はスクリプト化できると思いますので、今回のv2系への移行に合わせて整えるとよいと思います。

MWAAのWorkerのオートスケールが便利ですので、ぜひMWAAもお使いいただければと思います!

カヤックではデータパイプラインの整備に興味のあるエンジニアも募集しています

中途採用も募集しています