Klassifizierungsaufgaben mit SynapseML

In diesem Artikel wird gezeigt, wie Sie eine Textklassifizierungsaufgabe mit zwei Methoden ausführen. Eine Methode verwendet einfaches pyspark, und die andere verwendet die Bibliothek synapseml. Beide Methoden liefern dieselbe Leistung, zeigen jedoch, wie SynapseML die Codekomplexität im Vergleich zu pyspark reduziert.

Die Aufgabe prognostiziert, ob eine Kundenrezensierung eines auf Amazon verkauften Buchs gut (Bewertung > 3) oder schlecht ist, basierend auf dem Rezensionstext. Sie trainieren LogistikRegression Lernende mit verschiedenen Hyperparametern und wählen dann das beste Modell aus.

Voraussetzungen

  • Erstellen Sie ein Notizbuch.
  • Fügen Sie Ihr Notizbuch an ein Seehaus an. Wählen Sie im Notizbuch " Hinzufügen" im linken Bereich aus, um ein vorhandenes Seehaus anzufügen oder eine neue zu erstellen.

Note

Alle in diesem Artikel verwendeten Bibliotheken (pyspark, synapseml, numpy) werden in der Fabric Spark-Laufzeit vorinstalliert. Sie müssen keine Pakete installieren.

Laden und Untersuchen der Daten

In Fabric Notizbüchern ist eine Spark-Sitzung bereits als variable spark verfügbar. Laden Sie das Dataset für Amazon-Buchrezensionen von einem öffentlichen Azure Blob Storage Standort:

rawData = spark.read.parquet(
    "wasbs://publicwasb@mmlspark.blob.core.windows.net/BookReviewsFromAmazon10K.parquet"
)
rawData.show(5)

Überprüfen Sie, ob das Dataset ordnungsgemäß geladen wurde:

print(f"Row count: {rawData.count()}")
print(f"Columns: {rawData.columns}")
assert rawData.count() == 10000, "Expected 10,000 rows"
assert set(rawData.columns) == {"text", "rating"}, "Expected columns: text, rating"
print("Data loaded successfully")

Extrahieren von Features und Prozessdaten

Echte Daten verfügen häufig über Features mehrerer Typen, z. B. Text, Numerische und Kategorisierung. Um die Arbeit mit gemischten Featuretypen zu veranschaulichen, fügen Sie dem Dataset zwei numerische Features hinzu: die Wortanzahl der Überprüfung und die mittlere Wortlänge.

Definieren von benutzerdefinierten Funktionen (UDFs)

from pyspark.sql.functions import udf
from pyspark.sql.types import IntegerType, DoubleType
import numpy as np


def calc_word_count(s):
    return len(s.split())


def calc_word_length(s):
    ss = [len(w) for w in s.split()]
    return round(float(np.mean(ss)), 2)


wordLengthUDF = udf(calc_word_length, DoubleType())
wordCountUDF = udf(calc_word_count, IntegerType())

UDFs mit SynapseML UDFTransformer anwenden

Verwenden Sie das UDFTransformer von SynapseML, um die UDFs in pipeline-kompatible Transformer zu verpacken:

from synapse.ml.stages import UDFTransformer

wordLengthTransformer = UDFTransformer(
    inputCol="text", outputCol="wordLength", udf=wordLengthUDF
)
wordCountTransformer = UDFTransformer(
    inputCol="text", outputCol="wordCount", udf=wordCountUDF
)

Ausführen der Featurepipeline

Wenden Sie beide Transformer an und erstellen Sie basierend auf der Bewertung eine binäre Labelspalte:

from pyspark.ml import Pipeline

data = (
    Pipeline(stages=[wordLengthTransformer, wordCountTransformer])
    .fit(rawData)
    .transform(rawData)
    .withColumn("label", rawData["rating"] > 3)
    .drop("rating")
)

Überprüfen Sie die Merkmalsextraktion:

data.show(5)
print(f"Columns: {data.columns}")
assert "wordLength" in data.columns, "wordLength column missing"
assert "wordCount" in data.columns, "wordCount column missing"
assert "label" in data.columns, "label column missing"
assert "rating" not in data.columns, "rating column should be dropped"
print("Feature extraction successful")

Klassifizieren mithilfe von Pyspark

