名称: oban-designer
描述: "为 Elixir 项目设计并实现 Oban 后台作业 Worker。配置队列、重试策略、唯一性约束、Cron 调度和错误处理。生成 Oban Worker、队列配置和测试设置。适用于使用 Oban 为 Elixir 项目添加后台作业、异步处理、定时任务或周期性 Cron 任务。"
# mix.exs
{:oban, "~> 2.18"}
# config/config.exs
config :my_app, Oban,
repo: MyApp.Repo,
queues: [default: 10, mailers: 20, webhooks: 50, events: 5],
plugins: [
Oban.Plugins.Pruner,
{Oban.Plugins.Cron, crontab: [
{"0 2 * * *", MyApp.Workers.DailyCleanup},
{"*/5 * * * *", MyApp.Workers.MetricsCollector}
]}
]
# 在 application.ex 的 children 中添加:
{Oban, Application.fetch_env!(:my_app, Oban)}
生成 Oban 迁移:
mix ecto.gen.migration add_oban_jobs_table
defmodule MyApp.Repo.Migrations.AddObanJobsTable do
use Ecto.Migration
def up, do: Oban.Migration.up(version: 12)
def down, do: Oban.Migration.down(version: 1)
end
defmodule MyApp.Workers.SendEmail do
use Oban.Worker,
queue: :mailers,
max_attempts: 5,
priority: 1
@impl Oban.Worker
def perform(%Oban.Job{args: %{"to" => to, "template" => template} = args}) do
case MyApp.Mailer.deliver(to, template, args) do
{:ok, _} -> :ok
{:error, :temporary} -> {:error, "临时失败"} # 会重试
{:error, :permanent} -> {:cancel, "无效地址"} # 不会重试
end
end
end
| 返回值 | 效果 |
|---|---|
:ok |
作业标记为完成 |
{:ok, result} |
作业标记为完成 |
{:error, reason} |
作业重试(计为一次尝试) |
{:cancel, reason} |
作业取消,不再重试 |
{:snooze, seconds} |
重新调度,不计为尝试 |
{:discard, reason} |
作业丢弃(Oban 2.17+) |
常见 Worker 模式请参阅 references/worker-patterns.md。
| 队列 | 并发数 | 使用场景 |
|---|---|---|
default |
10 | 通用用途 |
mailers |
20 | 邮件发送(I/O 密集型) |
webhooks |
50 | Webhook 发送(I/O 密集型,高吞吐) |
media |
5 | 图片/视频处理(CPU 密集型) |
events |
5 | 分析、审计日志 |
critical |
3 | 计费、支付 |
队列内的作业按优先级执行(0 为最高)。请谨慎使用:
%{user_id: user.id}
|> MyApp.Workers.SendEmail.new(priority: 0) # 紧急
|> Oban.insert()
Oban 使用指数退避:attempt^4 + attempt 秒。
defmodule MyApp.Workers.WebhookDelivery do
use Oban.Worker,
queue: :webhooks,
max_attempts: 10
@impl Oban.Worker
def backoff(%Oban.Job{attempt: attempt}) do
# 带抖动的指数退避:2^attempt + random(0..30)
trunc(:math.pow(2, attempt)) + :rand.uniform(30)
end
@impl Oban.Worker
def perform(%Oban.Job{args: args}) do
# ...
end
end
use Oban.Worker, queue: :media
@impl Oban.Worker
def timeout(%Oban.Job{args: %{"size" => "large"}}), do: :timer.minutes(10)
def timeout(_job), do: :timer.minutes(2)
防止重复作业:
defmodule MyApp.Workers.SyncAccount do
use Oban.Worker,
queue: :default,
unique: [
period: 300, # 5 分钟
states: [:available, :scheduled, :executing, :retryable],
keys: [:account_id] # 按此参数键保持唯一
]
end
| 选项 | 默认值 | 描述 |
|---|---|---|
period |
60 | 强制执行唯一性的秒数(:infinity 表示永久) |
states |
所有活跃状态 | 检查哪些作业状态 |
keys |
所有参数 | 比较特定的参数键 |
timestamp |
:inserted_at |
使用 :scheduled_at 用于调度唯一性 |
%{account_id: id}
|> MyApp.Workers.SyncAccount.new(
replace: [:scheduled_at], # 如果重复,更新 scheduled_at
schedule_in: 60
)
|> Oban.insert()
# config.exs
plugins: [
{Oban.Plugins.Cron, crontab: [
{"0 */6 * * *", MyApp.Workers.DigestEmail},
{"0 2 * * *", MyApp.Workers.DailyCleanup},
{"0 0 1 * *", MyApp.Workers.MonthlyReport},
{"*/5 * * * *", MyApp.Workers.HealthCheck, args: %{service: "api"}},
]}
]
Cron 表达式格式:分钟 小时 日 月 星期几。
# 立即执行
%{user_id: user.id, template: "welcome"}
|> MyApp.Workers.SendEmail.new()
|> Oban.insert()
# 延迟调度
%{report_id: id}
|> MyApp.Workers.GenerateReport.new(schedule_in: 3600)
|> Oban.insert()
# 指定时间调度
%{report_id: id}
|> MyApp.Workers.GenerateReport.new(scheduled_at: ~U[2024-01-01 00:00:00Z])
|> Oban.insert()
# 批量插入
changesets = Enum.map(users, fn user ->
MyApp.Workers.SendEmail.new(%{user_id: user.id})
end)
Oban.insert_all(changesets)
# 在 Ecto.Multi 中使用
Ecto.Multi.new()
|> Ecto.Multi.insert(:user, changeset)
|> Oban.insert(:welcome_email, fn %{user: user} ->
MyApp.Workers.SendEmail.new(%{user_id: user.id})
end)
|> Repo.transaction()
需要 Oban Pro 许可证:
# 批量处理项目,全部完成后运行回调
batch = MyApp.Workers.ProcessItem.new_batch(
items |> Enum.map(&%{item_id: &1.id}),
callback: {MyApp.Workers.BatchComplete, %{batch_name: "import"}}
)
Oban.insert_all(batch)
Oban.Pro.Workflow.new()
|> Oban.Pro.Workflow.add(:extract, MyApp.Workers.Extract.new(%{file: path}))
|> Oban.Pro.Workflow.add(:transform, MyApp.Workers.Transform.new(%{}), deps: [:extract])
|> Oban.Pro.Workflow.add(:load, MyApp.Workers.Load.new(%{}), deps: [:transform])
|> Oban.insert_all()
defmodule MyApp.Workers.BulkIndex do
use Oban.Pro.Workers.Chunk,
queue: :indexing,
size: 100, # 每次处理 100 个
timeout: 30_000 # 或 30 秒后
@impl true
def process(jobs) do
items = Enum.map(jobs, & &1.args)
SearchIndex.bulk_upsert(items)
:ok
end
end
详细测试模式请参阅 references/testing-oban.md。
# config/test.exs
config :my_app, Oban,
testing: :manual # 或使用 :inline 进行同步执行
# test_helper.exs(如果使用 :manual)
Oban.Testing.start()
use Oban.Testing, repo: MyApp.Repo
test "注册时入队欢迎邮件" do
{:ok, user} = Accounts.register(%{email: "test@example.com"})
assert_enqueued worker: MyApp.Workers.SendEmail,
args: %{user_id: user.id, template: "welcome"},
queue: :mailers
end
test "处理邮件发送" do
{:ok, _} =
perform_job(MyApp.Workers.SendEmail, %{
"to" => "user@example.com",
"template" => "welcome"
})
end
# 在 application.ex 中附加
:telemetry.attach_many("oban-logger", [
[:oban, :job, :start],
[:oban, :job, :stop],
[:oban, :job, :exception]
], &MyApp.ObanTelemetry.handle_event/4, %{})