Ray vs MLflow vs Airflow:MLツール選びの決定版ガイド|機能比較と実践コード例で開発ツールの使い分けを完全マスター

2025年12月30日 2025年12月30日 AI開発・効率化ツール

Ray vs MLflow vs Airflow:MLツール選びの決定版ガイド|機能比較と実践コード例で開発ツールの使い分けを完全マスター

記事のインフォグラフィックサマリ
📊 記事内容のビジュアルサマリ

機械学習エンジニアなら知っておきたい3大ツールを徹底比較。「どのツールをいつ使うべきか」がこの記事で明確になります。

📌 本記事のコード対応バージョン

  • Ray: 2.52.1(2025年11月リリース)
  • MLflow: 3.7.0(2025年12月リリース)
  • Apache Airflow: 3.1.5(2025年12月リリース )

本記事のコード例は2025年12月時点の公式ドキュメントを参考にしていますが、環境やマイナーアップデートによって挙動が異なる場合もあります。実際の利用時には公式ドキュメントもあわせて確認してください。


はじめに

はじめに

ML/AIエコシステムには数多くのツールが存在し、それぞれが「あなたの課題を解決します」と謳っています。その中でも、本番環境のML開発で必ず名前が挙がる3つのツールがあります。Ray、MLflow、Airflowです。

この記事で得られること:
✅ 3つのツールの違いが明確に理解できる
✅ 豊富な実践コード例で実装イメージが掴める
✅ 自分のプロジェクトに最適なツールが選べる判断フレームワーク
✅ 実務で使える具体的なユースケース

対象読者:

  • 機械学習モデルを本番環境に展開したいデータサイエンティスト
  • MLOpsの導入を検討している機械学習エンジニア
  • スケーラブルなML基盤を構築したいチームリーダー

それでは、各ツールの特徴を深く掘り下げていきましょう!


ML/AIライフサイクルの課題

ML/AIライフサイクルの課題

現代のMLプロジェクトは、複数の複雑なステージで構成されており、それぞれに独自の課題があります。

典型的なMLパイプラインの各ステージ

  1. データ収集と処理 – データの収集、クリーニング、変換
  2. 特徴量エンジニアリング – 関連性の高い特徴量の作成と選択
  3. モデル開発 – トレーニング、チューニング、さまざまなアプローチの実験
  4. モデル評価 – パフォーマンステストと異なるバージョンの比較
  5. モデルデプロイ – 本番環境でのモデル提供
  6. 監視とメンテナンス – パフォーマンス追跡とモデル更新

よくある課題

🐌 スケーラビリティの問題 – 単一マシンでのトレーニングに時間がかかりすぎる
📊 実験管理の混乱 – どのパラメータがどの結果を生み出したのか追跡できない
🔄 パイプラインの複雑性 – 複数タスク間の依存関係の管理
🚀 デプロイの難しさ – ノートブックから本番環境への移行
👥 チーム協業 – モデルの共有と結果の再現

各ツールの役割

  • Ray → スケーリングと分散コンピューティングの課題を解決
  • MLflow → 実験追跡とモデル管理に対応
  • Airflow → ワークフローのオーケストレーションとパイプライン管理を担当


Ray:AIのための分散コンピューティング

Ray:AIのための分散コンピューティング

Rayは、カリフォルニア大学バークレー校のRISELabで開発されたオープンソースの分散コンピューティングフレームワークです。最小限のコード変更で、Pythonアプリケーションを単一マシンから大規模クラスタまでスケールさせることができます。

コアとなる目的

Rayは、AI/MLワークロードのスケーリングという根本的な課題に取り組みます。大規模モデルのトレーニング、ハイパーパラメータチューニング、膨大なデータセットの処理、大規模なモデルサービング、いずれの場合でも、Rayは分散コンピューティングのための統一されたインターフェースを提供します。

主要コンポーネント

コンポーネント説明
Ray Core分散コンピューティングの基本機能(@ray.remote)
Ray Tuneハイパーパラメータチューニングを大規模に実行
Ray Serveモデルのサービングとデプロイ
Ray RLlib強化学習アルゴリズム
Ray Data分散データ処理
Ray Train分散モデルトレーニング


Rayの詳細機能解説

Rayの詳細機能解説

機能1:Ray Core – 分散コンピューティング

Ray Coreは、通常のPython関数をシンプルなデコレーターで分散タスクに変換します。

主なメリット:

  • ✅ コードの変更を最小限に抑えて並列処理を実現
  • ✅ 劇的な処理速度の向上
  • ✅ リソースの効率的な活用
import ray
import time

# Rayの初期化
ray.init()

# 通常の関数を分散タスクに変換
@ray.remote
def expensive_computation(n):
    """重い計算処理をシミュレート"""
    time.sleep(1)  # 作業をシミュレート
    return sum(i * i for i in range(n))

# 並列処理の実装例
def parallel_processing():
    start_time = time.time()
    
    # すべてのタスクを並列で送信
    futures = [expensive_computation.remote(i) for i in range(1000, 5000, 1000)]
    
    # 結果が準備できたら取得
    results = ray.get(futures)
    
    end_time = time.time()
    print(f"並列処理時間: {end_time - start_time:.2f}秒")
    return results

💡 ポイント: 上記のコードでは、単一マシンで順次実行する場合と比較して、処理時間を大幅に短縮できます。

Actor(アクター)パターンでの状態管理

@ray.remote
class Counter:
    def __init__(self):
        self.count = 0
    
    def increment(self):
        self.count += 1
        return self.count
    
    def get_count(self):
        return self.count

# アクターの使用例
counters = [Counter.remote() for _ in range(3)]

# 各アクターは独自の状態を維持
futures = []
for counter in counters:
    for _ in range(5):
        futures.append(counter.increment.remote())

results = ray.get(futures)
print(f"実行結果: {results}")

こんな方におすすめ:

  • 大規模なデータ処理を高速化したい方
  • 複数のモデルを並列でトレーニングしたい方
  • 計算リソースを最大限に活用したい方

機能2:Ray Tune – ハイパーパラメータチューニング

Ray Tuneは、さまざまな検索アルゴリズムと早期停止機能をサポートした、分散ハイパーパラメータ最適化を提供します。

主なメリット:

  • ✅ 何百もの試行を並列で実行可能
  • ✅ ベイズ最適化などの高度な検索アルゴリズムをサポート
  • ✅ 早期停止で無駄な計算を削減
from ray import tune
from ray.tune.search.bayesopt import BayesOptSearch
from ray.tune.schedulers import AsyncHyperBandScheduler
from sklearn.ensemble import RandomForestClassifier
from sklearn.model_selection import cross_val_score

def objective_function(config):
    """最適化する関数 - モデルをトレーニングして検証スコアを返す"""
    
    # データの生成(実際にはデータを読み込み)
    X, y = make_classification(n_samples=1000, n_features=20)
    
    # configのハイパーパラメータでモデルを作成
    model = RandomForestClassifier(
        n_estimators=int(config["n_estimators"]),
        max_depth=int(config["max_depth"]) if config["max_depth"] > 0 else None,
        random_state=42
    )
    
    # クロスバリデーションスコア
    scores = cross_val_score(model, X, y, cv=3, scoring='accuracy')
    accuracy = scores.mean()
    
    # Ray Tuneに結果を報告
    tune.report(accuracy=accuracy)

