1. 添加README说明项目结构 2. 配置Python和Node.js的.gitignore 3. 包含认证模块和账号管理的前后端基础代码 4. 开发计划文档记录当前阶段任务
146 lines
5.4 KiB
Python
146 lines
5.4 KiB
Python
from Crypto.Cipher import AES
|
||
from Crypto.PublicKey import RSA
|
||
from Crypto.Random import get_random_bytes
|
||
import base64
|
||
import os
|
||
from config import CONFIG_PATHS
|
||
from libs import gmpy2_pkcs10aep_cipher
|
||
import logging
|
||
|
||
# 配置日志
|
||
logger = logging.getLogger(__name__)
|
||
|
||
class Encryption:
|
||
"""加密工具类"""
|
||
|
||
PREFIX_HYBRID = b"HYBRID:"
|
||
|
||
@staticmethod
|
||
def load_public_key(public_key_path_or_content):
|
||
"""加载公钥"""
|
||
if public_key_path_or_content is None:
|
||
logger.error("公钥路径或内容不能为空")
|
||
raise ValueError("公钥路径或内容不能为空")
|
||
|
||
try:
|
||
if isinstance(public_key_path_or_content, str) and os.path.exists(public_key_path_or_content):
|
||
with open(public_key_path_or_content, "rb") as f:
|
||
public_key = f.read()
|
||
else:
|
||
# 假设输入的是公钥内容
|
||
public_key = public_key_path_or_content.encode() if isinstance(public_key_path_or_content, str) else public_key_path_or_content
|
||
|
||
if not public_key:
|
||
logger.error("公钥内容为空")
|
||
raise ValueError("公钥内容为空")
|
||
|
||
return public_key
|
||
except Exception as e:
|
||
logger.error(f"加载公钥失败: {e}")
|
||
raise
|
||
|
||
@staticmethod
|
||
def load_private_key(private_key_path):
|
||
"""加载私钥"""
|
||
try:
|
||
with open(private_key_path, "rb") as f:
|
||
private_key = f.read()
|
||
return private_key
|
||
except Exception as e:
|
||
logger.error(f"加载私钥失败: {e}")
|
||
raise
|
||
|
||
@staticmethod
|
||
def encrypt(text, public_key):
|
||
"""使用混合加密方式(RSA + AES)加密文本"""
|
||
if text is None or text == "":
|
||
logger.error("待加密文本不能为空")
|
||
raise ValueError("待加密文本不能为空")
|
||
|
||
if public_key is None:
|
||
logger.error("公钥不能为空")
|
||
raise ValueError("公钥不能为空")
|
||
|
||
try:
|
||
if isinstance(public_key, str):
|
||
public_key = public_key.encode()
|
||
|
||
# 生成随机AES密钥
|
||
aes_key = get_random_bytes(16)
|
||
cipher_aes = AES.new(aes_key, AES.MODE_EAX)
|
||
|
||
# 使用AES加密文本
|
||
ciphertext, tag = cipher_aes.encrypt_and_digest(text.encode())
|
||
|
||
# 使用RSA加密AES密钥
|
||
rsa_key = RSA.import_key(public_key)
|
||
cipher_rsa = gmpy2_pkcs10aep_cipher.new(rsa_key)
|
||
enc_aes_key = cipher_rsa.encrypt(aes_key)
|
||
|
||
# 组合加密结果
|
||
encrypted_data = enc_aes_key + cipher_aes.nonce + tag + ciphertext
|
||
|
||
return Encryption.PREFIX_HYBRID + encrypted_data
|
||
except Exception as e:
|
||
logger.error(f"加密失败: {e}")
|
||
raise
|
||
|
||
@staticmethod
|
||
def decrypt(encrypted_text, private_key):
|
||
"""解密加密后的文本"""
|
||
if encrypted_text is None or encrypted_text == b"":
|
||
logger.error("待解密文本不能为空")
|
||
raise ValueError("待解密文本不能为空")
|
||
|
||
if private_key is None:
|
||
logger.error("私钥不能为空")
|
||
raise ValueError("私钥不能为空")
|
||
|
||
try:
|
||
# 加载私钥
|
||
rsa_key = RSA.import_key(private_key)
|
||
cipher_rsa = gmpy2_pkcs10aep_cipher.new(rsa_key)
|
||
|
||
# 解密
|
||
if encrypted_text.startswith(Encryption.PREFIX_HYBRID):
|
||
encrypted_text = encrypted_text[len(Encryption.PREFIX_HYBRID):]
|
||
|
||
if len(encrypted_text) < rsa_key.size_in_bytes() + 32:
|
||
logger.error("加密数据格式不正确")
|
||
raise ValueError("加密数据格式不正确")
|
||
|
||
enc_aes_key = encrypted_text[:rsa_key.size_in_bytes()]
|
||
nonce = encrypted_text[rsa_key.size_in_bytes():rsa_key.size_in_bytes() + 16]
|
||
tag = encrypted_text[rsa_key.size_in_bytes() + 16:rsa_key.size_in_bytes() + 32]
|
||
ciphertext = encrypted_text[rsa_key.size_in_bytes() + 32:]
|
||
|
||
aes_key = cipher_rsa.decrypt(enc_aes_key)
|
||
|
||
cipher_aes = AES.new(aes_key, AES.MODE_EAX, nonce=nonce)
|
||
decrypted_text = cipher_aes.decrypt_and_verify(ciphertext, tag)
|
||
else:
|
||
decrypted_text = cipher_rsa.decrypt(encrypted_text)
|
||
|
||
return decrypted_text.decode()
|
||
except Exception as e:
|
||
logger.error(f"解密失败: {e}")
|
||
raise
|
||
|
||
@staticmethod
|
||
def encrypt_api_key(public_key_pem, api_key):
|
||
"""加密API密钥并返回base64编码的结果"""
|
||
if api_key is None or api_key == "":
|
||
logger.error("API密钥不能为空")
|
||
raise ValueError("API密钥不能为空")
|
||
|
||
if public_key_pem is None:
|
||
logger.error("公钥不能为空")
|
||
raise ValueError("公钥不能为空")
|
||
|
||
try:
|
||
encrypted_api_key = Encryption.encrypt(api_key, public_key_pem)
|
||
return base64.b64encode(encrypted_api_key).decode()
|
||
except Exception as e:
|
||
logger.error(f"加密API密钥失败: {e}")
|
||
raise
|