Como integrar validação de CPF em Apache Airflow para pipelines de dados

Aprenda a integrar validação de CPF em pipelines Apache Airflow usando a API da CPFHub.io com DAGs, operators e boas práticas.

Redação CPFHub.io
Redação CPFHub.io
··7 min de leitura
Como integrar validação de CPF em Apache Airflow para pipelines de dados

Para integrar validação de CPF em um pipeline Apache Airflow, crie um PythonOperator que chama GET https://api.cpfhub.io/cpf/{CPF} com o header x-api-key armazenado no sistema de Connections do Airflow. A arquitetura baseada em DAGs permite modelar a sequência extrair → consultar → gerar relatório de forma declarativa, com retentativas automáticas e monitoramento integrado. Em pipelines que processam dados cadastrais, a validação e o enriquecimento de CPFs garantem a qualidade dos dados antes que eles sigam para camadas downstream.


1. Pré-requisitos

  • Apache Airflow 2.7+ configurado e em execução.

  • Pacote requests disponível no ambiente do Airflow.

  • Uma conta gratuita na CPFHub.io


2. Configure a conexão no Airflow

Use o sistema de Connections do Airflow para armazenar a chave de API de forma segura. No Admin > Connections, crie uma nova conexão:

  • Connection Id: cpfhub_api
  • Connection Type: HTTP
  • Host: https://api.cpfhub.io
  • Extra: {"x-api-key": "SUA_CHAVE_DE_API", "timeout": 5}

Ou via CLI:

airflow connections add cpfhub_api \
    --conn-type http \
    --conn-host https://api.cpfhub.io \
    --conn-extra '{"x-api-key": "SUA_CHAVE_DE_API", "timeout": 5}'

3. Crie um operator customizado

Crie um operator reutilizável para consultar CPFs em qualquer DAG:

# plugins/operators/cpfhub_operator.py
import re
import requests
from airflow.models import BaseOperator
from airflow.hooks.base import BaseHook

class CpfHubOperator(BaseOperator):
    """
    Operator customizado para consultar CPF na API da CPFHub.io.
    """

    template_fields = ("cpf",)

    def __init__(self, cpf: str, conn_id: str = "cpfhub_api", **kwargs):
    super().__init__(**kwargs)
    self.cpf = cpf
    self.conn_id = conn_id

    def execute(self, context):
    connection = BaseHook.get_connection(self.conn_id)
    extra = connection.extra_dejson

    cpf_limpo = re.sub(r"\D", "", self.cpf)
    if len(cpf_limpo) != 11:
    raise ValueError(f"CPF inválido: {self.cpf}")

    url = f"{connection.host}/cpf/{cpf_limpo}"
    headers = {
    "x-api-key": extra.get("x-api-key"),
    "Accept": "application/json",
    }
    timeout = extra.get("timeout", 5)

    self.log.info(f"Consultando CPF: {cpf_limpo}")

    try:
    response = requests.get(url, headers=headers, timeout=timeout)
    except requests.exceptions.Timeout:
    raise RuntimeError("Timeout ao consultar API da CPFHub")
    except requests.exceptions.RequestException as e:
    raise RuntimeError(f"Erro de conexão: {str(e)}")

    if response.status_code == 200:
    data = response.json()
    if data.get("success"):
    self.log.info(f"CPF consultado com sucesso: {data['data']['name']}")
    return data["data"]

    error_map = {
    400: "CPF com formato inválido",
    401: "Chave de API inválida",
    404: "CPF não encontrado",
    }
    msg = error_map.get(response.status_code, f"HTTP {response.status_code}")
    raise RuntimeError(f"Erro na consulta: {msg}")

4. Crie a DAG de validação

Crie uma DAG que processa uma lista de CPFs, válida cada um e gera um relatório:

# dags/cpf_validation_pipeline.py
from datetime import datetime, timedelta
from airflow import DAG
from airflow.operators.python import PythonOperator
from airflow.hooks.base import BaseHook
import requests
import re
import json
import time

default_args = {
    "owner": "data-team",
    "depends_on_past": False,
    "email_on_failure": True,
    "retries": 2,
    "retry_delay": timedelta(minutes=5),
}

dag = DAG(
    "cpf_validation_pipeline",
    default_args=default_args,
    description="Pipeline de validação de CPF via CPFHub.io",
    schedule_interval="0 8 * * 1", # Toda segunda às 8h
    start_date=datetime(2026, 7, 1),
    catchup=False,
    tags=["cpf", "validacao", "cpfhub"],
)

