OA0 = Omni AI 0
OA0 是一个探索 AI 的论坛
现在注册
已注册用户请  登录
OA0  ›  技能包  ›  pandas-construction-analysis:基于 Pandas 的综合建筑工程数据分析

pandas-construction-analysis:基于 Pandas 的综合建筑工程数据分析

 
  authorization ·  2026-02-24 15:08:41 · 2 次点击  · 0 条评论  

名称: "pandas-construction-analysis"
描述: "用于建筑工程数据分析的综合性 Pandas 工具包。可对结构化工程数据进行过滤、分组、聚合BIM构件、计算工程量、合并数据集及生成报告。"
主页: "https://datadrivenconstruction.io"
元数据: {"openclaw": {"emoji": "🐼", "os": ["darwin", "linux", "win32"], "homepage": "https://datadrivenconstruction.io", "requires": {"bins": ["python3"]}}}


Pandas 建筑工程数据分析

概述

基于 DDC 方法论(第 2.3 章),本技能提供了一套用于建筑工程数据处理的综合性 Pandas 操作。Pandas 是数据分析师的瑞士军刀——从简单的数据过滤到跨数百万行的复杂聚合,无所不能。

参考书籍:《Pandas DataFrame 与 LLM ChatGPT》

“使用 Pandas,您可以管理和分析远超 Excel 能力的数据集。Excel 最多能处理约 100 万行数据,而 Pandas 可以轻松处理包含数千万行数据的数据集。”
— DDC 书籍,第 2.3 章

快速开始

import pandas as pd

# 读取工程数据
df = pd.read_excel("bim_export.xlsx")

# 基本操作
print(df.head())           # 前 5 行
print(df.info())           # 列类型和内存信息
print(df.describe())       # 数值列的统计信息

# 过滤结构构件
structural = df[df['Category'] == 'Structural']

# 计算总体积
total_volume = df['Volume'].sum()
print(f"总体积: {total_volume:.2f} m³")

DataFrame 基础

创建 DataFrame

import pandas as pd

# 从字典创建(工程构件)
elements = pd.DataFrame({
    'ElementId': ['E001', 'E002', 'E003', 'E004'],
    'Category': ['Wall', 'Floor', 'Wall', 'Column'],
    'Material': ['Concrete', 'Concrete', 'Brick', 'Steel'],
    'Volume_m3': [45.5, 120.0, 32.0, 8.5],
    'Level': ['Level 1', 'Level 1', 'Level 2', 'Level 1']
})

# 从 CSV 读取
df_csv = pd.read_csv("construction_data.csv")

# 从 Excel 读取
df_excel = pd.read_excel("project_data.xlsx", sheet_name="Elements")

# 从多个 Excel 工作表读取
all_sheets = pd.read_excel("project.xlsx", sheet_name=None)  # 返回 DataFrame 字典

工程数据中的数据类型

# 工程中常见的数据类型
df = pd.DataFrame({
    'element_id': pd.Series(['W001', 'W002'], dtype='string'),
    'quantity': pd.Series([10, 20], dtype='int64'),
    'volume': pd.Series([45.5, 32.0], dtype='float64'),
    'is_structural': pd.Series([True, False], dtype='bool'),
    'created_date': pd.to_datetime(['2024-01-15', '2024-01-16']),
    'category': pd.Categorical(['Wall', 'Slab'])
})

# 检查数据类型
print(df.dtypes)

# 转换类型
df['quantity'] = df['quantity'].astype('float64')
df['volume'] = pd.to_numeric(df['volume'], errors='coerce')

过滤与选择

基本过滤

# 单条件过滤
walls = df[df['Category'] == 'Wall']

# 多条件过滤(AND)
large_concrete = df[(df['Material'] == 'Concrete') & (df['Volume_m3'] > 50)]

# 多条件过滤(OR)
walls_or_floors = df[(df['Category'] == 'Wall') | (df['Category'] == 'Floor')]

# 使用 isin 匹配多个值
structural = df[df['Category'].isin(['Wall', 'Column', 'Beam', 'Foundation'])]

# 字符串包含
insulated = df[df['Description'].str.contains('insulated', case=False, na=False)]

# 空值过滤
incomplete = df[df['Cost'].isna()]
complete = df[df['Cost'].notna()]

高级选择

# 选择列
volumes = df[['ElementId', 'Category', 'Volume_m3']]

# Query 语法(类 SQL)
result = df.query("Category == 'Wall' and Volume_m3 > 30")

# Loc 和 iloc
specific_row = df.loc[0]                    # 按标签
range_rows = df.iloc[0:10]                  # 按位置
specific_cell = df.loc[0, 'Volume_m3']      # 行和列
subset = df.loc[0:5, ['Category', 'Volume_m3']]  # 行范围与列

