新闻中心
高效处理Pandas中大量CSV文件合并:避免循环内concat的性能陷阱

本文旨在解决在pandas中循环合并大量csv文件时遇到的性能瓶颈。通过分析循环中使用`pd.concat`的低效性,文章提出两种优化策略:一是将所有数据收集到字典中,最后进行一次性`pd.concat`;二是利用`concurrent.futures.threadpoolexecutor`实现文件读取的并行化。这些方法显著提升了处理效率,避免了随着文件数量增加而导致的性能急剧下降。
1. 问题背景与循环concat的性能瓶颈
在数据处理过程中,我们经常需要合并来自多个文件的信息。当面对大量(例如上千个)中等大小(例如每个15MB,10,000行)的CSV文件时,一种直观的做法是使用循环迭代读取每个文件,进行必要的转换,然后通过pd.concat将其追加到一个主DataFrame中。然而,这种方法存在严重的性能问题。
考虑以下原始代码示例:
import pandas as pd
import os
# 假设 df 是一个包含文件路径信息的DataFrame
# root_path 是根目录
# df 示例:
# File ID File Name
# 0 folderA file001.txt
# 1 folderB file002.txt
merged_data = pd.DataFrame()
count = 0
for index, row in df.iterrows():
folder_name = row['File ID'].strip()
file_name = row['File Name'].strip()
file_path = os.path.join(root_path, folder_name, file_name)
# 读取、转置并插入新列
file_data = pd.read_csv(file_path, names=['Case', f'{folder_name}_{file_name}'], sep='\t')
file_data_transposed = file_data.set_index('Case').T.reset_index(drop=True)
file_data_transposed.insert(loc=0, column='folder_file_id', value=str(folder_name+'_'+file_name))
# 在循环中进行拼接
merged_data = pd.concat([merged_data, file_data_transposed], axis=0, ignore_index=True)
count = count + 1
print(count)上述代码的性能问题在于,每次循环调用pd.concat时,Pandas都需要创建一个新的DataFrame来容纳旧数据和新数据。这意味着大量的内存重新分配和数据复制操作,随着merged_data的增大,这些操作的开销会呈指数级增长,导致处理速度越来越慢。对于上千个文件,这种方法是不可持续的。
2. 优化策略一:收集数据并进行单次pd.concat
解决循环中pd.concat低效问题的核心思想是:避免在每次迭代中都进行拼接操作。取而代之的是,将每个文件处理后的数据收集到一个数据结构(如列表或字典)中,然后在循环结束后,一次性地调用pd.concat来合并所有数据。
以下是优化后的代码示例:
import pathlib
import pandas as pd
# 假设 df 是一个包含文件路径信息的DataFrame
# root_path 是根目录,使用 pathlib 替换 os.path
root_path = pathlib.Path('root')
data_parts = {} # 使用字典收集每个文件的处理结果
# 使用 enumerate 简化计数器,并迭代 df
for count, (_, row) in enumerate(df.iterrows(), 1):
folder_name = row['File ID'].strip()
file_name = row['File Name'].strip()
file_path = root_path / folder_name / file_name # pathlib 的路径拼接
folder_file_id = f'{folder_name}_{file_name}'
# 读取CSV文件,并进行初步处理
# header=None 因为文件没有表头
# memory_map=True 可以提高大文件读取效率,low_memory=False 避免混合类型警告
file_data = pd.read_csv(file_path, header=None, sep='\t',
names=['Case', folder_file_id],
memory_map=True, low_memory=False)
# 将 'Case' 列设置为索引,并使用 squeeze() 将 DataFrame 转换为 Series
# 这样可以方便后续的 unstack 操作
data_parts[folder_file_id] = file_data.set_index('Case').squeeze()
print(count)
# 循环结束后,一次性合并所有数据
# pd.concat 传入字典,字典的键将作为新的层级索引 (names=['folder_file_id'])
# unstack('Case') 将 'Case' 索引转换为列
# reset_index() 将多级索引扁平化,并重置索引
merged_data = (pd.concat(data_parts, names=['folder_file_id'])
.unstack('Case').reset_index())代码改进点说明:
- pathlib替代os.path: pathlib模块提供了面向对象的路径操作,代码更简洁、可读性更强。
- 字典收集数据: data_parts = {}用于存储每个文件处理后的Series对象。
- enumerate: 代替手动维护count变量,使代码更简洁。
-
pd.read_csv参数优化:
- header=None: 明确指定CSV文件没有表头。
- memory_map=True: 对于大文件,这可以提高读取效率,因为它将文件映射到内存,而不是一次性加载所有数据。
- low_memory=False: 禁用Pandas的低内存解析器,这在处理混合数据类型的列时可以避免警告,并可能提高解析大型文件的速度。
-
数据转换:
- file_data.set_index('Case').squeeze(): 将Case列设置为索引,然后使用squeeze()将单列DataFrame转换为Series。这样做是为了在最终pd.concat时,能够利用Pandas的特性,通过字典键自动创建folder_file_id的索引层级,并方便后续的unstack操作。
- 单次pd.concat: 循环结束后,通过pd.concat(data_parts, names=['folder_file_id'])一次性合并所有Series。
- unstack和reset_index: unstack('Case')将原始的Case索引(现在是Series的索引)转换为列,reset_index()将生成的MultiIndex扁平化,得到最终所需的DataFrame结构。
示例输入数据:
小云雀
剪映出品的AI视频和图片创作助手
1949
查看详情
>>> df File ID File Name 0 folderA file001.txt 1 folderB file002.txt >>> cat root/folderA/file001.txt 0 1234 1 5678 2 9012 3 3456 4 7890 >>> cat root/folderB/file002.txt 0 4567 1 8901 2 2345 3 6789
示例输出结果:
>>> merged_data Case folder_file_id 0 1 2 3 4 0 folderA_file001.txt 1234.0 5678.0 9012.0 3456.0 7890.0 1 folderB_file002.txt 4567.0 8901.0 2345.0 6789.0 NaN
3. 优化策略二:利用多线程并行处理文件读取
对于I/O密集型任务(如读取大量文件),即使避免了循环内concat,文件读取本身也可能成为瓶颈。在这种情况下,可以考虑使用多线程来并行化文件读取过程。Python的concurrent.futures模块提供了一个ThreadPoolExecutor,非常适合处理这类场景。
from concurrent.futures import ThreadPoolExecutor
import pathlib
import pandas as pd
root_path = pathlib.Path('root')
# 定义一个函数,用于处理单个文件的读取和转换逻辑
def read_csv_and_process(args):
count, row_dict = args # 展开传入的参数
folder_name = row_dict['File ID'].strip()
file_name = row_dict['File Name'].strip()
file_path = root_path / folder_name / file_name
folder_file_id = f'{folder_name}_{file_name}'
file_data = pd.read_csv(file_path, header=None, sep='\t',
names=['Case', folder_file_id],
memory_map=True, low_memory=False)
print(f"Processing {count}: {folder_file_id}")
return folder_file_id, file_data.set_index('Case').squeeze()
# 使用 ThreadPoolExecutor
# max_workers 参数控制并行执行的线程数量,通常根据CPU核心数或I/O特性调整
with ThreadPoolExecutor(max_workers=4) as executor: # 可以根据系统资源调整 max_workers
# 将 df 转换为字典列表,以便每个字典作为参数传递给 read_csv_and_process
# enumerate 用于在并行处理时也能追踪进度
batch_args = enumerate(df[['File ID', 'File Name']].to_dict('
records'), 1)
# executor.map 会将 batch_args 中的每个元素应用到 read_csv_and_process 函数
# 并以提交的顺序返回结果
processed_results_iterator = executor.map(read_csv_and_process, batch_args)
# 将迭代器转换为字典,以便进行最终的 concat
data_parts_threaded = dict(processed_results_iterator)
# 最终合并步骤与单线程版本相同
merged_data_threaded = (pd.concat(data_parts_threaded, names=['folder_file_id'])
.unstack('Case').reset_index())多线程代码改进点说明:
- read_csv_and_process函数: 将单个文件的处理逻辑封装成一个独立的函数,方便多线程调用。这个函数接收一个元组args,其中包含计数和行数据字典。
- ThreadPoolExecutor: 创建一个线程池。max_workers参数决定了同时运行的线程数量。对于I/O密集型任务,可以适当增加max_workers,但过多的线程也可能导致上下文切换开销增加。
- df.to_dict('records'): 将DataFrame的每一行转换为一个字典,这样可以方便地将行数据作为参数传递给并行函数。
- executor.map: 这是ThreadPoolExecutor的核心方法之一,它将一个函数和一个可迭代对象作为输入,并行地对可迭代对象中的每个元素应用该函数,并返回一个迭代器,按提交顺序提供结果。
- 结果收集: dict(processed_results_iterator)将并行处理的结果(键值对元组)收集成一个字典。
注意事项:
- 多线程主要适用于I/O密集型任务(如文件读写、网络请求)。对于CPU密集型任务,由于Python的全局解释器锁(GIL),多线程并不能真正实现并行计算,此时应考虑使用多进程(ProcessPoolExecutor)。
- 在实际应用中,max_workers的设置需要根据系统资源和任务特性进行调优。
4. 总结与最佳实践
处理大量文件合并的场景时,避免在循环中频繁调用pd.concat是提升性能的关键。
- 核心原则:将数据收集起来,然后进行一次性的大规模合并操作。这减少了内存重新分配和数据复制的开销。
- 优化方案一(收集数据后单次concat):适用于大多数情况,是处理大量文件合并的首选优化。它简单有效,并且对内存管理更友好。
- 优化方案二(多线程并行处理):当文件读取本身成为瓶颈时,可以进一步引入多线程来加速文件I/O。这在文件分散在不同物理磁盘或网络存储上时尤其有效。
在实际开发中,应首先采用第一种优化方案,如果性能仍不满足要求,再考虑引入多线程或多进程等并行化技术。同时,合理利用pd.read_csv等函数的参数(如memory_map、low_memory、chunksize等)也能进一步提升数据加载效率。
以上就是高效处理Pandas中大量CSV文件合并:避免循环内concat的性能陷阱的详细内容,更多请关注其它相关文章!
# 面向对象
# 杭州seo软件哪家好
# 理发店网络营销推广方案
# 郑州网站如何推广
# seo推广公司如何营销
# 宜昌seo优化ppt
# 江苏非遗推广官方网站首页
# 连州网站建设及优化
# SEO大牛龙虾清洗
# 临安seo优化交流
# seo优化图片推荐
# 适用于
# 也能
# python
# 键值
# 是一个
# 数据结构
# 文件合并
# 迭代
# 转换为
# 多线程
# 可迭代对象
# 键值对
# csv文件
# 性能瓶颈
# csv
相关栏目:
【
科技资讯46185 】
【
网络学院92790 】
相关推荐:
J*a里如何使用N*igableMap进行导航操作_可导航Map操作技巧解析
必由学官方网站入口 必由学学生教师共用登录通道
如何在离线环境中使用Composer_Composer离线安装依赖包的技巧与策略
Golang如何实现状态模式管理对象状态_Golang State模式实现技巧
Python:递归比较文件夹内容并找出特定类型文件的差异
j*a toString()的覆盖
地铁跑酷免费秒玩入口链接 地铁跑酷小游戏免费秒玩网站
steam官方网页快速访问 steam账号注册全流程
J*aScript设计模式实践_j*ascript代码优化
使用Python高效删除Word宏并转换DOCM为DOCX格式
葱吃多了会怎样 葱吃多了会伤胃吗
Fabric Mod开发:在1.19.3+版本中正确添加自定义物品并管理物品组
天眼查怎么看公司融资情况 天眼查企业融资历史查询步骤【攻略】
魅族20怎样在浏览器开无图省流_iPhone魅族20浏览器开无图省流【流量节省】
高德地图公交到站提醒失败如何解决 高德提醒权限设置
12306几点到几点不能订票? | 官方最新系统维护时间全解析
NVIDIA股价11月重挫12%:下月有望好转 但难回5万亿美元巅峰
PHP中SSG-WSG API的AES加密实践:正确使用初始化向量
Lar*el用户头像管理:实现图片缩放、存储与旧文件安全删除的最佳实践
怎样在Excel中做仪表盘_Excel仪表盘设计与关键指标展示方法
苹果手机如何防止被恶意App追踪
优酷会员付费后没到账怎么办_优酷会员充值异常及解决方法
C#中解析不规范的HTML为XML 常见的坑与解决办法
J*a 递归快速排序中静态变量的状态管理与陷阱
C++如何比较两个字符串_C++ string compare函数与操作符对比
文心一言怎样用批量生成做多版文案_文心一言用批量生成做多版文案【批量创作】
外媒分析《GTA6》定价:卖100美元可以但真没必要!
正确连接J*aScript到HTML实现可点击图片与自定义事件处理
c++如何实现一个简单的软件渲染器_c++从零开始的3D图形学
Selenium Python中处理点击后新窗口加载冻结问题的策略与实践
2025俄罗斯Yandex最新入口 官方网站地址及浏览器下载指南
如何使用Go和Martini动态服务解码后的图片
响应式CSS Grid布局:优化网格项在小屏幕下的堆叠与宽度适配
Yandex官网搜索引擎免登录_俄罗斯Yandex一键直达入口
优化Django表单:提交验证失败后保留用户输入
“在文档元素之后找到了标记”是什么错误? 检查并修复XML中多个根元素的3个方法
mysql如何设置表访问权限_mysql表访问权限配置
c++中的const_cast和reinterpret_cast怎么用_c++四种类型转换
sublime怎么设置启动时打开的窗口_sublime会话管理与热退出
yy漫画网页版官方入口_yy漫画官网登录页面链接
Adobe PDF表单中利用J*aScript解析与格式化日期组件的教程
必由学在线入口 必由学网页版快速登录入口
Django表单提交验证失败后保持字段值不刷新
Lar*el 递归关系中排除指定分支的教程
在Pyomo中实现基于变量的条件约束:Big-M方法详解
写好的html代码怎么运行出来_运行写好的html代码方法【教程】
没有大陆身份证/银行卡如何实名微信? 亲测有效的几种方法分享
ArrayList与LinkedList操作复杂度详解:遍历与修改
vivo手机参数配置怎么增强信号_vivo手机参数配置信号增强方法
Golang切片为何属于引用类型_Golang slice底层结构与引用语义说明


2025-11-19
浏览次数:次
返回列表
records'), 1)
# executor.map 会将 batch_args 中的每个元素应用到 read_csv_and_process 函数
# 并以提交的顺序返回结果
processed_results_iterator = executor.map(read_csv_and_process, batch_args)
# 将迭代器转换为字典,以便进行最终的 concat
data_parts_threaded = dict(processed_results_iterator)
# 最终合并步骤与单线程版本相同
merged_data_threaded = (pd.concat(data_parts_threaded, names=['folder_file_id'])
.unstack('Case').reset_index())