使用 Elixir GenServer 与 XState 构建移动端离线优先的持久化状态机


移动端应用中,处理复杂的、多步骤的、长周期的业务流程是一大挑战。一个典型的场景是设备激活流程:用户扫码、App向云端请求配置、固件更新、设备自检、最终激活。整个过程可能持续数分钟,期间网络随时可能中断。如果用户杀掉App再重新打开,状态必须无缝恢复。传统的RESTful API在这种场景下显得力不从心,状态同步逻辑会变得极其复杂,客户端与服务端很容易出现状态不一致。

我们遇到的核心痛点是:如何为每个独立的业务流程(例如,每台设备的激活过程)在服务端维护一个隔离、持久化、可恢复的状态机,并确保移动客户端能以离线优先的方式与其高效、可靠地同步。

最初的构想是为每个流程实例在数据库中存储一个状态字段,每次状态变更都通过API更新该字段。但这很快就暴露了问题:

  1. 竞态条件: 并发请求可能导致状态覆盖。
  2. 逻辑分散: 状态转移的合法性校验逻辑散落在各个API控制器中,难以维护。
  3. 历史不可追溯: 我们只知道当前状态,却不知道状态是如何演变过来的,这给调试和审计带来了巨大困难。

这促使我们转向一个更为健壮的架构:将整个业务流程建模为一个严格的状态机,并让每个流程实例在服务端拥有一个独立的、由Elixir GenServer驱动的“活”的进程。

技术选型决策:Elixir GenServer + XState

选择Elixir并非偶然。BEAM虚拟机的并发模型和容错能力是解决这个问题的关键。我们可以为每一个正在进行的设备激活流程启动一个轻量级的GenServer进程。这意味着如果有10万台设备同时在激活,服务端就会有10万个独立的进程,它们之间互不干扰,由BEAM调度器高效管理。单个进程的崩溃不会影响到其他进程,并且可以由其Supervisor自动重启并从持久化存储中恢复状态。

而XState则解决了状态逻辑的定义与共享问题。XState允许我们用纯JSON格式来定义一个形式化的状态机,包括所有状态、事件、转移(transitions)、动作(actions)和守卫(guards)。最关键的是,这个JSON定义是“可移植”的。同一份定义文件,既可以在移动端(JavaScript/TypeScript)使用,也可以在我们的Elixir后端使用。这确保了客户端的乐观更新UI与服务端的权威状态决策拥有一致的逻辑基础,从根本上杜绝了逻辑漂移。

我们将采用一种事件溯源(Event Sourcing)的变体来做持久化。我们不直接存储状态机的当前状态,而是记录每一次导致状态转移的事件。当一个GenServer进程需要恢复时,它会从数据库加载其事件历史,并从初始状态开始依次应用这些事件,最终重放(replay)到当前状态。这不仅提供了完整的审计日志,还为处理离线同步等复杂场景奠定了基础。

核心实现:可持久化的状态机进程

我们的核心是一个名为StateMachine.ProcessGenServer。它负责托管一个特定流程实例(例如,由device_id标识)的状态机。

1. GenServer 结构与初始化

首先,我们需要一个动态Supervisor来管理这些StateMachine.Process进程,并使用Registry来根据业务ID(如device_id)快速查找进程PID。

# lib/state_machine/supervisor.ex
defmodule StateMachine.Supervisor do
  use DynamicSupervisor

  def start_link(init_arg) do
    DynamicSupervisor.start_link(__MODULE__, init_arg, name: __MODULE__)
  end

  @impl true
  def init(_init_arg) do
    DynamicSupervisor.init(strategy: :one_for_one)
  end

  def start_child(device_id) do
    spec = {StateMachine.Process, device_id}
    DynamicSupervisor.start_child(__MODULE__, spec)
  end
end

# in application.ex
children = [
  {Registry, keys: :unique, name: StateMachine.Registry},
  StateMachine.Supervisor
]

现在看StateMachine.Process的实现。它在启动时需要知道自己为哪个device_id服务。它会尝试从数据库恢复历史状态,如果不存在,则创建一个全新的状态机实例。

