dify_admin/api/encryption.py

146 lines
5.4 KiB
Python
Executable File
Raw Permalink Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

from Crypto.Cipher import AES
from Crypto.PublicKey import RSA
from Crypto.Random import get_random_bytes
import base64
import os
from config import CONFIG_PATHS
from libs import gmpy2_pkcs10aep_cipher
import logging
# Module-level logger used by the Encryption helpers below.
logger = logging.getLogger(__name__)
class Encryption:
    """Hybrid RSA + AES encryption helpers.

    ``encrypt`` produces a payload with the following layout::

        PREFIX_HYBRID | RSA-encrypted AES key | nonce | tag | AES ciphertext

    ``decrypt`` reads the RSA-encrypted key as ``rsa_key.size_in_bytes()``
    bytes, followed by a 16-byte nonce and a 16-byte tag. Input without the
    prefix is treated as a plain RSA ciphertext.
    """

    # Marker prepended to every payload produced by ``encrypt``.
    PREFIX_HYBRID = b"HYBRID:"

    @staticmethod
    def load_public_key(public_key_path_or_content):
        """Load a public key from a file path or from inline content.

        Args:
            public_key_path_or_content: Either a ``str`` naming an existing
                path (the file is read in binary mode), or the key material
                itself as ``str`` (encoded to bytes) or any other object
                (returned unchanged).

        Returns:
            The key material; ``bytes`` when a path or ``str`` was given.

        Raises:
            ValueError: If the argument is ``None`` or the resulting key
                material is empty.
            Exception: Any error raised while reading the file is logged and
                re-raised unchanged.
        """
        if public_key_path_or_content is None:
            logger.error("公钥路径或内容不能为空")
            raise ValueError("公钥路径或内容不能为空")
        try:
            if isinstance(public_key_path_or_content, str) and os.path.exists(public_key_path_or_content):
                with open(public_key_path_or_content, "rb") as f:
                    public_key = f.read()
            elif isinstance(public_key_path_or_content, str):
                # Not an existing path: treat the string as the key content.
                public_key = public_key_path_or_content.encode()
            else:
                public_key = public_key_path_or_content
            if not public_key:
                # Not logged here: the handler below logs it once with context,
                # which avoids two ERROR records for a single failure.
                raise ValueError("公钥内容为空")
            return public_key
        except Exception as e:
            logger.error(f"加载公钥失败: {e}")
            raise

    @staticmethod
    def load_private_key(private_key_path):
        """Read a private key file and return its raw bytes.

        Args:
            private_key_path: Path of the key file, opened in binary mode.

        Returns:
            The file content as ``bytes``.

        Raises:
            Exception: Any error raised while opening or reading the file is
                logged and re-raised unchanged.
        """
        try:
            with open(private_key_path, "rb") as f:
                return f.read()
        except Exception as e:
            logger.error(f"加载私钥失败: {e}")
            raise

    @staticmethod
    def encrypt(text, public_key):
        """Encrypt ``text`` with a fresh AES key wrapped by the RSA public key.

        Args:
            text: Non-empty ``str`` to encrypt; it is encoded with
                ``str.encode()`` before encryption.
            public_key: Public key material as ``str`` or ``bytes``, passed
                to ``RSA.import_key``.

        Returns:
            ``bytes``: ``PREFIX_HYBRID`` followed by the RSA-encrypted AES
            key, the AES nonce, the authentication tag and the ciphertext.

        Raises:
            ValueError: If ``text`` is ``None``/empty or ``public_key`` is
                ``None``.
            Exception: Any error raised during encryption is logged and
                re-raised unchanged.
        """
        if text is None or text == "":
            logger.error("待加密文本不能为空")
            raise ValueError("待加密文本不能为空")
        if public_key is None:
            logger.error("公钥不能为空")
            raise ValueError("公钥不能为空")
        try:
            if isinstance(public_key, str):
                public_key = public_key.encode()

            # Encrypt the text with a random 16-byte AES key in EAX mode.
            aes_key = get_random_bytes(16)
            cipher_aes = AES.new(aes_key, AES.MODE_EAX)
            ciphertext, tag = cipher_aes.encrypt_and_digest(text.encode())

            # Encrypt the AES key itself with the RSA public key.
            rsa_key = RSA.import_key(public_key)
            cipher_rsa = gmpy2_pkcs10aep_cipher.new(rsa_key)
            enc_aes_key = cipher_rsa.encrypt(aes_key)

            # Field order here must match the slicing done in ``decrypt``.
            encrypted_data = enc_aes_key + cipher_aes.nonce + tag + ciphertext
            return Encryption.PREFIX_HYBRID + encrypted_data
        except Exception as e:
            logger.error(f"加密失败: {e}")
            raise

    @staticmethod
    def decrypt(encrypted_text, private_key):
        """Decrypt a payload produced by ``encrypt`` (or a plain RSA ciphertext).

        Args:
            encrypted_text: Raw ``bytes`` to decrypt. When it starts with
                ``PREFIX_HYBRID`` the hybrid layout is parsed; otherwise the
                whole input is decrypted with the RSA cipher.
            private_key: Private key material passed to ``RSA.import_key``.

        Returns:
            The decrypted data decoded to ``str`` with ``bytes.decode()``.

        Raises:
            ValueError: If ``encrypted_text`` is ``None``/``b""``,
                ``private_key`` is ``None``, or a hybrid payload is shorter
                than the RSA block plus the 32 bytes of nonce and tag.
            Exception: Any other error raised during decryption is logged and
                re-raised unchanged.
        """
        if encrypted_text is None or encrypted_text == b"":
            logger.error("待解密文本不能为空")
            raise ValueError("待解密文本不能为空")
        if private_key is None:
            logger.error("私钥不能为空")
            raise ValueError("私钥不能为空")
        try:
            rsa_key = RSA.import_key(private_key)
            cipher_rsa = gmpy2_pkcs10aep_cipher.new(rsa_key)

            if encrypted_text.startswith(Encryption.PREFIX_HYBRID):
                payload = encrypted_text[len(Encryption.PREFIX_HYBRID):]

                # Field boundaries: RSA block, then 16-byte nonce, then 16-byte tag.
                key_end = rsa_key.size_in_bytes()
                nonce_end = key_end + 16
                tag_end = nonce_end + 16
                if len(payload) < tag_end:
                    # Not logged here: the handler below logs it once with
                    # context, which avoids two ERROR records for one failure.
                    raise ValueError("加密数据格式不正确")

                enc_aes_key = payload[:key_end]
                nonce = payload[key_end:nonce_end]
                tag = payload[nonce_end:tag_end]
                ciphertext = payload[tag_end:]

                aes_key = cipher_rsa.decrypt(enc_aes_key)
                cipher_aes = AES.new(aes_key, AES.MODE_EAX, nonce=nonce)
                decrypted_text = cipher_aes.decrypt_and_verify(ciphertext, tag)
            else:
                # No hybrid marker: the whole input goes through the RSA cipher.
                decrypted_text = cipher_rsa.decrypt(encrypted_text)
            return decrypted_text.decode()
        except Exception as e:
            logger.error(f"解密失败: {e}")
            raise

    @staticmethod
    def encrypt_api_key(public_key_pem, api_key):
        """Encrypt an API key and return the result as base64 text.

        Args:
            public_key_pem: Public key material forwarded to ``encrypt``.
            api_key: Non-empty ``str`` API key to encrypt.

        Returns:
            ``str``: base64 encoding of the bytes returned by ``encrypt``.

        Raises:
            ValueError: If ``api_key`` is ``None``/empty or ``public_key_pem``
                is ``None``.
            Exception: Any error raised by ``encrypt`` is logged and re-raised
                unchanged.
        """
        if api_key is None or api_key == "":
            logger.error("API密钥不能为空")
            raise ValueError("API密钥不能为空")
        if public_key_pem is None:
            logger.error("公钥不能为空")
            raise ValueError("公钥不能为空")
        try:
            encrypted_api_key = Encryption.encrypt(api_key, public_key_pem)
            return base64.b64encode(encrypted_api_key).decode()
        except Exception as e:
            logger.error(f"加密API密钥失败: {e}")
            raise