Configurar o repositório de estado do RocksDB no Azure Databricks

O RocksDB é o provedor de armazenamento de estado padrão no Databricks Runtime 17.3 e superior. Para versões do Databricks Runtime abaixo da 17.3, você pode habilitar o gerenciamento de estado baseado no RocksDB definindo a seguinte configuração no SparkSession antes de iniciar a consulta de streaming.

spark.conf.set(
  "spark.sql.streaming.stateStore.providerClass",
  "com.databricks.sql.streaming.state.RocksDBStateStoreProvider")

Você pode habilitar o RocksDB no Lakeflow Spark Declarative Pipelines. Consulte Otimizar a configuração do pipeline para processamento com estado.

Habilitar o ponto de verificação do log de alterações

No Databricks Runtime 13.3 LTS e superior, você pode habilitar o ponto de verificação do log de alterações para reduzir a duração do ponto de verificação e a latência de ponta a ponta para cargas de trabalho de Streaming Estruturado. A Databricks recomenda habilitar o ponto de verificação do log de alterações para todas as consultas com estado de Streaming Estruturado. Ponto de verificação do log de alterações é habilitado por padrão no Databricks Runtime 17.3 e superior.

Tradicionalmente, o RocksDB State Store tira instantâneos e faz upload de arquivos de dados durante o ponto de verificação. Para evitar esse custo, o ponto de verificação do log de alterações grava apenas os registros que foram alterados desde o último ponto de verificação em um armazenamento durável.

O ponto de verificação do log de alterações é desabilitado por padrão no Databricks Runtime abaixo da versão 17.3. É possível utilizar a seguinte sintaxe para habilitar o ponto de verificação do log de alterações no nível do SparkSession:

spark.conf.set(
  "spark.sql.streaming.stateStore.rocksdb.changelogCheckpointing.enabled", "true")

Você pode habilitar o ponto de verificação do log de alterações em um stream existente e manter as informações de estado armazenadas no ponto de verificação.

Importante

As consultas com ponto de verificação do log de alterações habilitado só podem ser executadas no Databricks Runtime 13.3 LTS e superior. Você pode desabilitar o ponto de verificação do log de alterações para reverter para o comportamento de ponto de verificação herdado, mas deve continuar a executar essas consultas no Databricks Runtime 13.3 LTS ou superior. Você deve reiniciar o trabalho para que essas alterações ocorram.

Métricas de armazenamento de estado RocksDB

Cada operador de estado coleta métricas relacionadas às operações de gerenciamento de estado realizadas na sua instância RocksDB para observar o repositório de estado e, potencialmente, ajudar na depuração de lentidão do trabalho.

No Databricks Runtime 16.4 LTS e posteriores, as métricas de uma instância específica do repositório de estado são rotuladas com a ID de partição e o nome do repositório, garantindo que permaneçam separadas. Todas as outras métricas são relatadas como a soma agregada para cada operador de estado em todas as tarefas em que o operador de estado está em execução.

Essas métricas fazem parte do mapa customMetrics dentro dos campos stateOperators em StreamingQueryProgress. Veja a seguir um exemplo de StreamingQueryProgress no formato JSON (obtido usando StreamingQueryProgress.json()).

{
  "id": "6774075e-8869-454b-ad51-513be86cfd43",
  "runId": "3d08104d-d1d4-4d1a-b21e-0b2e1fb871c5",
  "batchId": 7,
  "stateOperators": [
    {
      "numRowsTotal": 20000000,
      "numRowsUpdated": 20000000,
      "memoryUsedBytes": 31005397,
      "numRowsDroppedByWatermark": 0,
      "customMetrics": {
        "SnapshotLastUploaded.partition_0_default": 7,
        "SnapshotLastUploaded.partition_1_default": 7,
        "SnapshotLastUploaded.partition_2_default": 6,
        "SnapshotLastUploaded.partition_3_default": 6,
        "SnapshotLastUploaded.partition_4_default": -1,
        "rocksdbBytesCopied": 141037747,
        "rocksdbCommitCheckpointLatency": 2,
        "rocksdbCommitCompactLatency": 22061,
        "rocksdbCommitFileSyncLatencyMs": 1710,
        "rocksdbCommitFlushLatency": 19032,
        "rocksdbCommitPauseLatency": 0,
        "rocksdbCommitWriteBatchLatency": 56155,
        "rocksdbFilesCopied": 2,
        "rocksdbFilesReused": 0,
        "rocksdbGetCount": 40000000,
        "rocksdbGetLatency": 21834,
        "rocksdbPutCount": 1,
        "rocksdbPutLatency": 56155599000,
        "rocksdbReadBlockCacheHitCount": 1988,
        "rocksdbReadBlockCacheMissCount": 40341617,
        "rocksdbSstFileSize": 141037747,
        "rocksdbTotalBytesReadByCompaction": 336853375,
        "rocksdbTotalBytesReadByGet": 680000000,
        "rocksdbTotalBytesReadThroughIterator": 0,
        "rocksdbTotalBytesWrittenByCompaction": 141037747,
        "rocksdbTotalBytesWrittenByPut": 740000012,
        "rocksdbTotalCompactionLatencyMs": 21949695000,
        "rocksdbWriterStallLatencyMs": 0,
        "rocksdbZipFileBytesUncompressed": 7038
      }
    }
  ],
  "sources": [{}],
  "sink": {}
}

