Dask, Dagster e Coiled para Processamento de Dados em Produção na OnlineApp

Neste artigo irei mostrar uma integração simples entre o Dagster e o Dask+Coiled. Discutiremos como isso tornou um problema comum, processar um grande conjunto de arquivos mensalmente, realmente uma tarefa muito fácil.

O Usuário e o Problema

Olá 👋, me chamo Lucas, sou o líder do time de ciência e engenharia de dados na OnlineApp, uma empresa B2B que atende o mercado brasileiro.

Todo mês, o governo brasileiro publica um grande conjunto de arquivos CSV com informações sobre empresas brasileiras, que usamos para entender melhor o nosso mercado. Temos muitos serviços internos que desejam ler esses dados, mas antes que isso seja possível, precisamos pré-processar um pouco eles:

  • Filtrar algumas linhas e colunas que não nos interessam; 
  • Limpar alguns valores em várias colunas; 
  • Realizar junções com outros conjuntos de dados internos que temos; 
  • Converter para Parquet e armazenar em nosso próprio armazenamento em cloud;

 Temos muitos outros serviços em nossa empresa que então dependem desses dados.

Arquitetura

Para processos regulares como esses, executados de forma agendada, geralmente utilizamos o Dagster. Gostamos de utilizar o Dagster porque é fácil de usar e parece natural (intuitivo). Ele é executado em uma VM na nuvem, o que nos proporciona acesso rápido a outros dados na nuvem (como esses arquivos CSV).

Normalmente, usaríamos o Pandas para esse tipo de processo, mas esse conjunto de arquivos é muito grande para caber na memória (cerca de ≅150 milhões de linhas com várias colunas), então mudamos para o Dask. Infelizmente, o conjunto de dados também é grande demais para a nossa VM que executa o Dagster, então decidimos utilizar o Coiled para solicitar um cluster de máquinas maiores, cada uma com uma quantidade razoável de memória.

Em princípio, queríamos fazer algo assim:

				
					import coiled
import dask.dataframe as dd
from dagster import op


@op
def run_operation(context):
    '''Processa todos os arquivos CSV e salva o resultado como Parquet.'''

    cluster = coiled.Cluster(
        region="sa-east-1",
        memory="64 GiB",
        n_workers=100,
        ... # outras opções de configuração que você precisa

    )

    client = cluster.get_client()

    df = dd.read_csv("...")

    ... # código personalizado específico para o nosso problema

    df_result.to_parquet("...")

				
			

Isso funcionaria, mas queríamos tornar isso mais natural integrando-o com o Dagster. E estamos animados para compartilhar o que fizemos.

Integração Dagster + Coiled

O Dagster possui o conceito de Recurso (Resource). Recursos geralmente modelam componentes externos com os quais Ativos (Assets) e Operações (Ops, abreviação de “Operações”, como no exemplo acima) interagem. Por exemplo, um recurso pode ser uma conexão com um data warehouse como o Snowflake ou um serviço como uma API REST.

Para tornar o recurso do Dask+Coiled mais reutilizável e também aproveitar toda a funcionalidade adicional que o Dagster oferece ao trabalhar com recursos (mais sobre isso posteriormente), transformamos nosso cluster Coiled em um recurso do Dagster. Isso é tão fácil de fazer quanto:

				
					from contextlib import contextmanager
from dagster import resource
import coiled

@resource(
    config_schema={
        "n_workers": int,
        "worker_memory": str
    }
)
@contextmanager
def dask_coiled_cluster_resource(context):
    '''Yields a Dask cluster client.'''

    with coiled.Cluster(
        name=f"dagster-dask-cluster-{context.run_id}",
        n_workers=context.resource_config["n_workers"],
        worker_memory=context.resource_config["worker_memory"],
        region="sa-east-1",  # use sua região preferida
        compute_purchase_option="spot_with_fallback", # economiza dinheiro com spot
    ) as cluster:
        with cluster.get_client() as client:

            context.log.info(
                f"Cluster criado, dashboard link: {client.dashboard_link}"
            )

            yield client
				
			

Para usar este recurso, nós o adicionamos à definição do nosso Job:

				
					from dagster import job

@job(
    resource_defs={
        "dask_cluster": dask_coiled_cluster_resource,
    },
    config={
        "resources": {
            "dask_cluster": {
                # define os valores padrão, mas você pode alterá-los
                # na UI do Dagster ou posteriormente no código
                "config": { 
                    "n_workers": 5,
                    "worker_memory": "64GiB"
                },
            },
        },
    }
)
def my_job():
    '''Minha definição de DAGs do Job.'''
    ...
    result = my_op()
    ...
				
			

E por fim, nas operações que necessitam de um cluster Dask para rodar computações, informamos essas dependências da seguinte forma:

				
					from dagster import op


@op(
    # especifique a chave de recurso que você deseja usar nesta operação
    required_resource_keys={"dask_cluster"}
)
def my_op(context):
    '''Run my computation with a Dask cluster.'''

    # cada computação dentro desta operação será executado com o cluster Dask
    ...
    
    # também podemos acessar e usar nosso cliente do cluster diretamente
    context.resources.dask_cluster("...")
				
			

E é isso, agora temos integração total entre Dagster, Dask e Coiled. De uma forma muito fácil de configurar e manter.

Como bônus, é assim que definimos um cluster Dask local para fins de desenvolvimento e teste:

				
					from dagster import resource
from contextlib import contextmanager
from dask.distributed import Client, LocalCluster


@resource
@contextmanager
def dask_local_cluster_resource(context):
    '''Yields a local Dask cluster client.'''

    with LocalCluster() as cluster:
        with cluster.get_client() as client:
            context.log.info(
                f"Cluster criado, dashboard link: {client.dashboard_link}"
            )
            yield client
				
			

Resultados

Agora temos em nossas operações no Dagster que recebem uma tag especificando que aquela operação utiliza o Dask+Coiled. Quando o Dagster os executa, ele inicia um cluster Dask temporário, realizam o processamento necessário e, em seguida, automaticamente descarta o cluster.

Esta imagem mostra como os recursos da operação ficam listados na UI do Dagster:

E este é o Launchpad do Dagster, aqui podemos alterar diversas configurações antes de executar nosso job, e uma delas, graças à definição do nosso cluster Coiled como um recurso, é escolher o tamanho do nosso cluster, de forma simples e parametrizada:

Agora podemos usar nossa infraestrutura existente para lidar com conjuntos de dados muito maiores do que os que usávamos anteriormente. Tudo foi muito fácil e barato.

Em conclusão

Neste artigo, examinamos um exemplo de como usar Dagster e Dask+Coiled para orquestrar um pipeline recorrente de processamento de dados. Com Dask e Coiled, conseguimos processar os dados em paralelo usando um cluster de VMs na nuvem. O Dagster é capaz de aproveitar esses recursos da nuvem, permitindo-nos processar conjuntos de dados muito maiores do que poderíamos processar de outra forma.

Espero que outras pessoas que desejam escalar seus jobs do Dagster para a nuvem considerem este artigo útil. Para obter mais informações sobre como começar a usar o Coiled, confira o guia de primeiros passos deles.

 – Este artigo é uma tradução, em português, do post original. –

Sobre o autor

5 4 Votos
Relevância do artigo
Se inscrever
Notificar de
0 Comentários
Inline Feedbacks
Ver todos
Rolar para cima