当前位置: 首页 > news >正文

ABP VNext + Dapr Workflows:轻量级分布式工作流

🚀 ABP VNext + Dapr Workflows:轻量级分布式工作流


📚 目录

  • 🚀 ABP VNext + Dapr Workflows:轻量级分布式工作流
    • 一、引言 ✨
      • TL;DR 🔥
    • 二、环境与依赖 🛠️
    • 三、系统架构与流程图 🏗️
    • 四、在 ABP 模块中注册 Dapr Workflows 📦
    • 五、定义 Workflow 与 Activities 🎯
      • 5.1 定义活动(Activity)
      • 5.2 定义工作流(Workflow)
    • 六、触发与查询工作流 🔍
      • 6.1 启动 ABP 应用
      • 6.2 发起工作流
      • 6.3 暴露查询端点
    • 七、示例演示 🎬
    • 八、最佳实践与优化 💡


一、引言 ✨

TL;DR 🔥

  • 在 ABP VNext 应用中,只需一行 services.AddDaprWorkflow(...) 即可无侵入集成 Dapr Workflow SDK,开启长运行分布式工作流编排 🎉 (Dapr Docs)
  • 通过 state.redis 或 CosmosDB 等可插拔 State Store 实现跨服务状态持久化与恢复,支持 Saga 补偿模式 🔄 (Dapr Docs)
  • 定义继承自 Workflow<TInput, TOutput> 的工作流类与 WorkflowActivity<TArg, TResult> 的活动类,使用 context.CallActivityAsync 保证确定性重放 🛠️ (Diagrid)
  • 演示“下单—保留库存—扣款—失败补偿”全流程,涵盖高性能、高可用、易复现实践 ✅

背景
在微服务架构中,分布式事务难以扩展,“最终一致性”与 Saga 模式已成主流。Dapr Workflows 提供代码化工作流,基于 DurableTask 引擎在 State Store 中持久化状态,结合补偿与定时器,简化复杂业务的可靠编排。


二、环境与依赖 🛠️

  • .NET 平台:.NET 9,ABP vNext v9.x

  • Dapr 运行时:Dapr CLI ≥1.10;Workflow Runtime v1.15.4

  • NuGet 包

    dotnet add package Dapr.Workflow --version 1.15.4
    
  • State Store 组件 (components/statestore.yaml):

    apiVersion: dapr.io/v1alpha1
    kind: Component
    metadata:name: statestore
    spec:type: state.redisversion: v1metadata:- name: redisHostvalue: "localhost:6379"# 生产环境推荐使用支持事务的后端,如 Azure Cosmos DB 或 SQL Server
    

    (Dapr Docs)

  • 基础设施:Redis / Azure Cosmos DB;Dapr Sidecar


三、系统架构与流程图 🏗️

State_Store
Workflow_Runtime
ABP_App
ScheduleNewWorkflowAsync
Redis/CosmosDB
OrderWorkflow 实例
ReserveInventoryActivity
ChargePaymentActivity
RefundPaymentActivity
ReleaseInventoryActivity
Dapr Sidecar
OrderService API
  • OrderService API 通过 Dapr Sidecar 调用 Workflow 管理 API
  • Workflow Runtime 调度活动并将状态写入 State Store,支持断点重放
  • Saga 补偿:在失败场景通过补偿活动保证最终一致性

四、在 ABP 模块中注册 Dapr Workflows 📦

