OA0 = Omni AI 0
OA0 是一个探索 AI 的论坛
现在注册
已注册用户请  登录
OA0  ›  技能包  ›  oban-designer:在 Elixir 中设计并实现 Oban 后台作业执行器

oban-designer:在 Elixir 中设计并实现 Oban 后台作业执行器

 
  docker ·  2026-02-24 13:38:36 · 2 次点击  · 0 条评论  

名称: oban-designer
描述: "为 Elixir 项目设计并实现 Oban 后台作业 Worker。配置队列、重试策略、唯一性约束、Cron 调度和错误处理。生成 Oban Worker、队列配置和测试设置。适用于使用 Oban 为 Elixir 项目添加后台作业、异步处理、定时任务或周期性 Cron 任务。"


Oban 设计器

安装

# mix.exs
{:oban, "~> 2.18"}

# config/config.exs
config :my_app, Oban,
  repo: MyApp.Repo,
  queues: [default: 10, mailers: 20, webhooks: 50, events: 5],
  plugins: [
    Oban.Plugins.Pruner,
    {Oban.Plugins.Cron, crontab: [
      {"0 2 * * *", MyApp.Workers.DailyCleanup},
      {"*/5 * * * *", MyApp.Workers.MetricsCollector}
    ]}
  ]

# 在 application.ex 的 children 中添加:
{Oban, Application.fetch_env!(:my_app, Oban)}

生成 Oban 迁移:

mix ecto.gen.migration add_oban_jobs_table
defmodule MyApp.Repo.Migrations.AddObanJobsTable do
  use Ecto.Migration
  def up, do: Oban.Migration.up(version: 12)
  def down, do: Oban.Migration.down(version: 1)
end

Worker 实现

基础 Worker

defmodule MyApp.Workers.SendEmail do
  use Oban.Worker,
    queue: :mailers,
    max_attempts: 5,
    priority: 1

  @impl Oban.Worker
  def perform(%Oban.Job{args: %{"to" => to, "template" => template} = args}) do
    case MyApp.Mailer.deliver(to, template, args) do
      {:ok, _} -> :ok
      {:error, :temporary} -> {:error, "临时失败"}  # 会重试
      {:error, :permanent} -> {:cancel, "无效地址"} # 不会重试
    end
  end
end

返回值

返回值 效果
:ok 作业标记为完成
{:ok, result} 作业标记为完成
{:error, reason} 作业重试(计为一次尝试)
{:cancel, reason} 作业取消,不再重试
{:snooze, seconds} 重新调度,不计为尝试
{:discard, reason} 作业丢弃(Oban 2.17+)

队列配置

常见 Worker 模式请参阅 references/worker-patterns.md

队列规模指南

队列 并发数 使用场景
default 10 通用用途
mailers 20 邮件发送(I/O 密集型)
webhooks 50 Webhook 发送(I/O 密集型,高吞吐)
media 5 图片/视频处理(CPU 密集型)
events 5 分析、审计日志
critical 3 计费、支付

队列优先级

队列内的作业按优先级执行(0 为最高)。请谨慎使用:

%{user_id: user.id}
|> MyApp.Workers.SendEmail.new(priority: 0)  # 紧急
|> Oban.insert()

重试策略

默认退避策略

Oban 使用指数退避:attempt^4 + attempt 秒。

自定义退避策略

defmodule MyApp.Workers.WebhookDelivery do
  use Oban.Worker,
    queue: :webhooks,
    max_attempts: 10

  @impl Oban.Worker
  def backoff(%Oban.Job{attempt: attempt}) do
    # 带抖动的指数退避:2^attempt + random(0..30)
    trunc(:math.pow(2, attempt)) + :rand.uniform(30)
  end

  @impl Oban.Worker
  def perform(%Oban.Job{args: args}) do
    # ...
  end
end

超时设置

use Oban.Worker, queue: :media

@impl Oban.Worker
def timeout(%Oban.Job{args: %{"size" => "large"}}), do: :timer.minutes(10)
def timeout(_job), do: :timer.minutes(2)

唯一性约束

防止重复作业:

defmodule MyApp.Workers.SyncAccount do
  use Oban.Worker,
    queue: :default,
    unique: [
      period: 300,               # 5 分钟
      states: [:available, :scheduled, :executing, :retryable],
      keys: [:account_id]        # 按此参数键保持唯一
    ]
end

唯一性选项

选项 默认值 描述
period 60 强制执行唯一性的秒数(:infinity 表示永久)
states 所有活跃状态 检查哪些作业状态
keys 所有参数 比较特定的参数键
timestamp :inserted_at 使用 :scheduled_at 用于调度唯一性

