import os import yaml import openpyxl import requests import logging from datetime import datetime from openpyxl import Workbook from openpyxl.styles import Font # API配置 API_KEY = "app-m7XGgbTe3BVHmA1TAYg9Ec4v" # Dify API Key WORKFLOW_ID = os.getenv("DIFY_WORKFLOW_ID", "your-workflow-id-here") # 工作流ID API_BASE_URL = "http://192.168.100.143/v1" # API基础地址 FILE_UPLOAD_URL = f"{API_BASE_URL}/files/upload" # 文件上传地址 EXCEL_PATH = "ai作业/AI考试作业.xlsx" ASSIGNMENT_DIR = "ai作业/作业" OUTPUT_DIR = "results" OUTPUT_FILE = os.path.join(OUTPUT_DIR, "批改结果.xlsx") # 日志配置 LOG_DIR = os.path.join(OUTPUT_DIR, "logs") os.makedirs(LOG_DIR, exist_ok=True) LOG_FILE = os.path.join(LOG_DIR, f"grading_log_{datetime.now().strftime('%Y%m%d_%H%M%S')}.log") logging.basicConfig( level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s', handlers=[ logging.FileHandler(LOG_FILE), logging.StreamHandler() ] ) logger = logging.getLogger(__name__) # 确保输出目录存在 os.makedirs(OUTPUT_DIR, exist_ok=True) def read_excel_submissions(): """读取Excel中的作业提交记录""" wb = openpyxl.load_workbook(EXCEL_PATH) ws = wb.active submissions = [] # 获取标题行确定列索引 headers = [cell.value for cell in ws[1]] name_col = headers.index('填写人') workflow_col = headers.index('工作流程描述') solution_col = headers.index('Dify工作流解决方案设计') for row in ws.iter_rows(min_row=2, values_only=True): if len(row) > max(name_col, workflow_col, solution_col): # 确保有足够列 submissions.append({ 'name': row[name_col], 'work_description': row[workflow_col], 'solution': row[solution_col] }) return submissions def check_yml_files(submissions): """检查Excel中的提交记录是否有对应的YML文件""" missing_files = [] for sub in submissions: yml_path = find_assignment_yml(sub['name']) if not yml_path: missing_files.append(sub['name']) if missing_files: print("\n以下提交记录缺少对应的YML文件:") for name in missing_files: print(f"- {name}") def find_assignment_yml(name): """根据姓名查找对应的YML作业文件""" for filename in os.listdir(ASSIGNMENT_DIR): if filename.endswith('.yml'): # 仅支持.yml格式 # 从文件名中提取姓名部分(第二个下划线分隔的部分) parts = filename.split('_') if len(parts) >= 2: # 处理可能存在的@符号 student_name = parts[1].split('@')[0] if name == student_name: return os.path.join(ASSIGNMENT_DIR, filename) return None def parse_yml_file(yml_path): """解析YML文件内容""" with open(yml_path, 'r', encoding='utf-8') as f: return yaml.safe_load(f) def call_dify_api(yml_path, assignment_data): """调用Dify API进行作业批改,上传YML文件""" # 先上传文件 upload_headers = { "Authorization": f"Bearer {API_KEY}" } upload_data = { "user": "ai-grading-system", "type": "yml" # 自定义文件类型 } try: # 上传文件 with open(yml_path, 'rb') as f: files = {'file': (os.path.basename(yml_path), f, 'text/plain')} upload_response = requests.post( FILE_UPLOAD_URL, headers=upload_headers, files=files, data=upload_data ) upload_response.raise_for_status() file_id = upload_response.json().get('id') if not file_id: print("文件上传失败: 未获取到文件ID") return None # 执行工作流 run_url = f"{API_BASE_URL}/workflows/run" run_headers = { "Authorization": f"Bearer {API_KEY}", "Content-Type": "application/json" } run_data = { "inputs": { "yml_file": { "transfer_method": "local_file", "upload_file_id": file_id, "type": "custom" }, "work_description": assignment_data.get('work_description', ''), "solution": assignment_data.get('solution', '') }, "response_mode": "blocking", "user": "ai-grading-system" } response = requests.post(run_url, headers=run_headers, json=run_data) response.raise_for_status() return response.json() except requests.exceptions.RequestException as e: print(f"API调用失败: {e}") return None def save_result(result, wb=None, is_regrade=False): """保存单个批改结果到Excel Args: result: 批改结果字典 wb: 工作簿对象(可选) is_regrade: 是否为复批(默认False) """ try: # 根据是否为复批确定输出文件路径 output_file = os.path.join(OUTPUT_DIR, "复批结果.xlsx") if is_regrade else OUTPUT_FILE # 如果工作簿不存在则创建 if wb is None: wb = Workbook() ws = wb.active ws.title = "复批结果" if is_regrade else "批改结果" # 添加表头 headers = ["姓名", "评分", "评分详情"] ws.append(headers) # 设置表头样式 for cell in ws[1]: cell.font = Font(bold=True) else: ws = wb.active # 格式化详情 details = result.get('details', {}) if isinstance(details, dict): formatted_details = [] # 添加总分 formatted_details.append(f"总分: {details.get('total_score', 0)}分") # 添加各部分评分 for section, data in details.get('sections', {}).items(): formatted_details.append(f"\n{section} ({data.get('score', 0)}分)") # 添加优点 if details.get('advantages'): formatted_details.append("\n优点:") formatted_details.extend([f"- {adv}" for adv in details['advantages']]) # 添加不足 if details.get('disadvantages'): formatted_details.append("\n不足之处:") formatted_details.extend([f"- {dis}" for dis in details['disadvantages']]) # 添加建议 if details.get('suggestions'): formatted_details.append("\n改进建议:") formatted_details.extend([f"- {sug}" for sug in details['suggestions']]) details_text = "\n".join(formatted_details) else: details_text = str(details) # 添加数据行 try: # 确保details是字符串格式 if isinstance(details_text, dict): details_text = str(details_text) ws.append([ result['name'], result.get('score', 'N/A'), details_text ]) except Exception as e: print(f"添加数据行失败: {e}") # 尝试简化格式保存 ws.append([ result['name'], result.get('score', 'N/A'), str(result.get('details', ''))[:32767] # Excel单元格最大长度限制 ]) # 自动调整列宽 for column in ws.columns: max_length = 0 column = [cell for cell in column] for cell in column: try: if len(str(cell.value)) > max_length: max_length = len(cell.value) except: pass adjusted_width = (max_length + 2) * 1.2 ws.column_dimensions[column[0].column_letter].width = adjusted_width # 保存到文件 temp_file = output_file + ".tmp" wb.save(temp_file) if os.path.exists(output_file): os.remove(output_file) os.rename(temp_file, output_file) return wb except Exception as e: print(f"保存结果失败: {e}") raise def log_api_result(name, response, score=None): """记录API请求结果到日志文件""" try: status = "成功" if response and isinstance(response, dict) else "失败" data = response.get('data', {}) if response else {} outputs = data.get('outputs', {}) if data else {} logger.info( f"学生: {name} - API请求{status}\n" f"评分: {score}\n" f"API响应摘要: {str(outputs.get('text', '无响应文本'))[:200]}...\n" f"完整响应: {str(response)[:500]}..." ) except Exception as e: logger.error(f"记录API结果失败: {e}") def parse_api_response(response): """解析API返回的评分结果""" import re # 添加正则表达式支持 # 直接从response.data中获取评分数据 if not response or not isinstance(response, dict): return None, "无效的API响应" data = response.get('data', {}) outputs = data.get('outputs', {}) text = outputs.get('text', '') if not text: return None, "无评分结果" # 初始化结果字典 result = { 'total_score': 0, 'sections': {}, 'advantages': [], 'disadvantages': [], 'suggestions': [] } try: # 提取总分 - 改进匹配逻辑 total_score_match = re.search(r'### 总分[::]\s*(\d+)分', text) if total_score_match: try: result['total_score'] = int(total_score_match.group(1)) except (ValueError, AttributeError): result['total_score'] = 0 # 提取各部分评分 sections = [ ('工作流描述', r'#### 1\. 工作流描述是否能够覆盖自己提出的问题.*?\*\*评分[::]\s*(\d+)分'), ('YML文件', r'#### 2\. YML文件是否符合Dify工作流的设计标准且能实现自己提出的工作流功能.*?\*\*评分[::]\s*(\d+)分'), ('实用性', r'#### 3\. 工作流的实用性、可重复性和可推广性.*?\*\*评分[::]\s*(\d+)分') ] for name, pattern in sections: section_match = re.search(pattern, text, re.DOTALL) if section_match: section_text = section_match.group(0) try: score = int(section_match.group(1)) result['sections'][name] = { 'score': score, 'analysis': '\n'.join(section_text.split('\n')[2:]).strip() } except (ValueError, AttributeError, IndexError): result['sections'][name] = { 'score': 0, 'analysis': '\n'.join(section_text.split('\n')[2:]).strip() } # 提取优点 adv_start = text.find("### 优点") if adv_start != -1: adv_end = text.find("### 不足之处", adv_start) advantages = text[adv_start+5:adv_end].strip().split('\n')[1:] result['advantages'] = [a.strip()[3:] for a in advantages if a.strip()] # 提取不足 dis_start = text.find("### 不足之处") if dis_start != -1: dis_end = text.find("### 改进建议", dis_start) disadvantages = text[dis_start+7:dis_end].strip().split('\n')[1:] result['disadvantages'] = [d.strip()[3:] for d in disadvantages if d.strip()] # 提取改进建议 sug_start = text.find("### 改进建议") if sug_start != -1: sug_end = text.find("\n\n", sug_start) suggestions = text[sug_start+7:sug_end].strip().split('\n')[1:] result['suggestions'] = [s.strip()[3:] for s in suggestions if s.strip()] return result['total_score'], result except Exception as e: print(f"解析评分结果出错: {e}") return None, f"解析评分结果出错: {e}" def get_graded_students(): """获取已批改的学生名单""" if not os.path.exists(OUTPUT_FILE): return set() try: wb = openpyxl.load_workbook(OUTPUT_FILE) ws = wb.active graded_students = set() for row in ws.iter_rows(min_row=2, values_only=True): if row and row[0]: # 第一列为姓名 graded_students.add(row[0]) return graded_students except Exception as e: print(f"读取已批改名单失败: {e}") return set() def regrade_zero_scores(): """复批0分作业""" print("\n开始复批0分作业...") # 检查原始结果文件是否存在 if not os.path.exists(OUTPUT_FILE): print("❌ 未找到原始批改结果文件,无法进行复批") return try: # 读取原始结果 original_wb = openpyxl.load_workbook(OUTPUT_FILE) original_ws = original_wb.active # 初始化复批工作簿 regrade_wb = None # 查找0分记录 zero_count = 0 for row in original_ws.iter_rows(min_row=2, values_only=True): name, score, details = row[0], row[1], row[2] if score == 0: zero_count += 1 print(f"\n发现0分记录: {name}") # 查找YML文件 yml_path = find_assignment_yml(name) if not yml_path: print(f"⚠️ 未找到 {name} 的作业文件,保持0分") result = { 'name': name, 'score': 0, 'details': details } regrade_wb = save_result(result, regrade_wb, is_regrade=True) continue # 解析YML文件 try: print(f"解析YML文件: {os.path.basename(yml_path)}") yml_content = parse_yml_file(yml_path) except Exception as e: print(f"❌ 解析YML文件失败: {e}") result = { 'name': name, 'score': 0, 'details': f'YML文件解析失败: {str(e)}' } regrade_wb = save_result(result, regrade_wb, is_regrade=True) continue # 准备API调用数据 assignment_data = { 'name': name, **yml_content } # 调用API进行复批 print("调用Dify API进行复批...") api_response = call_dify_api(yml_path, assignment_data) if not api_response: logger.error(f"{name} - 复批API调用失败") print(f"❌ {name} 复批失败") result = { 'name': name, 'score': 0, 'details': '复批API调用失败' } regrade_wb = save_result(result, regrade_wb, is_regrade=True) continue # 解析API响应 try: score, new_details = parse_api_response(api_response) print(f"✅ {name} 复批完成 - 新得分: {score}") logger.info(f"{name} - 复批完成,新得分: {score}") log_api_result(name, api_response, score) # 使用save_result保存复批结果 result = { 'name': name, 'score': score, 'details': new_details } regrade_wb = save_result(result, regrade_wb, is_regrade=True) except Exception as e: print(f"❌ 解析API响应失败: {e}") result = { 'name': name, 'score': 0, 'details': f'复批结果解析失败: {str(e)}' } regrade_wb = save_result(result, regrade_wb, is_regrade=True) # 保存复批结果 if zero_count > 0: print(f"\n✅ 共复批{zero_count}份0分作业") else: print("\n没有发现0分记录需要复批") except Exception as e: print(f"\n❌ 复批流程出现错误: {e}") raise def regrade_100_scores(): """复批0分作业""" print("\n开始复批100分作业...") # 检查原始结果文件是否存在 if not os.path.exists(OUTPUT_FILE): print("❌ 未找到原始批改结果文件,无法进行复批") return try: # 读取原始结果 original_wb = openpyxl.load_workbook(OUTPUT_FILE) original_ws = original_wb.active # 初始化复批工作簿 regrade_wb = None # 查找100分记录 zero_count = 0 for row in original_ws.iter_rows(min_row=2, values_only=True): name, score, details = row[0], row[1], row[2] if score == 100: zero_count += 1 print(f"\n发现100分记录: {name}") # 查找YML文件 yml_path = find_assignment_yml(name) if not yml_path: print(f"⚠️ 未找到 {name} 的作业文件,保持0分") result = { 'name': name, 'score': 0, 'details': details } regrade_wb = save_result(result, regrade_wb, is_regrade=True) continue # 解析YML文件 try: print(f"解析YML文件: {os.path.basename(yml_path)}") yml_content = parse_yml_file(yml_path) except Exception as e: print(f"❌ 解析YML文件失败: {e}") result = { 'name': name, 'score': 0, 'details': f'YML文件解析失败: {str(e)}' } regrade_wb = save_result(result, regrade_wb, is_regrade=True) continue # 准备API调用数据 assignment_data = { 'name': name, **yml_content } # 调用API进行复批 print("调用Dify API进行复批...") api_response = call_dify_api(yml_path, assignment_data) if not api_response: logger.error(f"{name} - 复批API调用失败") print(f"❌ {name} 复批失败") result = { 'name': name, 'score': 0, 'details': '复批API调用失败' } regrade_wb = save_result(result, regrade_wb, is_regrade=True) continue # 解析API响应 try: score, new_details = parse_api_response(api_response) print(f"✅ {name} 复批完成 - 新得分: {score}") logger.info(f"{name} - 复批完成,新得分: {score}") log_api_result(name, api_response, score) # 使用save_result保存复批结果 result = { 'name': name, 'score': score, 'details': new_details } regrade_wb = save_result(result, regrade_wb, is_regrade=True) except Exception as e: print(f"❌ 解析API响应失败: {e}") result = { 'name': name, 'score': 0, 'details': f'复批结果解析失败: {str(e)}' } regrade_wb = save_result(result, regrade_wb, is_regrade=True) # 保存复批结果 if zero_count > 0: print(f"\n✅ 共复批{zero_count}份100分作业") else: print("\n没有发现100分记录需要复批") except Exception as e: print(f"\n❌ 复批流程出现错误: {e}") raise def main(): """主函数,执行作业批改流程""" print("\n请选择批改模式:") print("1. 全部批改(包括未批改和0分复批)") print("2. 仅复批0分作业") print("3. 仅复批100分作业") while True: choice = input("请输入选择(1/2/3): ").strip() if choice in ('1', '2', '3'): break print("无效输入,请重新选择") if choice == '2': # 仅执行复批 regrade_zero_scores() return if choice == '3': # 仅执行复批 regrade_100_scores() return print("\n开始批改作业...") try: # 读取Excel中的作业提交 print("读取Excel提交记录...") submissions = read_excel_submissions() # 获取已批改学生名单 graded_students = get_graded_students() print(f"共找到{len(submissions)}份提交记录") # 检查YML文件是否存在(不再阻止批改流程) check_yml_files(submissions) total_count = len(submissions) processed_count = 0 wb = None # 初始化工作簿对象 for sub in submissions: processed_count += 1 print(f"\n[{processed_count}/{total_count}] 正在处理: {sub['name']}") # 检查是否已批改过 if sub['name'] in graded_students: print(f"⏩ {sub['name']} 已批改过,跳过") continue # 准备结果字典 result = { 'name': sub['name'], 'score': 0, 'details': '' } # 查找对应的YML文件 yml_path = find_assignment_yml(sub['name']) if not yml_path: print(f"⚠️ 未找到 {sub['name']} 的作业文件,自动评为0分") result['details'] = '未提交YML作业文件' wb = save_result(result, wb) continue # 解析YML文件 try: print(f"解析YML文件: {os.path.basename(yml_path)}") yml_content = parse_yml_file(yml_path) except Exception as e: print(f"❌ 解析YML文件失败: {e}") result['details'] = f'YML文件解析失败: {str(e)}' wb = save_result(result, wb) continue # 准备API调用数据 assignment_data = { **sub, **yml_content } # 调用API进行批改(上传YML文件) print("调用Dify API进行批改...") api_response = call_dify_api(yml_path, assignment_data) if not api_response: logger.error(f"{sub['name']} - API调用失败") print(f"❌ {sub['name']} 批改失败") result['details'] = 'API调用失败' wb = save_result(result, wb) continue # 解析API响应 try: score, details = parse_api_response(api_response) result['score'] = score result['details'] = details logger.info(f"{sub['name']} - 批改完成,得分: {score}") print(f"✅ {sub['name']} 批改完成 - 得分: {score}") log_api_result(sub['name'], api_response, score) wb = save_result(result, wb) except Exception as e: print(f"❌ 解析API响应失败: {e}") result['details'] = f'结果解析失败: {str(e)}' wb = save_result(result, wb) print(f"\n✅ 所有{total_count}份作业批改完成") print(f"批改结果已实时保存到: {OUTPUT_FILE}") except Exception as e: print(f"\n❌ 批改流程出现严重错误: {e}") raise if __name__ == "__main__": main()