错误 [WinError 10061] 由于目标计算机积极拒绝,无法连接 表明Redis服务没有运行。AWS Resource Scanner使用Redis作为Celery的消息队列和结果后端。
下载Redis for Windows
# 使用Chocolatey安装
choco install redis-64
# 或者下载预编译版本
# https://github.com/microsoftarchive/redis/releases
启动Redis服务
# 方式1: 作为Windows服务启动
redis-server --service-install
redis-server --service-start
# 方式2: 直接启动
redis-server
验证Redis运行
redis-cli ping
# 应该返回: PONG
安装Redis
# Ubuntu/Debian
sudo apt-get install redis-server
# CentOS/RHEL
sudo yum install redis
# macOS (使用Homebrew)
brew install redis
启动Redis
# 启动服务
sudo systemctl start redis-server
# 或者直接启动
redis-server
验证Redis运行
redis-cli ping
# 应该返回: PONG
# 拉取并运行Redis容器
docker run -d --name redis -p 6379:6379 redis:alpine
# 验证运行
docker exec redis redis-cli ping
如果暂时不想安装Redis,可以使用Mock模式进行开发测试。
创建文件 backend/app/tasks/mock_tasks.py:
"""
Mock任务模块 - 用于没有Redis时的开发测试
"""
import time
import threading
from typing import Dict, Any, List
import uuid
class MockAsyncResult:
"""模拟Celery AsyncResult"""
def __init__(self, task_id: str):
self.id = task_id
self.state = 'PENDING'
self.info = {}
def ready(self) -> bool:
return self.state in ['SUCCESS', 'FAILURE']
def successful(self) -> bool:
return self.state == 'SUCCESS'
def failed(self) -> bool:
return self.state == 'FAILURE'
class MockCeleryTask:
"""模拟Celery任务"""
def __init__(self, func):
self.func = func
self._results = {}
def delay(self, *args, **kwargs):
"""模拟异步执行"""
task_id = str(uuid.uuid4())
# 创建结果对象
result = MockAsyncResult(task_id)
self._results[task_id] = result
# 在后台线程中执行任务
def run_task():
try:
result.state = 'PROGRESS'
result.info = {'progress': 0}
# 执行实际任务
task_result = self.func(*args, **kwargs)
result.state = 'SUCCESS'
result.info = task_result
except Exception as e:
result.state = 'FAILURE'
result.info = {'error': str(e)}
thread = threading.Thread(target=run_task)
thread.daemon = True
thread.start()
return result
def mock_scan_aws_resources(task_id: int, credential_ids: List[int],
regions: List[str], project_metadata: Dict[str, Any]) -> Dict[str, Any]:
"""模拟AWS资源扫描任务"""
from app import db
from app.models import Task
print(f"Mock: 开始扫描任务 {task_id}")
# 更新任务状态
task = db.session.get(Task, task_id)
if task:
task.status = 'running'
task.progress = 0
db.session.commit()
# 模拟扫描过程
for i in range(1, 6):
time.sleep(1) # 模拟扫描时间
if task:
task.progress = i * 20
db.session.commit()
print(f"Mock: 扫描进度 {i * 20}%")
# 模拟完成
if task:
task.status = 'completed'
task.progress = 100
db.session.commit()
print(f"Mock: 任务 {task_id} 完成")
return {'status': 'success', 'message': 'Mock scan completed'}
# 创建模拟任务
scan_aws_resources = MockCeleryTask(mock_scan_aws_resources)
在 backend/app/api/tasks.py 中添加环境检测:
# 在文件顶部添加
import os
# 在create_task函数中,替换Celery调用
try:
# 尝试使用真实的Celery
from app.tasks.scan_tasks import scan_aws_resources
celery_task = scan_aws_resources.delay(
task_id=task.id,
credential_ids=credential_ids,
regions=regions,
project_metadata=project_metadata
)
except Exception as e:
if "Connection refused" in str(e) or "10061" in str(e):
# Redis不可用,使用Mock模式
print("Redis不可用,使用Mock模式")
from app.tasks.mock_tasks import scan_aws_resources
celery_task = scan_aws_resources.delay(
task_id=task.id,
credential_ids=credential_ids,
regions=regions,
project_metadata=project_metadata
)
else:
raise
在 .env 文件中设置Redis连接:
# Redis配置
CELERY_BROKER_URL=redis://localhost:6379/0
CELERY_RESULT_BACKEND=redis://localhost:6379/1
# 如果Redis有密码
# CELERY_BROKER_URL=redis://:password@localhost:6379/0
# CELERY_RESULT_BACKEND=redis://:password@localhost:6379/1
# 如果Redis在其他主机
# CELERY_BROKER_URL=redis://redis-host:6379/0
# CELERY_RESULT_BACKEND=redis://redis-host:6379/1
创建测试脚本 backend/test_redis.py:
#!/usr/bin/env python
"""测试Redis连接"""
import redis
import os
def test_redis_connection():
try:
# 从环境变量获取Redis URL
broker_url = os.environ.get('CELERY_BROKER_URL', 'redis://localhost:6379/0')
# 解析Redis URL
if broker_url.startswith('redis://'):
# 简单解析
host = 'localhost'
port = 6379
db = 0
# 创建Redis连接
r = redis.Redis(host=host, port=port, db=db, decode_responses=True)
# 测试连接
response = r.ping()
if response:
print("✓ Redis连接成功")
# 测试基本操作
r.set('test_key', 'test_value')
value = r.get('test_key')
if value == 'test_value':
print("✓ Redis读写测试成功")
r.delete('test_key')
return True
else:
print("❌ Redis读写测试失败")
return False
else:
print("❌ Redis ping失败")
return False
except redis.ConnectionError as e:
print(f"❌ Redis连接失败: {e}")
return False
except Exception as e:
print(f"❌ Redis测试异常: {e}")
return False
if __name__ == '__main__':
test_redis_connection()
运行测试:
cd backend
python test_redis.py
Redis运行后,需要启动Celery Worker来处理任务:
cd backend
# 激活虚拟环境
activate_venv.bat # Windows
source activate_venv.sh # Unix/Linux
# 启动Celery Worker
celery -A app.celery_app worker --loglevel=info
# Windows上可能需要使用solo池
celery -A app.celery_app worker --loglevel=info --pool=solo
Redis启动失败
netstat -an | grep 6379连接被拒绝
Celery Worker启动失败
# 检查Redis进程
ps aux | grep redis
# 检查端口占用
netstat -tulpn | grep 6379
# 测试Redis连接
redis-cli ping
# 检查Celery Worker
celery -A app.celery_app inspect active
选择适合你环境的解决方案,推荐在开发环境使用本地Redis,生产环境使用专门的Redis服务。