分组与聚合

GroupBy 操作

# 基本分组
by_category = df.groupby('Category')['Volume_m3'].sum()

# 多重聚合
summary = df.groupby('Category').agg({
    'Volume_m3': ['sum', 'mean', 'count'],
    'Cost': ['sum', 'mean']
})

# 命名聚合(输出更清晰)
summary = df.groupby('Category').agg(
    total_volume=('Volume_m3', 'sum'),
    avg_volume=('Volume_m3', 'mean'),
    element_count=('ElementId', 'count'),
    total_cost=('Cost', 'sum')
).reset_index()

# 多列分组
by_level_cat = df.groupby(['Level', 'Category']).agg({
    'Volume_m3': 'sum',
    'Cost': 'sum'
}).reset_index()

数据透视表

# 创建透视表
pivot = pd.pivot_table(
    df,
    values='Volume_m3',
    index='Level',
    columns='Category',
    aggfunc='sum',
    fill_value=0,
    margins=True,           # 添加总计
    margins_name='Total'
)

# 多个值
pivot_detailed = pd.pivot_table(
    df,
    values=['Volume_m3', 'Cost'],
    index='Level',
    columns='Category',
    aggfunc={'Volume_m3': 'sum', 'Cost': 'mean'}
)

数据转换

添加计算列

# 简单计算
df['Cost_Total'] = df['Volume_m3'] * df['Unit_Price']

# 条件列
df['Size_Category'] = df['Volume_m3'].apply(
    lambda x: 'Large' if x > 50 else ('Medium' if x > 20 else 'Small')
)

# 使用 np.where 处理二元条件
import numpy as np
df['Is_Large'] = np.where(df['Volume_m3'] > 50, True, False)

# 使用 cut 进行分箱
df['Volume_Bin'] = pd.cut(
    df['Volume_m3'],
    bins=[0, 10, 50, 100, float('inf')],
    labels=['XS', 'S', 'M', 'L']
)

字符串操作

# 从字符串中提取
df['Level_Number'] = df['Level'].str.extract(r'(\d+)').astype(int)

# 拆分并扩展
df[['Building', 'Floor']] = df['Location'].str.split('-', expand=True)

# 清理字符串
df['Category'] = df['Category'].str.strip().str.lower().str.title()

# 替换值
df['Material'] = df['Material'].str.replace('Reinforced Concrete', 'RC')

日期操作

# 解析日期
df['Start_Date'] = pd.to_datetime(df['Start_Date'])

# 提取日期组件
df['Year'] = df['Start_Date'].dt.year
df['Month'] = df['Start_Date'].dt.month
df['Week'] = df['Start_Date'].dt.isocalendar().week
df['DayOfWeek'] = df['Start_Date'].dt.day_name()

# 计算持续时间
df['Duration_Days'] = (df['End_Date'] - df['Start_Date']).dt.days

# 按日期范围过滤
recent = df[df['Start_Date'] >= '2024-01-01']

合并与连接

合并 DataFrame

# 构件数据
elements = pd.DataFrame({
    'ElementId': ['E001', 'E002', 'E003'],
    'Category': ['Wall', 'Floor', 'Column'],
    'Volume_m3': [45.5, 120.0, 8.5]
})

# 单价数据
prices = pd.DataFrame({
    'Category': ['Wall', 'Floor', 'Column', 'Beam'],
    'Unit_Price': [150, 80, 450, 200]
})

# 内连接(仅匹配项)
merged = elements.merge(prices, on='Category', how='inner')

# 左连接(保留所有构件)
merged = elements.merge(prices, on='Category', how='left')

# 在不同列名上连接
result = df1.merge(df2, left_on='elem_id', right_on='ElementId')

拼接 DataFrame

# 垂直拼接(堆叠)
all_floors = pd.concat([floor1_df, floor2_df, floor3_df], ignore_index=True)

# 水平拼接
combined = pd.concat([quantities, costs, schedule], axis=1)

# 追加新行
new_elements = pd.DataFrame({'ElementId': ['E004'], 'Category': ['Beam']})
df = pd.concat([df, new_elements], ignore_index=True)

工程专项分析

工程量计算 (QTO)

def generate_qto_report(df):
    """按类别生成工程量计算报告"""
    qto = df.groupby(['Category', 'Material']).agg(
        count=('ElementId', 'count'),
        total_volume=('Volume_m3', 'sum'),
        total_area=('Area_m2', 'sum'),
        avg_volume=('Volume_m3', 'mean')
    ).round(2)

    # 添加百分比列
    qto['volume_pct'] = (qto['total_volume'] /
                          qto['total_volume'].sum() * 100).round(1)

    return qto.sort_values('total_volume', ascending=False)

