機械学習エンジニアなら知っておきたい3大ツールを徹底比較。「どのツールをいつ使うべきか」がこの記事で明確になります。
📌 本記事のコード対応バージョン
- Ray: 2.52.1(2025年11月リリース)
- MLflow: 3.7.0(2025年12月リリース)
- Apache Airflow: 3.1.5(2025年12月リリース )
本記事のコード例は2025年12月時点の公式ドキュメントを参考にしていますが、環境やマイナーアップデートによって挙動が異なる場合もあります。実際の利用時には公式ドキュメントもあわせて確認してください。
目次

ML/AIエコシステムには数多くのツールが存在し、それぞれが「あなたの課題を解決します」と謳っています。その中でも、本番環境のML開発で必ず名前が挙がる3つのツールがあります。Ray、MLflow、Airflowです。
この記事で得られること:
✅ 3つのツールの違いが明確に理解できる
✅ 豊富な実践コード例で実装イメージが掴める
✅ 自分のプロジェクトに最適なツールが選べる判断フレームワーク
✅ 実務で使える具体的なユースケース
対象読者:
それでは、各ツールの特徴を深く掘り下げていきましょう!

現代のMLプロジェクトは、複数の複雑なステージで構成されており、それぞれに独自の課題があります。
典型的なMLパイプラインの各ステージ
よくある課題
🐌 スケーラビリティの問題 – 単一マシンでのトレーニングに時間がかかりすぎる
📊 実験管理の混乱 – どのパラメータがどの結果を生み出したのか追跡できない
🔄 パイプラインの複雑性 – 複数タスク間の依存関係の管理
🚀 デプロイの難しさ – ノートブックから本番環境への移行
👥 チーム協業 – モデルの共有と結果の再現
各ツールの役割

Rayは、カリフォルニア大学バークレー校のRISELabで開発されたオープンソースの分散コンピューティングフレームワークです。最小限のコード変更で、Pythonアプリケーションを単一マシンから大規模クラスタまでスケールさせることができます。
コアとなる目的
Rayは、AI/MLワークロードのスケーリングという根本的な課題に取り組みます。大規模モデルのトレーニング、ハイパーパラメータチューニング、膨大なデータセットの処理、大規模なモデルサービング、いずれの場合でも、Rayは分散コンピューティングのための統一されたインターフェースを提供します。
主要コンポーネント
| コンポーネント | 説明 |
|---|---|
| Ray Core | 分散コンピューティングの基本機能(@ray.remote) |
| Ray Tune | ハイパーパラメータチューニングを大規模に実行 |
| Ray Serve | モデルのサービングとデプロイ |
| Ray RLlib | 強化学習アルゴリズム |
| Ray Data | 分散データ処理 |
| Ray Train | 分散モデルトレーニング |

