新闻中心
Reactor框架中非阻塞地聚合多个Flux流结果到单个Mono对象

本文将深入探讨在Project Reactor框架中,如何高效且非阻塞地将多个独立的Flux流的聚合结果合并为一个单一的Mono对象。通过详细分析常见的错误模式,并引入Reactor提供的zip操作符,我们将展示如何优雅地实现这一目标,确保应用程序的响应性和并发性。
引言:响应式数据流聚合的挑战
在响应式编程中,我们经常需要从不同的异步源获取数据流,并在所有数据都可用后将它们组合成一个统一的结果对象。例如,您可能需要从两个不同的服务获取成功账户列表和失败账户列表,然后将它们封装在一个Payments对象中。
一个常见的错误尝试是,在获取到Flux流后,立即调用collectList().subscribe()来获取数据,并尝试在订阅回调外部构建结果。然而,这种做法通常会导致阻塞,因为它试图在响应式流完成之前,同步地访问其结果。在Reactor中,subscribe()方法是非阻塞的,但如果您在订阅回调之外立即依赖其副作用来构建一个同步对象,那么在异步操作完成之前,您将无法获得所需的数据,从而引入阻塞或不确定的行为。
考虑以下数据模型和初始的错误尝试:
package org.example;
import lombok.Builder;
import lombok.Getter;
import lombok.ToString;
import j*a.util.List;
@Getter
@Builder
@ToString
public class Payments {
private List<SuccessAccount> successAccounts;
private List<FailedAccount> failedAccounts;
@Getter
@Builder
@ToString
public static class SuccessAccount {
private String name;
private String accountNumber;
}
@Getter
@Builder
@ToString
public static class FailedAccount {
private String name;
private String accountNumber;
private String errorCode;
}
}以及一个试图聚合的错误方法:
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import j*a.util.ArrayList;
import j*a.util.List;
public class Main {
public static Mono<Payments> getPaymentDataIncorrect() {
Flux<Payments.SuccessAccount> accountsSucceeded = getAccountsSucceeded();
Flux<Payments.FailedAccount> accountsFailed = getAccountsFailed();
List<Payments.SuccessAccount> successAccounts = new ArrayList<>();
List<Payments.FailedAccount> failedAccounts = new ArrayList<>();
// 这种方式是阻塞的,因为它试图在异步操作完成前同步地填充列表
accountsFailed.collectList().subscribe(failedAccounts::addAll);
accountsSucceeded.collectList().subscribe(successAccounts::addAll);
// 在此处,successAccounts和failedAccounts可能还未被填充
return Mono.just(Payments.builder()
.failedAccounts(failedAccounts)
.successAccounts(successAccounts)
.build());
}
// ... getAccountsSucceeded() 和 getAccountsFailed() 方法省略,与原始问题相同
}上述代码中的accountsFailed.collectList().subscribe(failedAccounts::addAll)和accountsSucceeded.collectList().subscribe(successAccounts::addAll)虽然subscribe本身是非阻塞的,但它不会立即填充failedAccounts和successAccounts。当Mono.just()被调用时,这两个列表很可能仍然是空的,因为订阅的回调是异步执行的。这导致了逻辑上的错误,并且如果强制同步等待,则会引入阻塞。
使用 zip 操作符实现非阻塞聚合
Project Reactor提供了zip操作符来解决这种场景。zip操作符能够将多个Publisher(例如Mono或Flux)的元素按照索引进行组合,当所有参与的Publisher都发出一个元素时,zip操作符会将这些元素组合成一个新的元素。
独响
一个轻笔记+角色扮演的app
249
查看详情
在我们的案例中,我们需要将两个Flux流的最终聚合结果(即List)组合起来。首先,我们可以使用collectList()操作符将每个Flux转换为一个Mono,表示该流所有元素的列表。然后,我们就可以使用Mono.zipWith()来组合这两个Mono
。
Mono.zipWith()接受另一个Mono作为参数,以及一个BiFunction(或更高阶的函数,如zip有多个重载),该函数定义了如何将两个Mono发出的结果组合成一个新的结果。
以下是使用zipWith操作符的正确实现:
package org.example;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import j*a.util.List;
public class Main {
public static void main(String[] args) {
getPaymentData().subscribe(System.out::println);
}
public static Mono<Payments> getPaymentData() {
// 1. 获取两个独立的Flux流
Flux<Payments.SuccessAccount> accountsSucceededFlux = getAccountsSucceeded();
Flux<Payments.FailedAccount> accountsFailedFlux = getAccountsFailed();
// 2. 将每个Flux转换为一个Mono<List>
// collectList() 会收集Flux中的所有元素,并在Flux完成时发出一个包含这些元素的List
Mono<List<Payments.FailedAccount>> failedAccountsMono = accountsFailedFlux.collectList();
Mono<List<Payments.SuccessAccount>> successAccountsMono = accountsSucceededFlux.collectList();
// 3. 使用 Mono.zipWith() 组合两个 Mono<List>
// zipWith 会等待两个Mono都发出其结果,然后使用提供的BiFunction进行组合
Mono<Payments> combined = failedAccountsMono.zipWith(
successAccountsMono,
(failedAccounts, successAccounts) -> Payments.builder()
.failedAccounts(failedAccounts)
.successAccounts(successAccou
nts)
.build()
);
return combined;
}
// 模拟获取成功账户的Flux流
public static Flux<Payments.SuccessAccount> getAccountsSucceeded() {
return Flux.just(Payments.SuccessAccount.builder()
.accountNumber("1234345")
.name("Payee1")
.build(),
Payments.SuccessAccount.builder()
.accountNumber("83673674")
.name("Payee2")
.build());
}
// 模拟获取失败账户的Flux流
public static Flux<Payments.FailedAccount> getAccountsFailed() {
return Flux.just(Payments.FailedAccount.builder()
.accountNumber("12234345")
.name("Payee3")
.errorCode("8938")
.build(),
Payments.FailedAccount.builder()
.accountNumber("3342343")
.name("Payee4")
.errorCode("8938")
.build());
}
}在这个修正后的实现中:
- getAccountsSucceeded() 和 getAccountsFailed() 方法返回了两个独立的 Flux 流。
- accountsFailedFlux.collectList() 和 accountsSucceededFlux.collectList() 将这两个 Flux 转换为两个 Mono
- 。这些 Mono 会在各自的 Flux 完成收集所有元素后发出一个 List。
- failedAccountsMono.zipWith(successAccountsMono, ...) 操作符会等待 failedAccountsMono 和 successAccountsMono 都发出它们的 List 结果。一旦两个结果都可用,zipWith 会调用提供的 BiFunction(在本例中是一个Lambda表达式),将这两个 List 作为参数传入,并使用它们构建一个 Payments 对象。
- 最终,zipWith 操作符返回一个 Mono
,它会在 Payments 对象成功构建后发出该对象。整个过程是非阻塞的,并且完全符合响应式编程范式。
关键概念与优势
- 非阻塞性: zip 操作符是完全非阻塞的。它不会在等待上游Publisher发出元素时阻塞当前线程。相反,它会注册订阅,并在元素可用时异步地处理它们。
- 并发执行: zip 操作符的两个上游Publisher(在本例中是两个 collectList() 操作)可以并发地执行。这意味着获取成功账户和失败账户的数据流可以同时进行,从而提高整体效率。
- 结果组合的原子性: zip 确保只有当所有参与的Publisher都准备好发出一个元素时,组合函数才会被调用。这保证了在创建 Payments 对象时,所需的两个 List 数据都是完整且可用的。
- 错误处理: 如果任何一个上游 Mono 在发出其 List 之前失败,zip 操作符将立即传播该错误,而不会等待其他 Mono 完成。
总结
在Project Reactor中,当需要将多个独立的异步数据流(Flux或Mono)的最终结果聚合成一个单一的响应式对象时,zip操作符是首选的非阻塞解决方案。通过将每个Flux首先转换为一个Mono(使用collectList()),然后利用Mono.zipWith()结合一个自定义的组合函数,可以优雅且高效地实现复杂的聚合逻辑,同时保持应用程序的响应性和并发性。避免在响应式流中进行同步阻塞操作是构建高性能、可伸缩的响应式系统的关键。
以上就是Reactor框架中非阻塞地聚合多个Flux流结果到单个Mono对象的详细内容,更多请关注其它相关文章!
# java
# react
# 回调
# 并在
# 会在
# 转换为
# 这两个
# 中非
# 多个
# 响应式编程
# ai
# 北仑区独栋家装网站建设
# 什么是网站建设技术
# 松江区推广网站价格多少
# 重庆seo关键词优化
# 网站建设有问林洁
# 文章百度关键词排名
# 佛山seo搜索优化报价
# 北京seo如何排名靠前
# 商丘网站关键词优化排名
# 网络营销推广和数据分析
# 应用程序
# 因为它
# 所需
相关栏目:
【
科技资讯46185 】
【
网络学院92790 】
相关推荐:
Python实时数据流中的动态最值查找策略
俄罗斯搜索引擎Yandex指南 附2025年免登录官网入口
深入理解Go语言中的指针类型:以*string为例
拷贝漫画电脑版官网入口 拷贝漫画(PC版)在线直达
Angular响应式表单:实现提交后表单及按钮的禁用与只读化
如何在离线环境中使用Composer_Composer离线安装依赖包的技巧与策略
J*aScript中如何高效提取对象指定属性
4399免费游戏网址入口 4399小游戏免费入口点开即玩
《GTA6》开发画面疑似泄露!这次可不是AI了
韩剧圈正版入口页面_韩剧圈官网登录链接
拼多多购物车商品数量无法修改如何处理 拼多多购物车操作优化方法
KFC游戏互动怎么赢取优惠券_KFC线上游戏活动参与与优惠代码赢取教程
C#使用XPath查询节点时出错? 常见语法错误与调试技巧
MinIO大规模对象列表性能瓶颈深度解析与外部元数据管理策略
MongoDB聚合管道:正确匹配对象数组中_id的方法
qq游戏网页版直接玩_qq游戏免下载快速入口
12306选座怎么选到商务座_12306商务座选择与配置说明
抖音隐秘迷城小游戏入口_ 抖音冒险解谜小游戏秒玩
《北京人工智能产业白皮书(2025)》发布:全年核心产值预计突破 4500 亿元
2025年云电脑操作系统体验 | 无需本地硬件,随时随地使用高性能PC
使用 Pandas 高效处理 .dat 文件:字符清理与数据计算
PySpark中高效提取字符串右侧可变长度数字:使用regexp_extract
优化 Jest 模拟:强制未实现函数抛出错误以提升测试效率
EMS快递官网app_中国邮政速递物流手机客户端
12306选座如何查看座位示意图_12306座位示意图解读与使用
火锅吃太多会怎样 火锅吃太多会上火吗
PyTorch模型训练效果不佳?深入剖析常见错误与调试技巧
ArrayList与LinkedList操作复杂度详解:遍历与修改
J*aScriptWebpack优化_J*aScript构建工具实战
天猫2025双十一0点秒杀攻略 天猫爆款抢购时间
神经网络二分类模型训练异常:高损失与完美验证准确率的排查与修正
2025俄罗斯Yandex最新入口 官方网站地址及浏览器下载指南
PHP表单数据传递:如何通过隐藏输入字段获取动态ID
Angular中单选按钮的正确使用与常见陷阱解析
如何在CSS中使用浮动制作导航栏_float实现水平菜单
Pandas DataFrame 多条件优先级排序与排名
Discord Slash 命令响应超时问题的异步解决方案
Win11怎么设置鼠标指针速度_Win11提高鼠标指针精确度选项
在J*a项目里如何构建对象之间的契约_接口约束的实际落地
LINUX下如何进行磁盘分区_fdisk与parted工具在LINUX中的使用对比
押井守高度称赞《辐射4》:玩了八年都停不下来!
Composer如何处理Git子模块(submodule)依赖_Composer与Git Submodule的对比与选择
excel怎么制作工资条 excel快速生成工资条的方法
J*a如何使用AtomicInteger控制计数_J*a无锁计数器性能分析
QQ邮箱官方网页版登录 QQ邮箱个人邮箱快速访问
魅族17怎样用浏览器译外语网页_iPhone魅族17浏览器译外语网页【即时翻译】
Web Components中自定义开关组件状态同步的常见陷阱与解决方案
抖音商城签到领现金是真的吗_抖音商城签到奖励与提现说明
KFC套餐升级怎么获取优惠代码_KFC套餐升级活动与优惠代码获取方法
汽水音乐在线解析 汽水音乐在线解析入口


2025-12-03
浏览次数:次
返回列表
nts)
.build()
);
return combined;
}
// 模拟获取成功账户的Flux流
public static Flux<Payments.SuccessAccount> getAccountsSucceeded() {
return Flux.just(Payments.SuccessAccount.builder()
.accountNumber("1234345")
.name("Payee1")
.build(),
Payments.SuccessAccount.builder()
.accountNumber("83673674")
.name("Payee2")
.build());
}
// 模拟获取失败账户的Flux流
public static Flux<Payments.FailedAccount> getAccountsFailed() {
return Flux.just(Payments.FailedAccount.builder()
.accountNumber("12234345")
.name("Payee3")
.errorCode("8938")
.build(),
Payments.FailedAccount.builder()
.accountNumber("3342343")
.name("Payee4")
.errorCode("8938")
.build());
}
}