# Como processar consultas de CPF em paralelo usando GenServer e Task

> Processe múltiplas consultas de CPF em paralelo usando GenServer e Task do Elixir para máximo throughput e resiliência.

**Publicado:** 21/11/2024
**Autor:** Redação CPFHub.io
**URL:** https://cpfhub.io/blog/como-processar-consultas-cpf-paralelo-genserver-task

---


O modelo de concorrência do Elixir, baseado na BEAM VM, permite processar lotes de consultas de CPF em paralelo de forma eficiente usando `GenServer` para gerenciar estado e `Task` para execução assíncrona. Com poucos módulos, é possível alcançar alto throughput mantendo controle total sobre concorrência, timeouts e tolerância a falhas. Este guia mostra implementações completas com GenServer, `Task.async_stream` e `Task.Supervisor`.

## Introdução

O modelo de concorrência do Elixir, baseado na BEAM VM, permite criar milhares de processos leves que executam em paralelo de forma eficiente. Utilizando `GenServer` para gerenciar estado e `Task` para execução assíncrona, é possível processar grandes volumes de consultas de CPF com alto throughput.

---

## Entendendo GenServer e Task

Antes de implementar, é importante compreender o papel de cada componente no processamento paralelo:

| Componente | Papel | Características |
|---|---|---|
| `GenServer` | Gerenciar estado e filas | Processo único, acesso serializado |
| `Task` | Executar operações assíncronas | Processos efêmeros, execução paralela |
| `Task.Supervisor` | Supervisionar tasks | Tolerância a falhas, reinicialização |
| `Task.async_stream` | Stream de operações paralelas | Controle de concorrência, backpressure |