Ray Coreは、通常のPython関数をシンプルなデコレーターで分散タスクに変換します。
主なメリット:
import ray
import time
# Rayの初期化
ray.init()
# 通常の関数を分散タスクに変換
@ray.remote
def expensive_computation(n):
"""重い計算処理をシミュレート"""
time.sleep(1) # 作業をシミュレート
return sum(i * i for i in range(n))
# 並列処理の実装例
def parallel_processing():
start_time = time.time()
# すべてのタスクを並列で送信
futures = [expensive_computation.remote(i) for i in range(1000, 5000, 1000)]
# 結果が準備できたら取得
results = ray.get(futures)
end_time = time.time()
print(f"並列処理時間: {end_time - start_time:.2f}秒")
return results
💡 ポイント: 上記のコードでは、単一マシンで順次実行する場合と比較して、処理時間を大幅に短縮できます。
Actor(アクター)パターンでの状態管理
@ray.remote
class Counter:
def __init__(self):
self.count = 0
def increment(self):
self.count += 1
return self.count
def get_count(self):
return self.count
# アクターの使用例
counters = [Counter.remote() for _ in range(3)]
# 各アクターは独自の状態を維持
futures = []
for counter in counters:
for _ in range(5):
futures.append(counter.increment.remote())
results = ray.get(futures)
print(f"実行結果: {results}")
こんな方におすすめ:
Ray Tuneは、さまざまな検索アルゴリズムと早期停止機能をサポートした、分散ハイパーパラメータ最適化を提供します。
主なメリット:
from ray import tune
from ray.tune.search.bayesopt import BayesOptSearch
from ray.tune.schedulers import AsyncHyperBandScheduler
from sklearn.ensemble import RandomForestClassifier
from sklearn.model_selection import cross_val_score
def objective_function(config):
"""最適化する関数 - モデルをトレーニングして検証スコアを返す"""
# データの生成(実際にはデータを読み込み)
X, y = make_classification(n_samples=1000, n_features=20)
# configのハイパーパラメータでモデルを作成
model = RandomForestClassifier(
n_estimators=int(config["n_estimators"]),
max_depth=int(config["max_depth"]) if config["max_depth"] > 0 else None,
random_state=42
)
# クロスバリデーションスコア
scores = cross_val_score(model, X, y, cv=3, scoring='accuracy')
accuracy = scores.mean()
# Ray Tuneに結果を報告
tune.report(accuracy=accuracy)
# ハイパーパラメータチューニングの実行
def run_hyperparameter_tuning():
ray.init()
# 探索空間の定義
search_space = {
"n_estimators": tune.randint(10, 200),
"max_depth": tune.randint(-1, 20),
"min_samples_split": tune.randint(2, 20),
"min_samples_leaf": tune.randint(1, 10)
}
# ベイズ最適化の設定
bayesopt = BayesOptSearch(metric="accuracy", mode="max")
# 早期停止のスケジューラー設定
scheduler = AsyncHyperBandScheduler(
metric="accuracy",
mode="max"
)
# チューニング実行(Ray 2.x以降の推奨方法:Tunerクラスを使用)
tuner = tune.Tuner(
objective_function,
param_space=search_space,
tune_config=tune.TuneConfig(
metric="accuracy",
mode="max",
search_alg=bayesopt,
scheduler=scheduler,
num_samples=50 # 試行回数
),
run_config=tune.RunConfig(
name="rf_hyperparameter_tuning"
)
)
results = tuner.fit()
best_result = results.get_best_result()
print(f"最適な設定: {best_result.config}")
print(f"最高精度: {best_result.metrics['accuracy']:.4f}")
🎯 使い分けのヒント:
GridSearchCVを使用Ray Serveは、組み込みのロードバランシングと自動スケーリング機能を備えた、スケーラブルなモデルサービングを可能にします。
主なメリット:
from ray import serve
import numpy as np
from sklearn.ensemble import RandomForestClassifier
from sklearn.datasets import make_classification
ray.init()
@serve.deployment(num_replicas=2, ray_actor_options={"num_cpus": 1})
class MLModel:
def __init__(self):
# モデルのトレーニング(実際には保存済みモデルを読み込み)
X, y = make_classification(n_samples=1000, n_features=20)
self.model = RandomForestClassifier(n_estimators=100)
self.model.fit(X, y)
print("モデルの準備完了!")
async def __call__(self, request):
# リクエストの処理
data = await request.json()
features = np.array(data["features"]).reshape(1, -1)
# 予測の実行
prediction = self.model.predict(features)[0]
probability = self.model.predict_proba(features)[0].tolist()
return {
"prediction": int(prediction),
"probability": probability
}
# アプリケーションの定義とデプロイ(Ray 2.x以降の標準的な方法)
app = MLModel.bind()
handle = serve.run(app)
バッチ処理による最適化
スループットを向上させるために、複数のリクエストをまとめて処理できます。
@serve.deployment(num_replicas=3)
class BatchedMLModel:
def __init__(self):
# モデルの初期化
self.model = RandomForestClassifier(n_estimators=100)
# トレーニング処理...
@serve.batch(max_batch_size=10, batch_wait_timeout_s=0.1)
async def batch_predict(self, requests):
"""バッチリクエストを処理してスループット向上"""
batch_features = []
for request in requests:
data = await request.json()
features = np.array(data["features"])
batch_features.append(features)
# バッチ予測の実行
batch_array = np.array(batch_features)
predictions = self.model.predict(batch_array)
probabilities = self.model.predict_proba(batch_array)
# 個別のレスポンスを返す
responses = []
for pred, prob in zip(predictions, probabilities):
responses.append({
"prediction": int(pred),
"probability": prob.tolist()
})
return responses
# アプリケーションの定義とデプロイ
app = BatchedMLModel.bind()
handle = serve.run(app)
⚠️ 注意点: バッチ処理を使用する場合は、batch_wait_timeout_sを適切に設定しないと、レイテンシが増加する可能性があります。
こんな方におすすめ:
Ray RLibは、複数のアルゴリズムと分散トレーニングをサポートした、スケーラブルな強化学習を提供します。
主なメリット:
from ray.rllib.algorithms.ppo import PPO
import gymnasium as gym
ray.init()
# PPOエージェントのトレーニング
def train_ppo_agent():
"""CartPole環境でPPOエージェントをトレーニング"""
# PPOアルゴリズムの設定
config = {
"env": "CartPole-v1",
"framework": "torch",
"num_workers": 2, # データ収集用の並列ワーカー
"train_batch_size": 4000,
"lr": 3e-4,
"gamma": 0.99
}
# PPOトレーナーの作成
trainer = PPO(config=config)
# トレーニングループ
for i in range(50):
result = trainer.train()
if i % 10 == 0:
reward = result["episode_reward_mean"]
print(f"反復 {i}: 平均報酬 = {reward:.2f}")
# 定期的にチェックポイントを保存
checkpoint_path = trainer.save()
print(f"チェックポイント保存: {checkpoint_path}")
final_result = trainer.train()
print(f"最終平均報酬: {final_result['episode_reward_mean']:.2f}")
カスタム環境の例:トレーディング環境
class CustomTradingEnv(gym.Env):
"""デモ用のシンプルなトレーディング環境"""
def __init__(self):
super().__init__()
# アクション空間: 0=保持, 1=買い, 2=売り
self.action_space = gym.spaces.Discrete(3)
# 観測空間: [価格, ポジション, 現金, ポートフォリオ価値]
self.observation_space = gym.spaces.Box(
low=np.array([0, -1, 0, 0]),
high=np.array([1000, 1, 10000, 20000]),
dtype=np.float32
)
💡 活用シーン:
Ray Dataは、大規模データセットのための分散データ処理機能を提供します。
主なメリット:
import ray
ray.init()
# 基本的なデータ処理
def basic_data_processing():
"""Ray Dataの基本操作を実演"""
# データセットの作成
ds = ray.data.range(1000000)
# データの変換
squared = ds.map(lambda x: x ** 2)
# フィルタリング
filtered = squared.filter(lambda x: x > 1000)
# サンプルの取得
sample = filtered.take(10)
print("フィルタリング後のサンプル:", sample[:5])
# 集計
total = filtered.sum()
print(f"合計: {total}")
構造化データの処理
def process_structured_data():
"""Ray Dataで構造化データを処理"""
# サンプルデータセットの作成
data = []
for i in range(100000):
data.append({
"user_id": i,
"age": np.random.randint(18, 80),
"income": np.random.normal(50000, 15000),
"category": np.random.choice(["A", "B", "C"])
})
# Rayデータセットの作成
ds = ray.data.from_items(data)
# 複雑な変換処理
def enrich_user_data(batch):
"""ユーザーデータに派生特徴量を追加"""
# 収入区分の計算
income_bracket = np.where(
batch["income"] < 30000, "low",
np.where(batch["income"] < 70000, "medium", "high")
)
batch["income_bracket"] = income_bracket
return batch
# 変換の適用
enriched_ds = ds.map_batches(enrich_user_data, batch_format="numpy")
def pytorch_data_pipeline():
"""Ray DataからPyTorch DataLoaderを作成"""
import torch
# 画像風データの生成
def generate_image_data(i):
return {
"image": np.random.rand(32, 32, 3).astype(np.float32),
"label": np.random.randint(0, 10)
}
ds = ray.data.range(10000).map(generate_image_data)
# PyTorchテンソルへの変換
def to_torch_format(batch):
return {
"image": torch.tensor(batch["image"]).permute(0, 3, 1, 2),
"label": torch.tensor(batch["label"], dtype=torch.long)
}
torch_ds = ds.map_batches(to_torch_format, batch_format="numpy")
# DataLoaderの作成
dataloader = torch_ds.iter_torch_batches(batch_size=32, device="cpu")
PyTorchとの統合
こんな方におすすめ:

