Attività di classificazione con SynapseML

Questo articolo illustra come eseguire un'attività di classificazione del testo con due metodi. Un metodo usa plain pysparke l'altro usa la synapseml libreria . Entrambi i metodi producono le stesse prestazioni, ma evidenziano il modo in cui SynapseML riduce la complessità del codice rispetto a pyspark.

L'attività stima se una recensione cliente di un libro venduto su Amazon è buona (valutazione > 3) o negativa, in base al testo della recensione. Si addestrano modelli LogisticRegression con iperparametri diversi, quindi si sceglie il modello migliore.

Prerequisiti

  • Abbonati a Microsoft Fabric. Oppure, registrati per una versione di prova gratuita di Microsoft Fabric.

  • Accedi a Microsoft Fabric.

  • Passare a Fabric usando il selettore di esperienza nell'angolo in basso a sinistra della home page.

    Screenshot che mostra la selezione di

  • Crea un notebook.
  • Collegare il notebook a un lakehouse. Nel notebook selezionare Aggiungi nel riquadro sinistro per collegare una lakehouse esistente o crearne una nuova.

Note

Tutte le librerie usate in questo articolo (pyspark, synapseml, numpy) sono preinstallate nel runtime di Spark Fabric. Non è necessario installare alcun pacchetto.

Caricare ed esplorare i dati

Nei notebook di Fabric, una sessione Spark è già disponibile come variabile spark. Caricare il set di dati delle recensioni dei libri Amazon da una posizione Archiviazione BLOB di Azure pubblica:

rawData = spark.read.parquet(
    "wasbs://publicwasb@mmlspark.blob.core.windows.net/BookReviewsFromAmazon10K.parquet"
)
rawData.show(5)

Verificare che il set di dati sia stato caricato correttamente:

print(f"Row count: {rawData.count()}")
print(f"Columns: {rawData.columns}")
assert rawData.count() == 10000, "Expected 10,000 rows"
assert set(rawData.columns) == {"text", "rating"}, "Expected columns: text, rating"
print("Data loaded successfully")

Estrarre funzionalità ed elaborare i dati

I dati reali hanno spesso caratteristiche di più tipi, ad esempio testo, numerico e categorico. Per illustrare l'uso dei tipi di funzionalità misti, aggiungere due funzionalità numeriche al set di dati: il conteggio delle parole della revisione e la lunghezza media della parola.

Definire funzioni definite dall'utente

from pyspark.sql.functions import udf
from pyspark.sql.types import IntegerType, DoubleType
import numpy as np


def calc_word_count(s):
    return len(s.split())


def calc_word_length(s):
    ss = [len(w) for w in s.split()]
    return round(float(np.mean(ss)), 2)


wordLengthUDF = udf(calc_word_length, DoubleType())
wordCountUDF = udf(calc_word_count, IntegerType())

Applicare funzioni definite dall'utente con SynapseML UDFTransformer

Usa il UDFTransformer di SynapseML per racchiudere le UDF in trasformatori compatibili con la pipeline:

from synapse.ml.stages import UDFTransformer

wordLengthTransformer = UDFTransformer(
    inputCol="text", outputCol="wordLength", udf=wordLengthUDF
)
wordCountTransformer = UDFTransformer(
    inputCol="text", outputCol="wordCount", udf=wordCountUDF
)

Esegui la pipeline delle funzionalità

Applica entrambi i trasformatori e crea una colonna di etichette binarie dalla valutazione:

from pyspark.ml import Pipeline

data = (
    Pipeline(stages=[wordLengthTransformer, wordCountTransformer])
    .fit(rawData)
    .transform(rawData)
    .withColumn("label", rawData["rating"] > 3)
    .drop("rating")
)

Verificare l'estrazione delle funzionalità:

data.show(5)
print(f"Columns: {data.columns}")
assert "wordLength" in data.columns, "wordLength column missing"
assert "wordCount" in data.columns, "wordCount column missing"
assert "label" in data.columns, "label column missing"
assert "rating" not in data.columns, "rating column should be dropped"
print("Feature extraction successful")

Classificare usando pyspark

