# Como consumir API de CPF em Prefect para orquestração de workflows

> Aprenda a consumir a API de CPF da CPFHub.io em workflows Prefect com flows, tasks e concorrência assíncrona para pipelines de dados.

**Publicado:** 04/07/2026
**Autor:** Redação CPFHub.io
**URL:** https://cpfhub.io/blog/como-consumir-api-de-cpf-em-prefect-para-orquestracao-de-workflows

---


O **Prefect** é uma plataforma moderna de orquestração de workflows que se destaca pela simplicidade e pela abordagem "Pythonic" na definição de pipelines. Para consumir a API de CPF da CPFHub.io dentro de um flow Prefect, você define tasks assíncronas com retentativas automáticas e agrupa a lógica de consulta em um flow principal que controla concorrência e observabilidade. A API aceita uma requisição `GET https://api.cpfhub.io/cpf/{CPF}` com o header `x-api-key` e responde em ~900ms com os dados do titular.

---

## 1. Pré-requisitos

* **Python 3.10+** instalado.

* Pacotes necessários: `pip install prefect httpx pandas`.

* Uma conta gratuita na [**CPFHub.io**](https://www.cpfhub.io/)

---

## 2. Configure os blocos do Prefect

O Prefect utiliza o conceito de **Blocks** para armazenar configurações e credenciais de forma segura. Consulte a [documentação oficial do Prefect](https://docs.prefect.io/latest/concepts/blocks/) para detalhes sobre gerenciamento de secrets:

```python
# setup_blocks.py
from prefect.blocks.system import Secret, JSON

# Armazenar a chave de API como secret
secret_block = Secret(value="SUA_CHAVE_DE_API")
secret_block.save("cpfhub-api-key", overwrite=True)

# Armazenar configurações gerais
config_block = JSON(value={
 "base_url": "https://api.cpfhub.io",
 "timeout": 5,
 "max_retries": 3,
})
config_block.save("cpfhub-config", overwrite=True)

print("Blocos configurados com sucesso!")
```

Execute o script uma vez para registrar os blocos:

```bash
python setup_blocks.py
```

---

## 3. Crie as tasks de consulta

Defina tasks Prefect com retentativas e tratamento de erros:

```python
# tasks/cpf_tasks.py
import re
import httpx
from prefect import task, get_run_logger
from prefect.blocks.system import Secret, JSON

@task(
 name="consultar-cpf",
 retries=3,
 retry_delay_seconds=10,
 tags=["cpfhub", "api"],
)
async def consultar_cpf(cpf: str) -> dict:
 """
 Consulta dados de um CPF na API da CPFHub.io.
 Task com retentativas automáticas em caso de falha.
 """
 logger = get_run_logger()
 cpf_limpo = re.sub(r"\D", "", cpf)

 if len(cpf_limpo) != 11:
 logger.warning(f"CPF inválido: {cpf}")
 return {"cpf": cpf_limpo, "error": "CPF inválido", "success": False}

 # Carregar configurações dos blocos
 api_key = await Secret.load("cpfhub-api-key")
 config = await JSON.load("cpfhub-config")
 config_data = config.value

 url = f"{config_data['base_url']}/cpf/{cpf_limpo}"
 headers = {
 "x-api-key": api_key.get(),
 "Accept": "application/json",
 }

 logger.info(f"Consultando CPF: {cpf_limpo}")

 async with httpx.AsyncClient(timeout=config_data["timeout"]) as client:
 try:
 response = await client.get(url, headers=headers)
 except httpx.TimeoutException:
 logger.error(f"Timeout na consulta do CPF {cpf_limpo}")
 raise # Prefect fará retry automático
 except httpx.RequestError as e:
 logger.error(f"Erro de conexão: {str(e)}")
 raise

 if response.status_code == 200:
 data = response.json()
 if data.get("success"):
 logger.info(f"CPF {cpf_limpo} consultado com sucesso")
 return {**data["data"], "success": True, "error": None}

 error_map = {
 400: "Formato inválido",
 401: "API key inválida",
 404: "Não encontrado",
 }
 error_msg = error_map.get(response.status_code, f"HTTP {response.status_code}")
 logger.warning(f"Erro na consulta: {error_msg}")

 return {"cpf": cpf_limpo, "error": error_msg, "success": False}

@task(name="gerar-relatorio", tags=["relatorio"])
def gerar_relatorio(resultados: list[dict]) -> dict:
 """Gera relatório consolidado de validação."""
 logger = get_run_logger()

 total = len(resultados)
 sucesso = sum(1 for r in resultados if r.get("success"))
 falha = total - sucesso

 relatorio = {
 "total": total,
 "sucesso": sucesso,
 "falha": falha,
 "taxa_sucesso": f"{(sucesso/total*100):.1f}%" if total > 0 else "0%",
 }

 logger.info(f"Relatório: {total} CPFs | {sucesso} válidos | {falha} erros")
 return relatorio
```

---

## 4. Crie o flow principal

O flow orquestra as tasks e gerencia o fluxo de execução:

```python
# flows/cpf_validation_flow.py
import asyncio
from datetime import timedelta
from prefect import flow, get_run_logger
from tasks.cpf_tasks import consultar_cpf, gerar_relatorio

@flow(
 name="cpf-validation-pipeline",
 description="Pipeline de validação de CPF via API CPFHub.io",
 timeout_seconds=600,
 retries=1,
 retry_delay_seconds=60,
)
async def cpf_validation_pipeline(cpfs: list[str]) -> dict:
 """
 Flow principal de validação de CPF.
 Processa uma lista de CPFs com concorrência controlada.
 """
 logger = get_run_logger()
 logger.info(f"Iniciando validação de {len(cpfs)} CPFs")

 # Consultar CPFs com concorrência controlada
 resultados = []
 batch_size = 5 # Processar 5 por vez

 for i in range(0, len(cpfs), batch_size):
 batch = cpfs[i:i + batch_size]
 logger.info(f"Processando lote {i // batch_size + 1}")

 # Executar batch de forma assíncrona
 tasks = [consultar_cpf(cpf) for cpf in batch]
 batch_results = await asyncio.gather(*tasks)
 resultados.extend(batch_results)

 # Delay entre lotes
 if i + batch_size < len(cpfs):
 await asyncio.sleep(1)

 # Gerar relatório
 relatorio = gerar_relatorio(resultados)

 logger.info("Pipeline concluído com sucesso")
 return relatorio

if __name__ == "__main__":
 cpfs_teste = [
 "12345678900",
 "98765432100",
 "11122233344",
 ]
 resultado = asyncio.run(cpf_validation_pipeline(cpfs_teste))
 print(resultado)
```

---

## 5. Agende o flow com deployments

Configure o deployment para execução agendada:

```python
# deploy.py
from flows.cpf_validation_flow import cpf_validation_pipeline
from prefect.deployments import Deployment
from prefect.server.schemas.schedules import CronSchedule

deployment = Deployment.build_from_flow(
 flow=cpf_validation_pipeline,
 name="cpf-validation-semanal",
 schedule=CronSchedule(cron="0 8 * * 1"), # Segunda às 8h
 parameters={"cpfs": []}, # Parâmetros padrão
 tags=["cpfhub", "validacao"],
 description="Validação semanal de CPFs via API CPFHub.io",
)

if __name__ == "__main__":
 deployment.apply()
 print("Deployment criado com sucesso!")
```

```bash
python deploy.py
```

---

## 6. Adicione observabilidade com artifacts

Use Prefect Artifacts para registrar resultados persistentes:

```python
# tasks/cpf_tasks.py (adição)
from prefect.artifacts import create_table_artifact, create_markdown_artifact

@task(name="publicar-resultados")
async def publicar_resultados(resultados: list[dict], relatorio: dict):
 """Publica resultados como artifacts do Prefect."""
 # Tabela com resultados detalhados
 table_data = [
 {"CPF": r.get("cpf", "N/A"), "Nome": r.get("name", "N/A"), "Status": "OK" if r.get("success") else r.get("error", "Erro")}
 for r in resultados
 ]
 await create_table_artifact(
 key="cpf-validation-results",
 table=table_data,
 description="Resultados da validação de CPF",
 )

 # Resumo em markdown
 markdown = f"""
## Relatório de Validação de CPF

| Métrica | Valor |
|---------|-------|
| Total | {relatorio['total']} |
| Sucesso | {relatorio['sucesso']} |
| Falha | {relatorio['falha']} |
| Taxa | {relatorio['taxa_sucesso']} |
"""
 await create_markdown_artifact(
 key="cpf-validation-summary",
 markdown=markdown,
 description="Resumo da validação de CPF",
 )
```

---

## 7. Boas práticas

* **Blocos** -- Use Secret blocks para armazenar a chave de API. Nunca exponha credenciais diretamente no código dos flows.

* **Retentativas** -- Configure `retries` nas tasks para lidar com falhas temporárias. O Prefect faz retry automático com backoff.

* **Concorrência** -- Controle o número de consultas simultâneas com `batch_size`. Um valor de 5 é um bom ponto de partida para manter o fluxo estável.

* **Timeout** -- Configure timeout tanto na task (via Prefect) quanto na requisição HTTP (via httpx) para evitar execuções travadas.

* **Artifacts** -- Publique resultados como artifacts para manter histórico de execuções acessível no Prefect UI.

* **LGPD** -- A API da CPFHub.io é 100% compatível com a LGPD. Certifique-se de que os dados processados pelo workflow respeitem as políticas de privacidade.

---

## Perguntas frequentes

### O que é necessário para integrar a API de CPF em um flow Prefect?

Para integrar a API da CPFHub.io em um workflow Prefect, você precisa de uma chave de API armazenada em um Secret Block, do pacote `httpx` para requisições assíncronas e de uma task decorada com `@task` que faça a chamada `GET https://api.cpfhub.io/cpf/{CPF}`. O Prefect gerencia retentativas, observabilidade e agendamento automaticamente, sem código adicional.

### Como controlar a concorrência de consultas em pipelines de dados?

O padrão mais simples é processar CPFs em lotes com `asyncio.gather`, controlando o `batch_size` para limitar requisições simultâneas. A API da CPFHub.io responde em ~900ms, então lotes de 5 permitem processar um volume grande sem sobrecarregar a rede. O plano gratuito oferece 50 consultas/mês; ao ultrapassar, a API não bloqueia — cobra R$0,15 por consulta extra.

### Como o Prefect lida com falhas temporárias na consulta de CPF?

O parâmetro `retries=3` na task faz com que o Prefect reexecute automaticamente em caso de exceção, com o intervalo definido em `retry_delay_seconds`. Timeouts e erros de conexão devem ser relançados com `raise` para que o mecanismo de retry seja acionado. Erros de negócio (CPF inválido, chave incorreta) devem retornar um dicionário de erro sem lançar exceção.

### Como garantir conformidade com a LGPD ao processar CPFs em workflows automatizados?

Use o CPF apenas para a finalidade declarada ao titular, armazene apenas o necessário nos artifacts e implemente controle de acesso aos logs de execução do Prefect. A [ANPD](https://www.gov.br/anpd) orienta que dados de identificação devem ser tratados com o princípio da necessidade — documente a base legal para o tratamento em pipelines de dados.

---

### Leia também

- [Como validar CPF no frontend com React e API REST](https://cpfhub.io/blog/como-validar-cpf-no-frontend-com-react-e-api-rest)
- [Boas práticas para consumir APIs de CPF de forma segura](https://cpfhub.io/blog/boas-praticas-consumir-apis-cpf-segura)
- [Como consumir API de CPF em TypeScript com tipagem segura](https://cpfhub.io/blog/como-consumir-api-de-cpf-em-typescript-com-tipagem-segura)
- [Como implementar validação de CPF em microsserviços com Docker e Kubernetes](https://cpfhub.io/blog/como-implementar-validacao-cpf-microsservicos-docker-kubernetes)

---

## Conclusão

Consumir a API da [**CPFHub.io**](https://www.cpfhub.io/)

Cadastre-se em [cpfhub.io](https://www.cpfhub.io/)