# ハイパーパラメータチューニングの実行
def run_hyperparameter_tuning():
    ray.init()
    
    # 探索空間の定義
    search_space = {
        "n_estimators": tune.randint(10, 200),
        "max_depth": tune.randint(-1, 20),
        "min_samples_split": tune.randint(2, 20),
        "min_samples_leaf": tune.randint(1, 10)
    }
    
    # ベイズ最適化の設定
    bayesopt = BayesOptSearch(metric="accuracy", mode="max")
    
    # 早期停止のスケジューラー設定
    scheduler = AsyncHyperBandScheduler(
        metric="accuracy",
        mode="max"
    )
    
    # チューニング実行(Ray 2.x以降の推奨方法:Tunerクラスを使用)
    tuner = tune.Tuner(
        objective_function,
        param_space=search_space,
        tune_config=tune.TuneConfig(
            metric="accuracy",
            mode="max",
            search_alg=bayesopt,
            scheduler=scheduler,
            num_samples=50  # 試行回数
        ),
        run_config=tune.RunConfig(
            name="rf_hyperparameter_tuning"
        )
    )
    
    results = tuner.fit()
    best_result = results.get_best_result()
    
    print(f"最適な設定: {best_result.config}")
    print(f"最高精度: {best_result.metrics['accuracy']:.4f}")

🎯 使い分けのヒント:

  • シンプルなグリッドサーチには従来のGridSearchCVを使用
  • 大規模で複雑な探索空間にはRay Tuneを使用

機能3:Ray Serve – モデルサービング

Ray Serveは、組み込みのロードバランシングと自動スケーリング機能を備えた、スケーラブルなモデルサービングを可能にします。

主なメリット:

  • ✅ 複数のモデルレプリカで負荷分散
  • ✅ リクエスト量に応じた自動スケーリング
  • ✅ バッチ処理による高スループット
from ray import serve
import numpy as np
from sklearn.ensemble import RandomForestClassifier
from sklearn.datasets import make_classification

ray.init()

@serve.deployment(num_replicas=2, ray_actor_options={"num_cpus": 1})
class MLModel:
    def __init__(self):
        # モデルのトレーニング(実際には保存済みモデルを読み込み)
        X, y = make_classification(n_samples=1000, n_features=20)
        self.model = RandomForestClassifier(n_estimators=100)
        self.model.fit(X, y)
        print("モデルの準備完了!")
    
    async def __call__(self, request):
        # リクエストの処理
        data = await request.json()
        features = np.array(data["features"]).reshape(1, -1)
        
        # 予測の実行
        prediction = self.model.predict(features)[0]
        probability = self.model.predict_proba(features)[0].tolist()
        
        return {
            "prediction": int(prediction),
            "probability": probability
        }

# アプリケーションの定義とデプロイ(Ray 2.x以降の標準的な方法)
app = MLModel.bind()
handle = serve.run(app)

バッチ処理による最適化

スループットを向上させるために、複数のリクエストをまとめて処理できます。

@serve.deployment(num_replicas=3)
class BatchedMLModel:
    def __init__(self):
        # モデルの初期化
        self.model = RandomForestClassifier(n_estimators=100)
        # トレーニング処理...
    
    @serve.batch(max_batch_size=10, batch_wait_timeout_s=0.1)
    async def batch_predict(self, requests):
        """バッチリクエストを処理してスループット向上"""
        batch_features = []
        
        for request in requests:
            data = await request.json()
            features = np.array(data["features"])
            batch_features.append(features)
        
        # バッチ予測の実行
        batch_array = np.array(batch_features)
        predictions = self.model.predict(batch_array)
        probabilities = self.model.predict_proba(batch_array)
        
        # 個別のレスポンスを返す
        responses = []
        for pred, prob in zip(predictions, probabilities):
            responses.append({
                "prediction": int(pred),
                "probability": prob.tolist()
            })
        
        return responses

# アプリケーションの定義とデプロイ
app = BatchedMLModel.bind()
handle = serve.run(app)

⚠️ 注意点: バッチ処理を使用する場合は、batch_wait_timeout_sを適切に設定しないと、レイテンシが増加する可能性があります。

こんな方におすすめ:

  • 高負荷なリクエストに対応したい方
  • 複数のモデルバージョンをA/Bテストしたい方
  • スケーラブルなMLサービスを構築したい方

機能4:Ray RLlib – 強化学習

Ray RLibは、複数のアルゴリズムと分散トレーニングをサポートした、スケーラブルな強化学習を提供します。

主なメリット:

  • ✅ PPO、DQN、SACなどの主要なRLアルゴリズムを搭載
  • ✅ 分散環境でのトレーニング
  • ✅ マルチエージェントシステムのサポート
from ray.rllib.algorithms.ppo import PPO
import gymnasium as gym

ray.init()

# PPOエージェントのトレーニング
def train_ppo_agent():
    """CartPole環境でPPOエージェントをトレーニング"""
    
    # PPOアルゴリズムの設定
    config = {
        "env": "CartPole-v1",
        "framework": "torch",
        "num_workers": 2,  # データ収集用の並列ワーカー
        "train_batch_size": 4000,
        "lr": 3e-4,
        "gamma": 0.99
    }
    
    # PPOトレーナーの作成
    trainer = PPO(config=config)
    
    # トレーニングループ
    for i in range(50):
        result = trainer.train()
        
        if i % 10 == 0:
            reward = result["episode_reward_mean"]
            print(f"反復 {i}: 平均報酬 = {reward:.2f}")
            
            # 定期的にチェックポイントを保存
            checkpoint_path = trainer.save()
            print(f"チェックポイント保存: {checkpoint_path}")
    
    final_result = trainer.train()
    print(f"最終平均報酬: {final_result['episode_reward_mean']:.2f}")

カスタム環境の例:トレーディング環境

class CustomTradingEnv(gym.Env):
    """デモ用のシンプルなトレーディング環境"""
    
    def __init__(self):
        super().__init__()
        
        # アクション空間: 0=保持, 1=買い, 2=売り
        self.action_space = gym.spaces.Discrete(3)
        
        # 観測空間: [価格, ポジション, 現金, ポートフォリオ価値]
        self.observation_space = gym.spaces.Box(
            low=np.array([0, -1, 0, 0]),
            high=np.array([1000, 1, 10000, 20000]),
            dtype=np.float32
        )

💡 活用シーン:

  • ゲームAI開発
  • ロボット制御
  • 自動トレーディングシステム
  • リソース最適化

機能5:Ray Data – データ処理

Ray Dataは、大規模データセットのための分散データ処理機能を提供します。

主なメリット:

  • ✅ Pandas/NumPyライクなAPIで使いやすい
  • ✅ 大規模データの効率的な処理
  • ✅ MLフレームワークとのシームレスな統合
import ray

ray.init()

# 基本的なデータ処理
def basic_data_processing():
"""Ray Dataの基本操作を実演"""

# データセットの作成
ds = ray.data.range(1000000)