替换现有作业

%{account_id: id}
|> MyApp.Workers.SyncAccount.new(
  replace: [:scheduled_at],    # 如果重复,更新 scheduled_at
  schedule_in: 60
)
|> Oban.insert()

Cron 调度

# config.exs
plugins: [
  {Oban.Plugins.Cron, crontab: [
    {"0 */6 * * *", MyApp.Workers.DigestEmail},
    {"0 2 * * *", MyApp.Workers.DailyCleanup},
    {"0 0 1 * *", MyApp.Workers.MonthlyReport},
    {"*/5 * * * *", MyApp.Workers.HealthCheck, args: %{service: "api"}},
  ]}
]

Cron 表达式格式:分钟 小时 日 月 星期几

插入作业

# 立即执行
%{user_id: user.id, template: "welcome"}
|> MyApp.Workers.SendEmail.new()
|> Oban.insert()

# 延迟调度
%{report_id: id}
|> MyApp.Workers.GenerateReport.new(schedule_in: 3600)
|> Oban.insert()

# 指定时间调度
%{report_id: id}
|> MyApp.Workers.GenerateReport.new(scheduled_at: ~U[2024-01-01 00:00:00Z])
|> Oban.insert()

# 批量插入
changesets = Enum.map(users, fn user ->
  MyApp.Workers.SendEmail.new(%{user_id: user.id})
end)
Oban.insert_all(changesets)

# 在 Ecto.Multi 中使用
Ecto.Multi.new()
|> Ecto.Multi.insert(:user, changeset)
|> Oban.insert(:welcome_email, fn %{user: user} ->
  MyApp.Workers.SendEmail.new(%{user_id: user.id})
end)
|> Repo.transaction()

Oban Pro 功能

需要 Oban Pro 许可证:

批处理(作业组)

# 批量处理项目,全部完成后运行回调
batch = MyApp.Workers.ProcessItem.new_batch(
  items |> Enum.map(&%{item_id: &1.id}),
  callback: {MyApp.Workers.BatchComplete, %{batch_name: "import"}}
)
Oban.insert_all(batch)

工作流(作业依赖)

Oban.Pro.Workflow.new()
|> Oban.Pro.Workflow.add(:extract, MyApp.Workers.Extract.new(%{file: path}))
|> Oban.Pro.Workflow.add(:transform, MyApp.Workers.Transform.new(%{}), deps: [:extract])
|> Oban.Pro.Workflow.add(:load, MyApp.Workers.Load.new(%{}), deps: [:transform])
|> Oban.insert_all()

分块处理(聚合多个作业)

defmodule MyApp.Workers.BulkIndex do
  use Oban.Pro.Workers.Chunk,
    queue: :indexing,
    size: 100,            # 每次处理 100 个
    timeout: 30_000       # 或 30 秒后

  @impl true
  def process(jobs) do
    items = Enum.map(jobs, & &1.args)
    SearchIndex.bulk_upsert(items)
    :ok
  end
end

测试

详细测试模式请参阅 references/testing-oban.md

测试设置

# config/test.exs
config :my_app, Oban,
  testing: :manual  # 或使用 :inline 进行同步执行

# test_helper.exs(如果使用 :manual)
Oban.Testing.start()

断言作业已入队

use Oban.Testing, repo: MyApp.Repo

test "注册时入队欢迎邮件" do
  {:ok, user} = Accounts.register(%{email: "test@example.com"})

  assert_enqueued worker: MyApp.Workers.SendEmail,
    args: %{user_id: user.id, template: "welcome"},
    queue: :mailers
end

在测试中执行作业

test "处理邮件发送" do
  {:ok, _} =
    perform_job(MyApp.Workers.SendEmail, %{
      "to" => "user@example.com",
      "template" => "welcome"
    })
end

监控

Telemetry 事件

# 在 application.ex 中附加
:telemetry.attach_many("oban-logger", [
  [:oban, :job, :start],
  [:oban, :job, :stop],
  [:oban, :job, :exception]
], &MyApp.ObanTelemetry.handle_event/4, %{})

关键监控指标

  • 作业执行时长(p50, p95, p99)
  • 队列深度(每个队列的待处理作业数)
  • 每个 Worker 的错误率
  • 每个 Worker 的重试率
2 次点击  ∙  0 人收藏  
登录后收藏  
目前尚无回复
0 条回复
About   ·   Help   ·    
OA0 - Omni AI 0 一个探索 AI 的社区
沪ICP备2024103595号-2
Developed with Cursor