Version: 1.3.1.0

Machine Learning with Apache Spark MLlib

Spark MLlib in ODP

Apache Spark 3.5.6 is included in ODP and deployed on YARN. Spark MLlib is Spark's built-in machine learning library — it runs distributed ML algorithms directly on your cluster, reading data from HDFS, Iceberg, and Hive tables, without requiring any additional software installation.

MLlib is well-suited for classical machine learning at scale: working with structured tabular data that would be impractical to process on a single machine. For deep learning workloads (neural networks, LLMs), teams typically use frameworks like PyTorch or TensorFlow on separate GPU infrastructure, reading data prepared by Spark from ODP storage.

This guide covers MLlib as it is available today in ODP with Spark 3.5.6.

Available Algorithms

MLlib provides a broad set of algorithms through the spark.ml Pipeline API (the newer, DataFrame-based API):

Classification

Algorithm                 Class
Logistic Regression       LogisticRegression
Decision Tree             DecisionTreeClassifier
Random Forest             RandomForestClassifier
Gradient-Boosted Trees    GBTClassifier
Linear SVM                LinearSVC
Naive Bayes               NaiveBayes
Multilayer Perceptron     MultilayerPerceptronClassifier
One-vs-Rest               OneVsRest

Regression

Algorithm                          Class
Linear Regression                  LinearRegression
Generalized Linear Regression      GeneralizedLinearRegression
Decision Tree Regression           DecisionTreeRegressor
Random Forest Regression           RandomForestRegressor
Gradient-Boosted Tree Regression   GBTRegressor
Isotonic Regression                IsotonicRegression
Survival Regression (AFT)          AFTSurvivalRegression

Clustering

Algorithm                      Class
K-Means                        KMeans
Bisecting K-Means              BisectingKMeans
Gaussian Mixture Model         GaussianMixture
Latent Dirichlet Allocation    LDA

Collaborative Filtering

Algorithm                    Class
Alternating Least Squares    ALS

Dimensionality Reduction

Algorithm                       API
Principal Component Analysis    PCA
Singular Value Decomposition    SVD (via RDD API)

Feature Engineering Transformers

MLlib includes a rich set of transformers for feature engineering, usable in Pipelines:

  • StringIndexer, IndexToString: encode categorical values as numeric indices, and map indices back to labels
  • OneHotEncoder: one-hot encoding
  • VectorAssembler: combine feature columns into a single vector column
  • StandardScaler, MinMaxScaler, MaxAbsScaler, RobustScaler: feature scaling
  • Tokenizer, HashingTF, IDF, Word2Vec: text feature extraction
  • ChiSqSelector, UnivariateFeatureSelector: feature selection
  • Imputer: fill missing values
  • Bucketizer, QuantileDiscretizer: binning continuous features
  • SQLTransformer: apply SQL expressions as a pipeline step

The ML Pipelines API

MLlib's Pipeline API chains transformers and estimators into a reproducible, serializable workflow. This is the recommended approach for all MLlib work.

# pyspark
from pyspark.sql import SparkSession
from pyspark.ml import Pipeline
from pyspark.ml.feature import StringIndexer, VectorAssembler, StandardScaler
from pyspark.ml.classification import RandomForestClassifier
from pyspark.ml.evaluation import MulticlassClassificationEvaluator

spark = SparkSession.builder.appName("mlpipeline-example").getOrCreate()

# --- 1. Load data from Iceberg table ---
df = spark.table("hive_catalog.ml_datasets.customer_churn")

# --- 2. Feature engineering ---
indexer = StringIndexer(inputCol="contract_type", outputCol="contract_type_idx")

assembler = VectorAssembler(
    inputCols=["tenure", "monthly_charges", "total_charges", "contract_type_idx"],
    outputCol="features_raw"
)

scaler = StandardScaler(inputCol="features_raw", outputCol="features")

# --- 3. Model ---
rf = RandomForestClassifier(
    labelCol="churn",
    featuresCol="features",
    numTrees=100,
    maxDepth=10,
    seed=42
)

# --- 4. Pipeline ---
pipeline = Pipeline(stages=[indexer, assembler, scaler, rf])

# --- 5. Train / test split ---
train_df, test_df = df.randomSplit([0.8, 0.2], seed=42)

# --- 6. Fit ---
model = pipeline.fit(train_df)

# --- 7. Evaluate ---
predictions = model.transform(test_df)
evaluator = MulticlassClassificationEvaluator(
    labelCol="churn", predictionCol="prediction", metricName="accuracy"
)
print(f"Accuracy: {evaluator.evaluate(predictions):.4f}")

# --- 8. Save the model ---
model.save("hdfs:///models/customer-churn-rf-v1")

Reading Training Data from Iceberg Tables

Iceberg is the recommended table format for ML training datasets in ODP. Benefits:

  • Schema enforcement: prevents bad data from landing in training sets
  • Time travel: retrain on an exact historical snapshot for reproducibility
  • ACID transactions: safe concurrent writes from ingestion pipelines alongside reads from training jobs

Standard Iceberg Read (Latest Snapshot)

df = spark.table("hive_catalog.ml_datasets.sensor_readings")

Time Travel Read (Reproducible Training)

To train on data as it existed on a specific date (crucial for experiment reproducibility):

# By timestamp: Iceberg's "as-of-timestamp" read option takes milliseconds
# since the Unix epoch (1756684800000 = 2025-09-01T00:00:00Z)
df = spark.read \
    .format("iceberg") \
    .option("as-of-timestamp", "1756684800000") \
    .load("hive_catalog.ml_datasets.sensor_readings")

# By snapshot ID (from Iceberg metadata)
df = spark.read \
    .format("iceberg") \
    .option("snapshot-id", "5931985158436469021") \
    .load("hive_catalog.ml_datasets.sensor_readings")

Store the snapshot ID alongside your model metadata so you can reproduce the exact training dataset at any point.
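One lightweight way to do this is to persist a small metadata record next to the model. A sketch in plain Python (the record layout, field names, and values here are illustrative, not an ODP convention):

```python
import json

# Illustrative training-run record. In a real job the snapshot ID is read
# from the Iceberg metadata tables (e.g. the table's `.snapshots` table)
# at training time, and the record is written to HDFS next to the model.
metadata = {
    "model_path": "hdfs:///models/customer-churn-rf-v1",
    "training_table": "hive_catalog.ml_datasets.customer_churn",
    "snapshot_id": 5931985158436469021,
    "trained_at": "2025-09-01T00:00:00Z",
}

record = json.dumps(metadata, indent=2)
print(record)
```

To retrain on exactly the same data later, read the table with the recorded snapshot-id as shown above.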

Writing Feature Tables Back to Iceberg

Feature tables (derived features, embeddings, predictions) can be written back to Iceberg:

predictions \
    .select("customer_id", "prediction", "probability") \
    .writeTo("hive_catalog.ml_predictions.churn_scores") \
    .createOrReplace()

Feature Engineering with Spark SQL on Hive/Iceberg

For complex feature engineering, Spark SQL is often more readable than the DataFrame API. Spark SQL queries can reference both Hive tables and Iceberg tables:

spark.sql("""
    CREATE OR REPLACE TABLE hive_catalog.ml_datasets.customer_features
    USING iceberg AS
    SELECT
        c.customer_id,
        c.tenure_months,
        c.contract_type,
        COUNT(t.transaction_id) AS total_transactions_6m,
        SUM(t.amount) AS total_spend_6m,
        AVG(t.amount) AS avg_transaction_amount,
        MAX(t.amount) AS max_transaction_amount,
        COUNT(CASE WHEN t.status = 'failed' THEN 1 END) AS failed_transactions,
        c.churn_label
    FROM hive_catalog.raw.customers c
    LEFT JOIN hive_catalog.raw.transactions t
        ON c.customer_id = t.customer_id
        AND t.transaction_date >= date_sub(current_date(), 180)
    GROUP BY
        c.customer_id, c.tenure_months, c.contract_type, c.churn_label
""")

This creates a versioned, Ranger-governed feature table in Iceberg that downstream training jobs can read.

Saving and Loading MLlib Models

MLlib models are serializable and can be saved to and loaded from HDFS.

Saving a Model

# Save a fitted Pipeline model
model.save("hdfs:///models/churn-rf-pipeline-v2")

# Or to a specific HDFS path with versioning
import datetime
version = datetime.datetime.now().strftime("%Y%m%d-%H%M%S")
model.save(f"hdfs:///models/churn-rf-pipeline-{version}")

Loading a Model

from pyspark.ml import PipelineModel

# Load the saved model
loaded_model = PipelineModel.load("hdfs:///models/churn-rf-pipeline-v2")

# Apply to new data
new_predictions = loaded_model.transform(new_data_df)

Model Metadata in Atlas

While MLlib does not natively publish model metadata to Atlas, teams can create Atlas entities for model artifacts programmatically using the Atlas REST API. This links the model to its training dataset (the Iceberg snapshot ID) and creates a lineage chain: raw data → feature table → model.
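A minimal sketch of what such a registration could look like, assuming a custom Atlas typedef named ml_model has already been created (Atlas does not ship an ML model type; the type name and attributes below are hypothetical):

```python
import json

# Hypothetical entity payload for Atlas's v2 REST API. The "ml_model"
# typeName and its attributes are a custom typedef your team defines
# first via the typedefs endpoint; they are not built into Atlas.
entity = {
    "entity": {
        "typeName": "ml_model",
        "attributes": {
            "qualifiedName": "churn-rf-pipeline-v2@cluster",
            "name": "churn-rf-pipeline-v2",
            "modelPath": "hdfs:///models/churn-rf-pipeline-v2",
            "trainingSnapshotId": "5931985158436469021",
        },
    }
}

payload = json.dumps(entity)
# Creating the entity would be a POST to the Atlas server
# (authentication/Kerberos setup omitted), e.g.:
#   requests.post("https://<atlas-host>:21000/api/atlas/v2/entity",
#                 data=payload,
#                 headers={"Content-Type": "application/json"})
print(payload)
```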

Running ML Jobs via YARN

MLlib jobs run on YARN like any other Spark application. The key configuration parameters for ML workloads:

spark-submit \
  --master yarn \
  --deploy-mode cluster \
  --name "customer-churn-training" \
  --num-executors 10 \
  --executor-cores 4 \
  --executor-memory 16g \
  --driver-memory 8g \
  --conf spark.yarn.queue=ml-training \
  --conf spark.sql.catalog.hive_catalog=org.apache.iceberg.spark.SparkCatalog \
  --conf spark.sql.catalog.hive_catalog.type=hive \
  --conf spark.sql.catalog.hive_catalog.uri=thrift://master02.example.com:9083 \
  --conf spark.dynamicAllocation.enabled=true \
  --conf spark.dynamicAllocation.minExecutors=2 \
  --conf spark.dynamicAllocation.maxExecutors=20 \
  train_churn_model.py

YARN queue configuration: for production ML workloads, dedicate a YARN capacity queue for training jobs to prevent them from consuming all cluster resources. Configure queue capacity in the YARN Capacity Scheduler via Ambari.
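As an illustration, a dedicated ml-training queue in the Capacity Scheduler might be configured like this (the queue name matches the spark.yarn.queue used above; the percentages are example values, not recommendations):

```properties
# Add an ml-training queue alongside the default queue
yarn.scheduler.capacity.root.queues=default,ml-training

# Guaranteed share of cluster resources for training jobs (percent)
yarn.scheduler.capacity.root.ml-training.capacity=30

# Ceiling the queue may burst to when the cluster is otherwise idle
yarn.scheduler.capacity.root.ml-training.maximum-capacity=50
```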

Dynamic allocation: enable dynamic allocation for training jobs that have variable data sizes. Spark will scale executors up during computation-intensive phases and release them afterward.

Kerberos: on Kerberos-secured clusters, ensure that the spark-submit process has a valid Kerberos ticket or that the application uses a keytab:

spark-submit \
  --principal ml-service@REALM.EXAMPLE.COM \
  --keytab /etc/security/keytabs/ml-service.keytab \
  ... other options ...

Zeppelin Notebooks for ML Experimentation

Apache Zeppelin 0.10.2 is included in ODP and is the recommended tool for interactive ML experimentation before productionizing jobs.

Spark Interpreter for MLlib

Zeppelin's Spark interpreter gives you an interactive PySpark or Scala Spark session with full access to MLlib:

%pyspark
from pyspark.ml.classification import LogisticRegression
from pyspark.ml.feature import VectorAssembler

df = spark.table("hive_catalog.ml_datasets.customer_features")
df.printSchema()
df.show(5)

Useful Zeppelin Patterns for ML

Inline visualization: Zeppelin's %sql paragraph can plot query results directly:

%sql
SELECT contract_type, AVG(churn_label) AS churn_rate
FROM hive_catalog.ml_datasets.customer_features
GROUP BY contract_type
ORDER BY churn_rate DESC

Parameter forms: Zeppelin supports dynamic form inputs, useful for iterating over hyperparameters:

%pyspark
# z.input renders a text-input form bound to this paragraph (ZeppelinContext API)
num_trees = int(z.input("num_trees", "100"))

rf = RandomForestClassifier(numTrees=num_trees, ...)

Notebook as training log: document your experiments in Zeppelin notebooks, saving each notebook version with the corresponding model HDFS path and Iceberg snapshot ID. This creates a lightweight experiment tracking record.

For more structured experiment tracking, teams may integrate external tools (MLflow, DVC) that store their metadata externally while reading data from ODP storage.

Hyperparameter Tuning

MLlib includes CrossValidator and TrainValidationSplit for hyperparameter tuning:

from pyspark.ml.tuning import CrossValidator, ParamGridBuilder
from pyspark.ml.evaluation import BinaryClassificationEvaluator

paramGrid = ParamGridBuilder() \
    .addGrid(rf.numTrees, [50, 100, 200]) \
    .addGrid(rf.maxDepth, [5, 10, 15]) \
    .build()

evaluator = BinaryClassificationEvaluator(labelCol="churn", metricName="areaUnderROC")

cv = CrossValidator(
    estimator=pipeline,
    estimatorParamMaps=paramGrid,
    evaluator=evaluator,
    numFolds=5,
    parallelism=4  # number of models to train in parallel
)

cv_model = cv.fit(train_df)
print(f"Best AUC: {evaluator.evaluate(cv_model.transform(test_df)):.4f}")

# Save the best model
cv_model.bestModel.save("hdfs:///models/churn-rf-best")

parallelism=4 tells Spark to evaluate 4 hyperparameter combinations simultaneously. Set this to a value that fits within your YARN queue capacity.
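A quick back-of-the-envelope for the grid above: CrossValidator fits one model per parameter combination per fold, so the total work grows multiplicatively with the grid and the fold count:

```python
# Grid and fold counts from the CrossValidator example above
num_trees_values = [50, 100, 200]
max_depth_values = [5, 10, 15]
num_folds = 5

grid_size = len(num_trees_values) * len(max_depth_values)  # 9 combinations
total_fits = grid_size * num_folds                         # 45 model fits

# With parallelism=4, at most 4 of these fits run concurrently,
# each a full Spark job competing for the same YARN queue.
print(grid_size, total_fits)
```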