# lib/state_machine/process.ex
defmodule StateMachine.Process do
  use GenServer
  require Logger

  alias StateMachine.Persistence
  alias StateMachine.Executor

  # 状态机的JSON定义。在真实项目中,这会从配置文件或数据库加载。
  @machine_def File.read!("priv/machines/device_provisioning.json")
               |> Jason.decode!()

  # Public API
  def start_link(device_id) do
    GenServer.start_link(__MODULE__, device_id, name: via_tuple(device_id))
  end

  def get_state(pid_or_device_id) when is_binary(pid_or_device_id) do
    with {:ok, pid} <- find_process(pid_or_device_id) do
      GenServer.call(pid, :get_state)
    end
  end

  def dispatch(pid_or_device_id, event) when is_binary(pid_or_device_id) do
    with {:ok, pid} <- find_process(pid_or_device_id) do
      # 使用 cast 进行异步事件分发,避免阻塞客户端
      GenServer.cast(pid, {:dispatch, event})
    end
  end

  # GenServer Callbacks
  @impl true
  def init(device_id) do
    Logger.info("Initializing state machine process for device: #{device_id}")
    # 从持久化层加载历史事件
    events = Persistence.load_events(device_id)

    # 基于历史事件重放状态机至最新状态
    # Executor.init/1 会创建状态机初始状态
    # Executor.transition/3 会根据事件计算下一个状态
    initial_state = Executor.init(@machine_def)

    current_state =
      Enum.reduce(events, initial_state, fn event, state ->
        {:ok, next_state, _actions} = Executor.transition(state, event, @machine_def)
        next_state
      end)

    state = %{
      device_id: device_id,
      machine_state: current_state,
      machine_def: @machine_def
    }

    {:ok, state}
  end

  @impl true
  def handle_call(:get_state, _from, state) do
    {:reply, {:ok, state.machine_state}, state}
  end

  @impl true
  def handle_cast({:dispatch, event}, state) do
    %{machine_state: current_state, machine_def: machine_def, device_id: device_id} = state

    case Executor.transition(current_state, event, machine_def) do
      {:ok, next_state, actions} ->
        # 1. 持久化事件
        :ok = Persistence.persist_event(device_id, event)

        # 2. 执行状态转移带来的副作用(actions)
        # 这里的 actions 是在 XState 定义中与状态转移关联的
        # 例如,进入 'downloading_config' 状态时触发一个 'download_config_from_cloud' 的 action
        execute_actions(actions, device_id)

        # 3. 广播状态变更 (例如通过 Phoenix Channels)
        # MyAppWeb.Endpoint.broadcast("device:#{device_id}", "state_changed", next_state)

        {:noreply, %{state | machine_state: next_state}}

      {:error, :invalid_transition} ->
        Logger.warn(
          "Invalid transition for device #{device_id}. Current state: #{inspect(current_state.value)}, Event: #{inspect(event)}"
        )
        # 无效转移,状态不变
        {:noreply, state}
    end
  end

  defp execute_actions(actions, device_id) do
    # 在真实项目中,这里会异步执行任务,例如通过 Oban
    # 避免长时间运行的任务阻塞 GenServer
    Enum.each(actions, fn action ->
      Logger.info("Executing action '#{action}' for device #{device_id}")
      # Task.start(fn -> SideEffects.run(action, device_id) end)
    end)
  end

  # Helper to find a process via Registry
  defp find_process(device_id) do
    case Registry.lookup(StateMachine.Registry, device_id) do
      [{pid, _}] -> {:ok, pid}
      [] ->
        # 进程不存在,可能是首次交互或进程崩溃后未重启
        # 尝试启动它,这种模式称为 "start on demand"
        Logger.info("Process for #{device_id} not found, attempting to start.")
        StateMachine.Supervisor.start_child(device_id)
    end
  end

  defp via_tuple(device_id) do
    {:via, Registry, {StateMachine.Registry, device_id}}
  end
end

2. XState 状态机解释器

StateMachine.Executor是Elixir端的XState解释器。它的核心职责是接收当前状态、一个事件和状态机定义,然后计算出下一个有效状态。为了简化,我们这里只实现核心的转移逻辑,一个生产级的解释器还需要处理 guards, actions, services 等。