MLflowは、Databricksによって開発された、機械学習ライフサイクル全体を管理するためのオープンソースプラットフォームです。実験追跡、モデルのパッケージング、デプロイ、コラボレーションのためのツールを提供します。
📝 MLflow 3.x の主な変更点(2025年)
MLflow 3.0は2025年に大きなメジャーアップデートを行いました:
- LoggedModel: モデルを実験・評価・デプロイ全体で追跡する新しい概念
- GenAI対応強化: LLMアプリケーション、AIエージェント向けの包括的なトレーシングと評価機能
- OpenTelemetry統合: エンタープライズ観測性スタックとの統合
- Prompt Registry: プロンプトのバージョン管理とA/Bテスト機能
本記事のコード例は従来のML/DLモデル向けで、MLflow 3.xでも完全に動作します。GenAI機能については公式ドキュメントをご参照ください。
コアとなる目的
MLflowは、ML実験管理とモデルライフサイクル追跡の課題に取り組みます。データサイエンティストやMLエンジニアが実験を整理し、結果を再現し、さまざまな環境でモデルを一貫してデプロイできるよう支援します。
4つの主要コンポーネント
| コンポーネント | 説明 |
|---|---|
| MLflow Tracking | MLランからパラメータ、メトリクス、成果物をログ記録 |
| MLflow Projects | データサイエンスコードを再利用可能で再現性のある形式でパッケージ化 |
| MLflow Models | さまざまなMLライブラリのモデルを多様なプラットフォームにデプロイ |
| MLflow Model Registry | モデルのバージョンとステージを管理する中央モデルストア |

