新闻中心
基于发布-订阅模式实现多浏览器并行独立自动化操作

本文探讨了如何在多个独立浏览器中并行执行自动化任务,并模拟独立的鼠标光标操作。传统方法难以实现,因此我们提出采用基于发布-订阅(pub-sub)模式的领导者-跟随者架构。通过消息队列系统,领导者程序广播指令,而多个跟随者程序各自驱动一个selenium浏览器实例,接收并执行这些指令,从而实现高效且独立的并行自动化。
在自动化测试、数据抓取或多账号操作等场景中,开发者常常面临一个挑战:如何在多个独立的浏览器实例中同时执行任务,并且每个实例都能模拟独立的鼠标光标操作。这与简单的多线程或多进程启动浏览器不同,因为操作系统通常只提供一个物理鼠标光标,像pyautogui这类库的操作会影响到全局光标,无法实现多个独立浏览器中“各自”的鼠标移动和点击。传统的虚拟化方案虽然能隔离环境,但若要实现程序化的独立鼠标控制,通常会带来较高的复杂性和资源开销。
为了有效解决这一问题,核心策略是采用一种分布式通信模式——发布-订阅(Publish-Subscribe, Pub-Sub)模式,并结合领导者-跟随者(Leader-Follower)架构。这种模式将任务的生成与执行解耦,使得多个浏览器实例能够独立地接收和响应指令。
核心架构:领导者-跟随者与发布-订阅模式
该解决方案的核心在于构建一个领导者程序和N个跟随者程序,并通过一个消息队列系统进行通信。
-
领导者程序 (Leader Program)
- 职责: 负责生成自动化任务的指令。这些指令可以是高层级的(如“打开指定URL并登录”),也可以是低层级的(如“在坐标(X, Y)处模拟鼠标点击”)。
- 操作: 将生成的指令作为消息发布到预定义的消息通道中。领导者无需关心具体哪个跟随者会接收并执行这些指令。
-
跟随者程序 (Follower Programs)
- 职责: 每个跟随者程序独立运行,并控制一个单独的浏览器实例(通常通过Selenium WebDriver)。
- 操作: 持续监听消息队列中的指令通道。一旦接收到指令,跟随者程序就会在其控制的浏览器实例中执行相应的操作,例如模拟鼠标移动、点击、键盘输入、页面导航等。
- 独立性: 每个跟随者程序都拥有自己的浏览器进程和状态,彼此之间互不干扰,从而实现了“独立鼠标光标”的逻辑效果。
-
消息队列系统 (Message Queue System)
标贝悦读AI配音
在线文字转语音软件-专业的配音网站
78
查看详情
- 作用: 作为领导者和跟随者之间可靠的通信骨干。它负责消息的存储、路由和分发。
- 选择: 常用的消息队列系统包括Apache Kafka、RabbitMQ、Redis Pub/Sub等。选择哪个系统取决于项目的规模、性能要求和复杂性。
- 通道: 可以设置一个或多个消息通道。例如,一个通用的指令通道供所有跟随者监听,或者为需要个性化指令的跟随者设置独立的通道。
实现步骤与示例代码(概念性)
以下是使用Python和Selenium结合消息队列系统实现此架构的概念性示例:
1. 消息队列框架选择与配置
首先,选择一个消息队列框架并进行基本配置。以RabbitMQ为例,你需要安装pika库。
pip install pika selenium webdriver_manager
2. 领导者程序示例
领导者程序负责生成并发布指令。
# leader.py (概念性代码,需根据实际消息队列库调整)
import json
import pika # 假设使用RabbitMQ
class Leader:
def __init__(self, queue_name='browser_commands'):
self.connection = pika.BlockingConnection(pika.ConnectionParameters('localhost'))
self.channel = self.connection.channel()
self.channel.queue_declare(queue=queue_name)
self.queue_name = queue_name
def publish_command(self, browser_id, command_type, payload):
"""
发布一个指令到消息队列。
:param browser_id: 目标浏览器实例的ID。
:param command_type: 指令类型,如 'mouse_move', 'click', 'n*igate'。
:param payload: 指令的具体参数,如坐标、URL等。
"""
message = {
"browser_id": browser_id,
"type": command_type,
"payload": payload
}
self.channel.basic_publish(
exchange='',
routing_key=self.queue_name,
body=json.dumps(message)
)
print(f"Leader published: {message}")
def close(self):
self.connection.close()
if __name__ == "__main__":
leader = Leader()
# 模拟为不同的浏览器实例发布指令
leader.publish_command(1, 'n*igate', {'url': 'https://www.example.com'})
leader.publish_command(2, 'n*igate', {'url': 'https://www.google.com'})
leader.publish_command(1, 'mouse_move', {'x': 100, 'y': 200})
leader.publish_command(1, 'click', {'x': 100, 'y': 200})
leader.publish_command(2, 'mouse_move', {'x': 50, 'y': 50})
leader.publish_command(2, 'click', {'x': 50, 'y': 50})
leader.close()3. 跟随者程序示例
每个跟随者程序启动一个独立的Selenium浏览器实例,并监听消息队列。
# follower.py (概念性代码,需根据实际消息队列库调整) import json import pika from selenium import webdriver from selenium.webdriver.chrome.service import Service as ChromeService from webdriver_manager.chrome import ChromeDriverManager from selenium.webdriver.common.action_chains import ActionChains from selenium.webdriver.common.by import By import time class Follower: def __init__(self, browser_id, queue_name='browser_commands'): self.browser_id = browser_id # 初始化Selenium WebDriver options = webdriver.ChromeOptions() # 可以添加headless模式,或者为每个浏览器设置不同的用户数据目录以完全隔离 # options.add_argument('--headless') # options.add_argument(f'--user-data-dir=/tmp/chrome-profile-{browser_id}') self.driver = webdriver.Chrome(service=ChromeService(ChromeDriverManager().install()), options=options) print(f"Follower {self.browser_id}: Browser initialized.") # 初始化消息队列连接 self.connection = pika.BlockingConnection(pika.ConnectionParameters('localhost')) self.channel = self.connection.channel() self.channel.queue_declare(queue=queue_name) self.queue_name = queue_name def _execute_command(self, command): """根据接收到的指令在浏览器中执行操作。""" if command['browser_id'] != self.browser_id: return # 忽略不属于本实例的指令 command_type = command['type'] payload = command['payload'] print(f"Follower {self.browser_id}: Executing {command_type} with payload {payload}") try: if command_type == 'n*igate': self.driver.get(payload['url']) elif command_type == 'mouse_move': # Selenium的ActionChains模拟鼠标移动到相对页面左上角的坐标 # 注意:这只是将Selenium的内部光标移动到该位置,不影响物理光标 body = self.driver.find_element(By.TAG_NAME, 'body') ActionChains(self.driver).move_to_element_with_offset(body, payload['x'], payload['y']).perform() elif command_type == 'click': # 模拟点击,通常需要先移动到目标位置再点击 body = self.driver.find_element(By.TAG_NAME, 'body') ActionChains(self.driver).move_to_element_with_offset(body, payload['x'], payload['y']).click().perform() # 可以添加更多指令类型,如 keyboard_input, find_element_and_click 等 else: print(f"Follower {self.browser_id}: Unknown command type {command_type}") except Exception as e: print(f"Follower {self.browser_id}: Error executing command {command_type}: {e}") def start_listening(self): """开始监听消息队列并处理指令。""" def callback(ch, method, properties, body): command = json.loads(body) self._execute_command(command) ch.basic_ack(delivery_tag=method.delivery_tag) # 确认消息已处理 self.channel.basic_consume( queue=self.queue_name, on_message_callback=callback, auto_ack=False # 手动确认,确保消息被成功处理 ) print(f"Follower {self.browser_id}: Started listening for commands...") self.channel.start_consuming() def close(self): """关闭浏览器和消息队列连接。""" self.driver.quit() self.connection.close() print(f"Follower {self.browser_id}: Closed.") if __name__ == "__main__": # 启动多个跟随者实例,每个实例在独立的终端或进程中运行 # 例如,在不同的终端分别运行: # python follower.py 1 # python follower.py 2 import sys if len(sys.argv) > 1: browser_id = int(sys.argv[1]) follower = Follower(browser_id) try: follower.start_listening() except KeyboardInterrupt: print(f"Follower {browser_id}: Shutting down...") finally: follower.close() else: print("Usage: python follower.py <browser_id>")
注意事项与最佳实践
- 资源管理: 每个Selenium浏览器实例都会消耗独立的内存和CPU资源。在设计系统时,需根据可用硬件资源和预期并行数量进行评估。考虑使用无头浏览器(headless mode)以减少资源消耗。
- 指令粒度: 指令的粒度可以根据需求调整。过于细碎的指令会增加消息队列的负担和网络延迟;过于粗粒度的指令则可能降低灵活性。
- 错误处理与重试: 消息队列通常提供消息持久化、消费者确认(ACK)、死信队列(Dead Letter Queue)等机制,用于处理消息发送失败、消费者崩溃等情况。应充分利用这些机制确保任务的可靠执行。
- 反馈机制: 如果领导者需要知道任务的执行结果或跟随者遇到的问题,可以设计一个反向的“报告通道”,让跟随者将执行状态或日志信息发布回领导者。
- 可扩展性: 这种架构天然具有良好的可扩展性。只需启动更多的跟随者程序即可增加并行处理能力,而无需修改领导者程序的逻辑。
- 同步与异步: 领导者通常以异步方式发布指令,不等待跟随者的执行结果。如果某些任务需要严格的同步(例如,等待一个浏览器完成特定操作后才能启动另一个),则需要引入额外的协调机制,如使用共享状态存储或更复杂的通信协议。
- 隔离性: 为确保浏览器实例的完全独立,除了进程隔离外,还可以考虑为每个浏览器实例配置独立的Selenium用户配置文件或临时目录,以避免缓存、Cookie等数据相互影响。
总结
通过采用发布-订阅模式和领导者-跟随者架构,我们能够有效解决在多个独立浏览器中并行执行自动化任务并模拟独立鼠标光标的复杂问题。这种方案将任务生成与执行解耦,提高了系统的灵活性、可扩展性和鲁棒性。开发者可以根据具体需求选择合适的消息队列系统,并结合Selenium WebDriver等工具,构建出高效、可靠的多浏览器并行自动化解决方案。
以上就是基于发布-订阅模式实现多浏览器并行独立自动化操作的详细内容,更多请关注其它相关文章!
# 器中
# 襄阳网络推广营销方式
# 石家庄抖音推广营销工具商城
# 合肥谷歌seo哪个好
# 建材行业微博推广营销
# 黄州网站建设排名
# seo分类怎么排名
# 本溪网站建设系统官网
# 网站建设策划哪家技术好
# 阜阳抖音SEO费用
# seo描述怎么填写
# 自己的
# 并结合
# 如何用
# 重启
# 可以根据
# python
# 多线程
# 多个
# 鼠标
# goog
# 路由
# ai
# 工具
# 浏览器
# 操作系统
# cookie
# apache
# go
# json
# js
# redis
相关栏目:
【
科技资讯46185 】
【
网络学院92790 】
相关推荐:
QQ邮箱网页版快速登录 QQ邮箱邮箱账号官方入口地址
Windows电脑怎么截图最方便_系统自带截图工具的5种神仙用法【技巧】
《北京人工智能产业白皮书(2025)》发布:全年核心产值预计突破 4500 亿元
天眼查怎么看公司融资情况 天眼查企业融资历史查询步骤【攻略】
EMS快递官网app_中国邮政速递物流手机客户端
Lar*el Excel导入时生成自定义递增ID的策略与实践
Mac怎么查看崩溃日志_Mac控制台错误报告分析
在Socket.IO连接中实现Access Token自动更新与动态重连
AO3官方可用镜像 Archive of Our Own网页版最新入口
印象笔记怎样用批量导出备知识库_印象笔记用批量导出备知识库【备份方法】
深入理解J*aScript中的B样条曲线与节点向量生成
在Go语言中利用后缀数组处理多字符串:实现高效文本匹配与自动补全
《燕云十六声》两周内达九百万玩家!位居畅销榜第五
在Qt QML中通过Python字典动态更新TextEdit内容的教程
LINUX怎么设置定时任务_LINUX crontab配置教程
Golang指针如何与map组合使用_Golang map指针组合实践
PyTorch模型训练效果不佳?深入剖析常见错误与调试技巧
J*aScript map 迭代中检测空数组元素的有效方法
J*aScript中管理异步API调用:确保操作顺序与数据一致性
如何使用J*aScript精确选择并批量修改特定父元素下子链接的样式
迅雷下载到U盘速度很慢怎么办_迅雷U盘下载慢优化方法
mc.js免安装版 mc.js一键畅玩入口
汽水音乐网页版使用入口_汽水音乐电脑版播放指南
荣耀Play7TPro怎样在信息App置顶客服对话_iPhone荣耀Play7TPro信息App置顶客服对话【优先查看】
UC浏览器官网入口2025最新 UC浏览器网页版正式地址
电脑IP地址怎么查 查看本机IP地址的几种方法
如何在J*a中实现统一对象行为接口_项目大型化时的接口规范化
如何使用纯J*aScript判断Input元素是否在特定类容器内
解决 Vaadin 8 中大文件音频播放与定位时出现的 IOException
Lar*el头像管理:图片缩放与旧文件删除的最佳实践
J*aScript打印功能_j*ascript输出控制
C++20的source_location是什么_C++在编译期获取源码位置信息用于日志和断言
2306选座时如何选靠窗位置_12306选座靠窗座位查看方法解析
AO3中文官网链接_AO3网页版稳定镜像站
曝R星经典之作开发图 设计简陋但信息密集!
Golang并发任务中错误如何聚合_Golang goroutine error收集方式
“在文档元素之后找到了标记”是什么错误? 检查并修复XML中多个根元素的3个方法
谷歌浏览器如何快速清除某个网站的数据_Chrome网站缓存清理方法
Go RPC HTTP服务正确实现与常见陷阱解析
Go Martini框架:动态服务解码后的图片内容
c++如何实现一个简单的ECS框架_c++数据驱动设计与游戏开发
NVIDIA股价11月重挫12%:下月有望好转 但难回5万亿美元巅峰
离线运行Go语言之旅:本地部署与GOPATH配置指南
深入理解Google Cloud Datastore查询:祖先路径与数据一致性
css链接悬停下划线样式如何自定义_使用::after结合content和transition
如何优雅地解决Livewire文件上传难题?SpatieLivewireFilepond让一切变得简单
Go语言中高效处理x-www-form-urlencoded表单数据
漫蛙漫画官方主页入口 漫蛙MANWA网页直达访问链接
Python:递归比较文件夹内容并找出特定类型文件的差异
fishbowl官网免费版 fishbowl养鱼网站入口


