MWAAで任意のツールをBashOperatorで実行したい

こんにちは。技術部の自称データエンジニアの池田です。

最近、Amazon Managed Workflows for Apache Airflow (MWAA) を使い倒すことに注力しています。 この記事では、MWAAの環境に任意のツール(バイナリファイル)を送り、BashOperatorで実行する方法について書きます。

背景

弊社は、2014年に次のような宣言をしており、Go言語を社内の主要言語のひとつとして使用しています。

www.kayac.com

多くのGo言語で書かれたツールは、バイナリファイル一つを配置すればよいという長所があります。
そのため、社内のプロダクトのいたる所で、OSSや内製のツールが活躍しております。 当然のごとく、MWAAでもそれらのツールを利用したくなります。 ところで、MWAAにおいて任意のツールを使ったワークフローを構築するにはどうすればよいでしょう?

  1. ツール入りのイメージを指定したECS Taskを定義して、MWAAからECSOperatorを使用してRunTaskする。
  2. ツール入のAWS Lambda関数を作成し、MWAAからPythonOperatorを使用してboto3経由でInvokeFunctionする。
  3. どうにかして、AirflowのWorkerにツールを送り、BashOperatorで使用する。

比較的様々な事を行い、動作の重たいツールやインストールが複雑なランタイムが必要な場合は1や2が良いでしょう。 特に、PerlやPHP、RubyなどのLLで書かれたプロダクト固有のツールは、ECS Task化しContainerOverrideを利用して、ECSOperatorでの呼び出し時に挙動を変更するのが良いでしょう。 しかし、Go言語で書かれた軽量なツールの場合は、一つのバイナリファイルだけの事が多いです。一つのバイナリファイルだけ入ったECS TaskやLambda関数を用意するのは少し手間です。 どうにかしてMWAAのWorkerにツールを送る事ができれば、BashOperetorから利用できそうです。 今回は、このどうにかして送る手段を調査しました。

どうやってツールをMWAAに送るのか?

実は、その方法は以下のユーザーガイドに書いてありました。 docs.aws.amazon.com

ユーザーズガイドの例では、Oracle InstantClientを送っています。 どのようなものでも、実行形式ファイルを plugins.zipに固めて送る ことで実現できるようです。 なるほど!、という感じですね。では、ユーザーガイドに習いGo言語製のツールをMWAAに送ってみようと思います。

以下のようなディレクトリ構成にします。

.
├── plugins
│   ├── __init__.py
│   ├── env_var_plugins.py
│   └── bin
│       └── ssmwrap 

今回はMWAAの環境に送るツールとして、弊社の長田 が作成したOSSであるssmwrapを想定しています。 AWS Systems Managerのパラメータストアにアクセスして、値を環境変数に展開してくれる便利なツールです。

github.com

env_var_plugins.py はユーザーズガイドをお手本に以下のようにします。

from airflow.plugins_manager import AirflowPlugin
import os

os.environ["PATH"] = os.getenv("PATH") + ":/usr/local/airflow/plugins/bin/"

class EnvVarPlugin(AirflowPlugin):
     name = "env_var_plugin"

このあたりは、通常のAirflowのカスタムプラグインの作成と同じような感じですね。
Airflowの2.0.2に移行がお済みの方は、忘れてはいけないことがあります。
MWAA環境のAirflow設定オプションに core.lazy_load_plugins=False を入れることです。
この設定を忘れると環境変数が設定されてない。という状態に陥ります。

あとは、このpluginsディレクトリをzipで固めてS3アップロードし、MWAAのプラグインとして設定します。 この作業を手動で行うと、AWSコンソールとローカルのShellを往復することになるので、とても面倒です。
以下のようなMakefileを作っておくことをおすすめします。

ARIFLOW_S3_BUCKET_NAME:=<MWAA環境に設定したS3 Bucket名>
ARIFLOW_S3_PATH_PREFIX:=<plugins.zip 等を置く場所へのprefix>
AIRFLOW_ENV_NAME:=<MWAAの環境名>

