OA0 = Omni AI 0
OA0 是一个探索 AI 的论坛
现在注册
已注册用户请  登录
OA0  ›  技能包  ›  csv-pipeline:针对 CSV 与 JSON 数据的自动化处理、转换、分析与报表生成

csv-pipeline:针对 CSV 与 JSON 数据的自动化处理、转换、分析与报表生成

 
  container ·  2026-02-05 20:42:13 · 3 次点击  · 0 条评论  

名称: csv-pipeline
描述: 用于处理、转换、分析 CSV 和 JSON 数据文件并生成报告。适用于需要筛选行、合并数据集、计算聚合、转换格式、去重或从表格数据生成摘要报告的场景。支持任何 CSV、TSV 或 JSON Lines 文件。
元数据: {"clawdbot":{"emoji":"📊","requires":{"anyBins":["python3","python","uv"]},"os":["linux","darwin","win32"]}}


CSV 数据处理管道

使用标准命令行工具和 Python 处理表格数据(CSV、TSV、JSON、JSON Lines)。除 Python 3 外无需其他外部依赖。

适用场景

  • 用户提供 CSV/TSV/JSON 文件并要求分析、转换或生成报告
  • 对表格数据进行连接、筛选、分组或聚合
  • 格式转换(CSV 转 JSON、JSON 转 CSV 等)
  • 对混乱数据进行去重、排序或清洗
  • 生成汇总统计或报告
  • ETL 工作流:从一种格式提取、转换、加载到另一种格式

使用标准工具快速操作

数据探查

# Preview the first few rows
head -5 data.csv

# Count data rows (excluding the header)
tail -n +2 data.csv | wc -l

# Show the column headers
head -1 data.csv

# Count distinct values in a column (column 3)
tail -n +2 data.csv | cut -d',' -f3 | sort -u | wc -l

使用 awk 筛选

# Keep rows whose 3rd column exceeds 100 (the header row is always kept)
awk -F',' 'NR==1 || $3 > 100' data.csv > filtered.csv

# Keep rows whose 2nd column matches a pattern
awk -F',' 'NR==1 || $2 ~ /pattern/' data.csv > matched.csv

# Sum the 4th column (skipping the header)
awk -F',' 'NR>1 {sum += $4} END {print sum}' data.csv

排序与去重

# Sort by the 2nd column (numeric), keeping the header line first
head -1 data.csv > sorted.csv && tail -n +2 data.csv | sort -t',' -k2 -n >> sorted.csv

# Remove rows duplicated across all columns
head -1 data.csv > deduped.csv && tail -n +2 data.csv | sort -u >> deduped.csv

# Deduplicate on one column (first occurrence wins)
awk -F',' '!seen[$2]++' data.csv > deduped.csv

Python 操作(用于复杂转换)

读取与探查

import csv, json, sys
from collections import Counter

def read_csv(path, delimiter=','):
    """Load a CSV/TSV file and return its rows as a list of dicts.

    Each dict maps a header name to that row's string value.
    """
    with open(path, newline='', encoding='utf-8') as handle:
        reader = csv.DictReader(handle, delimiter=delimiter)
        rows = list(reader)
    return rows

def write_csv(rows, path, delimiter=','):
    """Write a list of dicts out as a CSV file.

    Column order follows the keys of the first row; nothing is
    written (and no file is created) when *rows* is empty.
    """
    if not rows:
        return
    header = list(rows[0].keys())
    with open(path, 'w', newline='', encoding='utf-8') as handle:
        writer = csv.DictWriter(handle, fieldnames=header, delimiter=delimiter)
        writer.writeheader()
        for record in rows:
            writer.writerow(record)

# Quick profile of the dataset: row count, columns, per-column fill rate.
# NOTE(review): assumes at least one data row — data[0] raises IndexError
# on an empty file.
data = read_csv('data.csv')
print(f"行数: {len(data)}")
print(f"列名: {list(data[0].keys())}")
for col in data[0]:
    non_empty = sum(1 for r in data if r[col].strip())
    print(f"  {col}: {non_empty}/{len(data)} 非空")

筛选与转换

# Keep only rows whose amount exceeds 100
filtered = [r for r in data if float(r['amount']) > 100]

# Add a computed column (stored as a string, matching CSV conventions)
for r in data:
    r['total'] = str(float(r['price']) * int(r['quantity']))

# Rename a column
renamed = [{('new_name' if k == 'old_name' else k): v for k, v in r.items()} for r in data]

