# Redis 设置指南 ## 问题描述 错误 `[WinError 10061] 由于目标计算机积极拒绝,无法连接` 表明Redis服务没有运行。AWS Resource Scanner使用Redis作为Celery的消息队列和结果后端。 ## 解决方案 ### 选项1: 安装并启动Redis (推荐) #### Windows 1. **下载Redis for Windows** ```bash # 使用Chocolatey安装 choco install redis-64 # 或者下载预编译版本 # https://github.com/microsoftarchive/redis/releases ``` 2. **启动Redis服务** ```bash # 方式1: 作为Windows服务启动 redis-server --service-install redis-server --service-start # 方式2: 直接启动 redis-server ``` 3. **验证Redis运行** ```bash redis-cli ping # 应该返回: PONG ``` #### Linux/macOS 1. **安装Redis** ```bash # Ubuntu/Debian sudo apt-get install redis-server # CentOS/RHEL sudo yum install redis # macOS (使用Homebrew) brew install redis ``` 2. **启动Redis** ```bash # 启动服务 sudo systemctl start redis-server # 或者直接启动 redis-server ``` 3. **验证Redis运行** ```bash redis-cli ping # 应该返回: PONG ``` ### 选项2: 使用Docker运行Redis ```bash # 拉取并运行Redis容器 docker run -d --name redis -p 6379:6379 redis:alpine # 验证运行 docker exec redis redis-cli ping ``` ### 选项3: 使用Mock模式 (开发测试) 如果暂时不想安装Redis,可以使用Mock模式进行开发测试。 1. **创建Mock任务模块** 创建文件 `backend/app/tasks/mock_tasks.py`: ```python """ Mock任务模块 - 用于没有Redis时的开发测试 """ import time import threading from typing import Dict, Any, List import uuid class MockAsyncResult: """模拟Celery AsyncResult""" def __init__(self, task_id: str): self.id = task_id self.state = 'PENDING' self.info = {} def ready(self) -> bool: return self.state in ['SUCCESS', 'FAILURE'] def successful(self) -> bool: return self.state == 'SUCCESS' def failed(self) -> bool: return self.state == 'FAILURE' class MockCeleryTask: """模拟Celery任务""" def __init__(self, func): self.func = func self._results = {} def delay(self, *args, **kwargs): """模拟异步执行""" task_id = str(uuid.uuid4()) # 创建结果对象 result = MockAsyncResult(task_id) self._results[task_id] = result # 在后台线程中执行任务 def run_task(): try: result.state = 'PROGRESS' result.info = {'progress': 0} # 执行实际任务 task_result = self.func(*args, **kwargs) result.state = 'SUCCESS' result.info = task_result except Exception as e: result.state = 'FAILURE' result.info = {'error': str(e)} thread = threading.Thread(target=run_task) thread.daemon = True thread.start() return result def mock_scan_aws_resources(task_id: int, credential_ids: List[int], regions: List[str], project_metadata: Dict[str, Any]) -> Dict[str, Any]: """模拟AWS资源扫描任务""" from app import db from app.models import Task print(f"Mock: 开始扫描任务 {task_id}") # 更新任务状态 task = db.session.get(Task, task_id) if task: task.status = 'running' task.progress = 0 db.session.commit() # 模拟扫描过程 for i in range(1, 6): time.sleep(1) # 模拟扫描时间 if task: task.progress = i * 20 db.session.commit() print(f"Mock: 扫描进度 {i * 20}%") # 模拟完成 if task: task.status = 'completed' task.progress = 100 db.session.commit() print(f"Mock: 任务 {task_id} 完成") return {'status': 'success', 'message': 'Mock scan completed'} # 创建模拟任务 scan_aws_resources = MockCeleryTask(mock_scan_aws_resources) ``` 2. **修改任务API使用Mock模式** 在 `backend/app/api/tasks.py` 中添加环境检测: ```python # 在文件顶部添加 import os # 在create_task函数中,替换Celery调用 try: # 尝试使用真实的Celery from app.tasks.scan_tasks import scan_aws_resources celery_task = scan_aws_resources.delay( task_id=task.id, credential_ids=credential_ids, regions=regions, project_metadata=project_metadata ) except Exception as e: if "Connection refused" in str(e) or "10061" in str(e): # Redis不可用,使用Mock模式 print("Redis不可用,使用Mock模式") from app.tasks.mock_tasks import scan_aws_resources celery_task = scan_aws_resources.delay( task_id=task.id, credential_ids=credential_ids, regions=regions, project_metadata=project_metadata ) else: raise ``` ## 配置Redis连接 ### 环境变量配置 在 `.env` 文件中设置Redis连接: ```env # Redis配置 CELERY_BROKER_URL=redis://localhost:6379/0 CELERY_RESULT_BACKEND=redis://localhost:6379/1 # 如果Redis有密码 # CELERY_BROKER_URL=redis://:password@localhost:6379/0 # CELERY_RESULT_BACKEND=redis://:password@localhost:6379/1 # 如果Redis在其他主机 # CELERY_BROKER_URL=redis://redis-host:6379/0 # CELERY_RESULT_BACKEND=redis://redis-host:6379/1 ``` ### 验证连接 创建测试脚本 `backend/test_redis.py`: ```python #!/usr/bin/env python """测试Redis连接""" import redis import os def test_redis_connection(): try: # 从环境变量获取Redis URL broker_url = os.environ.get('CELERY_BROKER_URL', 'redis://localhost:6379/0') # 解析Redis URL if broker_url.startswith('redis://'): # 简单解析 host = 'localhost' port = 6379 db = 0 # 创建Redis连接 r = redis.Redis(host=host, port=port, db=db, decode_responses=True) # 测试连接 response = r.ping() if response: print("✓ Redis连接成功") # 测试基本操作 r.set('test_key', 'test_value') value = r.get('test_key') if value == 'test_value': print("✓ Redis读写测试成功") r.delete('test_key') return True else: print("❌ Redis读写测试失败") return False else: print("❌ Redis ping失败") return False except redis.ConnectionError as e: print(f"❌ Redis连接失败: {e}") return False except Exception as e: print(f"❌ Redis测试异常: {e}") return False if __name__ == '__main__': test_redis_connection() ``` 运行测试: ```bash cd backend python test_redis.py ``` ## 启动Celery Worker Redis运行后,需要启动Celery Worker来处理任务: ```bash cd backend # 激活虚拟环境 activate_venv.bat # Windows source activate_venv.sh # Unix/Linux # 启动Celery Worker celery -A app.celery_app worker --loglevel=info # Windows上可能需要使用solo池 celery -A app.celery_app worker --loglevel=info --pool=solo ``` ## 故障排除 ### 常见问题 1. **Redis启动失败** - 检查端口6379是否被占用: `netstat -an | grep 6379` - 检查Redis配置文件 - 查看Redis日志 2. **连接被拒绝** - 确认Redis服务正在运行 - 检查防火墙设置 - 验证Redis绑定地址 3. **Celery Worker启动失败** - 确认Redis连接正常 - 检查Python环境和依赖 - 查看Celery日志 ### 检查服务状态 ```bash # 检查Redis进程 ps aux | grep redis # 检查端口占用 netstat -tulpn | grep 6379 # 测试Redis连接 redis-cli ping # 检查Celery Worker celery -A app.celery_app inspect active ``` ## 生产环境建议 1. **Redis持久化配置** 2. **Redis内存限制设置** 3. **Redis安全配置 (密码、绑定地址)** 4. **Celery Worker监控** 5. **任务结果清理策略** --- 选择适合你环境的解决方案,推荐在开发环境使用本地Redis,生产环境使用专门的Redis服务。