def extrair_cpfs(**context):
    """Extrai lista de CPFs a serem validados."""
    # Em produção, leia de um banco de dados ou arquivo
    cpfs = ["12345678900", "98765432100", "11122233344"]
    context["ti"].xcom_push(key="cpfs", value=cpfs)
    return len(cpfs)

def consultar_cpfs(**context):
    """Consulta cada CPF na API da CPFHub.io."""
    cpfs = context["ti"].xcom_pull(key="cpfs", task_ids="extrair_cpfs")
    connection = BaseHook.get_connection("cpfhub_api")
    extra = connection.extra_dejson

    headers = {
    "x-api-key": extra.get("x-api-key"),
    "Accept": "application/json",
    }
    timeout = extra.get("timeout", 5)
    resultados = []

    for cpf in cpfs:
    cpf_limpo = re.sub(r"\D", "", cpf)
    url = f"{connection.host}/cpf/{cpf_limpo}"

    try:
    response = requests.get(url, headers=headers, timeout=timeout)
    if response.status_code == 200:
    data = response.json()
    if data.get("success"):
    resultados.append({
    "cpf": cpf_limpo,
    "nome": data["data"]["name"],
    "genero": data["data"]["gender"],
    "nascimento": data["data"]["birthDate"],
    "status": "valido",
    })
    continue
    resultados.append({"cpf": cpf_limpo, "status": f"erro_http_{response.status_code}"})
    except Exception as e:
    resultados.append({"cpf": cpf_limpo, "status": f"erro: {str(e)}"})

    time.sleep(0.5) # Delay entre requisições

    context["ti"].xcom_push(key="resultados", value=resultados)
    return len(resultados)

def gerar_relatorio(**context):
    """Gera relatório de validação."""
    resultados = context["ti"].xcom_pull(key="resultados", task_ids="consultar_cpfs")

    total = len(resultados)
    validos = sum(1 for r in resultados if r["status"] == "valido")
    invalidos = total - validos

    relatorio = {
    "data_execucao": datetime.now().isoformat(),
    "total_cpfs": total,
    "validos": validos,
    "invalidos": invalidos,
    "taxa_sucesso": f"{(validos/total*100):.1f}%" if total > 0 else "0%",
    "detalhes": resultados,
    }

    print(f"Relatório de Validação de CPF")
    print(f"Total: {total} | Válidos: {validos} | Inválidos: {invalidos}")
    print(f"Taxa de sucesso: {relatorio['taxa_sucesso']}")

    context["ti"].xcom_push(key="relatorio", value=relatorio)
    return relatorio

# Definição das tasks
t_extrair = PythonOperator(
    task_id="extrair_cpfs",
    python_callable=extrair_cpfs,
    dag=dag,
)

t_consultar = PythonOperator(
    task_id="consultar_cpfs",
    python_callable=consultar_cpfs,
    dag=dag,
)

t_relatorio = PythonOperator(
    task_id="gerar_relatorio",
    python_callable=gerar_relatorio,
    dag=dag,
)

# Dependências
t_extrair >> t_consultar >> t_relatorio

5. Adicione sensores e alertas

Use sensores para verificar a disponibilidade da API antes de iniciar o pipeline:

# Sensor de disponibilidade (adicionar na DAG)
from airflow.sensors.python import PythonSensor

def verificar_api_disponivel(**context):
    """Verifica se a API da CPFHub está acessível."""
    connection = BaseHook.get_connection("cpfhub_api")
    try:
    response = requests.get(
    f"{connection.host}/health",
    timeout=5,
    )
    return response.status_code == 200
    except Exception:
    return False

t_sensor = PythonSensor(
    task_id="verificar_api",
    python_callable=verificar_api_disponivel,
    poke_interval=60,
    timeout=300,
    mode="poke",
    dag=dag,
)

t_sensor >> t_extrair >> t_consultar >> t_relatorio

6. Processamento em lote com controle de taxa

Para volumes maiores, implemente processamento em lote com controle de taxa:

