当前位置: 首页 > article >正文

ABP VNext + Webhook:订阅与异步回调

🚀 ABP VNext + Webhook:订阅与异步回调


📚 目录

  • 🚀 ABP VNext + Webhook:订阅与异步回调
    • 🎯 一、背景切入:如何优雅地支持第三方回调?
    • 🏗 二、系统架构设计
    • 🔍 三、核心能力实现
      • 3.1 🔐 签名验证(防伪造)
        • 接口定义
        • 实现示例(Wxpay)
      • 3.2 🔄 幂等控制(防重复处理)
        • 接口与实现
      • 🛠️ 3.3 多厂商处理策略
        • 接口
        • 策略工厂
    • 🔁 四、关键流程图
      • 4.1 请求处理流程
      • 4.2 重试工作流程
    • 🛠️ 3.5 接收控制器(统一入口)
    • 📈 五、DevOps & 监控
      • 1. Prometheus 指标
      • 2. 健康检查
    • ✅ 六、测试
      • 6.1 单元测试
        • 6.1.1 签名校验测试(xUnit)
        • 6.1.2 幂等服务并发安全测试
        • 6.1.3 重试 Worker 测试
      • 6.2 集成测试(Testcontainers)


🎯 一、背景切入:如何优雅地支持第三方回调?

在现代分布式系统中,Webhook 是实现系统解耦和异步通知的重要手段,广泛用于支付通知、审核结果返回、消息推送等场景。但在实践中,我们需要同时解决以下挑战:

  • 🔐 安全防护:如何防止伪造请求?
  • 🔄 幂等控制:如何避免重复处理同一事件?
  • ⚙️ 失败重试:如何确保最终一致性,并避免无限重试?
  • 💼 多厂商 & 多通道:如何优雅地支持不同支付/消息通道?
  • 📊 可观测 & 可运维:如何快速诊断、监控并手动补偿?

🏗 二、系统架构设计

支付厂商
🔐 签名验证
🔄 幂等校验
💾 日志持久化
💾 日志持久化
🔄 重试调度
📈 指标埋点
📈 指标埋点
📊 可视化
微信支付
支付宝
Stripe
Webhook 接收中心
签名验证服务
幂等校验服务
WebhookLog 存储
后台重试调度中心
Prometheus
Grafana

🔍 三、核心能力实现

3.1 🔐 签名验证(防伪造)

接口定义
public interface ISignatureVerifier
{/// <summary>从安全配置中心获取 Secret</summary>string GetSecret(string provider);/// <summary>签名 Header 名</summary>string HeaderName { get; }bool Verify(string payload, string signature);
}
实现示例(Wxpay)
public class WxSignatureVerifier : ISignatureVerifier, ITransientDependency
{private readonly IDynamicParameterStore _paramStore;public string HeaderName { get; } = "X-Wxpay-Signature";public WxSignatureVerifier(IDynamicParameterStore paramStore)=> _paramStore = paramStore;public string GetSecret(string provider)=> _paramStore.GetOrNullAsync($"Webhook:Secret:{provider}").GetAwaiter().GetResult()?? throw new BusinessException("未配置签名 Secret");public bool Verify(string payload, string signature){var secret   = GetSecret("Wxpay");var expected = ComputeHmac(payload, secret);return ConstantTimeEquals(expected, signature);}private static string ComputeHmac(string data, string key){using var hmac = new HMACSHA256(Encoding.UTF8.GetBytes(key));return Convert.ToHexString(hmac.ComputeHash(Encoding.UTF8.GetBytes(data))).ToLowerInvariant();}private static bool ConstantTimeEquals(string a, string b){if (a.Length != b.Length) return false;int diff = 0;for (int i = 0; i < a.Length; i++)diff |= a[i] ^ b[i];return diff == 0;}
}

3.2 🔄 幂等控制(防重复处理)

