import_service.py 8.5 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226
  1. """Import service for handling XLSX file import operations."""
  2. from io import BytesIO
  3. from datetime import datetime
  4. from typing import Tuple, List, Dict, Any
  5. from openpyxl import Workbook, load_workbook
  6. from openpyxl.utils.exceptions import InvalidFileException
  7. from app import db
  8. from app.models.work_record import WorkRecord
  9. from app.models.person import Person
  10. from app.models.item import Item
  11. class ImportService:
  12. """Service class for importing work records from XLSX files."""
  13. REQUIRED_COLUMNS = ['人员姓名', '物品名称', '工作日期', '数量']
  14. MAX_FILE_SIZE = 5 * 1024 * 1024 # 5MB
  15. @staticmethod
  16. def generate_template() -> BytesIO:
  17. """Generate an XLSX import template file.
  18. Creates a template with required column headers and example data rows
  19. demonstrating the correct format.
  20. Returns:
  21. BytesIO: In-memory XLSX file content
  22. Requirements: 3.1, 3.2, 3.3, 3.4, 3.5
  23. """
  24. wb = Workbook()
  25. ws = wb.active
  26. ws.title = "导入模板"
  27. # Add header row with Chinese column names
  28. headers = ImportService.REQUIRED_COLUMNS
  29. for col_idx, header in enumerate(headers, start=1):
  30. ws.cell(row=1, column=col_idx, value=header)
  31. # Adjust column widths for better readability
  32. ws.column_dimensions['A'].width = 15
  33. ws.column_dimensions['B'].width = 15
  34. ws.column_dimensions['C'].width = 15
  35. ws.column_dimensions['D'].width = 10
  36. # Save to BytesIO
  37. output = BytesIO()
  38. wb.save(output)
  39. output.seek(0)
  40. return output
  41. @staticmethod
  42. def parse_and_validate(file_content: bytes) -> Tuple[List[Dict[str, Any]], List[str]]:
  43. """Parse and validate XLSX file content.
  44. Parses the uploaded XLSX file, validates the format and required columns,
  45. then validates each row's data against existing persons and items.
  46. Args:
  47. file_content: Raw bytes of the uploaded XLSX file
  48. Returns:
  49. Tuple of (valid_records, errors):
  50. - valid_records: List of dicts with person_id, item_id, work_date, quantity
  51. - errors: List of error messages for invalid rows
  52. Requirements: 4.1, 4.2, 4.3, 4.4
  53. """
  54. errors = []
  55. valid_records = []
  56. # Try to load the workbook
  57. try:
  58. file_stream = BytesIO(file_content)
  59. wb = load_workbook(file_stream, read_only=True, data_only=True)
  60. except InvalidFileException:
  61. return [], ["文件格式错误,请上传XLSX文件"]
  62. except Exception:
  63. return [], ["文件格式错误,请上传XLSX文件"]
  64. ws = wb.active
  65. # Get header row and validate required columns
  66. header_row = [cell.value for cell in ws[1]]
  67. # Check for missing required columns
  68. missing_columns = []
  69. column_indices = {}
  70. for col_name in ImportService.REQUIRED_COLUMNS:
  71. if col_name not in header_row:
  72. missing_columns.append(col_name)
  73. else:
  74. column_indices[col_name] = header_row.index(col_name)
  75. if missing_columns:
  76. return [], [f"缺少必需列: {', '.join(missing_columns)}"]
  77. # Cache persons and items for lookup
  78. persons = {p.name: p.id for p in Person.query.all()}
  79. items = {i.name: i.id for i in Item.query.all()}
  80. # Validate each data row (starting from row 2)
  81. for row_idx, row in enumerate(ws.iter_rows(min_row=2, values_only=True), start=2):
  82. # Skip empty rows
  83. if all(cell is None or (isinstance(cell, str) and cell.strip() == '') for cell in row):
  84. continue
  85. row_errors = []
  86. record = {}
  87. # Get values from row
  88. person_name = row[column_indices['人员姓名']]
  89. item_name = row[column_indices['物品名称']]
  90. work_date = row[column_indices['工作日期']]
  91. quantity = row[column_indices['数量']]
  92. # Validate person_name
  93. if person_name is None or (isinstance(person_name, str) and person_name.strip() == ''):
  94. row_errors.append(f"第{row_idx}行: '人员姓名' 不能为空")
  95. else:
  96. person_name_str = str(person_name).strip()
  97. if person_name_str not in persons:
  98. row_errors.append(f"第{row_idx}行: 人员 '{person_name_str}' 不存在")
  99. else:
  100. record['person_id'] = persons[person_name_str]
  101. # Validate item_name
  102. if item_name is None or (isinstance(item_name, str) and item_name.strip() == ''):
  103. row_errors.append(f"第{row_idx}行: '物品名称' 不能为空")
  104. else:
  105. item_name_str = str(item_name).strip()
  106. if item_name_str not in items:
  107. row_errors.append(f"第{row_idx}行: 物品 '{item_name_str}' 不存在")
  108. else:
  109. record['item_id'] = items[item_name_str]
  110. # Validate work_date
  111. if work_date is None or (isinstance(work_date, str) and work_date.strip() == ''):
  112. row_errors.append(f"第{row_idx}行: '工作日期' 不能为空")
  113. else:
  114. parsed_date = None
  115. if isinstance(work_date, datetime):
  116. parsed_date = work_date.date()
  117. elif hasattr(work_date, 'date'):
  118. # Handle date objects
  119. parsed_date = work_date
  120. else:
  121. # Try to parse string date
  122. try:
  123. parsed_date = datetime.strptime(str(work_date).strip(), '%Y-%m-%d').date()
  124. except ValueError:
  125. row_errors.append(f"第{row_idx}行: 日期格式错误,应为 YYYY-MM-DD")
  126. if parsed_date:
  127. record['work_date'] = parsed_date
  128. # Validate quantity
  129. if quantity is None or (isinstance(quantity, str) and quantity.strip() == ''):
  130. row_errors.append(f"第{row_idx}行: '数量' 不能为空")
  131. else:
  132. try:
  133. qty_value = int(float(quantity))
  134. if qty_value <= 0:
  135. row_errors.append(f"第{row_idx}行: 数量必须为正数")
  136. else:
  137. record['quantity'] = qty_value
  138. except (ValueError, TypeError):
  139. row_errors.append(f"第{row_idx}行: 数量必须为正数")
  140. if row_errors:
  141. errors.extend(row_errors)
  142. elif len(record) == 4: # All fields validated successfully
  143. valid_records.append(record)
  144. wb.close()
  145. return valid_records, errors
  146. @staticmethod
  147. def import_records(records: List[Dict[str, Any]]) -> int:
  148. """Import validated records as WorkRecord entries.
  149. Creates WorkRecord entries for each validated record. Uses atomic
  150. operation - if any record fails to create, all changes are rolled back.
  151. Args:
  152. records: List of validated record dicts with person_id, item_id,
  153. work_date, and quantity
  154. Returns:
  155. int: Count of successfully imported records
  156. Raises:
  157. Exception: If any record fails to create (triggers rollback)
  158. Requirements: 4.6, 4.7, 4.8
  159. """
  160. if not records:
  161. return 0
  162. try:
  163. created_records = []
  164. for record in records:
  165. work_record = WorkRecord(
  166. person_id=record['person_id'],
  167. item_id=record['item_id'],
  168. work_date=record['work_date'],
  169. quantity=record['quantity']
  170. )
  171. db.session.add(work_record)
  172. created_records.append(work_record)
  173. # Commit all records atomically
  174. db.session.commit()
  175. return len(created_records)
  176. except Exception as e:
  177. # Rollback on any failure
  178. db.session.rollback()
  179. raise e