OA0 = Omni AI 0
OA0 是一个探索 AI 的论坛
现在注册
已注册用户请  登录
OA0  ›  技能包  ›  log-analyzer:解析、搜索并分析应用程序日志的综合工具

log-analyzer:解析、搜索并分析应用程序日志的综合工具

 
  server ·  2026-02-08 12:28:38 · 3 次点击  · 0 条评论  

名称: log-analyzer
描述: 解析、搜索和分析多种格式的应用程序日志。适用于以下场景:从日志文件调试、设置结构化日志记录、分析错误模式、跨服务关联事件、解析堆栈跟踪或实时监控日志输出。
元数据: {"clawdbot":{"emoji":"📋","requires":{"anyBins":["grep","awk","jq","python3"]},"os":["linux","darwin","win32"]}}


日志分析器

解析、搜索和调试应用程序日志。涵盖纯文本日志、结构化 JSON 日志、堆栈跟踪、多服务关联和实时监控。

使用场景

  • 从日志文件调试应用程序错误
  • 在日志中搜索特定模式、错误或请求 ID
  • 解析和分析堆栈跟踪
  • 在应用程序中设置结构化日志记录(JSON)
  • 跨多个服务或日志文件关联事件
  • 在开发过程中实时监控日志
  • 生成错误频率报告或摘要

快速搜索模式

查找错误和异常

# 查找日志文件中的所有错误
grep -i 'error\|exception\|fatal\|panic\|fail' app.log

# 查找错误并显示前后 3 行上下文
grep -i -C 3 'error\|exception' app.log

# 查找最近一小时内的错误(ISO 时间戳格式)
HOUR_AGO=$(date -u -d '1 hour ago' '+%Y-%m-%dT%H:%M' 2>/dev/null || date -u -v-1H '+%Y-%m-%dT%H:%M')
awk -v t="$HOUR_AGO" '$0 ~ /^[0-9]{4}-[0-9]{2}-[0-9]{2}T/ && $1 >= t' app.log | grep -i 'error'

# 按类型统计错误数量
grep -oP '(?:Error|Exception): \K[^\n]+' app.log | sort | uniq -c | sort -rn | head -20

# 从访问日志中查找 HTTP 5xx 错误
awk '$9 >= 500' access.log

按请求或关联 ID 搜索

# 在日志条目中追踪单个请求
grep 'req-abc123' app.log

# 跨多个文件搜索
grep -r 'req-abc123' /var/log/myapp/

# 跨多个服务搜索(输出包含文件名前缀)
grep -rH 'correlation-id-xyz' /var/log/service-a/ /var/log/service-b/ /var/log/service-c/

时间范围过滤

# 查找两个时间戳之间的日志(ISO 格式)
awk '$0 >= "2026-02-03T10:00" && $0 <= "2026-02-03T11:00"' app.log

# 查看最后 N 行日志
tail -1000 app.log | grep -i error

# 查找自特定时间以来的日志(GNU date)
awk -v start="$(date -d '30 minutes ago' '+%Y-%m-%dT%H:%M')" '$1 >= start' app.log

JSON / 结构化日志

使用 jq 解析

# 美化打印 JSON 日志
cat app.log | jq '.'

# 按日志级别过滤
cat app.log | jq 'select(.level == "error")'

# 按时间范围过滤
cat app.log | jq 'select(.timestamp >= "2026-02-03T10:00:00Z")'

# 提取特定字段
cat app.log | jq -r '[.timestamp, .level, .message] | @tsv'

# 按级别统计数量
cat app.log | jq -r '.level' | sort | uniq -c | sort -rn

# 按嵌套字段过滤
cat app.log | jq 'select(.context.userId == "user-123")'

# 按消息内容对错误分组
cat app.log | jq -r 'select(.level == "error") | .message' | sort | uniq -c | sort -rn