Per scegliere il classificatore LogisticRegression migliore usando la pyspark libreria, è necessario eseguire in modo esplicito questi passaggi:

  1. Elaborare le funzionalità:
    • Tokenizzare la colonna di testo.
    • Trasformare la colonna tokenizzata in un vettore mediante hashing.
    • Unire le caratteristiche numeriche con il vettore.
  2. Converti la colonna label dal tipo booleano al tipo intero.
  3. Addestrare più algoritmi LogisticRegression sul set di dati train con iperparametri diversi.
  4. Calcolare l'area sotto la curva ROC (AUC) per ogni modello sottoposto a training e selezionare il modello con la metrica più alta nel test set di dati.
  5. Valutare il modello migliore sull'insieme validation.

Definire le caratteristiche e preparare i dati

from pyspark.ml.feature import Tokenizer, HashingTF, VectorAssembler
from pyspark.sql.types import IntegerType

# Tokenize the text column
tokenizer = Tokenizer(inputCol="text", outputCol="tokenizedText")
numFeatures = 10000
hashingScheme = HashingTF(
    inputCol="tokenizedText", outputCol="TextFeatures", numFeatures=numFeatures
)
tokenizedData = tokenizer.transform(data)
featurizedData = hashingScheme.transform(tokenizedData)

# Merge text and numeric features into one feature column
featureColumnsArray = ["TextFeatures", "wordCount", "wordLength"]
assembler = VectorAssembler(inputCols=featureColumnsArray, outputCol="features")
assembledData = assembler.transform(featurizedData)

# Select only the label and features columns, cast label to integer
processedData = assembledData.select("label", "features").withColumn(
    "label", assembledData.label.cast(IntegerType())
)

Verifica i dati caratterizzati:

print(f"Feature vector size: {processedData.first()['features'].size}")
print(f"Label values: {sorted(processedData.select('label').distinct().rdd.flatMap(lambda x: x).collect())}")
assert processedData.first()["features"].size == 10002, "Expected 10000 text + 2 numeric features"
print("Featurization successful")

Addestrare e valutare i modelli

from pyspark.ml.evaluation import BinaryClassificationEvaluator
from pyspark.ml.classification import LogisticRegression

# Split the data into train, test, and validation sets
train, test, validation = processedData.randomSplit([0.60, 0.20, 0.20], seed=123)

# Train models with different regularization parameters
lrHyperParams = [0.05, 0.1, 0.2, 0.4]
logisticRegressions = [
    LogisticRegression(regParam=hyperParam) for hyperParam in lrHyperParams
]
evaluator = BinaryClassificationEvaluator(
    rawPredictionCol="rawPrediction", metricName="areaUnderROC"
)
metrics = []
models = []

# Train each model and evaluate on the test set
for learner in logisticRegressions:
    model = learner.fit(train)
    models.append(model)
    scoredData = model.transform(test)
    metrics.append(evaluator.evaluate(scoredData))

bestMetric = max(metrics)
bestModel = models[metrics.index(bestMetric)]

# Evaluate the best model on the validation dataset
scoredVal = bestModel.transform(validation)
validationAUC = evaluator.evaluate(scoredVal)
print(f"Best model's AUC on validation set = {validationAUC:.4f}")

Verificare i risultati:

print(f"Number of models trained: {len(models)}")
print(f"Best regularization parameter: {lrHyperParams[metrics.index(bestMetric)]}")
print(f"Test AUC scores: {[f'{m:.4f}' for m in metrics]}")
assert 0.5 < validationAUC <= 1.0, f"AUC {validationAUC} is outside expected range (0.5, 1.0]"
print(f"pyspark classification complete - AUC: {validationAUC:.4f}")

Note

I valori AUC esatti dipendono dalla divisione casuale. Si prevedono valori compresi tra 0,65 e 0,85.

Classificare usando SynapseML

L'approccio synapseml ottiene lo stesso risultato con un minor numero di passaggi. SynapseML gestisce internamente la definizione delle caratteristiche, riducendo il codice necessario per scrivere:

  1. Lo TrainClassifier stimatore estrae internamente le caratteristiche dai dati, purché le colonne nei set di dati train, test e validation rappresentino le caratteristiche.
  2. Lo FindBestModel strumento di stima trova il modello migliore da un pool di modelli sottoposti a training valutando le prestazioni nel test set di dati con la metrica specificata.
  3. Il ComputeModelStatistics trasformatore calcola più metriche in un set di dati con punteggio (in questo caso, il validation set di dati) contemporaneamente.
