当前位置: 首页 > news >正文

响应式编程入门教程第九节:UniRx 高级特性与自定义

响应式编程入门教程第一节:揭秘 UniRx 核心 - ReactiveProperty - 让你的数据动起来!

响应式编程入门教程第二节:构建 ObservableProperty<T> — 封装 ReactiveProperty 的高级用法

响应式编程入门教程第三节:ReactiveCommand 与 UI 交互

响应式编程入门教程第四节:响应式集合与数据绑定

响应式编程入门教程第五节:Unity 生命周期与资源管理中的响应式编程

响应式编程入门教程第六节:进阶?Combine、Merge、SelectMany 与错误处理

响应式编程入门教程第七节:响应式架构与 MVVM 模式在 Unity 中的应用

响应式编程入门教程第八节:UniRX性能分析与优化

[响应式编程入门教程第九节:UniRx 高级特性与自定义](https://blog.csdn.net/qq_46348216/article/details/149413198?fromshare=blogdetail&sharetype=blogdetail&sharerId=149413198&sharerefer=PC&sharesource=qq_46348216&shar
恭喜你坚持到了这个系列的最后一篇!在前八篇中,我们从 ReactiveProperty 的基础概念开始,逐步深入到 ReactiveCommandReactiveCollection、生命周期管理、高级操作符,直到响应式架构的 MVVM 模式和性能优化。现在,我们将探索 UniRx 库更深层次的奥秘:它的内部运作机制,以及如何在必要时自定义行为创建自己的操作符

理解这些高级特性,不仅能让你更好地利用 UniRx,还能帮助你调试更复杂的问题,甚至为库贡献代码。


1. 深入理解 IObservable<T>IObserver<T>

UniRx 的核心是 Rx (Reactive Extensions) 的实现,它基于两个核心接口:

  • IObservable<T> (可观察对象): 表示一个可以发出零个或多个通知(数据、错误、完成)的生产者。
    • 它只有一个方法:IDisposable Subscribe(IObserver<T> observer)。当你调用 Subscribe 时,你就成为了一个观察者,并开始接收来自 IObservable 的通知。同时,Subscribe 返回一个 IDisposable,用于取消订阅。
  • IObserver<T> (观察者): 表示一个接收来自 IObservable 通知的使用者。
    • 它有三个核心方法:
      • void OnNext(T value):当 IObservable 发出一个新值时调用。
      • void OnError(Exception error):当 IObservable 遇到错误时调用。错误会终止序列。
      • void OnCompleted():当 IObservable 完成时调用。完成也会终止序列。

所有的操作符(如 Where, Select, Merge 等)都是基于这两个接口构建的。当你链式调用操作符时,实际上是在构建一个观察者链条,数据流从上游向下游传递。

示例:手动实现一个简单的 Observable

这部分通常不需要在日常开发中手动编写,但有助于理解原理。

using System;
using UniRx;
using UnityEngine;// 自定义一个简单的 Observable,每秒发出一个递增的数字
public class MySimpleObservable : IObservable<int>
{private class MyDisposable : IDisposable{private Action _onDispose;public MyDisposable(Action onDispose){_onDispose = onDispose;}public void Dispose(){_onDispose?.Invoke();_onDispose = null;}}public IDisposable Subscribe(IObserver<int> observer){Debug.Log("MySimpleObservable: 新的观察者订阅了。");int count = 0;// 模拟一个每秒递增的异步操作IDisposable timerDisposable = Observable.Interval(TimeSpan.FromSeconds(1)).Subscribe(_ =>{observer.OnNext(count++); // 发送下一个值},observer.OnError, // 传递错误observer.OnCompleted // 传递完成信号);// 返回一个 IDisposable,当调用 Dispose 时,取消内部的计时器订阅return new MyDisposable(() =>{Debug.Log("MySimpleObservable: 订阅被取消。");timerDisposable.Dispose();});}
}public class ObservableUnderstanding : MonoBehaviour
{void Start(){// 订阅我们自定义的 ObservableMySimpleObservable myObservable = new MySimpleObservable();IDisposable subscription = myObservable.Subscribe(new DebugObserver()); // 使用自定义观察者// 也可以使用 UniRx 的扩展方法// IDisposable subscription = myObservable.Subscribe(//     x => Debug.Log($"接收到值: {x}"),//     ex => Debug.LogError($"接收到错误: {ex.Message}"),//     () => Debug.Log("流已完成")// ).AddTo(this); // 确保订阅被清理Observable.Timer(TimeSpan.FromSeconds(5)).Subscribe(_ =>{Debug.Log("5秒后手动取消订阅...");subscription.Dispose(); // 手动取消订阅}).AddTo(this);}
}// 一个简单的 IObserver 实现,用于打印接收到的事件
public class DebugObserver : IObserver<int>
{public void OnNext(int value){Debug.Log($"[DebugObserver] OnNext: {value}");}public void OnError(Exception error){Debug.LogError($"[DebugObserver] OnError: {error.Message}");}public void OnCompleted(){Debug.Log("[DebugObserver] OnCompleted");}
}

这个例子展示了 IObservable 如何接受一个 IObserver 并返回 IDisposable。当 IDisposableDispose 时,它会清理内部资源(这里是停止了 Observable.Interval 的计时器),从而停止向观察者发送通知。


2. Hot Observable vs. Cold Observable

这是响应式编程中一个重要的概念:

  • Cold Observable (冷可观察对象):

    • 每次有新的观察者订阅时,它都会重新开始执行其“生产”逻辑。就像一个点播电影,每个观众观看时电影都会从头播放。
    • 示例:Observable.Range, Observable.Create, Observable.FromAsync, Observable.Start 等。
    • 优点: 简单直观,每个订阅者都拥有独立的执行上下文。
    • 缺点: 如果生产逻辑耗时或资源密集,多次订阅会造成重复计算和资源浪费。
  • Hot Observable (热可观察对象):

    • 无论是否有观察者订阅,它都会一直在生产事件。它在订阅发生之前就开始发出事件,并且事件会在所有订阅者之间共享。就像一个直播电视节目,无论你何时打开电视,你都从当前播放的地方开始看,并且所有观众看的是同一个流。
    • 示例:ReactiveProperty, ReactiveCollection, Subject, AsyncSubject, BehaviorSubject, ReplaySubject
    • 优点: 适用于共享资源、避免重复计算、处理实时事件。
    • 缺点: 如果订阅者在流开始后才订阅,会错过之前已经发出的事件(除非使用 ReplaySubjectBehaviorSubject)。

将 Cold Observable 转换为 Hot Observable:

你可以使用 Publish()Share() 等操作符将 Cold Observable 转换为 Hot Observable。

  • Publish() + Connect() Publish() 会返回一个 IConnectableObservable<T>。当你调用 Connect() 方法时,上游的 Cold Observable 才真正开始执行。
  • Publish().RefCount() 这是更常用的模式。RefCount() 会确保当第一个观察者订阅时,上游的 Observable 开始执行;当所有观察者都取消订阅时,上游的 Observable 停止执行。这非常适合那些需要在有消费者时才激活的共享资源。
  • Share() Share()Publish().RefCount() 的一个简写版本,行为相似。
using UnityEngine;
using UniRx;
using System;
using System.Threading.Tasks;public class HotVsCold : MonoBehaviour
{void Start(){// --- Cold Observable 示例 ---// 每次订阅都会重新执行 FromAsync 内部的 Taskvar coldObservable = Observable.FromAsync(async () =>{Debug.Log("Cold Observable: 开始耗时操作...");await Task.Delay(TimeSpan.FromSeconds(1));Debug.Log("Cold Observable: 耗时操作完成。");return "Cold Data";});Debug.Log("订阅 Cold Observable 1");coldObservable.Subscribe(x => Debug.Log($"Cold 1: {x}")).AddTo(this);Debug.Log("订阅 Cold Observable 2 (会再次执行耗时操作)");coldObservable.Subscribe(x => Debug.Log($"Cold 2: {x}")).AddTo(this);// --- Hot Observable (Publish().RefCount()) 示例 ---// 上游的耗时操作只执行一次,结果被共享var hotObservable = Observable.FromAsync(async () =>{Debug.Log("Hot Observable: 开始耗时操作 (只会执行一次)...");await Task.Delay(TimeSpan.FromSeconds(1));Debug.Log("Hot Observable: 耗时操作完成。");return "Hot Data";}).Publish() // 将 Cold 转换为 ConnectableObservable.RefCount(); // 当有订阅者时连接,无订阅者时断开Debug.Log("订阅 Hot Observable 1");hotObservable.Subscribe(x => Debug.Log($"Hot 1: {x}")).AddTo(this);Observable.Timer(TimeSpan.FromSeconds(0.5f)) // 延迟0.5秒订阅.Subscribe(_ =>{Debug.Log("订阅 Hot Observable 2 (会共享结果)");hotObservable.Subscribe(x => Debug.Log($"Hot 2: {x}")).AddTo(this);}).AddTo(this);// 注意:Hot Observable 2 会在流已经开始后才订阅,因此可能会错过流开始时发出的某些事件// 对于这种情况,可以使用 ReplaySubject 或 BehaviorSubject}
}

理解 Hot 和 Cold Observable 对于设计高效的响应式系统至关重要,尤其是在处理共享资源或避免重复计算时。


3. Subject 类型:兼具 Observable 和 Observer 的特性

Subject 是一个特殊类型,它既是 IObservable 又是 IObserver。这意味着你可以:

  • IObserver 一样向它发送值(OnNext)、错误(OnError)或完成信号(OnCompleted)。
  • IObservable 一样被订阅,从而接收它发出的事件。

UniRx 提供了几种不同类型的 Subject,每种都有其特定行为:

  • Subject<T> 最基本的 Subject。它只会向订阅后的观察者发送事件。如果观察者在事件发生前订阅,它会错过这些事件。
    • 应用场景: 实现事件总线 (Event Bus)、手动触发事件。
  • AsyncSubject<T> 只会发出它在完成前发送的最后一个值。当它完成时,它会将这个最终值发射给所有当前和未来的观察者。
    • 应用场景: 只需要异步操作的最终结果,例如一个只执行一次的资源加载,所有等待者都只关心加载成功后的最终资源。
  • BehaviorSubject<T> 当有新的观察者订阅时,会立即发出它最近的一个值,然后才发出后续的值。在创建时需要提供一个默认值。
    • 应用场景: 缓存最新状态,例如当前游戏分数、玩家当前位置,新加入的 UI 模块可以立即获取到最新状态。
  • ReplaySubject<T> 会向所有新订阅的观察者重放所有或指定数量的过去事件。可以设置缓冲区大小或时间窗口。
    • 应用场景: 记录事件历史,例如聊天记录、操作日志,新加入的观察者需要看到过去的事件。
using UnityEngine;
using UniRx;
using System;
using System.Threading.Tasks;public class SubjectTypes : MonoBehaviour
{void Start(){// --- Subject<T> ---Debug.Log("--- Subject<int> 示例 ---");var subject = new Subject<int>();subject.OnNext(100); // 订阅前发出,会被错过subject.Subscribe(x => Debug.Log($"Subject Sub1: {x}")).AddTo(this);subject.OnNext(1);subject.OnNext(2);subject.Subscribe(x => Debug.Log($"Subject Sub2: {x}")).AddTo(this); // 在此之后订阅subject.OnNext(3);subject.OnCompleted(); // 完成流// --- BehaviorSubject<T> ---Debug.Log("--- BehaviorSubject<string> 示例 ---");var behaviorSubject = new BehaviorSubject<string>("初始状态");behaviorSubject.Subscribe(s => Debug.Log($"Behavior Sub1: {s}")).AddTo(this); // 立即收到"初始状态"behaviorSubject.OnNext("状态A");behaviorSubject.OnNext("状态B");behaviorSubject.Subscribe(s => Debug.Log($"Behavior Sub2: {s}")).AddTo(this); // 立即收到"状态B"behaviorSubject.OnNext("状态C");behaviorSubject.OnCompleted();// --- ReplaySubject<T> (缓冲区大小为2) ---Debug.Log("--- ReplaySubject<float> 示例 (缓冲区2) ---");var replaySubject = new ReplaySubject<float>(2); // 缓存最近2个值replaySubject.OnNext(10.1f);replaySubject.OnNext(10.2f);replaySubject.OnNext(10.3f); // 10.1f 会被移除replaySubject.Subscribe(f => Debug.Log($"Replay Sub1: {f}")).AddTo(this); // 收到10.2f, 10.3freplaySubject.OnNext(10.4f);replaySubject.Subscribe(f => Debug.Log($"Replay Sub2: {f}")).AddTo(this); // 收到10.3f, 10.4freplaySubject.OnCompleted();// --- AsyncSubject<T> ---Debug.Log("--- AsyncSubject<bool> 示例 ---");var asyncSubject = new AsyncSubject<bool>();asyncSubject.OnNext(false); // 不是最后一个值asyncSubject.Subscribe(b => Debug.Log($"Async Sub1: {b}")).AddTo(this); // 不会立即收到asyncSubject.OnNext(true); // 最后一个值asyncSubject.OnCompleted(); // 完成时才发出最后一个值asyncSubject.Subscribe(b => Debug.Log($"Async Sub2: {b}")).AddTo(this); // 也会收到最后一个值 (true)}
}

选择正确的 Subject 类型对于管理事件流和数据状态至关重要。


4. 自定义操作符 (Custom Operators)

虽然 UniRx 提供了丰富的操作符,但在某些特定情况下,你可能需要创建自己的操作符来封装复杂的、可复用的逻辑。自定义操作符本质上是返回 IObservable<TResult> 的扩展方法。

创建自定义操作符的步骤:

  1. 定义一个静态类,包含你的扩展方法。
  2. 扩展方法通常接收一个 IObservable<TSource> 作为 this 参数。
  3. 在方法内部,使用 Observable.Create<TResult> 来创建一个新的 Observable。
  4. Observable.Create 的 Lambda 表达式中,实现你的订阅逻辑:订阅上游 Observable,并根据你的业务逻辑向下游观察者发送数据 (observer.OnNext)、错误 (observer.OnError) 或完成信号 (observer.OnCompleted)。
  5. 返回一个 IDisposable,用于清理你在自定义操作符内部创建的所有订阅。

示例:ThrottleWithFirst (在 UniRx 中可能已有类似功能,这里仅作演示)

假设我们想创建一个操作符,它在指定时间窗口内只发射第一个值,并且在窗口结束后,如果还有后续事件,则重新开始计时。这类似于 ThrottleFirst,但更灵活。

using UnityEngine;
using UniRx;
using System;
using System.Threading; // For CancellationTokenSourcepublic static class CustomRxOperators
{// 扩展方法,可以链式调用public static IObservable<TSource> ThrottleWithFirst<TSource>(this IObservable<TSource> source, TimeSpan dueTime){// Observable.Create 用于构建自定义操作符return Observable.Create<TSource>(observer =>{// 用于控制时间窗口的 CancellationTokenSourcevar cts = new CancellationTokenSource();bool isThrottling = false; // 是否处于节流状态object gate = new object(); // 线程锁// 订阅上游 Observablevar disposable = source.Subscribe(value =>{lock (gate){if (!isThrottling){// 如果不在节流状态,发射第一个值observer.OnNext(value);isThrottling = true;// 开启一个计时器,在 dueTime 后解除节流状态Observable.Timer(dueTime).Subscribe(_ =>{lock (gate){isThrottling = false;}}, cts.Token); // 使用 CancellationToken 确保计时器在外部取消订阅时停止}}},observer.OnError, // 错误直接传递给下游observer.OnCompleted // 完成信号直接传递给下游);// 返回一个 IDisposable,用于清理内部资源return new CompositeDisposable(disposable, Disposable.Create(() => cts.Cancel()));});}
}public class CustomOperatorExample : MonoBehaviour
{void Start(){// 模拟一个高频的鼠标点击流Observable.EveryUpdate().Where(_ => Input.GetMouseButtonDown(0)).ThrottleWithFirst(TimeSpan.FromSeconds(1)) // 应用自定义操作符.Subscribe(_ =>{Debug.Log($"鼠标点击 (节流): {Time.time}");}).AddTo(this);}
}

自定义操作符让你能够将复杂的响应式模式封装成简洁、可复用的单元,提升代码的可读性和维护性。然而,编写自定义操作符需要对 Rx 的内部机制有深入理解,并且要特别注意线程安全和资源管理(确保 IDisposable 的正确返回和清理)。


5. UniRx 的 Triggers 命名空间

我们在前面的教程中已经多次使用 UniRx.Triggers 命名空间下的扩展方法(如 OnClickAsObservable, UpdateAsObservable, OnTriggerEnterAsObservable)。这些扩展方法将 Unity MonoBehaviour 的生命周期回调和事件转换为 IObservable 流,极大地简化了 Unity 和响应式编程的集成。

这些 AsObservable() 方法的实现原理通常是在内部为每个 MonoBehaviour 添加一个隐藏的组件(或利用 ObservableStateMachineTrigger 等),来捕获对应的 Unity 事件,然后通过 Subject 发射给订阅者。这使得你的代码可以完全脱离传统的 Unity 回调,以声明式的方式处理所有事件。


6. 单元测试与响应式代码

由于 ViewModel 及其内部的响应式逻辑是纯 C# 代码,这使得它们非常适合进行单元测试。你可以使用任何 .NET 单元测试框架(如 NUnit)来测试你的 ViewModel。

关键在于:

  1. 模拟依赖: 使用 Mocking 框架(如 Moq)来模拟 Model 层或任何外部依赖。
  2. 控制时间: 对于涉及到时间的操作符(如 Delay, Interval, Throttle),需要使用 TestScheduler 来精确控制时间流逝,以便进行确定性测试。

示例(概念性代码,需要 NUnit 和 UniRx.Tests 包):

/*
using NUnit.Framework;
using UniRx;
using UniRx.Async; // 用于测试异步操作
using UniRx.Diagnostics; // 用于测试 Scheduler
using System;
using System.Threading.Tasks;// 假设这是我们要测试的 ViewModel
public class MySimpleViewModel
{public ReactiveProperty<int> Counter { get; private set; } = new ReactiveProperty<int>(0);public ReactiveCommand IncrementCommand { get; private set; }public MySimpleViewModel(){IncrementCommand = Counter.Select(c => c < 5) // 只能增加到5.ToReactiveCommand();IncrementCommand.Subscribe(_ => Counter.Value++);}
}[TestFixture]
public class MySimpleViewModelTests
{// 测试计数器是否正确递增[Test]public void CounterIncrementsCorrectly(){var viewModel = new MySimpleViewModel();Assert.AreEqual(0, viewModel.Counter.Value);viewModel.IncrementCommand.Execute();Assert.AreEqual(1, viewModel.Counter.Value);viewModel.IncrementCommand.Execute();Assert.AreEqual(2, viewModel.Counter.Value);}// 测试命令的可执行性[Test]public void IncrementCommandCanExecute(){var viewModel = new MySimpleViewModel();// 初始时可执行Assert.IsTrue(viewModel.IncrementCommand.CanExecute.Value);// 连续执行5次for (int i = 0; i < 5; i++){viewModel.IncrementCommand.Execute();}// 此时 Counter 应该为 5,命令不可执行Assert.AreEqual(5, viewModel.Counter.Value);Assert.IsFalse(viewModel.IncrementCommand.CanExecute.Value);// 再次尝试执行,Counter 不会改变viewModel.IncrementCommand.Execute();Assert.AreEqual(5, viewModel.Counter.Value);}// 针对异步操作的测试,需要 TestScheduler[Test]public async Task AsyncOperationCompletes(){var testScheduler = new TestScheduler();var asyncData = Observable.FromAsync(async () =>{await Task.Delay(TimeSpan.FromSeconds(1)); // 模拟异步延迟return "Loaded Data";}).ObserveOn(testScheduler); // 在 TestScheduler 上观察string result = null;asyncData.Subscribe(data => result = data);Assert.IsNull(result); // 此时还没有数据testScheduler.AdvanceBy(TimeSpan.FromMilliseconds(500).Ticks); // 推进半秒Assert.IsNull(result); // 还没有完成testScheduler.AdvanceBy(TimeSpan.FromMilliseconds(500).Ticks); // 再推进半秒,总共1秒Assert.AreEqual("Loaded Data", result); // 此时数据应该已加载}
}
*/

单元测试是保证复杂系统质量的基石。响应式编程的声明式特性和对副作用的限制,使得其核心逻辑(通常在 ViewModel 中)比传统命令式代码更容易进行单元测试。


7. 总结与最终展望

至此,我们已经完成了 UniRx 响应式编程的全面入门到深入教程。我们从最基础的 ReactiveProperty 开始,逐步构建起了对整个 UniRx 生态的理解:

  • 数据绑定与 UI 交互: ReactivePropertyReactiveCommand
  • 集合管理: ReactiveCollection
  • 生命周期与资源: 订阅管理、异步操作封装。
  • 复杂逻辑构建: Combine, Merge, SelectMany 等高级操作符。
  • 健壮性: 错误处理和调度器。
  • 架构: MVVM 模式。
  • 性能: Profiler 与优化策略。
  • 底层与扩展: IObservable/IObserver 原理、Hot/Cold Observable、Subject 类型、自定义操作符和单元测试。

UniRx 提供了一个优雅且强大的范式,用于处理 Unity 游戏开发中常见的异步、事件驱动和状态管理问题。它能帮助你编写出更具声明性、可测试性、可维护性和可扩展性的代码。

响应式编程的学习曲线可能相对陡峭,但一旦你掌握了它的思维模式,你会发现它能极大地提升你的开发效率和代码质量。持续实践、查阅文档、阅读优秀的开源项目代码,是巩固和提升响应式编程技能的最佳途径。

希望这个系列的教程能为你打开响应式编程的大门,并帮助你在 Unity 开发中敲定坚实基础!祝你编程顺利!

响应式编程入门教程第一节:揭秘 UniRx 核心 - ReactiveProperty - 让你的数据动起来!

响应式编程入门教程第二节:构建 ObservableProperty<T> — 封装 ReactiveProperty 的高级用法

响应式编程入门教程第三节:ReactiveCommand 与 UI 交互

响应式编程入门教程第四节:响应式集合与数据绑定

响应式编程入门教程第五节:Unity 生命周期与资源管理中的响应式编程

响应式编程入门教程第六节:进阶?Combine、Merge、SelectMany 与错误处理

响应式编程入门教程第七节:响应式架构与 MVVM 模式在 Unity 中的应用

响应式编程入门教程第八节:UniRX性能分析与优化

响应式编程入门教程第九节:UniRx 高级特性与自定义

http://www.lryc.cn/news/594108.html

相关文章:

  • 分治算法---归并
  • 7. 命令模式
  • 一维数组练题习~
  • 算法题(176):three states
  • windows11环境配置torch-points-kernels库编译安装详细教程
  • 如何优雅解决缓存与数据库的数据一致性问题?
  • 循环黑洞:用Python生成银河系特效图
  • tidyverse-数据可视化 - 图形的分层语法
  • Web开发 04
  • Work SSD基础知识
  • jxORM--编程指南
  • 试用SAP BTP 02:试用SAP HANA Cloud
  • MySQL笔记3
  • Oracle触发器:数据世界的“隐形守护者“
  • Uniapp 纯前端台球计分器开发指南:能否上架微信小程序 打包成APP?
  • Github 贪吃蛇 主页设置
  • 将EXCEL或者CSV转换为键值对形式的Markdown文件
  • 【Python数据采集】Python爬取小红书搜索关键词下面的所有笔记的内容、点赞数量、评论数量等数据,绘制词云图、词频分析、数据分析
  • (LeetCode 面试经典 150 题 ) 1. 两数之和 (哈希表)
  • ps2025下载与安装教程(附安装包) 2025最新版photoshop安装教程
  • 在NLP深层语义分析中,深度学习和机器学习的区别与联系
  • MacBook的ARM架构(M芯片)操作虚拟机的docker拉取镜像问题
  • XSS内容总结
  • 【图文详解】Transformer架构详细解析:多头自注意力机制、qkv计算过程、encoder架构、decoder架构以及mask的意义
  • Logback简单使用
  • WiFiMouseServer手机等作为远程输入
  • 进阶向:基于Python的局域网文件传输工具
  • LeetCode|Day20|9. 回文数|Python刷题笔记
  • 多任务学习AITM算法简介
  • Kafka MQ 控制器 broker