こんにちは。技術部の自称データエンジニアの池田です。
最近、Amazon Managed Workflows for Apache Airflow (MWAA) を使い倒すことに注力しています。
この記事では、MWAAの環境に任意のツール(バイナリファイル)を送り、BashOperatorで実行する方法について書きます。
背景
弊社は、2014年に次のような宣言をしており、Go言語を社内の主要言語のひとつとして使用しています。
www.kayac.com
多くのGo言語で書かれたツールは、バイナリファイル一つを配置すればよいという長所があります。
そのため、社内のプロダクトのいたる所で、OSSや内製のツールが活躍しております。
当然のごとく、MWAAでもそれらのツールを利用したくなります。
ところで、MWAAにおいて任意のツールを使ったワークフローを構築するにはどうすればよいでしょう?
- ツール入りのイメージを指定したECS Taskを定義して、MWAAからECSOperatorを使用してRunTaskする。
- ツール入のAWS Lambda関数を作成し、MWAAからPythonOperatorを使用してboto3経由でInvokeFunctionする。
- どうにかして、AirflowのWorkerにツールを送り、BashOperatorで使用する。
比較的様々な事を行い、動作の重たいツールやインストールが複雑なランタイムが必要な場合は1や2が良いでしょう。
特に、PerlやPHP、RubyなどのLLで書かれたプロダクト固有のツールは、ECS Task化しContainerOverrideを利用して、ECSOperatorでの呼び出し時に挙動を変更するのが良いでしょう。
しかし、Go言語で書かれた軽量なツールの場合は、一つのバイナリファイルだけの事が多いです。一つのバイナリファイルだけ入ったECS TaskやLambda関数を用意するのは少し手間です。
どうにかしてMWAAのWorkerにツールを送る事ができれば、BashOperetorから利用できそうです。
今回は、このどうにかして送る手段を調査しました。
どうやってツールをMWAAに送るのか?
実は、その方法は以下のユーザーガイドに書いてありました。
docs.aws.amazon.com
ユーザーズガイドの例では、Oracle InstantClientを送っています。
どのようなものでも、実行形式ファイルを plugins.zipに固めて送る ことで実現できるようです。
なるほど!、という感じですね。では、ユーザーガイドに習いGo言語製のツールをMWAAに送ってみようと思います。
以下のようなディレクトリ構成にします。
.
├── plugins
│ ├── __init__.py
│ ├── env_var_plugins.py
│ └── bin
│ └── ssmwrap
今回はMWAAの環境に送るツールとして、弊社の長田 が作成したOSSであるssmwrapを想定しています。
AWS Systems Managerのパラメータストアにアクセスして、値を環境変数に展開してくれる便利なツールです。
github.com
env_var_plugins.py はユーザーズガイドをお手本に以下のようにします。
from airflow.plugins_manager import AirflowPlugin
import os
os.environ["PATH"] = os.getenv("PATH") + ":/usr/local/airflow/plugins/bin/"
class EnvVarPlugin(AirflowPlugin):
name = "env_var_plugin"
このあたりは、通常のAirflowのカスタムプラグインの作成と同じような感じですね。
Airflowの2.0.2に移行がお済みの方は、忘れてはいけないことがあります。
MWAA環境のAirflow設定オプションに core.lazy_load_plugins=False
を入れることです。
この設定を忘れると環境変数が設定されてない。という状態に陥ります。
あとは、このpluginsディレクトリをzipで固めてS3アップロードし、MWAAのプラグインとして設定します。
この作業を手動で行うと、AWSコンソールとローカルのShellを往復することになるので、とても面倒です。
以下のようなMakefileを作っておくことをおすすめします。
ARIFLOW_S3_BUCKET_NAME:=<MWAA環境に設定したS3 Bucket名>
ARIFLOW_S3_PATH_PREFIX:=<plugins.zip 等を置く場所へのprefix>
AIRFLOW_ENV_NAME:=<MWAAの環境名>
.PHONY:deploy-dags
deploy-dags:
aws s3 sync dags/ s3://$(ARIFLOW_S3_BUCKET_NAME)/$(ARIFLOW_S3_PATH_PREFIX)/dags/ \
--exclude "*.pyc" \
--exclude ".vscode*" \
--exclude "tests*" \
--delete --follow-symlinks
.PHONY: update-env
update-env: plugins-version.txt requirements-version.txt
AWS_PAGER='' aws mwaa update-environment --name $(AIRFLOW_ENV_NAME) \
--plugins-s3-path $(ARIFLOW_S3_PATH_PREFIX)/plugins.zip \
--plugins-s3-object-version $(shell cat plugins-version.txt)\
--requirements-s3-path $(ARIFLOW_S3_PATH_PREFIX)/requirements.txt \
--requirements-s3-object-version $(shell cat requirements-version.txt)
$(MAKE) clean
plugins.zip: $(shell find plugins)
cd plugins && \
chmod -R 755 . && \
zip -r ../plugins.zip .
requirements-version.txt: requirements.txt
aws s3api put-object \
--body $< \
--bucket $(ARIFLOW_S3_BUCKET_NAME) \
--key $(ARIFLOW_S3_PATH_PREFIX)/$< \
--query VersionId --output text > $@
plugins-version.txt: plugins.zip
aws s3api put-object \
--body $< \
--bucket $(ARIFLOW_S3_BUCKET_NAME) \
--key $(ARIFLOW_S3_PATH_PREFIX)/$< \
--query VersionId --output text > $@
.PHONY: clean
clean:
$(RM) -rf plugins.zip
$(RM) -rf requirements-version.txt
$(RM) -rf plugins-version.txt
makeコマンド一つで環境を更新できるようになります。
環境の更新が終われば、BashOperatorでツールの利用ができるようになります。
実際に使ってみる。
ツールが使えるかどうかや、MWAAの環境に関する調査に便利なDAGを紹介します。
"""
確認用
"""
import os
from airflow.decorators import dag
from airflow.utils.dates import days_ago
from airflow.operators.bash import BashOperator
dag_name = os.path.basename(__file__).replace(".py", "")
default_args = {
"depends_on_past": False,
"email_on_failure": False,
"email_on_retry": False,
}
@dag(
dag_id=dag_name,
default_args=default_args,
schedule_interval=None,
start_date=days_ago(2),
max_active_runs=1,
doc_md=__doc__)
def create_dag():
task = BashOperator(
task_id="task",
bash_command="{{ dag_run.conf['cmd'] }}"
)
dag = create_dag()
このDAGを使ってツールが使えることを確認します。
{
"cmd": "ssmwrap --version"
}
このパラメータでDAG Runすると以下のようになります。
無事、ツールをBashOperatorで実行できることを確認しました。
おわりに
この記事では、バイナリが一つだけの軽量ツールをpluginsの一部としてMWAAに送り、BashOperatorで実行してみました。
例えば、Go言語で書いたようなツールはこのような一つのバイナリを置くだけで実行できるので、今回のケースにマッチします。
BashOperatorを組み合わせるようなワークフローの場合は非常に便利になります。
PerlやPHP、Rubyなどのランタイム環境が必要なPython以外のLLでは、あまりおすすめできません。それらの言語で書かれたツールの場合は適宜ECS TaskやLambda関数を定義して、MWAAのOperator経由で起動することをおすすめします。
適切な方法を選択して、様々なワークフローを実装しましょう。
カヤックではデータパイプラインの整備に興味のあるエンジニアも募集しています
中途採用も募集しています