MLflow Trackingは、包括的な実験ログ記録と比較機能を提供します。
主なメリット:
import mlflow
import mlflow.sklearn
from sklearn.ensemble import RandomForestClassifier
from sklearn.model_selection import train_test_split
from sklearn.metrics import accuracy_score, f1_score
# MLflowトラッキングURIの設定
mlflow.set_tracking_uri("sqlite:///mlflow.db")
# 実験の作成
experiment_name = "classification_comparison"
mlflow.set_experiment(experiment_name)
def log_sklearn_experiment(model, model_name, X_train, X_test, y_train, y_test, params):
"""sklearn実験をMLflowでログ記録"""
with mlflow.start_run(run_name=f"{model_name}_experiment"):
# パラメータのログ記録
mlflow.log_params(params)
# モデルのトレーニング
model.fit(X_train, y_train)
# 予測の実行
y_pred = model.predict(X_test)
# メトリクスの計算
accuracy = accuracy_score(y_test, y_pred)
f1 = f1_score(y_test, y_pred, average='weighted')
# メトリクスのログ記録
mlflow.log_metric("accuracy", accuracy)
mlflow.log_metric("f1_score", f1)
# モデルのログ記録
mlflow.sklearn.log_model(
model,
"model",
registered_model_name=f"{model_name}_classifier"
)
print(f"{model_name} - 精度: {accuracy:.4f}, F1: {f1:.4f}")
PyTorchとの統合
def pytorch_experiment():
"""PyTorch実験をMLflowでログ記録"""
import torch
import torch.nn as nn
class SimpleNN(nn.Module):
def __init__(self, input_size, hidden_size, num_classes):
super(SimpleNN, self).__init__()
self.fc1 = nn.Linear(input_size, hidden_size)
self.relu = nn.ReLU()
self.fc2 = nn.Linear(hidden_size, num_classes)
def forward(self, x):
x = self.fc1(x)
x = self.relu(x)
x = self.fc2(x)
return x
with mlflow.start_run(run_name="pytorch_neural_network"):
# ハイパーパラメータ
hidden_size = 64
learning_rate = 0.001
epochs = 100
# パラメータのログ記録
mlflow.log_param("hidden_size", hidden_size)
mlflow.log_param("learning_rate", learning_rate)
mlflow.log_param("epochs", epochs)
# モデルの作成とトレーニング
model = SimpleNN(20, hidden_size, 2)
# ... トレーニングループ ...
# モデルのログ記録
mlflow.pytorch.log_model(model, "model")
ハイパーパラメータチューニングとの統合
def hyperparameter_tuning_with_mlflow():
"""MLflow追跡を使用したハイパーパラメータチューニング"""
# パラメータグリッドの定義
param_grid = {
'n_estimators': [50, 100, 200],
'max_depth': [5, 10, 15, None],
'min_samples_split': [2, 5, 10]
}
best_score = 0
# 親ランの作成
with mlflow.start_run(run_name="hyperparameter_tuning"):
parent_run_id = mlflow.active_run().info.run_id
trial_count = 0
# グリッドサーチ
for n_est in param_grid['n_estimators']:
for max_d in param_grid['max_depth']:
for min_split in param_grid['min_samples_split']:
trial_count += 1
# 各試行用の子ラン
with mlflow.start_run(nested=True, run_name=f"trial_{trial_count}"):
params = {
'n_estimators': n_est,
'max_depth': max_d,
'min_samples_split': min_split
}
model = RandomForestClassifier(**params)
model.fit(X_train, y_train)
accuracy = accuracy_score(y_test, model.predict(X_test))
# すべてをログ記録
mlflow.log_params(params)
mlflow.log_metric("accuracy", accuracy)
if accuracy > best_score:
best_score = accuracy
mlflow.sklearn.log_model(model, "model")
print(f"最高スコア: {best_score:.4f}")
💡 ベストプラクティス:
log_artifactで保存MLflow Projectsは、データサイエンスコードを再利用可能で再現性のある形式でパッケージ化するための標準フォーマットを提供します。
プロジェクト構造の例
# MLprojectファイルとして保存
name: classification_pipeline
conda_env: conda.yaml
entry_points:
main:
parameters:
n_estimators: {type: int, default: 100}
max_depth: {type: int, default: 10}
test_size: {type: float, default: 0.2}
command: "python train.py --n-estimators {n_estimators}"
train:
parameters:
model_type: {type: string, default: "random_forest"}
data_path: {type: string}
command: "python train.py --model-type {model_type}"
環境設定ファイル
# conda.yamlとして保存
name: ml_project
channels:
- defaults
- conda-forge
dependencies:
- python=3.9
- pip
- pip:
- mlflow
- scikit-learn
- pandas
- numpy
プロジェクトの実行方法
# ローカルで実行 mlflow run . -P n_estimators=200 -P max_depth=15 # GitHubから実行 mlflow run https://github.com/username/ml-project -P n_estimators=150 # 特定のエントリーポイントを実行 mlflow run . -e train -P model_type=logistic_regression
こんな方におすすめ:
MLflow Modelsは、さまざまなプラットフォームへのデプロイのために、MLモデルをパッケージ化する標準フォーマットを提供します。
主なメリット:
import mlflow
import mlflow.sklearn
from sklearn.ensemble import RandomForestClassifier
from sklearn.preprocessing import StandardScaler
from sklearn.pipeline import Pipeline
# Sklearnモデルの保存と読み込み
def save_sklearn_model():
"""sklearnモデルをMLflowで保存"""
# パイプラインの作成
pipeline = Pipeline([
('scaler', StandardScaler()),
('classifier', RandomForestClassifier(n_estimators=100))
])
pipeline.fit(X, y)
with mlflow.start_run():
mlflow.sklearn.log_model(
pipeline,
"model",
registered_model_name="sklearn_pipeline_model"
)
run_id = mlflow.active_run().info.run_id
print(f"モデル保存完了 run_id: {run_id}")
カスタムモデルラッパー
より複雑な前処理や後処理が必要な場合、カスタムモデルラッパーを作成できます。
import mlflow.pyfunc
class CustomModelWrapper(mlflow.pyfunc.PythonModel):
"""カスタム前処理を含むモデルラッパー"""
def __init__(self, model, scaler, feature_names):
self.model = model
self.scaler = scaler
self.feature_names = feature_names
def preprocess(self, data):
"""カスタム前処理"""
processed = data.copy()
# 特徴量エンジニアリング
processed['feature_sum'] = processed.sum(axis=1)
processed['feature_mean'] = processed.mean(axis=1)
# 特徴量のスケーリング
feature_cols = [col for col in processed.columns if col.startswith('feature_')]
processed[feature_cols] = self.scaler.transform(processed[feature_cols])
return processed
def predict(self, context, model_input):
"""カスタム予測ロジック"""
# 前処理
processed_input = self.preprocess(model_input)
# 予測の実行
predictions = self.model.predict(processed_input)
probabilities = self.model.predict_proba(processed_input)
# 構造化された出力を返す
return pd.DataFrame({
'prediction': predictions,
'probability_class_0': probabilities[:, 0],
'probability_class_1': probabilities[:, 1],
'confidence': probabilities.max(axis=1)
})
モデルシグネチャの定義
モデルの入出力スキーマを明示的に定義することで、型エラーを防ぎます。
from mlflow.models.signature import infer_signature
def save_model_with_signature():
"""明示的な入出力シグネチャを持つモデルを保存"""
model = RandomForestClassifier(n_estimators=100)
model.fit(X, y)
# サンプル予測からシグネチャを推論
sample_input = X[:5]
sample_output = model.predict(sample_input)
signature = infer_signature(sample_input, sample_output)
with mlflow.start_run():
mlflow.sklearn.log_model(
model,
"model_with_signature",
signature=signature,
input_example=sample_input
)
print("モデルシグネチャ:")
print(signature)
⚠️ 注意点: モデルをデプロイする前に、必ず入出力の型とシェイプを確認しましょう。シグネチャを定義することで、本番環境での予期しないエラーを防げます。
Model Registryは、協調的なMLワークフローのための中央モデルストレージ、バージョン管理、ステージ管理を提供します。
主なメリット:
from mlflow.tracking import MlflowClient
client = MlflowClient()
# 複数のモデルバージョンを登録
def register_model_versions():
"""複数のモデルバージョンを作成して登録"""
model_name = "production_classifier"
# バージョン1をトレーニングして登録
with mlflow.start_run(run_name="model_v1"):
model_v1 = RandomForestClassifier(n_estimators=50)
model_v1.fit(X_train, y_train)
accuracy_v1 = accuracy_score(y_test, model_v1.predict(X_test))
mlflow.log_metric("accuracy", accuracy_v1)
mlflow.sklearn.log_model(
model_v1,
"model",
registered_model_name=model_name
)
print(f"モデルv1の精度: {accuracy_v1:.4f}")
# バージョン2(改善版)
with mlflow.start_run(run_name="model_v2"):
model_v2 = RandomForestClassifier(n_estimators=100, max_depth=10)
model_v2.fit(X_train, y_train)
accuracy_v2 = accuracy_score(y_test, model_v2.predict(X_test))
mlflow.log_metric("accuracy", accuracy_v2)
mlflow.sklearn.log_model(
model_v2,
"model",
registered_model_name=model_name
)
print(f"モデルv2の精度: {accuracy_v2:.4f}")
モデルステージの管理
def manage_model_stages(model_name):
"""モデルステージの管理:None、Staging、Production、Archived"""
# バージョン1をStagingに移行
client.transition_model_version_stage(
name=model_name,
version=1,
stage="Staging"
)
print("バージョン1をStagingに移行しました")
# バージョン2をProductionに移行
client.transition_model_version_stage(
name=model_name,
version=2,
stage="Production"
)
print("バージョン2をProductionに移行しました")
# 説明とタグの追加
client.update_model_version(
name=model_name,
version=1,
description="初期ベースラインモデル - 50本の木を持つシンプルなRF"
)
client.set_model_version_tag(model_name, "1", "model_type", "baseline")
client.set_model_version_tag(model_name, "2", "model_type", "production")
レジストリからモデルを読み込み
def load_model_from_registry(model_name, stage="Production"):
"""レジストリからステージ別にモデルを読み込み"""
# ステージ別にモデルを読み込み
model_uri = f"models:/{model_name}/{stage}"
model = mlflow.sklearn.load_model(model_uri)
# 予測の実行
predictions = model.predict(X_test[:5])
print(f"{stage}モデルの予測: {predictions}")
return model
# 本番モデルの読み込み
prod_model = load_model_from_registry("production_classifier", "Production")
最良モデルの自動プロモーション
def promote_best_model(model_name):
"""最高性能のモデルを自動的に本番環境に昇格"""
versions = client.search_model_versions(f"name='{model_name}'")
# 最高精度のバージョンを検索
best_version = None
best_accuracy = 0
for version in versions:
run = client.get_run(version.run_id)
accuracy = run.data.metrics.get('accuracy', 0)
if accuracy > best_accuracy:
best_accuracy = accuracy
best_version = version.version
if best_version:
# 既存の本番モデルをアーカイブ
client.transition_model_version_stage(
name=model_name,
version=best_version,
stage="Production",
archive_existing_versions=True
)
print(f"バージョン{best_version}を本番環境に昇格(精度: {best_accuracy:.4f})")
こんな方におすすめ:
MLflowは、人気のMLフレームワークとシームレスに統合され、自動ロギング機能により定型コードを最小限に抑えます。
主なメリット:
TensorFlow自動ロギング
import tensorflow as tf
from tensorflow import keras
# 自動ロギングを有効化
mlflow.tensorflow.autolog()
with mlflow.start_run(run_name="tensorflow_autolog"):
# モデルの定義
model = keras.Sequential([
keras.layers.Dense(64, activation='relu', input_shape=(20,)),
keras.layers.Dropout(0.3),
keras.layers.Dense(1, activation='sigmoid')
])
# コンパイル
model.compile(optimizer='adam', loss='binary_crossentropy', metrics=['accuracy'])
# トレーニング(自動ロギングがすべてキャプチャ)
history = model.fit(X_train, y_train, epochs=50, batch_size=32, validation_split=0.2)
print("自動ロギング完了:モデル、メトリクス、パラメータ、トレーニング履歴")
XGBoost自動ロギング
import xgboost as xgb
mlflow.xgboost.autolog()
with mlflow.start_run(run_name="xgboost_autolog"):
dtrain = xgb.DMatrix(X_train, label=y_train)
dtest = xgb.DMatrix(X_test, label=y_test)
params = {
'objective': 'binary:logistic',
'max_depth': 6,
'learning_rate': 0.1
}
# トレーニング(自動ロギングがすべてキャプチャ)
model = xgb.train(params, dtrain, num_boost_round=100, evals=[(dtest, 'test')])
print("自動ロギング完了:モデル、パラメータ、メトリクス、特徴量重要度")
カスタムロギングとの併用
from sklearn.metrics import classification_report, confusion_matrix
import matplotlib.pyplot as plt
mlflow.sklearn.autolog()
with mlflow.start_run(run_name="combined_logging"):
# モデルのトレーニング(自動ロギング)
model = RandomForestClassifier(n_estimators=100)
model.fit(X_train, y_train)
# カスタムロギング:分類レポート
y_pred = model.predict(X_test)
report = classification_report(y_test, y_pred, output_dict=True)
mlflow.log_dict(report, "classification_report.json")
# カスタムロギング:混同行列の可視化
cm = confusion_matrix(y_test, y_pred)
plt.figure(figsize=(8, 6))
sns.heatmap(cm, annot=True, fmt='d', cmap='Blues')
plt.title('混同行列')
plt.savefig('confusion_matrix.png')
mlflow.log_artifact('confusion_matrix.png')
print("自動ロギングとカスタムロギングの併用完了")
💡 使い分けのポイント:

