MWAAで任意のツールをBashOperatorで実行したい

こんにちは。技術部の自称データエンジニアの池田です。

最近、Amazon Managed Workflows for Apache Airflow (MWAA) を使い倒すことに注力しています。 この記事では、MWAAの環境に任意のツール(バイナリファイル)を送り、BashOperatorで実行する方法について書きます。

背景

弊社は、2014年に次のような宣言をしており、Go言語を社内の主要言語のひとつとして使用しています。

www.kayac.com

多くのGo言語で書かれたツールは、バイナリファイル一つを配置すればよいという長所があります。
そのため、社内のプロダクトのいたる所で、OSSや内製のツールが活躍しております。 当然のごとく、MWAAでもそれらのツールを利用したくなります。 ところで、MWAAにおいて任意のツールを使ったワークフローを構築するにはどうすればよいでしょう?

  1. ツール入りのイメージを指定したECS Taskを定義して、MWAAからECSOperatorを使用してRunTaskする。
  2. ツール入のAWS Lambda関数を作成し、MWAAからPythonOperatorを使用してboto3経由でInvokeFunctionする。
  3. どうにかして、AirflowのWorkerにツールを送り、BashOperatorで使用する。

比較的様々な事を行い、動作の重たいツールやインストールが複雑なランタイムが必要な場合は1や2が良いでしょう。 特に、PerlやPHP、RubyなどのLLで書かれたプロダクト固有のツールは、ECS Task化しContainerOverrideを利用して、ECSOperatorでの呼び出し時に挙動を変更するのが良いでしょう。 しかし、Go言語で書かれた軽量なツールの場合は、一つのバイナリファイルだけの事が多いです。一つのバイナリファイルだけ入ったECS TaskやLambda関数を用意するのは少し手間です。 どうにかしてMWAAのWorkerにツールを送る事ができれば、BashOperetorから利用できそうです。 今回は、このどうにかして送る手段を調査しました。

どうやってツールをMWAAに送るのか?

実は、その方法は以下のユーザーガイドに書いてありました。 docs.aws.amazon.com

ユーザーズガイドの例では、Oracle InstantClientを送っています。 どのようなものでも、実行形式ファイルを plugins.zipに固めて送る ことで実現できるようです。 なるほど!、という感じですね。では、ユーザーガイドに習いGo言語製のツールをMWAAに送ってみようと思います。

以下のようなディレクトリ構成にします。

.
├── plugins
│   ├── __init__.py
│   ├── env_var_plugins.py
│   └── bin
│       └── ssmwrap 

今回はMWAAの環境に送るツールとして、弊社の長田 が作成したOSSであるssmwrapを想定しています。 AWS Systems Managerのパラメータストアにアクセスして、値を環境変数に展開してくれる便利なツールです。

github.com

env_var_plugins.py はユーザーズガイドをお手本に以下のようにします。

from airflow.plugins_manager import AirflowPlugin
import os

os.environ["PATH"] = os.getenv("PATH") + ":/usr/local/airflow/plugins/bin/"

class EnvVarPlugin(AirflowPlugin):
     name = "env_var_plugin"

このあたりは、通常のAirflowのカスタムプラグインの作成と同じような感じですね。
Airflowの2.0.2に移行がお済みの方は、忘れてはいけないことがあります。
MWAA環境のAirflow設定オプションに core.lazy_load_plugins=False を入れることです。
この設定を忘れると環境変数が設定されてない。という状態に陥ります。

あとは、このpluginsディレクトリをzipで固めてS3アップロードし、MWAAのプラグインとして設定します。 この作業を手動で行うと、AWSコンソールとローカルのShellを往復することになるので、とても面倒です。
以下のようなMakefileを作っておくことをおすすめします。

ARIFLOW_S3_BUCKET_NAME:=<MWAA環境に設定したS3 Bucket名>
ARIFLOW_S3_PATH_PREFIX:=<plugins.zip 等を置く場所へのprefix>
AIRFLOW_ENV_NAME:=<MWAAの環境名>

.PHONY:deploy-dags
deploy-dags:
  aws s3 sync dags/ s3://$(ARIFLOW_S3_BUCKET_NAME)/$(ARIFLOW_S3_PATH_PREFIX)/dags/ \
   --exclude "*.pyc" \
   --exclude ".vscode*" \
   --exclude "tests*" \
   --delete --follow-symlinks

