移动端应用中,处理复杂的、多步骤的、长周期的业务流程是一大挑战。一个典型的场景是设备激活流程:用户扫码、App向云端请求配置、固件更新、设备自检、最终激活。整个过程可能持续数分钟,期间网络随时可能中断。如果用户杀掉App再重新打开,状态必须无缝恢复。传统的RESTful API在这种场景下显得力不从心,状态同步逻辑会变得极其复杂,客户端与服务端很容易出现状态不一致。
我们遇到的核心痛点是:如何为每个独立的业务流程(例如,每台设备的激活过程)在服务端维护一个隔离、持久化、可恢复的状态机,并确保移动客户端能以离线优先的方式与其高效、可靠地同步。
最初的构想是为每个流程实例在数据库中存储一个状态字段,每次状态变更都通过API更新该字段。但这很快就暴露了问题:
- 竞态条件: 并发请求可能导致状态覆盖。
- 逻辑分散: 状态转移的合法性校验逻辑散落在各个API控制器中,难以维护。
- 历史不可追溯: 我们只知道当前状态,却不知道状态是如何演变过来的,这给调试和审计带来了巨大困难。
这促使我们转向一个更为健壮的架构:将整个业务流程建模为一个严格的状态机,并让每个流程实例在服务端拥有一个独立的、由Elixir GenServer驱动的“活”的进程。
技术选型决策:Elixir GenServer + XState
选择Elixir并非偶然。BEAM虚拟机的并发模型和容错能力是解决这个问题的关键。我们可以为每一个正在进行的设备激活流程启动一个轻量级的GenServer
进程。这意味着如果有10万台设备同时在激活,服务端就会有10万个独立的进程,它们之间互不干扰,由BEAM调度器高效管理。单个进程的崩溃不会影响到其他进程,并且可以由其Supervisor
自动重启并从持久化存储中恢复状态。
而XState则解决了状态逻辑的定义与共享问题。XState允许我们用纯JSON格式来定义一个形式化的状态机,包括所有状态、事件、转移(transitions)、动作(actions)和守卫(guards)。最关键的是,这个JSON定义是“可移植”的。同一份定义文件,既可以在移动端(JavaScript/TypeScript)使用,也可以在我们的Elixir后端使用。这确保了客户端的乐观更新UI与服务端的权威状态决策拥有一致的逻辑基础,从根本上杜绝了逻辑漂移。
我们将采用一种事件溯源(Event Sourcing)的变体来做持久化。我们不直接存储状态机的当前状态,而是记录每一次导致状态转移的事件。当一个GenServer
进程需要恢复时,它会从数据库加载其事件历史,并从初始状态开始依次应用这些事件,最终重放(replay)到当前状态。这不仅提供了完整的审计日志,还为处理离线同步等复杂场景奠定了基础。
核心实现:可持久化的状态机进程
我们的核心是一个名为StateMachine.Process
的GenServer
。它负责托管一个特定流程实例(例如,由device_id
标识)的状态机。
1. GenServer 结构与初始化
首先,我们需要一个动态Supervisor
来管理这些StateMachine.Process
进程,并使用Registry
来根据业务ID(如device_id
)快速查找进程PID。
# lib/state_machine/supervisor.ex
defmodule StateMachine.Supervisor do
use DynamicSupervisor
def start_link(init_arg) do
DynamicSupervisor.start_link(__MODULE__, init_arg, name: __MODULE__)
end
@impl true
def init(_init_arg) do
DynamicSupervisor.init(strategy: :one_for_one)
end
def start_child(device_id) do
spec = {StateMachine.Process, device_id}
DynamicSupervisor.start_child(__MODULE__, spec)
end
end
# in application.ex
children = [
{Registry, keys: :unique, name: StateMachine.Registry},
StateMachine.Supervisor
]
现在看StateMachine.Process
的实现。它在启动时需要知道自己为哪个device_id
服务。它会尝试从数据库恢复历史状态,如果不存在,则创建一个全新的状态机实例。
# lib/state_machine/process.ex
defmodule StateMachine.Process do
use GenServer
require Logger
alias StateMachine.Persistence
alias StateMachine.Executor
# 状态机的JSON定义。在真实项目中,这会从配置文件或数据库加载。
@machine_def File.read!("priv/machines/device_provisioning.json")
|> Jason.decode!()
# Public API
def start_link(device_id) do
GenServer.start_link(__MODULE__, device_id, name: via_tuple(device_id))
end
def get_state(pid_or_device_id) when is_binary(pid_or_device_id) do
with {:ok, pid} <- find_process(pid_or_device_id) do
GenServer.call(pid, :get_state)
end
end
def dispatch(pid_or_device_id, event) when is_binary(pid_or_device_id) do
with {:ok, pid} <- find_process(pid_or_device_id) do
# 使用 cast 进行异步事件分发,避免阻塞客户端
GenServer.cast(pid, {:dispatch, event})
end
end
# GenServer Callbacks
@impl true
def init(device_id) do
Logger.info("Initializing state machine process for device: #{device_id}")
# 从持久化层加载历史事件
events = Persistence.load_events(device_id)
# 基于历史事件重放状态机至最新状态
# Executor.init/1 会创建状态机初始状态
# Executor.transition/3 会根据事件计算下一个状态
initial_state = Executor.init(@machine_def)
current_state =
Enum.reduce(events, initial_state, fn event, state ->
{:ok, next_state, _actions} = Executor.transition(state, event, @machine_def)
next_state
end)
state = %{
device_id: device_id,
machine_state: current_state,
machine_def: @machine_def
}
{:ok, state}
end
@impl true
def handle_call(:get_state, _from, state) do
{:reply, {:ok, state.machine_state}, state}
end
@impl true
def handle_cast({:dispatch, event}, state) do
%{machine_state: current_state, machine_def: machine_def, device_id: device_id} = state
case Executor.transition(current_state, event, machine_def) do
{:ok, next_state, actions} ->
# 1. 持久化事件
:ok = Persistence.persist_event(device_id, event)
# 2. 执行状态转移带来的副作用(actions)
# 这里的 actions 是在 XState 定义中与状态转移关联的
# 例如,进入 'downloading_config' 状态时触发一个 'download_config_from_cloud' 的 action
execute_actions(actions, device_id)
# 3. 广播状态变更 (例如通过 Phoenix Channels)
# MyAppWeb.Endpoint.broadcast("device:#{device_id}", "state_changed", next_state)
{:noreply, %{state | machine_state: next_state}}
{:error, :invalid_transition} ->
Logger.warn(
"Invalid transition for device #{device_id}. Current state: #{inspect(current_state.value)}, Event: #{inspect(event)}"
)
# 无效转移,状态不变
{:noreply, state}
end
end
defp execute_actions(actions, device_id) do
# 在真实项目中,这里会异步执行任务,例如通过 Oban
# 避免长时间运行的任务阻塞 GenServer
Enum.each(actions, fn action ->
Logger.info("Executing action '#{action}' for device #{device_id}")
# Task.start(fn -> SideEffects.run(action, device_id) end)
end)
end
# Helper to find a process via Registry
defp find_process(device_id) do
case Registry.lookup(StateMachine.Registry, device_id) do
[{pid, _}] -> {:ok, pid}
[] ->
# 进程不存在,可能是首次交互或进程崩溃后未重启
# 尝试启动它,这种模式称为 "start on demand"
Logger.info("Process for #{device_id} not found, attempting to start.")
StateMachine.Supervisor.start_child(device_id)
end
end
defp via_tuple(device_id) do
{:via, Registry, {StateMachine.Registry, device_id}}
end
end
2. XState 状态机解释器
StateMachine.Executor
是Elixir端的XState解释器。它的核心职责是接收当前状态、一个事件和状态机定义,然后计算出下一个有效状态。为了简化,我们这里只实现核心的转移逻辑,一个生产级的解释器还需要处理 guards, actions, services 等。
# lib/state_machine/executor.ex
defmodule StateMachine.Executor do
# 代表状态机的当前状态,value是当前状态节点,context是状态机上下文数据
defstruct value: nil, context: %{}
def init(machine_def) do
%__MODULE__{
value: machine_def["initial"],
context: machine_def["context"] || %{}
}
end
def transition(%__MODULE__{value: current_state_name} = current_state, event, machine_def) do
event_type = Map.get(event, "type")
# 在状态机定义中查找当前状态节点
with {:ok, state_node} <- find_state_node(machine_def, current_state_name),
# 在当前状态节点中查找事件对应的转移规则
{:ok, transition_def} <- find_transition(state_node, event_type) do
target_state_name = transition_def["target"]
actions = transition_def["actions"] || []
next_state = %{current_state | value: target_state_name}
# 生产级实现中,这里还需要处理 context 的更新
{:ok, next_state, actions}
else
_error -> {:error, :invalid_transition}
end
end
defp find_state_node(machine_def, state_name) do
Map.fetch(machine_def["states"], state_name)
end
defp find_transition(state_node, event_type) do
Map.get(state_node, "on", %{}) |> Map.fetch(event_type)
end
end
下面是一个示例 device_provisioning.json
状态机定义:
{
"id": "deviceProvisioning",
"initial": "uninitialized",
"context": {},
"states": {
"uninitialized": {
"on": {
"START_PROVISIONING": { "target": "validating_firmware" }
}
},
"validating_firmware": {
"on": {
"VALIDATION_SUCCESS": {
"target": "downloading_config",
"actions": ["log_validation_success"]
},
"VALIDATION_FAILURE": {
"target": "provisioning_failed",
"actions": ["log_validation_failure"]
}
}
},
"downloading_config": {
"on": {
"DOWNLOAD_SUCCESS": {
"target": "applying_config",
"actions": ["trigger_device_reboot"]
},
"DOWNLOAD_FAILURE": {
"target": "provisioning_failed"
}
}
},
"applying_config": {
"on": {
"APPLY_SUCCESS": { "target": "provisioning_complete" },
"APPLY_FAILURE": { "target": "provisioning_failed" }
}
},
"provisioning_failed": {
"type": "final"
},
"provisioning_complete": {
"type": "final"
}
}
}
3. 持久化层
持久化层我们选择使用Ecto和PostgreSQL,这对于大多数云服务商环境都是稳定且通用的选择。我们需要一张表来存储事件。
# priv/repo/migrations/...._create_state_machine_events.exs
defmodule MyApp.Repo.Migrations.CreateStateMachineEvents do
use Ecto.Migration
def change do
create table(:state_machine_events) do
add :machine_id, :string, null: false # Corresponds to device_id
add :event_payload, :map, null: false
add :sequence, :bigint, null: false
timestamps()
end
create index(:state_machine_events, [:machine_id])
# 唯一性约束确保事件序列的完整性
create unique_index(:state_machine_events, [:machine_id, :sequence])
end
end
# lib/state_machine/persistence.ex
defmodule StateMachine.Persistence do
alias MyApp.Repo
import Ecto.Query
# 简化模型,实际项目中会有一个完整的 Ecto.Schema
def persist_event(device_id, event_payload) do
# 在真实项目中,这里需要一个事务来获取当前最大 sequence 并 +1
# 避免竞态条件。或使用数据库的自增序列。
# 为简化,我们假设 sequence 是由客户端或调用方提供的。
sequence = get_next_sequence(device_id)
params = %{
machine_id: device_id,
event_payload: event_payload,
sequence: sequence
}
# 使用 Ecto.Multi 来保证原子性
Ecto.Multi.new()
|> Ecto.Multi.insert(:event, Ecto.Changeset.change(%{}, params))
|> Repo.transaction()
|> case do
{:ok, _} -> :ok
{:error, _, reason, _} ->
# 强大的错误处理和日志记录
Logger.error("Failed to persist event for #{device_id}: #{inspect(reason)}")
{:error, reason}
end
end
def load_events(device_id) do
from(e in "state_machine_events",
where: e.machine_id == ^device_id,
order_by: [asc: e.sequence, asc: e.id]
)
|> Repo.all()
|> Enum.map(& &1.event_payload)
end
# ... get_next_sequence/1 实现 ...
end
架构整合与通信
下图展示了整个系统的架构和数据流:
sequenceDiagram participant MobileApp as Mobile App (XState) participant CloudLB as Cloud Load Balancer participant Phoenix as Phoenix Channel participant Supervisor as StateMachine.Supervisor participant Process as Machine Process (GenServer) participant Postgres as PostgreSQL (Events) MobileApp->>+CloudLB: WebSocket Connect CloudLB->>+Phoenix: Upgrade Connection Phoenix-->>-CloudLB: Connected CloudLB-->>-MobileApp: Connected MobileApp->>Phoenix: dispatch_event("device-123", {type: "START_PROVISIONING"}) Phoenix->>Supervisor: find_or_start_child("device-123") alt Process Not Running Supervisor->>+Process: start_link("device-123") Process->>+Postgres: load_events("device-123") Postgres-->>-Process: [event1, event2, ...] Process->>Process: Replay events to reach current state Note right of Process: Process initialized and registered end Phoenix->>Process: GenServer.cast({:dispatch, event}) Process->>Process: Executor.transition(state, event) alt Valid Transition Process->>+Postgres: persist_event(event) Postgres-->>-Process: :ok Process->>Phoenix: (Internal) Broadcast state_changed Phoenix->>MobileApp: push("state_changed", {value: "validating_firmware"}) else Invalid Transition Process->>Process: Log warning, state unchanged end MobileApp->>MobileApp: Update UI based on new state
移动客户端通过Phoenix Channel与后端通信。当用户执行操作时,客户端首先在本地XState实例上进行乐观的状态转移并更新UI,然后将相应的事件发送到服务端。服务端Machine Process
作为最终权威,验证该事件。如果转移有效,则持久化事件并向所有订阅了该device_id
的客户端广播最新的权威状态。如果无效(例如,由于离线期间服务端状态已发生变化),客户端会收到权威状态更新,并强制将本地状态机重置为与服务端一致的状态。
局限性与未来迭代路径
这个架构虽然解决了核心痛点,但在生产环境中仍有一些需要权衡和优化的方面:
状态重放性能: 对于生命周期极长、事件数量巨大的状态机,每次启动时都从头重放所有事件可能会成为性能瓶颈。一个常见的优化是引入快照(Snapshotting)机制。例如,每100个事件,系统就将当前状态机的完整状态持久化一次。恢复时,只需加载最新的快照,然后重放该快照之后发生的事件即可。
复杂的离线冲突合并: 当前模型中,服务端的
GenServer
是单一写入点,有效避免了服务端的竞态条件。但对于离线优先的客户端,可能会出现更复杂的冲突。例如,客户端离线时执行了A -> B -> C
的转换,而在此期间,服务端因超时自动触发了一个A -> FAILED
的转换。当客户端重新上线并提交事件时,如何合并这种分叉的历史是一个难题。这可能需要引入更高级的数据结构,如CRDTs(无冲突复制数据类型),或者在业务层面定义明确的冲突解决策略。集群扩展: 当前方案在单个Elixir节点上运行良好。当需要扩展到多节点集群时,需要确保来自特定
device_id
的所有请求都能被路由到托管其GenServer
进程的那个节点。可以使用Horde
或Phoenix.PubSub.PG2
等库来实现分布式进程注册和消息传递,但这会增加系统的复杂性。云服务商特定优化: 该架构是云无关的。但可以进一步利用云服务商提供的托管服务进行优化。例如,将事件日志从PostgreSQL迁移到专门的消息队列或事件流服务,如AWS Kinesis或Google Cloud Pub/Sub,可以获得更好的吞吐量和可扩展性,并能方便地与其他事件驱动的服务集成。执行
actions
的副作用也可以交由Serverless函数(如AWS Lambda)处理,使GenServer
本身更轻量、响应更快。