Klassificeringsopgaver ved hjælp af SynapseML

Denne artikel viser, hvordan man udfører en tekstklassifikationsopgave med to metoder. Den ene metode bruger almindelig pyspark, og den anden bruger biblioteket synapseml . Begge metoder giver samme ydeevne, men fremhæver, hvordan SynapseML reducerer kodekompleksiteten sammenlignet med pyspark.

Opgaven forudsiger, om en kundeanmeldelse af en bog solgt på Amazon er god (vurdering > 3) eller dårlig, baseret på anmeldelsesteksten. Du træner LogisticRegression-lærende med forskellige hyperparametre og vælger derefter den bedste model.

Forudsætninger

  • Få et Microsoft Fabric-abonnement. Du kan også tilmelde dig en gratis Prøveversion af Microsoft Fabric.

  • Log på Microsoft Fabric.

  • Skift til Fabric ved at bruge experience-switcheren nederst til venstre på din startside.

    Skærmbillede, der viser valget af Fabric i oplevelsesskifter-menuen.

  • Opret en notesbog.
  • Vedhæft din notesbog til et lakehouse. I notesbogen vælger du Tilføj i venstre panel for at tilføje et eksisterende søhus eller oprette et nyt.

Bemærkning

Alle biblioteker, der bruges i denne artikel (pyspark, synapseml, numpy), er forudinstalleret i Fabric Spark-runtime. Du behøver ikke installere nogen pakker.

Indlæs og udforsk dataene

I Fabric notesbøger er en Spark-session allerede tilgængelig som variablen spark. Indlæs Amazon-boganmeldelsesdatasættet fra en offentlig Azure Blob Storage-lokation:

rawData = spark.read.parquet(
    "wasbs://publicwasb@mmlspark.blob.core.windows.net/BookReviewsFromAmazon10K.parquet"
)
rawData.show(5)

Kontroller at datasættet indlæste korrekt:

print(f"Row count: {rawData.count()}")
print(f"Columns: {rawData.columns}")
assert rawData.count() == 10000, "Expected 10,000 rows"
assert set(rawData.columns) == {"text", "rating"}, "Expected columns: text, rating"
print("Data loaded successfully")

Udtræk funktioner og behandl data

Rigtige data har ofte egenskaber af flere typer, for eksempel tekst, numerisk og kategorisk. For at demonstrere arbejde med blandede funktionstyper, tilføj to numeriske funktioner til datasættet: ordantallet i anmeldelsen og gennemsnitslængden på ordet.

