Apprentissage automatique avec Apache Spark MLlib
Spark MLlib dans ODP
Apache Spark 3.5.6 est inclus dans ODP et déployé sur YARN. Spark MLlib est la bibliothèque d'apprentissage automatique intégrée de Spark — elle exécute des algorithmes ML distribués directement sur votre cluster, lisant les données depuis HDFS, Iceberg et les tables Hive, sans nécessiter d'installation de logiciel supplémentaire.
MLlib est bien adapté à l'apprentissage automatique classique à grande échelle : travail avec des données tabulaires structurées qui seraient impossibles à traiter sur une seule machine. Pour les charges de travail d'apprentissage profond (réseaux de neurones, LLMs), les équipes utilisent typiquement des frameworks comme PyTorch ou TensorFlow sur une infrastructure GPU séparée, lisant les données préparées par Spark depuis le stockage ODP.
Ce guide couvre MLlib tel qu'il est disponible aujourd'hui dans ODP avec Spark 3.5.6.
Algorithmes disponibles
MLlib fournit un large ensemble d'algorithmes via l'API Pipeline spark.ml (la nouvelle API basée sur DataFrame) :
Classification
| Algorithme | Classe |
|---|---|
| Régression logistique | LogisticRegression |
| Arbre de décision | DecisionTreeClassifier |
| Forêt aléatoire | RandomForestClassifier |
| Arbres à gradient boosté | GBTClassifier |
| SVM linéaire | LinearSVC |
| Naïve Bayes | NaiveBayes |
| Perceptron multicouche | MultilayerPerceptronClassifier |
| Un-contre-tous | OneVsRest |
Régression
| Algorithme | Classe |
|---|---|
| Régression linéaire | LinearRegression |
| Régression linéaire généralisée | GeneralizedLinearRegression |
| Régression par arbre de décision | DecisionTreeRegressor |
| Régression par forêt aléatoire | RandomForestRegressor |
| Régression par arbres à gradient boosté | GBTRegressor |
| Régression isotonique | IsotonicRegression |
| Régression de survie (AFT) | AFTSurvivalRegression |
Clustering
| Algorithme | Classe |
|---|---|
| K-Means | KMeans |
| K-Means bisectant | BisectingKMeans |
| Modèle de mélange gaussien | GaussianMixture |
| Allocation de Dirichlet latente | LDA |
Filtrage collaboratif
| Algorithme | Classe |
|---|---|
| Moindres carrés alternés | ALS |
Réduction de dimensionnalité
| Algorithme | API |
|---|---|
| Analyse en composantes principales | PCA |
| Décomposition en valeurs singulières | SVD (via l'API RDD) |
Transformateurs d'ingénierie des caractéristiques
MLlib inclut un riche ensemble de transformateurs pour l'ingénierie des caractéristiques, utilisables dans les Pipelines :
StringIndexer,IndexToString: encoder les variables catégoriellesOneHotEncoder: encodage one-hotVectorAssembler: combiner des colonnes de caractéristiques en une seule colonne vecteurStandardScaler,MinMaxScaler,MaxAbsScaler,RobustScaler: mise à l'échelle des caractéristiquesTokenizer,HashingTF,IDF,Word2Vec: extraction de caractéristiques textuellesChiSqSelector,UnivariateFeatureSelector: sélection de caractéristiquesImputer: remplissage des valeurs manquantesBucketizer,QuantileDiscretizer: discrétisation de caractéristiques continuesSQLTransformer: appliquer des expressions SQL comme étape de pipeline
L'API ML Pipelines
L'API Pipeline de MLlib enchaîne des transformateurs et des estimateurs dans un workflow reproductible et sérialisable. Il s'agit de l'approche recommandée pour tout travail MLlib.
# pyspark
from pyspark.sql import SparkSession
from pyspark.ml import Pipeline
from pyspark.ml.feature import StringIndexer, VectorAssembler, StandardScaler
from pyspark.ml.classification import RandomForestClassifier
from pyspark.ml.evaluation import MulticlassClassificationEvaluator
spark = SparkSession.builder.appName("mlpipeline-example").getOrCreate()
# --- 1. Charger les données depuis une table Iceberg ---
df = spark.table("hive_catalog.ml_datasets.customer_churn")
# --- 2. Ingénierie des caractéristiques ---
indexer = StringIndexer(inputCol="contract_type", outputCol="contract_type_idx")
assembler = VectorAssembler(
inputCols=["tenure", "monthly_charges", "total_charges", "contract_type_idx"],
outputCol="features_raw"
)
scaler = StandardScaler(inputCol="features_raw", outputCol="features")
# --- 3. Modèle ---
rf = RandomForestClassifier(
labelCol="churn",
featuresCol="features",
numTrees=100,
maxDepth=10,
seed=42
)
# --- 4. Pipeline ---
pipeline = Pipeline(stages=[indexer, assembler, scaler, rf])
# --- 5. Division entraînement / test ---
train_df, test_df = df.randomSplit([0.8, 0.2], seed=42)
# --- 6. Ajustement ---
model = pipeline.fit(train_df)
# --- 7. Évaluation ---
predictions = model.transform(test_df)
evaluator = MulticlassClassificationEvaluator(
labelCol="churn", predictionCol="prediction", metricName="accuracy"
)
print(f"Précision : {evaluator.evaluate(predictions):.4f}")
# --- 8. Sauvegarder le modèle ---
model.save("hdfs:///models/customer-churn-rf-v1")
Lecture des données d'entraînement depuis des tables Iceberg
Iceberg est le format de table recommandé pour les jeux de données d'entraînement ML dans ODP. Avantages :
- Application du schéma : empêche les mauvaises données d'atterrir dans les jeux d'entraînement
- Voyage dans le temps : ré-entraîner sur un snapshot historique exact pour la reproductibilité
- Transactions ACID : écritures simultanées sécurisées depuis les pipelines d'ingestion aux côtés des lectures des jobs d'entraînement
Lecture Iceberg standard (dernier snapshot)
df = spark.table("hive_catalog.ml_datasets.sensor_readings")
Lecture par voyage dans le temps (entraînement reproductible)
Pour entraîner sur les données telles qu'elles existaient à une date spécifique (essentiel pour la reproductibilité des expériences) :
# Par timestamp
df = spark.read \
.option("as-of-timestamp", "2025-09-01T00:00:00.000Z") \
.format("iceberg") \
.load("hive_catalog.ml_datasets.sensor_readings")
# Par ID de snapshot (depuis les métadonnées Iceberg)
df = spark.read \
.option("snapshot-id", "5931985158436469021") \
.format("iceberg") \
.load("hive_catalog.ml_datasets.sensor_readings")
Stockez l'ID de snapshot aux côtés des métadonnées de votre modèle afin de pouvoir reproduire le jeu de données d'entraînement exact à tout moment.
Écriture des tables de caractéristiques vers Iceberg
Les tables de caractéristiques (caractéristiques dérivées, embeddings, prédictions) peuvent être réécrites dans Iceberg :
predictions \
.select("customer_id", "prediction", "probability") \
.writeTo("hive_catalog.ml_predictions.churn_scores") \
.createOrReplace()
Ingénierie des caractéristiques avec Spark SQL sur Hive/Iceberg
Pour l'ingénierie de caractéristiques complexes, Spark SQL est souvent plus lisible que l'API DataFrame. Les requêtes Spark SQL peuvent référencer à la fois des tables Hive et des tables Iceberg :
spark.sql("""
CREATE OR REPLACE TABLE hive_catalog.ml_datasets.customer_features
USING iceberg AS
SELECT
c.customer_id,
c.tenure_months,
c.contract_type,
COUNT(t.transaction_id) AS total_transactions_6m,
SUM(t.amount) AS total_spend_6m,
AVG(t.amount) AS avg_transaction_amount,
MAX(t.amount) AS max_transaction_amount,
COUNT(CASE WHEN t.status = 'failed' THEN 1 END) AS failed_transactions,
c.churn_label
FROM hive_catalog.raw.customers c
LEFT JOIN hive_catalog.raw.transactions t
ON c.customer_id = t.customer_id
AND t.transaction_date >= date_sub(current_date(), 180)
GROUP BY
c.customer_id, c.tenure_months, c.contract_type, c.churn_label
""")
Cela crée une table de caractéristiques versionnée et gouvernée par Ranger dans Iceberg que les jobs d'entraînement en aval peuvent lire.
Sauvegarde et chargement des modèles MLlib
Les modèles MLlib sont sérialisables et peuvent être sauvegardés vers et chargés depuis HDFS.
Sauvegarde d'un modèle
# Sauvegarder un modèle Pipeline ajusté
model.save("hdfs:///models/churn-rf-pipeline-v2")
# Ou vers un chemin HDFS spécifique avec versionnement
import datetime
version = datetime.datetime.now().strftime("%Y%m%d-%H%M%S")
model.save(f"hdfs:///models/churn-rf-pipeline-{version}")
Chargement d'un modèle
from pyspark.ml import PipelineModel
# Charger le modèle sauvegardé
loaded_model = PipelineModel.load("hdfs:///models/churn-rf-pipeline-v2")
# Appliquer aux nouvelles données
new_predictions = loaded_model.transform(new_data_df)
Métadonnées de modèles dans Atlas
Bien que MLlib ne publie pas nativement les métadonnées de modèles vers Atlas, les équipes peuvent créer des entités Atlas pour les artefacts de modèles de façon programmatique en utilisant l'API REST Atlas. Cela lie le modèle à son jeu de données d'entraînement (l'ID de snapshot Iceberg) et crée une chaîne de lignage : données brutes → table de caractéristiques → modèle.
Exécution des jobs ML via YARN
Les jobs MLlib s'exécutent sur YARN comme toute autre application Spark. Les paramètres de configuration clés pour les charges de travail ML :
spark-submit \
--master yarn \
--deploy-mode cluster \
--name "customer-churn-training" \
--num-executors 10 \
--executor-cores 4 \
--executor-memory 16g \
--driver-memory 8g \
--conf spark.yarn.queue=ml-training \
--conf spark.sql.catalog.hive_catalog=org.apache.iceberg.spark.SparkCatalog \
--conf spark.sql.catalog.hive_catalog.type=hive \
--conf spark.sql.catalog.hive_catalog.uri=thrift://master02.example.com:9083 \
--conf spark.dynamicAllocation.enabled=true \
--conf spark.dynamicAllocation.minExecutors=2 \
--conf spark.dynamicAllocation.maxExecutors=20 \
train_churn_model.py
Configuration des files d'attente YARN : pour les charges de travail ML en production, dédier une file d'attente de capacité YARN aux jobs d'entraînement afin d'éviter qu'ils consomment toutes les ressources du cluster. Configurez la capacité des files d'attente dans le YARN Capacity Scheduler via Ambari.
Allocation dynamique : activez l'allocation dynamique pour les jobs d'entraînement avec des tailles de données variables. Spark mettra à l'échelle les exécuteurs pendant les phases intensives en calcul et les libérera ensuite.
Kerberos : sur les clusters sécurisés par Kerberos, assurez-vous que le processus spark-submit dispose d'un ticket Kerberos valide ou que l'application utilise un keytab :
spark-submit \
--principal ml-service@REALM.EXAMPLE.COM \
--keytab /etc/security/keytabs/ml-service.keytab \
... autres options ...
Notebooks Zeppelin pour l'expérimentation ML
Apache Zeppelin 0.10.2 est inclus dans ODP et est l'outil recommandé pour l'expérimentation ML interactive avant la mise en production des jobs.
Interpréteur Spark pour MLlib
L'interpréteur Spark de Zeppelin vous donne une session PySpark ou Scala Spark interactive avec un accès complet à MLlib :
%pyspark
from pyspark.ml.classification import LogisticRegression
from pyspark.ml.feature import VectorAssembler
df = spark.table("hive_catalog.ml_datasets.customer_features")
df.printSchema()
df.show(5)
Patterns Zeppelin utiles pour le ML
Visualisation en ligne : le paragraphe %sql de Zeppelin peut tracer les résultats de requêtes directement :
%sql
SELECT contract_type, AVG(churn_label) AS churn_rate
FROM hive_catalog.ml_datasets.customer_features
GROUP BY contract_type
ORDER BY churn_rate DESC
Formulaires paramétrés : Zeppelin prend en charge les entrées de formulaire dynamiques, utiles pour itérer sur les hyperparamètres :
%pyspark
# ${num_trees=100,100|200|300}
num_trees = int(z.input("num_trees", "100"))
rf = RandomForestClassifier(numTrees=num_trees, ...)
Notebook comme journal d'entraînement : documentez vos expériences dans des notebooks Zeppelin, en sauvegardant chaque version du notebook avec le chemin HDFS du modèle correspondant et l'ID de snapshot Iceberg. Cela crée un registre léger de suivi des expériences.
Pour un suivi d'expériences plus structuré, les équipes peuvent intégrer des outils externes (MLflow, DVC) qui stockent leurs métadonnées en externe tout en lisant les données depuis le stockage ODP.
Réglage des hyperparamètres
MLlib inclut CrossValidator et TrainValidationSplit pour le réglage des hyperparamètres :
from pyspark.ml.tuning import CrossValidator, ParamGridBuilder
from pyspark.ml.evaluation import BinaryClassificationEvaluator
paramGrid = ParamGridBuilder() \
.addGrid(rf.numTrees, [50, 100, 200]) \
.addGrid(rf.maxDepth, [5, 10, 15]) \
.build()
evaluator = BinaryClassificationEvaluator(labelCol="churn", metricName="areaUnderROC")
cv = CrossValidator(
estimator=pipeline,
estimatorParamMaps=paramGrid,
evaluator=evaluator,
numFolds=5,
parallelism=4 # nombre de modèles à entraîner en parallèle
)
cv_model = cv.fit(train_df)
print(f"Meilleur AUC : {evaluator.evaluate(cv_model.transform(test_df)):.4f}")
# Sauvegarder le meilleur modèle
cv_model.bestModel.save("hdfs:///models/churn-rf-best")
parallelism=4 indique à Spark d'évaluer 4 combinaisons d'hyperparamètres simultanément. Définissez cette valeur de façon à s'adapter à la capacité de votre file d'attente YARN.