.PHONY: update-env
update-env: plugins-version.txt requirements-version.txt
  AWS_PAGER='' aws mwaa update-environment --name $(AIRFLOW_ENV_NAME) \
      --plugins-s3-path $(ARIFLOW_S3_PATH_PREFIX)/plugins.zip \
      --plugins-s3-object-version $(shell cat plugins-version.txt)\
      --requirements-s3-path $(ARIFLOW_S3_PATH_PREFIX)/requirements.txt \
      --requirements-s3-object-version $(shell cat requirements-version.txt)
  $(MAKE) clean

plugins.zip: $(shell find plugins)
  cd plugins && \
  chmod -R 755 . && \
  zip -r ../plugins.zip .

requirements-version.txt: requirements.txt
  aws s3api put-object \
      --body $< \
      --bucket $(ARIFLOW_S3_BUCKET_NAME) \
      --key $(ARIFLOW_S3_PATH_PREFIX)/$< \
      --query VersionId --output text > $@

plugins-version.txt: plugins.zip
  aws s3api put-object \
      --body $< \
      --bucket $(ARIFLOW_S3_BUCKET_NAME) \
      --key $(ARIFLOW_S3_PATH_PREFIX)/$< \
      --query VersionId --output text > $@

.PHONY: clean
clean:
  $(RM) -rf plugins.zip
  $(RM) -rf requirements-version.txt
  $(RM) -rf plugins-version.txt

makeコマンド一つで環境を更新できるようになります。 環境の更新が終われば、BashOperatorでツールの利用ができるようになります。

実際に使ってみる。

ツールが使えるかどうかや、MWAAの環境に関する調査に便利なDAGを紹介します。

"""
確認用
"""
import os
from airflow.decorators import dag
from airflow.utils.dates import days_ago
from airflow.operators.bash import BashOperator

dag_name = os.path.basename(__file__).replace(".py", "")
default_args = {
    "depends_on_past": False,
    "email_on_failure": False,
    "email_on_retry": False,
}

@dag(
    dag_id=dag_name,
    default_args=default_args,
    schedule_interval=None,
    start_date=days_ago(2),
    max_active_runs=1,
    doc_md=__doc__)
def create_dag():

    task = BashOperator(
        task_id="task",
        bash_command="{{ dag_run.conf['cmd'] }}"
    )

dag = create_dag()

このDAGを使ってツールが使えることを確認します。

{
    "cmd": "ssmwrap --version"
}

このパラメータでDAG Runすると以下のようになります。

f:id:ikeda-masashi:20210720124216p:plain

無事、ツールをBashOperatorで実行できることを確認しました。

おわりに

この記事では、バイナリが一つだけの軽量ツールをpluginsの一部としてMWAAに送り、BashOperatorで実行してみました。
例えば、Go言語で書いたようなツールはこのような一つのバイナリを置くだけで実行できるので、今回のケースにマッチします。 BashOperatorを組み合わせるようなワークフローの場合は非常に便利になります。

PerlやPHP、Rubyなどのランタイム環境が必要なPython以外のLLでは、あまりおすすめできません。それらの言語で書かれたツールの場合は適宜ECS TaskやLambda関数を定義して、MWAAのOperator経由で起動することをおすすめします。

適切な方法を選択して、様々なワークフローを実装しましょう。

カヤックではデータパイプラインの整備に興味のあるエンジニアも募集しています

中途採用も募集しています

Amazon Managed Workflows for Apache Airflow (MWAA) でのAiflow v2系への移行記録

こんにちは。技術部の池田です。

この記事では、Amazon Managed Workflows for Apache Airflow (MWAA) のAirflow v2系への移行を行いましたので、その時の話をしたいとお思います。 内容としては、主に以下となります。

  • MWAA では v1.10.12の環境を直接 v2.0.2に移行できないぞ!
  • ローカルでDAGのテストをしていると互換性チェックは楽だぞ!
  • 新しいv2.0.2の環境を作るときには、最初はrequirements.txtやDAGを空っぽにしておくと良いぞ!
  • ConnectionsとVariablesの新環境への移植は頑張るんだぞ!!!

背景

