新闻中心
在Lar*el WebSockets中实现连接生命周期管理与业务逻辑绑定

本教程将指导您如何在lar*el websockets中定制连接的生命周期事件,包括连接的打开与关闭。通过扩展默认的websocket处理器,我们将演示如何捕获并关联客户端的业务上下文(如用户id、订单id),从而在连接断开时执行特定的业务逻辑,例如自动解锁正在处理的订单,实现对应用资源的精确状态管理。
引言:定制WebSocket连接的必要性
在实时应用开发中,WebSocket连接不仅仅是数据传输的通道,它更代表了客户端与服务器之间的一种持续性会话。很多业务场景需要我们能够感知并响应这些会话的生命周期事件,例如:
- 资源锁定与解锁:当用户打开一个订单页面进行编辑时,锁定该订单以防止其他用户同时修改;当用户关闭页面或断开连接时,自动解锁订单。
- 用户在线状态:实时显示用户的在线或离线状态。
- 协作编辑:跟踪文档编辑者的连接状态,实现实时协作。
Lar*el WebSockets 包(Beyondcode 的 Pusher 替代方案)提供了强大的功能,但其默认处理器可能无法满足所有复杂的业务需求。为了实现上述场景,我们需要扩展其核心处理器,介入连接的打开、关闭及消息处理过程,并注入自定义的业务逻辑。
理解WebSocket处理器
Lar*el WebSockets 的核心是 BeyondCode\Lar*elWebSockets\WebSockets\WebSocketHandler 接口,它定义了处理WebSocket连接生命周期的方法:
- onOpen(ConnectionInterface $connection, RequestInterface $request, $appId): 当新的WebSocket连接建立时调用。
- onClose(ConnectionInterface $connection): 当WebSocket连接关闭时调用。
- onMessage(ConnectionInterface $connection, MessageInterface $msg): 当收到来自客户端的消息时调用。
- onError(ConnectionInterface $connection, \Exception $e): 当连接发生错误时调用。
- onPong(ConnectionInterface $connection, MessageInterface $msg): 当收到客户端的 Pong 消息时调用。
通常,我们不是直接实现 WebSocketHandler 接口,而是继承 BeyondCode\Lar*elWebSockets\WebSockets\PusherHandler。PusherHandler 已经实现了 Pusher 协议的诸多细节,我们可以在此基础上重写或增强特定方法,以集成我们的业务逻辑。
创建自定义WebSocket处理器
为了定制连接行为,我们首先需要创建一个自定义的处理器类。我们将使用 SplObjectStorage 来存储与每个连接关联的业务上下文数据,因为 ConnectionInterface 对象是唯一的且可以作为 SplObjectStorage 的键。
首先,在 app/WebSockets 目录下创建 CustomWebSocketHandler.php 文件:
// app/WebSockets/CustomWebSocketHandler.php
<?php
namespace App\WebSockets;
use BeyondCode\Lar*elWebSockets\WebSockets\PusherHandler;
use Ratchet\ConnectionInterface;
use Illuminate\Support\Facades\Log;
use SplObjectStorage; // 引入 SplObjectStorage
class CustomWebSocketHandler extends PusherHandler
{
/**
* @var SplObjectStorage 存储连接与业务上下文的映射
*/
protected SplObjectStorage $connections;
public function __construct()
{
parent::__construct();
$this->connections = new SplObjectStorage();
}
/**
* 当新的WebSocket连接建立时调用。
*
* @param ConnectionInterface $connection
* @param \Psr\Http\Message\RequestInterface $request
* @param string $appId
* @return void
*/
public function onOpen(ConnectionInterface $connection, \Psr\Http\Message\RequestInterface $request, $appId)
{
// 调用父类的onOpen方法,确保Pusher协议的正常初始化
parent::onOpen($connection, $request, $appId);
Log::info("Connection opened: {$connection->resourceId}");
// 尝试从请求中获取业务上下文,例如用户ID或订单ID
// 客户端可以通过WebSocket URL的查询参数传递这些信息
$queryParams = $request->getQueryParams();
$userId = $queryParams['user_id'] ?? null;
$orderId = $queryParams['order_id'] ?? null;
// 存储连接与业务上下文
$this->connections->attach($connection, [
'resource_id' => $connection->resourceId,
'user_id' => $userId,
'order_id' => $orderId,
'connected_at' => now(),
'channels' => [], // 用于存储该连接订阅的频道
]);
if ($orderId) {
Log::info("Order {$orderId} is now being processed by user {$userId} via connection {$connection->resourceId}");
// 触发事件以锁定订单
event(new \App\Events\OrderLocked($orderId, $userId, $connection->resourceId));
}
}
/**
* 当收到客户端消息时调用。
*
* @param ConnectionInterface $connection
* @param \Ratchet\MessageComponent\MessageInterface $msg
* @return void
*/
public function onMessage(ConnectionInterface $connection, \Ratchet\MessageComponent\MessageInterface $msg)
{
parent::onMessage($connection, $msg);
$payload = json_decode($msg->getPayload());
// 进一步处理消息,例如当客户端订阅特定频道时更新上下文
if (isset($payload->event) && $payload->event === 'pusher:subscribe' && isset($payload->data->chan
nel)) {
$channelName = $payload->data->channel;
$context = $this->connections->offsetGet($connection);
$context['channels'][] = $channelName;
$this->connections->offsetSet($connection, $context); // 更新存储的上下文
Log::info("Connection {$connection->resourceId} subscribed to channel: {$channelName}");
// 如果频道名包含订单ID,可以进一步提取并更新
if (preg_match('/^private-order\.(\d+)$/', $channelName, $matches)) {
$orderId = $matches[1];
if ($context['order_id'] !== $orderId) {
Log::warning("Connection {$connection->resourceId} subscribed to order {$orderId}, but initial order was {$context['order_id']}");
// 可以在这里更新或处理冲突
}
}
}
}
/**
* 当WebSocket连接关闭时调用。
*
* @param ConnectionInterface $connection
* @return void
*/
public function onClose(ConnectionInterface $connection)
{
Log::info("Connection closed: {$connection->resourceId}");
// 确保该连接存在于我们的存储中
if ($this->connections->contains($connection)) {
$context = $this->connections->offsetGet($connection);
$userId = $context['user_id'];
$orderId = $context['order_id'];
if ($orderId) {
Log::info("Order {$orderId} is no longer processed by user {$userId} via connection {$connection->resourceId}");
// 触发事件以解锁订单
event(new \App\Events\OrderUnlocked($orderId, $userId, $connection->resourceId));
}
// 清理连接上下文
$this->connections->detach($connection);
}
// 调用父类的onClose方法
parent::onClose($connection);
}
/**
* 当连接发生错误时调用。
*
* @param ConnectionInterface $connection
* @param \Exception $e
* @return void
*/
public function onError(ConnectionInterface $connection, \Exception $e)
{
Log::error("Connection error for {$connection->resourceId}: " . $e->getMessage());
parent::onError($connection, $e);
}
}代码说明:
- SplObjectStorage $connections: 这是关键,用于存储每个 ConnectionInterface 对象及其关联的业务数据。
-
onOpen 方法:
- 在调用 parent::onOpen 之后,我们从 RequestInterface $request 的查询参数中尝试提取 user_id 和 order_id。
- 将这些信息与 connection 对象一起存储到 $this->connections 中。
- 如果成功获取到 orderId,则触发一个 OrderLocked 事件,通知应用层锁定该订单。
-
onMessage 方法 (可选但推荐):
- 此方法用于处理客户端发送的所有消息。在这里,我们特别关注 pusher:subscribe 事件。
- 当客户端订阅一个频道时,我们可以解析频道名称(例如 private-order.123),从中提取更具体的业务ID,并更新 SplObjectStorage 中该连接的上下文信息。这在初始 onOpen 无法获得所有上下文时非常有用。
-
onClose 方法:
- 在连接关闭时,我们通过 ConnectionInterface $connection 从 $this->connections 中检索之前存储的业务上下文。
- 根据上下文中的 order_id,触发一个 OrderUnlocked 事件,通知应用层解锁订单。
- 最后,从 $this->connections 中移除该连接的上下文,防止内存泄漏。
- onError 方法: 记录错误信息,以便调试。
定义业务事件
为了解耦 WebSocket 处理器与具体的业务逻辑,我们推荐使用 Lar*el 事件。
GoEnhance
全能AI视频制作平台:通过GoEnhance AI让视频创作变得比以往任何时候都更简单。
347
查看详情
OrderLocked 事件:
// app/Events/OrderLocked.php
<?php
namespace App\Events;
use Illuminate\Broadcasting\InteractsWithSockets;
use Illuminate\Foundation\Events\Dispatchable;
use Illuminate\Queue\SerializesModels;
class OrderLocked
{
use Dispatchable, InteractsWithSockets, SerializesModels;
public $orderId;
public $userId;
public $connectionId;
public function __construct($orderId, $userId, $connectionId)
{
$this->orderId = $orderId;
$this->userId = $userId;
$this->connectionId = $connectionId;
}
}OrderUnlocked 事件:
// app/Events/OrderUnlocked.php
<?php
namespace App\Events;
use Illuminate\Broadcasting\InteractsWithSockets;
use Illuminate\Foundation\Events\Dispatchable;
use Illuminate\Queue\SerializesModels;
class OrderUnlocked
{
use Dispatchable, InteractsWithSockets, SerializesModels;
public $orderId;
public $userId;
public $connectionId;
public function __construct($orderId, $userId, $connectionId)
{
$this->orderId = $orderId;
$this->userId = $userId;
$this->connectionId = $connectionId;
}
}然后,您可以在 app/Listeners 中创建相应的监听器来处理这些事件,例如更新数据库中的订单状态。
注册自定义处理器
最后一步是告诉 Lar*el WebSockets 使用您的自定义处理器。修改 config/websockets.php 文件:
// config/websockets.php
return [
// ... 其他配置
'handler' => \App\WebSockets\CustomWebSocketHandler::class,
// ... 其他配置
];客户端实现
为了让 onOpen 方法能够获取到 user_id 和 order_id,客户端在建立 WebSocket 连接时需要将这些信息作为查询参数传递。
使用 Lar*el Echo 和 J*aScript:
import Echo from 'lar*el-echo';
window.Pusher = require('pusher-js');
// 假设您在后端视图中将这些ID传递给前端
const currentUserId = @json(auth()->id());
const currentOrderId = @json($order->id ?? null); // 如果在订单页面
window.Echo = new Echo({
broadcaster: 'pusher',
key: process.env.MIX_PUSHER_APP_KEY,
wsHost: window.location.hostname,
wsPort: 6001以上就是在Lar*el WebSockets中实现连接生命周期管理与业务逻辑绑定的详细内容,更多请关注php中文网其它相关文章!
# 绑定
# 长尾关键词排名策略
# 抖音seo优化服务热线
# 网站seo优化方案制作步骤分享
# 网站优化与推广的细节
# seo不懂代码
# 电商推广营销欢迎选购
# 上蔡网站推广营销招聘
# 免费的广告推广网站哪个好用
# 海鲜烧烤怎么营销推广
# 网站优化专家文案
# 这是
# 组中
# 发生错误
# 我们可以
# 在这里
# php
# 上传
# 解锁
# 自定义
# 客户端
# websocket
# app
# cad
# 处理器
# json
# 前端
# js
# java
# laravel
# javascript
相关栏目:
【
科技资讯46185 】
【
网络学院92790 】
相关推荐:
sublime怎么进行远程开发编辑_配置rsub/rmate实现sublime编辑服务器文件
JUnit5/Mockito:优雅测试内部依赖与异常处理的实践
怎么在html里运行vbs脚本_html中运行vbs脚本方法【教程】
Mac怎么查看崩溃日志_Mac控制台错误报告分析
在J*a中如何隐藏复杂性_使用门面模式组织对象交互
《噬血代码2》新预告片发布 展示游戏剧情
J*aScriptWebpack优化_J*aScript构建工具实战
解决 Express.js 中 PUT 请求密码修改失败的路由配置指南
Win11怎么关闭触摸屏_Windows 11禁用HID符合标准触摸屏
NRF24L01数据传输深度解析:解决大载荷接收异常与分包策略
如何使用Go和Martini动态服务解码后的图片
Log4j Console Appender性能瓶颈与高并发优化策略
Win11网速慢怎么解决 Win11网络设置优化解除限速
Yandex免登录网页版地址 Yandex搜索引擎官方访问入口
QQ邮箱网页版快速登录 QQ邮箱邮箱账号官方入口地址
css链接悬停下划线样式如何自定义_使用::after结合content和transition
c++中为什么推荐使用using替代typedef_c++现代化类型别名
Typer应用中灵活处理命令行参数的令牌化与解析
Python多线程中正确使用sigwait处理SIGALRM信号
PDF怎么合并PDF并保持格式_PDF合并文件保持排版教程
composer 和 npm/yarn 在管理依赖方面有什么核心思想差异?
Python多版本共存与虚拟环境管理深度指南
TikTok搜索不到用户发布内容怎么办 TikTok用户内容搜索优化方法
百度网盘网页版入口 百度网盘网页版官方登录网址
J*a实现学校排课程序_面向对象结构化项目示例
如何在J*a中使用Locale处理多语言环境
免费抖音短视频入口_抖音网页版短视频免费通道
支付宝碰一碰设备是REDMI手机吗 博主拆机辟谣:处理器、内存都不一样
腾讯QQ邮箱登录入口_QQ邮箱官方网站使用地址
利用5118提升短视频内容效果_5118短视频关键词优化方法
2025AO3夸克浏览器通道_AO3手机HTTPS安全入口分享
Python自定义类排序:解决lambda键值访问TypeError的实践指南
钉钉视频会议画面卡顿如何解决 钉钉会议画面优化方法
网易大神怎么保存别人动态的图片_网易大神动态图片保存方法
TikTok国际版网页端快速入口 TikTok全球版短视频浏览教程
b站怎么看视频的弹幕数量_b站弹幕数量查看方法
QQ邮箱官方登录入口_QQ邮箱网页版快捷使用平台
怎样把文件彻底粉碎无法恢复_Windows下安全删除敏感数据【隐私保护】
CSS Flexbox与媒体查询:实现响应式布局中元素的并排与堆叠
TikTok网页版直接登录 TikTok网页端官方平台入口
Windows10怎么开启存储感知 Windows10系统设置自动清理临时文件释放C盘空间【教程】
J*a编写用户注册与登录功能_掌握字符串与验证逻辑
HTML长属性值处理:表单action路径优化与代码规范应对
Node.js 中使用 node-cron 实现定时 API 数据抓取与处理
处理动态列数据:J*a ArrayList的正确初始化与字符累加教程
CSS子选择器:如何区分并样式化嵌套列表的子层级
今日头条怎么同步内容到抖音_今日头条内容同步到抖音教程
如何使用Rector自动化升级旧代码_通过Composer安装和配置Rector进行代码重构
必由学官方登录入口 必由学教师学生账号快速访问
windows10怎么关闭系统提示音_windows10彻底静音设置方法


2025-12-01
浏览次数:次
返回列表
nel)) {
$channelName = $payload->data->channel;
$context = $this->connections->offsetGet($connection);
$context['channels'][] = $channelName;
$this->connections->offsetSet($connection, $context); // 更新存储的上下文
Log::info("Connection {$connection->resourceId} subscribed to channel: {$channelName}");
// 如果频道名包含订单ID,可以进一步提取并更新
if (preg_match('/^private-order\.(\d+)$/', $channelName, $matches)) {
$orderId = $matches[1];
if ($context['order_id'] !== $orderId) {
Log::warning("Connection {$connection->resourceId} subscribed to order {$orderId}, but initial order was {$context['order_id']}");
// 可以在这里更新或处理冲突
}
}
}
}
/**
* 当WebSocket连接关闭时调用。
*
* @param ConnectionInterface $connection
* @return void
*/
public function onClose(ConnectionInterface $connection)
{
Log::info("Connection closed: {$connection->resourceId}");
// 确保该连接存在于我们的存储中
if ($this->connections->contains($connection)) {
$context = $this->connections->offsetGet($connection);
$userId = $context['user_id'];
$orderId = $context['order_id'];
if ($orderId) {
Log::info("Order {$orderId} is no longer processed by user {$userId} via connection {$connection->resourceId}");
// 触发事件以解锁订单
event(new \App\Events\OrderUnlocked($orderId, $userId, $connection->resourceId));
}
// 清理连接上下文
$this->connections->detach($connection);
}
// 调用父类的onClose方法
parent::onClose($connection);
}
/**
* 当连接发生错误时调用。
*
* @param ConnectionInterface $connection
* @param \Exception $e
* @return void
*/
public function onError(ConnectionInterface $connection, \Exception $e)
{
Log::error("Connection error for {$connection->resourceId}: " . $e->getMessage());
parent::onError($connection, $e);
}
}