# データの変換
squared = ds.map(lambda x: x ** 2)

# フィルタリング
filtered = squared.filter(lambda x: x > 1000)

# サンプルの取得
sample = filtered.take(10)
print("フィルタリング後のサンプル:", sample[:5])

# 集計
total = filtered.sum()
print(f"合計: {total}")

構造化データの処理

def process_structured_data():
"""Ray Dataで構造化データを処理"""

# サンプルデータセットの作成
data = []
for i in range(100000):
data.append({
"user_id": i,
"age": np.random.randint(18, 80),
"income": np.random.normal(50000, 15000),
"category": np.random.choice(["A", "B", "C"])
})

# Rayデータセットの作成
ds = ray.data.from_items(data)

# 複雑な変換処理
def enrich_user_data(batch):
"""ユーザーデータに派生特徴量を追加"""

# 収入区分の計算
income_bracket = np.where(
batch["income"] < 30000, "low",
np.where(batch["income"] < 70000, "medium", "high")
)

batch["income_bracket"] = income_bracket
return batch

# 変換の適用
enriched_ds = ds.map_batches(enrich_user_data, batch_format="numpy")

def pytorch_data_pipeline():
    """Ray DataからPyTorch DataLoaderを作成"""
    
    import torch
    
    # 画像風データの生成
    def generate_image_data(i):
        return {
            "image": np.random.rand(32, 32, 3).astype(np.float32),
            "label": np.random.randint(0, 10)
        }
    
    ds = ray.data.range(10000).map(generate_image_data)
    
    # PyTorchテンソルへの変換
    def to_torch_format(batch):
        return {
            "image": torch.tensor(batch["image"]).permute(0, 3, 1, 2),
            "label": torch.tensor(batch["label"], dtype=torch.long)
        }
    
    torch_ds = ds.map_batches(to_torch_format, batch_format="numpy")
    
    # DataLoaderの作成
    dataloader = torch_ds.iter_torch_batches(batch_size=32, device="cpu")

PyTorchとの統合

こんな方におすすめ:

  • TB規模のデータを処理したい方
  • MLパイプラインでのデータ前処理を高速化したい方
  • 分散環境でのデータ処理を簡単に実装したい方


MLflow:MLライフサイクル管理

MLflow:MLライフサイクル管理

MLflowは、Databricksによって開発された、機械学習ライフサイクル全体を管理するためのオープンソースプラットフォームです。実験追跡、モデルのパッケージング、デプロイ、コラボレーションのためのツールを提供します。

📝 MLflow 3.x の主な変更点(2025年)
MLflow 3.0は2025年に大きなメジャーアップデートを行いました:

  • LoggedModel: モデルを実験・評価・デプロイ全体で追跡する新しい概念
  • GenAI対応強化: LLMアプリケーション、AIエージェント向けの包括的なトレーシングと評価機能
  • OpenTelemetry統合: エンタープライズ観測性スタックとの統合
  • Prompt Registry: プロンプトのバージョン管理とA/Bテスト機能

本記事のコード例は従来のML/DLモデル向けで、MLflow 3.xでも完全に動作します。GenAI機能については公式ドキュメントをご参照ください。

コアとなる目的

MLflowは、ML実験管理とモデルライフサイクル追跡の課題に取り組みます。データサイエンティストやMLエンジニアが実験を整理し、結果を再現し、さまざまな環境でモデルを一貫してデプロイできるよう支援します。

4つの主要コンポーネント

コンポーネント説明
MLflow TrackingMLランからパラメータ、メトリクス、成果物をログ記録
MLflow Projectsデータサイエンスコードを再利用可能で再現性のある形式でパッケージ化
MLflow ModelsさまざまなMLライブラリのモデルを多様なプラットフォームにデプロイ
MLflow Model Registryモデルのバージョンとステージを管理する中央モデルストア


MLflowの詳細機能解説

MLflowの詳細機能解説

機能1:MLflow Tracking

MLflow Trackingは、包括的な実験ログ記録と比較機能を提供します。

主なメリット:

  • ✅ パラメータ、メトリクス、成果物の自動記録
  • ✅ 複数の実験を簡単に比較
  • ✅ 結果の再現性を確保
import mlflow
import mlflow.sklearn
from sklearn.ensemble import RandomForestClassifier
from sklearn.model_selection import train_test_split
from sklearn.metrics import accuracy_score, f1_score

# MLflowトラッキングURIの設定
mlflow.set_tracking_uri("sqlite:///mlflow.db")

# 実験の作成
experiment_name = "classification_comparison"
mlflow.set_experiment(experiment_name)

def log_sklearn_experiment(model, model_name, X_train, X_test, y_train, y_test, params):
    """sklearn実験をMLflowでログ記録"""
    
    with mlflow.start_run(run_name=f"{model_name}_experiment"):
        # パラメータのログ記録
        mlflow.log_params(params)
        
        # モデルのトレーニング
        model.fit(X_train, y_train)
        
        # 予測の実行
        y_pred = model.predict(X_test)
        
        # メトリクスの計算
        accuracy = accuracy_score(y_test, y_pred)
        f1 = f1_score(y_test, y_pred, average='weighted')
        
        # メトリクスのログ記録
        mlflow.log_metric("accuracy", accuracy)
        mlflow.log_metric("f1_score", f1)
        
        # モデルのログ記録
        mlflow.sklearn.log_model(
            model, 
            "model",
            registered_model_name=f"{model_name}_classifier"
        )
        
        print(f"{model_name} - 精度: {accuracy:.4f}, F1: {f1:.4f}")

PyTorchとの統合

def pytorch_experiment():
    """PyTorch実験をMLflowでログ記録"""
    
    import torch
    import torch.nn as nn
    
    class SimpleNN(nn.Module):
        def __init__(self, input_size, hidden_size, num_classes):
            super(SimpleNN, self).__init__()
            self.fc1 = nn.Linear(input_size, hidden_size)
            self.relu = nn.ReLU()
            self.fc2 = nn.Linear(hidden_size, num_classes)
        
        def forward(self, x):
            x = self.fc1(x)
            x = self.relu(x)
            x = self.fc2(x)
            return x
    
    with mlflow.start_run(run_name="pytorch_neural_network"):
        # ハイパーパラメータ
        hidden_size = 64
        learning_rate = 0.001
        epochs = 100
        
        # パラメータのログ記録
        mlflow.log_param("hidden_size", hidden_size)
        mlflow.log_param("learning_rate", learning_rate)
        mlflow.log_param("epochs", epochs)
        
        # モデルの作成とトレーニング
        model = SimpleNN(20, hidden_size, 2)
        # ... トレーニングループ ...
        
        # モデルのログ記録
        mlflow.pytorch.log_model(model, "model")

ハイパーパラメータチューニングとの統合