MWAAでもAirflow v2系のサポートが始まりました。
先日、Airflowの勉強会に参加して初めて知ったのですが、Airflow v1系は2021/06月末でEOLです。
早く移行をしなくては!と思ってる皆様もいると思いますので、今回MWAA環境におけるAirflow v2系への移行の記録を記事にしたいと思います。 GCPのCloud Composerの方の移行に関しては、勉強会で手厚く紹介されていましたので、このリンクも合わせて載せておきます。

aws.amazon.com finatext.connpass.com

Airflow v2系移行 on MWAA

早速ですが、こちらを御覧ください。

f:id:ikeda-masashi:20210625153231p:plain

こちらは、既存のAirflow v1.10.12のMWAA環境をプルダウンメニューからv2.0.2に変更して保存すると出てくるエラーメッセージです。
MWAAの場合、既存環境を直接v2.0.2に移行することができません。ですので、もう一つv2.0.2の環境を作成して移植するという作業が必要になります。

DAGの互換性

さて、実際の移植にあたり、はじめにDAGの互換性を確認します。
今回の場合、予めローカルでDAGのテストをしていたので、ローカルテストの環境をv2.0.2にバージョンアップし、テストが通るまでDAGを修正していきました。
(先日の勉強会で知ったのですが、v1.10.15のバージョンにはAirflow v2系への移行チェックスクリプトがあるので、ローカル環境がある場合は一旦v1.10.15を経由するのも良いかもしれません)

ローカルテスト自体は、英語の記事ですが、Testing in Airflow Part 1 — DAG Validation Tests, DAG Definition Tests and Unit Tests という記事がとても参考になります。

DAGの互換性を確認するだけのテストでしたら、雑ですが以下のようになると思います。

docker-compose.yml

version: "3.6"

volumes:
  pypkg:

services:
  airflow:
    image: apache/airflow:2.0.2-python3.7
    volumes:
      -  ./:/opt/airflow/
      - pypkg:/home/airflow/.local/

tests/test_dag_integrity.py

import unittest
from unittest import mock
from airflow.models import DagBag


class TestDagIntegrity(unittest.TestCase):

    LOAD_SECOND_THRESHOLD = 2

    def setUp(self):
        self.patcher = mock.patch('airflow.models.variable.Variable.get')
        self.mock_variable = self.patcher.start()

        def variable_side_effect_func(*args):
            key = args[0]
            if key == "slack_token":
                return "dummy"
            # 使っているVariableのダミーをMockで返す
            if len(args) == 2:
                return args[1]
            return None

        self.mock_variable.side_effect = variable_side_effect_func
        self.dagbag = DagBag()

    def tearDown(self):
        self.patcher.stop()

    def test_import_dags(self):
        self.assertFalse(
            len(self.dagbag.import_errors),
            'DAG import failures. Errors: {}'.format(
                self.dagbag.import_errors
            )
        )


if __name__ == '__main__':
    suite = unittest.TestLoader().loadTestsFromTestCase(TestDagIntegrity)
    unittest.TextTestRunner(verbosity=2).run(suite)
$ tree -L 1
.
├── airflow.cfg
├── dags
├── docker-compose.yml
├── requirements.txt
├── script
├── tests
└── unittests.cfg              

$ docker-compose run --rm airflow bash -c "airflow db init && pip install --user -r ./requirements.txt"
$ docker-compose run --rm airflow bash -c 'python -m unittest discover -v tests "test_*.py"'

dagsの中にはMWAAが参照しているS3へ転送するDAG定義の.pyファイルがたくさん入っています。
このように、テストをするとv1.10.12環境で動いているDAGの場合、たくさんの警告やエラーが発生すると思います。

f:id:ikeda-masashi:20210625153235p:plain

この警告やエラーが無くなるまでDAGを修正していくのが第1段階です。

新しいv2.0.2の環境を作成し、DAGを移植していく

ローカルのテストでDAGの互換性を修正した後は、既存のv1.10.12環境とは別のv2.0.2環境を実際に作成します。 その際、DAGをアップロードするS3バケットとキープレフィックスの組み合わせは、既存のものとは別にしておくことをおすすめします。 (同時に並行して動かせるので)

さて、ここでv1.10.12環境で使っているrequirements.txtを、そのままv2.0.2環境で使用しようと考えました。

requirements.txt

# https://docs.aws.amazon.com/mwaa/latest/userguide/troubleshooting.html#broken-cython
cython == 0.29.22
pyarrow == 2.0.0
awswrangler == 2.4.0

#以下略

