RxJava Android 创建操作符实战:从数据源到Observable
引言
在Android开发中,创建Observable是使用RxJava的第一步。RxJava提供了丰富的创建操作符,让我们能够将各种数据源转换为可观察序列。本文将深入讲解RxJava中最常用的创建操作符,并通过Android实际案例展示它们的应用场景和最佳实践。
一、基础创建操作符
1. just() - 直接发射数据
just()
是最简单的创建操作符,用于将已有数据转换为Observable:
kotlin
// 发射单个数据 Observable.just("Hello RxJava").subscribe { log -> Log.d("TAG", log) }// 发射多个数据(最多10个) Observable.just(1, 2, 3, 4, 5).subscribe { num -> Log.d("TAG", "Number: $num") }
Android应用场景:
快速创建测试数据
将常量或配置值包装为Observable
2. from() 系列 - 从集合/数组创建
RxJava提供了多种from
操作符来处理集合数据:
kotlin
// fromIterable - 从集合创建 val list = listOf("Apple", "Banana", "Orange") Observable.fromIterable(list).subscribe { fruit -> Log.d("TAG", fruit) }// fromArray - 从数组创建 val array = arrayOf(1, 2, 3) Observable.fromArray(*array).subscribe { num -> Log.d("TAG", num.toString()) }// fromCallable - 延迟执行代码块 Observable.fromCallable {// 模拟耗时操作Thread.sleep(1000)"Result from background" }.subscribeOn(Schedulers.io()).observeOn(AndroidSchedulers.mainThread()).subscribe { result -> textView.text = result}
Android应用场景:
将数据库查询结果转换为流
处理文件读取操作
转换网络请求结果
二、高级创建操作符
3. create() - 完全控制Observable创建
create()
提供了最大的灵活性,可以手动控制数据发射:
kotlin
Observable.create<String> { emitter ->try {// 模拟网络请求val response = mockNetworkRequest()emitter.onNext(response)emitter.onComplete()} catch (e: Exception) {emitter.onError(e)} }.subscribeOn(Schedulers.io()).observeOn(AndroidSchedulers.mainThread()).subscribe({ response -> updateUI(response) },{ error -> showError(error) })
注意事项:
必须考虑背压问题
确保正确调用onComplete/onError
避免在create中直接执行耗时操作
4. defer() - 延迟创建Observable
defer()
直到有订阅者时才创建Observable,确保获取最新数据:
kotlin
var count = 0// 不使用defer - 创建时就确定count值 val badObservable = Observable.just(count)// 使用defer - 订阅时才获取count值 val goodObservable = Observable.defer { Observable.just(count) }count = 5badObservable.subscribe { Log.d("TAG", "Bad: $it") } // 输出0 goodObservable.subscribe { Log.d("TAG", "Good: $it") } // 输出5
Android应用场景:
依赖动态配置的请求
需要获取实时数据的场景
避免值在创建和订阅之间变化的问题
三、时间相关创建操作符
5. interval() - 定时发射数据
interval()
定期发射递增的Long值:
kotlin
// 每1秒发射一个数字,从0开始 Observable.interval(1, TimeUnit.SECONDS).takeUntil(lifecycleEventObservable) // 配合生命周期管理.observeOn(AndroidSchedulers.mainThread()).subscribe { count -> timerText.text = "Count: $count"}
Android应用场景:
实现计时器/倒计时
轮询请求
动画帧控制
6. timer() - 延迟执行
timer()
在指定延迟后发射一个0L:
kotlin
// 3秒后执行 Observable.timer(3, TimeUnit.SECONDS).observeOn(AndroidSchedulers.mainThread()).subscribe { showSplashScreen(false)}
Android应用场景:
启动页延迟跳转
延时执行任务
防抖后的延迟操作
四、Android实战案例
案例1:多个网络请求顺序执行
kotlin
// 使用fromIterable + concatMap实现顺序请求 Observable.fromIterable(apiList).concatMap { api -> RetrofitClient.getService(api).subscribeOn(Schedulers.io())}.observeOn(AndroidSchedulers.mainThread()).subscribe { response ->// 处理每个响应}
案例2:表单验证
kotlin
// 使用combineLatest监听多个输入框 Observable.combineLatest(RxTextView.textChanges(editText1).skip(1),RxTextView.textChanges(editText2).skip(1),RxTextView.textChanges(editText3).skip(1) ) { text1, text2, text3 ->validateForm(text1.toString(), text2.toString(), text3.toString()) }.subscribe { isValid ->submitButton.isEnabled = isValid }
案例3:搜索防抖
kotlin
RxTextView.textChanges(searchEditText).debounce(300, TimeUnit.MILLISECONDS) // 防抖300ms.filter { it.length >= 3 } // 至少3个字符才搜索.distinctUntilChanged() // 内容变化才搜索.switchMap { query ->searchApi.search(query.toString()).onErrorReturn { emptyList() }}.observeOn(AndroidSchedulers.mainThread()).subscribe { results ->updateSearchResults(results)}
五、创建操作符选择指南
操作符 | 适用场景 | 是否延迟执行 | 是否支持背压 |
---|---|---|---|
just() | 已知固定数据 | 否 | 否 |
fromIterable() | 集合/列表数据 | 否 | 否 |
fromCallable() | 需要延迟计算的值 | 是 | 否 |
create() | 完全自定义数据流 | 是 | 需要手动处理 |
defer() | 需要最新值的场景 | 是 | 否 |
interval() | 定时任务 | 是 | 是(Flowable) |
timer() | 延迟任务 | 是 | 是(Flowable) |
结语
掌握RxJava的创建操作符是构建响应式Android应用的基础。在实际开发中,应根据具体场景选择合适的创建方式:
简单数据使用
just()
或fromIterable()
需要最新值使用
defer()
异步任务使用
fromCallable()
定时任务使用
interval()
或timer()
完全自定义流程使用
create()
最佳实践提示:
在Android中始终考虑生命周期管理,避免内存泄漏
合理使用线程调度器,避免主线程阻塞
对于可能产生大量数据的场景,考虑使用Flowable处理背压
复杂的创建逻辑可以封装为可复用的方法
在下一篇文章中,我们将深入探讨RxJava的变换操作符,学习如何对数据流进行转换和处理。