Para integrar validação de CPF em um pipeline Apache Airflow, crie um PythonOperator que chama GET https://api.cpfhub.io/cpf/{CPF} com o header x-api-key armazenado no sistema de Connections do Airflow. A arquitetura baseada em DAGs permite modelar a sequência extrair → consultar → gerar relatório de forma declarativa, com retentativas automáticas e monitoramento integrado. Em pipelines que processam dados cadastrais, a validação e o enriquecimento de CPFs garantem a qualidade dos dados antes que eles sigam para camadas downstream.
1. Pré-requisitos
-
Apache Airflow 2.7+ configurado e em execução.
-
Pacote
requestsdisponível no ambiente do Airflow. -
Uma conta gratuita na CPFHub.io
2. Configure a conexão no Airflow
Use o sistema de Connections do Airflow para armazenar a chave de API de forma segura. No Admin > Connections, crie uma nova conexão:
- Connection Id:
cpfhub_api - Connection Type:
HTTP - Host:
https://api.cpfhub.io - Extra:
{"x-api-key": "SUA_CHAVE_DE_API", "timeout": 5}
Ou via CLI:
airflow connections add cpfhub_api \
--conn-type http \
--conn-host https://api.cpfhub.io \
--conn-extra '{"x-api-key": "SUA_CHAVE_DE_API", "timeout": 5}'
3. Crie um operator customizado
Crie um operator reutilizável para consultar CPFs em qualquer DAG:
# plugins/operators/cpfhub_operator.py
import re
import requests
from airflow.models import BaseOperator
from airflow.hooks.base import BaseHook
class CpfHubOperator(BaseOperator):
"""
Operator customizado para consultar CPF na API da CPFHub.io.
"""
template_fields = ("cpf",)
def __init__(self, cpf: str, conn_id: str = "cpfhub_api", **kwargs):
super().__init__(**kwargs)
self.cpf = cpf
self.conn_id = conn_id
def execute(self, context):
connection = BaseHook.get_connection(self.conn_id)
extra = connection.extra_dejson
cpf_limpo = re.sub(r"\D", "", self.cpf)
if len(cpf_limpo) != 11:
raise ValueError(f"CPF inválido: {self.cpf}")
url = f"{connection.host}/cpf/{cpf_limpo}"
headers = {
"x-api-key": extra.get("x-api-key"),
"Accept": "application/json",
}
timeout = extra.get("timeout", 5)
self.log.info(f"Consultando CPF: {cpf_limpo}")
try:
response = requests.get(url, headers=headers, timeout=timeout)
except requests.exceptions.Timeout:
raise RuntimeError("Timeout ao consultar API da CPFHub")
except requests.exceptions.RequestException as e:
raise RuntimeError(f"Erro de conexão: {str(e)}")
if response.status_code == 200:
data = response.json()
if data.get("success"):
self.log.info(f"CPF consultado com sucesso: {data['data']['name']}")
return data["data"]
error_map = {
400: "CPF com formato inválido",
401: "Chave de API inválida",
404: "CPF não encontrado",
}
msg = error_map.get(response.status_code, f"HTTP {response.status_code}")
raise RuntimeError(f"Erro na consulta: {msg}")
4. Crie a DAG de validação
Crie uma DAG que processa uma lista de CPFs, válida cada um e gera um relatório:
# dags/cpf_validation_pipeline.py
from datetime import datetime, timedelta
from airflow import DAG
from airflow.operators.python import PythonOperator
from airflow.hooks.base import BaseHook
import requests
import re
import json
import time
default_args = {
"owner": "data-team",
"depends_on_past": False,
"email_on_failure": True,
"retries": 2,
"retry_delay": timedelta(minutes=5),
}
dag = DAG(
"cpf_validation_pipeline",
default_args=default_args,
description="Pipeline de validação de CPF via CPFHub.io",
schedule_interval="0 8 * * 1", # Toda segunda às 8h
start_date=datetime(2026, 7, 1),
catchup=False,
tags=["cpf", "validacao", "cpfhub"],
)
def extrair_cpfs(**context):
"""Extrai lista de CPFs a serem validados."""
# Em produção, leia de um banco de dados ou arquivo
cpfs = ["12345678900", "98765432100", "11122233344"]
context["ti"].xcom_push(key="cpfs", value=cpfs)
return len(cpfs)
def consultar_cpfs(**context):
"""Consulta cada CPF na API da CPFHub.io."""
cpfs = context["ti"].xcom_pull(key="cpfs", task_ids="extrair_cpfs")
connection = BaseHook.get_connection("cpfhub_api")
extra = connection.extra_dejson
headers = {
"x-api-key": extra.get("x-api-key"),
"Accept": "application/json",
}
timeout = extra.get("timeout", 5)
resultados = []
for cpf in cpfs:
cpf_limpo = re.sub(r"\D", "", cpf)
url = f"{connection.host}/cpf/{cpf_limpo}"
try:
response = requests.get(url, headers=headers, timeout=timeout)
if response.status_code == 200:
data = response.json()
if data.get("success"):
resultados.append({
"cpf": cpf_limpo,
"nome": data["data"]["name"],
"genero": data["data"]["gender"],
"nascimento": data["data"]["birthDate"],
"status": "valido",
})
continue
resultados.append({"cpf": cpf_limpo, "status": f"erro_http_{response.status_code}"})
except Exception as e:
resultados.append({"cpf": cpf_limpo, "status": f"erro: {str(e)}"})
time.sleep(0.5) # Delay entre requisições
context["ti"].xcom_push(key="resultados", value=resultados)
return len(resultados)
def gerar_relatorio(**context):
"""Gera relatório de validação."""
resultados = context["ti"].xcom_pull(key="resultados", task_ids="consultar_cpfs")
total = len(resultados)
validos = sum(1 for r in resultados if r["status"] == "valido")
invalidos = total - validos
relatorio = {
"data_execucao": datetime.now().isoformat(),
"total_cpfs": total,
"validos": validos,
"invalidos": invalidos,
"taxa_sucesso": f"{(validos/total*100):.1f}%" if total > 0 else "0%",
"detalhes": resultados,
}
print(f"Relatório de Validação de CPF")
print(f"Total: {total} | Válidos: {validos} | Inválidos: {invalidos}")
print(f"Taxa de sucesso: {relatorio['taxa_sucesso']}")
context["ti"].xcom_push(key="relatorio", value=relatorio)
return relatorio
# Definição das tasks
t_extrair = PythonOperator(
task_id="extrair_cpfs",
python_callable=extrair_cpfs,
dag=dag,
)
t_consultar = PythonOperator(
task_id="consultar_cpfs",
python_callable=consultar_cpfs,
dag=dag,
)
t_relatorio = PythonOperator(
task_id="gerar_relatorio",
python_callable=gerar_relatorio,
dag=dag,
)
# Dependências
t_extrair >> t_consultar >> t_relatorio
5. Adicione sensores e alertas
Use sensores para verificar a disponibilidade da API antes de iniciar o pipeline:
# Sensor de disponibilidade (adicionar na DAG)
from airflow.sensors.python import PythonSensor
def verificar_api_disponivel(**context):
"""Verifica se a API da CPFHub está acessível."""
connection = BaseHook.get_connection("cpfhub_api")
try:
response = requests.get(
f"{connection.host}/health",
timeout=5,
)
return response.status_code == 200
except Exception:
return False
t_sensor = PythonSensor(
task_id="verificar_api",
python_callable=verificar_api_disponivel,
poke_interval=60,
timeout=300,
mode="poke",
dag=dag,
)
t_sensor >> t_extrair >> t_consultar >> t_relatorio
6. Processamento em lote com controle de taxa
Para volumes maiores, implemente processamento em lote com controle de taxa:
def consultar_cpfs_batch(batch_size: int = 10, delay: float = 1.0, **context):
"""Consulta CPFs em lotes com controle de taxa."""
cpfs = context["ti"].xcom_pull(key="cpfs", task_ids="extrair_cpfs")
connection = BaseHook.get_connection("cpfhub_api")
extra = connection.extra_dejson
headers = {
"x-api-key": extra.get("x-api-key"),
"Accept": "application/json",
}
timeout = extra.get("timeout", 5)
resultados = []
for i in range(0, len(cpfs), batch_size):
batch = cpfs[i:i + batch_size]
print(f"Processando lote {i // batch_size + 1}: {len(batch)} CPFs")
for cpf in batch:
cpf_limpo = re.sub(r"\D", "", cpf)
url = f"{connection.host}/cpf/{cpf_limpo}"
try:
resp = requests.get(url, headers=headers, timeout=timeout)
if resp.status_code == 200:
data = resp.json()
resultados.append({**data.get("data", {}), "status": "ok"})
else:
resultados.append({"cpf": cpf_limpo, "status": f"http_{resp.status_code}"})
except Exception as e:
resultados.append({"cpf": cpf_limpo, "status": f"erro: {str(e)}"})
time.sleep(delay)
context["ti"].xcom_push(key="resultados", value=resultados)
7. Boas práticas
-
Connections -- Use o sistema de Connections do Airflow para armazenar credenciais. Nunca hardcode chaves de API em DAGs.
-
Retries -- Configure
retrieseretry_delaypara lidar com falhas temporárias da rede. -
Controle de volume -- Adicione delays entre consultas. O plano gratuito oferece 50 consultas/mês; ao ultrapassar, a API cobra R$0,15 por consulta extra — ela não bloqueia. O plano Pro inclui 1.000 consultas por R$149/mês.
-
XComs -- Use XComs para passar dados entre tasks, mas evite volumes muito grandes. Para datasets extensos, use armazenamento intermediário como S3 ou GCS.
-
Timeout -- Configure timeout de 5 segundos nas requisições HTTP, alinhado com o tempo de resposta de ~900ms da API.
-
LGPD -- A API da CPFHub.io opera em conformidade com a LGPD. Garanta que os dados processados no Airflow também respeitem as políticas de privacidade.
Perguntas frequentes
Como armazenar a chave de API da CPFHub.io com segurança no Airflow?
Use o sistema de Connections do Airflow (Admin > Connections) para armazenar a chave no campo Extra como JSON: {"x-api-key": "SUA_CHAVE"}. Acesse via BaseHook.get_connection("cpfhub_api").extra_dejson. Nunca inclua chaves diretamente em arquivos de DAG que são versionados no Git — isso expõe credenciais no histórico do repositório.
A API CPFHub.io retorna erro 429 quando o limite de consultas é atingido?
Não. A API não bloqueia nem retorna 429 ao atingir o limite do plano. Quando o limite mensal é excedido, ela continua respondendo normalmente e cobra R$0,15 por consulta adicional. No Airflow, monitore o consumo via painel do CPFHub.io e configure alertas de custo para evitar cobranças inesperadas em pipelines de alto volume.
Qual é a latência da API CPFHub.io em contextos de pipeline Airflow?
A latência média é de ~900ms por consulta. Para um pipeline com 100 CPFs e delay de 0,5s entre requisições, o tempo total da task consultar_cpfs será de aproximadamente 2,5 minutos. Considere esse tempo ao configurar o timeout da task e o sla da DAG, e use processamento em lote para distribuir consultas ao longo de múltiplas execuções quando o volume for grande.
Como lidar com falhas de rede em tasks do Airflow que consultam a API de CPF?
Configure retries=2 e retry_delay=timedelta(minutes=5) no default_args da DAG para que o Airflow repita automaticamente a task em caso de timeout ou erro de conexão. Trate exceções específicas no código Python — requests.exceptions.Timeout e RequestException — e registre os CPFs com falha no XCom para reprocessamento posterior. Consulte as boas práticas de segurança da OWASP para orientações sobre tratamento seguro de erros em integrações de API.
Conclusão
Integrar a API da CPFHub.io
Cadastre-se em cpfhub.io
CPFHub.io
Pronto para integrar a API?
50 consultas gratuitas para testar agora. Sem cartão de crédito. Acesso imediato à documentação.
Sobre a redação
Redação CPFHub.io
Time editorial especializado em APIs de CPF, identidade digital e compliance no mercado brasileiro. Produzimos guias técnicos, análises regulatórias e tutoriais sobre LGPD e KYC para desenvolvedores e líderes de produto.



