新闻中心
Reactor中非阻塞地聚合两个Flux结果为单个Mono的教程

本文将深入探讨在Project Reactor中如何高效、非阻塞地将两个独立的`Flux`流的聚合结果合并成一个`Mono`对象。通过详细分析传统阻塞方法的不足,并引入`zip`操作符,我们将演示如何利用`Mono.zipWith`将两个`Flux`转换为`Mono`,进而安全地组合这些列表,最终生成一个包含所有数据的`Mono
在响应式编程框架Project Reactor中,我们经常会遇到需要从多个异步数据源获取数据,并将这些数据聚合成一个单一结果的场景。例如,从不同的服务或数据库查询成功账户列表和失败账户列表,然后将它们合并到一个统一的Payments对象中。本教程将指导您如何使用Reactor的zip操作符,以非阻塞的方式实现这一目标。
1. 理解问题:传统阻塞方法的陷阱
假设我们有以下领域模型,用于存储支付结果:
package org.example;
import lombok.Builder;
import lombok.Getter;
import lombok.ToString;
import j*a.util.List;
@Getter
@Builder
@ToString
public class Payments {
private List<SuccessAccount> successAccounts;
private List<FailedAccount> failedAccounts;
@Getter
@Builder
@ToString
public static class SuccessAccount {
private String name;
private String accountNumber;
}
@Getter
@Builder
@ToString
public static class FailedAccount {
private String name;
private String accountNumber;
private String errorCode;
}
}我们有两个方法分别返回成功账户和失败账户的Flux流:
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import j*a.util.ArrayList;
import j*a.util.List;
public class Main {
public static Flux<Payments.SuccessAccount> getAccountsSucceeded() {
return Flux.just(Payments.SuccessAccount.builder()
.accountNumber("1234345")
.name("Payee1")
.build(),
Payments.SuccessAccount.builder()
.accountNumber("83673674")
.name("Payee2")
.build());
}
public static Flux<Payments.FailedAccount> getAccountsFailed() {
return Flux.just(Payments.FailedAccount.builder()
.accountNumber("12234345")
.name("Payee3")
.errorCode("8938")
.build(),
Payments.FailedAccount.builder()
.accountNumber("3342343")
.name("Payee4")
.errorCode("8938")
.build());
}
// ... main method and getPaymentData will be added later
}初学者可能会尝试通过收集每个Flux的列表,然后手动构建Payments对象。以下是一个常见的错误示例:
// 错误示例:阻塞式操作
public static Mono<Payments> getPaymentDataIncorrect() {
Flux<Payments.SuccessAccount> accountsSucceeded = getAccountsSucceeded();
Flux<Payments.FailedAccount> accountsFailed = getAccountsFailed();
List<Payments.SuccessAccount> successAccounts = new ArrayList<>();
List<Payments.FailedAccount> failedAccounts = new ArrayList<>();
// 这里的 subscribe() 调用是阻塞的,它会等待 Flux 完成并填充列表
// 这破坏了响应式流的非阻塞特性,并且无法在响应式链中无缝衔接
accountsFailed.collectList().subscribe(failedAccounts::addAll);
accountsSucceeded.collectList().subscribe(successAccounts::addAll);
return Mono.just(Payments.builder()
.failedAccounts(failedAccounts)
.successAccounts(successAccounts)
.build());
}上述代码中,collectList().subscribe()虽然可以获取到列表数据,但subscribe()方法本身是阻塞的,它会暂停当前线程直到Flux发出所有元素。这不仅违背了响应式编程的非阻塞原则,也使得getPaymentDataIncorrect方法在Mono.just()返回之前就已经同步地完成了数据收集,无法发挥响应式流的异步优势。在WebFlux等响应式框架中,这种阻塞操作会导致服务器线程池耗尽,严重影响系统吞吐量。
2. 响应式解决方案:zip操作符
Project Reactor提供了一系列强大的操作符来处理这种并行组合多个流的场景,其中zip操作符是解决此问题的理想选择。zip操作符会等待所有上游Publisher都发出一个元素,然后将这些元素通过一个提供的BiFunction(或Function)组合成一个新的元素。
对于将多个Flux的聚合结果合并成一个Mono,我们通常会遵循以下步骤:
Tunee AI
新一代AI音乐智能体
1104
查看详情
2.1 将 Flux 转换为 Mono
首先,我们需要将每个Flux流转换为一个Mono,该Mono在流完成后会发出一个包含所有元素的List。这可以通过collectList()操作符实现:
Mono<List<Payments.FailedAccount>> failedAccountsMono = accountsFailed.collectList(); Mono<List<Payments.SuccessAccount>> successAccountsMono = accountsSucceeded.collectList();
collectList()操作符本身是响应式的,它会创建一个新的Mono。当原始Flux完成时,该Mono会发出一个包含所有收集到的元素的List。这个过程是非阻塞的,它将Flux的多个元素聚合为Mono的单个List元素。
2.2 使用 Mono.zipWith 组合 Mono
一旦我们有了两个Mono,我们就可以使用Mono.zipWith(或静态方法Mono.zip)来组合它们。zipWith方法接受另一个Mono作为参数,以及一个BiFunction来定义如何组合这两个Mono发出的结果:
public static Mono<Payments> getPaymentData() {
Flux<Payments.SuccessAccount> accountsSucceeded = getAccountsSucceeded();
Flux<Payments.FailedAccount> accountsFailed = getAccountsFailed();
Mono<List<Payments.FailedAccount>> failedAccountsMono = accountsFailed.collectList();
Mono<List<Payments.SuccessAccount>> successAccountsMono = accountsSucceeded.collectList();
Mono<Payments> combinedPaymentsMono = failedAccountsMono.zipWith(
successAccountsMono,
(failedList, successList) -> Payments.builder()
.failedAccounts(failedList)
.successAccounts(successList)
.build()
);
return combinedPaymentsMono;
}在这个zipWith调用中:
- failedAccountsMono是源Mono,它会发出失败账户列表。
- successAccountsMono是与之组合的Mono,它会发出成功账户列表。
- BiFunction (failedList, successList) -> Payments.builder()... 定义了如何将这两个Mono发出的List作为参数,组合成最终的Payments对象。
整个操作链是完全非阻塞和响应式的。combinedPaymentsMono将会在两个源Mono都发出其List结果后,才通过BiFunction创建并发出最终的Payments对象。
3. 完整示例代码
为了更清晰地展示,以下是包含领域模型、数据源以及正确getPaymentData方法的完整代码示例:
package org.example;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import j*a.util.List;
public class Main {
public static void main(String[] args) {
// 订阅并打印最终的 Payments 对象
getPaymentData().subscribe(
System.out::println, // onNext:以上就是Reactor中非阻塞地聚合两个Flux结果为单个Mono的教程的详细内容,更多请关注其它相
关文章!
# 模式下
# 信息网站建设口碑推荐
# 孝义外贸网站建设招标
# 来宾创新seo技巧
# 网站建设要做什么准备
# 泰州网站建设实战培训
# 正品网站推广哪家专业
# 溧水区网站关键词优化
# 品牌包装营销推广传播
# 简述网站建设前景分析
# 涟源seo优化推广
# 相关文章
# 会在
# react
# 在这个
# 是一个
# 转换为
# 这两个
# 它会
# 多个
# 中非
# 响应式编程
# ai
# ssl
# java
相关栏目:
【
科技资讯46185 】
【
网络学院92790 】
相关推荐:
理解Python模块与全局变量的作用域管理
ArchiveofOurOwn小说阅读-ArchiveofOurOwn同人作品访问链接
MAC怎么让Dock栏只显示当前运行的应用_MAC终端命令实现极简Dock栏
c++ 命名空间怎么用 c++ namespace使用指南
探索高级语言到C/C++的转译路径:以Go为例及内存管理策略
2025-2030年全球乘用车销量预测:新能源成增长主力
12306怎么选座位选到安静区_12306选座安静区域选择策略
C++指针和引用有什么区别_C++内存管理核心概念深度解析
在Go语言中利用后缀数组处理多字符串:实现高效文本匹配与自动补全
使用Pandas转换并合并DataFrame:多列映射至统一结构
c++如何实现单例设计模式_c++线程安全的单例模式写法
汽水音乐网页版使用入口_汽水音乐电脑版播放指南
J*aScript实现动态背景色下的文本与按钮颜色自适应调整
支付宝如何管理隐私设置_支付宝隐私保护的配置技巧
C#使用XPath查询节点时出错? 常见语法错误与调试技巧
React Router v6 教程:构建认证保护的私有路由与重定向策略
Golang如何测试channel通信行为_Golang channel通信测试与分析方法
J*aScript井字棋(Tic-Tac-Toe)核心交互逻辑实现教程
荣耀Play7TPro怎样在信息App置顶客服对话_iPhone荣耀Play7TPro信息App置顶客服对话【优先查看】
win11 arm版怎么安装 M1/M2 Mac虚拟机安装ARM win11的方法
高德地图家和公司地址在哪设置 高德地图通勤路线设置方法【超详细】
解决macOS Tkinter应用双击启动崩溃:PyInstaller打包指南
Excel中VLOOKUP的第四个参数是干什么用的_Excel VLOOKUP第四参数作用解析
不同用户不同价格! 索尼开启账户个性化定价测试
大象笔记网页版入口 印象笔记网页版登录入口
CSS Flexbox如何实现多行排列_flex-wrap wrap自动换行显示
J*a应用程序首次运行自动创建文件与目录的最佳实践
C#中解析不规范的HTML为XML 常见的坑与解决办法
Kafka Streams中基于消息头条件过滤消息的实现指南
浏览器打开即用 美图秀秀网页版入口
J*aScript中如何高效提取对象指定属性
Windows电脑怎么截图最方便_系统自带截图工具的5种神仙用法【技巧】
抖音极速版最新版本 抖音极速版官方下载地址
sublime怎么覆盖插件的默认快捷键_sublime快捷键优先级与设置
Win11怎么设置鼠标主按键_Win11鼠标左右键功能互换
如何在J*a中使用Locale处理多语言环境
优化HTML表单样式:解决输入框焦点跳动与元素间距问题
蛙漫官方正版入口 蛙漫网页在线全集免费观看
响应式图片在网页设计中的正确实现方法
优化 Jest 模拟:强制未实现函数抛出错误以提升测试效率
J*aScript中高效清空DOM列表元素:解决for循环中断与任务管理问题
J*a里如何实现订单支付与库存同步功能_支付库存同步项目开发方法说明
4399网页游戏电脑版全新入口 4399电脑端在线玩指南
J*aScriptWebpack优化_J*aScript构建工具实战
自定义Bag-of-Words实现:处理带负号的词汇权重
内存疯狂猛猛涨价:主板销量直接腰斩!
Go语言中的*string:深入理解字符串指针
学习通网页版快速入口 学习通官网网页版直接打开
c++如何使用折叠表达式(Fold Expressions)_c++17可变参数模板新技巧
文心一言怎样用插件调度API数据_文心一言用插件调度API数据【API调用】


2025-12-03
浏览次数:次
返回列表