Aller au contenu principal
Version: 1.3.1.0

Apprentissage automatique avec Apache Spark MLlib

Spark MLlib dans ODP

Apache Spark 3.5.6 est inclus dans ODP et déployé sur YARN. Spark MLlib est la bibliothèque d'apprentissage automatique intégrée de Spark — elle exécute des algorithmes ML distribués directement sur votre cluster, lisant les données depuis HDFS, Iceberg et les tables Hive, sans nécessiter d'installation de logiciel supplémentaire.

MLlib est bien adapté à l'apprentissage automatique classique à grande échelle : travail avec des données tabulaires structurées qui seraient impossibles à traiter sur une seule machine. Pour les charges de travail d'apprentissage profond (réseaux de neurones, LLMs), les équipes utilisent typiquement des frameworks comme PyTorch ou TensorFlow sur une infrastructure GPU séparée, lisant les données préparées par Spark depuis le stockage ODP.

Ce guide couvre MLlib tel qu'il est disponible aujourd'hui dans ODP avec Spark 3.5.6.

Algorithmes disponibles

MLlib fournit un large ensemble d'algorithmes via l'API Pipeline spark.ml (la nouvelle API basée sur DataFrame) :

Classification

AlgorithmeClasse
Régression logistiqueLogisticRegression
Arbre de décisionDecisionTreeClassifier
Forêt aléatoireRandomForestClassifier
Arbres à gradient boostéGBTClassifier
SVM linéaireLinearSVC
Naïve BayesNaiveBayes
Perceptron multicoucheMultilayerPerceptronClassifier
Un-contre-tousOneVsRest

Régression

AlgorithmeClasse
Régression linéaireLinearRegression
Régression linéaire généraliséeGeneralizedLinearRegression
Régression par arbre de décisionDecisionTreeRegressor
Régression par forêt aléatoireRandomForestRegressor
Régression par arbres à gradient boostéGBTRegressor
Régression isotoniqueIsotonicRegression
Régression de survie (AFT)AFTSurvivalRegression

Clustering

AlgorithmeClasse
K-MeansKMeans
K-Means bisectantBisectingKMeans
Modèle de mélange gaussienGaussianMixture
Allocation de Dirichlet latenteLDA

Filtrage collaboratif

AlgorithmeClasse
Moindres carrés alternésALS

Réduction de dimensionnalité

