L'inferenza ONNX su Spark

ONNX (Open Neural Network Exchange) offre un runtime portatile ottimizzato per l'hardware per i modelli di Machine Learning. Convertendo un modello in formato ONNX, è possibile eseguire l'inferenza batch in Spark con una latenza inferiore e senza dipendere dal framework di training originale in fase di stima.

In questo articolo si esegue il training di un modello LightGBM con SynapseML, lo si converte in formato ONNX e quindi si usa il modello ONNX per eseguire l'inferenza in Spark in Microsoft Fabric.

Prerequisiti

  • Abbonati a Microsoft Fabric. Oppure, registrati per una versione di prova gratuita di Microsoft Fabric.

  • Accedi a Microsoft Fabric.

  • Passare a Fabric usando il selettore di esperienza nell'angolo in basso a sinistra della home page.

    Screenshot che mostra la selezione di

  • Collega il tuo notebook a un lakehouse. Sul lato sinistro del notebook selezionare Aggiungi per aggiungere una lakehouse esistente o crearne una.
  • Fabric Runtime 1.2 o versione successiva.

Installare i pacchetti necessari

Eseguire la cella seguente nel notebook per installare i pacchetti necessari. Il pacchetto onnxmltools non è preinstallato nel runtime di Fabric.

%pip install onnxmltools --quiet

Al termine dell'installazione, verificare che i pacchetti siano disponibili:

import onnxmltools
import lightgbm
print(f"onnxmltools version: {onnxmltools.__version__}")
print(f"lightgbm version: {lightgbm.__version__}")

Note

Il pacchetto lightgbm è preinstallato in Fabric Runtime 1.2 e versioni successive. È sufficiente installare onnxmltools.

Caricare i dati di esempio

Caricare il set di dati di stima del fallimento dal Archiviazione BLOB di Azure pubblico:

df = (
    spark.read.format("csv")
    .option("header", True)
    .option("inferSchema", True)
    .load(
        "wasbs://publicwasb@mmlspark.blob.core.windows.net/company_bankruptcy_prediction_data.csv"
    )
)

print(f"Rows: {df.count()}, Columns: {len(df.columns)}")
display(df.limit(5))

La tabella visualizzata include colonne come:

Fallimento? Indicatore del reddito netto Patrimonio netto verso passività
0 1.0 0,0165
0 1.0 0,0208

Addestrare un modello LightGBM

Usare VectorAssembler per combinare le colonne delle caratteristiche, quindi addestrare un LightGBMClassifier:

from pyspark.ml.feature import VectorAssembler
from synapse.ml.lightgbm import LightGBMClassifier

feature_cols = df.columns[1:]
featurizer = VectorAssembler(inputCols=feature_cols, outputCol="features")

train_data = featurizer.transform(df)["Bankrupt?", "features"]

model = (
    LightGBMClassifier(featuresCol="features", labelCol="Bankrupt?")
    .setDataTransferMode("bulk")
    .setEarlyStoppingRound(300)
    .setLambdaL1(0.5)
    .setNumIterations(1000)
    .setNumThreads(-1)
    .setMaxDeltaStep(0.5)
    .setNumLeaves(31)
    .setMaxDepth(-1)
    .setBaggingFraction(0.7)
    .setFeatureFraction(0.7)
    .setBaggingFreq(2)
    .setObjective("binary")
    .setIsUnbalance(True)
    .setMinSumHessianInLeaf(20)
    .setMinGainToSplit(0.01)
)

model = model.fit(train_data)

Verificare che il modello sia stato sottoposto a training correttamente:

print(f"Model type: {type(model).__name__}")
print(f"Number of features: {len(feature_cols)}")

Convertire il modello in formato ONNX

Esportare il modello sottoposto a training in un booster LightGBM, quindi convertirlo in ONNX:

import lightgbm as lgb
from typing import Union
from lightgbm import Booster, LGBMClassifier
from onnxmltools.convert import convert_lightgbm
from onnxmltools.convert.common.data_types import FloatTensorType


def convert_to_onnx(lgbm_model: Union[LGBMClassifier, Booster], input_size: int) -> bytes:
    initial_types = [("input", FloatTensorType([-1, input_size]))]
    onnx_model = convert_lightgbm(
        lgbm_model, initial_types=initial_types, target_opset=13
    )
    return onnx_model.SerializeToString()


booster_model_str = model.getLightGBMBooster().modelStr().get()
booster = lgb.Booster(model_str=booster_model_str)
model_payload_ml = convert_to_onnx(booster, len(feature_cols))

Verificare che la conversione ONNX sia riuscita:

print(f"ONNX model payload size: {len(model_payload_ml)} bytes")
assert len(model_payload_ml) > 0, "ONNX conversion failed: empty payload"

L'output mostra le dimensioni del payload del modello ONNX in byte (in genere circa 800.000 byte).

Importante

Usare from onnxmltools.convert.common.data_types import FloatTensorType per la definizione del tipo. Il percorso from onnxconverter_common.data_types import FloatTensorType di importazione precedente non è compatibile con le versioni correnti di onnxmltools.

Caricare e configurare il modello ONNX

Caricare il payload ONNX in un synapseML ONNXModel ed esaminare gli input e gli output del modello:

from synapse.ml.onnx import ONNXModel

onnx_ml = ONNXModel().setModelPayload(model_payload_ml)

print("Model inputs:" + str(onnx_ml.getModelInputs()))
print("Model outputs:" + str(onnx_ml.getModelOutputs()))

L'output elenca i nodi di input e output del modello.

Configurare il modello eseguendo il mapping delle colonne di input e output. Il FeedDict associa i nomi di input del modello ONNX ai nomi delle colonne del DataFrame. Il FetchDict associa i nomi delle colonne di output desiderate ai nomi di output del modello ONNX:

onnx_ml = (
    onnx_ml.setDeviceType("CPU")
    .setFeedDict({"input": "features"})
    .setFetchDict({"probability": "probabilities", "prediction": "label"})
    .setMiniBatchSize(5000)
)

Eseguire l'inferenza

Creare dati di test e trasformarli tramite il modello ONNX:

from pyspark.ml.feature import VectorAssembler
import pandas as pd
import numpy as np

n = 10000
m = 95
test = np.random.rand(n, m)
testPdf = pd.DataFrame(test)
cols = list(map(str, testPdf.columns))
testDf = spark.createDataFrame(testPdf)
testDf = testDf.repartition(4)
testDf = (
    VectorAssembler()
    .setInputCols(cols)
    .setOutputCol("features")
    .transform(testDf)
    .drop(*cols)
    .cache()
)

display(onnx_ml.transform(testDf))

Note

Poiché i dati di test vengono generati in modo casuale, i valori di stima non rappresentano risultati reali. Questa sezione illustra che il modello ONNX viene eseguito correttamente in Spark.

L'output deve contenere colonne per features, predictione probability:

Funzionalità Previsione probabilità
{"type":1,"values":[0.105... 0 {"0":0.835...
{"type":1,"values":[0.814... 0 {"0":0.658...

Verificare i risultati generati dall'inferenza:

results = onnx_ml.transform(testDf)
print(f"Result count: {results.count()}")
print(f"Output columns: {results.columns}")
assert "prediction" in results.columns, "Missing prediction column"
assert "probability" in results.columns, "Missing probability column"

L'output conferma che tutte le righe di test hanno ricevuto un punteggio e che il DataFrame dei risultati contiene le colonne features, prediction e probability.

Risoluzione dei problemi

Problema Motivo Risoluzione
ModuleNotFoundError: No module named 'onnxmltools' Il pacchetto non è preinstallato in Fabric runtime. Eseguire %pip install onnxmltools --quiet e riavviare il kernel Python.
RuntimeError: Operator LgbmClassifier got an input with a wrong type Percorso di importazione errato per FloatTensorType. Usare from onnxmltools.convert.common.data_types import FloatTensorType anziché importare da onnxconverter_common.data_types.
ModuleNotFoundError: No module named 'onnx.mapping' Versione onnxmltools 1.7.0 o precedenti non compatibile con il pacchetto onnx corrente. Eseguire %pip install onnxmltools --upgrade --quiet per installare una versione compatibile.
ONNX conversion returns empty payload Estrazione di stringhe del modello booster non riuscita. Verificare che model.getLightGBMBooster().modelStr().get() restituisca una stringa non vuota prima della conversione.
Feature (Column_) appears more than one time durante model.fit() Le colonne del dataset con caratteri speciali producono nomi duplicati dopo la sanificazione di LightGBM. Aggiungere .setDataTransferMode("bulk") alla LightGBMClassifier configurazione. La modalità bulk usa Apache Arrow ed evita il problema di purificazione del nome della colonna.
AssertionError su SparkContext in ONNXModel() La sessione Spark non viene inizializzata. Esegui questo codice in un notebook Fabric con un lakehouse collegato. La spark variabile viene pre-inizializzata dal runtime.

Pulire le risorse

Se il dataframe di test memorizzato nella cache non è più necessario, annullarlo per liberare memoria del cluster:

testDf.unpersist()