# lib/state_machine/executor.ex
defmodule StateMachine.Executor do
  # 代表状态机的当前状态,value是当前状态节点,context是状态机上下文数据
  defstruct value: nil, context: %{}

  def init(machine_def) do
    %__MODULE__{
      value: machine_def["initial"],
      context: machine_def["context"] || %{}
    }
  end

  def transition(%__MODULE__{value: current_state_name} = current_state, event, machine_def) do
    event_type = Map.get(event, "type")

    # 在状态机定义中查找当前状态节点
    with {:ok, state_node} <- find_state_node(machine_def, current_state_name),
         # 在当前状态节点中查找事件对应的转移规则
         {:ok, transition_def} <- find_transition(state_node, event_type) do

      target_state_name = transition_def["target"]
      actions = transition_def["actions"] || []

      next_state = %{current_state | value: target_state_name}
      # 生产级实现中,这里还需要处理 context 的更新

      {:ok, next_state, actions}
    else
      _error -> {:error, :invalid_transition}
    end
  end

  defp find_state_node(machine_def, state_name) do
    Map.fetch(machine_def["states"], state_name)
  end

  defp find_transition(state_node, event_type) do
    Map.get(state_node, "on", %{}) |> Map.fetch(event_type)
  end
end

下面是一个示例 device_provisioning.json 状态机定义:

{
  "id": "deviceProvisioning",
  "initial": "uninitialized",
  "context": {},
  "states": {
    "uninitialized": {
      "on": {
        "START_PROVISIONING": { "target": "validating_firmware" }
      }
    },
    "validating_firmware": {
      "on": {
        "VALIDATION_SUCCESS": {
          "target": "downloading_config",
          "actions": ["log_validation_success"]
        },
        "VALIDATION_FAILURE": {
          "target": "provisioning_failed",
          "actions": ["log_validation_failure"]
        }
      }
    },
    "downloading_config": {
      "on": {
        "DOWNLOAD_SUCCESS": {
          "target": "applying_config",
          "actions": ["trigger_device_reboot"]
        },
        "DOWNLOAD_FAILURE": {
          "target": "provisioning_failed"
        }
      }
    },
    "applying_config": {
      "on": {
        "APPLY_SUCCESS": { "target": "provisioning_complete" },
        "APPLY_FAILURE": { "target": "provisioning_failed" }
      }
    },
    "provisioning_failed": {
      "type": "final"
    },
    "provisioning_complete": {
      "type": "final"
    }
  }
}

3. 持久化层

持久化层我们选择使用Ecto和PostgreSQL,这对于大多数云服务商环境都是稳定且通用的选择。我们需要一张表来存储事件。

# priv/repo/migrations/...._create_state_machine_events.exs
defmodule MyApp.Repo.Migrations.CreateStateMachineEvents do
  use Ecto.Migration

  def change do
    create table(:state_machine_events) do
      add :machine_id, :string, null: false # Corresponds to device_id
      add :event_payload, :map, null: false
      add :sequence, :bigint, null: false

      timestamps()
    end

    create index(:state_machine_events, [:machine_id])
    # 唯一性约束确保事件序列的完整性
    create unique_index(:state_machine_events, [:machine_id, :sequence])
  end
end

# lib/state_machine/persistence.ex
defmodule StateMachine.Persistence do
  alias MyApp.Repo
  import Ecto.Query

  # 简化模型,实际项目中会有一个完整的 Ecto.Schema
  def persist_event(device_id, event_payload) do
    # 在真实项目中,这里需要一个事务来获取当前最大 sequence 并 +1
    # 避免竞态条件。或使用数据库的自增序列。
    # 为简化,我们假设 sequence 是由客户端或调用方提供的。
    sequence = get_next_sequence(device_id)

    params = %{
      machine_id: device_id,
      event_payload: event_payload,
      sequence: sequence
    }

    # 使用 Ecto.Multi 来保证原子性
    Ecto.Multi.new()
    |> Ecto.Multi.insert(:event, Ecto.Changeset.change(%{}, params))
    |> Repo.transaction()
    |> case do
      {:ok, _} -> :ok
      {:error, _, reason, _} ->
        # 强大的错误处理和日志记录
        Logger.error("Failed to persist event for #{device_id}: #{inspect(reason)}")
        {:error, reason}
    end
  end

  def load_events(device_id) do
    from(e in "state_machine_events",
      where: e.machine_id == ^device_id,
      order_by: [asc: e.sequence, asc: e.id]
    )
    |> Repo.all()
    |> Enum.map(& &1.event_payload)
  end

  # ... get_next_sequence/1 实现 ...