AlgorithmeAPI
Analyse en composantes principalesPCA
Décomposition en valeurs singulièresSVD (via l'API RDD)

Transformateurs d'ingénierie des caractéristiques

MLlib inclut un riche ensemble de transformateurs pour l'ingénierie des caractéristiques, utilisables dans les Pipelines :

  • StringIndexer, IndexToString : encoder les variables catégorielles
  • OneHotEncoder : encodage one-hot
  • VectorAssembler : combiner des colonnes de caractéristiques en une seule colonne vecteur
  • StandardScaler, MinMaxScaler, MaxAbsScaler, RobustScaler : mise à l'échelle des caractéristiques
  • Tokenizer, HashingTF, IDF, Word2Vec : extraction de caractéristiques textuelles
  • ChiSqSelector, UnivariateFeatureSelector : sélection de caractéristiques
  • Imputer : remplissage des valeurs manquantes
  • Bucketizer, QuantileDiscretizer : discrétisation de caractéristiques continues
  • SQLTransformer : appliquer des expressions SQL comme étape de pipeline

L'API ML Pipelines

L'API Pipeline de MLlib enchaîne des transformateurs et des estimateurs dans un workflow reproductible et sérialisable. Il s'agit de l'approche recommandée pour tout travail MLlib.

# pyspark
from pyspark.sql import SparkSession
from pyspark.ml import Pipeline
from pyspark.ml.feature import StringIndexer, VectorAssembler, StandardScaler
from pyspark.ml.classification import RandomForestClassifier
from pyspark.ml.evaluation import MulticlassClassificationEvaluator

spark = SparkSession.builder.appName("mlpipeline-example").getOrCreate()

# --- 1. Charger les données depuis une table Iceberg ---
df = spark.table("hive_catalog.ml_datasets.customer_churn")

# --- 2. Ingénierie des caractéristiques ---
indexer = StringIndexer(inputCol="contract_type", outputCol="contract_type_idx")

assembler = VectorAssembler(
inputCols=["tenure", "monthly_charges", "total_charges", "contract_type_idx"],
outputCol="features_raw"
)

scaler = StandardScaler(inputCol="features_raw", outputCol="features")

# --- 3. Modèle ---
rf = RandomForestClassifier(
labelCol="churn",
featuresCol="features",
numTrees=100,
maxDepth=10,
seed=42
)

# --- 4. Pipeline ---
pipeline = Pipeline(stages=[indexer, assembler, scaler, rf])

# --- 5. Division entraînement / test ---
train_df, test_df = df.randomSplit([0.8, 0.2], seed=42)

# --- 6. Ajustement ---
model = pipeline.fit(train_df)

# --- 7. Évaluation ---
predictions = model.transform(test_df)
evaluator = MulticlassClassificationEvaluator(
labelCol="churn", predictionCol="prediction", metricName="accuracy"
)
print(f"Précision : {evaluator.evaluate(predictions):.4f}")

# --- 8. Sauvegarder le modèle ---
model.save("hdfs:///models/customer-churn-rf-v1")

Lecture des données d'entraînement depuis des tables Iceberg

Iceberg est le format de table recommandé pour les jeux de données d'entraînement ML dans ODP. Avantages :

  • Application du schéma : empêche les mauvaises données d'atterrir dans les jeux d'entraînement
  • Voyage dans le temps : ré-entraîner sur un snapshot historique exact pour la reproductibilité
  • Transactions ACID : écritures simultanées sécurisées depuis les pipelines d'ingestion aux côtés des lectures des jobs d'entraînement

Lecture Iceberg standard (dernier snapshot)

df = spark.table("hive_catalog.ml_datasets.sensor_readings")

Lecture par voyage dans le temps (entraînement reproductible)

Pour entraîner sur les données telles qu'elles existaient à une date spécifique (essentiel pour la reproductibilité des expériences) :

# Par timestamp
df = spark.read \
.option("as-of-timestamp", "2025-09-01T00:00:00.000Z") \
.format("iceberg") \
.load("hive_catalog.ml_datasets.sensor_readings")

# Par ID de snapshot (depuis les métadonnées Iceberg)
df = spark.read \
.option("snapshot-id", "5931985158436469021") \
.format("iceberg") \
.load("hive_catalog.ml_datasets.sensor_readings")

Stockez l'ID de snapshot aux côtés des métadonnées de votre modèle afin de pouvoir reproduire le jeu de données d'entraînement exact à tout moment.

Écriture des tables de caractéristiques vers Iceberg

Les tables de caractéristiques (caractéristiques dérivées, embeddings, prédictions) peuvent être réécrites dans Iceberg :

predictions \
.select("customer_id", "prediction", "probability") \
.writeTo("hive_catalog.ml_predictions.churn_scores") \
.createOrReplace()

Ingénierie des caractéristiques avec Spark SQL sur Hive/Iceberg

Pour l'ingénierie de caractéristiques complexes, Spark SQL est souvent plus lisible que l'API DataFrame. Les requêtes Spark SQL peuvent référencer à la fois des tables Hive et des tables Iceberg :

spark.sql("""
CREATE OR REPLACE TABLE hive_catalog.ml_datasets.customer_features
USING iceberg AS
SELECT
c.customer_id,
c.tenure_months,
c.contract_type,
COUNT(t.transaction_id) AS total_transactions_6m,
SUM(t.amount) AS total_spend_6m,
AVG(t.amount) AS avg_transaction_amount,
MAX(t.amount) AS max_transaction_amount,
COUNT(CASE WHEN t.status = 'failed' THEN 1 END) AS failed_transactions,
c.churn_label
FROM hive_catalog.raw.customers c
LEFT JOIN hive_catalog.raw.transactions t
ON c.customer_id = t.customer_id
AND t.transaction_date >= date_sub(current_date(), 180)
GROUP BY
c.customer_id, c.tenure_months, c.contract_type, c.churn_label
""")

Cela crée une table de caractéristiques versionnée et gouvernée par Ranger dans Iceberg que les jobs d'entraînement en aval peuvent lire.

Sauvegarde et chargement des modèles MLlib

Les modèles MLlib sont sérialisables et peuvent être sauvegardés vers et chargés depuis HDFS.

Sauvegarde d'un modèle

# Sauvegarder un modèle Pipeline ajusté
model.save("hdfs:///models/churn-rf-pipeline-v2")

# Ou vers un chemin HDFS spécifique avec versionnement
import datetime
version = datetime.datetime.now().strftime("%Y%m%d-%H%M%S")
model.save(f"hdfs:///models/churn-rf-pipeline-{version}")

Chargement d'un modèle

from pyspark.ml import PipelineModel

# Charger le modèle sauvegardé
loaded_model = PipelineModel.load("hdfs:///models/churn-rf-pipeline-v2")

# Appliquer aux nouvelles données
new_predictions = loaded_model.transform(new_data_df)

Métadonnées de modèles dans Atlas

Bien que MLlib ne publie pas nativement les métadonnées de modèles vers Atlas, les équipes peuvent créer des entités Atlas pour les artefacts de modèles de façon programmatique en utilisant l'API REST Atlas. Cela lie le modèle à son jeu de données d'entraînement (l'ID de snapshot Iceberg) et crée une chaîne de lignage : données brutes → table de caractéristiques → modèle.

Exécution des jobs ML via YARN

Les jobs MLlib s'exécutent sur YARN comme toute autre application Spark. Les paramètres de configuration clés pour les charges de travail ML :

spark-submit \
--master yarn \
--deploy-mode cluster \
--name "customer-churn-training" \
--num-executors 10 \
--executor-cores 4 \
--executor-memory 16g \
--driver-memory 8g \
--conf spark.yarn.queue=ml-training \
--conf spark.sql.catalog.hive_catalog=org.apache.iceberg.spark.SparkCatalog \
--conf spark.sql.catalog.hive_catalog.type=hive \
--conf spark.sql.catalog.hive_catalog.uri=thrift://master02.example.com:9083 \
--conf spark.dynamicAllocation.enabled=true \
--conf spark.dynamicAllocation.minExecutors=2 \
--conf spark.dynamicAllocation.maxExecutors=20 \
train_churn_model.py

Configuration des files d'attente YARN : pour les charges de travail ML en production, dédier une file d'attente de capacité YARN aux jobs d'entraînement afin d'éviter qu'ils consomment toutes les ressources du cluster. Configurez la capacité des files d'attente dans le YARN Capacity Scheduler via Ambari.

Allocation dynamique : activez l'allocation dynamique pour les jobs d'entraînement avec des tailles de données variables. Spark mettra à l'échelle les exécuteurs pendant les phases intensives en calcul et les libérera ensuite.

Kerberos : sur les clusters sécurisés par Kerberos, assurez-vous que le processus spark-submit dispose d'un ticket Kerberos valide ou que l'application utilise un keytab :

spark-submit \
--principal ml-service@REALM.EXAMPLE.COM \
--keytab /etc/security/keytabs/ml-service.keytab \
... autres options ...

Notebooks Zeppelin pour l'expérimentation ML

Apache Zeppelin 0.10.2 est inclus dans ODP et est l'outil recommandé pour l'expérimentation ML interactive avant la mise en production des jobs.

Interpréteur Spark pour MLlib

L'interpréteur Spark de Zeppelin vous donne une session PySpark ou Scala Spark interactive avec un accès complet à MLlib :

%pyspark
from pyspark.ml.classification import LogisticRegression
from pyspark.ml.feature import VectorAssembler

df = spark.table("hive_catalog.ml_datasets.customer_features")
df.printSchema()
df.show(5)

Patterns Zeppelin utiles pour le ML

Visualisation en ligne : le paragraphe %sql de Zeppelin peut tracer les résultats de requêtes directement :

%sql
SELECT contract_type, AVG(churn_label) AS churn_rate
FROM hive_catalog.ml_datasets.customer_features
GROUP BY contract_type
ORDER BY churn_rate DESC

Formulaires paramétrés : Zeppelin prend en charge les entrées de formulaire dynamiques, utiles pour itérer sur les hyperparamètres :

%pyspark
# ${num_trees=100,100|200|300}
num_trees = int(z.input("num_trees", "100"))

rf = RandomForestClassifier(numTrees=num_trees, ...)

Notebook comme journal d'entraînement : documentez vos expériences dans des notebooks Zeppelin, en sauvegardant chaque version du notebook avec le chemin HDFS du modèle correspondant et l'ID de snapshot Iceberg. Cela crée un registre léger de suivi des expériences.

Pour un suivi d'expériences plus structuré, les équipes peuvent intégrer des outils externes (MLflow, DVC) qui stockent leurs métadonnées en externe tout en lisant les données depuis le stockage ODP.

Réglage des hyperparamètres

MLlib inclut CrossValidator et TrainValidationSplit pour le réglage des hyperparamètres :

from pyspark.ml.tuning import CrossValidator, ParamGridBuilder
from pyspark.ml.evaluation import BinaryClassificationEvaluator

paramGrid = ParamGridBuilder() \
.addGrid(rf.numTrees, [50, 100, 200]) \
.addGrid(rf.maxDepth, [5, 10, 15]) \
.build()

evaluator = BinaryClassificationEvaluator(labelCol="churn", metricName="areaUnderROC")

cv = CrossValidator(
estimator=pipeline,
estimatorParamMaps=paramGrid,
evaluator=evaluator,
numFolds=5,
parallelism=4 # nombre de modèles à entraîner en parallèle
)

cv_model = cv.fit(train_df)
print(f"Meilleur AUC : {evaluator.evaluate(cv_model.transform(test_df)):.4f}")

# Sauvegarder le meilleur modèle
cv_model.bestModel.save("hdfs:///models/churn-rf-best")

parallelism=4 indique à Spark d'évaluer 4 combinaisons d'hyperparamètres simultanément. Définissez cette valeur de façon à s'adapter à la capacité de votre file d'attente YARN.