Apache Airflowは、もともとAirbnbで開発されたオープンソースのワークフローオーケストレーションプラットフォームです。ワークフローを有向非巡回グラフ(DAG)としてプログラム的に作成、スケジュール、監視できます。
📝 Apache Airflow 3.x の主な変更点(2025年)
Airflow 3.0は5年ぶりのメジャーアップデートとして2025年4月にリリースされました:Airflow 3.0.0(2025年4月リリース)の主な機能:
- 新しいReact UI: 完全に再設計されたユーザーインターフェース
- イベント駆動スケジューリング: 外部イベントに基づくワークフロー起動(AIP-82)
- Task SDK: DAGをよりシンプルに記述できる新しいSDK
- Task Execution API: セキュリティ強化のためメタDBへの直接アクセスを制限(AIP-72)
- DAGバージョニング: DAGの変更履歴を追跡
- Python 3.9-3.12対応: Python 3.9以上が必須
Airflow 3.1.0(2025年9月リリース)の追加機能:
- Python 3.13対応: 最新のPythonバージョンをサポート
- Human-in-the-Loop (HITL): ワークフローに人間の判断を組み込む機能
- 17言語対応: 国際化サポートの大幅強化
本記事のコード例は、主にAirflow 2.x系のAPIを使用していますが、Airflow 3.xでも動作します。ただし、一部のインポートパスや設定が変更されている場合があるため、詳細は後述の注釈と公式ドキュメントをご確認ください。
コアとなる目的
Airflowは、ワークフローオーケストレーションとパイプライン管理の課題に取り組みます。依存関係のある複雑なワークフローの調整、定期的なタスクのスケジューリング、実行の監視、失敗したタスクの再試行ロジックに優れています。
主要概念
| 概念 | 説明 |
|---|---|
| DAG(有向非巡回グラフ) | ワークフローの構造とタスクの依存関係を定義 |
| Operator(オペレーター) | 個々のタスクを定義(PythonOperator、BashOperatorなど) |
| Sensor(センサー) | 条件や外部イベントを待機 |
| Executor(エグゼキューター) | タスクの実行方法を決定(Sequential、Local、Celery、Kubernetes) |
| XCom(クロスコミュニケーション) | タスク間でデータを受け渡すメカニズム |