Um den besten LogisticsRegression-Klassifizierer mithilfe der pyspark Bibliothek auszuwählen, müssen Sie diese Schritte explizit ausführen:

  1. Verarbeiten Sie die Funktionen:
    • Tokenisieren Sie die Textspalte.
    • Hashen Sie die tokenisierte Spalte mithilfe von Hashing in einen Vektor.
    • Verbinden Sie die numerischen Features mit dem Vektor.
  2. Wandeln Sie die Beschriftungsspalte vom booleschen Typ in einen ganzzahligen Typ um.
  3. Trainieren Sie mehrere LogisticsRegression-Algorithmen für das train Dataset mit verschiedenen Hyperparametern.
  4. Berechnen Sie den Bereich unter der ROC-Kurve (AUC) für jedes trainierte Modell, und wählen Sie das Modell mit der höchsten Metrik im test Dataset aus.
  5. Bewerten Sie das beste Modell für die validation Gruppe.

Merkmale extrahieren und die Daten vorbereiten

from pyspark.ml.feature import Tokenizer, HashingTF, VectorAssembler
from pyspark.sql.types import IntegerType

# Tokenize the text column
tokenizer = Tokenizer(inputCol="text", outputCol="tokenizedText")
numFeatures = 10000
hashingScheme = HashingTF(
    inputCol="tokenizedText", outputCol="TextFeatures", numFeatures=numFeatures
)
tokenizedData = tokenizer.transform(data)
featurizedData = hashingScheme.transform(tokenizedData)

# Merge text and numeric features into one feature column
featureColumnsArray = ["TextFeatures", "wordCount", "wordLength"]
assembler = VectorAssembler(inputCols=featureColumnsArray, outputCol="features")
assembledData = assembler.transform(featurizedData)

# Select only the label and features columns, cast label to integer
processedData = assembledData.select("label", "features").withColumn(
    "label", assembledData.label.cast(IntegerType())
)

Überprüfen Sie die Merkmalsdaten:

print(f"Feature vector size: {processedData.first()['features'].size}")
print(f"Label values: {sorted(processedData.select('label').distinct().rdd.flatMap(lambda x: x).collect())}")
assert processedData.first()["features"].size == 10002, "Expected 10000 text + 2 numeric features"
print("Featurization successful")

Trainieren und Auswerten von Modellen

from pyspark.ml.evaluation import BinaryClassificationEvaluator
from pyspark.ml.classification import LogisticRegression

# Split the data into train, test, and validation sets
train, test, validation = processedData.randomSplit([0.60, 0.20, 0.20], seed=123)

# Train models with different regularization parameters
lrHyperParams = [0.05, 0.1, 0.2, 0.4]
logisticRegressions = [
    LogisticRegression(regParam=hyperParam) for hyperParam in lrHyperParams
]
evaluator = BinaryClassificationEvaluator(
    rawPredictionCol="rawPrediction", metricName="areaUnderROC"
)
metrics = []
models = []

# Train each model and evaluate on the test set
for learner in logisticRegressions:
    model = learner.fit(train)
    models.append(model)
    scoredData = model.transform(test)
    metrics.append(evaluator.evaluate(scoredData))

bestMetric = max(metrics)
bestModel = models[metrics.index(bestMetric)]

# Evaluate the best model on the validation dataset
scoredVal = bestModel.transform(validation)
validationAUC = evaluator.evaluate(scoredVal)
print(f"Best model's AUC on validation set = {validationAUC:.4f}")

Überprüfen Sie die Ergebnisse:

print(f"Number of models trained: {len(models)}")
print(f"Best regularization parameter: {lrHyperParams[metrics.index(bestMetric)]}")
print(f"Test AUC scores: {[f'{m:.4f}' for m in metrics]}")
assert 0.5 < validationAUC <= 1.0, f"AUC {validationAUC} is outside expected range (0.5, 1.0]"
print(f"pyspark classification complete - AUC: {validationAUC:.4f}")

Note

Die genauen AUC-Werte hängen von der zufälligen Aufteilung ab. Erwarten Sie Werte zwischen 0,65 und 0,85.

Klassifizieren mithilfe von SynapseML

Der synapseml Ansatz erzielt dasselbe Ergebnis mit weniger Schritten. SynapseML behandelt die Featurisierung intern, wodurch der Code reduziert wird, den Sie schreiben müssen:

  1. Der TrainClassifier-Schätzer extrahiert intern Merkmale aus den Daten, sofern die Spalten in den Datensätzen train, test und validation die Merkmale darstellen.
  2. Der FindBestModel Estimator ermittelt anhand des test-Datensatzes mit der angegebenen Metrik das beste Modell aus einem Pool trainierter Modelle.
  3. Der ComputeModelStatistics Transformator berechnet mehrere Metriken für ein bewertetes Dataset (in diesem Fall das validation Dataset) gleichzeitig.