ところが、作成されたv2.0.2環境ではこうなります。

f:id:ikeda-masashi:20210625153224p:plain

既存環境で、aws関係やcython等のパッケージバージョンを諸事情で固定していると、v2.0.2の環境ではWebServerが起動できなくなったりします。
ですので、まず環境を立ち上げるためにrequirements.txtは指定せず、あるいは空の状態で立ち上げてみることをおすすめします。

f:id:ikeda-masashi:20210625153228p:plain

このように、新旧2環境立ち上がりました。稼働費も2倍かかってしまいますので、急いで移植していきます。 ここで、ローカルテストで互換性を確認した修正積みDAGとrequiements.txtをアップロードしていきます(Pluginもある場合はそちらも)。 そして、問題なくDAGが読み込めたら、VariablesとConnectionsを移植していきます。

この辺は面倒ですが、愚直にやっていくしかありません。 弊社の場合、RedshiftとGoogle Cloud PlatformのAPIしかAirflowから触れていませんでしたので、幸いにしても個数が少なく手で一つ一つ移植するのは不可能ではありませんでした。 (この背景には、サービスで使用しているAurora MySQLは、Redshift Federated Queryを使用していました。そのため、Airflowはほぼすべてのデータソースに対してRedshiftを経由してアクセスしています。)

MWAAのv2.0.2の環境ではProvider packageをrequiements.txt に記述してもConnections のUIに反映されないという現象があります。(AWSのサポートには報告済みです。) この辺の問題は、Provider packageのConnectionの実装をみてHTTPのConnectionとして、いい感じにextraを埋めながら登録しました。

Variablesの数やConnectionsの数が多くなってくると大変だと思います。 今後は、予めAmazon Secrets ManagerにVariablesとConnectionsのBackendを切り替えておくと、次のメジャーアップデートのときに楽になるかもしれません。
Secrets Manager backendを使用すると、MWAAが動いているAWSアカウントのSecrets ManagerにVariablesとConnectionsの実態があるので、環境間での共有が簡単になると思います。

docs.aws.amazon.com

後は1つずつDAGを実行していくだけ。

ここまでくれば、あとはDAGを1つずつ有効化して、実行していくだけです。

  1. 旧環境の特定のDAGをOFFにする。
  2. 新環境の同じDAGをONにする。
  3. エラーが出ないことを祈る。 (エラーが出た場合は、トラブルシューティングをする)

これの繰り返しです。 幸いにして、弊社の場合はDAGが一度失敗してもリトライしやすいように冪等に作っていたため、この作業で苦労することはありませんでした。

旧環境にお別れをして、移行作業はおしまいです。

f:id:ikeda-masashi:20210625153105p:plain
goodbye

おわりに

今回の移行をまとめると

  • MWAA では v1.10.12の環境を直接 v2.0.2に変更できないぞ!
  • ローカルでDAGのテストをしていると互換性チェックは楽だぞ!
  • 新しいv2.0.2の環境を作るときには、最初はrequirements.txtやDAGを空っぽにしておくと良いぞ!
  • ConnectionsとVariablesの新環境への移植は頑張るんだぞ!!!

となります。

個人的にはMWAAでは、どうやらテストや構築自動化が大事そうだと感じています。
弊社では、MWAAが出る前にECS Fargateを用いてアンマネージドなAirflow立てた経験から、pytestを用いたCI環境をある程度簡単にですが整備しておりました。 そのおかげで、すんなりと移行できたように感じています。

MWAAの環境は停止することができない(開始・停止ではなく作成・削除のみである)ことから、MWAAの思想としてすでにある環境を秘伝のAirflowにするのではなく、環境を簡単に作っては壊す感じの運用を想定しているように思います。 実行ログ自体もAirflowではなくCloudwatch Logsの管轄であることからも、なんとなくそのような感じがしています。 ですので、環境の作成をある程度自動化する準備をしておくのが良いと思います。

幸いにしてAirflowはCLIが整っていますし、環境の作成自体もTerraformやCloudFormationで簡単に作れます。ほとんどの作成作業はスクリプト化できると思いますので、今回のv2系への移行に合わせて整えるとよいと思います。

MWAAのWorkerのオートスケールが便利ですので、ぜひMWAAもお使いいただければと思います!

カヤックではデータパイプラインの整備に興味のあるエンジニアも募集しています

中途採用も募集しています