DAGはワークフローの構造、タスクの依存関係、スケジューリングを定義します。MLパイプラインDAGの包括的な例を見てみましょう。
主なメリット:
from airflow import DAG
from airflow.operators.python import PythonOperator
from airflow.operators.bash import BashOperator
from datetime import datetime, timedelta
# 📝 注: Airflow 3.x以降では以下のインポートが推奨されます:
# from airflow.sdk import DAG
# from airflow.providers.standard.operators.python import PythonOperator
# from airflow.providers.standard.operators.bash import BashOperator
# 上記のAirflow 2.x形式のインポートも現時点では動作しますが、
# 将来的に非推奨となる可能性があります。新規プロジェクトでは
# airflow.sdk および標準プロバイダーからのインポートを推奨します。
# DAGのデフォルト引数
default_args = {
'owner': 'data-science-team',
'depends_on_past': False,
'start_date': datetime(2024, 1, 1),
'email': ['alerts@company.com'],
'email_on_failure': True,
'retries': 3,
'retry_delay': timedelta(minutes=5)
}
# DAGの作成
dag = DAG(
'ml_training_pipeline',
default_args=default_args,
description='完全なMLトレーニングとデプロイパイプライン',
schedule_interval='0 2 * * *', # 毎日午前2時に実行
catchup=False,
tags=['machine-learning', 'training', 'production']
)
タスク1:データの可用性チェック
def check_data_availability(**context):
"""必要なデータファイルが存在するかチェック"""
import os
data_path = '/data/raw/training_data.csv'
if not os.path.exists(data_path):
raise FileNotFoundError(f"データファイルが見つかりません: {data_path}")
# ファイルサイズとレコード数を取得
file_size = os.path.getsize(data_path)
df = pd.read_csv(data_path)
record_count = len(df)
# 下流タスク用にXComにプッシュ
context['task_instance'].xcom_push(key='data_path', value=data_path)
context['task_instance'].xcom_push(key='record_count', value=record_count)
print(f"データ利用可能: {record_count}レコード、{file_size/1024/1024:.2f} MB")
return data_path
check_data_task = PythonOperator(
task_id='check_data_availability',
python_callable=check_data_availability,
provide_context=True, # 📝 注: Airflow 2.0以降、**kwargsを使う関数では自動的にコンテキストが渡されるため、このパラメータは指定不要です(非推奨)
dag=dag
)
タスク2:データ検証
def validate_data(**context):
"""データ品質を検証"""
# 前タスクからデータパスを取得
ti = context['task_instance']
data_path = ti.xcom_pull(task_ids='check_data_availability', key='data_path')
# データの読み込み
df = pd.read_csv(data_path)
# 検証チェック
validations = {
'null_percentage': (df.isnull().sum() / len(df) * 100).to_dict(),
'duplicate_count': df.duplicated().sum(),
'shape': df.shape
}
# 重大な問題のチェック
if validations['duplicate_count'] > len(df) * 0.1:
raise ValueError(f"重複が多すぎます: {validations['duplicate_count']}")
ti.xcom_push(key='validation_results', value=validations)
print(f"検証完了: {validations}")
return "validation_passed"
validate_data_task = PythonOperator(
task_id='validate_data',
python_callable=validate_data,
provide_context=True,
dag=dag
)
タスク3:データ前処理
def preprocess_data(**context):
"""前処理と特徴量エンジニアリング"""
ti = context['task_instance']
data_path = ti.xcom_pull(task_ids='check_data_availability', key='data_path')
df = pd.read_csv(data_path)
# 前処理ステップ
# 1. 重複の削除
df = df.drop_duplicates()
# 2. 欠損値の処理
df = df.fillna(df.mean())
# 3. 特徴量エンジニアリング
if 'feature_1' in df.columns and 'feature_2' in df.columns:
df['feature_interaction'] = df['feature_1'] * df['feature_2']
df['feature_ratio'] = df['feature_1'] / (df['feature_2'] + 1)
# 4. 処理済みデータの保存
processed_path = '/data/processed/training_data_processed.csv'
df.to_csv(processed_path, index=False)
ti.xcom_push(key='processed_data_path', value=processed_path)
print(f"前処理完了: {len(df)}レコード、{len(df.columns)}特徴量")
return processed_path
preprocess_data_task = PythonOperator(
task_id='preprocess_data',
python_callable=preprocess_data,
provide_context=True,
dag=dag
)
タスク4:モデルトレーニング
def train_model(**context):
"""MLモデルのトレーニング"""
ti = context['task_instance']
processed_path = ti.xcom_pull(task_ids='preprocess_data', key='processed_data_path')
df = pd.read_csv(processed_path)
X = df.drop('target', axis=1)
y = df['target']
X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.2)
# モデルのトレーニング
model = RandomForestClassifier(n_estimators=100, max_depth=10, n_jobs=-1)
model.fit(X_train, y_train)
# 評価
test_accuracy = accuracy_score(y_test, model.predict(X_test))
# モデルの保存
model_path = f'/models/rf_model_{context["ds"]}.pkl'
with open(model_path, 'wb') as f:
pickle.dump(model, f)
ti.xcom_push(key='model_path', value=model_path)
ti.xcom_push(key='test_accuracy', value=test_accuracy)
print(f"モデルトレーニング完了: テスト精度 = {test_accuracy:.4f}")
return model_path
train_model_task = PythonOperator(
task_id='train_model',
python_callable=train_model,
provide_context=True,
dag=dag
)
タスク5:モデル評価と検証
def evaluate_model(**context):
"""包括的なモデル評価"""
ti = context['task_instance']
test_accuracy = ti.xcom_pull(task_ids='train_model', key='test_accuracy')
# 精度しきい値の定義
ACCURACY_THRESHOLD = 0.75
if test_accuracy < ACCURACY_THRESHOLD:
raise ValueError(
f"モデル精度{test_accuracy:.4f}がしきい値{ACCURACY_THRESHOLD}を下回っています"
)
validation_passed = True
validation_message = "モデルがすべての検証チェックに合格しました"
ti.xcom_push(key='validation_passed', value=validation_passed)
print(f"モデル評価完了: {validation_message}")
return validation_passed
evaluate_model_task = PythonOperator(
task_id='evaluate_model',
python_callable=evaluate_model,
provide_context=True,
dag=dag
)
タスク6:モデルデプロイ
def deploy_model(**context):
"""モデルを本番環境にデプロイ"""
ti = context['task_instance']
model_path = ti.xcom_pull(task_ids='train_model', key='model_path')
validation_passed = ti.xcom_pull(task_ids='evaluate_model', key='validation_passed')
if not validation_passed:
raise ValueError("デプロイ不可:モデル検証に失敗しました")
# モデルを本番環境の場所にコピー
import shutil
production_path = '/models/production/current_model.pkl'
shutil.copy(model_path, production_path)
print(f"モデルデプロイ成功: {production_path}")
return production_path
deploy_model_task = PythonOperator(
task_id='deploy_model',
python_callable=deploy_model,
provide_context=True,
dag=dag
)
タスク依存関係の定義
# タスクの依存関係を定義 check_data_task >> validate_data_task >> preprocess_data_task preprocess_data_task >> train_model_task >> evaluate_model_task evaluate_model_task >> deploy_model_task
⚠️ 重要な注意点:
>> 演算子を使用してタスクの依存関係を定義retries設定による)Airflowは、さまざまなタスクタイプに対応する多様なオペレーターを提供します。
主要オペレーター一覧
| オペレーター | 用途 |
|---|---|
| PythonOperator | Python関数の実行 |
| BashOperator | Bashコマンドの実行 |
| BranchPythonOperator | 条件分岐 |
| DummyOperator | プレースホルダー/分岐の合流 |
| EmailOperator | メール通知 |
| SimpleHttpOperator | REST API呼び出し |
| PostgresOperator | データベース操作 |
条件分岐の例:BranchPythonOperator
from airflow.operators.python import BranchPythonOperator
def decide_model_type(**context):
"""データサイズに基づいてどのモデルをトレーニングするか決定"""
ti = context['task_instance']
data_path = ti.xcom_pull(task_ids='extract_data')
df = pd.read_csv(data_path)
# 決定ロジック
if len(df) > 500:
print("大規模データセット - Random Forestを使用")
return 'train_random_forest'
else:
print("小規模データセット - Logistic Regressionを使用")
return 'train_logistic_regression'
branch_task = BranchPythonOperator(
task_id='decide_model',
python_callable=decide_model_type,
provide_context=True,
dag=dag
)
# 複数のトレーニングパス
train_rf_task = PythonOperator(task_id='train_random_forest', ...)
train_lr_task = PythonOperator(task_id='train_logistic_regression', ...)
# 分岐の合流
join_task = DummyOperator(
task_id='join_branches',
trigger_rule='none_failed_min_one_success',
dag=dag
)
# 依存関係
branch_task >> [train_rf_task, train_lr_task] >> join_task
BashOperatorの例
from airflow.operators.bash import BashOperator
validate_bash = BashOperator(
task_id='validate_with_bash',
bash_command='''
echo "データファイルを検証中"
if [ -f /tmp/data_{{ ds }}.csv ]; then
echo "ファイルが存在します"
lines=$(wc -l < /tmp/data_{{ ds }}.csv)
echo "ファイルの行数: $lines"
else
echo "ファイルが見つかりません"
exit 1
fi
''',
dag=dag
)
💡 テンプレート変数:
{{ ds }} – 実行日(YYYY-MM-DD形式){{ ts }} – 実行タイムスタンプ{{ task_instance }} – タスクインスタンスオブジェクトSimpleHttpOperatorの例:API呼び出し
from airflow.providers.http.operators.http import SimpleHttpOperator
api_call_task = SimpleHttpOperator(
task_id='register_model_api',
http_conn_id='model_registry_api',
endpoint='/api/models/register',
method='POST',
data=json.dumps({
'model_name': 'production_classifier',
'version': '{{ ds }}',
'metrics': {'accuracy': 0.85}
}),
headers={'Content-Type': 'application/json'},
dag=dag
)
⚠️ 接続設定: http_conn_idは、Airflow UIの「Admin > Connections」で事前に設定する必要があります。
Airflowは、依存関係、センサー、データ受け渡しを伴う複雑なデータパイプラインのオーケストレーションに優れています。
センサーの使用
センサーは、特定の条件が満たされるまで待機します。
from airflow.sensors.filesystem import FileSensor
from airflow.sensors.sql import SqlSensor
# ファイルの存在を待つ
file_sensor = FileSensor(
task_id='wait_for_file',
filepath='/data/input/new_data.csv',
poke_interval=60, # 60秒ごとにチェック
timeout=3600, # 1時間後にタイムアウト
dag=dag
)
# データベースの条件を待つ
sql_sensor = SqlSensor(
task_id='wait_for_data',
conn_id='postgres_default',
sql="SELECT COUNT(*) FROM orders WHERE date = '{{ ds }}'",
timeout=600,
dag=dag
)
複雑な依存関係の例
# 並列タスク extract_task_1 = PythonOperator(task_id='extract_source_1', ...) extract_task_2 = PythonOperator(task_id='extract_source_2', ...) extract_task_3 = PythonOperator(task_id='extract_source_3', ...) # 統合タスク merge_task = PythonOperator(task_id='merge_data', ...) # 変換タスク transform_task = PythonOperator(task_id='transform_data', ...) # 負荷タスク load_task = PythonOperator(task_id='load_to_warehouse', ...) # 依存関係:3つのソースを並列抽出 → 統合 → 変換 → 負荷 [extract_task_1, extract_task_2, extract_task_3] >> merge_task merge_task >> transform_task >> load_task
動的DAG生成
from airflow.models import Variable
# 設定から読み込み
data_sources = Variable.get("data_sources", deserialize_json=True)
for source in data_sources:
task = PythonOperator(
task_id=f'process_{source["name"]}',
python_callable=process_source,
op_kwargs={'source': source},
dag=dag
)
こんな方におすすめ:

ここまで3つのツールの詳細機能を見てきました。次は、実際のプロジェクトでどのツールを選ぶべきかの判断基準を示します。
✅ こんな課題があるなら Ray
具体的なユースケース:
✅ こんな課題があるなら MLflow
具体的なユースケース:
✅ こんな課題があるなら Airflow
具体的なユースケース:
実際のプロジェクトでは、これらのツールを組み合わせることで最大の効果を発揮します。
組み合わせ例1:MLflow + Airflow
Airflow(オーケストレーション)
↓
データ前処理タスク
↓
モデルトレーニング(MLflow で追跡)
↓
モデル評価(MLflow に記録)
↓
モデルデプロイ(MLflow Registry から)
組み合わせ例2:Ray + MLflow
Ray Tune(ハイパーパラメータ探索)
↓
各試行を MLflow で追跡
↓
最良のモデルを MLflow Registry に登録
↓
Ray Serve でモデルをデプロイ
組み合わせ例3:すべてを統合
Airflow(日次スケジュール)
↓
Ray Data(大規模データ処理)
↓
Ray Tune + MLflow(分散チューニング + 追跡)
↓
MLflow Registry(モデル管理)
↓
Ray Serve(スケーラブルサービング)

以下の質問に答えて、最適なツールを見つけましょう。
質問1:主な課題は何ですか?
質問2:チームの規模は?
質問3:データのサイズは?
質問4:デプロイ要件は?