def consultar_cpfs_batch(batch_size: int = 10, delay: float = 1.0, **context):
    """Consulta CPFs em lotes com controle de taxa."""
    cpfs = context["ti"].xcom_pull(key="cpfs", task_ids="extrair_cpfs")
    connection = BaseHook.get_connection("cpfhub_api")
    extra = connection.extra_dejson

    headers = {
    "x-api-key": extra.get("x-api-key"),
    "Accept": "application/json",
    }
    timeout = extra.get("timeout", 5)
    resultados = []

    for i in range(0, len(cpfs), batch_size):
    batch = cpfs[i:i + batch_size]
    print(f"Processando lote {i // batch_size + 1}: {len(batch)} CPFs")

    for cpf in batch:
    cpf_limpo = re.sub(r"\D", "", cpf)
    url = f"{connection.host}/cpf/{cpf_limpo}"

    try:
    resp = requests.get(url, headers=headers, timeout=timeout)
    if resp.status_code == 200:
    data = resp.json()
    resultados.append({**data.get("data", {}), "status": "ok"})
    else:
    resultados.append({"cpf": cpf_limpo, "status": f"http_{resp.status_code}"})
    except Exception as e:
    resultados.append({"cpf": cpf_limpo, "status": f"erro: {str(e)}"})

    time.sleep(delay)

    context["ti"].xcom_push(key="resultados", value=resultados)

7. Boas práticas

  • Connections -- Use o sistema de Connections do Airflow para armazenar credenciais. Nunca hardcode chaves de API em DAGs.

  • Retries -- Configure retries e retry_delay para lidar com falhas temporárias da rede.

  • Controle de volume -- Adicione delays entre consultas. O plano gratuito oferece 50 consultas/mês; ao ultrapassar, a API cobra R$0,15 por consulta extra — ela não bloqueia. O plano Pro inclui 1.000 consultas por R$149/mês.

  • XComs -- Use XComs para passar dados entre tasks, mas evite volumes muito grandes. Para datasets extensos, use armazenamento intermediário como S3 ou GCS.

  • Timeout -- Configure timeout de 5 segundos nas requisições HTTP, alinhado com o tempo de resposta de ~900ms da API.

  • LGPD -- A API da CPFHub.io opera em conformidade com a LGPD. Garanta que os dados processados no Airflow também respeitem as políticas de privacidade.


Perguntas frequentes

Como armazenar a chave de API da CPFHub.io com segurança no Airflow?

Use o sistema de Connections do Airflow (Admin > Connections) para armazenar a chave no campo Extra como JSON: {"x-api-key": "SUA_CHAVE"}. Acesse via BaseHook.get_connection("cpfhub_api").extra_dejson. Nunca inclua chaves diretamente em arquivos de DAG que são versionados no Git — isso expõe credenciais no histórico do repositório.

A API CPFHub.io retorna erro 429 quando o limite de consultas é atingido?

Não. A API não bloqueia nem retorna 429 ao atingir o limite do plano. Quando o limite mensal é excedido, ela continua respondendo normalmente e cobra R$0,15 por consulta adicional. No Airflow, monitore o consumo via painel do CPFHub.io e configure alertas de custo para evitar cobranças inesperadas em pipelines de alto volume.

Qual é a latência da API CPFHub.io em contextos de pipeline Airflow?

A latência média é de ~900ms por consulta. Para um pipeline com 100 CPFs e delay de 0,5s entre requisições, o tempo total da task consultar_cpfs será de aproximadamente 2,5 minutos. Considere esse tempo ao configurar o timeout da task e o sla da DAG, e use processamento em lote para distribuir consultas ao longo de múltiplas execuções quando o volume for grande.

Como lidar com falhas de rede em tasks do Airflow que consultam a API de CPF?

Configure retries=2 e retry_delay=timedelta(minutes=5) no default_args da DAG para que o Airflow repita automaticamente a task em caso de timeout ou erro de conexão. Trate exceções específicas no código Python — requests.exceptions.Timeout e RequestException — e registre os CPFs com falha no XCom para reprocessamento posterior. Consulte as boas práticas de segurança da OWASP para orientações sobre tratamento seguro de erros em integrações de API.


Conclusão

Integrar a API da CPFHub.io

Cadastre-se em cpfhub.io

CPFHub.io

Pronto para integrar a API?

50 consultas gratuitas para testar agora. Sem cartão de crédito. Acesso imediato à documentação.

Redação CPFHub.io

Sobre a redação

Redação CPFHub.io

Time editorial especializado em APIs de CPF, identidade digital e compliance no mercado brasileiro. Produzimos guias técnicos, análises regulatórias e tutoriais sobre LGPD e KYC para desenvolvedores e líderes de produto.

WhatsAppFale conosco via WhatsApp