name: log-analyzer
description: Parse, search, and analyze application logs in multiple formats. Use when debugging from log files, setting up structured logging, analyzing error patterns, correlating events across services, parsing stack traces, or monitoring log output in real time.
metadata: {"clawdbot":{"emoji":"📋","requires":{"anyBins":["grep","awk","jq","python3"]},"os":["linux","darwin","win32"]}}
Parse, search, and debug application logs. Covers plain-text logs, structured JSON logs, stack traces, multi-service correlation, and live monitoring.
# Find all errors in a log file
grep -i 'error\|exception\|fatal\|panic\|fail' app.log
# Find errors with 3 lines of context before and after
grep -i -C 3 'error\|exception' app.log
# Find errors from the last hour (ISO timestamp format; tries GNU date, falls back to BSD date)
HOUR_AGO=$(date -u -d '1 hour ago' '+%Y-%m-%dT%H:%M' 2>/dev/null || date -u -v-1H '+%Y-%m-%dT%H:%M')
awk -v t="$HOUR_AGO" '$0 ~ /^[0-9]{4}-[0-9]{2}-[0-9]{2}T/ && $1 >= t' app.log | grep -i 'error'
# Count errors by type (requires GNU grep; -P is not available in BSD/macOS grep)
grep -oP '(?:Error|Exception): \K[^\n]+' app.log | sort | uniq -c | sort -rn | head -20
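# A portable variant for BSD/macOS grep, which lacks -P (a sketch using -E plus sed):
grep -ioE '(error|exception): .*' app.log | sed -E 's/^[^:]*: //' | sort | uniq -c | sort -rn | head -20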
# Find HTTP 5xx errors in an access log
awk '$9 >= 500' access.log
# Trace a single request through the log entries
grep 'req-abc123' app.log
# Search across multiple files
grep -r 'req-abc123' /var/log/myapp/
# Search across multiple services (output is prefixed with filenames)
grep -rH 'correlation-id-xyz' /var/log/service-a/ /var/log/service-b/ /var/log/service-c/
# Find logs between two timestamps (ISO format; relies on each line starting with the timestamp)
awk '$0 >= "2026-02-03T10:00" && $0 <= "2026-02-03T11:00"' app.log
# View the last N lines of a log
tail -1000 app.log | grep -i error
# Find logs since a given time (GNU date)
awk -v start="$(date -d '30 minutes ago' '+%Y-%m-%dT%H:%M')" '$1 >= start' app.log
# Pretty-print JSON logs
cat app.log | jq '.'
# Filter by log level
cat app.log | jq 'select(.level == "error")'
# Filter by time range
cat app.log | jq 'select(.timestamp >= "2026-02-03T10:00:00Z")'
# Extract specific fields
cat app.log | jq -r '[.timestamp, .level, .message] | @tsv'
# Count by level
cat app.log | jq -r '.level' | sort | uniq -c | sort -rn
# Filter by a nested field
cat app.log | jq 'select(.context.userId == "user-123")'
# Group errors by message
cat app.log | jq -r 'select(.level == "error") | .message' | sort | uniq -c | sort -rn
# Extract request duration statistics
cat app.log | jq -r 'select(.duration != null) | .duration' | awk '{sum+=$1; count++; if($1>max)max=$1} END {print "count="count, "avg="sum/count, "max="max}'
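# Nearest-rank percentiles too (a sketch; assumes .duration is numeric and at least one entry exists):
cat app.log | jq -r 'select(.duration != null) | .duration' | sort -n | awk '{v[NR]=$1} END {if (!NR) exit; j=int(NR*0.5); if (!j) j=1; i=int(NR*0.95); if (!i) i=1; print "p50="v[j], "p95="v[i]}'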
# Keep only the lines that are valid JSON
while IFS= read -r line; do
  echo "$line" | jq '.' 2>/dev/null
done < app.log
# Or use grep to find lines starting with {
grep '^[[:space:]]*{' app.log | jq '.'
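# jq can also do the filtering itself (assumes jq 1.5+, where fromjson? suppresses parse errors):
jq -cR 'fromjson? | select(type == "object")' app.log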
# Extract Java/Kotlin stack traces (an Exception/Error line followed by tab-indented "at" lines)
awk '/Exception|Error/{trace=$0; while(getline && /^\t/) trace=trace"\n"$0; print trace"\n---"}' app.log
# Extract Python tracebacks
awk '/^Traceback/{p=1} p{print} /^[A-Za-z].*Error/{if(p) print "---"; p=0}' app.log
# Extract Node.js stack traces (an Error line plus indented "at" lines; Node indents with four spaces)
awk '/Error:/{trace=$0; while(getline && /^[ \t]+at /) trace=trace"\n"$0; print trace"\n---"}' app.log
# Deduplicate: group by root cause (the first line of the trace)
awk '/Exception|Error:/{cause=$0} /^[ \t]+at /{next} cause{print cause; cause=""}' app.log | sort | uniq -c | sort -rn
#!/usr/bin/env python3
"""Parse Python tracebacks out of a log file and group them by root cause."""
import sys
import re
from collections import Counter

def extract_tracebacks(filepath):
    tracebacks = []
    current = []
    in_trace = False
    with open(filepath) as f:
        for line in f:
            if line.startswith('Traceback (most recent call last):'):
                in_trace = True
                current = [line.rstrip()]
            elif in_trace:
                current.append(line.rstrip())
                # The exception line marks the end of the traceback
                if re.match(r'^[A-Za-z]\w*(Error|Exception|Warning)', line):
                    tracebacks.append('\n'.join(current))
                    in_trace = False
                    current = []
    return tracebacks

if __name__ == '__main__':
    filepath = sys.argv[1] if len(sys.argv) > 1 else '/dev/stdin'
    traces = extract_tracebacks(filepath)
    # Group by exception type and message
    causes = Counter()
    for trace in traces:
        lines = trace.split('\n')
        cause = lines[-1] if lines else 'Unknown'
        causes[cause] += 1
    print(f"Found {len(traces)} tracebacks, {len(causes)} unique causes:\n")
    for cause, count in causes.most_common(20):
        print(f"  {count:4d}x {cause}")
# Follow a log file, highlighting errors/warnings (the trailing '\|$' matches every line, so nothing is filtered out)
tail -f app.log | grep --color=always -i 'error\|warn\|$'
# Follow and show only errors
tail -f app.log | grep --line-buffered -i 'error\|exception'
# Follow JSON logs, pretty-printing errors
tail -f app.log | while IFS= read -r line; do
  level=$(echo "$line" | jq -r '.level // empty' 2>/dev/null)
  if [ "$level" = "error" ] || [ "$level" = "fatal" ]; then
    echo "$line" | jq '.'
  fi
done
# Follow multiple files
tail -f /var/log/service-a/app.log /var/log/service-b/app.log
# Follow and prepend timestamps (useful when the logs themselves have none)
tail -f app.log | while IFS= read -r line; do
  echo "$(date '+%H:%M:%S') $line"
done
# Beep on errors (terminal bell)
tail -f app.log | grep --line-buffered -i 'error' | while IFS= read -r line; do
  echo -e "\a$line"
done
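# Desktop notification instead of a beep (a sketch; assumes Linux with notify-send installed):
tail -f app.log | grep --line-buffered -i 'error' | while IFS= read -r line; do
  notify-send "Log error" "$line"
done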
# Count errors per minute
tail -f app.log | grep --line-buffered -i 'error' | while IFS= read -r line; do
  echo "$(date '+%Y-%m-%d %H:%M') ERROR"
done | uniq -c
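# Alert once a minute crosses a threshold (a sketch; assumes GNU awk for strftime, and the 10/min threshold is arbitrary):
tail -f app.log | grep --line-buffered -i 'error' | awk '{m=strftime("%H:%M"); c[m]++; if (c[m] == 10) print "ALERT: 10+ errors in minute " m; fflush()}'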
# Print selected fields (common/combined log format: $1=IP, $4=date, $6=method, $7=path, $9=status, $10=size)
awk '{print $1, $9, $7}' access.log
# Top IPs by request count
awk '{print $1}' access.log | sort | uniq -c | sort -rn | head -20
# Top paths by request count
awk '{print $7}' access.log | sort | uniq -c | sort -rn | head -20
# Slow requests (response time in the last field, in microseconds)
awk '{if ($NF > 1000000) print $0}' access.log
# Requests per minute
awk '{split($4,a,":"); print a[1]":"a[2]":"a[3]}' access.log | uniq -c
# Status code distribution
awk '{print $9}' access.log | sort | uniq -c | sort -rn
# 4xx and 5xx errors with their paths
awk '$9 >= 400 {print $9, $7}' access.log | sort | uniq -c | sort -rn | head -20
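# Top user agents (a sketch; assumes the combined log format, where the user agent is the 6th double-quote-delimited field):
awk -F'"' '{print $6}' access.log | sort | uniq -c | sort -rn | head -10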
# Pipe-delimited: timestamp|level|service|message
awk -F'|' '{print $2, $3, $4}' app.log
# Tab-delimited
awk -F'\t' '$2 == "ERROR" {print $1, $4}' app.log
# CSV logs
python3 -c "
import csv, sys
with open(sys.argv[1]) as f:
    for row in csv.DictReader(f):
        if row.get('level') == 'error':
            print(f\"{row['timestamp']} {row['message']}\")
" app.csv
// npm install pino
const pino = require('pino');
const logger = pino({
  level: process.env.LOG_LEVEL || 'info',
  // Standard fields added to every log line
  base: { service: 'my-api', version: '1.2.0' },
});
// Usage
logger.info({ userId: 'u123', action: 'login' }, 'User logged in');
logger.error({ err, requestId: req.id }, 'Request failed');
// Example output: {"level":30,"time":1706900000000,"service":"my-api","userId":"u123","action":"login","msg":"User logged in"}
// Child logger with bound context
const reqLogger = logger.child({ requestId: req.id, userId: req.user?.id });
reqLogger.info('Processing order');
reqLogger.error({ err }, 'Order failed');
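# pino writes NDJSON to stdout; for local development, pipe it through pino-pretty for readable output (assumes app.js is your entry point):
node app.js | npx pino-pretty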
# pip install structlog
import structlog

structlog.configure(
    processors=[
        structlog.processors.TimeStamper(fmt="iso"),
        structlog.processors.add_log_level,
        structlog.processors.JSONRenderer(),
    ],
)
logger = structlog.get_logger(service="my-api")
# Usage
logger.info("user_login", user_id="u123", ip="1.2.3.4")
logger.error("request_failed", request_id="req-abc", error=str(e))
# Example output: {"event":"user_login","user_id":"u123","ip":"1.2.3.4","level":"info","timestamp":"2026-02-03T12:00:00Z","service":"my-api"}
import (
    "os"

    "github.com/rs/zerolog"
    "github.com/rs/zerolog/log"
)

func init() {
    zerolog.TimeFieldFormat = zerolog.TimeFormatUnix
    log.Logger = zerolog.New(os.Stdout).With().
        Timestamp().
        Str("service", "my-api").
        Logger()
}

// Usage
log.Info().Str("userId", "u123").Msg("User logged in")
log.Error().Err(err).Str("requestId", reqID).Msg("Request failed")
#!/bin/bash
# error-report.sh - summarize errors from a log file
LOG="${1:?Usage: error-report.sh <logfile>}"

echo "=== Error report: $(basename "$LOG") ==="
echo "Generated: $(date -u '+%Y-%m-%dT%H:%M:%SZ')"
echo ""
total=$(wc -l < "$LOG")
errors=$(grep -ci 'error\|exception\|fatal' "$LOG")
warns=$(grep -ci 'warn' "$LOG")
echo "Total lines: $total"
echo "Errors: $errors"
echo "Warnings: $warns"
echo ""
echo "--- Top 15 error messages ---"
grep -i 'error\|exception' "$LOG" | \
  sed 's/^[0-9TZ:.+\-]* //' | \
  sed 's/\b[0-9a-f]\{8,\}\b/ID/g' | \
  sed 's/[0-9]\{1,\}/N/g' | \
  sort | uniq -c | sort -rn | head -15
echo ""
echo "--- Errors per hour ---"
grep -i 'error\|exception' "$LOG" | \
  grep -oP '\d{4}-\d{2}-\d{2}T\d{2}' | \
  sort | uniq -c
echo ""
echo "--- First occurrence of each error type ---"
grep -i 'error\|exception' "$LOG" | \
  sed 's/^[0-9TZ:.+\-]* //' | \
  awk '!seen[$0]++' | head -10
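# Run it (assumes the script is saved as error-report.sh and made executable):
./error-report.sh /var/log/myapp/app.log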
#!/usr/bin/env python3
"""Generate an error summary from a JSON log file."""
import json
import sys
from collections import Counter, defaultdict

def analyze_logs(filepath):
    errors = []
    levels = Counter()
    errors_by_hour = defaultdict(int)
    with open(filepath) as f:
        for line in f:
            try:
                entry = json.loads(line.strip())
            except (json.JSONDecodeError, ValueError):
                continue
            # str() handles numeric levels (e.g. pino's "level":30)
            level = str(entry.get('level', entry.get('severity', ''))).lower()
            levels[level] += 1
            if level in ('error', 'fatal', 'critical'):
                msg = entry.get('message', entry.get('msg', entry.get('event', 'unknown')))
                ts = entry.get('timestamp', entry.get('time', ''))
                errors.append({'message': msg, 'timestamp': ts, 'entry': entry})
                # Group by hour
                try:
                    hour = ts[:13]  # "2026-02-03T12"
                    errors_by_hour[hour] += 1
                except (TypeError, IndexError):
                    pass
    # Group errors by message
    error_counts = Counter(e['message'] for e in errors)
    print(f"=== Log analysis: {filepath} ===\n")
    print("Level distribution:")
    for level, count in levels.most_common():
        print(f"  {level:10s} {count}")
    print(f"\nTotal errors: {len(errors)}")
    print(f"Unique error messages: {len(error_counts)}\n")
    print("Top 15 errors:")
    for msg, count in error_counts.most_common(15):
        print(f"  {count:4d}x {msg[:100]}")
    if errors_by_hour:
        print("\nErrors per hour:")
        for hour in sorted(errors_by_hour):
            bar = '#' * min(errors_by_hour[hour], 50)
            print(f"  {hour} {errors_by_hour[hour]:4d} {bar}")

if __name__ == '__main__':
    analyze_logs(sys.argv[1])
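# Run it (saved here as json_error_report.py, a hypothetical name):
python3 json_error_report.py app.log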
# Merge multiple log files, ordered by timestamp (assumes each file is already sorted)
sort -m -t'T' -k1,1 service-a.log service-b.log service-c.log > merged.log
# If the files themselves are unsorted, do a full sort
sort -t'T' -k1,1 service-*.log > merged.log
# Merge JSON logs, tagging each entry with its source
for f in service-*.log; do
  service=$(basename "$f" .log)
  jq --arg svc "$service" '. + {source: $svc}' "$f"
done | jq -s 'sort_by(.timestamp)[]'
# Trace a request ID across all service logs
REQUEST_ID="req-abc-123"
grep -rH "$REQUEST_ID" /var/log/services/ | sort -t: -k2
# Same for JSON logs, matching either requestId or correlationId
for f in /var/log/services/*.log; do
  jq --arg rid "$REQUEST_ID" 'select(.requestId == $rid or .correlationId == $rid)' "$f"
done