from synapse.ml.train import TrainClassifier, ComputeModelStatistics
from synapse.ml.automl import FindBestModel
from pyspark.ml.classification import LogisticRegression

# Split the raw feature data (SynapseML handles featurization internally)
train, test, validation = data.randomSplit([0.60, 0.20, 0.20], seed=123)

# Train models with different regularization parameters
lrHyperParams = [0.05, 0.1, 0.2, 0.4]
logisticRegressions = [
    LogisticRegression(regParam=hyperParam) for hyperParam in lrHyperParams
]
lrmodels = [
    TrainClassifier(model=lrm, labelCol="label", numFeatures=10000).fit(train)
    for lrm in logisticRegressions
]

# Select the best model based on AUC
bestModel = FindBestModel(evaluationMetric="AUC", models=lrmodels).fit(test)

# Compute metrics on the validation dataset
predictions = bestModel.transform(validation)
metrics = ComputeModelStatistics().transform(predictions)
print(
    "Best model's AUC on validation set = "
    + "{0:.2f}%".format(metrics.first()["AUC"] * 100)
)

Verificare i risultati di SynapseML:

auc_value = metrics.first()["AUC"]
print(f"Available metrics: {metrics.columns}")
assert 0.5 < auc_value <= 1.0, f"AUC {auc_value} is outside expected range (0.5, 1.0]"
print(f"SynapseML classification complete - AUC: {auc_value:.4f}")

Note

Gli approcci pyspark e SynapseML devono produrre valori di AUC simili, poiché eseguono il training dello stesso tipo di modello con gli stessi iperparametri sugli stessi dati.

Confrontare i due approcci

Aspect pyspark SynapseML
Elaborazione delle caratteristiche Manuale (da Tokenizer a HashingTF a VectorAssembler) Automatico (gestito da TrainClassifier)
Selezione del modello Ciclo manuale con analizzatore Integrato FindBestModel
Calcolo delle metriche Singola metrica per chiamata di valutazione Più metriche con ComputeModelStatistics
Righe di codice Circa 30 linee Circa 15 righe
Result Stessa AUC Stessa AUC

Risoluzione dei problemi

Problema Motivo Risoluzione
AnalysisException: Path does not exist L'URL di archiviazione BLOB pubblico non è temporaneamente disponibile Attendere alcuni minuti e riprovare. Verificare la connettività eseguendo spark.read.parquet("wasbs://publicwasb@mmlspark.blob.core.windows.net/BookReviewsFromAmazon10K.parquet").count()
IllegalArgumentException: Field "features" does not exist I nomi delle colonne di funzionalità non corrispondono tra trasformatori Verificare i nomi delle colonne eseguendo data.columns prima del passaggio VectorAssembler
NameError: name 'LogisticRegression' is not defined Istruzione di importazione mancante Aggiungere from pyspark.ml.classification import LogisticRegression nella parte superiore della cella
ModuleNotFoundError: No module named 'synapse.ml' Il notebook non usa Fabric runtime di Spark Verificare che il notebook usi Fabric Runtime 1.2 o versione successiva. Selezionare Ambiente nella barra multifunzione per verificare.
AUC basso (inferiore a 0,6) Problemi di suddivisione dei dati o convergenza Verificare la distribuzione delle etichette con data.groupBy("label").count().show(). Si prevede un set di dati approssimativamente bilanciato.
Py4JJavaError: An error occurred while calling errore interno di Java/Spark Controllare l'interfaccia utente di Spark per i log degli errori dettagliati. Riavvia la sessione Spark selezionando Sessione>Arresta sessione, quindi riesegui tutte le celle.

Pulire le risorse

Se è stato creato un nuovo lakehouse per questo articolo e non è più necessario:

  1. Nell'area di lavoro, fare clic con il pulsante destro del mouse sul nome del lakehouse.
  2. Seleziona Elimina.
  3. Conferma l'eliminazione.

Il notebook rimane nell'area di lavoro, a meno che non venga eliminato separatamente.