接口与实现
public interface IIdempotencyService
{Task<bool> IsProcessedAsync(string eventId);Task<bool> TryProcessAsync(string eventId, Func<Task> handler);
}
public class IdempotencyService : IIdempotencyService, ITransientDependency
{private readonly IDistributedCache        _cache;private readonly IDistributedLockProvider _lockProvider;public IdempotencyService(IDistributedCache cache,IDistributedLockProvider lockProvider){_cache        = cache;_lockProvider = lockProvider;}public async Task<bool> IsProcessedAsync(string eventId)=> await _cache.GetStringAsync(Key(eventId)) != null;public async Task<bool> TryProcessAsync(string eventId, Func<Task> handler){var lockName = $"webhook:lock:{eventId}";var locker   = _lockProvider.Create(lockName);using var handle = await locker.TryAcquireAsync(TimeSpan.FromSeconds(5));if (handle == null)return false; // 获取锁失败if (await IsProcessedAsync(eventId))return true;  // 已处理// 真正执行业务await handler.Invoke();// 缓存标记await _cache.SetStringAsync(Key(eventId),"1",new DistributedCacheEntryOptions {AbsoluteExpirationRelativeToNow = TimeSpan.FromHours(2)});return true;}private static string Key(string id) => $"webhook:processed:{id}";
}

🛠️ 3.3 多厂商处理策略

接口
public interface IPaymentWebhookHandler : ITransientDependency
{string Provider { get; }Task<WebhookResult> HandleAsync(string payload, IDictionary<string, string> headers);
}
策略工厂
public class WebhookHandlerFactory : ITransientDependency
{private readonly IEnumerable<IPaymentWebhookHandler> _handlers;public WebhookHandlerFactory(IEnumerable<IPaymentWebhookHandler> handlers)=> _handlers = handlers;public IPaymentWebhookHandler Get(string provider)=> _handlers.FirstOrDefault(h =>h.Provider.Equals(provider, StringComparison.OrdinalIgnoreCase))?? throw new BusinessException($"不支持的厂商:{provider}");
}

🔁 四、关键流程图

4.1 请求处理流程

POST /api/webhooks/payments/:provider
失败
成功
第一次
重复
客户端请求
Controller
读取原始 Body
🔐 签名校验
返回 401 Unauthorized
提取 eventId
🔄 TryProcessAsync
执行业务 & 持久化日志
返回 Duplicate
返回 OK
📈 记录 Prometheus 指标
结束

4.2 重试工作流程

成功
失败
Yes
No
定时触发 Worker
查询 Failed & RetryCount<10
遍历日志
获取对应 Handler
尝试处理
设置 Success
RetryCount++
RetryCount>5?
设置 Dead + 告警
指数退避等待
更新日志
处理下一个
完成

🛠️ 3.5 接收控制器(统一入口)

[Route("api/webhooks/payments")]
public class WebhookController : AbpController
{private readonly ISignatureVerifier    _verifier;private readonly WebhookHandlerFactory _factory;private readonly IIdempotencyService   _idem;private readonly IRepository<WebhookLog, Guid> _logRepo;public WebhookController(ISignatureVerifier verifier,WebhookHandlerFactory factory,IIdempotencyService idem,IRepository<WebhookLog, Guid> logRepo){_verifier = verifier;_factory  = factory;_idem     = idem;_logRepo  = logRepo;}[HttpPost("{provider}")]public async Task<IActionResult> HandleAsync(string provider){// 1️⃣ 读取原始 Bodyusing var sr = new StreamReader(Request.Body);var payload  = await sr.ReadToEndAsync();// 2️⃣ 签名校验var signature = Request.Headers[_verifier.HeaderName].FirstOrDefault();if (signature == null || !_verifier.Verify(payload, signature))return Unauthorized(new { code = 1001, message = "Invalid signature" });// 3️⃣ 提取 EventIdstring eventId;try{var obj     = JObject.Parse(payload);eventId     = obj["eventId"]?.ToString() ?? throw new FormatException();}catch{return BadRequest(new { code = 1002, message = "Invalid payload" });}// 4️⃣ 幂等 & 业务处理var success = await _idem.TryProcessAsync(eventId, async () =>{var handler = _factory.Get(provider);var result  = await handler.HandleAsync(payload,Request.Headers.ToDictionary(h => h.Key, h => h.Value.FirstOrDefault()));// 5️⃣ 持久化日志await _logRepo.InsertAsync(new WebhookLog{Provider   = provider,Payload    = payload,EventId    = eventId,RetryCount = 0,Status     = result.Success? WebhookStatus.Success: WebhookStatus.Failed});});// 6️⃣ 指标埋点Metrics.WebhookProcessed.WithLabels(provider, success ? "ok" : "duplicate").Inc();return Ok(new { code = 0, message = success ? "OK" : "Duplicate" });}
}

📈 五、DevOps & 监控

1. Prometheus 指标

