diff --git a/INSTALL.md b/INSTALL.md index b2accf5..d05e67e 100644 --- a/INSTALL.md +++ b/INSTALL.md @@ -169,6 +169,43 @@ EOF **注意:** 使用 `run.sh` 启动的好处是每次启动会自动拉取最新代码并重新编译。 +### 3.1. 配置说明 + +**配置优先级(从高到低):** +1. 环境变量 `BACKEND_URL`(最高优先级) +2. 配置文件 `config.yaml` 中的 `backend.url` +3. 默认值 + +**重要说明:** +- 环境变量 `BACKEND_URL` 会**覆盖**配置文件中的设置 +- 即使配置文件存在,设置环境变量后也会优先使用环境变量的值 +- 这确保了编译后的二进制文件不会硬编码后端地址 +- 配置文件不会被编译进二进制文件,是运行时读取的 + +**使用环境变量(推荐):** +```bash +# 在 systemd 服务文件中设置 +Environment="BACKEND_URL=http://your-backend-server:8080" + +# 或在命令行中设置 +BACKEND_URL=http://your-backend-server:8080 ./run.sh start +``` + +**使用配置文件:** +创建 `/opt/linkmaster-node/config.yaml`: +```yaml +server: + port: 2200 +backend: + url: http://your-backend-server:8080 +heartbeat: + interval: 60 +log: + file: node.log + level: info +debug: false +``` + ### 4. 启动服务 ```bash @@ -177,7 +214,11 @@ sudo systemctl enable linkmaster-node sudo systemctl start linkmaster-node ``` -**注意:** 确保 `BACKEND_URL` 环境变量指向后端服务器的实际地址和端口(默认 8080),不是前端地址。 +**重要说明:** +- 确保 `BACKEND_URL` 环境变量指向后端服务器的实际地址和端口(默认 8080),不是前端地址 +- `BACKEND_URL` 环境变量会**覆盖**配置文件中的 `backend.url` 设置(优先级最高) +- 即使配置文件存在,设置环境变量后也会优先使用环境变量的值 +- 这确保了编译后的二进制文件不会硬编码后端地址 ## 防火墙配置 @@ -238,12 +279,19 @@ sudo lsof -i :2200 **解决:** - 检查后端地址是否正确(应该是 `http://backend-server:8080`,不是前端地址) +- 检查环境变量 `BACKEND_URL` 是否设置正确(优先级最高) +- 检查配置文件 `config.yaml` 中的 `backend.url` 是否正确 - 检查网络连通性:`ping your-backend-server` - 检查端口是否开放:`telnet your-backend-server 8080` 或 `nc -zv your-backend-server 8080` - 检查防火墙规则(确保后端服务器的 8080 端口开放) - 检查后端服务是否运行:`curl http://your-backend-server:8080/api/public/nodes/online` - 如果使用前端代理,节点端仍需要直接连接后端,不能使用前端地址 +**配置优先级说明:** +- 环境变量 `BACKEND_URL` 优先级最高,会覆盖配置文件中的设置 +- 如果同时设置了环境变量和配置文件,优先使用环境变量的值 +- 这确保了编译后的二进制文件不会硬编码后端地址 + ## 卸载 ```bash diff --git a/README.md b/README.md index f919066..07273bf 100644 --- a/README.md +++ b/README.md @@ -85,12 +85,25 @@ BACKEND_URL=http://your-backend-server:8080 ./run.sh start ## 配置 +### 配置优先级 + +配置按以下优先级加载(高优先级会覆盖低优先级): + +1. **环境变量**(最高优先级) +2. **配置文件** `config.yaml` +3. **默认值** + ### 环境变量 -- `BACKEND_URL`: 后端服务地址(必需,默认: http://localhost:8080) +- `BACKEND_URL`: 后端服务地址(**优先级最高**,会覆盖配置文件中的设置) - `CONFIG_PATH`: 配置文件路径(可选,默认: config.yaml) - `LOG_FILE`: 日志文件路径(可选,默认: node.log) +**重要说明:** +- `BACKEND_URL` 环境变量会**覆盖**配置文件中的 `backend.url` 设置 +- 即使配置文件存在,设置环境变量后也会优先使用环境变量的值 +- 这确保了编译后的二进制文件不会硬编码后端地址 + ### 配置文件(可选) 创建 `config.yaml` 文件: @@ -99,20 +112,28 @@ BACKEND_URL=http://your-backend-server:8080 ./run.sh start server: port: 2200 backend: - url: http://your-backend-server:8080 + url: http://your-backend-server:8080 # 会被 BACKEND_URL 环境变量覆盖 heartbeat: interval: 60 log: file: node.log # 日志文件路径(默认: node.log,空则输出到标准错误) level: info # 日志级别: debug, info, warn, error(默认: info) debug: false +node: + id: 0 # 节点ID(通过心跳自动获取) + ip: "" # 节点IP(通过心跳自动获取) + country: "" # 国家(通过心跳自动获取) + province: "" # 省份(通过心跳自动获取) + city: "" # 城市(通过心跳自动获取) + isp: "" # ISP(通过心跳自动获取) ``` -**日志配置说明:** +**配置说明:** +- `backend.url`: 后端服务地址,会被 `BACKEND_URL` 环境变量覆盖 - `log.file`: 日志文件路径。如果为空,日志将输出到标准错误(stderr) - `log.level`: 日志级别,支持 `debug`、`info`、`warn`、`error` -- 也可以通过环境变量 `LOG_FILE` 设置日志文件路径 -- 日志文件会自动创建,如果目录不存在会自动创建 +- `node.*`: 节点信息通过心跳自动获取并保存,无需手动配置 +- 配置文件不会被编译进二进制文件,是运行时读取的 ## 运行脚本 @@ -603,9 +624,67 @@ grep -i "error" node.log tail -n 100 node.log ``` +## 心跳机制 + +节点会定期向后端发送心跳,上报节点状态和获取节点信息。 + +### 心跳请求字段 + +心跳请求包含以下字段: + +- `type`: 固定值 `pingServer` +- `version`: 协议版本号,固定值 `2` +- `host_name`: 节点主机名(自动读取系统主机名) + +### 心跳响应 + +心跳响应包含以下节点信息: + +- `node_id`: 节点ID +- `node_ip`: 节点外网IP +- `country`: 国家 +- `province`: 省份 +- `city`: 城市 +- `isp`: ISP + +这些信息会自动保存到配置文件中,用于后续的数据推送。 + +## 持续测试功能 + +节点支持持续 Ping 和 TCPing 测试,测试结果会自动推送到后端服务器。 + +### 功能特性 + +- ✅ 实时推送测试结果到后端 +- ✅ 批量推送优化(减少HTTP请求频率) +- ✅ 自动清理超时任务 +- ✅ 资源自动清理(防止内存泄漏) +- ✅ 详细的调试日志(debug模式) + +### 数据推送 + +- 测试结果会自动推送到后端 `/api/public/node/continuous/result` 接口 +- 推送包含节点ID、IP、位置信息和测试结果 +- 如果后端任务不存在,节点端会自动停止对应任务 + ## 更新日志 -### v1.1.3 (最新) +### v1.1.4 (最新) + +**新增功能:** +- ✨ 心跳请求新增 `version` 字段(协议版本号,默认值:2) +- ✨ 心跳请求新增 `host_name` 字段(自动读取系统主机名) +- ✨ 支持环境变量 `BACKEND_URL` 覆盖配置文件中的后端地址 +- ✨ 持续测试功能增强,支持批量推送和自动清理 + +**改进:** +- 🔧 修复持续测试数据推送的锁管理问题 +- 🔧 修复任务停止时未清理推送缓冲的内存泄漏问题 +- 🔧 优化配置加载逻辑,环境变量优先级最高 +- 🔧 增强日志记录,添加详细的调试信息 +- 📝 完善文档,添加配置优先级和心跳机制说明 + +### v1.1.3 **新增功能:** - ✨ 添加日志文件输出功能,支持配置日志文件路径和级别 diff --git a/internal/config/config.go b/internal/config/config.go index 923bea4..2af8b3a 100644 --- a/internal/config/config.go +++ b/internal/config/config.go @@ -72,6 +72,12 @@ func Load() (*Config, error) { } } + // 环境变量优先级最高,覆盖配置文件中的设置 + // 支持 BACKEND_URL 环境变量覆盖后端地址 + if backendURL := os.Getenv("BACKEND_URL"); backendURL != "" { + cfg.Backend.URL = backendURL + } + // 如果配置文件中没有设置日志文件,使用环境变量或默认值 if cfg.Log.File == "" { logFile := os.Getenv("LOG_FILE") diff --git a/internal/handler/continuous.go b/internal/handler/continuous.go index 0edce11..b56eb2e 100644 --- a/internal/handler/continuous.go +++ b/internal/handler/continuous.go @@ -8,6 +8,7 @@ import ( "io" "net" "net/http" + "os" "strings" "sync" "time" @@ -18,6 +19,7 @@ import ( "github.com/gin-gonic/gin" "go.uber.org/zap" + "go.uber.org/zap/zapcore" ) var continuousTasks = make(map[string]*ContinuousTask) @@ -46,7 +48,52 @@ const ( func InitContinuousHandler(cfg *config.Config) { backendURL = cfg.Backend.URL - logger, _ = zap.NewProduction() + + // 根据配置创建logger + var level zapcore.Level + logLevel := cfg.Log.Level + if logLevel == "" { + if cfg.Debug { + logLevel = "debug" + } else { + logLevel = "info" + } + } + + switch logLevel { + case "debug": + level = zapcore.DebugLevel + case "info": + level = zapcore.InfoLevel + case "warn": + level = zapcore.WarnLevel + case "error": + level = zapcore.ErrorLevel + default: + level = zapcore.InfoLevel + } + + // 创建编码器配置 + encoderConfig := zap.NewProductionEncoderConfig() + if cfg.Debug { + encoderConfig = zap.NewDevelopmentEncoderConfig() + } + encoderConfig.EncodeTime = zapcore.ISO8601TimeEncoder + encoderConfig.EncodeLevel = zapcore.CapitalLevelEncoder + + // 创建核心 - 输出到标准错误(日志文件由main.go统一管理,这里输出到stderr便于调试) + core := zapcore.NewCore( + zapcore.NewJSONEncoder(encoderConfig), + zapcore.AddSync(os.Stderr), + level, + ) + + // 创建logger + logger = zap.New(core, zap.AddCaller(), zap.AddStacktrace(zapcore.ErrorLevel)) + + logger.Info("持续测试处理器已初始化", + zap.String("backend_url", backendURL), + zap.String("log_level", logLevel)) } type ContinuousTask struct { @@ -160,7 +207,15 @@ func HandleContinuousStop(c *gin.Context) { if task.tcpingTask != nil { task.tcpingTask.Stop() } - close(task.StopCh) + + // 关闭停止通道 + select { + case <-task.StopCh: + // 已经关闭 + default: + close(task.StopCh) + } + delete(continuousTasks, req.TaskID) } taskMutex.Unlock() @@ -170,6 +225,17 @@ func HandleContinuousStop(c *gin.Context) { return } + // 清理推送缓冲 + bufferMutex.Lock() + if buffer, exists := pushBuffers[req.TaskID]; exists { + if buffer.pushTimer != nil { + buffer.pushTimer.Stop() + } + delete(pushBuffers, req.TaskID) + logger.Debug("已清理任务推送缓冲", zap.String("task_id", req.TaskID)) + } + bufferMutex.Unlock() + c.JSON(http.StatusOK, gin.H{"message": "任务已停止"}) } @@ -237,7 +303,8 @@ func pushResultToBackend(taskID string, result map[string]interface{}) { logger.Warn("节点ID未获取,跳过推送结果", zap.String("task_id", taskID), zap.String("node_ip", nodeIP), - zap.String("hint", "等待心跳返回node_id后再推送")) + zap.String("hint", "等待心跳返回node_id后再推送"), + zap.Any("result", result)) return } @@ -246,10 +313,18 @@ func pushResultToBackend(taskID string, result map[string]interface{}) { logger.Warn("节点IP未获取,跳过推送结果", zap.String("task_id", taskID), zap.Uint("node_id", nodeID), - zap.String("hint", "等待心跳返回node_ip后再推送")) + zap.String("hint", "等待心跳返回node_ip后再推送"), + zap.Any("result", result)) return } + // 记录调试信息 + logger.Debug("准备推送结果到后端", + zap.String("task_id", taskID), + zap.Uint("node_id", nodeID), + zap.String("node_ip", nodeIP), + zap.Any("result", result)) + // 添加到批量推送缓冲 addToPushBuffer(taskID, nodeID, nodeIP, result) } @@ -269,28 +344,43 @@ func addToPushBuffer(taskID string, nodeID uint, nodeIP string, result map[strin bufferMutex.Unlock() buffer.mu.Lock() - defer buffer.mu.Unlock() // 添加结果到缓冲 buffer.results = append(buffer.results, result) // 如果缓冲已满,立即推送 shouldFlush := len(buffer.results) >= batchPushMaxSize - buffer.mu.Unlock() if shouldFlush { - flushPushBuffer(taskID, nodeID, nodeIP) + // 复制结果列表 + results := make([]map[string]interface{}, len(buffer.results)) + copy(results, buffer.results) + buffer.results = buffer.results[:0] // 清空缓冲 + + // 停止定时器 + if buffer.pushTimer != nil { + buffer.pushTimer.Stop() + buffer.pushTimer = nil + } + + buffer.lastPush = time.Now() + buffer.mu.Unlock() + + // 批量推送结果 + for _, r := range results { + pushSingleResult(taskID, nodeID, nodeIP, r) + } return } - buffer.mu.Lock() - // 如果距离上次推送超过间隔时间,启动定时器推送 if buffer.pushTimer == nil { buffer.pushTimer = time.AfterFunc(batchPushInterval, func() { flushPushBuffer(taskID, nodeID, nodeIP) }) } + + buffer.mu.Unlock() } // flushPushBuffer 刷新并推送缓冲中的结果 @@ -362,13 +452,21 @@ func pushSingleResult(taskID string, nodeID uint, nodeIP string, result map[stri jsonData, err := json.Marshal(data) if err != nil { - logger.Error("序列化结果失败", zap.Error(err), zap.String("task_id", taskID)) + logger.Error("序列化结果失败", + zap.Error(err), + zap.String("task_id", taskID), + zap.Uint("node_id", nodeID), + zap.String("node_ip", nodeIP), + zap.Any("data", data)) return } req, err := http.NewRequest("POST", url, bytes.NewBuffer(jsonData)) if err != nil { - logger.Error("创建请求失败", zap.Error(err), zap.String("task_id", taskID)) + logger.Error("创建请求失败", + zap.Error(err), + zap.String("task_id", taskID), + zap.String("url", url)) return } @@ -380,7 +478,9 @@ func pushSingleResult(taskID string, nodeID uint, nodeIP string, result map[stri logger.Warn("推送结果失败,继续运行", zap.Error(err), zap.String("task_id", taskID), - zap.String("url", url)) + zap.String("url", url), + zap.Uint("node_id", nodeID), + zap.String("node_ip", nodeIP)) // 推送失败不停止任务,继续运行 return } @@ -394,7 +494,9 @@ func pushSingleResult(taskID string, nodeID uint, nodeIP string, result map[stri if containsTaskNotFoundError(bodyStr) { logger.Warn("后端任务不存在,停止节点端任务", zap.String("task_id", taskID), - zap.String("response", bodyStr)) + zap.String("response", bodyStr), + zap.Uint("node_id", nodeID), + zap.String("node_ip", nodeIP)) // 停止对应的持续测试任务 stopTaskByTaskID(taskID) return @@ -404,12 +506,18 @@ func pushSingleResult(taskID string, nodeID uint, nodeIP string, result map[stri zap.Int("status", resp.StatusCode), zap.String("task_id", taskID), zap.String("url", url), - zap.String("response", bodyStr)) + zap.String("response", bodyStr), + zap.Uint("node_id", nodeID), + zap.String("node_ip", nodeIP)) // 其他错误不停止任务,继续运行 return } - logger.Debug("推送结果成功", zap.String("task_id", taskID)) + logger.Debug("推送结果成功", + zap.String("task_id", taskID), + zap.Uint("node_id", nodeID), + zap.String("node_ip", nodeIP), + zap.Any("result", result)) } // containsTaskNotFoundError 检查响应中是否包含任务不存在的错误 @@ -522,23 +630,20 @@ func StartTaskCleanup() { for range ticker.C { now := time.Now() taskMutex.Lock() + var tasksToDelete []string for taskID, task := range continuousTasks { + shouldDelete := false // 检查最大运行时长 if now.Sub(task.StartTime) > task.MaxDuration { logger.Info("任务达到最大运行时长,自动停止", zap.String("task_id", taskID)) - task.IsRunning = false - if task.pingTask != nil { - task.pingTask.Stop() - } - if task.tcpingTask != nil { - task.tcpingTask.Stop() - } - delete(continuousTasks, taskID) - continue - } - // 检查无客户端连接(30分钟无请求) - if now.Sub(task.LastRequest) > 30*time.Minute { + shouldDelete = true + } else if now.Sub(task.LastRequest) > 30*time.Minute { + // 检查无客户端连接(30分钟无请求) logger.Info("任务无客户端连接,自动停止", zap.String("task_id", taskID)) + shouldDelete = true + } + + if shouldDelete { task.IsRunning = false if task.pingTask != nil { task.pingTask.Stop() @@ -546,10 +651,41 @@ func StartTaskCleanup() { if task.tcpingTask != nil { task.tcpingTask.Stop() } - delete(continuousTasks, taskID) + + // 关闭停止通道 + select { + case <-task.StopCh: + // 已经关闭 + default: + close(task.StopCh) + } + + tasksToDelete = append(tasksToDelete, taskID) } } taskMutex.Unlock() + + // 清理任务和推送缓冲 + if len(tasksToDelete) > 0 { + taskMutex.Lock() + for _, taskID := range tasksToDelete { + delete(continuousTasks, taskID) + } + taskMutex.Unlock() + + // 清理推送缓冲 + bufferMutex.Lock() + for _, taskID := range tasksToDelete { + if buffer, exists := pushBuffers[taskID]; exists { + if buffer.pushTimer != nil { + buffer.pushTimer.Stop() + } + delete(pushBuffers, taskID) + logger.Debug("已清理任务推送缓冲", zap.String("task_id", taskID)) + } + } + bufferMutex.Unlock() + } } }() } diff --git a/node.pid b/node.pid deleted file mode 100644 index 163a49f..0000000 --- a/node.pid +++ /dev/null @@ -1 +0,0 @@ -48748 diff --git a/version.json b/version.json index ebeb4d5..971ff1b 100644 --- a/version.json +++ b/version.json @@ -1,4 +1,4 @@ { - "version": "1.1.3", - "tag": "v1.1.3" + "version": "1.1.4", + "tag": "v1.1.4" }