Name: csv-pipeline
Description: Processes, transforms, and analyzes CSV and JSON data files and generates reports. Use when you need to filter rows, merge datasets, compute aggregates, convert formats, deduplicate, or produce summary reports from tabular data. Works with any CSV, TSV, or JSON Lines file.
Metadata: {"clawdbot":{"emoji":"📊","requires":{"anyBins":["python3","python","uv"]},"os":["linux","darwin","win32"]}}
Process tabular data (CSV, TSV, JSON, JSON Lines) with standard command-line tools and Python. No external dependencies beyond Python 3.
# Preview the first few lines
head -5 data.csv
# Count rows (excluding the header)
tail -n +2 data.csv | wc -l
# Show the column headers
head -1 data.csv
# Count unique values in a column (column 3)
tail -n +2 data.csv | cut -d',' -f3 | sort -u | wc -l
# Filter with awk: keep rows where column 3 is greater than 100
awk -F',' 'NR==1 || $3 > 100' data.csv > filtered.csv
# Keep rows where column 2 matches a pattern
awk -F',' 'NR==1 || $2 ~ /pattern/' data.csv > matched.csv
# Sum column 4
awk -F',' 'NR>1 {sum += $4} END {print sum}' data.csv
# Sort by column 2 (numeric), keeping the header first
head -1 data.csv > sorted.csv && tail -n +2 data.csv | sort -t',' -k2 -n >> sorted.csv
# Deduplicate on all columns
head -1 data.csv > deduped.csv && tail -n +2 data.csv | sort -u >> deduped.csv
# Deduplicate on a specific column (keep first occurrence)
awk -F',' '!seen[$2]++' data.csv > deduped.csv
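Note that awk, cut, and sort split on every comma, so they misparse quoted fields that contain embedded commas (e.g. "Doe, Jane"). A minimal quote-aware sketch of the same filter using Python's csv module (the amount column name and the 100 threshold are illustrative assumptions):

import csv

# Quote-aware equivalent of the awk filter above: keeps the header
# and rows whose 'amount' column parses as a number greater than 100.
with open('data.csv', newline='', encoding='utf-8') as fin, \
        open('filtered.csv', 'w', newline='', encoding='utf-8') as fout:
    reader = csv.DictReader(fin)
    writer = csv.DictWriter(fout, fieldnames=reader.fieldnames)
    writer.writeheader()
    for row in reader:
        try:
            if float(row['amount']) > 100:
                writer.writerow(row)
        except ValueError:
            pass  # skip rows with a non-numeric amount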
import csv, json, sys
from collections import Counter

def read_csv(path, delimiter=','):
    """Read a CSV/TSV file into a list of dicts."""
    with open(path, newline='', encoding='utf-8') as f:
        return list(csv.DictReader(f, delimiter=delimiter))

def write_csv(rows, path, delimiter=','):
    """Write a list of dicts to a CSV file."""
    if not rows:
        return
    with open(path, 'w', newline='', encoding='utf-8') as f:
        writer = csv.DictWriter(f, fieldnames=rows[0].keys(), delimiter=delimiter)
        writer.writeheader()
        writer.writerows(rows)
# Quick stats
data = read_csv('data.csv')
print(f"Rows: {len(data)}")
print(f"Columns: {list(data[0].keys())}")
for col in data[0]:
    non_empty = sum(1 for r in data if r[col].strip())
    print(f"  {col}: {non_empty}/{len(data)} non-empty")
# Filter rows
filtered = [r for r in data if float(r['amount']) > 100]

# Add a computed column
for r in data:
    r['total'] = str(float(r['price']) * int(r['quantity']))

# Rename a column
renamed = [{('new_name' if k == 'old_name' else k): v for k, v in r.items()} for r in data]

# Type conversion
for r in data:
    r['amount'] = float(r['amount'])
    r['date'] = r['date'].strip()
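Bare float() calls raise ValueError on empty or malformed cells, and real exports usually contain a few. A minimal sketch of a tolerant converter (the helper name to_float and the None default are my own choices, not part of the snippets above):

def to_float(value, default=None):
    """Parse a numeric cell, returning `default` for blanks and junk."""
    try:
        return float(str(value).strip())
    except ValueError:
        return default

# Filter safely: rows with unparseable amounts are simply excluded
filtered = [r for r in data if (to_float(r['amount']) or 0) > 100]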
from collections import defaultdict

def group_by(rows, key):
    """Group rows by the value of a given column."""
    groups = defaultdict(list)
    for r in rows:
        groups[r[key]].append(r)
    return dict(groups)
def aggregate(rows, group_col, agg_col, func='sum'):
    """Aggregate a column per group."""
    groups = group_by(rows, group_col)
    results = []
    for name, group in sorted(groups.items()):
        values = [float(r[agg_col]) for r in group if r[agg_col].strip()]
        if func == 'sum':
            agg = sum(values)
        elif func == 'avg':
            agg = sum(values) / len(values) if values else 0
        elif func == 'count':
            agg = len(values)
        elif func == 'min':
            agg = min(values) if values else 0
        elif func == 'max':
            agg = max(values) if values else 0
        results.append({group_col: name, f'{func}_{agg_col}': str(agg), 'count': str(len(group))})
    return results

# Example: total revenue per category
summary = aggregate(data, 'category', 'revenue', 'sum')
write_csv(summary, 'summary.csv')
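The same helper covers the other aggregates; for instance, assuming the same category and revenue columns:

# Average revenue per category, written alongside the totals
avg_summary = aggregate(data, 'category', 'revenue', 'avg')
write_csv(avg_summary, 'summary_avg.csv')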
def inner_join(left, right, on):
    """Inner-join two datasets on a key column."""
    right_index = {}
    for r in right:
        key = r[on]
        if key not in right_index:
            right_index[key] = []
        right_index[key].append(r)
    results = []
    for lr in left:
        key = lr[on]
        if key in right_index:
            for rr in right_index[key]:
                merged = {**lr}
                for k, v in rr.items():
                    if k != on:
                        merged[k] = v
                results.append(merged)
    return results
def left_join(left, right, on):
    """Left join: keep every left row; fill missing right-side columns with ''."""
    right_index = {}
    right_cols = set()
    for r in right:
        key = r[on]
        right_cols.update(r.keys())
        if key not in right_index:
            right_index[key] = []
        right_index[key].append(r)
    right_cols.discard(on)
    results = []
    for lr in left:
        key = lr[on]
        if key in right_index:
            for rr in right_index[key]:
                merged = {**lr}
                for k, v in rr.items():
                    if k != on:
                        merged[k] = v
                results.append(merged)
        else:
            merged = {**lr}
            for col in right_cols:
                merged[col] = ''
            results.append(merged)
    return results

# Example
orders = read_csv('orders.csv')
customers = read_csv('customers.csv')
joined = left_join(orders, customers, on='customer_id')
write_csv(joined, 'orders_with_customers.csv')
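Both joins emit one output row per matching right-side row, so a duplicated key in customers.csv fans out the corresponding orders. When only matching rows are wanted, the inner join is used the same way (same hypothetical files as above):

# Drops orders whose customer_id has no match in customers.csv
matched = inner_join(orders, customers, on='customer_id')
write_csv(matched, 'matched_orders.csv')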
def deduplicate(rows, key_cols=None):
    """Remove duplicate rows. If key_cols is given, deduplicate on those columns only."""
    seen = set()
    unique = []
    for r in rows:
        if key_cols:
            key = tuple(r[c] for c in key_cols)
        else:
            key = tuple(sorted(r.items()))
        if key not in seen:
            seen.add(key)
            unique.append(r)
    return unique

# Deduplicate on the email column (keeps first occurrence)
clean = deduplicate(data, key_cols=['email'])
import json, csv

with open('data.csv', newline='', encoding='utf-8') as f:
    rows = list(csv.DictReader(f))

# Array-of-objects format
with open('data.json', 'w', encoding='utf-8') as f:
    json.dump(rows, f, indent=2)

# JSON Lines format (one object per line, streamable)
with open('data.jsonl', 'w', encoding='utf-8') as f:
    for row in rows:
        f.write(json.dumps(row) + '\n')
import json, csv

with open('data.json', encoding='utf-8') as f:
    rows = json.load(f)
with open('data.csv', 'w', newline='', encoding='utf-8') as f:
    writer = csv.DictWriter(f, fieldnames=rows[0].keys())
    writer.writeheader()
    writer.writerows(rows)
import json, csv

rows = []
with open('data.jsonl', encoding='utf-8') as f:
    for line in f:
        if line.strip():
            rows.append(json.loads(line))

with open('data.csv', 'w', newline='', encoding='utf-8') as f:
    # JSONL objects may have differing keys; take the union for the header
    all_keys = set()
    for r in rows:
        all_keys.update(r.keys())
    writer = csv.DictWriter(f, fieldnames=sorted(all_keys))
    writer.writeheader()
    writer.writerows(rows)
# Quick TSV → CSV conversion (safe only when fields contain no commas, quotes, or embedded tabs)
tr '\t' ',' < data.tsv > data.csv
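When fields may contain commas, quotes, or tabs, tr produces invalid CSV; going through the csv module quotes fields correctly. A minimal sketch:

import csv

# Re-serialize TSV as CSV so fields containing commas get quoted properly
with open('data.tsv', newline='', encoding='utf-8') as fin, \
        open('data.csv', 'w', newline='', encoding='utf-8') as fout:
    writer = csv.writer(fout)
    for row in csv.reader(fin, delimiter='\t'):
        writer.writerow(row)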
def clean_csv(rows):
    """Clean up common CSV data-quality issues."""
    cleaned = []
    for r in rows:
        clean_row = {}
        for k, v in r.items():
            # Strip whitespace from keys and values; non-strings (e.g. None
            # from short rows) become '' so the checks below can't crash
            k = k.strip()
            v = v.strip() if isinstance(v, str) else ''
            # Normalize null-ish values
            if v in ('', 'N/A', 'n/a', 'NA', 'null', 'NULL', 'None', '-'):
                v = ''
            # Normalize booleans (note: this also rewrites bare '1'/'0')
            if v.lower() in ('true', 'yes', '1', 'y'):
                v = 'true'
            elif v.lower() in ('false', 'no', '0', 'n'):
                v = 'false'
            clean_row[k] = v
        cleaned.append(clean_row)
    return cleaned
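Typical usage, chaining with the read/write helpers above (file names are illustrative):

data = clean_csv(read_csv('data.csv'))
write_csv(data, 'data_clean.csv')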
def validate_rows(rows, schema):
    """
    Validate rows against a schema.
    schema: dict mapping column name -> 'int'|'float'|'date'|'email'|'str'
    Returns (valid_rows, error_rows).
    """
    import re
    valid, errors = [], []
    for i, r in enumerate(rows):
        errs = []
        for col, dtype in schema.items():
            val = r.get(col, '').strip()
            if not val:
                continue
            if dtype == 'int':
                try:
                    int(val)
                except ValueError:
                    errs.append(f"{col}: '{val}' is not an integer")
            elif dtype == 'float':
                try:
                    float(val)
                except ValueError:
                    errs.append(f"{col}: '{val}' is not a float")
            elif dtype == 'email':
                if not re.match(r'^[^@]+@[^@]+\.[^@]+$', val):
                    errs.append(f"{col}: '{val}' is not a valid email address")
            elif dtype == 'date':
                if not re.match(r'^\d{4}-\d{2}-\d{2}', val):
                    errs.append(f"{col}: '{val}' is not in YYYY-MM-DD format")
        if errs:
            # +2 converts the 0-based index to a 1-based file line number past the header
            errors.append({'row': i + 2, 'errors': errs, 'data': r})
        else:
            valid.append(r)
    return valid, errors
# Usage
valid, bad = validate_rows(data, {'amount': 'float', 'email': 'email', 'date': 'date'})
print(f"Valid rows: {len(valid)}, error rows: {len(bad)}")
for e in bad[:5]:
    print(f"  row {e['row']}: {e['errors']}")
def generate_report(data, title, group_col, value_col):
    """Generate a summary report in Markdown."""
    lines = [f"# {title}", "", f"**Total rows**: {len(data)}", ""]
    # Per-group summary
    groups = group_by(data, group_col)
    lines.append(f"## By {group_col}")
    lines.append("")
    lines.append(f"| {group_col} | Count | Sum | Avg | Min | Max |")
    lines.append("|---|---|---|---|---|---|")
    for name in sorted(groups):
        vals = [float(r[value_col]) for r in groups[name] if r[value_col].strip()]
        if vals:
            lines.append(f"| {name} | {len(vals)} | {sum(vals):.2f} | {sum(vals)/len(vals):.2f} | {min(vals):.2f} | {max(vals):.2f} |")
    lines.append("")
    lines.append(f"*Generated from {len(data)} rows of data*")
    return '\n'.join(lines)
report = generate_report(data, "Sales Summary", "category", "revenue")
with open('report.md', 'w', encoding='utf-8') as f:
    f.write(report)
For files too large to load into memory at once:
def stream_process(input_path, output_path, transform_fn, delimiter=','):
    """Process a CSV row by row without loading the whole file."""
    with open(input_path, newline='', encoding='utf-8') as fin, \
         open(output_path, 'w', newline='', encoding='utf-8') as fout:
        reader = csv.DictReader(fin, delimiter=delimiter)
        writer = None
        for row in reader:
            result = transform_fn(row)
            if result is None:
                continue  # skip this row
            if writer is None:
                writer = csv.DictWriter(fout, fieldnames=result.keys(), delimiter=delimiter)
                writer.writeheader()
            writer.writerow(result)

# Example: filter and transform in streaming fashion
def process_row(row):
    if float(row.get('amount', 0) or 0) < 10:
        return None  # skip small records
    row['amount_usd'] = str(float(row['amount']) * 1.0)  # computed field (1.0 is a placeholder rate)
    return row

stream_process('big_file.csv', 'output.csv', process_row)
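The same pattern works for aggregation: keep only running totals in memory instead of rows. A minimal sketch, assuming the category and amount columns from the earlier examples:

import csv
from collections import defaultdict

# Per-category totals over an arbitrarily large file, using O(groups) memory
totals = defaultdict(float)
with open('big_file.csv', newline='', encoding='utf-8') as f:
    for row in csv.DictReader(f):
        val = row.get('amount', '').strip()
        if val:
            totals[row['category']] += float(val)
for category, total in sorted(totals.items()):
    print(f"{category}: {total:.2f}")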
Other tips:
- Check a file's encoding with `file -i data.csv`; open files that start with a BOM using encoding='utf-8-sig'.
- Use json.dumps(..., ensure_ascii=False) to keep non-ASCII characters readable in JSON output.
- For other delimiters, pass delimiter='|' (or whatever the file uses) to csv.reader/DictReader.
- For ad-hoc SQL over a CSV, sqlite3 can import and query it directly:
sqlite3 :memory: ".mode csv" ".import data.csv t" "SELECT category, SUM(amount) FROM t GROUP BY category;"
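The same query can be scripted with Python's standard-library sqlite3 module, avoiding the sqlite3 binary. A minimal sketch (the table name t and the category/amount columns are assumptions carried over from the examples above; it also assumes the CSV headers are valid SQL identifiers):

import csv, sqlite3

con = sqlite3.connect(':memory:')
with open('data.csv', newline='', encoding='utf-8') as f:
    reader = csv.DictReader(f)
    cols = reader.fieldnames
    # One TEXT column per CSV header
    con.execute(f"CREATE TABLE t ({', '.join(f'{c} TEXT' for c in cols)})")
    con.executemany(
        f"INSERT INTO t VALUES ({', '.join('?' for _ in cols)})",
        ([row[c] for c in cols] for row in reader),
    )
for category, total in con.execute(
        "SELECT category, SUM(CAST(amount AS REAL)) FROM t GROUP BY category"):
    print(category, total)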