この記事では、Ray、MLflow、Airflowという3つの重要なMLツールを詳しく比較してきました。
重要なポイントの再確認
✅ Ray – 分散コンピューティングとスケーリングの課題を解決
✅ MLflow – 実験追跡とモデルライフサイクル管理を提供
✅ Airflow – ワークフローオーケストレーションとスケジューリングを担当
今すぐ始められるアクションプラン
さらに学ぶためのリソース
📚 公式ドキュメント
🎓 次のステップ
最後に
適切なツールを選択することは、スケーラブルで保守性の高いMLシステムを構築するための第一歩です。この記事が、あなたのプロジェクトに最適なツールを選ぶ助けとなれば幸いです。
さあ、これらのツールを使って、あなたのMLプロジェクトを次のレベルに引き上げましょう!応援しています!
生成AI、LLM、NLPを専門とするAI/ML開発者兼MLOpsエンジニア。
生成AI、大規模言語モデル(LLM)、自然言語処理(NLP)を専門とするAI/ML開発者兼MLOpsエンジニアで、5年以上の実務経験を有している。クラウドプラットフォームや最先端のフレームワークを活用し、実運用レベルのAIアプリケーションの構築とデプロイに精通している。AI技術の発展と、グローバルなコミュニティへの知識共有に情熱を注いでいる。
この記事は著者の許可を得て公開しています。
Workstyle Evolution代表。18万人超YouTuber&著書『ChatGPT最強の仕事術』は4万部突破。
株式会社Workstyle Evolution代表取締役。YouTubeチャンネル「いけともch(チャンネル)」では、 AIエージェント時代の必須ノウハウ・スキルや、最新AIツールの活用法を独自のビジネス視点から解説し、 チャンネル登録数は18万人超(2025年7月時点)。