新闻中心

Dask DataFrame groupby 模式(Mode)聚合的实现指南

2025-11-15
浏览次数:
返回列表

Dask DataFrame groupby 模式(Mode)聚合的实现指南

本教程详细阐述了如何在 dask dataframe 中对分组数据执行模式(mode)聚合。由于 dask 不直接提供 `groupby.agg` 的模式函数,文章通过自定义 `dask.dataframe.aggregation` 类,实现 `chunk`、`agg` 和 `finalize` 阶段的逻辑,从而有效地在分布式环境中计算分组模式,并提供完整的示例代码和注意事项。

引言:Dask Groupby 模式聚合的挑战

在数据分析中,查找一组数据的众数(mode)是一项常见操作。Pandas DataFrame 提供了 Series.mode() 方法,并且可以方便地与 groupby().agg() 结合使用,以计算每个分组的众数。然而,在处理大规模数据集时,Dask DataFrame 成为一个强大的分布式计算工具。尽管 Dask 提供了丰富的聚合功能,但其内置的 groupby().aggregate() 方法并不直接支持像 Pandas Series.mode 这样的聚合操作。这意味着,如果我们需要在 Dask DataFrame 中进行分组众数计算,就需要自定义聚合逻辑。

Pandas 中的模式聚合(作为参考)

在深入 Dask 的自定义聚合之前,我们首先回顾一下在 Pandas 中如何轻松实现这一功能。这有助于理解我们希望在 Dask 中复制的行为。

import pandas as pd
import numpy as np

# 示例数据
data_pandas = pd.DataFrame({
    'status': ['pending', 'pending', 'pending', 'canceled', 'canceled', 'canceled', 'confirmed', 'confirmed', 'confirmed'],
    'clientId': ['A', 'B', 'C', 'A', 'D', 'C', 'A', 'B', 'C'],
    'partner': ['A', np.nan, 'C', 'A', np.nan, 'C', 'A', np.nan, 'C'],
    'product': ['afiliates', 'pre-paid', 'giftcard', 'afiliates', 'pre-paid', 'giftcard', 'afiliates', 'pre-paid', 'giftcard'],
    'brand': ['brand_4', 'brand_2', 'brand_3', 'brand_1', 'brand_2', 'brand_3', 'brand_1', 'brand_3', 'brand_3'],
    'gmv': [100, 100, 100, 100, 100, 100, 100, 100, 100]
})

data_pandas = data_pandas.astype({
    "partner": "category",
    "status": "category",
    "product": "category",
    "brand": "category"
})

# 使用 Pandas 计算分组模式
mode_pandas = data_pandas.groupby(["clientId", "product"], observed=True).agg({"brand": pd.Series.mode})
print("Pandas Groupby Mode Result:")
print(mode_pandas)

Pandas 的 Series.mode 能够返回一个 Series,其中包含所有频率最高的值(如果存在多个众数)。

自定义 Dask 聚合函数:dask.dataframe.Aggregation

Dask 提供了一个 dask.dataframe.Aggregation 类,允许用户定义自定义的分布式聚合操作。这个类需要三个核心函数:chunk、agg 和 finalize,它们分别对应分布式计算的不同阶段。

  1. chunk 函数:局部计数chunk 函数在 Dask 的每个分区(chunk)上独立运行。它的目标是为每个分组键计算目标列中每个值的频率。对于众数计算,这意味着在每个分区内,我们需要统计每个 brand 值出现的次数。pd.Series.value_counts() 是实现这一目标的理想工具。

    def chunk(s):
        """
        在每个 Dask 分区上执行,计算每个值的频率。
        输入是一个 Pandas Series。
        """
        return s.value_counts()
  2. agg 函数:合并中间结果agg 函数负责合并 chunk 函数在不同分区上产生的中间结果。由于 chunk 函数返回的是每个值及其计数的 Series,agg 函数需要将这些 Series 合并,并对相同的值的计数进行求和,从而得到全局的频率统计。

    def agg(s0):
        """
        合并来自不同分区的中间结果(频率计数)。
        输入是一个 Pandas Series,其索引包含分组键和值,值是计数。
        """
        # _selected_obj 是 Dask 内部结构,代表了聚合的 Series。
        # groupby(level=s0._selected_obj.index.names) 确保按原始分组键和值进行求和。
        _intermediate = s0._selected_obj.groupby(level=s0._selected_obj.index.names).sum()
        # 过滤掉计数为0或负数的情况
        _intermediate = _intermediate[_intermediate > 0]
        return _intermediate
  3. finalize 函数:确定最终模式finalize 函数在所有 agg 操作完成后运行,它接收合并后的全局频率计数,并从中确定最终的众数。这个函数需要能够处理可能存在多个众数的情况,即返回所有频率最高的值。

    Reachout.ai Reachout.ai

    一个AI驱动的视频开发平台,专为忙碌的企业家和销售团队打造

    Reachout.ai 142 查看详情 Reachout.ai
    def finalize(s):
        """
        从合并后的频率计数中确定最终的众数。
        输入是一个 Pandas Series,其索引包含分组键和值,值是合并后的计数。
        """
        # 获取原始分组键的层级(不包括聚合列的值本身)
        level = list(range(s.index.nlevels - 1))
        # 对每个分组,找出频率最高的项
        # s.groupby(level=level) 按原始分组键重新分组
        # apply(lambda x: x[x == x.max()]) 找出每个组内频率等于最大频率的所有项
        return s.groupby(level=level, group_keys=False).apply(lambda x: x[x == x.max()])

