feat: 更新文档和配置逻辑,增强心跳机制和持续测试功能
- 在 INSTALL.md 和 README.md 中添加配置优先级说明,确保环境变量优先级最高。
- 增强心跳机制,新增字段以传递节点信息。
- 持续测试功能优化,支持批量推送和自动清理。
- 更新版本号至 v1.1.4,完善文档以反映新功能和改进。
This commit is contained in:
@@ -72,6 +72,12 @@ func Load() (*Config, error) {
|
||||
}
|
||||
}
|
||||
|
||||
// 环境变量优先级最高,覆盖配置文件中的设置
|
||||
// 支持 BACKEND_URL 环境变量覆盖后端地址
|
||||
if backendURL := os.Getenv("BACKEND_URL"); backendURL != "" {
|
||||
cfg.Backend.URL = backendURL
|
||||
}
|
||||
|
||||
// 如果配置文件中没有设置日志文件,使用环境变量或默认值
|
||||
if cfg.Log.File == "" {
|
||||
logFile := os.Getenv("LOG_FILE")
|
||||
|
||||
@@ -8,6 +8,7 @@ import (
|
||||
"io"
|
||||
"net"
|
||||
"net/http"
|
||||
"os"
|
||||
"strings"
|
||||
"sync"
|
||||
"time"
|
||||
@@ -18,6 +19,7 @@ import (
|
||||
|
||||
"github.com/gin-gonic/gin"
|
||||
"go.uber.org/zap"
|
||||
"go.uber.org/zap/zapcore"
|
||||
)
|
||||
|
||||
var continuousTasks = make(map[string]*ContinuousTask)
|
||||
@@ -46,7 +48,52 @@ const (
|
||||
|
||||
func InitContinuousHandler(cfg *config.Config) {
|
||||
backendURL = cfg.Backend.URL
|
||||
logger, _ = zap.NewProduction()
|
||||
|
||||
// 根据配置创建logger
|
||||
var level zapcore.Level
|
||||
logLevel := cfg.Log.Level
|
||||
if logLevel == "" {
|
||||
if cfg.Debug {
|
||||
logLevel = "debug"
|
||||
} else {
|
||||
logLevel = "info"
|
||||
}
|
||||
}
|
||||
|
||||
switch logLevel {
|
||||
case "debug":
|
||||
level = zapcore.DebugLevel
|
||||
case "info":
|
||||
level = zapcore.InfoLevel
|
||||
case "warn":
|
||||
level = zapcore.WarnLevel
|
||||
case "error":
|
||||
level = zapcore.ErrorLevel
|
||||
default:
|
||||
level = zapcore.InfoLevel
|
||||
}
|
||||
|
||||
// 创建编码器配置
|
||||
encoderConfig := zap.NewProductionEncoderConfig()
|
||||
if cfg.Debug {
|
||||
encoderConfig = zap.NewDevelopmentEncoderConfig()
|
||||
}
|
||||
encoderConfig.EncodeTime = zapcore.ISO8601TimeEncoder
|
||||
encoderConfig.EncodeLevel = zapcore.CapitalLevelEncoder
|
||||
|
||||
// 创建核心 - 输出到标准错误(日志文件由main.go统一管理,这里输出到stderr便于调试)
|
||||
core := zapcore.NewCore(
|
||||
zapcore.NewJSONEncoder(encoderConfig),
|
||||
zapcore.AddSync(os.Stderr),
|
||||
level,
|
||||
)
|
||||
|
||||
// 创建logger
|
||||
logger = zap.New(core, zap.AddCaller(), zap.AddStacktrace(zapcore.ErrorLevel))
|
||||
|
||||
logger.Info("持续测试处理器已初始化",
|
||||
zap.String("backend_url", backendURL),
|
||||
zap.String("log_level", logLevel))
|
||||
}
|
||||
|
||||
type ContinuousTask struct {
|
||||
@@ -160,7 +207,15 @@ func HandleContinuousStop(c *gin.Context) {
|
||||
if task.tcpingTask != nil {
|
||||
task.tcpingTask.Stop()
|
||||
}
|
||||
close(task.StopCh)
|
||||
|
||||
// 关闭停止通道
|
||||
select {
|
||||
case <-task.StopCh:
|
||||
// 已经关闭
|
||||
default:
|
||||
close(task.StopCh)
|
||||
}
|
||||
|
||||
delete(continuousTasks, req.TaskID)
|
||||
}
|
||||
taskMutex.Unlock()
|
||||
@@ -170,6 +225,17 @@ func HandleContinuousStop(c *gin.Context) {
|
||||
return
|
||||
}
|
||||
|
||||
// 清理推送缓冲
|
||||
bufferMutex.Lock()
|
||||
if buffer, exists := pushBuffers[req.TaskID]; exists {
|
||||
if buffer.pushTimer != nil {
|
||||
buffer.pushTimer.Stop()
|
||||
}
|
||||
delete(pushBuffers, req.TaskID)
|
||||
logger.Debug("已清理任务推送缓冲", zap.String("task_id", req.TaskID))
|
||||
}
|
||||
bufferMutex.Unlock()
|
||||
|
||||
c.JSON(http.StatusOK, gin.H{"message": "任务已停止"})
|
||||
}
|
||||
|
||||
@@ -237,7 +303,8 @@ func pushResultToBackend(taskID string, result map[string]interface{}) {
|
||||
logger.Warn("节点ID未获取,跳过推送结果",
|
||||
zap.String("task_id", taskID),
|
||||
zap.String("node_ip", nodeIP),
|
||||
zap.String("hint", "等待心跳返回node_id后再推送"))
|
||||
zap.String("hint", "等待心跳返回node_id后再推送"),
|
||||
zap.Any("result", result))
|
||||
return
|
||||
}
|
||||
|
||||
@@ -246,10 +313,18 @@ func pushResultToBackend(taskID string, result map[string]interface{}) {
|
||||
logger.Warn("节点IP未获取,跳过推送结果",
|
||||
zap.String("task_id", taskID),
|
||||
zap.Uint("node_id", nodeID),
|
||||
zap.String("hint", "等待心跳返回node_ip后再推送"))
|
||||
zap.String("hint", "等待心跳返回node_ip后再推送"),
|
||||
zap.Any("result", result))
|
||||
return
|
||||
}
|
||||
|
||||
// 记录调试信息
|
||||
logger.Debug("准备推送结果到后端",
|
||||
zap.String("task_id", taskID),
|
||||
zap.Uint("node_id", nodeID),
|
||||
zap.String("node_ip", nodeIP),
|
||||
zap.Any("result", result))
|
||||
|
||||
// 添加到批量推送缓冲
|
||||
addToPushBuffer(taskID, nodeID, nodeIP, result)
|
||||
}
|
||||
@@ -269,28 +344,43 @@ func addToPushBuffer(taskID string, nodeID uint, nodeIP string, result map[strin
|
||||
bufferMutex.Unlock()
|
||||
|
||||
buffer.mu.Lock()
|
||||
defer buffer.mu.Unlock()
|
||||
|
||||
// 添加结果到缓冲
|
||||
buffer.results = append(buffer.results, result)
|
||||
|
||||
// 如果缓冲已满,立即推送
|
||||
shouldFlush := len(buffer.results) >= batchPushMaxSize
|
||||
buffer.mu.Unlock()
|
||||
|
||||
if shouldFlush {
|
||||
flushPushBuffer(taskID, nodeID, nodeIP)
|
||||
// 复制结果列表
|
||||
results := make([]map[string]interface{}, len(buffer.results))
|
||||
copy(results, buffer.results)
|
||||
buffer.results = buffer.results[:0] // 清空缓冲
|
||||
|
||||
// 停止定时器
|
||||
if buffer.pushTimer != nil {
|
||||
buffer.pushTimer.Stop()
|
||||
buffer.pushTimer = nil
|
||||
}
|
||||
|
||||
buffer.lastPush = time.Now()
|
||||
buffer.mu.Unlock()
|
||||
|
||||
// 批量推送结果
|
||||
for _, r := range results {
|
||||
pushSingleResult(taskID, nodeID, nodeIP, r)
|
||||
}
|
||||
return
|
||||
}
|
||||
|
||||
buffer.mu.Lock()
|
||||
|
||||
// 如果距离上次推送超过间隔时间,启动定时器推送
|
||||
if buffer.pushTimer == nil {
|
||||
buffer.pushTimer = time.AfterFunc(batchPushInterval, func() {
|
||||
flushPushBuffer(taskID, nodeID, nodeIP)
|
||||
})
|
||||
}
|
||||
|
||||
buffer.mu.Unlock()
|
||||
}
|
||||
|
||||
// flushPushBuffer 刷新并推送缓冲中的结果
|
||||
@@ -362,13 +452,21 @@ func pushSingleResult(taskID string, nodeID uint, nodeIP string, result map[stri
|
||||
|
||||
jsonData, err := json.Marshal(data)
|
||||
if err != nil {
|
||||
logger.Error("序列化结果失败", zap.Error(err), zap.String("task_id", taskID))
|
||||
logger.Error("序列化结果失败",
|
||||
zap.Error(err),
|
||||
zap.String("task_id", taskID),
|
||||
zap.Uint("node_id", nodeID),
|
||||
zap.String("node_ip", nodeIP),
|
||||
zap.Any("data", data))
|
||||
return
|
||||
}
|
||||
|
||||
req, err := http.NewRequest("POST", url, bytes.NewBuffer(jsonData))
|
||||
if err != nil {
|
||||
logger.Error("创建请求失败", zap.Error(err), zap.String("task_id", taskID))
|
||||
logger.Error("创建请求失败",
|
||||
zap.Error(err),
|
||||
zap.String("task_id", taskID),
|
||||
zap.String("url", url))
|
||||
return
|
||||
}
|
||||
|
||||
@@ -380,7 +478,9 @@ func pushSingleResult(taskID string, nodeID uint, nodeIP string, result map[stri
|
||||
logger.Warn("推送结果失败,继续运行",
|
||||
zap.Error(err),
|
||||
zap.String("task_id", taskID),
|
||||
zap.String("url", url))
|
||||
zap.String("url", url),
|
||||
zap.Uint("node_id", nodeID),
|
||||
zap.String("node_ip", nodeIP))
|
||||
// 推送失败不停止任务,继续运行
|
||||
return
|
||||
}
|
||||
@@ -394,7 +494,9 @@ func pushSingleResult(taskID string, nodeID uint, nodeIP string, result map[stri
|
||||
if containsTaskNotFoundError(bodyStr) {
|
||||
logger.Warn("后端任务不存在,停止节点端任务",
|
||||
zap.String("task_id", taskID),
|
||||
zap.String("response", bodyStr))
|
||||
zap.String("response", bodyStr),
|
||||
zap.Uint("node_id", nodeID),
|
||||
zap.String("node_ip", nodeIP))
|
||||
// 停止对应的持续测试任务
|
||||
stopTaskByTaskID(taskID)
|
||||
return
|
||||
@@ -404,12 +506,18 @@ func pushSingleResult(taskID string, nodeID uint, nodeIP string, result map[stri
|
||||
zap.Int("status", resp.StatusCode),
|
||||
zap.String("task_id", taskID),
|
||||
zap.String("url", url),
|
||||
zap.String("response", bodyStr))
|
||||
zap.String("response", bodyStr),
|
||||
zap.Uint("node_id", nodeID),
|
||||
zap.String("node_ip", nodeIP))
|
||||
// 其他错误不停止任务,继续运行
|
||||
return
|
||||
}
|
||||
|
||||
logger.Debug("推送结果成功", zap.String("task_id", taskID))
|
||||
logger.Debug("推送结果成功",
|
||||
zap.String("task_id", taskID),
|
||||
zap.Uint("node_id", nodeID),
|
||||
zap.String("node_ip", nodeIP),
|
||||
zap.Any("result", result))
|
||||
}
|
||||
|
||||
// containsTaskNotFoundError 检查响应中是否包含任务不存在的错误
|
||||
@@ -522,23 +630,20 @@ func StartTaskCleanup() {
|
||||
for range ticker.C {
|
||||
now := time.Now()
|
||||
taskMutex.Lock()
|
||||
var tasksToDelete []string
|
||||
for taskID, task := range continuousTasks {
|
||||
shouldDelete := false
|
||||
// 检查最大运行时长
|
||||
if now.Sub(task.StartTime) > task.MaxDuration {
|
||||
logger.Info("任务达到最大运行时长,自动停止", zap.String("task_id", taskID))
|
||||
task.IsRunning = false
|
||||
if task.pingTask != nil {
|
||||
task.pingTask.Stop()
|
||||
}
|
||||
if task.tcpingTask != nil {
|
||||
task.tcpingTask.Stop()
|
||||
}
|
||||
delete(continuousTasks, taskID)
|
||||
continue
|
||||
}
|
||||
// 检查无客户端连接(30分钟无请求)
|
||||
if now.Sub(task.LastRequest) > 30*time.Minute {
|
||||
shouldDelete = true
|
||||
} else if now.Sub(task.LastRequest) > 30*time.Minute {
|
||||
// 检查无客户端连接(30分钟无请求)
|
||||
logger.Info("任务无客户端连接,自动停止", zap.String("task_id", taskID))
|
||||
shouldDelete = true
|
||||
}
|
||||
|
||||
if shouldDelete {
|
||||
task.IsRunning = false
|
||||
if task.pingTask != nil {
|
||||
task.pingTask.Stop()
|
||||
@@ -546,10 +651,41 @@ func StartTaskCleanup() {
|
||||
if task.tcpingTask != nil {
|
||||
task.tcpingTask.Stop()
|
||||
}
|
||||
delete(continuousTasks, taskID)
|
||||
|
||||
// 关闭停止通道
|
||||
select {
|
||||
case <-task.StopCh:
|
||||
// 已经关闭
|
||||
default:
|
||||
close(task.StopCh)
|
||||
}
|
||||
|
||||
tasksToDelete = append(tasksToDelete, taskID)
|
||||
}
|
||||
}
|
||||
taskMutex.Unlock()
|
||||
|
||||
// 清理任务和推送缓冲
|
||||
if len(tasksToDelete) > 0 {
|
||||
taskMutex.Lock()
|
||||
for _, taskID := range tasksToDelete {
|
||||
delete(continuousTasks, taskID)
|
||||
}
|
||||
taskMutex.Unlock()
|
||||
|
||||
// 清理推送缓冲
|
||||
bufferMutex.Lock()
|
||||
for _, taskID := range tasksToDelete {
|
||||
if buffer, exists := pushBuffers[taskID]; exists {
|
||||
if buffer.pushTimer != nil {
|
||||
buffer.pushTimer.Stop()
|
||||
}
|
||||
delete(pushBuffers, taskID)
|
||||
logger.Debug("已清理任务推送缓冲", zap.String("task_id", taskID))
|
||||
}
|
||||
}
|
||||
bufferMutex.Unlock()
|
||||
}
|
||||
}
|
||||
}()
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user