end

架构整合与通信

下图展示了整个系统的架构和数据流:

sequenceDiagram
    participant MobileApp as Mobile App (XState)
    participant CloudLB as Cloud Load Balancer
    participant Phoenix as Phoenix Channel
    participant Supervisor as StateMachine.Supervisor
    participant Process as Machine Process (GenServer)
    participant Postgres as PostgreSQL (Events)

    MobileApp->>+CloudLB: WebSocket Connect
    CloudLB->>+Phoenix: Upgrade Connection
    Phoenix-->>-CloudLB: Connected
    CloudLB-->>-MobileApp: Connected

    MobileApp->>Phoenix: dispatch_event("device-123", {type: "START_PROVISIONING"})
    Phoenix->>Supervisor: find_or_start_child("device-123")
    alt Process Not Running
        Supervisor->>+Process: start_link("device-123")
        Process->>+Postgres: load_events("device-123")
        Postgres-->>-Process: [event1, event2, ...]
        Process->>Process: Replay events to reach current state
        Note right of Process: Process initialized and registered
    end
    Phoenix->>Process: GenServer.cast({:dispatch, event})
    Process->>Process: Executor.transition(state, event)
    alt Valid Transition
        Process->>+Postgres: persist_event(event)
        Postgres-->>-Process: :ok
        Process->>Phoenix: (Internal) Broadcast state_changed
        Phoenix->>MobileApp: push("state_changed", {value: "validating_firmware"})
    else Invalid Transition
        Process->>Process: Log warning, state unchanged
    end
    MobileApp->>MobileApp: Update UI based on new state

移动客户端通过Phoenix Channel与后端通信。当用户执行操作时,客户端首先在本地XState实例上进行乐观的状态转移并更新UI,然后将相应的事件发送到服务端。服务端Machine Process作为最终权威,验证该事件。如果转移有效,则持久化事件并向所有订阅了该device_id的客户端广播最新的权威状态。如果无效(例如,由于离线期间服务端状态已发生变化),客户端会收到权威状态更新,并强制将本地状态机重置为与服务端一致的状态。

局限性与未来迭代路径

这个架构虽然解决了核心痛点,但在生产环境中仍有一些需要权衡和优化的方面:

  1. 状态重放性能: 对于生命周期极长、事件数量巨大的状态机,每次启动时都从头重放所有事件可能会成为性能瓶颈。一个常见的优化是引入快照(Snapshotting)机制。例如,每100个事件,系统就将当前状态机的完整状态持久化一次。恢复时,只需加载最新的快照,然后重放该快照之后发生的事件即可。

  2. 复杂的离线冲突合并: 当前模型中,服务端的GenServer是单一写入点,有效避免了服务端的竞态条件。但对于离线优先的客户端,可能会出现更复杂的冲突。例如,客户端离线时执行了 A -> B -> C 的转换,而在此期间,服务端因超时自动触发了一个 A -> FAILED 的转换。当客户端重新上线并提交事件时,如何合并这种分叉的历史是一个难题。这可能需要引入更高级的数据结构,如CRDTs(无冲突复制数据类型),或者在业务层面定义明确的冲突解决策略。

  3. 集群扩展: 当前方案在单个Elixir节点上运行良好。当需要扩展到多节点集群时,需要确保来自特定device_id的所有请求都能被路由到托管其GenServer进程的那个节点。可以使用HordePhoenix.PubSub.PG2等库来实现分布式进程注册和消息传递,但这会增加系统的复杂性。

  4. 云服务商特定优化: 该架构是云无关的。但可以进一步利用云服务商提供的托管服务进行优化。例如,将事件日志从PostgreSQL迁移到专门的消息队列或事件流服务,如AWS Kinesis或Google Cloud Pub/Sub,可以获得更好的吞吐量和可扩展性,并能方便地与其他事件驱动的服务集成。执行actions的副作用也可以交由Serverless函数(如AWS Lambda)处理,使GenServer本身更轻量、响应更快。


  目录