在 Dask DataFrame 中应用自定义模式聚合

定义好 chunk、agg 和 finalize 函数后,我们可以将它们封装到 dask.dataframe.Aggregation 对象中,然后将其传递给 Dask DataFrame 的 groupby().aggregate() 方法。

import dask.dataframe as dd
from dask.dataframe import Aggregation

# 将 Pandas DataFrame 转换为 Dask DataFrame
df_dask = dd.from_pandas(data_pandas, npartitions=1) # npartitions=1 简化示例,实际应用中可根据数据大小调整

# 定义自定义的 Dask 模式聚合
mode_dask_agg = Aggregation(
    name="mode", # 聚合的名称
    chunk=chunk,
    agg=agg,
    finalize=finalize,
)

# 应用自定义聚合
mode_dask_result = df_dask.groupby(["clientId", "product"], observed=True, dropna=True).aggregate(
    {"brand": mode_dask_agg}
).compute() # .compute() 触发计算并返回 Pandas DataFrame

print("\nDask Groupby Mode Result:")
print(mode_dask_result)

完整示例代码

以下是包含所有步骤的完整示例代码:

from pandas import DataFrame, Series, NA
import pandas as pd
from dask.dataframe import from_pandas, Aggregation
import dask.dataframe as dd
import numpy as np

# 1. 准备数据
data = DataFrame(
    {
        "status": [
            "pending", "pending", "pending", "canceled", "canceled", "canceled", "confirmed", "confirmed", "confirmed",
        ],
        "clientId": ["A", "B", "C", "A", "D", "C", "A", "B", "C"],
        "partner": ["A", NA, "C", "A", NA, "C", "A", NA, "C"],
        "product": [
            "afiliates", "pre-paid", "giftcard", "afiliates", "pre-paid", "giftcard", "afiliates", "pre-paid", "giftcard",
        ],
        "brand": [
            "brand_4", "brand_2", "brand_3", "brand_1", "brand_2", "brand_3", "brand_1", "brand_3", "brand_3",
        ],
        "gmv": [100, 100, 100, 100, 100, 100, 100, 100, 100],
    }
)

data = data.astype(
    {
        "partner": "category",
        "status": "category",
        "product": "category",
        "brand": "category",
    }
)

# 2. Pandas 模式聚合(作为对比)
mode_pandas = data.groupby(["clientId", "product"], observed=True).agg(
    {"brand": Series.mode}
)
print("Pandas Groupby Mode Result:")
print(mode_pandas)

# 3. 转换为 Dask DataFrame
df_dask = from_pandas(data, npartitions=1)

# 4. 定义 Dask 自定义聚合函数的三个阶段
def chunk(s):
    """在每个 Dask 分区上执行,计算每个值的频率。"""
    return s.value_counts()

def agg(s0):
    """合并来自不同分区的中间结果(频率计数)。"""
    _intermediate = s0._selected_obj.groupby(level=s0._selected_obj.index.names).sum()
    _intermediate = _intermediate[_intermediate > 0]
    return _intermediate

def finalize(s):
    """从合并后的频率计数中确定最终的众数。"""
    level = list(range(s.index.nlevels - 1))
    return s.groupby(level=level, group_keys=False).apply(lambda x: x[x == x.max()])

# 5. 创建 dask.dataframe.Aggregation 对象
mode_dask_agg = Aggregation(
    name="mode",
    chunk=chunk,
    agg=agg,
    finalize=finalize,
)

# 6. 在 Dask DataFrame 上应用自定义聚合
mode_dask_result = df_dask.groupby(["clientId", "product"], observed=True, dropna=True).aggregate(
    {"brand": mode_dask_agg}
).compute()

print("\nDask Groupby Mode Result:")
print(mode_dask_result)

注意事项与 Dask/Pandas 差异

尽管上述自定义聚合旨在模拟 Pandas Series.mode 的行为,但在某些特定情况下,Dask 的结果可能与 Pandas 略有不同。这通常发生在以下情况:

  • 多个众数(Multi-mode): 当一个分组中存在多个值具有相同的最高频率时,Pandas 的 Series.mode 会返回一个包含所有这些众数的 Series。我们自定义的 finalize 函数也尝试处理这种情况,返回所有具有最大频率的值。
  • 数据类型和 NaN 处理: Dask 和 Pandas 在处理分类数据或 NaN 值时可能存在细微差异。dropna=True 参数在 Dask 的 groupby 中可以控制是否在分组前删除 NaN 值。在自定义聚合函数中,也需要确保对 NaN 的处理逻辑符合预期。
  • 性能考量: 自定义聚合虽然功能强大,但其性能可能不如 Dask 内置的、高度优化的聚合函数。对于非常大的数据集,应评估其性能影响。