def hyperparameter_tuning_with_mlflow():
    """MLflow追跡を使用したハイパーパラメータチューニング"""
    
    # パラメータグリッドの定義
    param_grid = {
        'n_estimators': [50, 100, 200],
        'max_depth': [5, 10, 15, None],
        'min_samples_split': [2, 5, 10]
    }
    
    best_score = 0
    
    # 親ランの作成
    with mlflow.start_run(run_name="hyperparameter_tuning"):
        parent_run_id = mlflow.active_run().info.run_id
        
        trial_count = 0
        
        # グリッドサーチ
        for n_est in param_grid['n_estimators']:
            for max_d in param_grid['max_depth']:
                for min_split in param_grid['min_samples_split']:
                    trial_count += 1
                    
                    # 各試行用の子ラン
                    with mlflow.start_run(nested=True, run_name=f"trial_{trial_count}"):
                        params = {
                            'n_estimators': n_est,
                            'max_depth': max_d,
                            'min_samples_split': min_split
                        }
                        
                        model = RandomForestClassifier(**params)
                        model.fit(X_train, y_train)
                        
                        accuracy = accuracy_score(y_test, model.predict(X_test))
                        
                        # すべてをログ記録
                        mlflow.log_params(params)
                        mlflow.log_metric("accuracy", accuracy)
                        
                        if accuracy > best_score:
                            best_score = accuracy
                            mlflow.sklearn.log_model(model, "model")
        
        print(f"最高スコア: {best_score:.4f}")

💡 ベストプラクティス:

  • 実験には意味のある名前を付ける
  • メトリクスだけでなくパラメータも必ず記録する
  • 重要な成果物(図表、レポートなど)はlog_artifactで保存

機能2:MLflow Projects

MLflow Projectsは、データサイエンスコードを再利用可能で再現性のある形式でパッケージ化するための標準フォーマットを提供します。

プロジェクト構造の例

# MLprojectファイルとして保存
name: classification_pipeline
conda_env: conda.yaml

entry_points:
  main:
    parameters:
      n_estimators: {type: int, default: 100}
      max_depth: {type: int, default: 10}
      test_size: {type: float, default: 0.2}
    command: "python train.py --n-estimators {n_estimators}"
  
  train:
    parameters:
      model_type: {type: string, default: "random_forest"}
      data_path: {type: string}
    command: "python train.py --model-type {model_type}"

環境設定ファイル

# conda.yamlとして保存
name: ml_project
channels:
  - defaults
  - conda-forge
dependencies:
  - python=3.9
  - pip
  - pip:
      - mlflow
      - scikit-learn
      - pandas
      - numpy

プロジェクトの実行方法

# ローカルで実行
mlflow run . -P n_estimators=200 -P max_depth=15

# GitHubから実行
mlflow run https://github.com/username/ml-project -P n_estimators=150

# 特定のエントリーポイントを実行
mlflow run . -e train -P model_type=logistic_regression

こんな方におすすめ:

  • チーム間でコードを共有したい方
  • 実験の再現性を確保したい方
  • CI/CDパイプラインに統合したい方

機能3:MLflow Models

MLflow Modelsは、さまざまなプラットフォームへのデプロイのために、MLモデルをパッケージ化する標準フォーマットを提供します。

主なメリット:

  • ✅ 複数のMLフレームワークをサポート
  • ✅ 一貫したデプロイインターフェース
  • ✅ カスタム前処理ロジックの統合
import mlflow
import mlflow.sklearn
from sklearn.ensemble import RandomForestClassifier
from sklearn.preprocessing import StandardScaler
from sklearn.pipeline import Pipeline

# Sklearnモデルの保存と読み込み
def save_sklearn_model():
    """sklearnモデルをMLflowで保存"""
    
    # パイプラインの作成
    pipeline = Pipeline([
        ('scaler', StandardScaler()),
        ('classifier', RandomForestClassifier(n_estimators=100))
    ])
    
    pipeline.fit(X, y)
    
    with mlflow.start_run():
        mlflow.sklearn.log_model(
            pipeline,
            "model",
            registered_model_name="sklearn_pipeline_model"
        )
        
        run_id = mlflow.active_run().info.run_id
        print(f"モデル保存完了 run_id: {run_id}")

カスタムモデルラッパー

より複雑な前処理や後処理が必要な場合、カスタムモデルラッパーを作成できます。

import mlflow.pyfunc

class CustomModelWrapper(mlflow.pyfunc.PythonModel):
    """カスタム前処理を含むモデルラッパー"""
    
    def __init__(self, model, scaler, feature_names):
        self.model = model
        self.scaler = scaler
        self.feature_names = feature_names
    
    def preprocess(self, data):
        """カスタム前処理"""
        processed = data.copy()
        
        # 特徴量エンジニアリング
        processed['feature_sum'] = processed.sum(axis=1)
        processed['feature_mean'] = processed.mean(axis=1)
        
        # 特徴量のスケーリング
        feature_cols = [col for col in processed.columns if col.startswith('feature_')]
        processed[feature_cols] = self.scaler.transform(processed[feature_cols])
        
        return processed
    
    def predict(self, context, model_input):
        """カスタム予測ロジック"""
        # 前処理
        processed_input = self.preprocess(model_input)
        
        # 予測の実行
        predictions = self.model.predict(processed_input)
        probabilities = self.model.predict_proba(processed_input)
        
        # 構造化された出力を返す
        return pd.DataFrame({
            'prediction': predictions,
            'probability_class_0': probabilities[:, 0],
            'probability_class_1': probabilities[:, 1],
            'confidence': probabilities.max(axis=1)
        })

モデルシグネチャの定義

モデルの入出力スキーマを明示的に定義することで、型エラーを防ぎます。

from mlflow.models.signature import infer_signature

def save_model_with_signature():
    """明示的な入出力シグネチャを持つモデルを保存"""
    
    model = RandomForestClassifier(n_estimators=100)
    model.fit(X, y)
    
    # サンプル予測からシグネチャを推論
    sample_input = X[:5]
    sample_output = model.predict(sample_input)
    signature = infer_signature(sample_input, sample_output)
    
    with mlflow.start_run():
        mlflow.sklearn.log_model(
            model,
            "model_with_signature",
            signature=signature,
            input_example=sample_input
        )
        
        print("モデルシグネチャ:")
        print(signature)

⚠️ 注意点: モデルをデプロイする前に、必ず入出力の型とシェイプを確認しましょう。シグネチャを定義することで、本番環境での予期しないエラーを防げます。

機能4:MLflow Model Registry

Model Registryは、協調的なMLワークフローのための中央モデルストレージ、バージョン管理、ステージ管理を提供します。

主なメリット:

  • ✅ モデルのバージョン管理
  • ✅ ステージ管理(None、Staging、Production、Archived)
  • ✅ チーム間のモデル共有
from mlflow.tracking import MlflowClient

client = MlflowClient()

