1. 添加README说明项目结构 2. 配置Python和Node.js的.gitignore 3. 包含认证模块和账号管理的前后端基础代码 4. 开发计划文档记录当前阶段任务
109 lines
3.8 KiB
Python
109 lines
3.8 KiB
Python
import sqlite3
|
|
import uuid
|
|
import hashlib
|
|
import secrets
|
|
import base64
|
|
import logging
|
|
from datetime import datetime
|
|
|
|
logger = logging.getLogger(__name__)


class BackendAccountManager:
    """Manage backend service accounts stored in a SQLite database.

    Passwords are never stored in clear text: each account keeps a
    PBKDF2-HMAC digest and the random salt used to derive it, both
    base64-encoded.
    """

    # PBKDF2 parameters shared by hash_password() and verify_password() so
    # the two can never drift apart.
    _HASH_NAME = "sha256"
    _PBKDF2_ITERATIONS = 100000
    _SALT_BYTES = 16

    def __init__(self, db_path="api_service.db"):
        """Remember the database location and make sure the schema exists.

        Args:
            db_path: Path of the SQLite database file. Defaults to
                ``"api_service.db"`` in the current working directory.
        """
        self.db_path = db_path
        self._init_db()

    def _connect(self):
        """Return a context manager that yields a connection and closes it.

        ``sqlite3.Connection`` used directly in a ``with`` statement only
        commits or rolls back the transaction; it does not close the
        connection. Wrapping it in ``closing`` releases the handle.
        """
        from contextlib import closing

        return closing(sqlite3.connect(self.db_path))

    def _init_db(self):
        """Create the ``backend_users`` table if it does not exist yet."""
        # The outer context closes the connection; the inner one (``conn``)
        # commits on success and rolls back on error.
        with self._connect() as conn, conn:
            conn.execute("""
                CREATE TABLE IF NOT EXISTS backend_users (
                    id TEXT PRIMARY KEY,
                    username TEXT UNIQUE NOT NULL,
                    email TEXT UNIQUE NOT NULL,
                    password TEXT NOT NULL,
                    password_salt TEXT NOT NULL,
                    created_at TEXT NOT NULL,
                    updated_at TEXT NOT NULL
                )
            """)

    def hash_password(self, password, salt=None):
        """Derive a password hash with PBKDF2-HMAC-SHA256.

        Args:
            password: Clear-text password (``str``).
            salt: Raw salt bytes. When ``None`` a fresh random salt is
                generated.

        Returns:
            A ``(hashed, salt_b64)`` tuple of base64-encoded strings: the
            derived key and the salt that produced it.
        """
        if salt is None:
            salt = secrets.token_bytes(self._SALT_BYTES)
        derived = hashlib.pbkdf2_hmac(
            self._HASH_NAME, password.encode(), salt, self._PBKDF2_ITERATIONS
        )
        return base64.b64encode(derived).decode(), base64.b64encode(salt).decode()

    def create_account(self, username, email, password):
        """Create a backend account.

        Args:
            username: Unique user name.
            email: Unique e-mail address.
            password: Clear-text password; only its salted hash is stored.

        Returns:
            A dict with the new account's ``id``, ``username``, ``email``
            and ``created_at`` (ISO-8601 string).

        Raises:
            ValueError: If the user name or e-mail is already taken.
            Exception: Any other error is logged and re-raised unchanged.
        """
        try:
            user_id = str(uuid.uuid4())
            hashed_pw, salt = self.hash_password(password)
            now = datetime.now().isoformat()

            with self._connect() as conn, conn:
                conn.execute("""
                    INSERT INTO backend_users
                    (id, username, email, password, password_salt, created_at, updated_at)
                    VALUES (?, ?, ?, ?, ?, ?, ?)
                """, (user_id, username, email, hashed_pw, salt, now, now))

            logger.info(f"后台账号创建成功 - ID: {user_id}, 用户名: {username}, 邮箱: {email}")
            return {
                "id": user_id,
                "username": username,
                "email": email,
                "created_at": now,
            }
        except sqlite3.IntegrityError as e:
            # A UNIQUE constraint on username/email was violated.
            logger.error(f"账号已存在: {e}")
            raise ValueError("用户名或邮箱已存在") from e
        except Exception as e:
            logger.error(f"创建账号失败: {e}")
            raise

    def get_user_by_username(self, username):
        """Look up an account by user name.

        Args:
            username: User name to search for.

        Returns:
            A dict with ``id``, ``username``, ``email``, ``password``
            (base64 hash), ``password_salt`` (base64) and ``created_at``,
            or ``None`` when no such user exists.

        Raises:
            Exception: Database errors are logged and re-raised unchanged.
        """
        try:
            with self._connect() as conn:
                row = conn.execute("""
                    SELECT id, username, email, password, password_salt, created_at
                    FROM backend_users WHERE username = ?
                """, (username,)).fetchone()

            if row is None:
                return None
            columns = ("id", "username", "email", "password", "password_salt", "created_at")
            return dict(zip(columns, row))
        except Exception as e:
            logger.error(f"查询用户失败: {e}")
            raise

    def verify_password(self, plain_password, hashed_password, salt):
        """Check a clear-text password against a stored hash.

        Args:
            plain_password: Password supplied by the user.
            hashed_password: Base64 hash as returned by ``hash_password``.
            salt: Base64-encoded salt as returned by ``hash_password``.

        Returns:
            ``True`` if the password matches, ``False`` otherwise. Any
            error during verification (e.g. malformed salt) is logged and
            reported as ``False``.
        """
        try:
            candidate, _ = self.hash_password(plain_password, base64.b64decode(salt))
            # Constant-time comparison so timing does not leak how many
            # leading characters of the hash matched.
            return secrets.compare_digest(candidate, hashed_password)
        except Exception as e:
            logger.error(f"密码验证失败: {e}")
            return False