Fix router failover and add Discord bot

- Remove duplicate claude_api provider to fix automatic failover
- Enhance error detection with HTTP status codes and more indicators
- Add comprehensive README documentation with manual switching
- Implement Discord bot with Claude Code CLI integration
- Support /terminal and /claude commands with AI-powered responses

🤖 Generated with [Claude Code](https://claude.ai/code)

Co-Authored-By: Claude <noreply@anthropic.com>
This commit is contained in:
Will Song
2025-07-16 21:24:06 -05:00
parent 13e98d1229
commit 568dcc45e4
29 changed files with 2732 additions and 204 deletions

21
discord_bot/Dockerfile Normal file
View File

@@ -0,0 +1,21 @@
FROM python:3.11-slim
WORKDIR /app
# 安装系统依赖
RUN apt-get update && apt-get install -y \
curl \
wget \
git \
nano \
&& rm -rf /var/lib/apt/lists/*
COPY requirements.txt .
RUN pip install --no-cache-dir -r requirements.txt
COPY . .
# 创建必要的目录
RUN mkdir -p databases backups
CMD ["python", "main.py"]

146
discord_bot/README.md Normal file
View File

@@ -0,0 +1,146 @@
# Discord Bot MVP
基于AI的终端控制系统核心功能是 `/terminal` 命令智能管理Docker服务器。
## 🚀 快速开始
### 1. 初始化和测试
```bash
# 运行启动脚本(包含测试)
./start.sh --test
# 或手动执行
python3 init_db.py # 初始化数据库
python3 test_bot.py # 测试功能
python3 main.py # 启动bot
```
### 2. Docker部署
```bash
# 构建和启动
docker-compose up --build
# 后台运行
docker-compose up -d
```
## 📁 项目结构
```
discord_bot/
├── main.py # 主程序入口
├── tu_bot/ # Tu Bot (主控制器)
│ └── bot.py # Discord Bot核心逻辑
├── slash_commands/ # Slash命令处理器
│ ├── terminal/ # /terminal命令
│ │ └── terminal_handler.py # Terminal处理器
│ ├── terminal_fin/ # /terminal_fin命令 (计划)
│ ├── fin/ # 金融相关命令 (计划)
│ ├── db_operations/ # 数据库操作命令 (计划)
│ └── help/ # 帮助命令 (计划)
├── ai_bots/ # 多AI验证系统 (计划)
│ ├── ai_bot_1/
│ ├── ai_bot_2/
│ ├── ai_bot_3/
│ └── bot_template/
├── utils/ # 工具类
│ ├── database.py # 数据库管理器
│ └── token_reader.py # Token读取器
├── databases/ # SQLite数据库文件
│ └── discord_bot.db # 主数据库
├── backups/ # 备份文件
├── SlashCommandList.txt # 命令列表(热更新)
├── requirements.txt # Python依赖
├── Dockerfile # Docker配置
├── docker-compose.yml # Docker Compose配置
├── init_db.py # 数据库初始化脚本
├── test_bot.py # 功能测试脚本
└── start.sh # 启动脚本
```
## 🤖 主要功能
### `/terminal` 命令
- **AI控制终端**: 通过Discord聊天控制Docker服务器
- **自然语言支持**: "查看Docker容器状态" 等自然语言命令
- **Shell命令**: `ls`, `docker ps`, `pwd` 等直接命令
- **安全限制**:
- 仅限 `~/docker/*` 目录
- 禁止危险命令 (`rm -rf` 等)
- 1小时会话超时
- **记忆管理**: 智能压缩对话历史控制token消耗
### 术语字典系统
- **term_dic.db**: 存储自定义术语和命令
- **优先级**: term_dic定义 > AI默认知识
- **分类管理**: docker, finance, project等分类
### 数据库结构
- **toy_balance**: ALP交易确认记录
- **portfolio**: 投资组合数据
- **term_dic**: 术语字典
- **conversation_memory**: 对话记录
- **command_log**: 命令执行日志
## 🔧 配置要求
### Token文件 (`~/docker/tokens.txt`)
```
tu_discord_token=你的Discord_Bot_Token
claude_API=你的Claude_API_Key
gitea_token=你的Gitea_Token
# ... 其他配置
```
### 环境变量
- `DB_PATH`: 数据库文件路径
- `FR_DB_PATH`: FinancialReport数据库路径只读
- `TOKEN_PATH`: token文件路径
- `DISCORD_PREFIX`: Bot命令前缀
## 🛡️ 安全机制
- **工作空间隔离**: 限制在 `~/docker/*` 目录
- **命令黑名单**: 自动拒绝危险操作
- **权限分离**: MVP开发AI有sudo权限生产环境无sudo
- **审计日志**: 记录所有重要操作
## 📊 使用示例
```
/terminal ls # 列出文件
/terminal docker ps # 查看容器
/terminal 查看当前目录文件 # 自然语言
/terminal 启动nginx容器 # AI理解意图
/help # 显示帮助
```
## 🔄 开发状态
-**基础架构**: Docker配置、Python框架
-**数据库系统**: SQLite表结构、术语字典
-**/terminal命令**: AI控制、安全限制、记忆管理
- ⏸️ **多AI验证**: 3个AI bot + 讨论机制 (暂未实现)
- ⏸️ **金融命令**: /fin_portfolio, /fin_report (计划中)
- ⏸️ **数据库操作**: /db_* 系列命令 (计划中)
## 🐛 故障排除
### 常见问题
1. **Token错误**: 检查 `~/docker/tokens.txt` 格式
2. **权限问题**: 确保脚本有执行权限 `chmod +x start.sh`
3. **数据库错误**: 运行 `python3 init_db.py` 重新初始化
4. **Docker问题**: 检查 `docker-compose.yml` 卷挂载路径
### 日志查看
```bash
# 查看bot日志
tail -f bot.log
# Docker日志
docker-compose logs -f discord_bot
```
---
💡 **提示**: 这是MVP版本重点实现 `/terminal` 核心功能。多AI验证系统作为开发工具暂时跳过。

View File

@@ -0,0 +1,32 @@
# Discord Bot Slash Commands List
# 此文件用于热更新命令系统,修改后自动生效
## 核心命令
/terminal <command> - AI控制终端系统支持自然语言和shell命令
/claude <question> - 直接与Claude Code CLI对话用于一般性问答
/help - 显示命令帮助信息
## 使用示例
/terminal ls
/terminal docker ps
/terminal 查看当前目录文件
/terminal 查看Docker容器状态
/terminal 启动nginx容器
/claude 如何优化Docker容器性能
/claude 解释一下这个错误信息
## 限制说明
- 仅限于~/docker/*目录操作
- 不支持危险命令(如rm -rf)
- 自动1小时会话超时
- AI智能理解自然语言命令
## 计划中的命令 (未实现)
/terminal_fin <command> - AI辅助金融策略开发
/fin_portfolio - 显示投资组合当日盈亏
/fin_report - 获取最新FinancialReport.pdf
/db_query <sql> - 数据库查询
/db_append <table> <data> - 数据库追加
/db_delete <table> <condition> - 数据库删除
/db_add <table> <data> - 添加数据记录
/db_update <table> <data> <condition> - 更新数据记录

Binary file not shown.

View File

@@ -0,0 +1,325 @@
# Discord Bot MVP 项目需求文档
## 项目概述
### 核心目标
构建一个基于AI的终端控制系统核心功能是/terminal命令能够智能管理Docker服务器。多AI验证系统作为开发工具用于开发和完善/terminal系统。
### 技术栈
- **不做Python版本要求**: 使用系统现有Python版本
- **部署**: Docker容器化
- **平台**: Discord
- **数据库**: SQLite (discord_bot.db)
- **通信**: 与多个项目进行通信现阶段只与FinancialReport项目通信
- **外部数据访问**: 只读其他项目的数据库文件
- **运行模式**: 7x24小时持续运行
## 系统架构
### 核心组件
1. **3个AI Bot**: 独立的AI对话机器人
2. **tu Bot**: 中央控制的slash command机器人
3. **热更新系统**: 监听命令列表变化
4. **外部服务**: Finance Report Docker容器
### 文件结构
```
~/docker/
├── token.txt # 全局token文件只读访问
├── discord_bot/
│ ├── ai_bots/
│ │ ├── ai_bot_1/
│ │ ├── ai_bot_2/
│ │ ├── ai_bot_3/
│ │ └── bot_template/
│ ├── slash_commands/
│ │ ├── terminal/
│ │ ├── terminal_fin/
│ │ ├── fin/
│ │ ├── db_operations/
│ │ └── help/
│ ├── tu_bot/
│ ├── databases/
│ │ └── discord_bot.db # 主数据库包含toy_balance、portfolio等表
│ ├── backups/
│ └── SlashCommandList.txt
├── FinancialReport/
│ └── [FR项目文件]
├── docker-compose.yml
└── shared_volumes/
```
## 核心功能
### 1. 多AI验证机制
#### 3轮讨论流程
1. **第1轮**: 用户提问 → 同时发送给所有AI → 各AI回答并@其他AI
2. **第2轮**: 各AI接收其他AI的@,回应并继续@其他AI
3. **第3轮**: 讨论结束,通过权限控制防止继续
#### 权限控制机制
- **触发条件**: 每个AI bot被其他AI @3次后自动锁定
- **锁定状态**: 不接受任何消息和@
- **解锁机制**: 只有tu bot发送空@才能解锁
- **安全性**: 防止AI无限循环对话
#### AI Bot配置
- 独立的Python文件和配置文件
- 共用模板生成,独立文件夹管理
- 字数限制等个性化配置
- 容器化部署,相互隔离
### 2. Slash Command 系统
#### tu Bot (中央控制器)
- **热更新**: 监听 `SlashCommandList.txt` 文件变化
- **文件监控**: 文件修改时立即更新命令列表
- **权限管理**: 控制AI bot的锁定/解锁状态
#### /help 命令
- 根据 `SlashCommandList.txt` 自动生成使用说明
- 类似 `--help` 格式的输出
- 实时反映当前可用命令
### 3. Terminal 系列命令
#### /terminal
- **AI引擎**: 使用预装的Claude Code CLI (集成方式待研究)
- **运行模式**: 默认plan mode
- **消息返回**: 按句返回Claude Code CLI的输出
- **工作空间**: `~/docker/*` (排除 `~/docker/gitea/*`)
- **术语系统**: `term_dic.db` 存储自定义术语和命令
- **优先级**: term_dic.db定义 > AI默认知识
- **对话管理**: 1小时内加载上文超时开启新对话
- **记忆管理**: 重要性权重压缩策略实时控制token消耗
- 永久保留: 成功的Docker操作、重要配置变更
- 压缩保留: 调试过程→结论、错误信息→解决方案
- 立即清理: 详细的命令输出、重复的尝试过程
- **安全限制**:
- **禁用命令**: 不能使用 `rm -rf` (必须指定详细路径)
- **权限控制**: /terminal AI无sudo权限
- **工作范围**: 严格限制在~/docker/* (除gitea)
- **保护机制**: 拒绝删除不必要文件,需要明确路径
- **开发权限**: MVP开发阶段AI可获得sudo权限 (仅限开发)
#### /terminal_fin_backtest
- **功能**: AI辅助金融策略开发
- **记忆管理**: `memory.md` 文件记录对话历史
- **智能压缩**: 重要性权重策略控制token消耗
- 永久保留: 成功策略、核心逻辑、重要学习记录
- 压缩保留: 调试过程→结论、中间代码→最终版本
- 定期清理: 纯调试过程、失败尝试细节
- **代码管理**: Python策略生成、回测、解释
- **数据源**: 缓存的日线级别金融数据
### 4. Finance 系列命令
#### /fin_portfolio
- **功能**: 纯Python脚本无AI参与
- **数据源**: Yahoo Finance API
- **数据库**: `portfolio`在discord_bot.db中
- **计算**: 实时价格×持仓 = 当日盈亏
- **特殊处理**: 跳过 `toy_balance` 字段无法从Yahoo获取
- **输出**: 各账户当日盈亏 + 总盈亏
#### /fin_report
- **数据源**: FinancialReport项目的FinancialDataBaseProcessed.db (只读)
- **功能**: 传递最新的FinancialReport.pdf文件给用户
- **ALP交易确认**: 显示pending_alp_transactions表格供用户确认
- **确认界面**:
- 列表显示所有pending的ALP交易 (每日不超过20笔)
- 用户可勾选"不是toy_fund"的交易
- 默认全部不勾选即默认都是toy_fund
- 用户点击确定后提交确认结果
- 显示确认后的不可修改表格
- **数据记录**: 详细记录每笔确认的交易信息到toy_balance表
- **错误处理**: 用户点错无法撤销,需手动修改数据库
- **时序**: 显示的数据比实际晚一天FR项目处理延迟
- **容错设计**: /fin_report功能失效不影响其他功能
### 5. Database 系列命令
#### db_系列命令
- **独立命令集**: 每个都是独立的slash command
- `/db_append`: 向数据库表追加数据
- `/db_delete`: 删除数据库记录
- `/db_query`: 查询数据库
- `/db_add`: 添加新的数据记录
- `/db_update`: 更新现有记录
- **功能**: 直接操作所有.db文件
- **目标**: 任何discord_bot目录下的.db文件
- **安全性**: 操作权限控制和备份机制
- **操作日志**: 所有数据库操作都记录在审计日志中
## 数据管理
### 数据库文件
- **discord_bot.db**: Discord Bot主数据库
- `toy_balance`表: 详细的ALP交易确认记录
```
CREATE TABLE toy_balance (
id INTEGER PRIMARY KEY AUTOINCREMENT,
transaction_id TEXT UNIQUE,
date DATE,
amount REAL,
merchant TEXT,
description TEXT,
is_toy_fund BOOLEAN, -- true=Ginbo, false=Family
confirmed_by TEXT, -- Discord用户ID
confirmed_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
);
```
- `portfolio`表: 投资组合数据
- 其他Bot相关数据表
- **term_dic.db**: 术语字典,存储自定义术语和命令
- **外部数据库访问**: 只读FinancialReport/data/processed/FinancialDataBaseProcessed.db
### SQLite跨容器访问
- **访问模式**: 使用`?mode=ro`参数确保只读
- **重试机制**: 数据库锁时自动重试最多3次
- **超时设置**: 单次访问超时2秒
- **错误处理**: 访问失败时的降级策略
### 记忆文件
- **memory.md**: terminal_fin_backtest的对话记录
- **对话记录**: terminal系列的历史记录
### 备份策略
- **频率**: 每日自动备份
- **保留策略**: 3份日备份 + 3份周备份 + 3份月备份
- **实现**: Debian crontab任务
- **覆盖范围**: 所有.db文件和重要配置文件
- **Gitea集成**: 使用token.txt中的Gitea token自动备份到版本控制
## Docker配置
### 容器挂载
```
volumes:
# 访问FinancialReport的数据库只读
- ../FinancialReport/data/processed:/external/financial_report:ro
# 访问全局token文件只读
- ../token.txt:/app/token.txt:ro
# 数据持久化
- ./databases:/app/databases
- ./backups:/app/backups
```
### 环境变量
- DB_PATH: 数据库文件路径
- FR_DB_PATH: FinancialReport数据库路径只读
- TOKEN_PATH: token文件路径
- DISCORD_PREFIX: Bot命令前缀
## 鲁棒性设计
### 错误处理
- **API失败**: 超时重试、降级处理
- **数据库异常**: 连接池、事务回滚、只读访问保护
- **AI服务中断**: 优雅降级、状态恢复
- **文件系统**: 权限检查、磁盘空间监控
### 安全机制
- **工作空间隔离**: /terminal AI限制在~/docker/*目录 (除gitea)
- **命令限制**: 禁用危险命令 (如不带详细路径的rm -rf)
- **权限分离**:
- MVP开发AI: sudo权限 (仅限开发阶段)
- /terminal AI: 无sudo权限 (生产环境)
- **数据验证**: 输入验证、路径检查
- **审计日志**: 关键操作记录
### 监控和恢复
- **健康检查**: 定期检查各组件状态
- **自动重启**: 异常时自动恢复
- **数据一致性**: 数据完整性检查
- **回滚机制**: Gitea版本控制 + 数据库备份
### 性能优化
- **连接池**: 数据库连接管理
- **缓存策略**: 频繁查询数据缓存
- **异步处理**: 非阻塞操作
- **资源限制**: 内存和CPU使用限制
## 部署要求
### 系统环境
- **操作系统**: Debian
- **容器**: Docker + Docker Compose (位于 `~/docker/docker-compose.yml`)
- **版本控制**: Gitea (不受AI控制)
- **文件访问**: Docker共享卷挂载跨容器文件访问
- **访问控制**: 单一时间点只有一个Docker容器访问数据库文件 (访问时间<2s)
### 依赖服务
- **Discord API**: Bot权限和Token存储在 `~/docker/token.txt` 文件 (只读)
- **Claude Code CLI**: 预装的Claude Code CLI用于/terminal命令 (集成方式待研究)
- **外部项目数据库**: 通过Docker共享卷挂载只读访问权限
- **配置文件**: 所有敏感信息从 `~/docker/token.txt` 读取,禁止硬编码
### 配置管理
- **环境变量**: 敏感信息通过环境变量管理
- **配置文件**: JSON格式支持热更新
- **秘钥管理**: 统一的token文件AI可访问
## 开发优先级
### MVP版本 (Phase 1): /terminal系统核心实现
1. **文件夹结构创建**: 按照架构图创建所有目录和空文件
2. **基础架构搭建**: Docker配置、基础Python框架
3. **多AI验证系统**: 用于开发/terminal的AI开发工具
- 3个AI bot + tu bot基础框架
- 3轮讨论机制开发时使用
4. **/terminal命令完整实现**:
- AI控制Docker服务器核心功能
- term_dic.db术语系统
- 对话记录和1小时窗口机制
- **记忆管理系统**: 重要性权重压缩策略
- **上下文压缩**: 防止token消耗过快
- **对话状态管理**: 加载上文、新对话判断
5. **项目间通信**: 与FinancialReport项目的数据交换机制
### Phase 2: AI验证机制
1. 3个AI bot模板和部署
2. 3轮讨论流程实现
3. AI间@和权限控制机制
4. 多AI验证测试
### Phase 3: 扩展功能
1. finance系列命令
2. database操作命令
3. 热更新系统完善
### Phase 4: 完善和优化
1. 高级记忆管理优化
2. 监控和日志系统
3. 备份和恢复机制
## 成功标准
### 功能完整性
- [ ] 3轮AI讨论机制正常工作
- [ ] 权限控制有效防止无限循环
- [ ] 所有slash command功能实现
- [ ] 数据库操作安全可靠
- [ ] 与FinancialReport项目通信正常
### 可靠性指标
- [ ] 99%以上的命令执行成功率
- [ ] 系统异常后30秒内自动恢复
- [ ] 数据备份100%成功率
- [ ] 零数据丢失事故
- [ ] /fin_report失效时其他功能正常运行
### 安全性要求
- [ ] AI无法执行危险系统操作
- [ ] 数据库访问权限正确控制
- [ ] 敏感信息加密存储
- [ ] 审计日志完整记录
- [ ] 跨容器访问只读保护有效
### 性能指标
- [ ] 命令响应时间 < 3秒
- [ ] 数据库查询时间 < 500ms
- [ ] 内存使用 < 1GB
- [ ] CPU使用率 < 50%
- [ ] 并发处理能力 > 10个命令/秒

View File

@@ -0,0 +1,45 @@
version: '3.8'
services:
discord_bot:
build: .
container_name: discord_bot
restart: unless-stopped
volumes:
# 访问全局token文件只读
- ../tokens.txt:/app/tokens.txt:ro
# 数据持久化
- ./databases:/app/databases
- ./backups:/app/backups
# 访问FinancialReport的数据库只读
- ../FinancialReport/data/processed:/external/financial_report:ro
environment:
- DB_PATH=/app/databases/discord_bot.db
- FR_DB_PATH=/external/financial_report/FinancialDataBaseProcessed.db
- TOKEN_PATH=/app/tokens.txt
- DISCORD_PREFIX=/
depends_on:
- financial_report
networks:
- discord_network
financial_report:
build: ../FinancialReport
container_name: financial_report
restart: unless-stopped
volumes:
- ../tokens.txt:/app/tokens.txt:ro
- ../FinancialReport/data:/app/data
- ../FinancialReport/output:/app/output
- ./databases:/external/discord_bot:ro
environment:
- DB_PATH=/app/data/processed/FinancialDataBaseProcessed.db
- BOT_DB_PATH=/external/discord_bot/discord_bot.db
- TOKEN_PATH=/app/tokens.txt
- REPORT_OUTPUT_PATH=/app/output/reports
networks:
- discord_network
networks:
discord_network:
driver: bridge

View File

@@ -0,0 +1,307 @@
# FinancialReport MVP 项目需求文档
## 项目概述
### 核心目标
构建一个独立的Docker容器项目用于生成综合性的FinancialReport.pdf报告。MVP版本专注于本月消费汇总功能。
### 技术栈
- **不做Python版本要求**: 使用系统现有Python版本
- **部署**: Docker容器化
- **输出**: PDF报告生成
- **数据库**: SQLite (.db文件)
- **运行模式**: 每日运行一次,然后挂起等待
- **数据获取**: 自动爬取银行/信用卡数据
- **通信**: 与Discord Bot项目通过数据库文件通信
## 项目架构
### 文件结构
```
~/docker/
├── token.txt # 全局token文件只读访问
├── discord_bot/
│ ├── databases/
│ │ └── discord_bot.db # Discord Bot主数据库FR只读访问
│ └── [Bot项目文件]
├── FinancialReport/
│ ├── src/
│ │ ├── main.py
│ │ ├── data_processor/
│ │ │ ├── __init__.py
│ │ │ ├── account_scraper.py
│ │ │ ├── account_parser.py
│ │ │ ├── transaction_processor.py
│ │ │ └── csp_template.py
│ │ ├── report_generator/
│ │ │ ├── __init__.py
│ │ │ ├── pdf_generator.py
│ │ │ └── consumption_summary.py
│ │ └── config/
│ │ ├── config.json
│ │ └── account_configs/
│ ├── data/
│ │ ├── raw_data/ # 原始银行/信用卡数据
│ │ ├── processed/ # 处理后的统一数据
│ │ │ └── FinancialDataBaseProcessed.db # 主数据库
│ │ └── credentials/ # 银行登录凭据(加密存储)
│ ├── output/
│ │ └── reports/ # 生成的PDF报告
│ ├── templates/
│ │ └── report_template.html
│ ├── requirements.txt
│ └── Dockerfile
├── docker-compose.yml
└── shared_volumes/
```
## 核心功能模块
### 1. 数据获取模块 (MVP核心)
#### 自动化数据爬取
- **功能**: 自动登录银行/信用卡网站获取交易数据
- **认证信息**: 从 `~/docker/token.txt` 读取银行登录凭据
- **配置方式**: 所有敏感信息从token.txt读取禁止硬编码
- **数据下载**: 自动下载最新交易记录
- **多账户支持**: 支持多个银行和信用卡账户
- **错误处理**: 爬虫失败、验证码、账户锁定等风险 (后续处理)
#### 账户数据解析器
- **功能**: 解析各个银行/信用卡账户数据
- **输入**: 各种格式的账户数据文件 (CSV, Excel, JSON等)
- **输出**: 统一的CSP结构数据
- **挑战**: 处理不统一的字段和格式
#### Chase Sapphire Preferred (CSP) 数据格式模板
- **CSP信用卡**: 家庭主要消费卡,其数据格式作为统一模板
- **模板来源**: 直接使用Chase Sapphire Preferred的导出格式
- **标准化策略**: 所有其他银行/信用卡数据都转换为CSP格式
- **字段映射**: 根据CSP的实际导出字段建立映射规则
- **格式特点**:
- Chase特有的交易分类系统
- 标准的日期和金额格式
- 商户名称和描述字段
- 其他银行缺失的字段填充null
#### 数据统一处理
- **CSP格式为标准**: 使用Chase Sapphire Preferred的数据格式作为统一标准
- **字段映射**: 将各账户的字段映射到CSP格式
- **数据清洗**: 处理缺失值、格式错误、重复数据
- **null填充**: 不存在的字段自动填充null
- **验证**: 数据完整性和格式验证
### 2. ALP交易识别和分类
#### ALP交易筛选
- **识别规则**: 商户名称包含"ALP"字样的所有交易
- **临时分类**: 初始标记为待确认状态
- **数据输出**: 生成ALP交易列表供确认
#### 确认流程和数据同步
- **每日启动流程**:
1. 检查昨日pending_alp_transactions vs Discord Bot的toy_balance表
2. 根据toy_balance表的is_toy_fund字段
- true: 移动到transactions表的Ginbo分类
- false: 移动到transactions表的Family分类
3. 清空pending_alp_transactions表
4. 爬取新数据识别新的ALP交易
5. 将新的ALP交易写入pending_alp_transactions表
6. 生成FinancialReport.pdf
7. 挂起等待下次运行
- **数据时序**: FR项目的数据总是比实际晚一天
- 用户今天看到的是昨天处理好的数据
- 今天的新数据需要明天才能在报告中看到
#### Discord Bot通信
- **数据库结构**:
```
-- FinancialDataBaseProcessed.db 主表
CREATE TABLE transactions (
id INTEGER PRIMARY KEY AUTOINCREMENT,
transaction_id TEXT UNIQUE,
account_name TEXT,
date DATE,
amount REAL,
merchant TEXT,
description TEXT,
category TEXT, -- Family/Ginbo/其他分类
is_alp BOOLEAN DEFAULT FALSE,
status TEXT DEFAULT 'processed',
created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
);
-- 待确认ALP交易表
CREATE TABLE pending_alp_transactions (
id INTEGER PRIMARY KEY AUTOINCREMENT,
transaction_id TEXT,
date DATE,
amount REAL,
merchant TEXT,
description TEXT,
status TEXT DEFAULT 'pending',
created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
);
-- 策略追踪表 (为后续功能预留)
CREATE TABLE strategy_tracking (
id INTEGER PRIMARY KEY AUTOINCREMENT,
strategy_name TEXT,
strategy_data JSON,
last_updated TIMESTAMP,
status TEXT DEFAULT 'active'
);
```
#### 跨项目数据访问
- **权限模型**:
- FinancialReport: 对自己的FinancialDataBaseProcessed.db读写
- FinancialReport: 对Discord Bot的discord_bot.db只读
- Discord Bot: 对FinancialReport的数据库只读
- **数据同步**: FR项目通过读取Bot的toy_balance表来决定数据分类
- **访问路径**:
- FR访问Bot: `/external/discord_bot/discord_bot.db`Docker挂载路径
- **SQLite只读访问**:
- 连接字符串添加`?mode=ro`参数
- 实现重试机制处理数据库锁
- 访问超时设置为2秒
### 3. 消费汇总报告生成
#### 数据聚合
- **按分类汇总**: 根据消费类别统计金额
- **按账户汇总**: 各个账户的消费情况
- **按时间汇总**: 日/周/月消费趋势
- **特殊分类**: Ginbo (toy_fund) vs Family消费对比
#### PDF报告生成
- **模板系统**: 使用HTML模板生成PDF
- **图表集成**: 消费分布饼图、趋势线图
- **数据表格**: 详细的消费明细表
- **统计摘要**: 关键指标和对比数据
## Docker配置
### 容器挂载
```
volumes:
# 访问Discord Bot的数据库只读
- ../discord_bot/databases:/external/discord_bot:ro
# 访问全局token文件只读
- ../token.txt:/app/token.txt:ro
# 数据持久化
- ./data:/app/data
- ./output:/app/output
```
### 环境变量
- DB_PATH: 数据库文件路径
- BOT_DB_PATH: Discord Bot数据库路径只读
- TOKEN_PATH: token文件路径
- REPORT_OUTPUT_PATH: PDF输出路径
## 开发优先级
### MVP版本 (Phase 1): 本月消费汇总
1. **基础架构搭建**: Docker环境、Python框架
2. **CSP格式分析**: 分析Chase Sapphire Preferred的导出格式作为标准模板
3. **自动化数据获取**: 银行/信用卡数据爬取系统
4. **数据解析器**: 支持主要银行/信用卡格式统一转换为CSP格式
5. **ALP交易识别**: 自动筛选和标记ALP交易
6. **数据库通信**: 实现与Discord Bot的只读数据访问
7. **基础PDF生成**: 简单的消费汇总报告
8. **运行调度**: 每日运行机制和挂起等待
### Phase 2: 市场情绪模块
1. 市场数据接口集成
2. 情绪分析算法
3. 市场指标计算
### Phase 3: 今日涨跌模块
1. 实时价格数据获取
2. 投资组合涨跌计算
3. 可视化图表生成
### Phase 4: 策略追踪模块
1. 与Discord Bot策略回测集成
2. 策略表现追踪
3. 验证数据分析
## 数据源和接口
### 银行/信用卡数据源
- **格式支持**: CSV, Excel, JSON, XML
- **常见银行**: 需要适配主要银行的导出格式
- **自动化**: 预留API接口用于后续自动化数据获取
### 配置管理
- **账户配置**: 每个账户的字段映射配置
- **分类规则**: 消费类别自动分类规则
- **报告配置**: PDF报告的样式和内容配置
- **Token管理**: 从~/docker/token.txt统一读取
## 鲁棒性设计
### 错误处理
- **数据格式错误**: 优雅处理未知格式,记录错误日志
- **数据库访问冲突**: SQLite只读模式重试机制
- **PDF生成失败**: 备用简化版本报告
- **外部依赖故障**: 离线模式和缓存机制
### 数据完整性
- **事务处理**: 确保数据一致性
- **备份机制**: 定期备份处理后的数据
- **版本控制**: 数据变更历史记录
- **校验机制**: 数据完整性检查
### 性能优化
- **增量处理**: 只处理新增/变更数据
- **并行处理**: 多账户数据并行解析
- **缓存策略**: 重复计算结果缓存
- **内存管理**: 大数据集分批处理
## 部署要求
### Docker配置
- **基础镜像**: Python-slim
- **依赖管理**: requirements.txt
- **环境变量**: 配置文件路径、数据库位置
- **卷挂载**: 数据目录、共享目录、输出目录
### 定时任务
- **数据处理**: 每日自动处理新数据
- **报告生成**: 每日生成FinancialReport.pdf
- **数据清理**: 定期清理临时数据和旧报告
### 监控和日志
- **处理日志**: 详细的数据处理过程记录
- **错误日志**: 异常情况和错误信息
- **性能监控**: 处理时间和资源使用
- **通信日志**: 与Discord Bot的交互记录
## 成功标准
### 功能完整性
- [ ] 成功解析至少3种不同银行的数据格式
- [ ] ALP交易100%正确识别和分类
- [ ] 与Discord Bot通信机制稳定工作
- [ ] PDF报告生成成功率99%以上
### 数据准确性
- [ ] 消费金额计算误差 < 0.01%
- [ ] 数据分类准确率 > 95%
- [ ] 无重复或遗漏交易记录
- [ ] 与原始数据完全可追溯
### 性能要求
- [ ] 单月数据处理时间 < 30秒
- [ ] PDF报告生成时间 < 10秒
- [ ] 支持同时处理10个账户数据
- [ ] 内存使用 < 512MB
### 可靠性指标
- [ ] 7x24小时稳定运行
- [ ] 数据库访问冲突自动恢复
- [ ] 异常情况下数据零丢失
- [ ] 通信故障后自动重试成功

59
discord_bot/init_db.py Executable file
View File

@@ -0,0 +1,59 @@
#!/usr/bin/env python3
"""
数据库初始化脚本
运行此脚本来创建必要的数据库表和初始数据
"""
import os
import sys
from pathlib import Path
# 添加项目路径
sys.path.append(str(Path(__file__).parent))
from utils.database import DatabaseManager
import logging
# 配置日志
logging.basicConfig(
level=logging.INFO,
format='%(asctime)s - %(name)s - %(levelname)s - %(message)s'
)
logger = logging.getLogger(__name__)
def main():
"""初始化数据库"""
try:
# 确保databases目录存在
db_dir = Path(__file__).parent / "databases"
db_dir.mkdir(exist_ok=True)
# 初始化数据库
db_path = str(db_dir / "discord_bot.db")
logger.info(f"初始化数据库: {db_path}")
db_manager = DatabaseManager(db_path)
# 验证数据库是否正确创建
tables = db_manager.execute_query("""
SELECT name FROM sqlite_master
WHERE type='table' AND name NOT LIKE 'sqlite_%'
""")
logger.info("已创建的表:")
for table in tables:
logger.info(f" - {table['name']}")
# 检查术语字典是否有数据
term_count = db_manager.execute_query("SELECT COUNT(*) as count FROM term_dic")[0]['count']
logger.info(f"术语字典包含 {term_count} 条记录")
logger.info("数据库初始化完成!")
except Exception as e:
logger.error(f"数据库初始化失败: {e}")
sys.exit(1)
if __name__ == "__main__":
main()

49
discord_bot/main.py Normal file
View File

@@ -0,0 +1,49 @@
import asyncio
import logging
import os
import sys
from pathlib import Path
# 添加项目路径到sys.path
sys.path.append(str(Path(__file__).parent))
from tu_bot.bot import TuBot
# 配置日志
logging.basicConfig(
level=logging.INFO,
format='%(asctime)s - %(name)s - %(levelname)s - %(message)s',
handlers=[
logging.FileHandler('bot.log'),
logging.StreamHandler()
]
)
logger = logging.getLogger(__name__)
async def main():
"""主函数 - 启动Tu Bot"""
try:
# 读取token文件
token_path = os.getenv('TOKEN_PATH', '/home/will/docker/tokens.txt')
if not os.path.exists(token_path):
logger.error(f"Token文件不存在: {token_path}")
return
# 初始化并启动Tu Bot
bot = TuBot(token_path)
await bot.start()
except Exception as e:
logger.error(f"启动失败: {e}")
raise
if __name__ == "__main__":
try:
asyncio.run(main())
except KeyboardInterrupt:
logger.info("Bot已手动停止")
except Exception as e:
logger.error(f"运行错误: {e}")
sys.exit(1)

View File

@@ -0,0 +1,7 @@
discord.py>=2.3.2
aiohttp>=3.8.0
asyncio
sqlite3
python-dotenv>=1.0.0
aiofiles>=23.0.0
anthropic>=0.25.0

View File

@@ -0,0 +1,279 @@
import asyncio
import logging
import subprocess
import os
import json
import sys
from datetime import datetime, timedelta
from typing import Dict, List, Optional
import re
from pathlib import Path
# 添加项目路径
sys.path.append(str(Path(__file__).parent.parent.parent))
from utils.claude_code_client import ClaudeCodeClient
from utils.token_reader import TokenReader
logger = logging.getLogger(__name__)
class TerminalHandler:
"""Terminal命令处理器 - 实现AI控制的终端系统"""
def __init__(self):
self.work_dir = "/home/will/docker"
self.restricted_paths = ["/home/will/docker/gitea"]
self.conversation_memory: Dict[str, Dict] = {}
self.conversation_timeout = timedelta(hours=1)
# 初始化Claude Code CLI客户端
self.claude_code_client = ClaudeCodeClient(self.work_dir)
# 危险命令黑名单
self.dangerous_commands = [
r"rm\s+-rf\s+/",
r"rm\s+-rf\s+\*",
r"rm\s+-rf\s+~",
r"chmod\s+-R\s+777",
r"chown\s+-R",
r"mkfs\.",
r"dd\s+if=",
r">\s*/dev/",
]
async def handle_command(self, command: str, user_id: str) -> str:
"""处理terminal命令"""
try:
# 检查会话状态
conversation = self._get_or_create_conversation(user_id)
# 安全检查
if not self._is_command_safe(command):
return f"❌ **安全限制**: 命令被拒绝\n```\n{command}\n```\n该命令可能对系统造成危险。"
# 记录命令到会话历史
conversation['history'].append({
'type': 'user_command',
'content': command,
'timestamp': datetime.now().isoformat()
})
# 处理命令
if self._is_direct_shell_command(command):
# 直接shell命令
result = await self._execute_shell_command(command)
else:
# AI解释和执行
result = await self._ai_interpret_and_execute(command, conversation)
# 记录结果
conversation['history'].append({
'type': 'system_response',
'content': result,
'timestamp': datetime.now().isoformat()
})
# 压缩历史记录以控制token消耗
self._compress_conversation_history(conversation)
return result
except Exception as e:
logger.error(f"处理terminal命令错误: {e}")
return f"❌ **执行错误**: {str(e)}"
def _get_or_create_conversation(self, user_id: str) -> Dict:
"""获取或创建用户会话"""
now = datetime.now()
if user_id in self.conversation_memory:
conversation = self.conversation_memory[user_id]
last_activity = datetime.fromisoformat(conversation['last_activity'])
# 检查是否超时
if now - last_activity > self.conversation_timeout:
logger.info(f"用户 {user_id} 会话超时,创建新会话")
conversation = self._create_new_conversation()
else:
conversation['last_activity'] = now.isoformat()
else:
conversation = self._create_new_conversation()
self.conversation_memory[user_id] = conversation
return conversation
def _create_new_conversation(self) -> Dict:
"""创建新会话"""
return {
'last_activity': datetime.now().isoformat(),
'history': [],
'context': {
'current_dir': self.work_dir,
'last_commands': [],
'important_info': []
}
}
def _is_command_safe(self, command: str) -> bool:
"""检查命令是否安全"""
command_lower = command.lower()
# 检查危险命令模式
for pattern in self.dangerous_commands:
if re.search(pattern, command_lower):
logger.warning(f"拒绝危险命令: {command}")
return False
# 检查路径限制
if any(restricted in command for restricted in self.restricted_paths):
logger.warning(f"拒绝访问受限路径: {command}")
return False
return True
def _is_direct_shell_command(self, command: str) -> bool:
"""判断是否为直接shell命令"""
# 简单的shell命令模式
shell_patterns = [
r"^ls(\s|$)",
r"^pwd(\s|$)",
r"^cd\s+",
r"^cat\s+",
r"^grep\s+",
r"^find\s+",
r"^docker\s+",
r"^docker-compose\s+",
r"^tail\s+",
r"^head\s+",
r"^ps(\s|$)",
r"^df(\s|$)",
r"^du\s+",
r"^top(\s|$)",
r"^htop(\s|$)",
]
return any(re.match(pattern, command.lower()) for pattern in shell_patterns)
async def _execute_shell_command(self, command: str) -> str:
"""执行shell命令"""
try:
# 设置工作目录和环境
env = os.environ.copy()
# 执行命令
process = await asyncio.create_subprocess_shell(
command,
cwd=self.work_dir,
stdout=asyncio.subprocess.PIPE,
stderr=asyncio.subprocess.PIPE,
env=env
)
stdout, stderr = await asyncio.wait_for(process.communicate(), timeout=30.0)
# 处理输出
output = ""
if stdout:
output += stdout.decode('utf-8', errors='replace')
if stderr:
output += "\n" + stderr.decode('utf-8', errors='replace')
if process.returncode == 0:
result = f"✅ **命令执行成功**\n```bash\n$ {command}\n{output}\n```"
else:
result = f"⚠️ **命令执行完成 (退出码: {process.returncode})**\n```bash\n$ {command}\n{output}\n```"
return result
except asyncio.TimeoutError:
return f"⏰ **命令超时**: 执行时间超过30秒\n```bash\n$ {command}\n```"
except Exception as e:
return f"❌ **执行错误**: {str(e)}\n```bash\n$ {command}\n```"
async def _ai_interpret_and_execute(self, command: str, conversation: Dict) -> str:
"""使用Claude Code CLI解释自然语言命令并执行"""
# 使用Claude Code CLI处理命令
return await self.claude_code_client.run_claude_command(command)
async def _basic_command_mapping(self, command: str) -> str:
"""基础命令映射无AI时的回退方案"""
command_lower = command.lower()
# 基础命令映射
if "查看" in command and ("文件" in command or "目录" in command):
return await self._execute_shell_command("ls -la")
elif "当前目录" in command or "当前位置" in command:
return await self._execute_shell_command("pwd")
elif "查看容器" in command or "docker容器" in command or "容器状态" in command:
return await self._execute_shell_command("docker ps")
elif "查看日志" in command and "容器" in command:
return await self._execute_shell_command("docker ps --format 'table {{.Names}}\\t{{.Status}}'")
elif "磁盘使用" in command or "存储空间" in command:
return await self._execute_shell_command("df -h")
else:
# 未识别的命令,提供帮助
return f"""🤔 **未能理解命令**: {command}
**建议使用以下格式:**
• 直接shell命令: `ls`, `docker ps`, `pwd`
• 自然语言: "查看当前目录文件", "查看Docker容器状态"
**常用命令示例:**
```bash
ls -la # 查看文件列表
docker ps # 查看容器状态
docker-compose ps # 查看compose服务
tail -f /var/log/syslog # 查看系统日志
df -h # 查看磁盘使用
```
💡 **提示**: 当前未启用AI模式仅支持基础命令映射"""
def _compress_conversation_history(self, conversation: Dict):
"""压缩会话历史以控制token消耗"""
history = conversation['history']
# 如果历史记录超过20条进行压缩
if len(history) > 20:
# 保留最近10条完整记录
recent_history = history[-10:]
# 压缩较早的记录,只保留重要信息
compressed_history = []
for item in history[:-10]:
if item['type'] == 'user_command':
# 保留所有用户命令
compressed_history.append(item)
elif item['type'] == 'system_response':
# 只保留成功的重要操作结果
if '' in item['content'] and any(keyword in item['content'].lower()
for keyword in ['docker', 'container', 'service', 'config']):
# 压缩输出,只保留关键信息
compressed_item = item.copy()
compressed_item['content'] = self._extract_key_info(item['content'])
compressed_history.append(compressed_item)
# 更新历史记录
conversation['history'] = compressed_history + recent_history
logger.info(f"压缩会话历史: {len(history)} -> {len(conversation['history'])} 条记录")
def _extract_key_info(self, content: str) -> str:
"""提取内容中的关键信息"""
# 简单的关键信息提取
lines = content.split('\n')
key_lines = []
for line in lines:
# 保留状态信息、错误信息、重要路径等
if any(keyword in line.lower() for keyword in
['status', 'running', 'stopped', 'error', 'success', 'docker', 'container']):
key_lines.append(line)
if key_lines:
return '\n'.join(key_lines[:3]) # 最多保留3行
else:
return content[:100] + "..." if len(content) > 100 else content

55
discord_bot/start.sh Executable file
View File

@@ -0,0 +1,55 @@
#!/bin/bash
# Discord Bot 启动脚本
echo "🚀 Discord Bot 启动脚本"
echo "========================"
# 检查Python版本
python3 --version
# 检查必要文件
echo "📋 检查必要文件..."
if [ ! -f "../tokens.txt" ]; then
echo "❌ 错误: 找不到 ~/docker/tokens.txt 文件"
echo " 请确保token文件存在并包含Discord Bot tokens"
exit 1
fi
if [ ! -f "requirements.txt" ]; then
echo "❌ 错误: 找不到 requirements.txt 文件"
exit 1
fi
# 创建虚拟环境(可选)
if [ "$1" = "--venv" ]; then
echo "🐍 创建Python虚拟环境..."
python3 -m venv venv
source venv/bin/activate
echo "✅ 虚拟环境已激活"
fi
# 安装依赖
echo "📦 安装Python依赖..."
pip install -r requirements.txt
# 初始化数据库
echo "🗄️ 初始化数据库..."
python3 init_db.py
# 运行测试(可选)
if [ "$1" = "--test" ] || [ "$2" = "--test" ]; then
echo "🧪 运行功能测试..."
python3 test_bot.py
fi
# 询问是否启动bot
echo ""
read -p "是否启动Discord Bot? (y/n): " -n 1 -r
echo
if [[ $REPLY =~ ^[Yy]$ ]]; then
echo "🤖 启动Discord Bot..."
python3 main.py
else
echo "👋 启动脚本完成,手动运行: python3 main.py"
fi

106
discord_bot/test_bot.py Executable file
View File

@@ -0,0 +1,106 @@
#!/usr/bin/env python3
"""
Discord Bot 测试脚本
用于本地测试bot功能不连接Discord
"""
import asyncio
import sys
from pathlib import Path
import logging
# 添加项目路径
sys.path.append(str(Path(__file__).parent))
from slash_commands.terminal.terminal_handler import TerminalHandler
from utils.database import DatabaseManager
from utils.token_reader import TokenReader
# 配置日志
logging.basicConfig(
level=logging.INFO,
format='%(asctime)s - %(name)s - %(levelname)s - %(message)s'
)
logger = logging.getLogger(__name__)
async def test_terminal_handler():
"""测试Terminal处理器"""
print("\n🧪 测试Terminal处理器...")
handler = TerminalHandler()
test_user_id = "test_user_123"
# 测试命令列表
test_commands = [
"ls",
"pwd",
"docker ps",
"查看当前目录文件",
"查看Docker容器状态",
"rm -rf /", # 应该被拒绝
]
for command in test_commands:
print(f"\n🔧 测试命令: {command}")
try:
result = await handler.handle_command(command, test_user_id)
print(f"✅ 结果: {result[:200]}...")
except Exception as e:
print(f"❌ 错误: {e}")
def test_database():
"""测试数据库功能"""
print("\n🧪 测试数据库功能...")
try:
# 使用测试数据库
db_manager = DatabaseManager("./test.db")
# 测试术语查询
term_def = db_manager.get_term_definition("docker ps")
print(f"✅ 术语定义: {term_def}")
# 测试添加术语
success = db_manager.add_term("test_term", "测试术语定义", "test", 1)
print(f"✅ 添加术语: {'成功' if success else '失败'}")
# 测试命令日志
db_manager.log_command("test_user", "test command", "terminal", "success", "测试结果", 1.5)
print("✅ 命令日志记录成功")
except Exception as e:
print(f"❌ 数据库测试失败: {e}")
def test_token_reader():
"""测试Token读取器"""
print("\n🧪 测试Token读取器...")
try:
token_reader = TokenReader("/home/will/docker/tokens.txt")
# 测试读取token
tu_token = token_reader.get_token("tu_discord_token")
print(f"✅ Tu Token: {tu_token[:20]}..." if tu_token else "❌ Tu Token读取失败")
claude_api = token_reader.get_token("claude_API")
print(f"✅ Claude API: {claude_api[:20]}..." if claude_api else "❌ Claude API读取失败")
except Exception as e:
print(f"❌ Token读取测试失败: {e}")
async def main():
"""主测试函数"""
print("🚀 Discord Bot 功能测试")
print("=" * 50)
# 测试各个组件
test_token_reader()
test_database()
await test_terminal_handler()
print("\n" + "=" * 50)
print("✅ 测试完成")
if __name__ == "__main__":
asyncio.run(main())

78
discord_bot/test_claude.py Executable file
View File

@@ -0,0 +1,78 @@
#!/usr/bin/env python3
"""
Claude API 测试脚本
"""
import asyncio
import sys
from pathlib import Path
# 添加项目路径
sys.path.append(str(Path(__file__).parent))
from utils.claude_client import ClaudeClient
from utils.token_reader import TokenReader
import logging
# 配置日志
logging.basicConfig(level=logging.INFO)
async def test_claude_api():
"""测试Claude API功能"""
print("🧪 测试Claude API...")
try:
# 读取API密钥
token_reader = TokenReader("/home/will/docker/tokens.txt")
claude_api_key = token_reader.get_token("claude_API")
if not claude_api_key:
print("❌ 未找到Claude API密钥")
return
print(f"✅ 找到Claude API密钥: {claude_api_key[:20]}...")
# 初始化Claude客户端
claude_client = ClaudeClient(claude_api_key)
# 测试基础聊天
print("\n🔧 测试基础聊天...")
messages = [{"role": "user", "content": "你好请简单介绍一下Docker"}]
response = await claude_client.chat_completion(messages, max_tokens=200)
if response:
print(f"✅ 聊天测试成功:\n{response[:200]}...")
else:
print("❌ 聊天测试失败")
# 测试命令解释
print("\n🔧 测试命令解释...")
test_commands = [
"查看Docker容器状态",
"列出当前目录文件",
"删除所有文件", # 应该被拒绝
"查看磁盘使用情况"
]
for cmd in test_commands:
print(f"\n 测试命令: {cmd}")
result = await claude_client.interpret_command(cmd)
if result:
print(f" 解释结果: {result}")
else:
print(f" ❌ 解释失败")
# 测试错误解释
print("\n🔧 测试错误解释...")
error_output = "docker: command not found"
explanation = await claude_client.explain_error("docker ps", error_output)
if explanation:
print(f"✅ 错误解释: {explanation}")
else:
print("❌ 错误解释失败")
except Exception as e:
print(f"❌ 测试失败: {e}")
if __name__ == "__main__":
asyncio.run(test_claude_api())

51
discord_bot/test_claude_code.py Executable file
View File

@@ -0,0 +1,51 @@
#!/usr/bin/env python3
"""
Claude Code CLI 测试脚本
"""
import asyncio
import sys
from pathlib import Path
# 添加项目路径
sys.path.append(str(Path(__file__).parent))
from utils.claude_code_client import ClaudeCodeClient
import logging
# 配置日志
logging.basicConfig(level=logging.INFO)
async def test_claude_code_cli():
"""测试Claude Code CLI集成"""
print("🧪 测试Claude Code CLI集成...")
try:
# 初始化Claude Code CLI客户端
claude_code_client = ClaudeCodeClient("/home/will/docker")
# 测试基础对话
print("\n🔧 测试Claude Code CLI对话...")
response = await claude_code_client.ask_claude("你好,请简单介绍一下你的功能")
print(f"对话测试结果:\n{response}\n")
# 测试命令解释
print("🔧 测试命令解释功能...")
test_commands = [
"ls", # 直接shell命令
"docker ps", # Docker命令
"查看当前目录文件", # 自然语言
"查看Docker容器状态", # 自然语言
"启动nginx容器", # 复杂自然语言
]
for cmd in test_commands:
print(f"\n 测试命令: {cmd}")
result = await claude_code_client.run_claude_command(cmd)
print(f" 执行结果: {result[:200]}...")
except Exception as e:
print(f"❌ 测试失败: {e}")
if __name__ == "__main__":
asyncio.run(test_claude_code_cli())

Binary file not shown.

310
discord_bot/tu_bot/bot.py Normal file
View File

@@ -0,0 +1,310 @@
# -*- coding: utf-8
Tu Bot - Discord服务器管理机器人
================================
这个bot实现了通过Discord聊天界面来控制服务器的功能
主要特点
1. 使用Claude Code CLI作为AI后端来解释和执行命令2. 支持自然语言和直接shell命令
3 具有安全限制和会话管理
4ord slash commands提供用户界面
架构说明
- Discord Bot (前端界面) -> TerminalHandler (命令处理) -> Claude Code CLI (AI解释) -> Shell执行
- 或者直接执行shell命令绕过AI解释
工作原理
1. 用户通过Discord发送命令"/terminal ls"/terminal 查看当前目录"
2ot接收命令并传递给TerminalHandler
3 TerminalHandler检查命令安全性然后
- 如果是直接shell命令直接执行
- 如果是自然语言使用Claude Code CLI解释后执行4. 执行结果返回给Discord用户
"
import discord
from discord.ext import commands
import asyncio
import logging
import os
import sys
from pathlib import Path
# 添加项目根目录到Python路径以便导入其他模块
sys.path.append(str(Path(__file__).parent.parent))
# 导入自定义模块
from slash_commands.terminal.terminal_handler import TerminalHandler
from utils.token_reader import TokenReader
# 设置日志记录器
logger = logging.getLogger(__name__)
class TuBot:
Tu Bot - 处理所有slash commands的主控制器
这个类是整个Discord Bot的核心负责
1. 初始化Discord Bot连接
2. 设置事件处理器连接错误处理等
3. 注册slash commands
4协调各个处理器模块
def __init__(self, token_path: str):
初始化Tu Bot
Args:
token_path (str): Discord Bot token文件路径
self.token_path = token_path
# 初始化token读取器用于安全地读取Discord Bot token
self.token_reader = TokenReader(token_path)
# 设置Discord Bot的权限intents
# 这些权限决定了bot能访问哪些Discord功能
intents = discord.Intents.default()
intents.message_content =true# 允许读取消息内容
intents.guild_messages = true许访问服务器消息
# 创建Discord Bot实例
# command_prefix='/' 表示使用slash commands实际上slash commands不需要prefix
# help_command=None 禁用默认的帮助命令,使用自定义的
self.bot = commands.Bot(
command_prefix='/, intents=intents,
help_command=None
)
# 初始化终端命令处理器
# 这个处理器负责将Discord命令转换为实际的服务器操作
self.terminal_handler = TerminalHandler()
# 设置事件处理器和命令处理器
self.setup_events()
self.setup_commands()
def setup_events(self):
设置Discord Bot的事件处理器
事件处理器用于响应Discord的各种事件
- bot连接成功
- 命令执行错误
- 消息接收等
@self.bot.event
async def on_ready():
Bot连接成功时触发的事件
这个事件在bot成功连接到Discord服务器时触发
用于
1录连接状态
2. 同步slash commands到Discord
3他必要的设置
logger.info(f{self.bot.user} 已连接到Discord!')
# 修复linter错误添加空值检查
if self.bot.user:
logger.info(fBotID: {self.bot.user.id}')
# 同步slash commands到Discord
# 这会将我们定义的slash commands注册到Discord使其在客户端可见
try:
synced = await self.bot.tree.sync()
logger.info(f同步了 {len(synced)} 个slash commands")
except Exception as e:
logger.error(f"同步slash commands失败: {e}")
@self.bot.event
async def on_command_error(ctx, error):
命令执行错误时的事件处理器
当任何命令执行出错时这个处理器会被调用
用于
1录错误日志
2的错误消息
3. 处理不同类型的错误如命令不存在权限不足等
logger.error(f"命令错误: {error}")
if isinstance(error, commands.CommandNotFound):
await ctx.send(未知命令请使用 /help 查看可用命令")
else:
await ctx.send(f"执行命令时出错: {str(error)}")
def setup_commands(self):
设置slash commands
Slash commands是Discord的现代命令系统提供
1. 自动补全
2. 参数提示
3 更好的用户体验
这里定义了三个主要命令
- /terminal: AI控制的终端系统
- /claude: 直接与Claude AI对话
- /help: 显示帮助信息
@self.bot.tree.command(name="terminal, description="AI控制终端系统")
async def terminal_command(interaction: discord.Interaction, command: str):
处理/terminal命令
这是核心功能命令允许用户通过Discord控制服务器
工作流程
1 接收用户输入的命令可以是自然语言或shell命令
2 调用TerminalHandler进行处理
3. TerminalHandler会
- 检查命令安全性
- 判断是直接执行还是需要AI解释
- 执行命令并返回结果
4. 将结果分段发送回Discord处理2000字符限制
Args:
interaction (discord.Interaction): Discord交互对象
command (str): 用户输入的命令
try:
# 延迟响应,显示"正在思考"状态
# 这很重要,因为命令执行可能需要时间
await interaction.response.defer(thinking=True)
# 调用terminal处理器处理命令
# 修复linter错误将user.id转换为字符串
response = await self.terminal_handler.handle_command(command, str(interaction.user.id))
# 分段发送响应Discord有2000字符限制
await self.send_long_response(interaction, response)
except Exception as e:
logger.error(f"Terminal命令错误: {e}) await interaction.followup.send(f执行命令时出错: {str(e)}")
@self.bot.tree.command(name="claude, description="直接与Claude Code CLI对话")
async def claude_command(interaction: discord.Interaction, question: str):
处理/claude命令
这个命令允许用户直接与Claude AI进行对话
不涉及服务器操作纯粹是AI问答功能
Args:
interaction (discord.Interaction): Discord交互对象
question (str): 用户的问题
try:
await interaction.response.defer(thinking=True)
# 动态导入Claude Code CLI客户端
# 这样可以避免循环导入问题
from utils.claude_code_client import ClaudeCodeClient
claude_code_client = ClaudeCodeClient()
# 使用Claude Code CLI进行对话
response = await claude_code_client.ask_claude(question)
await self.send_long_response(interaction, response)
except Exception as e:
logger.error(f"Claude命令错误: {e}) await interaction.followup.send(f"执行Claude命令时出错: {str(e)}")
@self.bot.tree.command(name="help, description="显示帮助信息")
async def help_command(interaction: discord.Interaction):
处理/help命令
显示bot的使用说明和命令列表
帮助用户了解如何使用这个bot
help_text = **Discord Bot 命令帮助**
🤖 **核心命令:**
`/terminal <command>` - AI控制终端系统
`/claude <question>` - 直接与Claude AI对话
`/help` - 显示此帮助信息
💡 **使用示例:**
`/terminal ls` - 列出当前目录文件
`/terminal docker ps` - 查看运行的容器
`/terminal 启动nginx容器` - 自然语言命令
`/claude 如何优化Docker容器性能` - Claude Code CLI问答
**注意事项:**
- Terminal命令仅限于~/docker/*目录使用Claude Code CLI解释
- 不支持危险操作如rm -rf
- Claude命令直接使用Claude Code CLI进行对话
- 需要先安装Claude Code CLI: https://docs.anthropic.com/en/docs/claude-code
await interaction.response.send_message(help_text)
async def send_long_response(self, interaction: discord.Interaction, response: str):
分段发送长响应
Discord有200字符的消息长度限制这个方法将长响应分段发送
算法说明
1. 如果响应长度不超过2000字符直接发送
2超过按行分割响应
3 将行组合成不超过2000符的片段
4依次发送每个片段
Args:
interaction (discord.Interaction): Discord交互对象
response (str): 要发送的响应内容
max_length =200rd消息最大长度
# 如果响应长度在限制内,直接发送
if len(response) <= max_length:
await interaction.followup.send(response)
return
# 分段处理长响应
parts =] # 存储所有片段
current_part =前正在构建的片段
# 按行分割响应,逐行处理
for line in response.split(undefinedn):
# 检查添加这一行是否会超过长度限制
if len(current_part) + len(line) + 1 > max_length:
# 如果会超过,保存当前片段并开始新片段
if current_part:
parts.append(current_part)
current_part = line
else:
# 如果不会超过,添加到当前片段
if current_part:
current_part += '\n' + line
else:
current_part = line
# 添加最后一个片段
if current_part:
parts.append(current_part)
# 发送所有片段
if parts:
# 发送第一部分
await interaction.followup.send(parts0)
# 发送剩余部分
for part in parts[1:]:
await interaction.followup.send(part)
async def start(self):
启动bot
这个方法负责
1取Discord Bot token
2. 启动Discord Bot连接
3启动过程中的错误
启动流程
1 从token文件读取Discord Bot token
2证token有效性
3iscord服务器
4 开始监听事件和命令
try:
# 从token文件读取Discord Bot token
token = self.token_reader.get_token('tu_discord_token')
if not token:
raise ValueError("无法读取tu_discord_token")
logger.info("启动Tu Bot...")
# 启动Discord Bot连接
# 这是一个阻塞调用bot会一直运行直到被停止
await self.bot.start(token)
except Exception as e:
logger.error(f"启动Bot失败: {e}")
raise

Binary file not shown.

View File

@@ -0,0 +1,111 @@
import asyncio
import logging
import aiohttp
import json
from typing import Optional, Dict, List
logger = logging.getLogger(__name__)
class ClaudeClient:
"""Claude API客户端"""
def __init__(self, api_key: str, base_url: str = "https://api.anthropic.com"):
self.api_key = api_key
self.base_url = base_url
self.model = "claude-3-5-sonnet-20241022"
async def chat_completion(self, messages: List[Dict], system_prompt: str = None, max_tokens: int = 1000) -> Optional[str]:
"""发送聊天完成请求到Claude API"""
try:
headers = {
"Content-Type": "application/json",
"x-api-key": self.api_key,
"anthropic-version": "2023-06-01"
}
data = {
"model": self.model,
"max_tokens": max_tokens,
"messages": messages
}
if system_prompt:
data["system"] = system_prompt
async with aiohttp.ClientSession() as session:
async with session.post(
f"{self.base_url}/v1/messages",
headers=headers,
json=data,
timeout=aiohttp.ClientTimeout(total=30)
) as response:
if response.status == 200:
result = await response.json()
return result["content"][0]["text"]
else:
error_text = await response.text()
logger.error(f"Claude API错误 {response.status}: {error_text}")
return None
except asyncio.TimeoutError:
logger.error("Claude API请求超时")
return None
except Exception as e:
logger.error(f"Claude API请求失败: {e}")
return None
async def interpret_command(self, command: str, context: Dict = None) -> Optional[str]:
"""让Claude解释并转换自然语言命令为shell命令"""
system_prompt = """你是一个Docker服务器管理助手。用户会给你自然语言命令你需要将其转换为安全的shell命令。
工作目录: /home/will/docker
允许的操作: ls, pwd, cat, grep, find, docker, docker-compose, tail, head, ps, df, du
禁止的操作: rm -rf, chmod 777, 任何可能破坏系统的命令
如果是危险命令或超出范围的操作,回复"DENIED: 原因"
如果是正常命令只回复shell命令不要解释。
示例:
用户: "查看Docker容器状态"
你: "docker ps"
用户: "删除所有文件"
你: "DENIED: 危险操作,可能删除重要文件"
"""
messages = [
{"role": "user", "content": f"请将这个命令转换为shell命令: {command}"}
]
if context:
# 添加上下文信息
context_info = f"当前目录上下文: {context.get('current_dir', '/home/will/docker')}"
if context.get('last_commands'):
context_info += f"\n最近执行的命令: {', '.join(context['last_commands'][-3:])}"
messages[0]["content"] = f"{context_info}\n\n{messages[0]['content']}"
return await self.chat_completion(messages, system_prompt, max_tokens=150)
async def explain_error(self, command: str, error_output: str) -> Optional[str]:
"""让Claude解释错误并提供解决建议"""
system_prompt = """你是一个Docker服务器管理助手。用户执行命令时遇到错误请简洁地解释问题并提供解决建议。
要求:
1. 简洁明了不超过3行
2. 重点说明可能的原因
3. 提供具体的解决方案
4. 如果是权限问题,提醒用户当前限制
"""
messages = [
{
"role": "user",
"content": f"命令: {command}\n错误输出: {error_output}\n\n请解释这个错误并提供解决建议。"
}
]
return await self.chat_completion(messages, system_prompt, max_tokens=200)

View File

@@ -0,0 +1,211 @@
import asyncio
import logging
import subprocess
import os
import json
from typing import Optional, Dict, List
logger = logging.getLogger(__name__)
class ClaudeCodeClient:
"""Claude Code CLI客户端"""
def __init__(self, work_dir: str = "/home/will/docker"):
self.work_dir = work_dir
self.claude_cli = "claude" # 假设claude命令已安装
async def run_claude_command(self, command: str, timeout: int = 30) -> Optional[str]:
"""运行Claude Code CLI命令"""
try:
# 检查Claude CLI是否可用
check_process = await asyncio.create_subprocess_shell(
"which claude",
stdout=asyncio.subprocess.PIPE,
stderr=asyncio.subprocess.PIPE
)
await check_process.communicate()
if check_process.returncode != 0:
logger.warning("Claude CLI未找到回退到shell命令执行")
return await self._fallback_shell_execution(command)
# 构建Claude CLI命令
claude_command = f'claude "{command}"'
# 执行Claude CLI
process = await asyncio.create_subprocess_shell(
claude_command,
cwd=self.work_dir,
stdout=asyncio.subprocess.PIPE,
stderr=asyncio.subprocess.PIPE
)
stdout, stderr = await asyncio.wait_for(process.communicate(), timeout=timeout)
output = ""
if stdout:
output += stdout.decode('utf-8', errors='replace')
if stderr:
output += "\n" + stderr.decode('utf-8', errors='replace')
if process.returncode == 0:
return output.strip()
else:
logger.error(f"Claude CLI执行失败: {output}")
return await self._fallback_shell_execution(command)
except asyncio.TimeoutError:
logger.error(f"Claude CLI超时: {command}")
return f"⏰ **Claude CLI超时**: 执行时间超过{timeout}"
except Exception as e:
logger.error(f"Claude CLI执行错误: {e}")
return await self._fallback_shell_execution(command)
async def _fallback_shell_execution(self, command: str) -> str:
"""回退到shell命令执行"""
try:
# 检查是否为直接shell命令
if self._is_shell_command(command):
return await self._execute_shell_command(command)
else:
# 尝试解释自然语言命令
return await self._interpret_natural_language(command)
except Exception as e:
return f"❌ **执行错误**: {str(e)}"
def _is_shell_command(self, command: str) -> bool:
"""判断是否为shell命令"""
shell_commands = [
'ls', 'pwd', 'cd', 'cat', 'grep', 'find', 'docker', 'docker-compose',
'tail', 'head', 'ps', 'df', 'du', 'top', 'htop', 'systemctl', 'service'
]
cmd_parts = command.strip().split()
if cmd_parts:
return cmd_parts[0] in shell_commands
return False
async def _execute_shell_command(self, command: str) -> str:
"""直接执行shell命令"""
try:
process = await asyncio.create_subprocess_shell(
command,
cwd=self.work_dir,
stdout=asyncio.subprocess.PIPE,
stderr=asyncio.subprocess.PIPE
)
stdout, stderr = await asyncio.wait_for(process.communicate(), timeout=30)
output = ""
if stdout:
output += stdout.decode('utf-8', errors='replace')
if stderr:
output += "\n" + stderr.decode('utf-8', errors='replace')
if process.returncode == 0:
return f"✅ **命令执行成功**\n```bash\n$ {command}\n{output}\n```"
else:
return f"⚠️ **命令执行完成 (退出码: {process.returncode})**\n```bash\n$ {command}\n{output}\n```"
except asyncio.TimeoutError:
return f"⏰ **命令超时**: 执行时间超过30秒\n```bash\n$ {command}\n```"
except Exception as e:
return f"❌ **执行错误**: {str(e)}\n```bash\n$ {command}\n```"
async def _interpret_natural_language(self, command: str) -> str:
"""解释自然语言命令(简单映射)"""
command_lower = command.lower()
# 自然语言到shell命令的映射
mappings = {
"查看当前目录": "ls -la",
"查看文件": "ls -la",
"当前目录": "pwd",
"当前位置": "pwd",
"查看容器": "docker ps",
"docker容器": "docker ps",
"容器状态": "docker ps",
"查看进程": "ps aux",
"磁盘使用": "df -h",
"存储空间": "df -h",
"内存使用": "free -h",
"系统状态": "systemctl status"
}
for key, cmd in mappings.items():
if key in command_lower:
return await self._execute_shell_command(cmd)
# 尝试提取Docker相关操作
if "启动" in command and "容器" in command:
# 尝试提取容器名
words = command.split()
for word in words:
if word not in ["启动", "容器", ""]:
container_name = word
return await self._execute_shell_command(f"docker start {container_name}")
if "停止" in command and "容器" in command:
words = command.split()
for word in words:
if word not in ["停止", "容器", ""]:
container_name = word
return await self._execute_shell_command(f"docker stop {container_name}")
# 未识别的命令
return f"""🤔 **未能理解命令**: {command}
**建议使用以下格式:**
• 直接shell命令: `ls`, `docker ps`, `pwd`
• 自然语言: "查看当前目录", "查看Docker容器状态"
**常用命令示例:**
```bash
ls -la # 查看文件列表
docker ps # 查看容器状态
docker-compose ps # 查看compose服务
df -h # 查看磁盘使用
```
💡 **提示**: 当前使用简化的命令解释器建议直接使用shell命令获得最佳效果"""
async def ask_claude(self, question: str) -> str:
"""使用Claude Code CLI进行对话"""
try:
# 首先检查是否有Claude CLI
check_process = await asyncio.create_subprocess_shell(
"which claude",
stdout=asyncio.subprocess.PIPE,
stderr=asyncio.subprocess.PIPE
)
await check_process.communicate()
if check_process.returncode != 0:
return "❌ **Claude CLI未安装**: 请先安装Claude Code CLI\n\n可以访问 https://docs.anthropic.com/en/docs/claude-code 了解安装方法"
# 使用Claude CLI进行对话
claude_command = f'claude "{question}"'
process = await asyncio.create_subprocess_shell(
claude_command,
cwd=self.work_dir,
stdout=asyncio.subprocess.PIPE,
stderr=asyncio.subprocess.PIPE
)
stdout, stderr = await asyncio.wait_for(process.communicate(), timeout=60)
if process.returncode == 0 and stdout:
response = stdout.decode('utf-8', errors='replace').strip()
return f"🤖 **Claude Code回答:**\n\n{response}"
else:
error_msg = stderr.decode('utf-8', errors='replace') if stderr else "未知错误"
return f"❌ **Claude CLI错误**: {error_msg}"
except asyncio.TimeoutError:
return f"⏰ **Claude CLI超时**: 响应时间超过60秒"
except Exception as e:
logger.error(f"Claude CLI对话失败: {e}")
return f"❌ **对话错误**: {str(e)}"

View File

@@ -0,0 +1,216 @@
import sqlite3
import logging
import os
from typing import Optional, List, Dict, Any
from pathlib import Path
logger = logging.getLogger(__name__)
class DatabaseManager:
"""数据库管理器"""
def __init__(self, db_path: str = "/app/databases/discord_bot.db"):
self.db_path = db_path
self.ensure_database_exists()
def ensure_database_exists(self):
"""确保数据库和表存在"""
# 确保目录存在
os.makedirs(os.path.dirname(self.db_path), exist_ok=True)
# 创建数据库和表
self.init_database()
def init_database(self):
"""初始化数据库表结构"""
try:
with sqlite3.connect(self.db_path) as conn:
cursor = conn.cursor()
# 创建toy_balance表 - ALP交易确认记录
cursor.execute('''
CREATE TABLE IF NOT EXISTS toy_balance (
id INTEGER PRIMARY KEY AUTOINCREMENT,
transaction_id TEXT UNIQUE,
date DATE,
amount REAL,
merchant TEXT,
description TEXT,
is_toy_fund BOOLEAN, -- true=Ginbo, false=Family
confirmed_by TEXT, -- Discord用户ID
confirmed_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
)
''')
# 创建portfolio表 - 投资组合数据
cursor.execute('''
CREATE TABLE IF NOT EXISTS portfolio (
id INTEGER PRIMARY KEY AUTOINCREMENT,
symbol TEXT NOT NULL,
account_name TEXT,
shares REAL,
avg_cost REAL,
last_updated TIMESTAMP DEFAULT CURRENT_TIMESTAMP
)
''')
# 创建term_dic表 - 术语字典
cursor.execute('''
CREATE TABLE IF NOT EXISTS term_dic (
id INTEGER PRIMARY KEY AUTOINCREMENT,
term TEXT UNIQUE NOT NULL,
definition TEXT NOT NULL,
category TEXT DEFAULT 'general',
priority INTEGER DEFAULT 1, -- 1=highest priority
created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
updated_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
)
''')
# 创建conversation_memory表 - 对话记录
cursor.execute('''
CREATE TABLE IF NOT EXISTS conversation_memory (
id INTEGER PRIMARY KEY AUTOINCREMENT,
user_id TEXT NOT NULL,
session_id TEXT,
command_type TEXT, -- terminal, terminal_fin, etc.
content TEXT,
importance INTEGER DEFAULT 1, -- 重要性权重 1-5
created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
)
''')
# 创建command_log表 - 命令执行日志
cursor.execute('''
CREATE TABLE IF NOT EXISTS command_log (
id INTEGER PRIMARY KEY AUTOINCREMENT,
user_id TEXT NOT NULL,
command TEXT NOT NULL,
command_type TEXT,
execution_status TEXT, -- success, error, denied
result_summary TEXT,
execution_time REAL, -- 执行时间(秒)
created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
)
''')
# 插入一些默认的术语定义
self._insert_default_terms(cursor)
conn.commit()
logger.info("数据库初始化完成")
except Exception as e:
logger.error(f"数据库初始化失败: {e}")
raise
def _insert_default_terms(self, cursor):
"""插入默认术语定义"""
default_terms = [
('docker ps', '查看当前运行的Docker容器', 'docker', 1),
('docker-compose', '使用docker-compose.yml文件管理多容器应用', 'docker', 1),
('容器重启', 'docker restart <container_name>', 'docker', 2),
('查看日志', 'docker logs <container_name>', 'docker', 2),
('~/docker', '项目根目录包含所有Docker项目', 'path', 1),
('FinancialReport', '金融数据处理项目生成PDF报告', 'project', 1),
('toy_fund', 'Ginbo个人资金在toy_balance表中is_toy_fund=true', 'finance', 1),
('Family', '家庭共同资金在toy_balance表中is_toy_fund=false', 'finance', 1),
]
for term, definition, category, priority in default_terms:
cursor.execute('''
INSERT OR IGNORE INTO term_dic (term, definition, category, priority)
VALUES (?, ?, ?, ?)
''', (term, definition, category, priority))
def execute_query(self, query: str, params: tuple = ()) -> List[Dict[str, Any]]:
"""执行查询并返回结果"""
try:
with sqlite3.connect(self.db_path) as conn:
conn.row_factory = sqlite3.Row # 返回字典格式
cursor = conn.cursor()
cursor.execute(query, params)
return [dict(row) for row in cursor.fetchall()]
except Exception as e:
logger.error(f"查询执行失败: {e}")
raise
def execute_update(self, query: str, params: tuple = ()) -> int:
"""执行更新操作并返回影响的行数"""
try:
with sqlite3.connect(self.db_path) as conn:
cursor = conn.cursor()
cursor.execute(query, params)
conn.commit()
return cursor.rowcount
except Exception as e:
logger.error(f"更新执行失败: {e}")
raise
def log_command(self, user_id: str, command: str, command_type: str,
status: str, result_summary: str = "", execution_time: float = 0.0):
"""记录命令执行日志"""
try:
query = '''
INSERT INTO command_log
(user_id, command, command_type, execution_status, result_summary, execution_time)
VALUES (?, ?, ?, ?, ?, ?)
'''
self.execute_update(query, (user_id, command, command_type, status, result_summary, execution_time))
except Exception as e:
logger.error(f"记录命令日志失败: {e}")
def get_term_definition(self, term: str) -> Optional[str]:
"""获取术语定义"""
try:
query = '''
SELECT definition FROM term_dic
WHERE term = ? OR term LIKE ?
ORDER BY priority ASC
LIMIT 1
'''
results = self.execute_query(query, (term, f'%{term}%'))
return results[0]['definition'] if results else None
except Exception as e:
logger.error(f"获取术语定义失败: {e}")
return None
def add_term(self, term: str, definition: str, category: str = 'general', priority: int = 3) -> bool:
"""添加新术语"""
try:
query = '''
INSERT OR REPLACE INTO term_dic (term, definition, category, priority)
VALUES (?, ?, ?, ?)
'''
self.execute_update(query, (term, definition, category, priority))
return True
except Exception as e:
logger.error(f"添加术语失败: {e}")
return False
def save_conversation_memory(self, user_id: str, session_id: str,
command_type: str, content: str, importance: int = 1):
"""保存对话记录"""
try:
query = '''
INSERT INTO conversation_memory
(user_id, session_id, command_type, content, importance)
VALUES (?, ?, ?, ?, ?)
'''
self.execute_update(query, (user_id, session_id, command_type, content, importance))
except Exception as e:
logger.error(f"保存对话记录失败: {e}")
def get_recent_conversations(self, user_id: str, command_type: str, limit: int = 10) -> List[Dict]:
"""获取最近的对话记录"""
try:
query = '''
SELECT * FROM conversation_memory
WHERE user_id = ? AND command_type = ?
ORDER BY created_at DESC
LIMIT ?
'''
return self.execute_query(query, (user_id, command_type, limit))
except Exception as e:
logger.error(f"获取对话记录失败: {e}")
return []

View File

@@ -0,0 +1,48 @@
import logging
from typing import Optional, Dict
logger = logging.getLogger(__name__)
class TokenReader:
"""Token文件读取器"""
def __init__(self, token_path: str):
self.token_path = token_path
self._tokens: Optional[Dict[str, str]] = None
def _load_tokens(self) -> Dict[str, str]:
"""加载token文件"""
if self._tokens is not None:
return self._tokens
tokens = {}
try:
with open(self.token_path, 'r', encoding='utf-8') as f:
for line in f:
line = line.strip()
if line and not line.startswith('#') and '=' in line:
key, value = line.split('=', 1)
tokens[key.strip()] = value.strip()
self._tokens = tokens
logger.info(f"成功加载 {len(tokens)} 个token")
except Exception as e:
logger.error(f"读取token文件失败: {e}")
raise
return tokens
def get_token(self, key: str) -> Optional[str]:
"""获取指定的token"""
tokens = self._load_tokens()
return tokens.get(key)
def get_all_tokens(self) -> Dict[str, str]:
"""获取所有tokens"""
return self._load_tokens().copy()
def reload(self):
"""重新加载token文件"""
self._tokens = None
logger.info("Token文件已重新加载")

View File

@@ -1,238 +1,314 @@
# Claude Router
# Claude API Router
一个智能的Claude API路由器支持Claude Pro和Claude API之间的自动故障转移。当Claude Pro达到使用限制时自动切换到Claude API确保服务的连续性
智能的Claude API路由器提供自动故障转移和负载均衡功能。当主要API服务遇到限制或错误时自动切换到备用提供商
## 功能特性
## 🚀 功能特性
- **自动故障转移**: 检测到速率限制或使用限制时自动切换provider
- **定时健康检查**: 每小时前5分钟自动检测Claude Pro限额恢复
- **智能恢复**: 自动切换回Claude Pro优先使用高级功能
- **手动切换**: 支持手动切换到指定provider
- **兼容Claude Code CLI**: 完全兼容Anthropic API格式
- **Docker化部署**: 一键部署,开箱即用
- **自动故障转移**: 检测到速率限制或错误时自动切换API提供商
- **多提供商支持**: 支持Claude Pro、DeepSeek、Kimi
- **手动切换**: 支持手动强制切换到指定提供商
- **健康检查**: 自动监控和恢复主要提供商
- **完全兼容**: 与Anthropic Claude API完全兼容
- **流式响应**: 支持流式和非流式响应
## 快速开始
## 📋 支持的提供商
### 1. 使用Docker Compose部署
按优先级排序:
1. **Claude Pro** - 最高优先级,首选提供商
2. **DeepSeek** - OpenAI兼容的备用提供商
3. **Kimi** - OpenAI兼容的备用提供商
## 🛠️ 安装和配置
### 1. 环境要求
```bash
# 克隆或进入项目目录
cd /home/will/docker/router
cd router
pip install -r requirements.txt
```
# 构建并启动服务
### 2. 配置API密钥
`/home/will/docker/tokens.txt` 文件中添加以下令牌:
```txt
claude_api_key=your_claude_api_key_here
deepseek_api_key=your_deepseek_api_key_here
kimi_api_key=your_kimi_api_key_here
```
或者设置环境变量:
```bash
export CLAUDE_API_KEY="your_claude_api_key"
export DEEPSEEK_API_KEY="your_deepseek_api_key"
export KIMI_API_KEY="your_kimi_api_key"
```
### 3. 启动服务
```bash
# 开发模式
python app.py
# 生产模式 (使用Docker Compose)
docker-compose up -d
# 查看服务状态
docker-compose ps
```
### 2. 验证服务运行
服务将在 `http://localhost:8000` 启动。
```bash
# 健康检查
curl http://localhost:8000/health
## 🔧 手动切换提供商
# 查看当前状态
curl http://localhost:8000/v1/status
```
### API端点
### 3. 配置Claude Code CLI
```http
POST /v1/switch-provider
Content-Type: application/json
设置环境变量将Claude Code CLI指向路由器
```bash
# 设置API endpoint为路由器地址
export ANTHROPIC_API_URL="http://localhost:8000"
# 添加到bashrc使其永久生效
echo 'export ANTHROPIC_API_URL="http://localhost:8000"' >> ~/.bashrc
# 测试配置
echo "Hello Claude Router" | claude --print
```
**注意**: 无需修改ANTHROPIC_API_KEY路由器会自动处理API密钥。
## API端点
### 主要端点
- `POST /v1/messages` - Claude API消息创建兼容Anthropic API
- `GET /health` - 健康检查
- `GET /v1/status` - 获取路由器状态
- `POST /v1/switch-provider` - 手动切换provider
- `POST /v1/health-check` - 手动触发Claude Pro健康检查
### 健康检查响应示例
```json
{
"status": "healthy",
"current_provider": "claude_pro",
"failover_count": 0,
"last_failover": null,
"last_health_check": "2025-07-14T19:00:00.000Z",
"health_check_failures": 0,
"providers": {
"claude_pro": {"active": true},
"claude_api": {"active": true}
}
"provider": "provider_name"
}
```
## 配置说明
### 可用的提供商名称
### 环境变量
- `claude_pro` - Claude Pro提供商 (默认)
- `deepseek` - DeepSeek提供商
- `kimi` - Kimi提供商
- `CLAUDE_API_KEY`: Claude API密钥
- `ROUTER_HOST`: 服务监听地址(默认: 0.0.0.0
- `ROUTER_PORT`: 服务监听端口(默认: 8000
- `MAX_RETRIES`: 最大重试次数(默认: 3
- `RETRY_DELAY`: 重试延迟(默认: 1.0秒)
### 使用示例
### 健康检查配置
- `health_check_enabled`: 是否启用定时健康检查(默认: true
- `health_check_cron`: 检查时间表达式(默认: "0-4 * * * *" - 每小时前5分钟
- `health_check_message`: 测试消息内容(默认: "ping"
- `health_check_model`: 使用的模型(默认: claude-3-haiku-20240307
### Token文件
路由器会自动从 `/home/will/docker/tokens.txt` 读取API密钥无需手动配置环境变量。
## 故障转移机制
当检测到以下错误时路由器会自动切换到下一个可用的provider
- 429 (Too Many Requests)
- 速率限制错误
- 使用限制达到
- "usage limit reached"相关错误
**优先级顺序**: Claude Pro → Claude API
## 使用示例
### 基本API调用
```bash
curl -X POST http://localhost:8000/v1/messages \
-H "Content-Type: application/json" \
-H "Authorization: Bearer your_api_key" \
-d '{
"model": "claude-3-sonnet-20240229",
"max_tokens": 1024,
"messages": [
{"role": "user", "content": "Hello, Claude!"}
]
}'
```
### 手动切换provider
#### curl命令
```bash
# 切换到DeepSeek
curl -X POST http://localhost:8000/v1/switch-provider \
-H "Content-Type: application/json" \
-d '"claude_api"'
-d '{"provider": "deepseek"}'
# 切换到Kimi
curl -X POST http://localhost:8000/v1/switch-provider \
-H "Content-Type: application/json" \
-d '{"provider": "kimi"}'
# 切换回Claude Pro
curl -X POST http://localhost:8000/v1/switch-provider \
-H "Content-Type: application/json" \
-d '{"provider": "claude_pro"}'
```
### 手动健康检查
#### Python脚本
```python
import requests
def switch_provider(provider_name):
"""切换API提供商"""
url = "http://localhost:8000/v1/switch-provider"
data = {"provider": provider_name}
response = requests.post(url, json=data)
if response.status_code == 200:
result = response.json()
print(f"成功切换到: {result['current_provider']}")
return True
else:
print(f"切换失败: {response.text}")
return False
# 使用示例
switch_provider("deepseek")
switch_provider("kimi")
switch_provider("claude_pro")
```
### 查看当前状态
```bash
# 立即检测Claude Pro是否可用
curl -X POST http://localhost:8000/v1/health-check
# 查看当前使用的提供商
curl http://localhost:8000/v1/health
# 查看详细状态
curl http://localhost:8000/v1/status
# 响应示例
{
"status": "healthy",
"current_provider": "claude_pro",
"providers": {
"claude_pro": "available",
"deepseek": "unknown",
"kimi": "unknown"
}
}
```
## 开发和调试
## 🔄 自动故障转移
### 本地开发
路由器会自动检测以下错误并进行故障转移:
```bash
# 创建虚拟环境
python3 -m venv venv
source venv/bin/activate
- 速率限制 (Rate limit exceeded)
- 使用配额超额 (Usage limit exceeded)
- HTTP 429错误
- 每日/月度限制达到
- 网络连接错误
# 安装依赖
pip install -r requirements.txt
### 故障转移流程
# 运行应用
python app.py
1. 检测到故障转移触发条件
2. 自动切换到下一个优先级提供商
3. 重试请求
4. 记录切换日志
5. 定期尝试恢复到主要提供商
## 🩺 健康检查和自动恢复
- **自动检查频率**: 每小时的前5分钟
- **检查内容**: 向Claude Pro发送测试请求
- **自动恢复**: 如果Claude Pro恢复可用自动切换回去
## 🔌 API兼容性
路由器完全兼容Anthropic Claude API支持
- 所有Claude模型 (claude-3-sonnet, claude-3-opus等)
- 流式和非流式响应
- 系统消息和用户消息
- 所有API参数 (max_tokens, temperature等)
### 使用示例
```python
import requests
# 标准Claude API调用
url = "http://localhost:8000/v1/messages"
headers = {
"Content-Type": "application/json",
"x-api-key": "your_claude_api_key"
}
data = {
"model": "claude-3-sonnet-20240229",
"max_tokens": 1000,
"messages": [
{"role": "user", "content": "Hello, Claude!"}
]
}
response = requests.post(url, headers=headers, json=data)
print(response.json())
```
### 查看日志
```bash
# Docker容器日志
docker-compose logs -f claude-router
# 实时日志
docker logs -f claude-router
```
## 故障排除
## 🐛 故障排除
### 常见问题
1. **服务无法启动**
- 检查tokens.txt文件是否存在且格式正确
- 确认端口8000未被占用
#### 1. 自动切换不工作
2. **API调用失败**
- 验证API密钥是否有效
- 检查网络连接到api.anthropic.com
**问题**: 遇到限制时不自动切换
3. **自动切换不工作**
- 查看日志确认错误检测逻辑
- 确认backup provider配置正确
**解决方案**:
```bash
# 手动切换到备用提供商
curl -X POST http://localhost:8000/v1/switch-provider \
-H "Content-Type: application/json" \
-d '{"provider": "deepseek"}'
```
### 监控
#### 2. API密钥错误
- 健康检查: `http://localhost:8000/health`
- 状态监控: `http://localhost:8000/v1/status`
- Docker健康检查: `docker inspect claude-router`
**问题**: 提示API密钥无效
## 技术架构
**解决方案**:
1. 检查 `tokens.txt` 文件中的密钥格式
2. 确保密钥没有多余的空格或换行
3. 验证密钥是否有效
- **框架**: FastAPI + Uvicorn
- **HTTP客户端**: httpx
- **AI库**: anthropic
- **容器化**: Docker + Docker Compose
- **配置管理**: pydantic + python-dotenv
#### 3. 服务无法启动
## 版本信息
**问题**: 路由器启动失败
- 版本: 1.0.0 (MVP)
- Python: 3.11+
- 支持: Claude-3 系列模型
**解决方案**:
```bash
# 检查日志
docker-compose logs router
## 更新日志
# 手动启动调试
cd router
python app.py
```
### v1.1.0 (2025-07-14)
- ✅ 添加定时健康检查功能
- ✅ 每小时前5分钟自动检测Claude Pro限额恢复
- ✅ 智能自动切换回Claude Pro
- ✅ 新增手动健康检查API
- ✅ 完善日志记录和状态监控
### 调试模式
### v1.0.0 (2025-07-14)
- ✅ 基础路由器功能
- ✅ Claude Pro到Claude API自动故障转移
- ✅ Docker容器化部署
- ✅ Claude Code CLI兼容性
启用详细日志:
## 后续开发计划
```python
import logging
logging.basicConfig(level=logging.DEBUG)
```
- [ ] 添加DeepSeek API支持
- [ ] 添加ChatGPT API支持
- [ ] 实现请求统计和监控面板
- [ ] 添加请求缓存功能
- [ ] 支持负载均衡
- [ ] 集成Kimi v2 API
## 📊 监控和日志
## 许可证
### 日志级别
MIT License
- `INFO`: 正常操作日志
- `WARNING`: 提供商切换警告
- `ERROR`: 错误和故障转移事件
### 关键日志信息
- 提供商切换事件
- API请求失败
- 健康检查结果
- 自动恢复操作
## ⚙️ 高级配置
### 自定义故障转移条件
编辑 `app.py` 中的 `should_failover` 方法:
```python
def should_failover(self, error_message: str) -> bool:
# 添加自定义错误检测条件
custom_indicators = [
"your_custom_error",
"specific_provider_error"
]
return any(indicator in error_message.lower()
for indicator in custom_indicators)
```
### 调整健康检查频率
修改 `app.py` 中的定时任务:
```python
# 每30分钟检查一次
@scheduler.cron("*/30 * * * *")
async def health_check_claude_pro():
# ... 健康检查逻辑
```
## 🤝 SillyTavern集成
在SillyTavern中使用此路由器
1. **API URL**: `http://localhost:8000/v1/messages`
2. **API Key**: 使用你的Claude API密钥
3. **模型**: 选择任何Claude模型
路由器会自动处理故障转移对SillyTavern完全透明。
## 📄 许可证
MIT License
## 🆘 获取帮助
如果遇到问题:
1. 检查日志文件
2. 验证API密钥配置
3. 尝试手动切换提供商
4. 查看健康检查状态
需要进一步帮助请查看代码注释或联系开发者。

View File

@@ -26,11 +26,11 @@ class ClaudeRouter:
self.last_health_check = None
self.health_check_failures = 0
self.scheduler = None
# 按优先级顺序排列Claude Pro > DeepSeek > Kimi > Claude API
# 按优先级顺序排列Claude Pro > DeepSeek > Kimi (removed claude_api)
from collections import OrderedDict
self.providers = OrderedDict([
("claude_pro", {
"api_key": config.claude_pro_api_key,
"api_key": config.claude_api_key, # Use claude_api_key for claude_pro
"base_url": config.claude_pro_base_url,
"type": "anthropic",
"active": True
@@ -46,12 +46,6 @@ class ClaudeRouter:
"base_url": config.kimi_base_url,
"type": "openai",
"active": True
}),
("claude_api", {
"api_key": config.claude_api_key,
"base_url": config.claude_api_base_url,
"type": "anthropic",
"active": True
})
])
@@ -220,19 +214,21 @@ class ClaudeRouter:
return response
async def health_check_claude_pro(self):
"""Check if Claude Pro is available again"""
# Only check if we're not currently using Claude Pro
if self.current_provider == "claude_pro":
logger.debug("Skipping health check - already using Claude Pro")
async def health_check_primary_provider(self):
"""Check if primary provider (claude_pro) is available again"""
primary_provider = "claude_pro"
# Only check if we're not currently using the primary provider
if self.current_provider == primary_provider:
logger.debug(f"Skipping health check - already using {primary_provider}")
return
logger.info("Running Claude Pro health check...")
logger.info(f"Running {primary_provider} health check...")
self.last_health_check = datetime.now()
try:
client = Anthropic(
api_key=config.claude_pro_api_key,
api_key=config.claude_api_key, # Use claude_api_key for claude_pro
base_url=config.claude_pro_base_url
)
@@ -253,21 +249,21 @@ class ClaudeRouter:
prompt=f"Human: {config.health_check_message}\n\nAssistant:"
)
# If successful, switch back to Claude Pro
# If successful, switch back to primary provider
old_provider = self.current_provider
self.current_provider = "claude_pro"
self.current_provider = primary_provider
self.health_check_failures = 0
logger.info(f"Claude Pro health check successful! Switched from {old_provider} to claude_pro")
logger.info(f"{primary_provider} health check successful! Switched from {old_provider} to {primary_provider}")
except Exception as e:
self.health_check_failures += 1
error_str = str(e).lower()
if any(indicator in error_str for indicator in ["rate_limit", "usage limit", "quota exceeded", "429", "too many requests", "limit reached"]):
logger.info(f"Claude Pro still rate limited: {str(e)}")
logger.info(f"{primary_provider} still rate limited: {str(e)}")
else:
logger.warning(f"Claude Pro health check failed (attempt {self.health_check_failures}): {str(e)}")
logger.warning(f"{primary_provider} health check failed (attempt {self.health_check_failures}): {str(e)}")
def start_scheduler(self):
"""Start the health check scheduler"""

View File

@@ -18,7 +18,7 @@ class Config(BaseModel):
retry_delay: float = 1.0
# API endpoints
claude_pro_base_url: str = "https://api.anthropic.com"
claude_pro_base_url: str = "https://api.anthropic.com" # Claude Pro might use different endpoint in future
claude_api_base_url: str = "https://api.anthropic.com"
deepseek_base_url: str = "https://api.deepseek.com"
kimi_base_url: str = "https://api.moonshot.ai"
@@ -57,8 +57,8 @@ class Config(BaseModel):
except FileNotFoundError:
pass
# For MVP, we'll use the same API key for both pro and regular
# In practice, Claude Pro might use a different endpoint or key
# Claude Pro uses the same API key as regular Claude API
# but may use different endpoint or have different rate limits
self.claude_pro_api_key = self.claude_api_key
# Global config instance