# 複数のモデルバージョンを登録
def register_model_versions():
    """複数のモデルバージョンを作成して登録"""
    
    model_name = "production_classifier"
    
    # バージョン1をトレーニングして登録
    with mlflow.start_run(run_name="model_v1"):
        model_v1 = RandomForestClassifier(n_estimators=50)
        model_v1.fit(X_train, y_train)
        
        accuracy_v1 = accuracy_score(y_test, model_v1.predict(X_test))
        mlflow.log_metric("accuracy", accuracy_v1)
        
        mlflow.sklearn.log_model(
            model_v1,
            "model",
            registered_model_name=model_name
        )
        
        print(f"モデルv1の精度: {accuracy_v1:.4f}")
    
    # バージョン2(改善版)
    with mlflow.start_run(run_name="model_v2"):
        model_v2 = RandomForestClassifier(n_estimators=100, max_depth=10)
        model_v2.fit(X_train, y_train)
        
        accuracy_v2 = accuracy_score(y_test, model_v2.predict(X_test))
        mlflow.log_metric("accuracy", accuracy_v2)
        
        mlflow.sklearn.log_model(
            model_v2,
            "model",
            registered_model_name=model_name
        )
        
        print(f"モデルv2の精度: {accuracy_v2:.4f}")

モデルステージの管理

def manage_model_stages(model_name):
    """モデルステージの管理:None、Staging、Production、Archived"""
    
    # バージョン1をStagingに移行
    client.transition_model_version_stage(
        name=model_name,
        version=1,
        stage="Staging"
    )
    print("バージョン1をStagingに移行しました")
    
    # バージョン2をProductionに移行
    client.transition_model_version_stage(
        name=model_name,
        version=2,
        stage="Production"
    )
    print("バージョン2をProductionに移行しました")
    
    # 説明とタグの追加
    client.update_model_version(
        name=model_name,
        version=1,
        description="初期ベースラインモデル - 50本の木を持つシンプルなRF"
    )
    
    client.set_model_version_tag(model_name, "1", "model_type", "baseline")
    client.set_model_version_tag(model_name, "2", "model_type", "production")

レジストリからモデルを読み込み

def load_model_from_registry(model_name, stage="Production"):
    """レジストリからステージ別にモデルを読み込み"""
    
    # ステージ別にモデルを読み込み
    model_uri = f"models:/{model_name}/{stage}"
    model = mlflow.sklearn.load_model(model_uri)
    
    # 予測の実行
    predictions = model.predict(X_test[:5])
    print(f"{stage}モデルの予測: {predictions}")
    
    return model

# 本番モデルの読み込み
prod_model = load_model_from_registry("production_classifier", "Production")

最良モデルの自動プロモーション

def promote_best_model(model_name):
    """最高性能のモデルを自動的に本番環境に昇格"""
    
    versions = client.search_model_versions(f"name='{model_name}'")
    
    # 最高精度のバージョンを検索
    best_version = None
    best_accuracy = 0
    
    for version in versions:
        run = client.get_run(version.run_id)
        accuracy = run.data.metrics.get('accuracy', 0)
        
        if accuracy > best_accuracy:
            best_accuracy = accuracy
            best_version = version.version
    
    if best_version:
        # 既存の本番モデルをアーカイブ
        client.transition_model_version_stage(
            name=model_name,
            version=best_version,
            stage="Production",
            archive_existing_versions=True
        )
        
        print(f"バージョン{best_version}を本番環境に昇格(精度: {best_accuracy:.4f})")

こんな方におすすめ:

  • 複数のモデルバージョンを管理したい方
  • チームでモデルを共有したい方
  • 本番環境へのデプロイプロセスを自動化したい方

機能5:MLflow統合と自動ロギング

MLflowは、人気のMLフレームワークとシームレスに統合され、自動ロギング機能により定型コードを最小限に抑えます。

主なメリット:

  • ✅ TensorFlow、PyTorch、XGBoostなど主要フレームワークをサポート
  • ✅ 自動ロギングで手動記録の手間を削減
  • ✅ カスタムロギングとの併用が可能

TensorFlow自動ロギング

import tensorflow as tf
from tensorflow import keras

# 自動ロギングを有効化
mlflow.tensorflow.autolog()

with mlflow.start_run(run_name="tensorflow_autolog"):
    # モデルの定義
    model = keras.Sequential([
        keras.layers.Dense(64, activation='relu', input_shape=(20,)),
        keras.layers.Dropout(0.3),
        keras.layers.Dense(1, activation='sigmoid')
    ])
    
    # コンパイル
    model.compile(optimizer='adam', loss='binary_crossentropy', metrics=['accuracy'])
    
    # トレーニング(自動ロギングがすべてキャプチャ)
    history = model.fit(X_train, y_train, epochs=50, batch_size=32, validation_split=0.2)
    
    print("自動ロギング完了:モデル、メトリクス、パラメータ、トレーニング履歴")

XGBoost自動ロギング

import xgboost as xgb

mlflow.xgboost.autolog()

with mlflow.start_run(run_name="xgboost_autolog"):
    dtrain = xgb.DMatrix(X_train, label=y_train)
    dtest = xgb.DMatrix(X_test, label=y_test)
    
    params = {
        'objective': 'binary:logistic',
        'max_depth': 6,
        'learning_rate': 0.1
    }
    
    # トレーニング(自動ロギングがすべてキャプチャ)
    model = xgb.train(params, dtrain, num_boost_round=100, evals=[(dtest, 'test')])
    
    print("自動ロギング完了:モデル、パラメータ、メトリクス、特徴量重要度")

カスタムロギングとの併用

from sklearn.metrics import classification_report, confusion_matrix
import matplotlib.pyplot as plt

mlflow.sklearn.autolog()

with mlflow.start_run(run_name="combined_logging"):
    # モデルのトレーニング(自動ロギング)
    model = RandomForestClassifier(n_estimators=100)
    model.fit(X_train, y_train)
    
    # カスタムロギング:分類レポート
    y_pred = model.predict(X_test)
    report = classification_report(y_test, y_pred, output_dict=True)
    mlflow.log_dict(report, "classification_report.json")
    
    # カスタムロギング:混同行列の可視化
    cm = confusion_matrix(y_test, y_pred)
    plt.figure(figsize=(8, 6))
    sns.heatmap(cm, annot=True, fmt='d', cmap='Blues')
    plt.title('混同行列')
    plt.savefig('confusion_matrix.png')
    mlflow.log_artifact('confusion_matrix.png')
    
    print("自動ロギングとカスタムロギングの併用完了")

💡 使い分けのポイント:

  • 標準的なメトリクスには自動ロギングを使用
  • カスタム可視化や独自メトリクスには手動ロギングを追加
  • 両方を組み合わせて包括的な追跡を実現


Airflow:ワークフローオーケストレーション

Airflow:ワークフローオーケストレーション

Apache Airflowは、もともとAirbnbで開発されたオープンソースのワークフローオーケストレーションプラットフォームです。ワークフローを有向非巡回グラフ(DAG)としてプログラム的に作成、スケジュール、監視できます。

📝 Apache Airflow 3.x の主な変更点(2025年)
Airflow 3.0は5年ぶりのメジャーアップデートとして2025年4月にリリースされました:

Airflow 3.0.0(2025年4月リリース)の主な機能:

  • 新しいReact UI: 完全に再設計されたユーザーインターフェース
  • イベント駆動スケジューリング: 外部イベントに基づくワークフロー起動(AIP-82)
  • Task SDK: DAGをよりシンプルに記述できる新しいSDK
  • Task Execution API: セキュリティ強化のためメタDBへの直接アクセスを制限(AIP-72)
  • DAGバージョニング: DAGの変更履歴を追跡
  • Python 3.9-3.12対応: Python 3.9以上が必須