# Convert value types in place
for r in data:
    r['amount'] = float(r['amount'])
    r['date'] = r['date'].strip()

分组与聚合

from collections import defaultdict

def group_by(rows, key):
    """Bucket rows by the value found in column *key*.

    Returns a plain dict mapping each distinct value to the list of
    rows (in input order) that carry it.
    """
    buckets = {}
    for record in rows:
        buckets.setdefault(record[key], []).append(record)
    return buckets

def aggregate(rows, group_col, agg_col, func='sum'):
    """Aggregate *agg_col* per distinct value of *group_col*.

    func: one of 'sum', 'avg', 'count', 'min', 'max'.
    Blank cells are skipped rather than treated as zero; 'count'
    therefore counts non-empty values, while the 'count' column in
    the result counts all rows of the group.

    Returns one dict per group (sorted by group value) with the group
    value, the aggregate formatted as a string (matching write_csv
    conventions), and the group's row count.

    Raises ValueError for an unknown *func* (previously an unknown
    func fell through every branch and raised UnboundLocalError).
    """
    reducers = {
        'sum': sum,
        'avg': lambda vs: sum(vs) / len(vs) if vs else 0,
        'count': len,
        'min': lambda vs: min(vs) if vs else 0,
        'max': lambda vs: max(vs) if vs else 0,
    }
    if func not in reducers:
        raise ValueError(f"unknown aggregate function: {func!r}")

    groups = defaultdict(list)
    for r in rows:
        groups[r[group_col]].append(r)

    results = []
    for name, group in sorted(groups.items()):
        values = [float(r[agg_col]) for r in group if r[agg_col].strip()]
        agg = reducers[func](values)
        results.append({group_col: name, f'{func}_{agg_col}': str(agg), 'count': str(len(group))})
    return results

# Example: total revenue per category
summary = aggregate(data, 'category', 'revenue', 'sum')
write_csv(summary, 'summary.csv')

连接数据集

def inner_join(left, right, on):
    """Inner-join two row lists on the key column *on*.

    Produces one merged dict per (left, right) pair sharing a key;
    left rows without a match are dropped. On column collisions the
    right-hand value wins, except for the key column itself.
    """
    by_key = {}
    for record in right:
        by_key.setdefault(record[on], []).append(record)

    joined = []
    for record in left:
        for match in by_key.get(record[on], ()):
            combined = dict(record)
            combined.update({k: v for k, v in match.items() if k != on})
            joined.append(combined)
    return joined

def left_join(left, right, on):
    """Left-join two row lists on the key column *on*.

    Every left row appears at least once; when no right row shares
    the key, the right-hand columns are filled with empty strings.
    On column collisions the right-hand value wins, except for the
    key column itself.
    """
    by_key = {}
    extra_cols = set()
    for record in right:
        extra_cols.update(record.keys())
        by_key.setdefault(record[on], []).append(record)
    extra_cols.discard(on)

    joined = []
    for record in left:
        matches = by_key.get(record[on])
        if matches:
            for match in matches:
                combined = dict(record)
                combined.update({k: v for k, v in match.items() if k != on})
                joined.append(combined)
        else:
            combined = dict(record)
            combined.update({col: '' for col in extra_cols})
            joined.append(combined)
    return joined

# Example: attach customer details to every order
orders = read_csv('orders.csv')
customers = read_csv('customers.csv')
joined = left_join(orders, customers, on='customer_id')
write_csv(joined, 'orders_with_customers.csv')

去重

def deduplicate(rows, key_cols=None):
    """Drop duplicate rows, keeping the first occurrence.

    With *key_cols*, two rows are duplicates when they agree on those
    columns; otherwise the entire row is compared.
    """
    if key_cols:
        fingerprint = lambda record: tuple(record[c] for c in key_cols)
    else:
        fingerprint = lambda record: tuple(sorted(record.items()))

    seen = set()
    kept = []
    for record in rows:
        mark = fingerprint(record)
        if mark in seen:
            continue
        seen.add(mark)
        kept.append(record)
    return kept

# Keep one row per email address (first occurrence wins)
clean = deduplicate(data, key_cols=['email'])

格式转换

CSV 转 JSON

import json, csv

# Read the CSV into a list of dicts (one per data row).
with open('data.csv', newline='', encoding='utf-8') as f:
    rows = list(csv.DictReader(f))

