import base64
import logging
import os

from Crypto.Cipher import AES
from Crypto.PublicKey import RSA
from Crypto.Random import get_random_bytes

from config import CONFIG_PATHS
from libs import gmpy2_pkcs10aep_cipher

# Module-level logger.
logger = logging.getLogger(__name__)


class Encryption:
    """Hybrid (RSA + AES) encryption helpers.

    All methods are static. ``encrypt`` produces the following byte layout::

        PREFIX_HYBRID | RSA-encrypted AES key | nonce | tag | AES ciphertext

    and ``decrypt`` parses it back. Data that does not start with
    ``PREFIX_HYBRID`` is treated by ``decrypt`` as a payload encrypted
    directly with the RSA cipher.
    """

    # Marker prepended to every payload produced by ``encrypt``.
    PREFIX_HYBRID = b"HYBRID:"

    # Size in bytes of the random AES session key generated by ``encrypt``.
    AES_KEY_SIZE = 16
    # Sizes in bytes of the nonce and tag fields that ``decrypt`` slices out of
    # a hybrid payload. ``encrypt`` relies on the AES cipher object producing
    # a nonce and a tag of exactly these sizes.
    NONCE_SIZE = 16
    TAG_SIZE = 16

    @staticmethod
    def load_public_key(public_key_path_or_content):
        """Return the public key as bytes.

        Args:
            public_key_path_or_content: Either a ``str`` naming an existing
                file (its raw content is returned), a ``str`` holding the key
                itself (returned encoded), or any other object, which is
                returned unchanged.

        Returns:
            The public key material.

        Raises:
            ValueError: If the argument is ``None`` or the resulting key is
                empty.
            Exception: Any error raised while reading the file or encoding
                the string is logged and re-raised unchanged.
        """
        if public_key_path_or_content is None:
            logger.error("公钥路径或内容不能为空")
            raise ValueError("公钥路径或内容不能为空")

        source = public_key_path_or_content
        try:
            if isinstance(source, str) and os.path.exists(source):
                with open(source, "rb") as f:
                    public_key = f.read()
            elif isinstance(source, str):
                # Not an existing path: assume the string is the key itself.
                public_key = source.encode()
            else:
                public_key = source
        except Exception as e:
            logger.error("加载公钥失败: %s", e)
            raise

        # Validate outside the ``try`` so an empty key is reported once rather
        # than being caught and logged a second time by the handler above.
        if not public_key:
            logger.error("公钥内容为空")
            raise ValueError("公钥内容为空")
        return public_key

    @staticmethod
    def load_private_key(private_key_path):
        """Return the raw bytes of the private key file.

        Args:
            private_key_path: Path of the private key file to read.

        Returns:
            The file content as bytes.

        Raises:
            Exception: Any error raised while opening or reading the file is
                logged and re-raised unchanged.
        """
        try:
            with open(private_key_path, "rb") as f:
                return f.read()
        except Exception as e:
            logger.error("加载私钥失败: %s", e)
            raise

    @staticmethod
    def encrypt(text, public_key):
        """Encrypt ``text`` with a fresh AES key wrapped by the RSA public key.

        Args:
            text: Non-empty string to encrypt.
            public_key: RSA public key as ``str`` or bytes, in a format
                accepted by ``RSA.import_key``.

        Returns:
            bytes: ``PREFIX_HYBRID`` followed by the RSA-encrypted AES key,
            the AES nonce, the authentication tag and the ciphertext.

        Raises:
            ValueError: If ``text`` is ``None`` or empty, or ``public_key`` is
                ``None``.
            Exception: Any error raised during encryption is logged and
                re-raised unchanged.
        """
        if text is None or text == "":
            logger.error("待加密文本不能为空")
            raise ValueError("待加密文本不能为空")
        if public_key is None:
            logger.error("公钥不能为空")
            raise ValueError("公钥不能为空")

        try:
            if isinstance(public_key, str):
                public_key = public_key.encode()

            # Encrypt the text with a random one-time AES key (EAX mode).
            aes_key = get_random_bytes(Encryption.AES_KEY_SIZE)
            cipher_aes = AES.new(aes_key, AES.MODE_EAX)
            ciphertext, tag = cipher_aes.encrypt_and_digest(text.encode())

            # Wrap the AES key with the RSA public key.
            rsa_key = RSA.import_key(public_key)
            cipher_rsa = gmpy2_pkcs10aep_cipher.new(rsa_key)
            enc_aes_key = cipher_rsa.encrypt(aes_key)

            # Field order must match the slicing done in ``decrypt``.
            encrypted_data = enc_aes_key + cipher_aes.nonce + tag + ciphertext
            return Encryption.PREFIX_HYBRID + encrypted_data
        except Exception as e:
            logger.error("加密失败: %s", e)
            raise

    @staticmethod
    def decrypt(encrypted_text, private_key):
        """Decrypt data produced by ``encrypt`` (or a bare RSA payload).

        Args:
            encrypted_text: Non-empty bytes. When it starts with
                ``PREFIX_HYBRID`` it is parsed as a hybrid payload; otherwise
                the whole value is decrypted directly with the RSA cipher.
            private_key: RSA private key in a format accepted by
                ``RSA.import_key``.

        Returns:
            str: The decrypted bytes decoded with the default codec.

        Raises:
            ValueError: If ``encrypted_text`` is ``None`` or ``b""``, if
                ``private_key`` is ``None``, or if a hybrid payload is too
                short to hold the key, nonce and tag fields.
            Exception: Any other error raised during decryption is logged and
                re-raised unchanged.
        """
        if encrypted_text is None or encrypted_text == b"":
            logger.error("待解密文本不能为空")
            raise ValueError("待解密文本不能为空")
        if private_key is None:
            logger.error("私钥不能为空")
            raise ValueError("私钥不能为空")

        try:
            rsa_key = RSA.import_key(private_key)
            cipher_rsa = gmpy2_pkcs10aep_cipher.new(rsa_key)

            if encrypted_text.startswith(Encryption.PREFIX_HYBRID):
                payload = encrypted_text[len(Encryption.PREFIX_HYBRID):]

                # Field boundaries inside the payload; the wrapped AES key
                # occupies exactly one RSA modulus length.
                key_end = rsa_key.size_in_bytes()
                nonce_end = key_end + Encryption.NONCE_SIZE
                tag_end = nonce_end + Encryption.TAG_SIZE

                if len(payload) < tag_end:
                    logger.error("加密数据格式不正确")
                    raise ValueError("加密数据格式不正确")

                enc_aes_key = payload[:key_end]
                nonce = payload[key_end:nonce_end]
                tag = payload[nonce_end:tag_end]
                ciphertext = payload[tag_end:]

                aes_key = cipher_rsa.decrypt(enc_aes_key)
                cipher_aes = AES.new(aes_key, AES.MODE_EAX, nonce=nonce)
                decrypted_text = cipher_aes.decrypt_and_verify(ciphertext, tag)
            else:
                # No hybrid marker: the data was encrypted with RSA only.
                decrypted_text = cipher_rsa.decrypt(encrypted_text)

            return decrypted_text.decode()
        except Exception as e:
            logger.error("解密失败: %s", e)
            raise

    @staticmethod
    def encrypt_api_key(public_key_pem, api_key):
        """Encrypt an API key and return the result base64-encoded.

        Args:
            public_key_pem: RSA public key passed through to ``encrypt``.
            api_key: Non-empty API key string.

        Returns:
            str: Base64 text of the bytes returned by ``encrypt``.

        Raises:
            ValueError: If ``api_key`` is ``None`` or empty, or
                ``public_key_pem`` is ``None``.
            Exception: Any error raised by ``encrypt`` or the base64 encoding
                is logged and re-raised unchanged.
        """
        if api_key is None or api_key == "":
            logger.error("API密钥不能为空")
            raise ValueError("API密钥不能为空")
        if public_key_pem is None:
            logger.error("公钥不能为空")
            raise ValueError("公钥不能为空")

        try:
            encrypted_api_key = Encryption.encrypt(api_key, public_key_pem)
            return base64.b64encode(encrypted_api_key).decode()
        except Exception as e:
            logger.error("加密API密钥失败: %s", e)
            raise