Airflow 3.1.0(2025年9月リリース)の追加機能:

  • Python 3.13対応: 最新のPythonバージョンをサポート
  • Human-in-the-Loop (HITL): ワークフローに人間の判断を組み込む機能
  • 17言語対応: 国際化サポートの大幅強化

本記事のコード例は、主にAirflow 2.x系のAPIを使用していますが、Airflow 3.xでも動作します。ただし、一部のインポートパスや設定が変更されている場合があるため、詳細は後述の注釈と公式ドキュメントをご確認ください。

コアとなる目的

Airflowは、ワークフローオーケストレーションとパイプライン管理の課題に取り組みます。依存関係のある複雑なワークフローの調整、定期的なタスクのスケジューリング、実行の監視、失敗したタスクの再試行ロジックに優れています。

主要概念

概念説明
DAG(有向非巡回グラフ)ワークフローの構造とタスクの依存関係を定義
Operator(オペレーター)個々のタスクを定義(PythonOperator、BashOperatorなど)
Sensor(センサー)条件や外部イベントを待機
Executor(エグゼキューター)タスクの実行方法を決定(Sequential、Local、Celery、Kubernetes)
XCom(クロスコミュニケーション)タスク間でデータを受け渡すメカニズム


Airflowの詳細機能解説

Airflowの詳細機能解説

機能1:DAG作成

DAGはワークフローの構造、タスクの依存関係、スケジューリングを定義します。MLパイプラインDAGの包括的な例を見てみましょう。

主なメリット:

  • ✅ 複雑なワークフローを視覚的に理解できる
  • ✅ タスクの依存関係を明確に定義
  • ✅ 柔軟なスケジューリング
from airflow import DAG
from airflow.operators.python import PythonOperator
from airflow.operators.bash import BashOperator
from datetime import datetime, timedelta

# 📝 注: Airflow 3.x以降では以下のインポートが推奨されます:
# from airflow.sdk import DAG
# from airflow.providers.standard.operators.python import PythonOperator
# from airflow.providers.standard.operators.bash import BashOperator
# 上記のAirflow 2.x形式のインポートも現時点では動作しますが、
# 将来的に非推奨となる可能性があります。新規プロジェクトでは
# airflow.sdk および標準プロバイダーからのインポートを推奨します。

# DAGのデフォルト引数
default_args = {
    'owner': 'data-science-team',
    'depends_on_past': False,
    'start_date': datetime(2024, 1, 1),
    'email': ['alerts@company.com'],
    'email_on_failure': True,
    'retries': 3,
    'retry_delay': timedelta(minutes=5)
}

# DAGの作成
dag = DAG(
    'ml_training_pipeline',
    default_args=default_args,
    description='完全なMLトレーニングとデプロイパイプライン',
    schedule_interval='0 2 * * *',  # 毎日午前2時に実行
    catchup=False,
    tags=['machine-learning', 'training', 'production']
)

タスク1:データの可用性チェック

def check_data_availability(**context):
    """必要なデータファイルが存在するかチェック"""
    import os
    
    data_path = '/data/raw/training_data.csv'
    
    if not os.path.exists(data_path):
        raise FileNotFoundError(f"データファイルが見つかりません: {data_path}")
    
    # ファイルサイズとレコード数を取得
    file_size = os.path.getsize(data_path)
    df = pd.read_csv(data_path)
    record_count = len(df)
    
    # 下流タスク用にXComにプッシュ
    context['task_instance'].xcom_push(key='data_path', value=data_path)
    context['task_instance'].xcom_push(key='record_count', value=record_count)
    
    print(f"データ利用可能: {record_count}レコード、{file_size/1024/1024:.2f} MB")
    
    return data_path

check_data_task = PythonOperator(
    task_id='check_data_availability',
    python_callable=check_data_availability,
    provide_context=True,  # 📝 注: Airflow 2.0以降、**kwargsを使う関数では自動的にコンテキストが渡されるため、このパラメータは指定不要です(非推奨)
    dag=dag
)

タスク2:データ検証

def validate_data(**context):
    """データ品質を検証"""
    
    # 前タスクからデータパスを取得
    ti = context['task_instance']
    data_path = ti.xcom_pull(task_ids='check_data_availability', key='data_path')
    
    # データの読み込み
    df = pd.read_csv(data_path)
    
    # 検証チェック
    validations = {
        'null_percentage': (df.isnull().sum() / len(df) * 100).to_dict(),
        'duplicate_count': df.duplicated().sum(),
        'shape': df.shape
    }
    
    # 重大な問題のチェック
    if validations['duplicate_count'] > len(df) * 0.1:
        raise ValueError(f"重複が多すぎます: {validations['duplicate_count']}")
    
    ti.xcom_push(key='validation_results', value=validations)
    print(f"検証完了: {validations}")
    
    return "validation_passed"

validate_data_task = PythonOperator(
    task_id='validate_data',
    python_callable=validate_data,
    provide_context=True,
    dag=dag
)

タスク3:データ前処理

def preprocess_data(**context):
    """前処理と特徴量エンジニアリング"""
    
    ti = context['task_instance']
    data_path = ti.xcom_pull(task_ids='check_data_availability', key='data_path')
    
    df = pd.read_csv(data_path)
    
    # 前処理ステップ
    # 1. 重複の削除
    df = df.drop_duplicates()
    
    # 2. 欠損値の処理
    df = df.fillna(df.mean())
    
    # 3. 特徴量エンジニアリング
    if 'feature_1' in df.columns and 'feature_2' in df.columns:
        df['feature_interaction'] = df['feature_1'] * df['feature_2']
        df['feature_ratio'] = df['feature_1'] / (df['feature_2'] + 1)
    
    # 4. 処理済みデータの保存
    processed_path = '/data/processed/training_data_processed.csv'
    df.to_csv(processed_path, index=False)
    
    ti.xcom_push(key='processed_data_path', value=processed_path)
    
    print(f"前処理完了: {len(df)}レコード、{len(df.columns)}特徴量")
    
    return processed_path

preprocess_data_task = PythonOperator(
    task_id='preprocess_data',
    python_callable=preprocess_data,
    provide_context=True,
    dag=dag
)

タスク4:モデルトレーニング

def train_model(**context):
    """MLモデルのトレーニング"""
    
    ti = context['task_instance']
    processed_path = ti.xcom_pull(task_ids='preprocess_data', key='processed_data_path')
    
    df = pd.read_csv(processed_path)
    
    X = df.drop('target', axis=1)
    y = df['target']
    
    X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.2)
    
    # モデルのトレーニング
    model = RandomForestClassifier(n_estimators=100, max_depth=10, n_jobs=-1)
    model.fit(X_train, y_train)
    
    # 評価
    test_accuracy = accuracy_score(y_test, model.predict(X_test))
    
    # モデルの保存
    model_path = f'/models/rf_model_{context["ds"]}.pkl'
    with open(model_path, 'wb') as f:
        pickle.dump(model, f)
    
    ti.xcom_push(key='model_path', value=model_path)
    ti.xcom_push(key='test_accuracy', value=test_accuracy)
    
    print(f"モデルトレーニング完了: テスト精度 = {test_accuracy:.4f}")
    
    return model_path

