"""Import service for handling XLSX file import operations.""" from io import BytesIO from datetime import datetime from typing import Tuple, List, Dict, Any from openpyxl import Workbook, load_workbook from openpyxl.utils.exceptions import InvalidFileException from app import db from app.models.work_record import WorkRecord from app.models.person import Person from app.models.item import Item class ImportService: """Service class for importing work records from XLSX files.""" REQUIRED_COLUMNS = ['人员姓名', '物品名称', '工作日期', '数量'] MAX_FILE_SIZE = 5 * 1024 * 1024 # 5MB @staticmethod def generate_template() -> BytesIO: """Generate an XLSX import template file. Creates a template with required column headers and example data rows demonstrating the correct format. Returns: BytesIO: In-memory XLSX file content Requirements: 3.1, 3.2, 3.3, 3.4, 3.5 """ wb = Workbook() ws = wb.active ws.title = "导入模板" # Add header row with Chinese column names headers = ImportService.REQUIRED_COLUMNS for col_idx, header in enumerate(headers, start=1): ws.cell(row=1, column=col_idx, value=header) # Adjust column widths for better readability ws.column_dimensions['A'].width = 15 ws.column_dimensions['B'].width = 15 ws.column_dimensions['C'].width = 15 ws.column_dimensions['D'].width = 10 # Save to BytesIO output = BytesIO() wb.save(output) output.seek(0) return output @staticmethod def parse_and_validate(file_content: bytes) -> Tuple[List[Dict[str, Any]], List[str]]: """Parse and validate XLSX file content. Parses the uploaded XLSX file, validates the format and required columns, then validates each row's data against existing persons and items. Args: file_content: Raw bytes of the uploaded XLSX file Returns: Tuple of (valid_records, errors): - valid_records: List of dicts with person_id, item_id, work_date, quantity - errors: List of error messages for invalid rows Requirements: 4.1, 4.2, 4.3, 4.4 """ errors = [] valid_records = [] # Try to load the workbook try: file_stream = BytesIO(file_content) wb = load_workbook(file_stream, read_only=True, data_only=True) except InvalidFileException: return [], ["文件格式错误,请上传XLSX文件"] except Exception: return [], ["文件格式错误,请上传XLSX文件"] ws = wb.active # Get header row and validate required columns header_row = [cell.value for cell in ws[1]] # Check for missing required columns missing_columns = [] column_indices = {} for col_name in ImportService.REQUIRED_COLUMNS: if col_name not in header_row: missing_columns.append(col_name) else: column_indices[col_name] = header_row.index(col_name) if missing_columns: return [], [f"缺少必需列: {', '.join(missing_columns)}"] # Cache persons and items for lookup persons = {p.name: p.id for p in Person.query.all()} items = {i.name: i.id for i in Item.query.all()} # Validate each data row (starting from row 2) for row_idx, row in enumerate(ws.iter_rows(min_row=2, values_only=True), start=2): # Skip empty rows if all(cell is None or (isinstance(cell, str) and cell.strip() == '') for cell in row): continue row_errors = [] record = {} # Get values from row person_name = row[column_indices['人员姓名']] item_name = row[column_indices['物品名称']] work_date = row[column_indices['工作日期']] quantity = row[column_indices['数量']] # Validate person_name if person_name is None or (isinstance(person_name, str) and person_name.strip() == ''): row_errors.append(f"第{row_idx}行: '人员姓名' 不能为空") else: person_name_str = str(person_name).strip() if person_name_str not in persons: row_errors.append(f"第{row_idx}行: 人员 '{person_name_str}' 不存在") else: record['person_id'] = persons[person_name_str] # Validate item_name if item_name is None or (isinstance(item_name, str) and item_name.strip() == ''): row_errors.append(f"第{row_idx}行: '物品名称' 不能为空") else: item_name_str = str(item_name).strip() if item_name_str not in items: row_errors.append(f"第{row_idx}行: 物品 '{item_name_str}' 不存在") else: record['item_id'] = items[item_name_str] # Validate work_date if work_date is None or (isinstance(work_date, str) and work_date.strip() == ''): row_errors.append(f"第{row_idx}行: '工作日期' 不能为空") else: parsed_date = None if isinstance(work_date, datetime): parsed_date = work_date.date() elif hasattr(work_date, 'date'): # Handle date objects parsed_date = work_date else: # Try to parse string date try: parsed_date = datetime.strptime(str(work_date).strip(), '%Y-%m-%d').date() except ValueError: row_errors.append(f"第{row_idx}行: 日期格式错误,应为 YYYY-MM-DD") if parsed_date: record['work_date'] = parsed_date # Validate quantity if quantity is None or (isinstance(quantity, str) and quantity.strip() == ''): row_errors.append(f"第{row_idx}行: '数量' 不能为空") else: try: qty_value = int(float(quantity)) if qty_value <= 0: row_errors.append(f"第{row_idx}行: 数量必须为正数") else: record['quantity'] = qty_value except (ValueError, TypeError): row_errors.append(f"第{row_idx}行: 数量必须为正数") if row_errors: errors.extend(row_errors) elif len(record) == 4: # All fields validated successfully valid_records.append(record) wb.close() return valid_records, errors @staticmethod def import_records(records: List[Dict[str, Any]]) -> int: """Import validated records as WorkRecord entries. Creates WorkRecord entries for each validated record. Uses atomic operation - if any record fails to create, all changes are rolled back. Args: records: List of validated record dicts with person_id, item_id, work_date, and quantity Returns: int: Count of successfully imported records Raises: Exception: If any record fails to create (triggers rollback) Requirements: 4.6, 4.7, 4.8 """ if not records: return 0 try: created_records = [] for record in records: work_record = WorkRecord( person_id=record['person_id'], item_id=record['item_id'], work_date=record['work_date'], quantity=record['quantity'] ) db.session.add(work_record) created_records.append(work_record) # Commit all records atomically db.session.commit() return len(created_records) except Exception as e: # Rollback on any failure db.session.rollback() raise e