# 提取请求耗时统计信息
cat app.log | jq -r 'select(.duration != null) | .duration' | awk '{sum+=$1; count++; if($1>max)max=$1} END {print "count="count, "avg="sum/count, "max="max}'

解析混合格式日志(JSON 行与纯文本混合)

# 仅提取有效的 JSON 行
while IFS= read -r line; do
  echo "$line" | jq '.' 2>/dev/null && continue
done < app.log

# 或者使用 grep 查找以 { 开头的行
grep '^\s*{' app.log | jq '.'

堆栈跟踪分析

提取和去重堆栈跟踪

# 提取 Java/Kotlin 堆栈跟踪(以 Exception/Error 开头,后跟 \tat 行)
awk '/Exception|Error/{trace=$0; while(getline && /^\t/) trace=trace"\n"$0; print trace"\n---"}' app.log

# 提取 Python 回溯信息
awk '/^Traceback/{p=1} p{print} /^[A-Za-z].*Error/{if(p) print "---"; p=0}' app.log

# 提取 Node.js 堆栈跟踪(Error + 缩进的 "at" 行)
awk '/Error:/{trace=$0; while(getline && /^    at /) trace=trace"\n"$0; print trace"\n---"}' app.log

# 去重:按根本原因(跟踪的第一行)分组
awk '/Exception|Error:/{cause=$0} /^\tat|^    at /{next} cause{print cause; cause=""}' app.log | sort | uniq -c | sort -rn

Python 回溯解析器

#!/usr/bin/env python3
"""从日志文件中解析 Python 回溯信息并按根本原因分组。"""
import sys
import re
from collections import Counter

def extract_tracebacks(filepath):
    tracebacks = []
    current = []
    in_trace = False

    with open(filepath) as f:
        for line in f:
            if line.startswith('Traceback (most recent call last):'):
                in_trace = True
                current = [line.rstrip()]
            elif in_trace:
                current.append(line.rstrip())
                # 异常行标志着回溯结束
                if re.match(r'^[A-Za-z]\w*(Error|Exception|Warning)', line):
                    tracebacks.append('\n'.join(current))
                    in_trace = False
                    current = []
    return tracebacks

if __name__ == '__main__':
    filepath = sys.argv[1] if len(sys.argv) > 1 else '/dev/stdin'
    traces = extract_tracebacks(filepath)

    # 按异常类型和消息分组
    causes = Counter()
    for trace in traces:
        lines = trace.split('\n')
        cause = lines[-1] if lines else 'Unknown'
        causes[cause] += 1

    print(f"找到 {len(traces)} 条回溯信息,{len(causes)} 个唯一原因:\n")
    for cause, count in causes.most_common(20):
        print(f"  {count:4d}x  {cause}")

实时监控

跟踪和过滤

# 跟踪日志文件,并用红色高亮错误
tail -f app.log | grep --color=always -i 'error\|warn\|$'

# 跟踪并仅过滤错误
tail -f app.log | grep --line-buffered -i 'error\|exception'

# 跟踪 JSON 日志,美化打印错误
tail -f app.log | while IFS= read -r line; do
  level=$(echo "$line" | jq -r '.level // empty' 2>/dev/null)
  if [ "$level" = "error" ] || [ "$level" = "fatal" ]; then
    echo "$line" | jq '.'
  fi
done

# 跟踪多个文件
tail -f /var/log/service-a/app.log /var/log/service-b/app.log

# 跟踪并添加时间戳(当日志本身不包含时间戳时很有用)
tail -f app.log | while IFS= read -r line; do
  echo "$(date '+%H:%M:%S') $line"
done

监控特定模式并告警

# 遇到错误时发出蜂鸣声(终端铃声)
tail -f app.log | grep --line-buffered -i 'error' | while read line; do
  echo -e "\a$line"
done

# 统计每分钟的错误数
tail -f app.log | grep --line-buffered -i 'error' | while read line; do
  echo "$(date '+%Y-%m-%d %H:%M') ERROR"
done | uniq -c

日志格式解析

