Nota
L'accesso a questa pagina richiede l'autorizzazione. È possibile provare ad accedere o modificare le directory.
L'accesso a questa pagina richiede l'autorizzazione. È possibile provare a modificare le directory.
Questo articolo illustra come eseguire un'attività di classificazione del testo con due metodi. Un metodo usa plain pysparke l'altro usa la synapseml libreria . Entrambi i metodi producono le stesse prestazioni, ma evidenziano il modo in cui SynapseML riduce la complessità del codice rispetto a pyspark.
L'attività stima se una recensione cliente di un libro venduto su Amazon è buona (valutazione > 3) o negativa, in base al testo della recensione. Si addestrano modelli LogisticRegression con iperparametri diversi, quindi si sceglie il modello migliore.
Prerequisiti
Abbonati a Microsoft Fabric. Oppure, registrati per una versione di prova gratuita di Microsoft Fabric.
Accedi a Microsoft Fabric.
Passare a Fabric usando il selettore di esperienza nell'angolo in basso a sinistra della home page.
- Crea un notebook.
- Collegare il notebook a un lakehouse. Nel notebook selezionare Aggiungi nel riquadro sinistro per collegare una lakehouse esistente o crearne una nuova.
Note
Tutte le librerie usate in questo articolo (pyspark, synapseml, numpy) sono preinstallate nel runtime di Spark Fabric. Non è necessario installare alcun pacchetto.
Caricare ed esplorare i dati
Nei notebook di Fabric, una sessione Spark è già disponibile come variabile spark. Caricare il set di dati delle recensioni dei libri Amazon da una posizione Archiviazione BLOB di Azure pubblica:
rawData = spark.read.parquet(
"wasbs://publicwasb@mmlspark.blob.core.windows.net/BookReviewsFromAmazon10K.parquet"
)
rawData.show(5)
Verificare che il set di dati sia stato caricato correttamente:
print(f"Row count: {rawData.count()}")
print(f"Columns: {rawData.columns}")
assert rawData.count() == 10000, "Expected 10,000 rows"
assert set(rawData.columns) == {"text", "rating"}, "Expected columns: text, rating"
print("Data loaded successfully")
Estrarre funzionalità ed elaborare i dati
I dati reali hanno spesso caratteristiche di più tipi, ad esempio testo, numerico e categorico. Per illustrare l'uso dei tipi di funzionalità misti, aggiungere due funzionalità numeriche al set di dati: il conteggio delle parole della revisione e la lunghezza media della parola.
Definire funzioni definite dall'utente
from pyspark.sql.functions import udf
from pyspark.sql.types import IntegerType, DoubleType
import numpy as np
def calc_word_count(s):
return len(s.split())
def calc_word_length(s):
ss = [len(w) for w in s.split()]
return round(float(np.mean(ss)), 2)
wordLengthUDF = udf(calc_word_length, DoubleType())
wordCountUDF = udf(calc_word_count, IntegerType())
Applicare funzioni definite dall'utente con SynapseML UDFTransformer
Usa il UDFTransformer di SynapseML per racchiudere le UDF in trasformatori compatibili con la pipeline:
from synapse.ml.stages import UDFTransformer
wordLengthTransformer = UDFTransformer(
inputCol="text", outputCol="wordLength", udf=wordLengthUDF
)
wordCountTransformer = UDFTransformer(
inputCol="text", outputCol="wordCount", udf=wordCountUDF
)
Esegui la pipeline delle funzionalità
Applica entrambi i trasformatori e crea una colonna di etichette binarie dalla valutazione:
from pyspark.ml import Pipeline
data = (
Pipeline(stages=[wordLengthTransformer, wordCountTransformer])
.fit(rawData)
.transform(rawData)
.withColumn("label", rawData["rating"] > 3)
.drop("rating")
)
Verificare l'estrazione delle funzionalità:
data.show(5)
print(f"Columns: {data.columns}")
assert "wordLength" in data.columns, "wordLength column missing"
assert "wordCount" in data.columns, "wordCount column missing"
assert "label" in data.columns, "label column missing"
assert "rating" not in data.columns, "rating column should be dropped"
print("Feature extraction successful")
Classificare usando pyspark
Per scegliere il classificatore LogisticRegression migliore usando la pyspark libreria, è necessario eseguire in modo esplicito questi passaggi:
- Elaborare le funzionalità:
- Tokenizzare la colonna di testo.
- Trasformare la colonna tokenizzata in un vettore mediante hashing.
- Unire le caratteristiche numeriche con il vettore.
- Converti la colonna label dal tipo booleano al tipo intero.
- Addestrare più algoritmi LogisticRegression sul set di dati
traincon iperparametri diversi. - Calcolare l'area sotto la curva ROC (AUC) per ogni modello sottoposto a training e selezionare il modello con la metrica più alta nel
testset di dati. - Valutare il modello migliore sull'insieme
validation.
Definire le caratteristiche e preparare i dati
from pyspark.ml.feature import Tokenizer, HashingTF, VectorAssembler
from pyspark.sql.types import IntegerType
# Tokenize the text column
tokenizer = Tokenizer(inputCol="text", outputCol="tokenizedText")
numFeatures = 10000
hashingScheme = HashingTF(
inputCol="tokenizedText", outputCol="TextFeatures", numFeatures=numFeatures
)
tokenizedData = tokenizer.transform(data)
featurizedData = hashingScheme.transform(tokenizedData)
# Merge text and numeric features into one feature column
featureColumnsArray = ["TextFeatures", "wordCount", "wordLength"]
assembler = VectorAssembler(inputCols=featureColumnsArray, outputCol="features")
assembledData = assembler.transform(featurizedData)
# Select only the label and features columns, cast label to integer
processedData = assembledData.select("label", "features").withColumn(
"label", assembledData.label.cast(IntegerType())
)
Verifica i dati caratterizzati:
print(f"Feature vector size: {processedData.first()['features'].size}")
print(f"Label values: {sorted(processedData.select('label').distinct().rdd.flatMap(lambda x: x).collect())}")
assert processedData.first()["features"].size == 10002, "Expected 10000 text + 2 numeric features"
print("Featurization successful")
Addestrare e valutare i modelli
from pyspark.ml.evaluation import BinaryClassificationEvaluator
from pyspark.ml.classification import LogisticRegression
# Split the data into train, test, and validation sets
train, test, validation = processedData.randomSplit([0.60, 0.20, 0.20], seed=123)
# Train models with different regularization parameters
lrHyperParams = [0.05, 0.1, 0.2, 0.4]
logisticRegressions = [
LogisticRegression(regParam=hyperParam) for hyperParam in lrHyperParams
]
evaluator = BinaryClassificationEvaluator(
rawPredictionCol="rawPrediction", metricName="areaUnderROC"
)
metrics = []
models = []
# Train each model and evaluate on the test set
for learner in logisticRegressions:
model = learner.fit(train)
models.append(model)
scoredData = model.transform(test)
metrics.append(evaluator.evaluate(scoredData))
bestMetric = max(metrics)
bestModel = models[metrics.index(bestMetric)]
# Evaluate the best model on the validation dataset
scoredVal = bestModel.transform(validation)
validationAUC = evaluator.evaluate(scoredVal)
print(f"Best model's AUC on validation set = {validationAUC:.4f}")
Verificare i risultati:
print(f"Number of models trained: {len(models)}")
print(f"Best regularization parameter: {lrHyperParams[metrics.index(bestMetric)]}")
print(f"Test AUC scores: {[f'{m:.4f}' for m in metrics]}")
assert 0.5 < validationAUC <= 1.0, f"AUC {validationAUC} is outside expected range (0.5, 1.0]"
print(f"pyspark classification complete - AUC: {validationAUC:.4f}")
Note
I valori AUC esatti dipendono dalla divisione casuale. Si prevedono valori compresi tra 0,65 e 0,85.
Classificare usando SynapseML
L'approccio synapseml ottiene lo stesso risultato con un minor numero di passaggi. SynapseML gestisce internamente la definizione delle caratteristiche, riducendo il codice necessario per scrivere:
- Lo
TrainClassifierstimatore estrae internamente le caratteristiche dai dati, purché le colonne nei set di datitrain,testevalidationrappresentino le caratteristiche. - Lo
FindBestModelstrumento di stima trova il modello migliore da un pool di modelli sottoposti a training valutando le prestazioni neltestset di dati con la metrica specificata. - Il
ComputeModelStatisticstrasformatore calcola più metriche in un set di dati con punteggio (in questo caso, ilvalidationset di dati) contemporaneamente.
from synapse.ml.train import TrainClassifier, ComputeModelStatistics
from synapse.ml.automl import FindBestModel
from pyspark.ml.classification import LogisticRegression
# Split the raw feature data (SynapseML handles featurization internally)
train, test, validation = data.randomSplit([0.60, 0.20, 0.20], seed=123)
# Train models with different regularization parameters
lrHyperParams = [0.05, 0.1, 0.2, 0.4]
logisticRegressions = [
LogisticRegression(regParam=hyperParam) for hyperParam in lrHyperParams
]
lrmodels = [
TrainClassifier(model=lrm, labelCol="label", numFeatures=10000).fit(train)
for lrm in logisticRegressions
]
# Select the best model based on AUC
bestModel = FindBestModel(evaluationMetric="AUC", models=lrmodels).fit(test)
# Compute metrics on the validation dataset
predictions = bestModel.transform(validation)
metrics = ComputeModelStatistics().transform(predictions)
print(
"Best model's AUC on validation set = "
+ "{0:.2f}%".format(metrics.first()["AUC"] * 100)
)
Verificare i risultati di SynapseML:
auc_value = metrics.first()["AUC"]
print(f"Available metrics: {metrics.columns}")
assert 0.5 < auc_value <= 1.0, f"AUC {auc_value} is outside expected range (0.5, 1.0]"
print(f"SynapseML classification complete - AUC: {auc_value:.4f}")
Note
Gli approcci pyspark e SynapseML devono produrre valori di AUC simili, poiché eseguono il training dello stesso tipo di modello con gli stessi iperparametri sugli stessi dati.
Confrontare i due approcci
| Aspect | pyspark | SynapseML |
|---|---|---|
| Elaborazione delle caratteristiche | Manuale (da Tokenizer a HashingTF a VectorAssembler) | Automatico (gestito da TrainClassifier) |
| Selezione del modello | Ciclo manuale con analizzatore | Integrato FindBestModel |
| Calcolo delle metriche | Singola metrica per chiamata di valutazione | Più metriche con ComputeModelStatistics |
| Righe di codice | Circa 30 linee | Circa 15 righe |
| Result | Stessa AUC | Stessa AUC |
Risoluzione dei problemi
| Problema | Motivo | Risoluzione |
|---|---|---|
AnalysisException: Path does not exist |
L'URL di archiviazione BLOB pubblico non è temporaneamente disponibile | Attendere alcuni minuti e riprovare. Verificare la connettività eseguendo spark.read.parquet("wasbs://publicwasb@mmlspark.blob.core.windows.net/BookReviewsFromAmazon10K.parquet").count() |
IllegalArgumentException: Field "features" does not exist |
I nomi delle colonne di funzionalità non corrispondono tra trasformatori | Verificare i nomi delle colonne eseguendo data.columns prima del passaggio VectorAssembler |
NameError: name 'LogisticRegression' is not defined |
Istruzione di importazione mancante | Aggiungere from pyspark.ml.classification import LogisticRegression nella parte superiore della cella |
ModuleNotFoundError: No module named 'synapse.ml' |
Il notebook non usa Fabric runtime di Spark | Verificare che il notebook usi Fabric Runtime 1.2 o versione successiva. Selezionare Ambiente nella barra multifunzione per verificare. |
| AUC basso (inferiore a 0,6) | Problemi di suddivisione dei dati o convergenza | Verificare la distribuzione delle etichette con data.groupBy("label").count().show(). Si prevede un set di dati approssimativamente bilanciato. |
Py4JJavaError: An error occurred while calling |
errore interno di Java/Spark | Controllare l'interfaccia utente di Spark per i log degli errori dettagliati. Riavvia la sessione Spark selezionando Sessione>Arresta sessione, quindi riesegui tutte le celle. |
Pulire le risorse
Se è stato creato un nuovo lakehouse per questo articolo e non è più necessario:
- Nell'area di lavoro, fare clic con il pulsante destro del mouse sul nome del lakehouse.
- Seleziona Elimina.
- Conferma l'eliminazione.
Il notebook rimane nell'area di lavoro, a meno che non venga eliminato separatamente.
Contenuto correlato
- Come usare il modello K-NN (K-Nearest-Neighbors) con SynapseML
- Come usare ONNX con SynapseML - Deep Learning
- Come usare Kernel SHAP per spiegare un modello di classificazione tabulare