train_model_task = PythonOperator(
    task_id='train_model',
    python_callable=train_model,
    provide_context=True,
    dag=dag
)

タスク5:モデル評価と検証

def evaluate_model(**context):
    """包括的なモデル評価"""
    
    ti = context['task_instance']
    test_accuracy = ti.xcom_pull(task_ids='train_model', key='test_accuracy')
    
    # 精度しきい値の定義
    ACCURACY_THRESHOLD = 0.75
    
    if test_accuracy < ACCURACY_THRESHOLD:
        raise ValueError(
            f"モデル精度{test_accuracy:.4f}がしきい値{ACCURACY_THRESHOLD}を下回っています"
        )
    
    validation_passed = True
    validation_message = "モデルがすべての検証チェックに合格しました"
    
    ti.xcom_push(key='validation_passed', value=validation_passed)
    
    print(f"モデル評価完了: {validation_message}")
    
    return validation_passed

evaluate_model_task = PythonOperator(
    task_id='evaluate_model',
    python_callable=evaluate_model,
    provide_context=True,
    dag=dag
)

タスク6:モデルデプロイ

def deploy_model(**context):
    """モデルを本番環境にデプロイ"""
    
    ti = context['task_instance']
    model_path = ti.xcom_pull(task_ids='train_model', key='model_path')
    validation_passed = ti.xcom_pull(task_ids='evaluate_model', key='validation_passed')
    
    if not validation_passed:
        raise ValueError("デプロイ不可:モデル検証に失敗しました")
    
    # モデルを本番環境の場所にコピー
    import shutil
    production_path = '/models/production/current_model.pkl'
    shutil.copy(model_path, production_path)
    
    print(f"モデルデプロイ成功: {production_path}")
    
    return production_path

deploy_model_task = PythonOperator(
    task_id='deploy_model',
    python_callable=deploy_model,
    provide_context=True,
    dag=dag
)

タスク依存関係の定義

# タスクの依存関係を定義
check_data_task >> validate_data_task >> preprocess_data_task
preprocess_data_task >> train_model_task >> evaluate_model_task
evaluate_model_task >> deploy_model_task

⚠️ 重要な注意点:

  • >> 演算子を使用してタスクの依存関係を定義
  • 失敗したタスクは自動的に再試行される(retries設定による)
  • XComを使用してタスク間でデータを受け渡す

機能2:さまざまなオペレーター

Airflowは、さまざまなタスクタイプに対応する多様なオペレーターを提供します。

主要オペレーター一覧

オペレーター用途
PythonOperatorPython関数の実行
BashOperatorBashコマンドの実行
BranchPythonOperator条件分岐
DummyOperatorプレースホルダー/分岐の合流
EmailOperatorメール通知
SimpleHttpOperatorREST API呼び出し
PostgresOperatorデータベース操作

条件分岐の例:BranchPythonOperator

from airflow.operators.python import BranchPythonOperator

def decide_model_type(**context):
    """データサイズに基づいてどのモデルをトレーニングするか決定"""
    
    ti = context['task_instance']
    data_path = ti.xcom_pull(task_ids='extract_data')
    
    df = pd.read_csv(data_path)
    
    # 決定ロジック
    if len(df) > 500:
        print("大規模データセット - Random Forestを使用")
        return 'train_random_forest'
    else:
        print("小規模データセット - Logistic Regressionを使用")
        return 'train_logistic_regression'

branch_task = BranchPythonOperator(
    task_id='decide_model',
    python_callable=decide_model_type,
    provide_context=True,
    dag=dag
)

# 複数のトレーニングパス
train_rf_task = PythonOperator(task_id='train_random_forest', ...)
train_lr_task = PythonOperator(task_id='train_logistic_regression', ...)

# 分岐の合流
join_task = DummyOperator(
    task_id='join_branches',
    trigger_rule='none_failed_min_one_success',
    dag=dag
)

# 依存関係
branch_task >> [train_rf_task, train_lr_task] >> join_task

BashOperatorの例

from airflow.operators.bash import BashOperator

validate_bash = BashOperator(
    task_id='validate_with_bash',
    bash_command='''
        echo "データファイルを検証中"
        if [ -f /tmp/data_{{ ds }}.csv ]; then
            echo "ファイルが存在します"
            lines=$(wc -l < /tmp/data_{{ ds }}.csv)
            echo "ファイルの行数: $lines"
        else
            echo "ファイルが見つかりません"
            exit 1
        fi
    ''',
    dag=dag
)

💡 テンプレート変数:

  • {{ ds }} – 実行日(YYYY-MM-DD形式)
  • {{ ts }} – 実行タイムスタンプ
  • {{ task_instance }} – タスクインスタンスオブジェクト

SimpleHttpOperatorの例:API呼び出し

from airflow.providers.http.operators.http import SimpleHttpOperator

api_call_task = SimpleHttpOperator(
    task_id='register_model_api',
    http_conn_id='model_registry_api',
    endpoint='/api/models/register',
    method='POST',
    data=json.dumps({
        'model_name': 'production_classifier',
        'version': '{{ ds }}',
        'metrics': {'accuracy': 0.85}
    }),
    headers={'Content-Type': 'application/json'},
    dag=dag
)

⚠️ 接続設定: http_conn_idは、Airflow UIの「Admin > Connections」で事前に設定する必要があります。

機能3:データパイプラインのオーケストレーション

Airflowは、依存関係、センサー、データ受け渡しを伴う複雑なデータパイプラインのオーケストレーションに優れています。

センサーの使用

センサーは、特定の条件が満たされるまで待機します。

from airflow.sensors.filesystem import FileSensor
from airflow.sensors.sql import SqlSensor

# ファイルの存在を待つ
file_sensor = FileSensor(
task_id='wait_for_file',
filepath='/data/input/new_data.csv',
poke_interval=60, # 60秒ごとにチェック
timeout=3600, # 1時間後にタイムアウト
dag=dag
)

# データベースの条件を待つ
sql_sensor = SqlSensor(
task_id='wait_for_data',
conn_id='postgres_default',
sql="SELECT COUNT(*) FROM orders WHERE date = '{{ ds }}'",
timeout=600,
dag=dag
)

複雑な依存関係の例

# 並列タスク
extract_task_1 = PythonOperator(task_id='extract_source_1', ...)
extract_task_2 = PythonOperator(task_id='extract_source_2', ...)
extract_task_3 = PythonOperator(task_id='extract_source_3', ...)

# 統合タスク
merge_task = PythonOperator(task_id='merge_data', ...)

# 変換タスク
transform_task = PythonOperator(task_id='transform_data', ...)

# 負荷タスク
load_task = PythonOperator(task_id='load_to_warehouse', ...)

# 依存関係:3つのソースを並列抽出 → 統合 → 変換 → 負荷
[extract_task_1, extract_task_2, extract_task_3] >> merge_task
merge_task >> transform_task >> load_task

動的DAG生成

from airflow.models import Variable

# 設定から読み込み
data_sources = Variable.get("data_sources", deserialize_json=True)