# Array-of-objects format. ensure_ascii=False keeps international
# characters readable (consistent with this skill's own tips); the
# original omitted it and escaped all non-ASCII text.
with open('data.json', 'w', encoding='utf-8') as f:
    json.dump(rows, f, indent=2, ensure_ascii=False)

# JSON Lines format (one object per line, stream-friendly)
with open('data.jsonl', 'w', encoding='utf-8') as f:
    for row in rows:
        f.write(json.dumps(row, ensure_ascii=False) + '\n')

JSON 转 CSV

import json, csv

with open('data.json', encoding='utf-8') as f:
    rows = json.load(f)

# Use the union of keys across all objects: the original took the
# first object's keys, so any extra key on a later object made
# DictWriter raise ValueError. Sorted for a deterministic column
# order, matching the JSON Lines -> CSV snippet below.
fieldnames = sorted({k for r in rows for k in r})

with open('data.csv', 'w', newline='', encoding='utf-8') as f:
    writer = csv.DictWriter(f, fieldnames=fieldnames)
    writer.writeheader()
    writer.writerows(rows)

JSON Lines 转 CSV

import json, csv

# Read one JSON object per non-blank line.
rows = []
with open('data.jsonl') as f:
    for line in f:
        if line.strip():
            rows.append(json.loads(line))

with open('data.csv', 'w', newline='', encoding='utf-8') as f:
    # Union of keys across all objects: rows may carry different keys.
    # DictWriter fills columns missing from a row with '' (its restval
    # default), and sorting gives a deterministic column order.
    all_keys = set()
    for r in rows:
        all_keys.update(r.keys())
    writer = csv.DictWriter(f, fieldnames=sorted(all_keys))
    writer.writeheader()
    writer.writerows(rows)

TSV 转 CSV

# Naive TSV -> CSV: a blind per-character swap; ambiguous if any field itself contains a comma
tr '\t' ',' < data.tsv > data.csv

数据清洗模式

修复常见 CSV 问题

def clean_csv(rows):
    """Clean up common CSV data-quality issues.

    Per cell: trims whitespace, maps null-ish placeholder strings to
    '', and normalizes boolean-ish strings to 'true'/'false'.
    Non-string values (e.g. from JSON-loaded rows) pass through
    unchanged apart from the null-placeholder check.
    """
    cleaned = []
    for r in rows:
        clean_row = {}
        for k, v in r.items():
            # Strip whitespace from both the key and the value.
            k = k.strip()
            v = v.strip() if isinstance(v, str) else v
            # Normalize null-ish placeholders to the empty string.
            if v in ('', 'N/A', 'n/a', 'NA', 'null', 'NULL', 'None', '-'):
                v = ''
            # Normalize boolean-ish strings. The isinstance guard fixes a
            # crash: the original called v.lower() unconditionally, raising
            # AttributeError for non-string values such as None or numbers.
            # NOTE(review): this also rewrites bare '0'/'1' to
            # 'false'/'true', which is lossy for numeric columns — confirm
            # that is intended.
            if isinstance(v, str):
                if v.lower() in ('true', 'yes', '1', 'y'):
                    v = 'true'
                elif v.lower() in ('false', 'no', '0', 'n'):
                    v = 'false'
            clean_row[k] = v
        cleaned.append(clean_row)
    return cleaned

验证数据类型

def validate_rows(rows, schema):
    """
    Validate rows against a schema.

    schema: dict mapping column name -> 'int'|'float'|'date'|'email'|'str'
    Empty/missing cells are skipped (add a separate required-field
    check if needed). Returns (valid_rows, error_rows); each error
    entry carries 'row' (spreadsheet line number, header = line 1),
    the error messages, and the offending row.
    """
    import re
    valid, errors = [], []
    for i, r in enumerate(rows):
        errs = []
        for col, dtype in schema.items():
            # Guard non-string cells: the original's r.get(col, '') still
            # returned explicit None values (e.g. JSON-loaded rows) and
            # crashed on .strip().
            raw = r.get(col)
            val = raw.strip() if isinstance(raw, str) else ('' if raw is None else str(raw))
            if not val:
                continue
            if dtype == 'int':
                try:
                    int(val)
                except ValueError:
                    errs.append(f"{col}: '{val}' 不是整数")
            elif dtype == 'float':
                try:
                    float(val)
                except ValueError:
                    errs.append(f"{col}: '{val}' 不是浮点数")
            elif dtype == 'email':
                if not re.match(r'^[^@]+@[^@]+\.[^@]+$', val):
                    errs.append(f"{col}: '{val}' 不是有效邮箱")
            elif dtype == 'date':
                # fullmatch: the original's prefix match accepted trailing
                # junk such as '2024-01-015'.
                if not re.fullmatch(r'\d{4}-\d{2}-\d{2}', val):
                    errs.append(f"{col}: '{val}' 不是 YYYY-MM-DD 格式")
        if errs:
            # +2: enumerate is 0-based and line 1 is the header.
            errors.append({'row': i + 2, 'errors': errs, 'data': r})
        else:
            valid.append(r)
    return valid, errors