A documentação oficial da [hexdocs.pm/elixir/Task](https://hexdocs.pm/elixir/Task.html) detalha as garantias de cada abordagem e quando preferir uma sobre a outra.

---

## Criando o GenServer de gerenciamento

O GenServer coordena as consultas, mantendo o estado do processamento e os resultados:

```elixir
defmodule CpfProcessor do
 use GenServer

 defstruct [:api_key, pendentes: [], resultados: [], em_processamento: 0, max_concurrent: 5]

 # API pública
 def start_link(opts) do
 api_key = Keyword.fetch!(opts, :api_key)
 max_concurrent = Keyword.get(opts, :max_concurrent, 5)
 GenServer.start_link(__MODULE__, %__MODULE__{api_key: api_key, max_concurrent: max_concurrent}, name: __MODULE__)
 end

 def processar_lote(cpfs) when is_list(cpfs) do
 GenServer.call(__MODULE__, {:processar_lote, cpfs}, :infinity)
 end

 def status do
 GenServer.call(__MODULE__, :status)
 end

 # Callbacks
 @impl true
 def init(state) do
 {:ok, state}
 end

 @impl true
 def handle_call({:processar_lote, cpfs}, from, state) do
 state = %{state | pendentes: cpfs, resultados: [], em_processamento: 0}
 state = disparar_tasks(state, from)
 {:noreply, state}
 end

 @impl true
 def handle_call(:status, _from, state) do
 {:reply, %{
 pendentes: length(state.pendentes),
 em_processamento: state.em_processamento,
 concluidos: length(state.resultados)
 }, state}
 end

 @impl true
 def handle_info({ref, resultado}, state) when is_reference(ref) do
 Process.demonitor(ref, [:flush])
 state = %{state | resultados: [resultado | state.resultados], em_processamento: state.em_processamento - 1}
 state = disparar_tasks(state, nil)
 {:noreply, state}
 end

 @impl true
 def handle_info({:DOWN, _ref, :process, _pid, _reason}, state) do
 state = %{state | em_processamento: state.em_processamento - 1}
 {:noreply, state}
 end

 defp disparar_tasks(state, from) do
 slots_disponiveis = state.max_concurrent - state.em_processamento
 {a_processar, restantes} = Enum.split(state.pendentes, slots_disponiveis)

 Enum.each(a_processar, fn cpf ->
 Task.async(fn -> consultar_cpf(cpf, state.api_key) end)
 end)

 new_state = %{state |
 pendentes: restantes,
 em_processamento: state.em_processamento + length(a_processar)
 }

 if new_state.pendentes == [] and new_state.em_processamento == 0 and from do
 GenServer.reply(from, Enum.reverse(new_state.resultados))
 end

 new_state
 end

 defp consultar_cpf(cpf, api_key) do
 headers = [{"x-api-key", api_key}]
 url = "https://api.cpfhub.io/cpf/#{cpf}"

 case HTTPoison.get(url, headers, recv_timeout: 10_000) do
 {:ok, %{status_code: 200, body: body}} ->
 dados = Jason.decode!(body)
 {cpf, :ok, dados["data"]}

 {:ok, %{status_code: status}} ->
 {cpf, :erro, "Status #{status}"}

 {:error, reason} ->
 {cpf, :erro, inspect(reason)}
 end
 end
end
```

---

## Processamento com Task.async_stream

Para cenários mais simples, `Task.async_stream` oferece processamento paralelo com controle de concorrência integrado:

```elixir
defmodule CpfBatchProcessor do
 @base_url "https://api.cpfhub.io"

 def processar(cpfs, api_key, opts \\ []) do
 max_concurrency = Keyword.get(opts, :max_concurrency, 5)
 timeout = Keyword.get(opts, :timeout, 15_000)

 cpfs
 |> Task.async_stream(
 fn cpf -> consultar(cpf, api_key) end,
 max_concurrency: max_concurrency,
 timeout: timeout,
 on_timeout: :kill_task
 )
 |> Enum.map(fn
 {:ok, resultado} -> resultado
 {:exit, :timeout} -> {:erro, "Timeout na consulta"}
 end)
 end

 defp consultar(cpf, api_key) do
 headers = [{"x-api-key", api_key}]

 case HTTPoison.get("#{@base_url}/cpf/#{cpf}", headers) do
 {:ok, %{status_code: 200, body: body}} ->
 %{"data" => data} = Jason.decode!(body)
 {:ok, cpf, data}

 {:ok, %{status_code: status}} ->
 {:erro, cpf, "HTTP #{status}"}

 {:error, %{reason: reason}} ->
 {:erro, cpf, reason}
 end
 end
end
```

---

## Supervisionando as Tasks

Para garantir tolerância a falhas, utilize um `Task.Supervisor`:

```elixir
defmodule CpfSupervisedProcessor do
 def processar_com_supervisor(cpfs, api_key) do
 cpfs
 |> Enum.map(fn cpf ->
 Task.Supervisor.async_nolink(CpfTaskSupervisor, fn ->
 consultar_com_retry(cpf, api_key, 3)
 end)
 end)
 |> Task.yield_many(15_000)
 |> Enum.map(fn
 {_task, {:ok, resultado}} -> resultado
 {task, nil} ->
 Task.shutdown(task, :brutal_kill)
 {:erro, "Timeout"}
 {_task, {:exit, reason}} ->
 {:erro, "Processo falhou: #{inspect(reason)}"}
 end)
 end

 defp consultar_com_retry(cpf, api_key, tentativas_restantes) do
 headers = [{"x-api-key", api_key}]
 url = "https://api.cpfhub.io/cpf/#{cpf}"

 case HTTPoison.get(url, headers) do
 {:ok, %{status_code: 200, body: body}} ->
 {:ok, cpf, Jason.decode!(body)}

 {:ok, %{status_code: status}} when status in [500, 503] and tentativas_restantes > 0 ->
 Process.sleep(1000 * (4 - tentativas_restantes))
 consultar_com_retry(cpf, api_key, tentativas_restantes - 1)

 {:error, _} when tentativas_restantes > 0 ->
 Process.sleep(500)
 consultar_com_retry(cpf, api_key, tentativas_restantes - 1)

 other ->
 {:erro, cpf, other}
 end
 end
end
```

---

## Exemplo de uso completo

Integre o processador na sua aplicação:

```elixir
# No Application supervisor
children = [
 {Task.Supervisor, name: CpfTaskSupervisor},
 {CpfProcessor, api_key: System.get_env("CPFHUB_API_KEY"), max_concurrent: 5}
]

Supervisor.start_link(children, strategy: :one_for_one)

# Uso com Task.async_stream
api_key = System.get_env("CPFHUB_API_KEY")
cpfs = ["12345678900", "98765432100", "11122233344"]

resultados = CpfBatchProcessor.processar(cpfs, api_key, max_concurrency: 3)

Enum.each(resultados, fn
 {:ok, cpf, data} ->
 IO.puts("[OK] #{cpf} -> #{data["name"]}")

 {:erro, cpf, reason} ->
 IO.puts("[ERRO] #{cpf} -> #{reason}")

 {:erro, reason} ->
 IO.puts("[ERRO] #{reason}")
end)
```

---

## Perguntas frequentes

### Qual é a diferença entre Task.async e Task.Supervisor.async_nolink para consultas de CPF?

`Task.async` vincula o processo da task ao processo chamador: se a task falhar, o chamador também falha. `Task.Supervisor.async_nolink` isola a task do chamador, permitindo que o sistema continue funcionando mesmo se uma task individual falhar. Para consultas de CPF em lote, prefira `async_nolink` com um supervisor dedicado — isso garante que a falha em um CPF não aborte o processamento dos demais.

### Quantas consultas paralelas posso enviar para a API da CPFHub.io?

A API da CPFHub.io não impõe um limite rígido de requisições paralelas que resulte em bloqueio. O plano gratuito oferece 50 consultas mensais, e qualquer excedente é cobrado a R$0,15 por consulta — a API continua respondendo normalmente. Na prática, um `max_concurrency` de 5 a 10 é suficiente para a maioria dos casos e evita sobrecarregar a rede local.

### Como lidar com timeouts individuais em Task.async_stream?

Configure o parâmetro `timeout` em milissegundos e use `on_timeout: :kill_task`. Assim, tasks que excedam o limite são encerradas sem afetar as demais. Para a CPFHub.io, com latência média de ~900ms, um timeout de 10.000ms por consulta é uma margem segura.

### O GenServer é a melhor abordagem para processar grandes lotes de CPF em Elixir?

O GenServer é ideal quando você precisa de controle de estado centralizado, como monitorar progresso em tempo real ou pausar o processamento. Para lotes simples e pontuais sem necessidade de estado persistente, `Task.async_stream` é mais direto e igualmente eficiente. Escolha o GenServer quando o processamento precisar ser observável ou controlável externamente.

### Leia também

- [Como fazer consulta de CPF em lote (batch) via API de forma eficiente](https://cpfhub.io/blog/como-fazer-consulta-de-cpf-em-lote-batch-via-api)
- [Como implementar uma fila de requisições para consultas de CPF em lote](https://cpfhub.io/blog/implementar-fila-requisicoes-consultas-cpf-lote)
- [Como implementar retry com backoff exponencial em consultas de API de CPF](https://cpfhub.io/blog/como-implementar-retry-backoff-exponencial-consultas-api-cpf)
- [SLA de API de CPF: níveis de disponibilidade e o que exigir do seu provedor](https://cpfhub.io/blog/sla-api-cpf-niveis-disponibilidade)

---

## Conclusão

O processamento paralelo de consultas de CPF com GenServer e Task aproveita ao máximo o modelo de concorrência do Elixir. A combinação de processos supervisionados, controle de concorrência e retry automático resulta em um sistema resiliente e eficiente.

Cadastre-se em [cpfhub.io](https://www.cpfhub.io/) — 50 consultas mensais gratuitas, sem cartão de crédito — e comece a processar consultas de CPF em escala com a infraestrutura certa para seus projetos Elixir.