from synapse.ml.train import TrainClassifier, ComputeModelStatistics
from synapse.ml.automl import FindBestModel
from pyspark.ml.classification import LogisticRegression

# Split the raw feature data (SynapseML handles featurization internally)
train, test, validation = data.randomSplit([0.60, 0.20, 0.20], seed=123)

# Train models with different regularization parameters
lrHyperParams = [0.05, 0.1, 0.2, 0.4]
logisticRegressions = [
    LogisticRegression(regParam=hyperParam) for hyperParam in lrHyperParams
]
lrmodels = [
    TrainClassifier(model=lrm, labelCol="label", numFeatures=10000).fit(train)
    for lrm in logisticRegressions
]

# Select the best model based on AUC
bestModel = FindBestModel(evaluationMetric="AUC", models=lrmodels).fit(test)

# Compute metrics on the validation dataset
predictions = bestModel.transform(validation)
metrics = ComputeModelStatistics().transform(predictions)
print(
    "Best model's AUC on validation set = "
    + "{0:.2f}%".format(metrics.first()["AUC"] * 100)
)

Überprüfen Sie die SynapseML-Ergebnisse:

auc_value = metrics.first()["AUC"]
print(f"Available metrics: {metrics.columns}")
assert 0.5 < auc_value <= 1.0, f"AUC {auc_value} is outside expected range (0.5, 1.0]"
print(f"SynapseML classification complete - AUC: {auc_value:.4f}")

Note

Die Pyspark- und SynapseML-Ansätze sollten ähnliche AUC-Werte erzeugen, da sie den gleichen Modelltyp mit denselben Hyperparametern auf denselben Daten trainieren.

Vergleichen der beiden Ansätze

Aspect Pyspark SynapseML
Verarbeitung von Features Manuell (Tokenizer zu HashingTF zu VectorAssembler) Automatisch (verarbeitet von TrainClassifier)
Modellauswahl Manuelle Schleife mit Auswerter Integriert FindBestModel
Metrikberechnung Einzelne Metrik pro Auswertungsaufruf Mehrere Metriken mit ComputeModelStatistics
Codezeilen Ca. 30 Zeilen Ca. 15 Zeilen
Ergebnis Dieselbe AUC Dieselbe AUC

Problembehandlung

Thema Ursache Resolution
AnalysisException: Path does not exist Die URL für den öffentlichen Blobspeicher ist vorübergehend nicht verfügbar. Warten Sie einige Minuten, und versuchen Sie es erneut. Überprüfen Sie die Verbindung durch das Ausführen von spark.read.parquet("wasbs://publicwasb@mmlspark.blob.core.windows.net/BookReviewsFromAmazon10K.parquet").count()
IllegalArgumentException: Field "features" does not exist Die Namen der Feature-Spalten stimmen zwischen den Transformern nicht überein Spaltennamen überprüfen, indem Sie data.columns vor dem Schritt „VectorAssembler“ ausführen
NameError: name 'LogisticRegression' is not defined Fehlende Importanweisung Fügen Sie from pyspark.ml.classification import LogisticRegression oben in der Zelle hinzu
ModuleNotFoundError: No module named 'synapse.ml' Notebook verwendet nicht die Fabric Spark-Laufzeit Überprüfen Sie, ob das Notizbuch Fabric Runtime 1.2 oder höher verwendet. Wählen Sie " Umgebung " im Menüband aus, um dies zu überprüfen.
Niedrige AUC (unter 0,6) Datenteilungsproblem oder Konvergenzprobleme Überprüfen Sie die Labelverteilung mit data.groupBy("label").count().show(). Erwarten Sie ein ungefähr ausgewogenes Dataset.
Py4JJavaError: An error occurred while calling interner Java-/Spark-Fehler Überprüfen Sie die Spark-Benutzeroberfläche auf detaillierte Fehlerprotokolle. Starten Sie die Spark-Sitzung neu, indem Siedie Sitzung "> beenden" auswählen, und führen Sie dann alle Zellen erneut aus.

Bereinigen von Ressourcen

Wenn Sie für diesen Artikel ein neues Seehaus erstellt haben und es nicht mehr benötigen:

  1. Klicken Sie in Ihrem Arbeitsbereich mit der rechten Maustaste auf den Namen des Seehauses.
  2. Wählen Sie "Löschen" aus.
  3. Bestätigen Sie den Löschvorgang.

Das Notizbuch verbleibt in Ihrem Arbeitsbereich, es sei denn, Sie löschen es separat.