# Usage: validate typed columns, then show the first few failures.
valid, bad = validate_rows(data, {'amount': 'float', 'email': 'email', 'date': 'date'})
print(f"有效行: {len(valid)}, 错误行: {len(bad)}")
for e in bad[:5]:
    print(f"  第 {e['row']} 行: {e['errors']}")

生成报告

生成 Markdown 摘要报告

def generate_report(data, title, group_col, value_col):
    """Render a Markdown summary report for *data*.

    Emits a title, the total row count, and one table row per
    *group_col* value with count/sum/mean/min/max of *value_col*
    (blank cells excluded; groups with no usable numbers are omitted).
    """
    out = [f"# {title}", "", f"**总行数**: {len(data)}", ""]

    # Per-group statistics table.
    groups = group_by(data, group_col)
    out += [
        f"## 按 {group_col} 分组",
        "",
        f"| {group_col} | 数量 | 总和 | 平均值 | 最小值 | 最大值 |",
        "|---|---|---|---|---|---|",
    ]

    for name in sorted(groups):
        vals = [float(r[value_col]) for r in groups[name] if r[value_col].strip()]
        if not vals:
            continue  # nothing numeric to report for this group
        mean = sum(vals) / len(vals)
        out.append(
            f"| {name} | {len(vals)} | {sum(vals):.2f} | {mean:.2f} | "
            f"{min(vals):.2f} | {max(vals):.2f} |"
        )

    out.append("")
    out.append(f"*基于 {len(data)} 行数据生成*")
    return '\n'.join(out)

# Render the summary and save it as a Markdown file.
report = generate_report(data, "销售摘要", "category", "revenue")
with open('report.md', 'w') as f:
    f.write(report)

大文件处理

对于无法一次性加载到内存的大文件:

def stream_process(input_path, output_path, transform_fn, delimiter=','):
    """Transform a CSV file row by row without loading it fully.

    transform_fn receives each row as a dict and returns either the
    (possibly modified) row or None to drop it. The output header is
    taken from the first kept row, so an output with every row
    dropped contains no header at all.
    """
    with open(input_path, newline='', encoding='utf-8') as src, \
         open(output_path, 'w', newline='', encoding='utf-8') as dst:
        sink = None
        for record in csv.DictReader(src, delimiter=delimiter):
            out = transform_fn(record)
            if out is None:
                continue  # row filtered out
            if sink is None:
                # Created lazily so the header reflects transform_fn's output.
                sink = csv.DictWriter(dst, fieldnames=out.keys(), delimiter=delimiter)
                sink.writeheader()
            sink.writerow(out)

# Example: filter and transform in streaming fashion
def process_row(row):
    """Drop records under 10 and attach a computed USD amount."""
    amount = float(row.get('amount', 0) or 0)
    if amount < 10:
        return None  # skip small records
    row['amount_usd'] = str(amount * 1.0)  # computed field
    return row

# Run the streaming pipeline over the large input file.
stream_process('big_file.csv', 'output.csv', process_row)

实用技巧

  • 始终检查文件编码:使用 file -i data.csv 或对含 BOM 的文件使用 encoding='utf-8-sig' 打开
  • 对于包含逗号的 Excel 导出值,CSV 模块会自动处理引号
  • 处理国际字符时,使用 json.dumps(ensure_ascii=False)
  • 对于竖线分隔的文件,在 csv.reader/writer 中使用 delimiter='|'
  • 对于非常大的聚合操作,可考虑使用 Python 内置的 sqlite3
    sqlite3 :memory: ".mode csv" ".import data.csv t" "SELECT category, SUM(amount) FROM t GROUP BY category;"
3 次点击  ∙  0 人收藏  
登录后收藏  
目前尚无回复
0 条回复
About   ·   Help   ·    
OA0 - Omni AI 0 一个探索 AI 的社区
沪ICP备2024103595号-2
Developed with Cursor