| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226 |
- """Import service for handling XLSX file import operations."""
from datetime import date, datetime
from io import BytesIO
from typing import Tuple, List, Dict, Any

from openpyxl import Workbook, load_workbook
from openpyxl.utils.exceptions import InvalidFileException

from app import db
from app.models.item import Item
from app.models.person import Person
from app.models.work_record import WorkRecord
class ImportService:
    """Service class for importing work records from XLSX files.

    All methods are static; the class only groups the template generator,
    the parser/validator and the database writer under one name.
    """

    # Header labels that must appear in the first row of an uploaded sheet.
    REQUIRED_COLUMNS = ['人员姓名', '物品名称', '工作日期', '数量']
    # NOTE(review): not enforced anywhere in this class; presumably the
    # upload handler checks it before calling parse_and_validate - confirm.
    MAX_FILE_SIZE = 5 * 1024 * 1024  # 5MB

    # Single message returned for every "cannot open this workbook" failure.
    _INVALID_FILE_MESSAGE = "文件格式错误,请上传XLSX文件"

    @staticmethod
    def generate_template() -> BytesIO:
        """Generate an XLSX import template file.

        Builds a one-sheet workbook titled "导入模板" whose first row holds
        the labels from ``REQUIRED_COLUMNS`` and sets the widths of columns
        A-D. Only the header row is written.

        NOTE(review): an earlier version of this docstring promised example
        data rows, but the code has never written any; confirm against
        requirements 3.x before adding them.

        Returns:
            BytesIO: In-memory XLSX file content, rewound to offset 0.

        Requirements: 3.1, 3.2, 3.3, 3.4, 3.5
        """
        wb = Workbook()
        ws = wb.active
        ws.title = "导入模板"

        # Header row with the Chinese column names.
        for col_idx, header in enumerate(ImportService.REQUIRED_COLUMNS, start=1):
            ws.cell(row=1, column=col_idx, value=header)

        # Column widths for readability.
        for letter, width in (('A', 15), ('B', 15), ('C', 15), ('D', 10)):
            ws.column_dimensions[letter].width = width

        output = BytesIO()
        wb.save(output)
        output.seek(0)  # callers read the stream from the beginning
        return output

    @staticmethod
    def parse_and_validate(file_content: bytes) -> Tuple[List[Dict[str, Any]], List[str]]:
        """Parse and validate XLSX file content.

        Opens the workbook in read-only mode, checks that every label in
        ``REQUIRED_COLUMNS`` is present in the first row of the active
        sheet, then validates each non-blank data row (row 2 onwards)
        against the names of all ``Person`` and ``Item`` rows.

        Args:
            file_content: Raw bytes of the uploaded XLSX file.

        Returns:
            Tuple of (valid_records, errors):
                - valid_records: List of dicts with person_id, item_id,
                  work_date, quantity. Only rows without any error appear.
                - errors: List of error messages. A file-level problem
                  (unreadable file, missing columns) yields an empty
                  valid_records list and a single message.

        Requirements: 4.1, 4.2, 4.3, 4.4
        """
        try:
            wb = load_workbook(BytesIO(file_content), read_only=True, data_only=True)
        except Exception:
            # Deliberately broad: any failure to open the bytes as a workbook
            # (InvalidFileException included) is reported to the user as a
            # format error rather than propagated.
            return [], [ImportService._INVALID_FILE_MESSAGE]

        # Read-only workbooks load lazily and must be closed explicitly, so
        # every exit path below goes through the finally clause.
        try:
            ws = wb.active
            if ws is None:
                return [], [ImportService._INVALID_FILE_MESSAGE]

            header_row = ImportService._read_header(ws)
            missing_columns = [
                col_name
                for col_name in ImportService.REQUIRED_COLUMNS
                if col_name not in header_row
            ]
            if missing_columns:
                return [], [f"缺少必需列: {', '.join(missing_columns)}"]

            column_indices = {
                col_name: header_row.index(col_name)
                for col_name in ImportService.REQUIRED_COLUMNS
            }

            # Cache persons and items once so row validation is a dict lookup.
            persons = {p.name: p.id for p in Person.query.all()}
            items = {i.name: i.id for i in Item.query.all()}

            valid_records = []
            errors = []
            # start=2 keeps reported row numbers aligned with the spreadsheet.
            for row_idx, row in enumerate(
                ws.iter_rows(min_row=2, values_only=True), start=2
            ):
                if all(ImportService._is_blank(cell) for cell in row):
                    continue  # blank rows are skipped silently

                record, row_errors = ImportService._validate_row(
                    row_idx, row, column_indices, persons, items
                )
                if row_errors:
                    errors.extend(row_errors)
                else:
                    valid_records.append(record)

            return valid_records, errors
        finally:
            wb.close()

    @staticmethod
    def import_records(records: List[Dict[str, Any]]) -> int:
        """Import validated records as WorkRecord entries.

        Adds one ``WorkRecord`` per dict to the session and commits once.
        If building a record, adding it, or the commit raises, the session
        is rolled back and the original exception is re-raised.

        Args:
            records: List of validated record dicts with person_id, item_id,
                work_date, and quantity.

        Returns:
            int: Number of records imported (0 for an empty input, in which
            case the session is not touched).

        Raises:
            Exception: Whatever the model constructor or the session raised;
                the rollback has already been issued when it propagates.

        Requirements: 4.6, 4.7, 4.8
        """
        if not records:
            return 0

        try:
            for record in records:
                db.session.add(
                    WorkRecord(
                        person_id=record['person_id'],
                        item_id=record['item_id'],
                        work_date=record['work_date'],
                        quantity=record['quantity'],
                    )
                )
            # Single commit so the whole batch succeeds or fails together.
            db.session.commit()
        except Exception:
            db.session.rollback()
            raise  # bare raise keeps the original traceback intact

        return len(records)

    @staticmethod
    def _is_blank(value: Any) -> bool:
        """Return True for None or a whitespace-only string."""
        return value is None or (isinstance(value, str) and not value.strip())

    @staticmethod
    def _cell_at(row: Tuple[Any, ...], index: int) -> Any:
        """Return row[index], or None when the row is shorter than the header."""
        return row[index] if index < len(row) else None

    @staticmethod
    def _read_header(ws: Any) -> List[Any]:
        """Return the first row's values, with string labels stripped.

        An empty sheet yields an empty list instead of raising.
        """
        first_row = next(ws.iter_rows(min_row=1, max_row=1, values_only=True), ())
        return [
            value.strip() if isinstance(value, str) else value
            for value in first_row
        ]

    @staticmethod
    def _parse_work_date(value: Any) -> date:
        """Convert a cell value to a date; raise ValueError if it is not one.

        datetime values are truncated to their date part, date values are
        returned as-is, anything else must be text in YYYY-MM-DD form.
        """
        # datetime is a subclass of date, so it has to be tested first.
        if isinstance(value, datetime):
            return value.date()
        if isinstance(value, date):
            return value
        return datetime.strptime(str(value).strip(), '%Y-%m-%d').date()

    @staticmethod
    def _parse_quantity(value: Any) -> int:
        """Convert a cell value to an int, truncating any fractional part.

        Raises ValueError, TypeError or OverflowError for values that are
        not finite numbers; booleans are rejected with TypeError.
        """
        if isinstance(value, bool):
            # bool is an int subclass; a TRUE cell must not import as 1.
            raise TypeError("boolean is not a quantity")
        return int(float(value))

    @staticmethod
    def _validate_row(
        row_idx: int,
        row: Tuple[Any, ...],
        column_indices: Dict[str, int],
        persons: Dict[Any, Any],
        items: Dict[Any, Any],
    ) -> Tuple[Dict[str, Any], List[str]]:
        """Validate one data row.

        Returns (record, row_errors). The record is complete only when
        row_errors is empty. Errors are reported in column order: person,
        item, date, quantity.
        """
        person_name = ImportService._cell_at(row, column_indices['人员姓名'])
        item_name = ImportService._cell_at(row, column_indices['物品名称'])
        work_date = ImportService._cell_at(row, column_indices['工作日期'])
        quantity = ImportService._cell_at(row, column_indices['数量'])

        record = {}
        row_errors = []

        # Person must be present and known.
        if ImportService._is_blank(person_name):
            row_errors.append(f"第{row_idx}行: '人员姓名' 不能为空")
        else:
            person_name_str = str(person_name).strip()
            if person_name_str in persons:
                record['person_id'] = persons[person_name_str]
            else:
                row_errors.append(f"第{row_idx}行: 人员 '{person_name_str}' 不存在")

        # Item must be present and known.
        if ImportService._is_blank(item_name):
            row_errors.append(f"第{row_idx}行: '物品名称' 不能为空")
        else:
            item_name_str = str(item_name).strip()
            if item_name_str in items:
                record['item_id'] = items[item_name_str]
            else:
                row_errors.append(f"第{row_idx}行: 物品 '{item_name_str}' 不存在")

        # Work date must be a date cell or YYYY-MM-DD text.
        if ImportService._is_blank(work_date):
            row_errors.append(f"第{row_idx}行: '工作日期' 不能为空")
        else:
            try:
                record['work_date'] = ImportService._parse_work_date(work_date)
            except ValueError:
                row_errors.append(f"第{row_idx}行: 日期格式错误,应为 YYYY-MM-DD")

        # Quantity must convert to a strictly positive integer.
        if ImportService._is_blank(quantity):
            row_errors.append(f"第{row_idx}行: '数量' 不能为空")
        else:
            try:
                qty_value = ImportService._parse_quantity(quantity)
            except (ValueError, TypeError, OverflowError):
                # OverflowError covers "inf" / "1e999", which float() accepts
                # but int() cannot represent.
                qty_value = None
            if qty_value is None or qty_value <= 0:
                row_errors.append(f"第{row_idx}行: 数量必须为正数")
            else:
                record['quantity'] = qty_value

        return record, row_errors
|