Files
docker-configs/discord_bot/utils/database.py
Will Song 568dcc45e4 Fix router failover and add Discord bot
- Remove duplicate claude_api provider to fix automatic failover
- Enhance error detection with HTTP status codes and more indicators
- Add comprehensive README documentation with manual switching
- Implement Discord bot with Claude Code CLI integration
- Support /terminal and /claude commands with AI-powered responses

🤖 Generated with [Claude Code](https://claude.ai/code)

Co-Authored-By: Claude <noreply@anthropic.com>
2025-07-16 21:35:52 -05:00

216 lines
9.1 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

import logging
import os
import sqlite3
from contextlib import closing
from pathlib import Path
from typing import Optional, List, Dict, Any
logger = logging.getLogger(__name__)


class DatabaseManager:
    """SQLite-backed storage manager.

    On construction the database file (and its parent directory) is created
    if missing, the tables ``toy_balance``, ``portfolio``, ``term_dic``,
    ``conversation_memory`` and ``command_log`` are created if absent, and a
    small set of default terms is seeded into ``term_dic``.

    Every operation opens its own connection and closes it when done, so no
    connection handle is kept on the instance.
    """

    def __init__(self, db_path: str = "/app/databases/discord_bot.db"):
        """Remember *db_path* and make sure the database schema exists.

        Args:
            db_path: Filesystem path of the SQLite database file.

        Raises:
            Exception: Whatever ``init_database`` raises if the schema cannot
                be created (the error is logged first).
        """
        self.db_path = db_path
        self.ensure_database_exists()

    def _connect(self):
        """Return a context manager that yields a connection and closes it.

        ``sqlite3.Connection`` used directly as a context manager only
        commits or rolls back; it does not close the connection. Wrapping it
        in ``closing`` releases the handle deterministically.
        """
        return closing(sqlite3.connect(self.db_path))

    def ensure_database_exists(self) -> None:
        """Create the database's parent directory and initialise the schema."""
        # Path(...).parent is "." for a bare filename, so this also works when
        # db_path has no directory component (os.makedirs("") would raise).
        Path(self.db_path).parent.mkdir(parents=True, exist_ok=True)
        # Create the database file and its tables.
        self.init_database()

    def init_database(self) -> None:
        """Create all tables if they do not exist and seed default terms.

        Raises:
            Exception: Any error raised while creating the schema; it is
                logged and then re-raised.
        """
        try:
            # Outer manager closes the connection; inner one commits on
            # success and rolls back on error.
            with self._connect() as conn, conn:
                cursor = conn.cursor()
                # toy_balance: confirmed ALP transaction records.
                cursor.execute('''
                    CREATE TABLE IF NOT EXISTS toy_balance (
                        id INTEGER PRIMARY KEY AUTOINCREMENT,
                        transaction_id TEXT UNIQUE,
                        date DATE,
                        amount REAL,
                        merchant TEXT,
                        description TEXT,
                        is_toy_fund BOOLEAN, -- true=Ginbo, false=Family
                        confirmed_by TEXT, -- Discord用户ID
                        confirmed_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
                    )
                ''')
                # portfolio: investment portfolio data.
                cursor.execute('''
                    CREATE TABLE IF NOT EXISTS portfolio (
                        id INTEGER PRIMARY KEY AUTOINCREMENT,
                        symbol TEXT NOT NULL,
                        account_name TEXT,
                        shares REAL,
                        avg_cost REAL,
                        last_updated TIMESTAMP DEFAULT CURRENT_TIMESTAMP
                    )
                ''')
                # term_dic: term dictionary.
                cursor.execute('''
                    CREATE TABLE IF NOT EXISTS term_dic (
                        id INTEGER PRIMARY KEY AUTOINCREMENT,
                        term TEXT UNIQUE NOT NULL,
                        definition TEXT NOT NULL,
                        category TEXT DEFAULT 'general',
                        priority INTEGER DEFAULT 1, -- 1=highest priority
                        created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
                        updated_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
                    )
                ''')
                # conversation_memory: conversation records.
                cursor.execute('''
                    CREATE TABLE IF NOT EXISTS conversation_memory (
                        id INTEGER PRIMARY KEY AUTOINCREMENT,
                        user_id TEXT NOT NULL,
                        session_id TEXT,
                        command_type TEXT, -- terminal, terminal_fin, etc.
                        content TEXT,
                        importance INTEGER DEFAULT 1, -- 重要性权重 1-5
                        created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
                    )
                ''')
                # command_log: command execution log.
                cursor.execute('''
                    CREATE TABLE IF NOT EXISTS command_log (
                        id INTEGER PRIMARY KEY AUTOINCREMENT,
                        user_id TEXT NOT NULL,
                        command TEXT NOT NULL,
                        command_type TEXT,
                        execution_status TEXT, -- success, error, denied
                        result_summary TEXT,
                        execution_time REAL, -- 执行时间(秒)
                        created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
                    )
                ''')
                # Seed a few default term definitions.
                self._insert_default_terms(cursor)
            logger.info("数据库初始化完成")
        except Exception as e:
            logger.error(f"数据库初始化失败: {e}")
            raise

    def _insert_default_terms(self, cursor) -> None:
        """Seed ``term_dic`` with the built-in terms.

        Uses ``INSERT OR IGNORE`` so terms that already exist are left
        untouched.

        Args:
            cursor: An open cursor on the target database; the caller is
                responsible for committing.
        """
        default_terms = [
            ('docker ps', '查看当前运行的Docker容器', 'docker', 1),
            ('docker-compose', '使用docker-compose.yml文件管理多容器应用', 'docker', 1),
            ('容器重启', 'docker restart <container_name>', 'docker', 2),
            ('查看日志', 'docker logs <container_name>', 'docker', 2),
            ('~/docker', '项目根目录包含所有Docker项目', 'path', 1),
            ('FinancialReport', '金融数据处理项目生成PDF报告', 'project', 1),
            ('toy_fund', 'Ginbo个人资金在toy_balance表中is_toy_fund=true', 'finance', 1),
            ('Family', '家庭共同资金在toy_balance表中is_toy_fund=false', 'finance', 1),
        ]
        cursor.executemany('''
            INSERT OR IGNORE INTO term_dic (term, definition, category, priority)
            VALUES (?, ?, ?, ?)
        ''', default_terms)

    def execute_query(self, query: str, params: tuple = ()) -> List[Dict[str, Any]]:
        """Run *query* and return every result row as a plain dict.

        Args:
            query: SQL text, using ``?`` placeholders for values.
            params: Values bound to the placeholders.

        Returns:
            One dict per row, keyed by column name.

        Raises:
            Exception: Any error raised while executing; it is logged and
                then re-raised.
        """
        try:
            with self._connect() as conn, conn:
                # sqlite3.Row lets each row be converted to a dict by name.
                conn.row_factory = sqlite3.Row
                cursor = conn.execute(query, params)
                return [dict(row) for row in cursor.fetchall()]
        except Exception as e:
            logger.error(f"查询执行失败: {e}")
            raise

    def execute_update(self, query: str, params: tuple = ()) -> int:
        """Run a data-modifying statement, commit it, and return the row count.

        Args:
            query: SQL text, using ``?`` placeholders for values.
            params: Values bound to the placeholders.

        Returns:
            The cursor's ``rowcount`` after executing the statement.

        Raises:
            Exception: Any error raised while executing; it is logged and
                then re-raised.
        """
        try:
            # The inner connection context manager commits on success.
            with self._connect() as conn, conn:
                cursor = conn.execute(query, params)
                return cursor.rowcount
        except Exception as e:
            logger.error(f"更新执行失败: {e}")
            raise

    def log_command(self, user_id: str, command: str, command_type: str,
                    status: str, result_summary: str = "", execution_time: float = 0.0):
        """Append a row to ``command_log``.

        Best-effort: a failure is logged and swallowed so that logging never
        breaks the caller.

        Args:
            user_id: Stored in ``command_log.user_id``.
            command: Stored in ``command_log.command``.
            command_type: Stored in ``command_log.command_type``.
            status: Stored in ``command_log.execution_status``.
            result_summary: Stored in ``command_log.result_summary``.
            execution_time: Stored in ``command_log.execution_time``.
        """
        try:
            query = '''
                INSERT INTO command_log
                (user_id, command, command_type, execution_status, result_summary, execution_time)
                VALUES (?, ?, ?, ?, ?, ?)
            '''
            self.execute_update(
                query,
                (user_id, command, command_type, status, result_summary, execution_time),
            )
        except Exception as e:
            logger.error(f"记录命令日志失败: {e}")

    def get_term_definition(self, term: str) -> Optional[str]:
        """Look up the definition of *term* in ``term_dic``.

        An exact match on ``term`` always wins. Otherwise the best substring
        match is returned, preferring the lowest ``priority`` number and then
        the lowest ``id``. ``%`` and ``_`` in *term* are matched literally.

        Args:
            term: The term (or fragment of a term) to look up.

        Returns:
            The definition text, or ``None`` when *term* is empty, nothing
            matches, or the lookup fails (the failure is logged).
        """
        # An empty fragment would turn the LIKE pattern into '%%' and match
        # every row, returning an unrelated definition.
        if not term:
            return None
        try:
            # Escape LIKE metacharacters so user text is matched literally.
            escaped = (
                term.replace('\\', '\\\\')
                .replace('%', '\\%')
                .replace('_', '\\_')
            )
            query = '''
                SELECT definition FROM term_dic
                WHERE term = ? OR term LIKE ? ESCAPE '\\'
                ORDER BY (term = ?) DESC, priority ASC, id ASC
                LIMIT 1
            '''
            results = self.execute_query(query, (term, f'%{escaped}%', term))
            return results[0]['definition'] if results else None
        except Exception as e:
            logger.error(f"获取术语定义失败: {e}")
            return None

    def add_term(self, term: str, definition: str, category: str = 'general', priority: int = 3) -> bool:
        """Insert a term, or update it in place when it already exists.

        An existing row keeps its ``id`` and ``created_at``; only the
        definition, category, priority and ``updated_at`` change.

        Args:
            term: The unique term key.
            definition: Definition text to store.
            category: Stored in ``term_dic.category``.
            priority: Stored in ``term_dic.priority``.

        Returns:
            ``True`` on success, ``False`` if the write failed (the failure
            is logged).
        """
        try:
            # Both statements run in one transaction on one connection.
            with self._connect() as conn, conn:
                cursor = conn.execute(
                    '''
                    UPDATE term_dic
                    SET definition = ?, category = ?, priority = ?,
                        updated_at = CURRENT_TIMESTAMP
                    WHERE term = ?
                    ''',
                    (definition, category, priority, term),
                )
                if cursor.rowcount == 0:
                    # No existing row was updated, so create the term.
                    conn.execute(
                        '''
                        INSERT INTO term_dic (term, definition, category, priority)
                        VALUES (?, ?, ?, ?)
                        ''',
                        (term, definition, category, priority),
                    )
            return True
        except Exception as e:
            logger.error(f"添加术语失败: {e}")
            return False

    def save_conversation_memory(self, user_id: str, session_id: str,
                                 command_type: str, content: str, importance: int = 1):
        """Append a row to ``conversation_memory``.

        Best-effort: a failure is logged and swallowed.

        Args:
            user_id: Stored in ``conversation_memory.user_id``.
            session_id: Stored in ``conversation_memory.session_id``.
            command_type: Stored in ``conversation_memory.command_type``.
            content: Stored in ``conversation_memory.content``.
            importance: Stored in ``conversation_memory.importance``.
        """
        try:
            query = '''
                INSERT INTO conversation_memory
                (user_id, session_id, command_type, content, importance)
                VALUES (?, ?, ?, ?, ?)
            '''
            self.execute_update(query, (user_id, session_id, command_type, content, importance))
        except Exception as e:
            logger.error(f"保存对话记录失败: {e}")

    def get_recent_conversations(self, user_id: str, command_type: str, limit: int = 10) -> List[Dict]:
        """Return the newest conversation rows for a user and command type.

        Args:
            user_id: Only rows with this ``user_id`` are returned.
            command_type: Only rows with this ``command_type`` are returned.
            limit: Maximum number of rows to return.

        Returns:
            Row dicts ordered newest first, or an empty list if the query
            fails (the failure is logged).
        """
        try:
            # created_at has one-second resolution; id breaks ties so rows
            # saved within the same second still come back newest first.
            query = '''
                SELECT * FROM conversation_memory
                WHERE user_id = ? AND command_type = ?
                ORDER BY created_at DESC, id DESC
                LIMIT ?
            '''
            return self.execute_query(query, (user_id, command_type, limit))
        except Exception as e:
            logger.error(f"获取对话记录失败: {e}")
            return []