name: elixir-dev
description: "Elixir/Phoenix 开发助手。运行和解读 mix test、mix credo、mix dialyzer、mix format。遵循 OTP 规范生成模块:上下文、模式、GenServer、监督者、任务。调试编译错误和警告。协助处理 Ecto 迁移、查询、变更集和关联。可用于任何 Elixir 或 Phoenix 开发任务,包括编写模块、修复测试、重构代码或理解 OTP 模式。"
完整命令参考请见 references/mix-commands.md。
# 运行所有测试
mix test
# 指定文件或行号
mix test test/my_app/accounts_test.exs:42
# 按标签运行
mix test --only integration
# 仅运行失败的测试(需要之前运行过 --failed 标志)
mix test --failed
# 带覆盖率运行
mix test --cover
解读测试失败:
- ** (MatchError) — 模式匹配失败;检查返回值结构。
- ** (Ecto.NoResultsError) — Repo.get! 使用了不存在的 ID;改用 Repo.get 或准备种子数据。
- ** (DBConnection.OwnershipError) — 缺少 async: true 或沙盒设置。
- no function clause matching — 参数数量错误或参数类型不符合预期。
mix credo --strict
mix credo suggest --format json
mix credo explain MyApp.Module # 解释特定模块的问题
常见的 Credo 修复:
- Credo.Check.Readability.ModuleDoc — 添加 @moduledoc。
- Credo.Check.Refactor.CyclomaticComplexity — 提取辅助函数。
- Credo.Check.Design.TagTODO — 处理或移除 TODO 注释。
mix dialyzer
mix dialyzer --format short
常见的 Dialyzer 警告:
- The pattern can never match — 死代码或模式中的类型错误。
- Function has no local return — 所有路径都会崩溃;检查内部调用。
- The call will never return — 调用了一个总是会抛出异常的函数。
- 修复方法:添加 @spec 注解;万不得已时使用 @dialyzer {:nowarn_function, func: arity}。
mix format
mix format --check-formatted # CI 模式 — 如果未格式化则退出码为 1
公共函数始终包含 @moduledoc、@doc 和 @spec。
defmodule MyApp.Notifications do
@moduledoc """
管理通知发送和偏好设置。
"""
import Ecto.Query
alias MyApp.Repo
alias MyApp.Notifications.Notification
@doc "列出用户的通知,按最新优先排序。"
@spec list_notifications(String.t(), keyword()) :: [Notification.t()]
def list_notifications(user_id, opts \\ []) do
limit = Keyword.get(opts, :limit, 50)
Notification
|> where(user_id: ^user_id)
|> order_by(desc: :inserted_at)
|> limit(^limit)
|> Repo.all()
end
end
defmodule MyApp.Notifications.Notification do
@moduledoc """
推送/邮件/短信通知的模式。
"""
use Ecto.Schema
import Ecto.Changeset
@type t :: %__MODULE__{}
@primary_key {:id, :binary_id, autogenerate: true}
@foreign_key_type :binary_id
@timestamps_opts [type: :utc_datetime_usec]
schema "notifications" do
field :channel, Ecto.Enum, values: [:push, :email, :sms]
field :title, :string
field :body, :string
field :delivered_at, :utc_datetime_usec
field :user_id, :binary_id
timestamps()
end
@required ~w(channel title body user_id)a
@doc false
def changeset(notification, attrs) do
notification
|> cast(attrs, @required ++ [:delivered_at])
|> validate_required(@required)
|> validate_length(:title, max: 255)
end
end
GenServer、Supervisor、Agent、Task 等模式请见 references/otp-patterns.md。
| 模式 | 适用场景 |
|---|---|
| GenServer | 具有同步/异步调用的有状态进程(缓存、限流器、连接池) |
| Agent | 无复杂逻辑的简单状态包装器 |
| Task | 一次性异步任务,可即发即弃或等待结果 |
| Task.Supervisor | 受监督的即发即弃任务 |
| Supervisor | 管理子进程的生命周期 |
| Registry | 按名称/键查找进程 |
| DynamicSupervisor | 在运行时启动子进程 |
defmodule MyApp.RateLimiter do
@moduledoc "令牌桶限流器。"
use GenServer
# 客户端 API
def start_link(opts) do
name = Keyword.get(opts, :name, __MODULE__)
GenServer.start_link(__MODULE__, opts, name: name)
end
@spec check_rate(String.t()) :: :ok | {:error, :rate_limited}
def check_rate(key), do: GenServer.call(__MODULE__, {:check, key})
# 服务器回调
@impl true
def init(opts) do
{:ok, %{limit: Keyword.get(opts, :limit, 100), window_ms: 60_000, buckets: %{}}}
end
@impl true
def handle_call({:check, key}, _from, state) do
now = System.monotonic_time(:millisecond)
{count, state} = increment(state, key, now)
if count <= state.limit, do: {:reply, :ok, state}, else: {:reply, {:error, :rate_limited}, state}
end
defp increment(state, key, now) do
# 实现逻辑
end
end
| 错误 | 原因 | 修复方法 |
|---|---|---|
module X is not available |
缺少依赖或拼写错误 | 检查 mix.exs 中的依赖,验证模块名 |
undefined function X/N |
未导入/别名 | 添加 import、alias 或完整模块路径 |
(CompileError) redefining module |
模块名重复 | 重命名其中一个 |
protocol not implemented |
缺少协议实现 | 为你的结构体添加 defimpl |
cannot use ^x outside of match |
插值符号 ^ 位置错误 |
移至模式匹配的上下文中 |
def list(filters) do
Enum.reduce(filters, base_query(), fn
{:status, val}, q -> where(q, [r], r.status == ^val)
{:since, dt}, q -> where(q, [r], r.inserted_at >= ^dt)
{:search, term}, q -> where(q, [r], ilike(r.name, ^"%#{term}%"))
_, q -> q
end)
|> Repo.all()
end
# 查询时预加载(使用连接的单次查询)
from(p in Post, join: a in assoc(p, :author), preload: [author: a])
# 单独查询预加载
Post |> Repo.all() |> Repo.preload(:author)
# 嵌套预加载
Repo.preload(posts, [comments: :author])
from(o in Order,
where: o.tenant_id == ^tenant_id,
group_by: o.status,
select: {o.status, count(o.id), sum(o.amount)}
)
|> Repo.all()
defmodule MyAppWeb.DashboardLive do
use MyAppWeb, :live_view
@impl true
def mount(_params, _session, socket) do
{:ok, assign(socket, items: [], loading: true)}
end
@impl true
def handle_event("delete", %{"id" => id}, socket) do
MyApp.Items.delete_item!(id)
{:noreply, assign(socket, items: MyApp.Items.list_items())}
end
@impl true
def render(assigns) do
~H"""
<div :for={item <- @items}>
<span><%= item.name %></span>
<button phx-click="delete" phx-value-id={item.id}>删除</button>
</div>
"""
end
end
# 在 mount 中订阅
def mount(_, _, socket) do
if connected?(socket), do: Phoenix.PubSub.subscribe(MyApp.PubSub, "items")
{:ok, assign(socket, items: list_items())}
end
# 在上下文中广播
def create_item(attrs) do
with {:ok, item} <- %Item{} |> Item.changeset(attrs) |> Repo.insert() do
Phoenix.PubSub.broadcast(MyApp.PubSub, "items", {:item_created, item})
{:ok, item}
end
end
# 在 LiveView 中处理
def handle_info({:item_created, item}, socket) do
{:noreply, update(socket, :items, &[item | &1])}
end