.PHONY:deploy-dags
deploy-dags:
  aws s3 sync dags/ s3://$(ARIFLOW_S3_BUCKET_NAME)/$(ARIFLOW_S3_PATH_PREFIX)/dags/ \
   --exclude "*.pyc" \
   --exclude ".vscode*" \
   --exclude "tests*" \
   --delete --follow-symlinks

.PHONY: update-env
update-env: plugins-version.txt requirements-version.txt
  AWS_PAGER='' aws mwaa update-environment --name $(AIRFLOW_ENV_NAME) \
      --plugins-s3-path $(ARIFLOW_S3_PATH_PREFIX)/plugins.zip \
      --plugins-s3-object-version $(shell cat plugins-version.txt)\
      --requirements-s3-path $(ARIFLOW_S3_PATH_PREFIX)/requirements.txt \
      --requirements-s3-object-version $(shell cat requirements-version.txt)
  $(MAKE) clean

plugins.zip: $(shell find plugins)
  cd plugins && \
  chmod -R 755 . && \
  zip -r ../plugins.zip .

requirements-version.txt: requirements.txt
  aws s3api put-object \
      --body $< \
      --bucket $(ARIFLOW_S3_BUCKET_NAME) \
      --key $(ARIFLOW_S3_PATH_PREFIX)/$< \
      --query VersionId --output text > $@

plugins-version.txt: plugins.zip
  aws s3api put-object \
      --body $< \
      --bucket $(ARIFLOW_S3_BUCKET_NAME) \
      --key $(ARIFLOW_S3_PATH_PREFIX)/$< \
      --query VersionId --output text > $@

.PHONY: clean
clean:
  $(RM) -rf plugins.zip
  $(RM) -rf requirements-version.txt
  $(RM) -rf plugins-version.txt

makeコマンド一つで環境を更新できるようになります。 環境の更新が終われば、BashOperatorでツールの利用ができるようになります。

実際に使ってみる。

ツールが使えるかどうかや、MWAAの環境に関する調査に便利なDAGを紹介します。

"""
確認用
"""
import os
from airflow.decorators import dag
from airflow.utils.dates import days_ago
from airflow.operators.bash import BashOperator

dag_name = os.path.basename(__file__).replace(".py", "")
default_args = {
    "depends_on_past": False,
    "email_on_failure": False,
    "email_on_retry": False,
}

@dag(
    dag_id=dag_name,
    default_args=default_args,
    schedule_interval=None,
    start_date=days_ago(2),
    max_active_runs=1,
    doc_md=__doc__)
def create_dag():

    task = BashOperator(
        task_id="task",
        bash_command="{{ dag_run.conf['cmd'] }}"
    )

dag = create_dag()

このDAGを使ってツールが使えることを確認します。

{
    "cmd": "ssmwrap --version"
}

このパラメータでDAG Runすると以下のようになります。

f:id:ikeda-masashi:20210720124216p:plain

無事、ツールをBashOperatorで実行できることを確認しました。

おわりに

この記事では、バイナリが一つだけの軽量ツールをpluginsの一部としてMWAAに送り、BashOperatorで実行してみました。
例えば、Go言語で書いたようなツールはこのような一つのバイナリを置くだけで実行できるので、今回のケースにマッチします。 BashOperatorを組み合わせるようなワークフローの場合は非常に便利になります。

PerlやPHP、Rubyなどのランタイム環境が必要なPython以外のLLでは、あまりおすすめできません。それらの言語で書かれたツールの場合は適宜ECS TaskやLambda関数を定義して、MWAAのOperator経由で起動することをおすすめします。

適切な方法を選択して、様々なワークフローを実装しましょう。

カヤックではデータパイプラインの整備に興味のあるエンジニアも募集しています

中途採用も募集しています