ABP VNext + Dapr Workflows:轻量级分布式工作流
🚀 ABP VNext + Dapr Workflows:轻量级分布式工作流
📚 目录
- 🚀 ABP VNext + Dapr Workflows:轻量级分布式工作流
- 一、引言 ✨
- TL;DR 🔥
- 二、环境与依赖 🛠️
- 三、系统架构与流程图 🏗️
- 四、在 ABP 模块中注册 Dapr Workflows 📦
- 五、定义 Workflow 与 Activities 🎯
- 5.1 定义活动(Activity)
- 5.2 定义工作流(Workflow)
- 六、触发与查询工作流 🔍
- 6.1 启动 ABP 应用
- 6.2 发起工作流
- 6.3 暴露查询端点
- 七、示例演示 🎬
- 八、最佳实践与优化 💡
一、引言 ✨
TL;DR 🔥
- 在 ABP VNext 应用中,只需一行
services.AddDaprWorkflow(...)
即可无侵入集成 Dapr Workflow SDK,开启长运行分布式工作流编排 🎉 (Dapr Docs) - 通过
state.redis
或 CosmosDB 等可插拔 State Store 实现跨服务状态持久化与恢复,支持 Saga 补偿模式 🔄 (Dapr Docs) - 定义继承自
Workflow<TInput, TOutput>
的工作流类与WorkflowActivity<TArg, TResult>
的活动类,使用context.CallActivityAsync
保证确定性重放 🛠️ (Diagrid) - 演示“下单—保留库存—扣款—失败补偿”全流程,涵盖高性能、高可用、易复现实践 ✅
背景
在微服务架构中,分布式事务难以扩展,“最终一致性”与 Saga 模式已成主流。Dapr Workflows 提供代码化工作流,基于 DurableTask 引擎在 State Store 中持久化状态,结合补偿与定时器,简化复杂业务的可靠编排。
二、环境与依赖 🛠️
-
.NET 平台:.NET 9,ABP vNext v9.x
-
Dapr 运行时:Dapr CLI ≥1.10;Workflow Runtime v1.15.4
-
NuGet 包:
dotnet add package Dapr.Workflow --version 1.15.4
-
State Store 组件 (
components/statestore.yaml
):apiVersion: dapr.io/v1alpha1 kind: Component metadata:name: statestore spec:type: state.redisversion: v1metadata:- name: redisHostvalue: "localhost:6379"# 生产环境推荐使用支持事务的后端,如 Azure Cosmos DB 或 SQL Server
(Dapr Docs)
-
基础设施:Redis / Azure Cosmos DB;Dapr Sidecar
三、系统架构与流程图 🏗️
- OrderService API 通过 Dapr Sidecar 调用 Workflow 管理 API
- Workflow Runtime 调度活动并将状态写入 State Store,支持断点重放
- Saga 补偿:在失败场景通过补偿活动保证最终一致性
四、在 ABP 模块中注册 Dapr Workflows 📦
using Dapr.Client;
using Dapr.Workflow;
using Microsoft.Extensions.DependencyInjection;
using Volo.Abp.Modularity;public class MyAppModule : AbpModule
{public override void ConfigureServices(ServiceConfigurationContext context){// 可选:显式注册 DaprClientcontext.Services.AddDaprClient();// 一行集成 Dapr Workflows,自动注册 Client 与 Workercontext.Services.AddDaprWorkflow(options =>{options.RegisterWorkflow<OrderWorkflow>();options.RegisterActivity<ReserveInventoryActivity>();options.RegisterActivity<ChargePaymentActivity>();options.RegisterActivity<RefundPaymentActivity>();options.RegisterActivity<ReleaseInventoryActivity>();});}
}
AddDaprWorkflow
会自动注册DaprWorkflowClient
、DaprClient
(若未注册)及后台 HostedService,无需额外中间件调用 (Dapr Docs)
五、定义 Workflow 与 Activities 🎯
using Dapr.Workflow;
using Dapr.Workflow.Models;
5.1 定义活动(Activity)
继承自 WorkflowActivity<TArg, TResult>
并重写 RunAsync
,实现幂等逻辑:
public record PaymentInput(Guid OrderId, decimal Amount);public class ReserveInventoryActivity : WorkflowActivity<Guid, bool>
{public override Task<bool> RunAsync(WorkflowActivityContext context,Guid orderId){// 调用库存服务,保证幂等return Task.FromResult(true);}
}public class ChargePaymentActivity : WorkflowActivity<PaymentInput, bool>
{public override Task<bool> RunAsync(WorkflowActivityContext context,PaymentInput input){// 调用支付服务,保证幂等return Task.FromResult(true);}
}
(Diagrid)
5.2 定义工作流(Workflow)
继承自 Workflow<OrderDto, object>
,在 RunAsync
中编排活动并处理补偿:
public class OrderWorkflow : Workflow<OrderDto, object>
{public override async Task<object> RunAsync(WorkflowContext context,OrderDto order){var logger = context.CreateReplaySafeLogger<OrderWorkflow>();logger.LogInformation("Order {OrderId} 开始", order.Id);try{await context.CallActivityAsync<bool>(nameof(ReserveInventoryActivity),order.Id);await context.CallActivityAsync<bool>(nameof(ChargePaymentActivity),new PaymentInput(order.Id, order.Amount));}catch (Exception ex){logger.LogWarning(ex, "执行失败,开始补偿");await context.CallActivityAsync<bool>(nameof(RefundPaymentActivity),order.Id);await context.CallActivityAsync<bool>(nameof(ReleaseInventoryActivity),order.Id);throw;}logger.LogInformation("Order {OrderId} 完成", order.Id);return null!;}
}
(Diagrid)
六、触发与查询工作流 🔍
6.1 启动 ABP 应用
dapr run \--app-id order-api \--app-port 5000 \--dapr-http-port 3500 \--components-path ./components \dotnet run
6.2 发起工作流
using Dapr.Workflow;public class OrderAppService : ApplicationService
{public async Task<string> CreateOrderAsync(CreateOrderDto dto){var client = ServiceProvider.GetRequiredService<DaprWorkflowClient>();string instanceId = Guid.NewGuid().ToString();await client.ScheduleNewWorkflowAsync(workflowName: nameof(OrderWorkflow),instanceId: instanceId,input: dto);return instanceId;}// 新增:查询工作流状态public async Task<WorkflowState> GetWorkflowStateAsync(string instanceId){var client = ServiceProvider.GetRequiredService<DaprWorkflowClient>();return await client.GetWorkflowStateAsync(instanceId, includeInputsAndOutputs: true);}
}
使用
ScheduleNewWorkflowAsync
启动实例 (Dapr Docs)
6.3 暴露查询端点
using Dapr.Workflow;
using Microsoft.AspNetCore.Mvc;[ApiController]
[Route("api/workflows")]
public class WorkflowController : ControllerBase
{private readonly DaprWorkflowClient _client;public WorkflowController(DaprWorkflowClient client) => _client = client;[HttpGet("{instanceId}")]public async Task<IActionResult> Get(string instanceId){var state = await _client.GetWorkflowStateAsync(instanceId, includeInputsAndOutputs: true);return Ok(state);}
}
curl http://localhost:5000/api/workflows/{instanceId}
- 返回 JSON 包含
RuntimeStatus
、输入输出、历史事件等信息。
七、示例演示 🎬
-
基础设施
docker run -d --name redis -p 6379:6379 redis dapr init --runtime-version v1.10
-
运行应用并发起订单
curl -X POST http://localhost:5000/api/orders \-H "Content-Type: application/json" \-d '{"productId":"123","quantity":1,"amount":100}'
-
查询状态
curl http://localhost:5000/api/workflows/{instanceId}
-
模拟失败:在
ChargePaymentActivity
抛出异常,验证补偿活动自动执行 💥
八、最佳实践与优化 💡
- 幂等性:活动内部调用尽量幂等,防止重试产生副作用。
- 超时与重试:结合 Durable Timers 及
RetryOptions
控制超时与重试。 - 并行与分支:可在工作流中使用
Task.WhenAll(...)
或动态CallActivityAsync
实现并行。 - 版本兼容:升级工作流时,通过前缀或迁移逻辑兼容老实例。
- 生产环境:推荐使用支持事务回滚的 State Store(Cosmos DB、SQL Server)替代 Redis (Dapr Docs)