Descrições detalhadas das métricas são as seguinte:

Nome da métrica Descrição
rocksdbCommitWriteBatchLatency Tempo (mili-segundos) necessário para aplicar as gravações em fases na estrutura na memória (WriteBatch) ao RocksDB nativo.
rocksdbCommitFlushLatency Tempo (mili-segundos) necessário para liberar as alterações na memória do RocksDB para o disco local.
rocksdbCommitCompactLatency O tempo (mili-segundos) levou para compactação (opcional) durante a confirmação do ponto de verificação.
rocksdbCommitPauseLatency Tempo (em milissegundos) necessário para interromper os threads de trabalho em segundo plano (para compactação etc.) como parte da confirmação de ponto de verificação.
rocksdbCommitCheckpointLatency Tempo (mili-segundos) levou para tirar um instantâneo do RocksDB nativo e gravar em um diretório local.
rocksdbCommitFileSyncLatencyMs Tempo (mili-segundos) necessário para sincronizar os arquivos nativos relacionados ao instantâneo do RocksDB para um armazenamento externo (local do ponto de verificação).
rocksdbGetLatency O tempo médio (em nanos) levou de acordo com a chamada RocksDB::Get nativa subjacente.
rocksdbPutCount O tempo médio (em nanos) levou de acordo com a chamada RocksDB::Put nativa subjacente.
rocksdbGetCount Número de chamadas RocksDB::Get nativas (não inclui Gets do WriteBatch – no lote de memória usado para gravações de preparação).
rocksdbPutCount Número de chamadas RocksDB::Put nativas (não inclui Puts para WriteBatch – no lote de memória usado para gravações de preparação).
rocksdbTotalBytesReadByGet Número de bytes descompactados lidos por chamadas RocksDB::Get nativas.
rocksdbTotalBytesWrittenByPut Número de bytes descompactados gravados por chamadas RocksDB::Put nativas.
rocksdbReadBlockCacheHitCount Número de vezes que o cache de blocos do RocksDB nativo é usado para evitar a leitura de dados do disco local.
rocksdbReadBlockCacheMissCount Número de vezes que o cache de blocos do RocksDB nativo perdeu e exigiu a leitura de dados do disco local.
rocksdbTotalBytesReadByCompaction Número de bytes lidos do disco local pelo processo de compactação do RocksDB nativo.
rocksdbTotalBytesWrittenByCompaction Número de bytes gravados no disco local pelo processo de compactação do RocksDB nativo.
rocksdbTotalCompactionLatencyMs O tempo (mili-segundos) levou para as compactações do RocksDB (tanto em segundo plano quanto para a compactação opcional iniciada durante a confirmação).
rocksdbWriterStallLatencyMs Tempo (mili-segundos) o autor foi paralisado devido a uma compactação em segundo plano ou liberação das memtables para o disco.
rocksdbTotalBytesReadThroughIterator Algumas operações com estado (como processamento de tempo limite em flatMapGroupsWithState ou marca d'água em agregações com janela) exigem a leitura de todos os dados no BD por meio do iterador. O tamanho total dos dados descompactados lidos usando o iterador.

Uso de memória do Cap RocksDB

No Databricks Runtime 17.3 e superior, o Azure Databricks limita automaticamente o uso de memória do RocksDB por nó. Em versões anteriores do Databricks Runtime, configure isso manualmente para evitar erros fora de memória.

O RocksDB aloca memória para memtables, o cache de blocos e os blocos de filtro e de índice. Sem um limite, o uso de memória entre várias instâncias do RocksDB em um nó pode aumentar indefinidamente e causar erros de falta de memória. O gerenciador do buffer de gravação do RocksDB aplica um limite de memória por nó para todas as instâncias do RocksDB em execução nesse nó.

Para limitar o uso de memória do RocksDB, defina as seguintes configurações na sessão do Spark antes de iniciar a consulta de streaming:

spark.conf.set(
  "spark.sql.streaming.stateStore.rocksdb.boundedMemoryUsage", "true")
spark.conf.set(
  "spark.sql.streaming.stateStore.rocksdb.maxMemoryUsageMB", "<value>")

As configurações a seguir controlam o uso de memória do RocksDB:

Configuration Descrição
spark.sql.streaming.stateStore.rocksdb.boundedMemoryUsage Habilita um limite de memória compartilhado para todas as instâncias do RocksDB em um nó. Defina como true para habilitar.
spark.sql.streaming.stateStore.rocksdb.maxMemoryUsageMB Memória máxima em MB compartilhada entre todas as instâncias do RocksDB em um nó. Defina isso como um valor estático ou como um valor calculado como uma fração da memória física do nó.
spark.sql.streaming.stateStore.rocksdb.writeBufferSizeMB Tamanho máximo do buffer de gravação em MB para uma instância do RocksDB individual. Usa como padrão o valor interno do RocksDB.
spark.sql.streaming.stateStore.rocksdb.maxWriteBufferNumber Número máximo de buffers de gravação para uma instância individual do RocksDB. Usa como padrão o valor interno do RocksDB.