package handler import ( "bytes" "context" "encoding/json" "fmt" "io" "net" "net/http" "os" "strings" "sync" "time" "linkmaster-node/internal/config" "linkmaster-node/internal/continuous" "linkmaster-node/internal/heartbeat" "github.com/gin-gonic/gin" "go.uber.org/zap" "go.uber.org/zap/zapcore" ) var continuousTasks = make(map[string]*ContinuousTask) var taskMutex sync.RWMutex var backendURL string var logger *zap.Logger // 批量推送缓冲(每个任务一个缓冲) var pushBuffers = make(map[string]*pushBuffer) var bufferMutex sync.RWMutex // pushBuffer 批量推送缓冲 type pushBuffer struct { taskID string results []map[string]interface{} mu sync.Mutex lastPush time.Time pushTimer *time.Timer } const ( // 批量推送配置 batchPushInterval = 1 * time.Second // 批量推送间隔:1秒 batchPushMaxSize = 10 // 批量推送最大数量:10个结果 ) func InitContinuousHandler(cfg *config.Config) { backendURL = cfg.Backend.URL // 根据配置创建logger var level zapcore.Level logLevel := cfg.Log.Level if logLevel == "" { if cfg.Debug { logLevel = "debug" } else { logLevel = "info" } } switch logLevel { case "debug": level = zapcore.DebugLevel case "info": level = zapcore.InfoLevel case "warn": level = zapcore.WarnLevel case "error": level = zapcore.ErrorLevel default: level = zapcore.InfoLevel } // 创建编码器配置 encoderConfig := zap.NewProductionEncoderConfig() if cfg.Debug { encoderConfig = zap.NewDevelopmentEncoderConfig() } encoderConfig.EncodeTime = zapcore.ISO8601TimeEncoder encoderConfig.EncodeLevel = zapcore.CapitalLevelEncoder // 创建核心 - 输出到标准错误(日志文件由main.go统一管理,这里输出到stderr便于调试) core := zapcore.NewCore( zapcore.NewJSONEncoder(encoderConfig), zapcore.AddSync(os.Stderr), level, ) // 创建logger logger = zap.New(core, zap.AddCaller(), zap.AddStacktrace(zapcore.ErrorLevel)) logger.Info("持续测试处理器已初始化", zap.String("backend_url", backendURL), zap.String("log_level", logLevel)) } type ContinuousTask struct { TaskID string Type string Target string Interval time.Duration MaxDuration time.Duration StartTime time.Time LastRequest time.Time StopCh chan struct{} IsRunning bool pingTask *continuous.PingTask tcpingTask *continuous.TCPingTask } func HandleContinuousStart(c *gin.Context) { var req struct { Type string `json:"type" binding:"required"` Target string `json:"target" binding:"required"` Interval int `json:"interval"` // 秒 MaxDuration int `json:"max_duration"` // 分钟 } if err := c.ShouldBindJSON(&req); err != nil { c.JSON(http.StatusBadRequest, gin.H{"error": err.Error()}) return } // 生成任务ID taskID := generateTaskID() // 设置默认值 interval := 10 * time.Second if req.Interval > 0 { interval = time.Duration(req.Interval) * time.Second } maxDuration := 60 * time.Minute if req.MaxDuration > 0 { maxDuration = time.Duration(req.MaxDuration) * time.Minute } // 创建任务 task := &ContinuousTask{ TaskID: taskID, Type: req.Type, Target: req.Target, Interval: interval, MaxDuration: maxDuration, StartTime: time.Now(), LastRequest: time.Now(), StopCh: make(chan struct{}), IsRunning: true, } // 根据类型创建对应的任务 if req.Type == "ping" { pingTask := continuous.NewPingTask(taskID, req.Target, interval, maxDuration) task.pingTask = pingTask } else if req.Type == "tcping" { tcpingTask, err := continuous.NewTCPingTask(taskID, req.Target, interval, maxDuration) if err != nil { c.JSON(http.StatusBadRequest, gin.H{"error": err.Error()}) return } task.tcpingTask = tcpingTask } else { c.JSON(http.StatusBadRequest, gin.H{"error": "不支持的持续测试类型"}) return } taskMutex.Lock() continuousTasks[taskID] = task taskMutex.Unlock() // 启动持续测试goroutine ctx := context.Background() if task.pingTask != nil { go task.pingTask.Start(ctx, func(result map[string]interface{}) { pushResultToBackend(taskID, result) }) } else if task.tcpingTask != nil { go task.tcpingTask.Start(ctx, func(result map[string]interface{}) { pushResultToBackend(taskID, result) }) } c.JSON(http.StatusOK, gin.H{ "task_id": taskID, }) } func HandleContinuousStop(c *gin.Context) { var req struct { TaskID string `json:"task_id" binding:"required"` } if err := c.ShouldBindJSON(&req); err != nil { c.JSON(http.StatusBadRequest, gin.H{"error": err.Error()}) return } taskMutex.Lock() task, exists := continuousTasks[req.TaskID] if exists { task.IsRunning = false if task.pingTask != nil { task.pingTask.Stop() } if task.tcpingTask != nil { task.tcpingTask.Stop() } // 关闭停止通道 select { case <-task.StopCh: // 已经关闭 default: close(task.StopCh) } delete(continuousTasks, req.TaskID) } taskMutex.Unlock() if !exists { c.JSON(http.StatusNotFound, gin.H{"error": "任务不存在"}) return } // 清理推送缓冲 bufferMutex.Lock() if buffer, exists := pushBuffers[req.TaskID]; exists { if buffer.pushTimer != nil { buffer.pushTimer.Stop() } delete(pushBuffers, req.TaskID) logger.Debug("已清理任务推送缓冲", zap.String("task_id", req.TaskID)) } bufferMutex.Unlock() c.JSON(http.StatusOK, gin.H{"message": "任务已停止"}) } func HandleContinuousStatus(c *gin.Context) { taskID := c.Query("task_id") if taskID == "" { c.JSON(http.StatusBadRequest, gin.H{"error": "task_id参数缺失"}) return } taskMutex.RLock() task, exists := continuousTasks[taskID] if exists { // 更新LastRequest时间 task.LastRequest = time.Now() if task.pingTask != nil { task.pingTask.UpdateLastRequest() } if task.tcpingTask != nil { task.tcpingTask.UpdateLastRequest() } } taskMutex.RUnlock() if !exists { c.JSON(http.StatusNotFound, gin.H{"error": "任务不存在"}) return } c.JSON(http.StatusOK, gin.H{ "task_id": task.TaskID, "is_running": task.IsRunning, "start_time": task.StartTime, "last_request": task.LastRequest, }) } func pushResultToBackend(taskID string, result map[string]interface{}) { // 确保result包含必要的字段 if result["timestamp"] == nil { result["timestamp"] = time.Now().Unix() } if result["latency"] == nil { result["latency"] = 0.0 } if result["success"] == nil { result["success"] = true } if result["packet_loss"] == nil { result["packet_loss"] = false } // 优先使用心跳返回的节点信息 nodeID := heartbeat.GetNodeID() nodeIP := heartbeat.GetNodeIP() // 如果心跳还没有返回节点信息,使用本地IP作为后备 if nodeIP == "" { nodeIP = getLocalIP() logger.Debug("使用本地IP作为后备", zap.String("node_ip", nodeIP)) } // 确保已经获取到 node_id,避免发送无效数据包 if nodeID == 0 { logger.Warn("节点ID未获取,跳过推送结果", zap.String("task_id", taskID), zap.String("node_ip", nodeIP), zap.String("hint", "等待心跳返回node_id后再推送"), zap.Any("result", result)) return } // 确保已经获取到 node_ip if nodeIP == "" { logger.Warn("节点IP未获取,跳过推送结果", zap.String("task_id", taskID), zap.Uint("node_id", nodeID), zap.String("hint", "等待心跳返回node_ip后再推送"), zap.Any("result", result)) return } // 记录调试信息 logger.Debug("准备推送结果到后端", zap.String("task_id", taskID), zap.Uint("node_id", nodeID), zap.String("node_ip", nodeIP), zap.Any("result", result)) // 添加到批量推送缓冲 addToPushBuffer(taskID, nodeID, nodeIP, result) } // addToPushBuffer 添加结果到批量推送缓冲 func addToPushBuffer(taskID string, nodeID uint, nodeIP string, result map[string]interface{}) { bufferMutex.Lock() buffer, exists := pushBuffers[taskID] if !exists { buffer = &pushBuffer{ taskID: taskID, results: make([]map[string]interface{}, 0, batchPushMaxSize), lastPush: time.Now(), } pushBuffers[taskID] = buffer } bufferMutex.Unlock() buffer.mu.Lock() // 添加结果到缓冲 buffer.results = append(buffer.results, result) // 如果缓冲已满,立即推送 shouldFlush := len(buffer.results) >= batchPushMaxSize if shouldFlush { // 复制结果列表 results := make([]map[string]interface{}, len(buffer.results)) copy(results, buffer.results) buffer.results = buffer.results[:0] // 清空缓冲 // 停止定时器 if buffer.pushTimer != nil { buffer.pushTimer.Stop() buffer.pushTimer = nil } buffer.lastPush = time.Now() buffer.mu.Unlock() // 批量推送结果 for _, r := range results { pushSingleResult(taskID, nodeID, nodeIP, r) } return } // 如果距离上次推送超过间隔时间,启动定时器推送 if buffer.pushTimer == nil { buffer.pushTimer = time.AfterFunc(batchPushInterval, func() { flushPushBuffer(taskID, nodeID, nodeIP) }) } buffer.mu.Unlock() } // flushPushBuffer 刷新并推送缓冲中的结果 func flushPushBuffer(taskID string, nodeID uint, nodeIP string) { bufferMutex.RLock() buffer, exists := pushBuffers[taskID] bufferMutex.RUnlock() if !exists { return } buffer.mu.Lock() if len(buffer.results) == 0 { buffer.mu.Unlock() return } // 复制结果列表 results := make([]map[string]interface{}, len(buffer.results)) copy(results, buffer.results) buffer.results = buffer.results[:0] // 清空缓冲 // 停止定时器 if buffer.pushTimer != nil { buffer.pushTimer.Stop() buffer.pushTimer = nil } buffer.lastPush = time.Now() buffer.mu.Unlock() // 批量推送结果(目前后端只支持单个结果,所以逐个推送) // 但可以减少HTTP请求的频率 for _, result := range results { pushSingleResult(taskID, nodeID, nodeIP, result) } } // pushSingleResult 推送单个结果到后端 func pushSingleResult(taskID string, nodeID uint, nodeIP string, result map[string]interface{}) { // 推送结果到后端 url := fmt.Sprintf("%s/api/public/node/continuous/result", backendURL) // 获取节点位置信息 country, province, city, isp := heartbeat.GetNodeLocation() // 发送 node_id、node_ip 和位置信息,后端可以通过这些信息精准匹配 data := map[string]interface{}{ "task_id": taskID, "node_id": nodeID, "node_ip": nodeIP, "result": result, } // 添加位置信息(如果存在) if country != "" { data["country"] = country } if province != "" { data["province"] = province } if city != "" { data["city"] = city } if isp != "" { data["isp"] = isp } jsonData, err := json.Marshal(data) if err != nil { logger.Error("序列化结果失败", zap.Error(err), zap.String("task_id", taskID), zap.Uint("node_id", nodeID), zap.String("node_ip", nodeIP), zap.Any("data", data)) return } req, err := http.NewRequest("POST", url, bytes.NewBuffer(jsonData)) if err != nil { logger.Error("创建请求失败", zap.Error(err), zap.String("task_id", taskID), zap.String("url", url)) return } req.Header.Set("Content-Type", "application/json") client := &http.Client{Timeout: 10 * time.Second} resp, err := client.Do(req) if err != nil { logger.Warn("推送结果失败,继续运行", zap.Error(err), zap.String("task_id", taskID), zap.String("url", url), zap.Uint("node_id", nodeID), zap.String("node_ip", nodeIP)) // 推送失败不停止任务,继续运行 return } defer resp.Body.Close() if resp.StatusCode != http.StatusOK { body, _ := io.ReadAll(resp.Body) bodyStr := string(body) // 检查是否是任务不存在的错误 if containsTaskNotFoundError(bodyStr) { logger.Warn("后端任务不存在,停止节点端任务", zap.String("task_id", taskID), zap.String("response", bodyStr), zap.Uint("node_id", nodeID), zap.String("node_ip", nodeIP)) // 停止对应的持续测试任务 stopTaskByTaskID(taskID) return } logger.Warn("推送结果失败,继续运行", zap.Int("status", resp.StatusCode), zap.String("task_id", taskID), zap.String("url", url), zap.String("response", bodyStr), zap.Uint("node_id", nodeID), zap.String("node_ip", nodeIP)) // 其他错误不停止任务,继续运行 return } logger.Debug("推送结果成功", zap.String("task_id", taskID), zap.Uint("node_id", nodeID), zap.String("node_ip", nodeIP), zap.Any("result", result)) } // containsTaskNotFoundError 检查响应中是否包含任务不存在的错误 func containsTaskNotFoundError(responseBody string) bool { // 检查常见的任务不存在错误消息 errorKeywords := []string{ "找不到对应的后端任务", "任务不存在", "task not found", "找不到对应的任务", } responseLower := strings.ToLower(responseBody) for _, keyword := range errorKeywords { if strings.Contains(responseLower, strings.ToLower(keyword)) { return true } } // 尝试解析 JSON 响应,检查错误消息 var resp struct { Code int `json:"code"` Msg string `json:"msg"` } if err := json.Unmarshal([]byte(responseBody), &resp); err == nil { msgLower := strings.ToLower(resp.Msg) for _, keyword := range errorKeywords { if strings.Contains(msgLower, strings.ToLower(keyword)) { return true } } } return false } // stopTaskByTaskID 根据 taskID 停止对应的持续测试任务 func stopTaskByTaskID(taskID string) { taskMutex.Lock() defer taskMutex.Unlock() task, exists := continuousTasks[taskID] if !exists { logger.Debug("任务不存在,无需停止", zap.String("task_id", taskID)) return } logger.Info("停止持续测试任务", zap.String("task_id", taskID)) // 停止任务 task.IsRunning = false if task.pingTask != nil { task.pingTask.Stop() } if task.tcpingTask != nil { task.tcpingTask.Stop() } // 关闭停止通道 select { case <-task.StopCh: // 已经关闭 default: close(task.StopCh) } // 删除任务 delete(continuousTasks, taskID) // 清理推送缓冲 bufferMutex.Lock() if buffer, exists := pushBuffers[taskID]; exists { if buffer.pushTimer != nil { buffer.pushTimer.Stop() } delete(pushBuffers, taskID) } bufferMutex.Unlock() logger.Info("持续测试任务已停止", zap.String("task_id", taskID)) } func getLocalIP() string { // 简化实现:返回第一个非回环IP // 实际应该获取外网IP addrs, err := net.InterfaceAddrs() if err != nil { return "127.0.0.1" } for _, addr := range addrs { if ipNet, ok := addr.(*net.IPNet); ok && !ipNet.IP.IsLoopback() { if ipNet.IP.To4() != nil { return ipNet.IP.String() } } } return "127.0.0.1" } func generateTaskID() string { return fmt.Sprintf("task_%d", time.Now().UnixNano()) } // 定期清理超时任务 func StartTaskCleanup() { ticker := time.NewTicker(1 * time.Minute) go func() { for range ticker.C { now := time.Now() taskMutex.Lock() var tasksToDelete []string for taskID, task := range continuousTasks { shouldDelete := false // 检查最大运行时长 if now.Sub(task.StartTime) > task.MaxDuration { logger.Info("任务达到最大运行时长,自动停止", zap.String("task_id", taskID)) shouldDelete = true } else if now.Sub(task.LastRequest) > 30*time.Minute { // 检查无客户端连接(30分钟无请求) logger.Info("任务无客户端连接,自动停止", zap.String("task_id", taskID)) shouldDelete = true } if shouldDelete { task.IsRunning = false if task.pingTask != nil { task.pingTask.Stop() } if task.tcpingTask != nil { task.tcpingTask.Stop() } // 关闭停止通道 select { case <-task.StopCh: // 已经关闭 default: close(task.StopCh) } tasksToDelete = append(tasksToDelete, taskID) } } taskMutex.Unlock() // 清理任务和推送缓冲 if len(tasksToDelete) > 0 { taskMutex.Lock() for _, taskID := range tasksToDelete { delete(continuousTasks, taskID) } taskMutex.Unlock() // 清理推送缓冲 bufferMutex.Lock() for _, taskID := range tasksToDelete { if buffer, exists := pushBuffers[taskID]; exists { if buffer.pushTimer != nil { buffer.pushTimer.Stop() } delete(pushBuffers, taskID) logger.Debug("已清理任务推送缓冲", zap.String("task_id", taskID)) } } bufferMutex.Unlock() } } }() }