Definér brugerdefinerede funktioner (UDF'er)

from pyspark.sql.functions import udf
from pyspark.sql.types import IntegerType, DoubleType
import numpy as np


def calc_word_count(s):
    return len(s.split())


def calc_word_length(s):
    ss = [len(w) for w in s.split()]
    return round(float(np.mean(ss)), 2)


wordLengthUDF = udf(calc_word_length, DoubleType())
wordCountUDF = udf(calc_word_count, IntegerType())

Anvend UDF'er med SynapseML UDFTransformer

Brug fra UDFTransformer SynapseML til at indpakke UDF'erne i pipeline-kompatible transformere:

from synapse.ml.stages import UDFTransformer

wordLengthTransformer = UDFTransformer(
    inputCol="text", outputCol="wordLength", udf=wordLengthUDF
)
wordCountTransformer = UDFTransformer(
    inputCol="text", outputCol="wordCount", udf=wordCountUDF
)

Kør feature-pipelinen

Påfør begge transformere og opret en binær labelkolonne ud fra vurderingen:

from pyspark.ml import Pipeline

data = (
    Pipeline(stages=[wordLengthTransformer, wordCountTransformer])
    .fit(rawData)
    .transform(rawData)
    .withColumn("label", rawData["rating"] > 3)
    .drop("rating")
)

Verificér funktionsudtrækningen:

data.show(5)
print(f"Columns: {data.columns}")
assert "wordLength" in data.columns, "wordLength column missing"
assert "wordCount" in data.columns, "wordCount column missing"
assert "label" in data.columns, "label column missing"
assert "rating" not in data.columns, "rating column should be dropped"
print("Feature extraction successful")

Klassificer ved hjælp af pyspark

For at vælge den bedste LogisticRegression-klassifikator ved hjælp af biblioteket pyspark , skal du eksplicit udføre disse trin:

  1. Behandl funktionerne:
    • Tokeniser tekstkolonnen.
    • Hash den tokeniserede kolonne til en vektor ved hjælp af hashing.
    • Sammenføj de numeriske træk med vektoren.
  2. Støb labelkolonnen fra boolesk til heltalstype.
  3. Træn flere LogisticRegression-algoritmer på datasættet train med forskellige hyperparametre.
  4. Beregn arealet under ROC-kurven (AUC) for hver trænet model og vælg modellen med den højeste metrik på datasættet test .
  5. Evaluer den bedste model på settet validation .

Marker og forbered dataene

from pyspark.ml.feature import Tokenizer, HashingTF, VectorAssembler
from pyspark.sql.types import IntegerType

# Tokenize the text column
tokenizer = Tokenizer(inputCol="text", outputCol="tokenizedText")
numFeatures = 10000
hashingScheme = HashingTF(
    inputCol="tokenizedText", outputCol="TextFeatures", numFeatures=numFeatures
)
tokenizedData = tokenizer.transform(data)
featurizedData = hashingScheme.transform(tokenizedData)

# Merge text and numeric features into one feature column
featureColumnsArray = ["TextFeatures", "wordCount", "wordLength"]
assembler = VectorAssembler(inputCols=featureColumnsArray, outputCol="features")
assembledData = assembler.transform(featurizedData)

# Select only the label and features columns, cast label to integer
processedData = assembledData.select("label", "features").withColumn(
    "label", assembledData.label.cast(IntegerType())
)

Verificér de fremhævede data:

print(f"Feature vector size: {processedData.first()['features'].size}")
print(f"Label values: {sorted(processedData.select('label').distinct().rdd.flatMap(lambda x: x).collect())}")
assert processedData.first()["features"].size == 10002, "Expected 10000 text + 2 numeric features"
print("Featurization successful")

Træn og evaluer modeller

from pyspark.ml.evaluation import BinaryClassificationEvaluator
from pyspark.ml.classification import LogisticRegression

# Split the data into train, test, and validation sets
train, test, validation = processedData.randomSplit([0.60, 0.20, 0.20], seed=123)

# Train models with different regularization parameters
lrHyperParams = [0.05, 0.1, 0.2, 0.4]
logisticRegressions = [
    LogisticRegression(regParam=hyperParam) for hyperParam in lrHyperParams
]
evaluator = BinaryClassificationEvaluator(
    rawPredictionCol="rawPrediction", metricName="areaUnderROC"
)
metrics = []
models = []

# Train each model and evaluate on the test set
for learner in logisticRegressions:
    model = learner.fit(train)
    models.append(model)
    scoredData = model.transform(test)
    metrics.append(evaluator.evaluate(scoredData))

bestMetric = max(metrics)
bestModel = models[metrics.index(bestMetric)]

# Evaluate the best model on the validation dataset
scoredVal = bestModel.transform(validation)
validationAUC = evaluator.evaluate(scoredVal)
print(f"Best model's AUC on validation set = {validationAUC:.4f}")

Verificér resultaterne:

print(f"Number of models trained: {len(models)}")
print(f"Best regularization parameter: {lrHyperParams[metrics.index(bestMetric)]}")
print(f"Test AUC scores: {[f'{m:.4f}' for m in metrics]}")
assert 0.5 < validationAUC <= 1.0, f"AUC {validationAUC} is outside expected range (0.5, 1.0]"
print(f"pyspark classification complete - AUC: {validationAUC:.4f}")

Bemærkning

De præcise AUC-værdier afhænger af den tilfældige fordeling. Forvent værdier mellem 0,65 og 0,85.

Klassificer ved hjælp af SynapseML

Tilgangen synapseml opnår samme resultat med færre trin. SynapseML håndterer funktionalitet internt, hvilket reducerer den kode, du skal skrive:

  1. Estimatoren TrainClassifier indeholder internt dataene, så længe kolonnerne i train, test, og validation datasættene repræsenterer funktionerne.
  2. Estimatoren FindBestModel finder den bedste model fra en pulje af trænede modeller ved at evaluere præstationen på datasættet test med den specificerede metrik.
  3. Transformeren ComputeModelStatistics beregner flere metrikker på et scoret datasæt (i dette tilfælde datasættet validation ) på samme tid.
from synapse.ml.train import TrainClassifier, ComputeModelStatistics
from synapse.ml.automl import FindBestModel
from pyspark.ml.classification import LogisticRegression

# Split the raw feature data (SynapseML handles featurization internally)
train, test, validation = data.randomSplit([0.60, 0.20, 0.20], seed=123)

# Train models with different regularization parameters
lrHyperParams = [0.05, 0.1, 0.2, 0.4]
logisticRegressions = [
    LogisticRegression(regParam=hyperParam) for hyperParam in lrHyperParams
]
lrmodels = [
    TrainClassifier(model=lrm, labelCol="label", numFeatures=10000).fit(train)
    for lrm in logisticRegressions
]

# Select the best model based on AUC
bestModel = FindBestModel(evaluationMetric="AUC", models=lrmodels).fit(test)

# Compute metrics on the validation dataset
predictions = bestModel.transform(validation)
metrics = ComputeModelStatistics().transform(predictions)
print(
    "Best model's AUC on validation set = "
    + "{0:.2f}%".format(metrics.first()["AUC"] * 100)
)

Verificér SynapseML-resultaterne:

auc_value = metrics.first()["AUC"]
print(f"Available metrics: {metrics.columns}")
assert 0.5 < auc_value <= 1.0, f"AUC {auc_value} is outside expected range (0.5, 1.0]"
print(f"SynapseML classification complete - AUC: {auc_value:.4f}")

Bemærkning

Pyspark- og SynapseML-tilgangene bør give lignende AUC-værdier, da de træner den samme modeltype med de samme hyperparametre på de samme data.

Sammenlign de to tilgange

Aspekt pyspark SynapseML
Featurebehandling Manual (Tokenizer til HashingTF til VectorAssembler) Automatisk (håndteret af TrainClassifier)
Valg af model Manuel løkke med evaluator Indbygget FindBestModel
Metrikberegning Enkelt metrik pr. evalueringskald Flere metrikker med ComputeModelStatistics
Kodelinjer Omkring 30 linjer Omkring 15 linjer
Resultat Samme AUC Samme AUC

Fejlfinding

Spørgsmål Årsag Løsning
AnalysisException: Path does not exist Den offentlige blob-lagrings-URL er midlertidigt utilgængelig Vent et par minutter, og prøv igen. Verificér forbindelsen ved at køre spark.read.parquet("wasbs://publicwasb@mmlspark.blob.core.windows.net/BookReviewsFromAmazon10K.parquet").count()
IllegalArgumentException: Field "features" does not exist Navnene på funktionskolonnerne passer ikke mellem transformere Verificér kolonnenavne ved at køre data.columns før VectorAssembler-trinnet
NameError: name 'LogisticRegression' is not defined Manglende import-statement Tilføj from pyspark.ml.classification import LogisticRegression øverst i cellen
ModuleNotFoundError: No module named 'synapse.ml' Notebook bruger ikke Fabric Spark runtime Tjek at notebooken bruger Fabric Runtime 1.2 eller nyere. Vælg Miljø i båndet for at tjekke.
Lav AUC (under 0,6) Datasplittelsesproblem eller konvergensproblemer Verificér labelfordelingen med data.groupBy("label").count().show(). Forvent et nogenlunde balanceret datasæt.
Py4JJavaError: An error occurred while calling Java/Spark intern fejl Tjek Spark-brugerfladen for detaljerede fejllogfiler. Genstart Spark-sessionen ved at vælge Session>Stop session, og kør derefter alle celler igen.

Ryd op i ressourcer

Hvis du har oprettet et nyt søhus til denne artikel og ikke længere har brug for det:

  1. I dit arbejdsområde højreklikker du på navnet på søhuset.
  2. Vælg Slet.
  3. Bekræft sletningen.

Notesbogen forbliver i dit arbejdsområde, medmindre du sletter den separat.