# Como integrar validação de CPF em Apache Airflow para pipelines de dados

> Aprenda a integrar validação de CPF em pipelines Apache Airflow usando a API da CPFHub.io com DAGs, operators e boas práticas.

**Publicado:** 01/07/2026
**Autor:** Redação CPFHub.io
**URL:** https://cpfhub.io/blog/como-integrar-validacao-de-cpf-em-apache-airflow-para-pipelines-de-dados

---


Para integrar validação de CPF em um pipeline [Apache Airflow](https://airflow.apache.org/docs/apache-airflow/stable/), crie um `PythonOperator` que chama `GET https://api.cpfhub.io/cpf/{CPF}` com o header `x-api-key` armazenado no sistema de Connections do Airflow. A arquitetura baseada em DAGs permite modelar a sequência extrair → consultar → gerar relatório de forma declarativa, com retentativas automáticas e monitoramento integrado. Em pipelines que processam dados cadastrais, a validação e o enriquecimento de CPFs garantem a qualidade dos dados antes que eles sigam para camadas downstream.

---

## 1. Pré-requisitos

* **Apache Airflow 2.7+** configurado e em execução.

* Pacote `requests` disponível no ambiente do Airflow.

* Uma conta gratuita na [**CPFHub.io**](https://www.cpfhub.io/)

---

## 2. Configure a conexão no Airflow

Use o sistema de Connections do Airflow para armazenar a chave de API de forma segura. No Admin > Connections, crie uma nova conexão:

* **Connection Id:** `cpfhub_api`
* **Connection Type:** `HTTP`
* **Host:** `https://api.cpfhub.io`
* **Extra:** `{"x-api-key": "SUA_CHAVE_DE_API", "timeout": 5}`

Ou via CLI:

```bash
airflow connections add cpfhub_api \
 --conn-type http \
 --conn-host https://api.cpfhub.io \
 --conn-extra '{"x-api-key": "SUA_CHAVE_DE_API", "timeout": 5}'
```

---

## 3. Crie um operator customizado

Crie um operator reutilizável para consultar CPFs em qualquer DAG:

```python
# plugins/operators/cpfhub_operator.py
import re
import requests
from airflow.models import BaseOperator
from airflow.hooks.base import BaseHook

class CpfHubOperator(BaseOperator):
 """
 Operator customizado para consultar CPF na API da CPFHub.io.
 """

 template_fields = ("cpf",)

 def __init__(self, cpf: str, conn_id: str = "cpfhub_api", **kwargs):
 super().__init__(**kwargs)
 self.cpf = cpf
 self.conn_id = conn_id

 def execute(self, context):
 connection = BaseHook.get_connection(self.conn_id)
 extra = connection.extra_dejson

 cpf_limpo = re.sub(r"\D", "", self.cpf)
 if len(cpf_limpo) != 11:
 raise ValueError(f"CPF inválido: {self.cpf}")

 url = f"{connection.host}/cpf/{cpf_limpo}"
 headers = {
 "x-api-key": extra.get("x-api-key"),
 "Accept": "application/json",
 }
 timeout = extra.get("timeout", 5)

 self.log.info(f"Consultando CPF: {cpf_limpo}")

 try:
 response = requests.get(url, headers=headers, timeout=timeout)
 except requests.exceptions.Timeout:
 raise RuntimeError("Timeout ao consultar API da CPFHub")
 except requests.exceptions.RequestException as e:
 raise RuntimeError(f"Erro de conexão: {str(e)}")

 if response.status_code == 200:
 data = response.json()
 if data.get("success"):
 self.log.info(f"CPF consultado com sucesso: {data['data']['name']}")
 return data["data"]

 error_map = {
 400: "CPF com formato inválido",
 401: "Chave de API inválida",
 404: "CPF não encontrado",
 }
 msg = error_map.get(response.status_code, f"HTTP {response.status_code}")
 raise RuntimeError(f"Erro na consulta: {msg}")
```

---

## 4. Crie a DAG de validação

Crie uma DAG que processa uma lista de CPFs, válida cada um e gera um relatório:

```python
# dags/cpf_validation_pipeline.py
from datetime import datetime, timedelta
from airflow import DAG
from airflow.operators.python import PythonOperator
from airflow.hooks.base import BaseHook
import requests
import re
import json
import time

default_args = {
 "owner": "data-team",
 "depends_on_past": False,
 "email_on_failure": True,
 "retries": 2,
 "retry_delay": timedelta(minutes=5),
}

dag = DAG(
 "cpf_validation_pipeline",
 default_args=default_args,
 description="Pipeline de validação de CPF via CPFHub.io",
 schedule_interval="0 8 * * 1", # Toda segunda às 8h
 start_date=datetime(2026, 7, 1),
 catchup=False,
 tags=["cpf", "validacao", "cpfhub"],
)

def extrair_cpfs(**context):
 """Extrai lista de CPFs a serem validados."""
 # Em produção, leia de um banco de dados ou arquivo
 cpfs = ["12345678900", "98765432100", "11122233344"]
 context["ti"].xcom_push(key="cpfs", value=cpfs)
 return len(cpfs)

def consultar_cpfs(**context):
 """Consulta cada CPF na API da CPFHub.io."""
 cpfs = context["ti"].xcom_pull(key="cpfs", task_ids="extrair_cpfs")
 connection = BaseHook.get_connection("cpfhub_api")
 extra = connection.extra_dejson

 headers = {
 "x-api-key": extra.get("x-api-key"),
 "Accept": "application/json",
 }
 timeout = extra.get("timeout", 5)
 resultados = []

 for cpf in cpfs:
 cpf_limpo = re.sub(r"\D", "", cpf)
 url = f"{connection.host}/cpf/{cpf_limpo}"

 try:
 response = requests.get(url, headers=headers, timeout=timeout)
 if response.status_code == 200:
 data = response.json()
 if data.get("success"):
 resultados.append({
 "cpf": cpf_limpo,
 "nome": data["data"]["name"],
 "genero": data["data"]["gender"],
 "nascimento": data["data"]["birthDate"],
 "status": "valido",
 })
 continue
 resultados.append({"cpf": cpf_limpo, "status": f"erro_http_{response.status_code}"})
 except Exception as e:
 resultados.append({"cpf": cpf_limpo, "status": f"erro: {str(e)}"})

 time.sleep(0.5) # Delay entre requisições

 context["ti"].xcom_push(key="resultados", value=resultados)
 return len(resultados)

def gerar_relatorio(**context):
 """Gera relatório de validação."""
 resultados = context["ti"].xcom_pull(key="resultados", task_ids="consultar_cpfs")

 total = len(resultados)
 validos = sum(1 for r in resultados if r["status"] == "valido")
 invalidos = total - validos

 relatorio = {
 "data_execucao": datetime.now().isoformat(),
 "total_cpfs": total,
 "validos": validos,
 "invalidos": invalidos,
 "taxa_sucesso": f"{(validos/total*100):.1f}%" if total > 0 else "0%",
 "detalhes": resultados,
 }

 print(f"Relatório de Validação de CPF")
 print(f"Total: {total} | Válidos: {validos} | Inválidos: {invalidos}")
 print(f"Taxa de sucesso: {relatorio['taxa_sucesso']}")

 context["ti"].xcom_push(key="relatorio", value=relatorio)
 return relatorio

# Definição das tasks
t_extrair = PythonOperator(
 task_id="extrair_cpfs",
 python_callable=extrair_cpfs,
 dag=dag,
)

t_consultar = PythonOperator(
 task_id="consultar_cpfs",
 python_callable=consultar_cpfs,
 dag=dag,
)

t_relatorio = PythonOperator(
 task_id="gerar_relatorio",
 python_callable=gerar_relatorio,
 dag=dag,
)

# Dependências
t_extrair >> t_consultar >> t_relatorio
```

---

## 5. Adicione sensores e alertas

Use sensores para verificar a disponibilidade da API antes de iniciar o pipeline:

```python
# Sensor de disponibilidade (adicionar na DAG)
from airflow.sensors.python import PythonSensor

def verificar_api_disponivel(**context):
 """Verifica se a API da CPFHub está acessível."""
 connection = BaseHook.get_connection("cpfhub_api")
 try:
 response = requests.get(
 f"{connection.host}/health",
 timeout=5,
 )
 return response.status_code == 200
 except Exception:
 return False

t_sensor = PythonSensor(
 task_id="verificar_api",
 python_callable=verificar_api_disponivel,
 poke_interval=60,
 timeout=300,
 mode="poke",
 dag=dag,
)

t_sensor >> t_extrair >> t_consultar >> t_relatorio
```

---

## 6. Processamento em lote com controle de taxa

Para volumes maiores, implemente processamento em lote com controle de taxa:

```python
def consultar_cpfs_batch(batch_size: int = 10, delay: float = 1.0, **context):
 """Consulta CPFs em lotes com controle de taxa."""
 cpfs = context["ti"].xcom_pull(key="cpfs", task_ids="extrair_cpfs")
 connection = BaseHook.get_connection("cpfhub_api")
 extra = connection.extra_dejson

 headers = {
 "x-api-key": extra.get("x-api-key"),
 "Accept": "application/json",
 }
 timeout = extra.get("timeout", 5)
 resultados = []

 for i in range(0, len(cpfs), batch_size):
 batch = cpfs[i:i + batch_size]
 print(f"Processando lote {i // batch_size + 1}: {len(batch)} CPFs")

 for cpf in batch:
 cpf_limpo = re.sub(r"\D", "", cpf)
 url = f"{connection.host}/cpf/{cpf_limpo}"

 try:
 resp = requests.get(url, headers=headers, timeout=timeout)
 if resp.status_code == 200:
 data = resp.json()
 resultados.append({**data.get("data", {}), "status": "ok"})
 else:
 resultados.append({"cpf": cpf_limpo, "status": f"http_{resp.status_code}"})
 except Exception as e:
 resultados.append({"cpf": cpf_limpo, "status": f"erro: {str(e)}"})

 time.sleep(delay)

 context["ti"].xcom_push(key="resultados", value=resultados)
```

---

## 7. Boas práticas

* **Connections** -- Use o sistema de Connections do Airflow para armazenar credenciais. Nunca hardcode chaves de API em DAGs.

* **Retries** -- Configure `retries` e `retry_delay` para lidar com falhas temporárias da rede.

* **Controle de volume** -- Adicione delays entre consultas. O plano gratuito oferece 50 consultas/mês; ao ultrapassar, a API cobra R$0,15 por consulta extra — ela não bloqueia. O plano Pro inclui 1.000 consultas por R$149/mês.

* **XComs** -- Use XComs para passar dados entre tasks, mas evite volumes muito grandes. Para datasets extensos, use armazenamento intermediário como S3 ou GCS.

* **Timeout** -- Configure timeout de 5 segundos nas requisições HTTP, alinhado com o tempo de resposta de ~900ms da API.

* **LGPD** -- A API da CPFHub.io opera em conformidade com a LGPD. Garanta que os dados processados no Airflow também respeitem as políticas de privacidade.

---

## Perguntas frequentes

### Como armazenar a chave de API da CPFHub.io com segurança no Airflow?

Use o sistema de Connections do Airflow (Admin > Connections) para armazenar a chave no campo Extra como JSON: `{"x-api-key": "SUA_CHAVE"}`. Acesse via `BaseHook.get_connection("cpfhub_api").extra_dejson`. Nunca inclua chaves diretamente em arquivos de DAG que são versionados no Git — isso expõe credenciais no histórico do repositório.

### A API CPFHub.io retorna erro 429 quando o limite de consultas é atingido?

Não. A API não bloqueia nem retorna 429 ao atingir o limite do plano. Quando o limite mensal é excedido, ela continua respondendo normalmente e cobra R$0,15 por consulta adicional. No Airflow, monitore o consumo via painel do CPFHub.io e configure alertas de custo para evitar cobranças inesperadas em pipelines de alto volume.

### Qual é a latência da API CPFHub.io em contextos de pipeline Airflow?

A latência média é de ~900ms por consulta. Para um pipeline com 100 CPFs e delay de 0,5s entre requisições, o tempo total da task `consultar_cpfs` será de aproximadamente 2,5 minutos. Considere esse tempo ao configurar o `timeout` da task e o `sla` da DAG, e use processamento em lote para distribuir consultas ao longo de múltiplas execuções quando o volume for grande.

### Como lidar com falhas de rede em tasks do Airflow que consultam a API de CPF?

Configure `retries=2` e `retry_delay=timedelta(minutes=5)` no `default_args` da DAG para que o Airflow repita automaticamente a task em caso de timeout ou erro de conexão. Trate exceções específicas no código Python — `requests.exceptions.Timeout` e `RequestException` — e registre os CPFs com falha no XCom para reprocessamento posterior. Consulte as [boas práticas de segurança da OWASP](https://owasp.org/www-project-api-security/) para orientações sobre tratamento seguro de erros em integrações de API.

### Leia também

- [Como validar CPF no frontend com React e API REST](https://cpfhub.io/blog/como-validar-cpf-no-frontend-com-react-e-api-rest)
- [Como consumir API de CPF em Prefect para orquestração de workflows](https://cpfhub.io/blog/como-consumir-api-de-cpf-em-prefect-para-orquestracao-de-workflows)
- [Como consumir API de CPF em Jupyter Notebook para análise exploratória](https://cpfhub.io/blog/como-consumir-api-de-cpf-em-jupyter-notebook-para-analise-exploratoria)
- [Como armazenar respostas da API de CPF em PostgreSQL com Python](https://cpfhub.io/blog/como-armazenar-respostas-da-api-de-cpf-em-postgresql-com-python)

---

## Conclusão

Integrar a API da [**CPFHub.io**](https://www.cpfhub.io/)

Cadastre-se em [cpfhub.io](https://www.cpfhub.io/)