在实际应用中,建议对比 Dask 和 Pandas 在小规模数据集上的结果,以验证自定义聚合的正确性,并理解任何潜在的差异。

总结

通过 dask.dataframe.Aggregation 类,我们成功地为 Dask DataFrame 的 groupby 操作实现了自定义的模式聚合功能。这种方法不仅解决了 Dask 不直接支持 Series.mode 的问题,也展示了 Dask 框架在处理复杂分布式聚合任务时的灵活性和可扩展性。理解 chunk、agg 和 finalize 三个阶段的工作原理是构建高效、正确自定义聚合的关键。

以上就是Dask DataFrame groupby 模式(Mode)聚合的实现指南的详细内容,更多请关注其它相关文章!


# app  # 按原  # 三个阶段  # 不直接  # 在每个  # 转换为  # 但其  # 频率最高  # 多个  # 自定义  # gate  # 聚合函数  # ai  # 工具  # go  # 是一个  # 长春放心的seo优化  # 1号店游戏推广营销策略  # 闽侯厦门网站建设  # 营销推广又叫销售  # seo中心加盟平台  # 营销推广图片合集大全  # 活动策划免费网站推广  # 冯文星seo  # 英文网站建设原创模板  # 关键词宝贝真实排名 


相关栏目: 【 科技资讯46185 】 【 网络学院92790


相关推荐: CSS如何设置hover状态颜色_hover伪类调整背景或文字颜色  Mac怎么锁定备忘录_Mac备忘录加密设置教程  Gmail邮箱申请注册直达_Gmail邮箱免费注册PC版官网入口2025  b站如何看历史记录_b站观看历史找回方法  QQ邮箱稳定登录入口_QQ邮箱官方网站网页版使用  Golang如何实现状态模式管理对象状态_Golang State模式实现技巧  Win11怎么关闭快速启动_Win11彻底关机设置教程  HTML5原生日期选择器与jQuery UI:实现日期选择器的联动与程序化控制  内存疯狂猛猛涨价:主板销量直接腰斩!  如何使用J*aScript精确选择并批量修改特定父元素下子链接的样式  sublime如何配置Go语言开发环境_sublime搭建Golang编译运行系统  excel怎么制作工资条 excel快速生成工资条的方法  KFC套餐升级怎么获取优惠代码_KFC套餐升级活动与优惠代码获取方法  C++如何实现线程池_C++11手动实现一个简单的固定大小线程池  163邮箱官方主页登录 直达网易邮箱登录核心页面  俄罗斯Yandex搜索引擎入口_Yandex官网免登录一键访问  零跑汽车11月交付量达70327台 实现连续9个月正增长  Golang如何使用const iota_Go iota常量计数器讲解  J*aScript数据结构转换:将对象数组按类别分组  怎么去除衣服上的口红印_生活小妙招教你用酒精轻松擦除  《铁拳8》黑皮辣妹新实机:元气满满的18岁少女!  批改网学生版PC登录 批改网官网登录系统入口  Highcharts 雷达图径向轴标签定制指南:利用多Y轴实现数值标注  b站怎么删除评论_b站评论管理与删除操作  Go语言中高效处理x-www-form-urlencoded表单数据  在命令行怎么运行html项目_命令行运行html项目方法【教程】  Pandas DataFrame 多条件优先级排序与排名  AO3官网镜像链接 Archive of Our Own同人文在线浏览  机器学习中对数变换预测结果的反向还原  MAC怎么安装Homebrew包管理器_MAC为开发者和高级用户安装命令行工具  b站怎么取消点赞_b站点赞取消操作方法  解决 MongoDB 聚合查询中对象数组 _id 匹配问题  Angular中单选按钮的正确使用与常见陷阱解析  怎么在html里运行vbs脚本_html中运行vbs脚本方法【教程】  邮政编码查询不到怎么办_邮政编码查询不到的常见原因与对策  今日头条怎么同步内容到抖音_今日头条内容同步到抖音教程  J*aScript中localStorage数据的获取、清洗与格式化教程  Go Martini框架:动态服务解码后的图片内容  Golang如何测试channel通信行为_Golang channel通信测试与分析方法  圆通快递查询实时追踪 圆通物流包裹状态快速查看  铁路12306官网网页端快速入口 铁路12306官方首页登录教程  Windows10怎么开启夜间模式 Windows10系统设置调整色温与亮度缓解夜间用眼疲劳【教程】  Pandas DataFrame 高效批量赋值:告别循环与笛卡尔积误区  JUnit5/Mockito:优雅测试内部依赖与异常处理的实践  MAC的“快捷指令”怎么同步到iPhone_MAC利用iCloud同步所有设备的自动化指令  outlook中文官网入口地址 outlook官方中文版直达首页链接  没有大陆身份证/银行卡如何实名微信? 亲测有效的几种方法分享  Python多线程中正确使用sigwait处理SIGALRM信号  Golang如何使用bytes.Split分割字节切片_Golang bytes切片分割方法  小猿搜题在线学习页面在哪_小猿搜题在线学习中心入口 

搜索