2025-12-07
浏览次数:次
返回列表
enium.webdriver.common.by import By
import time
class Follower:
def __init__(self, browser_id, queue_name='browser_commands'):
self.browser_id = browser_id
# 初始化Selenium WebDriver
options = webdriver.ChromeOptions()
# 可以添加headless模式,或者为每个浏览器设置不同的用户数据目录以完全隔离
# options.add_argument('--headless')
# options.add_argument(f'--user-data-dir=/tmp/chrome-profile-{browser_id}')
self.driver = webdriver.Chrome(service=ChromeService(ChromeDriverManager().install()), options=options)
print(f"Follower {self.browser_id}: Browser initialized.")
# 初始化消息队列连接
self.connection = pika.BlockingConnection(pika.ConnectionParameters('localhost'))
self.channel = self.connection.channel()
self.channel.queue_declare(queue=queue_name)
self.queue_name = queue_name
def _execute_command(self, command):
"""根据接收到的指令在浏览器中执行操作。"""
if command['browser_id'] != self.browser_id:
return # 忽略不属于本实例的指令
command_type = command['type']
payload = command['payload']
print(f"Follower {self.browser_id}: Executing {command_type} with payload {payload}")
try:
if command_type == 'n*igate':
self.driver.get(payload['url'])
elif command_type == 'mouse_move':
# Selenium的ActionChains模拟鼠标移动到相对页面左上角的坐标
# 注意:这只是将Selenium的内部光标移动到该位置,不影响物理光标
body = self.driver.find_element(By.TAG_NAME, 'body')
ActionChains(self.driver).move_to_element_with_offset(body, payload['x'], payload['y']).perform()
elif command_type == 'click':
# 模拟点击,通常需要先移动到目标位置再点击
body = self.driver.find_element(By.TAG_NAME, 'body')
ActionChains(self.driver).move_to_element_with_offset(body, payload['x'], payload['y']).click().perform()
# 可以添加更多指令类型,如 keyboard_input, find_element_and_click 等
else:
print(f"Follower {self.browser_id}: Unknown command type {command_type}")
except Exception as e:
print(f"Follower {self.browser_id}: Error executing command {command_type}: {e}")
def start_listening(self):
"""开始监听消息队列并处理指令。"""
def callback(ch, method, properties, body):
command = json.loads(body)
self._execute_command(command)
ch.basic_ack(delivery_tag=method.delivery_tag) # 确认消息已处理
self.channel.basic_consume(
queue=self.queue_name,
on_message_callback=callback,
auto_ack=False # 手动确认,确保消息被成功处理
)
print(f"Follower {self.browser_id}: Started listening for commands...")
self.channel.start_consuming()
def close(self):
"""关闭浏览器和消息队列连接。"""
self.driver.quit()
self.connection.close()
print(f"Follower {self.browser_id}: Closed.")
if __name__ == "__main__":
# 启动多个跟随者实例,每个实例在独立的终端或进程中运行
# 例如,在不同的终端分别运行:
# python follower.py 1
# python follower.py 2
import sys
if len(sys.argv) > 1:
browser_id = int(sys.argv[1])
follower = Follower(browser_id)
try:
follower.start_listening()
except KeyboardInterrupt:
print(f"Follower {browser_id}: Shutting down...")
finally:
follower.close()
else:
print("Usage: python follower.py <browser_id>")