常见访问日志(Apache/Nginx)

# 解析字段:IP、日期、方法、路径、状态码、大小
awk '{print $1, $9, $7}' access.log

# 按请求数统计 Top IP
awk '{print $1}' access.log | sort | uniq -c | sort -rn | head -20

# 按请求数统计 Top 路径
awk '{print $7}' access.log | sort | uniq -c | sort -rn | head -20

# 慢请求(响应时间在最后一个字段,微秒)
awk '{if ($NF > 1000000) print $0}' access.log

# 每分钟请求数
awk '{split($4,a,":"); print a[1]":"a[2]":"a[3]}' access.log | uniq -c

# 状态码分布
awk '{print $9}' access.log | sort | uniq -c | sort -rn

# 4xx 和 5xx 错误及其路径
awk '$9 >= 400 {print $9, $7}' access.log | sort | uniq -c | sort -rn | head -20

自定义分隔符日志

# 竖线分隔:timestamp|level|service|message
awk -F'|' '{print $2, $3, $4}' app.log

# 制表符分隔
awk -F'\t' '$2 == "ERROR" {print $1, $4}' app.log

# CSV 日志
python3 -c "
import csv, sys
with open(sys.argv[1]) as f:
    for row in csv.DictReader(f):
        if row.get('level') == 'error':
            print(f\"{row['timestamp']} {row['message']}\")
" app.csv

设置结构化日志记录

Node.js (pino — 快速的 JSON 日志记录器)

// npm install pino
const pino = require('pino');
const logger = pino({
  level: process.env.LOG_LEVEL || 'info',
  // 为每一行日志添加标准字段
  base: { service: 'my-api', version: '1.2.0' },
});

// 用法示例
logger.info({ userId: 'u123', action: 'login' }, 'User logged in');
logger.error({ err, requestId: req.id }, 'Request failed');

// 输出示例: {"level":30,"time":1706900000000,"service":"my-api","userId":"u123","action":"login","msg":"User logged in"}

// 带有绑定上下文的子日志记录器
const reqLogger = logger.child({ requestId: req.id, userId: req.user?.id });
reqLogger.info('Processing order');
reqLogger.error({ err }, 'Order failed');

Python (structlog)

# pip install structlog
import structlog

structlog.configure(
    processors=[
        structlog.processors.TimeStamper(fmt="iso"),
        structlog.processors.add_log_level,
        structlog.processors.JSONRenderer(),
    ],
)
logger = structlog.get_logger(service="my-api")

# 用法示例
logger.info("user_login", user_id="u123", ip="1.2.3.4")
logger.error("request_failed", request_id="req-abc", error=str(e))

# 输出示例: {"event":"user_login","user_id":"u123","ip":"1.2.3.4","level":"info","timestamp":"2026-02-03T12:00:00Z","service":"my-api"}

Go (zerolog)

import (
    "os"
    "github.com/rs/zerolog"
    "github.com/rs/zerolog/log"
)

func init() {
    zerolog.TimeFieldFormat = zerolog.TimeFormatUnix
    log.Logger = zerolog.New(os.Stdout).With().
        Timestamp().
        Str("service", "my-api").
        Logger()
}

// 用法示例
log.Info().Str("userId", "u123").Msg("User logged in")
log.Error().Err(err).Str("requestId", reqID).Msg("Request failed")

错误模式报告

生成错误频率报告

#!/bin/bash
# error-report.sh - 从日志文件汇总错误信息
LOG="${1:?Usage: error-report.sh <logfile>}"

echo "=== 错误报告: $(basename "$LOG") ==="
echo "生成时间: $(date -u '+%Y-%m-%dT%H:%M:%SZ')"
echo ""

total=$(wc -l < "$LOG")
errors=$(grep -ci 'error\|exception\|fatal' "$LOG")
warns=$(grep -ci 'warn' "$LOG")

echo "总行数:      $total"
echo "错误数:      $errors"
echo "警告数:      $warns"
echo ""

echo "--- Top 15 错误消息 ---"
grep -i 'error\|exception' "$LOG" | \
  sed 's/^[0-9TZ:.+\-]* //' | \
  sed 's/\b[0-9a-f]\{8,\}\b/ID/g' | \
  sed 's/[0-9]\{1,\}/N/g' | \
  sort | uniq -c | sort -rn | head -15
echo ""

echo "--- 每小时错误数 ---"
grep -i 'error\|exception' "$LOG" | \
  grep -oP '\d{4}-\d{2}-\d{2}T\d{2}' | \
  sort | uniq -c
echo ""

echo "--- 每种错误类型的首次出现 ---"
grep -i 'error\|exception' "$LOG" | \
  sed 's/^[0-9TZ:.+\-]* //' | \
  sort -u | head -10

使用 Python 生成 JSON 日志错误报告

#!/usr/bin/env python3
"""从 JSON 日志文件生成错误摘要。"""
import json
import sys
from collections import Counter, defaultdict
from datetime import datetime

def analyze_logs(filepath):
    errors = []
    levels = Counter()
    errors_by_hour = defaultdict(int)

    with open(filepath) as f:
        for line in f:
            try:
                entry = json.loads(line.strip())
            except (json.JSONDecodeError, ValueError):
                continue

            level = entry.get('level', entry.get('severity', '')).lower()
            levels[level] += 1

            if level in ('error', 'fatal', 'critical'):
                msg = entry.get('message', entry.get('msg', entry.get('event', 'unknown')))
                ts = entry.get('timestamp', entry.get('time', ''))
                errors.append({'message': msg, 'timestamp': ts, 'entry': entry})

                # 按小时分组
                try:
                    hour = ts[:13]  # "2026-02-03T12"
                    errors_by_hour[hour] += 1
                except (TypeError, IndexError):
                    pass

    # 按消息内容对错误分组
    error_counts = Counter(e['message'] for e in errors)

    print(f"=== 日志分析: {filepath} ===\n")
    print("级别分布:")
    for level, count in levels.most_common():
        print(f"  {level:10s}  {count}")

    print(f"\n错误总数: {len(errors)}")
    print(f"唯一错误消息数: {len(error_counts)}\n")

    print("Top 15 错误:")
    for msg, count in error_counts.most_common(15):
        print(f"  {count:4d}x  {msg[:100]}")

    if errors_by_hour:
        print("\n每小时错误数:")
        for hour in sorted(errors_by_hour):
            bar = '#' * min(errors_by_hour[hour], 50)
            print(f"  {hour}  {errors_by_hour[hour]:4d}  {bar}")

if __name__ == '__main__':
    analyze_logs(sys.argv[1])

多服务日志关联

合并和排序来自多个服务的日志

# 合并多个日志文件,按时间戳排序
sort -m -t'T' -k1,1 service-a.log service-b.log service-c.log > merged.log

# 如果文件本身未排序,使用完整排序
sort -t'T' -k1,1 service-*.log > merged.log

# 合并 JSON 日志,添加来源字段
for f in service-*.log; do
  service=$(basename "$f" .log)
  jq --arg svc "$service" '. + {source: $svc}' "$f"
done | jq -s 'sort_by(.timestamp)[]'

跨服务追踪请求

```bash

在所有服务中查找特定关联/请求 ID 的所有日志条目

REQUEST_ID="req-abc-123"
grep -rH "$REQUEST_ID" /var/log/services/ | sort -t: -k2

针对 JSON 日志

for f in /var/log/services/*.log; do
jq --arg rid "$REQUEST_ID" 'select(.requestId == $rid or .correlationId == $rid)' "$f

3 次点击  ∙  0 人收藏  
登录后收藏  
目前尚无回复
0 条回复
About   ·   Help   ·    
OA0 - Omni AI 0 一个探索 AI 的社区
沪ICP备2024103595号-2
Developed with Cursor