# 用法
qto_report = generate_qto_report(df)
qto_report.to_excel("qto_report.xlsx")

成本估算

def calculate_project_cost(elements_df, prices_df, markup=0.15):
    """计算包含利润的总项目成本"""
    # 与单价合并
    df = elements_df.merge(prices_df, on='Category', how='left')

    # 计算基础成本
    df['Base_Cost'] = df['Volume_m3'] * df['Unit_Price']

    # 应用利润
    df['Total_Cost'] = df['Base_Cost'] * (1 + markup)

    # 按类别汇总
    summary = df.groupby('Category').agg(
        volume=('Volume_m3', 'sum'),
        base_cost=('Base_Cost', 'sum'),
        total_cost=('Total_Cost', 'sum')
    ).round(2)

    return df, summary, summary['total_cost'].sum()

# 用法
detailed, summary, total = calculate_project_cost(elements, prices)
print(f"项目总计: ${total:,.2f}")

材料汇总

def material_summary(df):
    """汇总项目中的材料"""
    summary = df.groupby('Material').agg({
        'Volume_m3': 'sum',
        'Weight_kg': 'sum',
        'ElementId': 'nunique'
    }).rename(columns={'ElementId': 'Element_Count'})

    summary['Volume_Pct'] = (summary['Volume_m3'] /
                              summary['Volume_m3'].sum() * 100).round(1)

    return summary.sort_values('Volume_m3', ascending=False)

按楼层分析

def analyze_by_level(df):
    """按建筑楼层分析工程量"""
    level_summary = df.pivot_table(
        values=['Volume_m3', 'Cost'],
        index='Level',
        columns='Category',
        aggfunc='sum',
        fill_value=0
    )

    level_summary['Total_Volume'] = level_summary['Volume_m3'].sum(axis=1)
    level_summary['Total_Cost'] = level_summary['Cost'].sum(axis=1)

    return level_summary

数据导出

导出到 Excel(多工作表)

def export_to_excel_formatted(df, summary, filepath):
    """以多工作表格式导出"""
    with pd.ExcelWriter(filepath, engine='openpyxl') as writer:
        df.to_excel(writer, sheet_name='Details', index=False)
        summary.to_excel(writer, sheet_name='Summary')

        pivot = pd.pivot_table(df, values='Volume_m3',
                               index='Level', columns='Category')
        pivot.to_excel(writer, sheet_name='By_Level')

# 用法
export_to_excel_formatted(elements, qto_summary, "project_report.xlsx")

导出到 CSV

# 基本导出
df.to_csv("output.csv", index=False)

# 指定编码以处理特殊字符
df.to_csv("output.csv", index=False, encoding='utf-8-sig')

# 指定列
df[['ElementId', 'Category', 'Volume_m3']].to_csv("volumes.csv", index=False)

性能优化技巧

# 对唯一值较少的字符串列使用分类类型
df['Category'] = df['Category'].astype('category')

# 仅读取所需列
df = pd.read_csv("large_file.csv", usecols=['ElementId', 'Category', 'Volume'])

# 对超大文件使用分块读取
chunks = pd.read_csv("huge_file.csv", chunksize=100000)
result = pd.concat([chunk[chunk['Category'] == 'Wall'] for chunk in chunks])

# 检查内存使用
print(df.memory_usage(deep=True).sum() / 1024**2, "MB")

快速参考

操作 代码
读取 Excel pd.read_excel("file.xlsx")
读取 CSV pd.read_csv("file.csv")
过滤行 df[df['Column'] == 'Value']
选择列 df[['Col1', 'Col2']]
分组求和 df.groupby('Cat')['Vol'].sum()
数据透视表 pd.pivot_table(df, values='Vol', index='Level')
合并 df1.merge(df2, on='key')
添加列 df['New'] = df['A'] * df['B']
导出 Excel df.to_excel("out.xlsx", index=False)

资源

  • 书籍:《数据驱动的建筑工程》Artem Boiko 著,第 2.3 章
  • 网站:https://datadrivenconstruction.io
  • Pandas 文档:https://pandas.pydata.org/docs/

后续步骤

  • 查看 llm-data-automation 以了解如何使用 AI 生成 Pandas 代码
  • 查看 qto-report 以了解专业的工程量计算
  • 查看 cost-estimation-resource 以了解详细的成本计算
2 次点击  ∙  0 人收藏  
登录后收藏  
目前尚无回复
0 条回复
About   ·   Help   ·    
OA0 - Omni AI 0 一个探索 AI 的社区
沪ICP备2024103595号-2
Developed with Cursor