Ein Datenflussdiagramm ist eine komponierbare Verarbeitungspipeline, die Daten transformiert, während sie zwischen Quellen und Zielen fließt. Ein Standarddatenfluss folgt einer festen Anreicherung, Filter- und Kartensequenz. Mit einem Datenflussdiagramm können Sie Transformationen in beliebiger Reihenfolge verketten, in parallele Pfade verzweigen und Daten über Zeitfenster aggregieren.
In diesem Artikel wird schrittweise das Erstellen eines Datenflussdiagramms erläutert. Eine Übersicht über Datenflussdiagramme und die verfügbaren Transformationen finden Sie unter Übersicht über Datenflussdiagramme.
Von Bedeutung
Datenflussdiagramme unterstützen derzeit nur MQTT-, Kafka- und OpenTelemetry-Endpunkte. Andere Endpunkttypen wie Data Lake, Microsoft Fabric OneLake, Azure Data Explorer und lokaler Speicher werden nicht unterstützt. Weitere Informationen finden Sie unter Bekannte Probleme.
Voraussetzungen
- Eine Instanz von Azure IoT Einsatz, die in einem Kubernetes-Cluster bereitgestellt wird. Weitere Informationen finden Sie unter Deploy Azure IoT Einsatz.
Die Azure CLI-Version 2.62.0 oder neuer, die auf Ihrem Entwicklungscomputer installiert ist. Verwenden Sie az --version, um Ihre Version zu überprüfen. Bei Bedarf können Sie sie mithilfe von az upgrade aktualisieren. Weitere Informationen finden Sie unter Install the Azure CLI.
Die Azure IoT Einsatz Erweiterung für die Azure CLI. Verwenden Sie den folgenden Befehl, um die Erweiterung hinzuzufügen oder auf die neueste Version zu aktualisieren:
az extension add --upgrade --name azure-iot-ops
Azure IoT Einsatz Version 1.2 oder höher.
Ein Datenflussprofil. Sie können das Standardprofil verwenden.
Ein Datenflussendpunkt für Ihre Quelle und Ihr Ziel. Der standardmäßige MQTT-Broker-Endpunkt funktioniert für die ersten Schritte.
Erstellen eines Datenflussdiagramms
Ein Datenflussdiagramm enthält drei Arten von Elementen: Quellen , die Daten einbinden, transformationen , die sie verarbeiten, und Ziele , die sie senden. Verbinden Sie sie in der Reihenfolge, in der Daten fließen sollen.
Wechseln Sie im Operations-Erlebnis zu Ihrer Azure IoT Einsatz-Instanz.
Wählen Sie "Datenflussdiagramm>erstellen" aus.
Geben Sie einen Namen für das Datenflussdiagramm ein, und wählen Sie ein Datenflussprofil aus. Das Standardprofil ist standardmäßig ausgewählt.
Erstellen Sie Ihre Pipeline, indem Sie dem Zeichenbereich Elemente hinzufügen:
Fügen Sie eine Quelle hinzu: Wählen Sie den Quellendpunkt aus, und konfigurieren Sie die Themen, die für eingehende Nachrichten abonniert werden sollen.
-
Hinzufügen von Transformationen: Wählen Sie eine oder mehrere Transformationen aus, um die Daten zu verarbeiten. Verfügbare Transformationen umfassen Abbildung, Filter, Verzweigung, Verkettung und Fenster. Ausführliche Informationen zu den einzelnen Transformationstypen finden Sie in der Übersicht über Datenflussdiagramme.
Fügen Sie ein Ziel hinzu: Wählen Sie den Zielendpunkt aus, und konfigurieren Sie das Thema oder den Pfad, an das verarbeitete Daten gesendet werden sollen.
Verbinden Sie die Elemente in der Reihenfolge, in der Daten fließen sollen.
Wählen Sie "Speichern" aus, um das Datenflussdiagramm bereitzustellen.
Die Azure CLI wendet ein Datenflussdiagramm aus einer einzelnen JSON-Konfigurationsdatei an, die alle Knoten und Verbindungen enthält. Verwenden Sie az iot ops dataflowgraph apply, um den Graphen zu erstellen oder zu ersetzen. Im folgenden Beispiel werden Temperaturdaten gelesen, in Fahrenheit konvertiert und an ein Zielthema gesendet.
Erstellen Sie eine graph.json Datei mit den Datenflussdiagrammeigenschaften. In der graph.json Datei werden die Regeln jeder Transformation im value Feld als ESCAPE-JSON-Zeichenfolge gespeichert. Die lesbare Form der Regeln der einzelnen Transformationen finden Sie in der Vorgehensweise für diesen Transformationstyp.
{
"mode": "Enabled",
"nodes": [
{
"nodeType": "Source",
"name": "sensors",
"sourceSettings": {
"endpointRef": "default",
"dataSources": [
"telemetry/temperature"
]
}
},
{
"nodeType": "Graph",
"name": "convert",
"graphSettings": {
"registryEndpointRef": "default",
"artifact": "azureiotoperations/graph-dataflow-map:1.0.0",
"configuration": [
{
"key": "rules",
"value": "{\"map\":[{\"inputs\":[\"*\"],\"output\":\"*\"},{\"inputs\":[\"temperature\"],\"output\":\"temperature_f\",\"expression\":\"cToF($1)\"}]}"
}
]
}
},
{
"nodeType": "Destination",
"name": "output",
"destinationSettings": {
"endpointRef": "default",
"dataDestination": "telemetry/converted"
}
}
],
"nodeConnections": [
{
"from": {
"name": "sensors"
},
"to": {
"name": "convert"
}
},
{
"from": {
"name": "convert"
},
"to": {
"name": "output"
}
}
]
}
Wenden Sie die Konfigurationsdatei an. Die extendedLocation Datei wird automatisch aus der Instanz und Ressourcengruppe hinzugefügt. Fügen Sie sie daher nicht in die Datei ein.
az iot ops dataflowgraph apply \
--name temperature-processing \
--instance <INSTANCE_NAME> \
--resource-group <RESOURCE_GROUP> \
--config-file graph.json
Das Diagramm ist dem default Datenflussprofil zugeordnet. Um ein anderes Profil zu verwenden, fügen Sie --profile <PROFILE_NAME> hinzu.
Erstellen Sie eine Bicep-Datei .bicep mit der folgenden Struktur. In diesem Beispiel wird ein Datenflussdiagramm erstellt, das Temperaturdaten liest, in Fahrenheit konvertiert und an ein Zielthema sendet.
param aioInstanceName string = '<AIO_INSTANCE_NAME>'
param customLocationName string = '<CUSTOM_LOCATION_NAME>'
resource aioInstance 'Microsoft.IoTOperations/instances@2026-03-01' existing = {
name: aioInstanceName
}
resource customLocation 'Microsoft.ExtendedLocation/customLocations@2021-08-31-preview' existing = {
name: customLocationName
}
resource defaultDataflowProfile 'Microsoft.IoTOperations/instances/dataflowProfiles@2026-03-01' existing = {
parent: aioInstance
name: 'default'
}
resource dataflowGraph 'Microsoft.IoTOperations/instances/dataflowProfiles/dataflowGraphs@2026-03-01' = {
parent: defaultDataflowProfile
name: 'temperature-processing'
extendedLocation: {
name: customLocation.id
type: 'CustomLocation'
}
properties: {
profileRef: 'default'
mode: 'Enabled'
nodes: [
{
nodeType: 'Source'
name: 'sensors'
sourceSettings: {
endpointRef: 'default'
dataSources: [
'telemetry/temperature'
]
}
}
{
nodeType: 'Graph'
name: 'convert'
graphSettings: {
registryEndpointRef: 'default'
artifact: 'azureiotoperations/graph-dataflow-map:1.0.0'
configuration: [
{
key: 'rules'
value: '{"map":[{"inputs":["*"],"output":"*"},{"inputs":["temperature"],"output":"temperature_f","expression":"cToF($1)"}]}'
}
]
}
}
{
nodeType: 'Destination'
name: 'output'
destinationSettings: {
endpointRef: 'default'
dataDestination: 'telemetry/converted'
}
}
]
nodeConnections: [
{
from: { name: 'sensors' }
to: { name: 'convert' }
}
{
from: { name: 'convert' }
to: { name: 'output' }
}
]
}
}
Bereitstellen der Bicep-Datei:
az deployment group create --resource-group <RESOURCE_GROUP> --template-file <FILE>.bicep
Von Bedeutung
Die Verwendung von Kubernetes-Bereitstellungsmanifesten wird in Produktionsumgebungen nicht unterstützt und sollte nur zum Debuggen und Testen verwendet werden.
Erstellen Sie eine Kubernetes-Manifestdatei .yaml mit der folgenden Struktur. In diesem Beispiel wird ein Datenflussdiagramm erstellt, das Temperaturdaten liest, in Fahrenheit konvertiert und an ein Zielthema sendet.
apiVersion: connectivity.iotoperations.azure.com/v1
kind: DataflowGraph
metadata:
name: temperature-processing
namespace: azure-iot-operations
spec:
profileRef: default
nodes:
- nodeType: Source
name: sensors
sourceSettings:
endpointRef: default
dataSources:
- telemetry/temperature
- nodeType: Graph
name: convert
graphSettings:
registryEndpointRef: default
artifact: azureiotoperations/graph-dataflow-map:1.0.0
configuration:
- key: rules
value: |
{
"map": [
{
"inputs": ["*"],
"output": "*"
},
{
"inputs": ["temperature"],
"output": "temperature_f",
"expression": "cToF($1)"
}
]
}
- nodeType: Destination
name: output
destinationSettings:
endpointRef: default
dataDestination: telemetry/converted
nodeConnections:
- from:
name: sensors
to:
name: convert
- from:
name: convert
to:
name: output
Anwenden des Manifests:
kubectl apply -f <FILE>.yaml
Die Quelle definiert, wo Daten in die Pipeline eingegeben werden. Geben Sie einen Endpunktverweis und ein oder mehrere Themen an.
Wählen Sie im Datenflussdiagramm-Editor das Quellelement aus, und konfigurieren Sie Folgendes:
| Setting |
Beschreibung |
|
Endpunkt |
Der zu verwendende Datenflussendpunkt. Wählen Sie den Standardwert für den lokalen MQTT-Broker aus. |
|
Themen |
Ein oder mehrere Themen, die für eingehende Nachrichten abonniert werden sollen. |
Die CLI wendet den gesamten Graphen auf einmal an. Konfigurieren Sie die Quelle also als Knoten Source in Ihrer graph.json-Konfigurationsdatei und führen Sie dann az iot ops dataflowgraph apply aus:
{
"nodeType": "Source",
"name": "sensors",
"sourceSettings": {
"endpointRef": "default",
"dataSources": [
"telemetry/temperature",
"telemetry/humidity"
]
}
}
{
nodeType: 'Source'
name: 'sensors'
sourceSettings: {
endpointRef: 'default'
dataSources: [
'telemetry/temperature'
'telemetry/humidity'
]
}
}
Von Bedeutung
Die Verwendung von Kubernetes-Bereitstellungsmanifesten wird in Produktionsumgebungen nicht unterstützt und sollte nur zum Debuggen und Testen verwendet werden.
- nodeType: Source
name: sensors
sourceSettings:
endpointRef: default
dataSources:
- telemetry/temperature
- telemetry/humidity
Transformiert Prozessdaten zwischen Der Quelle und dem Ziel. Jede Transformation verweist auf ein integriertes Artefakt und ist mit Regeln konfiguriert.
Die verfügbaren integrierten Transformationen sind:
| Umwandeln |
Artefakt |
Beschreibung |
|
Landkarte |
azureiotoperations/graph-dataflow-map:1.0.0 |
Umbenennen, Neustrukturieren, Berechnen und Kopieren von Feldern. |
|
Filter |
azureiotoperations/graph-dataflow-filter:1.0.0 |
Verwerfen Sie Nachrichten, die einer Bedingung entsprechen. |
|
Filiale |
azureiotoperations/graph-dataflow-branch:1.0.0 |
Leiten Sie jede Nachricht basierend auf einer Bedingung an einen true- oder false-Pfad weiter. |
|
Concatenate |
azureiotoperations/graph-dataflow-concatenate:1.0.0 |
Führen Sie zwei oder mehr Pfade wieder in einem Pfad zusammen. |
|
Fenster |
azureiotoperations/graph-dataflow-window:1.0.0 |
Sammeln Sie Nachrichten über ein Zeitintervall, und aggregieren Sie dann. |
Weitere Informationen zum Anreichern von Nachrichten mit externen Daten finden Sie unter "Anreichern mit externen Daten".
Wählen Sie im Datenflussdiagramm-Editor "Transformation hinzufügen " und dann den Transformationstyp aus. Konfigurieren Sie die Regeln im visuellen Editor.
Jede Transformation ist ein Knoten, wobei nodeType in Ihrer Graph-Konfigurationsdatei auf graph.json gesetzt ist. Die Regeln der Transformation sind ein JSON-Objekt, wie diese Karte, die Temperatur in Fahrenheit konvertiert:
{
"map": [
{
"inputs": ["temperature"],
"output": "temperature_f",
"expression": "cToF($1)"
}
]
}
Die Eigenschaft configuration akzeptiert diese Regeln als Zeichenfolge, daher wird das JSON der Regeln mit Escape-Zeichen versehen und im Feld value eingefügt. Wenden Sie den vollständigen Graphen mit az iot ops dataflowgraph apply an:
{
"nodeType": "Graph",
"name": "convert",
"graphSettings": {
"registryEndpointRef": "default",
"artifact": "azureiotoperations/graph-dataflow-map:1.0.0",
"configuration": [
{
"key": "rules",
"value": "{\"map\":[{\"inputs\":[\"temperature\"],\"output\":\"temperature_f\",\"expression\":\"cToF($1)\"}]}"
}
]
}
}
Tip
Um die Escape-Zeichenkette zu generieren, speichern Sie die Regeln in einer Datei wie rules.json, führen Sie dann jq -c . rules.json aus und fügen Sie die einzeilige Ausgabe in das Feld value ein.
Jede Transformation ist ein Knoten mit nodeType: 'Graph'. Die configuration Eigenschaft übergibt Regeln als JSON-Zeichenfolge:
{
nodeType: 'Graph'
name: 'convert'
graphSettings: {
registryEndpointRef: 'default'
artifact: 'azureiotoperations/graph-dataflow-map:1.0.0'
configuration: [
{
key: 'rules'
value: '{"map":[{"inputs":["temperature"],"output":"temperature_f","expression":"cToF($1)"}]}'
}
]
}
}
Von Bedeutung
Die Verwendung von Kubernetes-Bereitstellungsmanifesten wird in Produktionsumgebungen nicht unterstützt und sollte nur zum Debuggen und Testen verwendet werden.
Jede Transformation ist ein Knoten mit nodeType: Graph. Die configuration Eigenschaft übergibt Regeln als JSON-Zeichenfolge:
- nodeType: Graph
name: convert
graphSettings:
registryEndpointRef: default
artifact: azureiotoperations/graph-dataflow-map:1.0.0
configuration:
- key: rules
value: |
{
"map": [
{
"inputs": ["temperature"],
"output": "temperature_f",
"expression": "cToF($1)"
}
]
}
Sie können eine beliebige Anzahl von Transformationen verketten. Verbinden Sie sie im nodeConnections Abschnitt in der Reihenfolge, in der Daten fließen sollen:
Ziehen Sie Verbindungen zwischen Transformationen auf der Arbeitsfläche, um die Verarbeitungsreihenfolge festzulegen.
Definieren Sie die Verarbeitungsreihenfolge im nodeConnections Abschnitt Ihrer graph.json Konfigurationsdatei:
"nodeConnections": [
{
"from": {
"name": "sensors"
},
"to": {
"name": "remove-bad-data"
}
},
{
"from": {
"name": "remove-bad-data"
},
"to": {
"name": "convert"
}
},
{
"from": {
"name": "convert"
},
"to": {
"name": "output"
}
}
]
nodeConnections: [
{ from: { name: 'sensors' }, to: { name: 'remove-bad-data' } }
{ from: { name: 'remove-bad-data' }, to: { name: 'convert' } }
{ from: { name: 'convert' }, to: { name: 'output' } }
]
Von Bedeutung
Die Verwendung von Kubernetes-Bereitstellungsmanifesten wird in Produktionsumgebungen nicht unterstützt und sollte nur zum Debuggen und Testen verwendet werden.
nodeConnections:
- from: { name: sensors }
to: { name: remove-bad-data }
- from: { name: remove-bad-data }
to: { name: convert }
- from: { name: convert }
to: { name: output }
Das Ziel definiert, wo verarbeitete Daten gesendet werden. Geben Sie einen Endpunktverweis und ein Thema oder Pfad an.
Wählen Sie das Zielelement aus, und konfigurieren Sie Folgendes:
| Setting |
Beschreibung |
|
Endpunkt |
Der Datenflussendpunkt, an den Daten gesendet werden sollen. |
|
Topic |
Das Thema oder der Pfad, in dem verarbeitete Daten veröffentlicht werden sollen. |
Konfigurieren Sie das Ziel als Knoten Destination in Ihrer graph.json-Konfigurationsdatei und wenden Sie dann den gesamten Graphen mit az iot ops dataflowgraph apply an:
{
"nodeType": "Destination",
"name": "output",
"destinationSettings": {
"endpointRef": "default",
"dataDestination": "telemetry/processed"
}
}
{
nodeType: 'Destination'
name: 'output'
destinationSettings: {
endpointRef: 'default'
dataDestination: 'telemetry/processed'
}
}
Von Bedeutung
Die Verwendung von Kubernetes-Bereitstellungsmanifesten wird in Produktionsumgebungen nicht unterstützt und sollte nur zum Debuggen und Testen verwendet werden.
- nodeType: Destination
name: output
destinationSettings:
endpointRef: default
dataDestination: telemetry/processed
Dynamisches Themenrouting basierend auf Nachrichteninhalten finden Sie unter Weiterleiten von Nachrichten zu verschiedenen Themen.
Überprüfen, ob das Datenflussdiagramm funktioniert
Überprüfen Sie nach der Bereitstellung eines Datenflussgraphs, ob er läuft.
Wählen Sie in der Betriebsumgebung Ihr Datenflussdiagramm aus, um den Status anzuzeigen. Ein gesundes Diagramm zeigt einen laufenden Zustand an.
Verwenden Sie az iot ops dataflowgraph show, um die Details eines Diagramms anzuzeigen:
az iot ops dataflowgraph show \
--name temperature-processing \
--instance <INSTANCE_NAME> \
--resource-group <RESOURCE_GROUP>
Verwenden Sie az iot ops dataflowgraph listzum Auflisten aller Datenflussdiagramme, die einem Profil zugeordnet sind:
az iot ops dataflowgraph list \
--instance <INSTANCE_NAME> \
--resource-group <RESOURCE_GROUP>
Überprüfen Sie den Status der DataflowGraph Ressource:
az resource show --resource-group <RESOURCE_GROUP> --resource-type Microsoft.IoTOperations/instances/dataflowProfiles/dataflowGraphs --name <GRAPH_NAME> --parent instances/<INSTANCE_NAME>/dataflowProfiles/<PROFILE_NAME>
Von Bedeutung
Die Verwendung von Kubernetes-Bereitstellungsmanifesten wird in Produktionsumgebungen nicht unterstützt und sollte nur zum Debuggen und Testen verwendet werden.
kubectl get dataflowgraph temperature-processing -n azure-iot-operations
Überprüfen Sie die Pod-Protokolle auf Fehler:
kubectl logs -l app=dataflow -n azure-iot-operations --tail=50
Nächste Schritte