using Dapr.Client;
using Dapr.Workflow;
using Microsoft.Extensions.DependencyInjection;
using Volo.Abp.Modularity;public class MyAppModule : AbpModule
{public override void ConfigureServices(ServiceConfigurationContext context){// 可选:显式注册 DaprClientcontext.Services.AddDaprClient();// 一行集成 Dapr Workflows,自动注册 Client 与 Workercontext.Services.AddDaprWorkflow(options =>{options.RegisterWorkflow<OrderWorkflow>();options.RegisterActivity<ReserveInventoryActivity>();options.RegisterActivity<ChargePaymentActivity>();options.RegisterActivity<RefundPaymentActivity>();options.RegisterActivity<ReleaseInventoryActivity>();});}
}

AddDaprWorkflow 会自动注册 DaprWorkflowClientDaprClient(若未注册)及后台 HostedService,无需额外中间件调用 (Dapr Docs)


五、定义 Workflow 与 Activities 🎯

using Dapr.Workflow;
using Dapr.Workflow.Models;

5.1 定义活动(Activity)

继承自 WorkflowActivity<TArg, TResult> 并重写 RunAsync,实现幂等逻辑:

public record PaymentInput(Guid OrderId, decimal Amount);public class ReserveInventoryActivity : WorkflowActivity<Guid, bool>
{public override Task<bool> RunAsync(WorkflowActivityContext context,Guid orderId){// 调用库存服务,保证幂等return Task.FromResult(true);}
}public class ChargePaymentActivity : WorkflowActivity<PaymentInput, bool>
{public override Task<bool> RunAsync(WorkflowActivityContext context,PaymentInput input){// 调用支付服务,保证幂等return Task.FromResult(true);}
}

(Diagrid)

5.2 定义工作流(Workflow)

继承自 Workflow<OrderDto, object>,在 RunAsync 中编排活动并处理补偿:

public class OrderWorkflow : Workflow<OrderDto, object>
{public override async Task<object> RunAsync(WorkflowContext context,OrderDto order){var logger = context.CreateReplaySafeLogger<OrderWorkflow>();logger.LogInformation("Order {OrderId} 开始", order.Id);try{await context.CallActivityAsync<bool>(nameof(ReserveInventoryActivity),order.Id);await context.CallActivityAsync<bool>(nameof(ChargePaymentActivity),new PaymentInput(order.Id, order.Amount));}catch (Exception ex){logger.LogWarning(ex, "执行失败,开始补偿");await context.CallActivityAsync<bool>(nameof(RefundPaymentActivity),order.Id);await context.CallActivityAsync<bool>(nameof(ReleaseInventoryActivity),order.Id);throw;}logger.LogInformation("Order {OrderId} 完成", order.Id);return null!;}
}

(Diagrid)


六、触发与查询工作流 🔍

6.1 启动 ABP 应用

dapr run \--app-id order-api \--app-port 5000 \--dapr-http-port 3500 \--components-path ./components \dotnet run

6.2 发起工作流

using Dapr.Workflow;public class OrderAppService : ApplicationService
{public async Task<string> CreateOrderAsync(CreateOrderDto dto){var client = ServiceProvider.GetRequiredService<DaprWorkflowClient>();string instanceId = Guid.NewGuid().ToString();await client.ScheduleNewWorkflowAsync(workflowName: nameof(OrderWorkflow),instanceId: instanceId,input: dto);return instanceId;}// 新增:查询工作流状态public async Task<WorkflowState> GetWorkflowStateAsync(string instanceId){var client = ServiceProvider.GetRequiredService<DaprWorkflowClient>();return await client.GetWorkflowStateAsync(instanceId, includeInputsAndOutputs: true);}
}

使用 ScheduleNewWorkflowAsync 启动实例 (Dapr Docs)

6.3 暴露查询端点

using Dapr.Workflow;
using Microsoft.AspNetCore.Mvc;[ApiController]
[Route("api/workflows")]
public class WorkflowController : ControllerBase
{private readonly DaprWorkflowClient _client;public WorkflowController(DaprWorkflowClient client) => _client = client;[HttpGet("{instanceId}")]public async Task<IActionResult> Get(string instanceId){var state = await _client.GetWorkflowStateAsync(instanceId, includeInputsAndOutputs: true);return Ok(state);}
}
curl http://localhost:5000/api/workflows/{instanceId}
  • 返回 JSON 包含 RuntimeStatus、输入输出、历史事件等信息。

七、示例演示 🎬

  1. 基础设施

    docker run -d --name redis -p 6379:6379 redis
    dapr init --runtime-version v1.10
    
  2. 运行应用并发起订单

    curl -X POST http://localhost:5000/api/orders \-H "Content-Type: application/json" \-d '{"productId":"123","quantity":1,"amount":100}'
    
  3. 查询状态

    curl http://localhost:5000/api/workflows/{instanceId}
    
  4. 模拟失败:在 ChargePaymentActivity 抛出异常,验证补偿活动自动执行 💥


八、最佳实践与优化 💡

  • 幂等性:活动内部调用尽量幂等,防止重试产生副作用。
  • 超时与重试:结合 Durable Timers 及 RetryOptions 控制超时与重试。
  • 并行与分支:可在工作流中使用 Task.WhenAll(...) 或动态 CallActivityAsync 实现并行。
  • 版本兼容:升级工作流时,通过前缀或迁移逻辑兼容老实例。
  • 生产环境:推荐使用支持事务回滚的 State Store(Cosmos DB、SQL Server)替代 Redis (Dapr Docs)

http://www.lryc.cn/news/607074.html

相关文章:

  • stl的MATLAB读取与显示
  • Blender 4.5 安装指南:快速配置中文版,适用于Win/mac/Linux系统
  • 【Mysql】字段隐式转换对where条件和join关联条件的影响
  • 安全专家发现利用多层跳转技术窃取Microsoft 365登录凭证的新型钓鱼攻击
  • 基于Pipeline架构的光存储读取程序 Qt版本
  • 九、Maven入门学习记录
  • 学习游戏制作记录(各种水晶能力以及多晶体)8.1
  • k8s之NDS解析到Ingress服务暴露
  • Wisdom SSH开启高效管理服务器的大门
  • Git之远程仓库
  • 【全网首个公开VMware vCenter 靶场环境】 Vulntarget-o 正式上线
  • Linux(15)——进程间通信
  • VMware 下 Ubuntu 操作系统下载与安装指南
  • 用 TensorFlow 1.x 快速找出两幅图的差异 —— 完整实战与逐行解析 -Python程序图片找不同
  • Ubuntu-Server-24.04-LTS版本操作系统如何关闭自动更新,并移除不必要的内核
  • 使用 Vive Tracker 替代 T265 实现位姿获取(基于 Ubuntu + SteamVR)
  • 【云计算】云主机的亲和性策略(二):集群节点组
  • 如何理解推理模型
  • 渗透作业3
  • 基于C#和NModbus4库实现的Modbus RTU串口通信
  • ansible简单playbook剧本例子2
  • 团购商城 app 系统架构分析
  • 第三篇:几何体入门:内置几何体全解析
  • 无人机气象监测设备:穿梭云端的 “气象观察员”
  • 丝杆支撑座在电子装配中的关键作用
  • NLP 和 LLM 区别、对比 和关系
  • 深入剖析Spring IOC容器——原理、源码与实践全解析
  • mac系统自带终端崩溃修复
  • PAT 1022 Digital Library
  • 关于“PromptPilot” 之5 -标签词与标签动作的语言模型九宫格