   public static class Metrics{public static readonly Counter WebhookProcessed =Metrics.CreateCounter("webhook_processed_total","Webhook 处理总数",new CounterConfiguration {LabelNames = new [] { "provider", "status" }});}
   services.AddPrometheusMetrics();app.UseMetricServer();    // /metricsapp.UseHttpMetrics();     // HTTP 请求指标

2. 健康检查

   services.AddHealthChecks().AddCheck<RedisHealthCheck>("redis").AddSqlServer(connStr, name: "sql").AddCheck<CustomWebhookHealthCheck>("webhook_receive");app.UseHealthChecks("/health");
  1. 容器部署(docker-compose.yml
   version: '3.8'services:api:image: yourrepo/webhook-api:latestports:- "5000:80"healthcheck:test: ["CMD", "curl", "-f", "http://localhost/health"]interval: 30sretries: 3redis:image: redis:6db:image: mcr.microsoft.com/mssql/server:2019-latestenvironment:- ACCEPT_EULA=Y- SA_PASSWORD=Your_password123

✅ 六、测试

6.1 单元测试

6.1.1 签名校验测试(xUnit)
public class SignatureVerifierTests
{private readonly ISignatureVerifier _verifier;public SignatureVerifierTests(){// 这里用测试版本的 DynamicParameterStore 返回固定 secretvar paramStore = A.Fake<IDynamicParameterStore>();A.CallTo(() => paramStore.GetOrNullAsync("Webhook:Secret:Wxpay")).Returns(Task.FromResult<string>("test-secret"));_verifier = new WxSignatureVerifier(paramStore);}[Theory][InlineData("payload", "e3b0c44298fc1c149afbf4c8996fb92427ae41e4649b934ca495991b7852b855")]  // 示例 hashpublic void Verify_ValidSignature_ReturnsTrue(string payload, string signature){var result = _verifier.Verify(payload, signature);Assert.True(result);}[Fact]public void Verify_InvalidSignature_ReturnsFalse(){var result = _verifier.Verify("payload", "bad-signature");Assert.False(result);}
}
6.1.2 幂等服务并发安全测试
public class IdempotencyServiceTests
{[Fact]public async Task TryProcessAsync_FirstConcurrency_OnlyOnceExecuted(){var cache = new MemoryDistributedCache(new OptionsWrapper<MemoryDistributedCacheOptions>(new MemoryDistributedCacheOptions()));var lockProvider = new DefaultDistributedLockProvider(); // 假设已实现var service = new IdempotencyService(cache, lockProvider);int executeCount = 0;Func<Task> handler = async () =>{await Task.Delay(50);Interlocked.Increment(ref executeCount);};// 并发 10 次调用var tasks = Enumerable.Range(0, 10).Select(_ => service.TryProcessAsync("evt-1", handler)).ToArray();await Task.WhenAll(tasks);// handler 只应执行一次Assert.Equal(1, executeCount);}
}
6.1.3 重试 Worker 测试
public class WebhookRetryWorkerTests
{[Fact]public async Task DoWorkAsync_FailedLogs_ExponentialBackoffAndDeadLetter(){// 准备内存仓库var logs = new List<WebhookLog>{new WebhookLog { Id = Guid.NewGuid(), Provider="Wxpay", Payload="p", EventId="1", RetryCount=5, Status=WebhookStatus.Failed }};var repo = new InMemoryRepository<WebhookLog, Guid>(logs);var fakeFactory = A.Fake<WebhookHandlerFactory>();// 模拟每次抛异常A.CallTo(() => fakeFactory.Get(A<string>._)).Returns(new FailingHandler());var worker = new WebhookRetryWorker(repo, fakeFactory);using var cts = new CancellationTokenSource();await worker.DoWorkAsync(cts.Token);var updated = logs.Single();Assert.Equal(WebhookStatus.Dead, updated.Status);Assert.Equal(6, updated.RetryCount);}private class FailingHandler : IPaymentWebhookHandler{public string Provider => "Wxpay";public Task<WebhookResult> HandleAsync(string payload, IDictionary<string, string> headers)=> throw new Exception("fail");}
}

6.2 集成测试(Testcontainers)

public class WebhookIntegrationTests : IAsyncLifetime
{private readonly TestcontainerDatabase _redisContainer;private readonly TestcontainerDatabase _sqlContainer;public WebhookIntegrationTests(){_redisContainer = new TestcontainersBuilder<TestcontainersDatabase>().WithDatabase(new RedisTestcontainerConfiguration()).Build();_sqlContainer = new TestcontainersBuilder<TestcontainersDatabase>().WithDatabase(new MsSqlTestcontainerConfiguration{Password = "Your_password123"}).Build();}public async Task InitializeAsync(){await _redisContainer.StartAsync();await _sqlContainer.StartAsync();// 这里可以动态构建 IConfiguration 并启动 TestServer}public async Task DisposeAsync(){await _redisContainer.DisposeAsync();await _sqlContainer.DisposeAsync();}[Fact]public async Task FullWebhookFlow_ReturnsOk(){// 使用 TestServer 调用 APIvar client = TestWebApplicationFactory.CreateClient(new Dictionary<string, string>{["ConnectionStrings:Redis"] = _redisContainer.ConnectionString,["ConnectionStrings:Default"] = _sqlContainer.ConnectionString});var payload   = "{\"eventId\":\"evt-100\",\"data\":{}}";var signature = ComputeTestSignature(payload, "test-secret");var response  = await client.PostAsync("/api/webhooks/payments/Wxpay",new StringContent(payload, Encoding.UTF8, "application/json"));Assert.Equal(HttpStatusCode.OK, response.StatusCode);// 再次调用应返回 Duplicateresponse = await client.PostAsync("/api/webhooks/payments/Wxpay",new StringContent(payload, Encoding.UTF8, "application/json"));var json = await response.Content.ReadAsStringAsync();Assert.Contains("Duplicate", json);}private static string ComputeTestSignature(string payload, string secret){using var hmac = new HMACSHA256(Encoding.UTF8.GetBytes(secret));return Convert.ToHexString(hmac.ComputeHash(Encoding.UTF8.GetBytes(payload))).ToLowerInvariant();}
}

说明

  • 使用 DotNet.Testcontainers 启动 Redis 和 SQL Server;
  • 通过 TestWebApplicationFactory 启动完整 ASP.NET Core 应用;
  • 验证首次处理和幂等结果。

http://www.lryc.cn/news/2386134.html

相关文章:

  • Docker(二):开机自启动与基础配置、镜像加速器优化与疑难排查指南
  • Lua基础语法
  • 2025年渗透测试面试题总结-匿名[实习]安全工程师(安全厂商)(题目+回答)
  • 【node.js】实战项目
  • 从AD9361 到 ADSY1100 ,中间的迭代产品历史
  • 免费插件集-illustrator插件-Ai插件-查找选中颜色与pantone中匹配颜色
  • redis集合类型
  • [爬虫实战] 爬微博图片:xpath的具体运用
  • MySQL中简单的操作
  • NNG和DDS
  • 防震基座在半导体晶圆制造设备抛光机详细应用案例-江苏泊苏系统集成有限公司
  • 框架开发与原生开发的权衡:React案例分析(原生JavaScript)
  • Lua5.4.2常用API整理记录
  • Python打卡训练营学习记录Day36
  • ### Mac电脑推送文件至Gitee仓库步骤详解
  • 官方SDK停更后的选择:开源维护的Bugly Unity SDK
  • 什么是智能体agent?
  • 【多线程】Java 实现方式及其优缺点
  • Obsidian 数据可视化深度实践:用 DataviewJS 与 Charts 插件构建智能日报系统
  • Three.js 海量模型加载性能优化指南
  • 6.4.3_有向无环图描述表达式
  • 力扣第157场双周赛
  • 青少年编程与数学 02-019 Rust 编程基础 19课题、项目发布
  • 【HarmonyOS Next之旅】DevEco Studio使用指南(二十五) -> 端云一体化开发 -> 业务介绍(二)
  • LLaMA-Factory 微调模型与训练数据量对应关系
  • 数据库与Redis数据一致性解决方案
  • Spring Boot AI 之 Chat Client API 使用大全
  • 分身空间:手机分身多开工具,轻松实现多账号登录
  • 音视频之视频压缩及数字视频基础概念
  • Ubuntu 24.04部署安装Honeyd蜜罐