for source in data_sources:
    task = PythonOperator(
        task_id=f'process_{source["name"]}',
        python_callable=process_source,
        op_kwargs={'source': source},
        dag=dag
    )

こんな方におすすめ:

  • 複数のデータソースを統合したい方
  • 定期的なバッチ処理を自動化したい方
  • 複雑な依存関係を持つワークフローを管理したい方


各ツールの使い分けガイド

各ツールの使い分けガイド

ここまで3つのツールの詳細機能を見てきました。次は、実際のプロジェクトでどのツールを選ぶべきかの判断基準を示します。

Ray を選ぶべき場合

✅ こんな課題があるなら Ray

  • 単一マシンでは処理が遅すぎる
  • 複数のモデルを並列でトレーニングしたい
  • 大規模なハイパーパラメータ探索が必要
  • リアルタイムの推論サービングが必要
  • 強化学習プロジェクトに取り組んでいる

具体的なユースケース:

  • 数百のハイパーパラメータ組み合わせを試したい
  • TBスケールのデータを処理したい
  • 複数GPUでモデルをトレーニングしたい
  • 低レイテンシでモデルをサービングしたい

MLflow を選ぶべき場合

✅ こんな課題があるなら MLflow

  • 実験の記録が散らばっている
  • どのパラメータが最良の結果を生んだか忘れた
  • モデルのバージョン管理が必要
  • チーム間でモデルを共有したい
  • 再現性を確保したい

具体的なユースケース:

  • 何十もの実験を追跡して比較したい
  • モデルを複数の環境(開発、ステージング、本番)で管理したい
  • チームメンバーと実験結果を共有したい
  • 本番モデルの系譜を追跡したい

Airflow を選ぶべき場合

✅ こんな課題があるなら Airflow

  • 複数のステップを持つパイプラインを調整したい
  • タスク間に依存関係がある
  • 定期的にパイプラインを実行したい
  • 失敗時の再試行が必要
  • 複数のシステムを統合したい

具体的なユースケース:

  • 毎日深夜にモデルを再トレーニングしたい
  • データ抽出 → 変換 → モデルトレーニング → デプロイの流れを自動化したい
  • 複数のデータソースを調整したい
  • ETLパイプラインとMLパイプラインを統合したい

ツールの組み合わせ

実際のプロジェクトでは、これらのツールを組み合わせることで最大の効果を発揮します。

組み合わせ例1:MLflow + Airflow

Airflow(オーケストレーション)

データ前処理タスク

モデルトレーニング(MLflow で追跡)

モデル評価(MLflow に記録)

モデルデプロイ(MLflow Registry から)

組み合わせ例2:Ray + MLflow

Ray Tune(ハイパーパラメータ探索)

各試行を MLflow で追跡

最良のモデルを MLflow Registry に登録

Ray Serve でモデルをデプロイ

組み合わせ例3:すべてを統合

Airflow(日次スケジュール)

Ray Data(大規模データ処理)

Ray Tune + MLflow(分散チューニング + 追跡)

MLflow Registry(モデル管理)

Ray Serve(スケーラブルサービング)


実践的な決定フレームワーク

実践的な決定フレームワーク

以下の質問に答えて、最適なツールを見つけましょう。

質問1:主な課題は何ですか?

  • 計算が遅い → Ray
  • 実験が混乱している → MLflow
  • ワークフローが複雑 → Airflow

質問2:チームの規模は?

  • 個人/小規模チーム → MLflow から開始
  • 中規模チーム → MLflow + Airflow
  • 大規模チーム → 3つすべて

質問3:データのサイズは?

  • GB未満 → 標準ツール
  • GB〜TB → Ray Data
  • TB以上 → Ray + 分散ストレージ

質問4:デプロイ要件は?

  • バッチ予測 → Airflow
  • リアルタイムAPI → Ray Serve または MLflow Models
  • 高負荷サービング → Ray Serve


まとめと次のステップ

まとめと次のステップ

この記事では、Ray、MLflow、Airflowという3つの重要なMLツールを詳しく比較してきました。

重要なポイントの再確認

Ray – 分散コンピューティングとスケーリングの課題を解決
MLflow – 実験追跡とモデルライフサイクル管理を提供
Airflow – ワークフローオーケストレーションとスケジューリングを担当

今すぐ始められるアクションプラン

  1. まずは小さく始める
    • 既存のプロジェクトで最も痛みを感じている部分を特定
    • その課題に最適なツールを1つだけ導入
    • チームで使い方を習得
  2. 段階的に拡大
    • 最初のツールに慣れたら、次の課題に取り組む
    • 必要に応じて他のツールを追加
    • ツール間の統合を構築
  3. 継続的な改善
    • ベストプラクティスを学び続ける
    • チーム内で知識を共有
    • 定期的にワークフローを見直し

さらに学ぶためのリソース

📚 公式ドキュメント

🎓 次のステップ

  • 各ツールの公式チュートリアルを試す
  • 小規模なプロジェクトで実装してみる
  • コミュニティフォーラムに参加する

最後に

適切なツールを選択することは、スケーラブルで保守性の高いMLシステムを構築するための第一歩です。この記事が、あなたのプロジェクトに最適なツールを選ぶ助けとなれば幸いです。

さあ、これらのツールを使って、あなたのMLプロジェクトを次のレベルに引き上げましょう!応援しています!

この記事の著者

Md Amanatullahのプロフィール写真

Md Amanatullah

生成AI、LLM、NLPを専門とするAI/ML開発者兼MLOpsエンジニア。

生成AI、大規模言語モデル(LLM)、自然言語処理(NLP)を専門とするAI/ML開発者兼MLOpsエンジニアで、5年以上の実務経験を有している。クラウドプラットフォームや最先端のフレームワークを活用し、実運用レベルのAIアプリケーションの構築とデプロイに精通している。AI技術の発展と、グローバルなコミュニティへの知識共有に情熱を注いでいる。

この記事は著者の許可を得て公開しています。

元記事:Ray vs MLflow vs Airflow: The Ultimate Guide to Choosing the Right ML Tool — Features, Examples, and When to Use What

この記事の監修・コメント

池田朋弘のプロフィール写真

池田朋弘(監修)

Workstyle Evolution代表。18万人超YouTuber&著書『ChatGPT最強の仕事術』は4万部突破。

株式会社Workstyle Evolution代表取締役。YouTubeチャンネル「いけともch(チャンネル)」では、 AIエージェント時代の必須ノウハウ・スキルや、最新AIツールの活用法を独自のビジネス視点から解説し、 チャンネル登録数は18万人超(2025年7月時点)。

主な著書:ChatGPT最強の仕事術』、 『Perplexity 最強のAI検索術』、 『Mapify 最強のAI理解術』、 『Gemini 最強のAI仕事術

合わせて読みたい
関連記事

公式LINEで最新ニュースをゲット

LINE登録の無料特典
LINE登録の無料特典
icon

最新のAIニュース
毎週お届け

icon

生成AIの業務別の
ビジネス活用シーン

がわかるAIチャット

icon

過去のAIニュースから
事実を確認できる
何でもAI相談チャット

icon

ニュース動画
アーカイブ

ページトップへ