From 19f224c7fa99bef2bfcfaa11fd33be1f21bb158f Mon Sep 17 00:00:00 2001 From: yoyo Date: Sun, 23 Nov 2025 03:43:53 +0800 Subject: [PATCH] =?UTF-8?q?=E4=BC=98=E5=8C=96ip=3D=E2=80=9C=E2=80=9D=20nod?= =?UTF-8?q?eid=3D0?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- cmd/agent/main.go | 13 ++ install.sh | 79 +++++++++++ internal/config/config.go | 45 ++++++ internal/continuous/ping.go | 66 ++++++++- internal/continuous/tcping.go | 34 ++++- internal/handler/continuous.go | 249 +++++++++++++++++++++++++++++++-- internal/heartbeat/reporter.go | 163 +++++++++++++++++++-- 7 files changed, 614 insertions(+), 35 deletions(-) diff --git a/cmd/agent/main.go b/cmd/agent/main.go index ce5ecaf..61b2a4e 100644 --- a/cmd/agent/main.go +++ b/cmd/agent/main.go @@ -37,6 +37,19 @@ func main() { // 初始化错误恢复 recovery.Init() + // 如果配置中没有节点信息,先发送一次心跳获取节点信息 + if cfg.Node.ID == 0 || cfg.Node.IP == "" { + logger.Info("节点信息未配置,发送心跳获取节点信息") + if err := heartbeat.RegisterNode(cfg); err != nil { + logger.Warn("注册节点失败,将在心跳时重试", zap.Error(err)) + } else { + logger.Info("节点信息已获取并保存", + zap.Uint("node_id", cfg.Node.ID), + zap.String("node_ip", cfg.Node.IP), + zap.String("location", fmt.Sprintf("%s/%s/%s", cfg.Node.Country, cfg.Node.Province, cfg.Node.City))) + } + } + // 启动心跳上报 heartbeatReporter := heartbeat.NewReporter(cfg) go heartbeatReporter.Start(context.Background()) diff --git a/install.sh b/install.sh index f7038f4..719f7fe 100755 --- a/install.sh +++ b/install.sh @@ -387,6 +387,84 @@ configure_firewall() { fi } +# 登记节点(调用心跳API获取节点信息) +register_node() { + echo -e "${BLUE}登记节点到后端服务器...${NC}" + + # 创建临时配置文件 + CONFIG_FILE="$SOURCE_DIR/config.yaml" + sudo mkdir -p "$SOURCE_DIR" + + # 创建基础配置文件 + sudo tee "$CONFIG_FILE" > /dev/null <&1) + + if [ $? -eq 0 ]; then + # 尝试解析JSON响应 + NODE_ID=$(echo "$RESPONSE" | grep -o '"node_id":[0-9]*' | grep -o '[0-9]*' | head -1) + NODE_IP=$(echo "$RESPONSE" | grep -o '"node_ip":"[^"]*"' | cut -d'"' -f4 | head -1) + COUNTRY=$(echo "$RESPONSE" | grep -o '"country":"[^"]*"' | cut -d'"' -f4 | head -1) + PROVINCE=$(echo "$RESPONSE" | grep -o '"province":"[^"]*"' | cut -d'"' -f4 | head -1) + CITY=$(echo "$RESPONSE" | grep -o '"city":"[^"]*"' | cut -d'"' -f4 | head -1) + ISP=$(echo "$RESPONSE" | grep -o '"isp":"[^"]*"' | cut -d'"' -f4 | head -1) + + if [ -n "$NODE_ID" ] && [ "$NODE_ID" != "0" ] && [ -n "$NODE_IP" ]; then + # 更新配置文件 + sudo tee "$CONFIG_FILE" > /dev/null <= batchPushMaxSize + buffer.mu.Unlock() + + if shouldFlush { + flushPushBuffer(taskID, nodeID, nodeIP) + return + } + + buffer.mu.Lock() + + // 如果距离上次推送超过间隔时间,启动定时器推送 + if buffer.pushTimer == nil { + buffer.pushTimer = time.AfterFunc(batchPushInterval, func() { + flushPushBuffer(taskID, nodeID, nodeIP) + }) + } +} + +// flushPushBuffer 刷新并推送缓冲中的结果 +func flushPushBuffer(taskID string, nodeID uint, nodeIP string) { + bufferMutex.RLock() + buffer, exists := pushBuffers[taskID] + bufferMutex.RUnlock() + + if !exists { + return + } + + buffer.mu.Lock() + if len(buffer.results) == 0 { + buffer.mu.Unlock() + return + } + + // 复制结果列表 + results := make([]map[string]interface{}, len(buffer.results)) + copy(results, buffer.results) + buffer.results = buffer.results[:0] // 清空缓冲 + + // 停止定时器 + if buffer.pushTimer != nil { + buffer.pushTimer.Stop() + buffer.pushTimer = nil + } + + buffer.lastPush = time.Now() + buffer.mu.Unlock() + + // 批量推送结果(目前后端只支持单个结果,所以逐个推送) + // 但可以减少HTTP请求的频率 + for _, result := range results { + pushSingleResult(taskID, nodeID, nodeIP, result) + } +} + +// pushSingleResult 推送单个结果到后端 +func pushSingleResult(taskID string, nodeID uint, nodeIP string, result map[string]interface{}) { + // 推送结果到后端 + url := fmt.Sprintf("%s/api/public/node/continuous/result", backendURL) + + // 获取节点位置信息 + country, province, city, isp := heartbeat.GetNodeLocation() + + // 发送 node_id、node_ip 和位置信息,后端可以通过这些信息精准匹配 data := map[string]interface{}{ "task_id": taskID, + "node_id": nodeID, + "node_ip": nodeIP, "result": result, } - // 如果 node_id 存在,优先发送 node_id - if nodeID > 0 { - data["node_id"] = nodeID - logger.Debug("推送结果时使用存储的node_id", zap.Uint("node_id", nodeID)) + // 添加位置信息(如果存在) + if country != "" { + data["country"] = country } - - // 如果 node_ip 存在,发送 node_ip - if nodeIP != "" { - data["node_ip"] = nodeIP - logger.Debug("推送结果时使用存储的node_ip", zap.String("node_ip", nodeIP)) + if province != "" { + data["province"] = province + } + if city != "" { + data["city"] = city + } + if isp != "" { + data["isp"] = isp } jsonData, err := json.Marshal(data) @@ -261,18 +388,110 @@ func pushResultToBackend(taskID string, result map[string]interface{}) { if resp.StatusCode != http.StatusOK { body, _ := io.ReadAll(resp.Body) + bodyStr := string(body) + + // 检查是否是任务不存在的错误 + if containsTaskNotFoundError(bodyStr) { + logger.Warn("后端任务不存在,停止节点端任务", + zap.String("task_id", taskID), + zap.String("response", bodyStr)) + // 停止对应的持续测试任务 + stopTaskByTaskID(taskID) + return + } + logger.Warn("推送结果失败,继续运行", zap.Int("status", resp.StatusCode), zap.String("task_id", taskID), zap.String("url", url), - zap.String("response", string(body))) - // 推送失败不停止任务,继续运行 + zap.String("response", bodyStr)) + // 其他错误不停止任务,继续运行 return } logger.Debug("推送结果成功", zap.String("task_id", taskID)) } +// containsTaskNotFoundError 检查响应中是否包含任务不存在的错误 +func containsTaskNotFoundError(responseBody string) bool { + // 检查常见的任务不存在错误消息 + errorKeywords := []string{ + "找不到对应的后端任务", + "任务不存在", + "task not found", + "找不到对应的任务", + } + + responseLower := strings.ToLower(responseBody) + for _, keyword := range errorKeywords { + if strings.Contains(responseLower, strings.ToLower(keyword)) { + return true + } + } + + // 尝试解析 JSON 响应,检查错误消息 + var resp struct { + Code int `json:"code"` + Msg string `json:"msg"` + } + if err := json.Unmarshal([]byte(responseBody), &resp); err == nil { + msgLower := strings.ToLower(resp.Msg) + for _, keyword := range errorKeywords { + if strings.Contains(msgLower, strings.ToLower(keyword)) { + return true + } + } + } + + return false +} + +// stopTaskByTaskID 根据 taskID 停止对应的持续测试任务 +func stopTaskByTaskID(taskID string) { + taskMutex.Lock() + defer taskMutex.Unlock() + + task, exists := continuousTasks[taskID] + if !exists { + logger.Debug("任务不存在,无需停止", zap.String("task_id", taskID)) + return + } + + logger.Info("停止持续测试任务", zap.String("task_id", taskID)) + + // 停止任务 + task.IsRunning = false + if task.pingTask != nil { + task.pingTask.Stop() + } + if task.tcpingTask != nil { + task.tcpingTask.Stop() + } + + // 关闭停止通道 + select { + case <-task.StopCh: + // 已经关闭 + default: + close(task.StopCh) + } + + // 删除任务 + delete(continuousTasks, taskID) + + // 清理推送缓冲 + bufferMutex.Lock() + if buffer, exists := pushBuffers[taskID]; exists { + if buffer.pushTimer != nil { + buffer.pushTimer.Stop() + } + delete(pushBuffers, taskID) + } + bufferMutex.Unlock() + + logger.Info("持续测试任务已停止", zap.String("task_id", taskID)) +} + func getLocalIP() string { // 简化实现:返回第一个非回环IP // 实际应该获取外网IP diff --git a/internal/heartbeat/reporter.go b/internal/heartbeat/reporter.go index fde1287..553b75f 100644 --- a/internal/heartbeat/reporter.go +++ b/internal/heartbeat/reporter.go @@ -15,11 +15,32 @@ import ( "go.uber.org/zap" ) -// 节点信息存储(通过心跳更新) +// 节点信息存储(通过心跳更新,优先从配置文件读取) var nodeInfo struct { sync.RWMutex - nodeID uint - nodeIP string + nodeID uint + nodeIP string + country string + province string + city string + isp string + cfg *config.Config + initialized bool +} + +// InitNodeInfo 初始化节点信息(从配置文件读取) +func InitNodeInfo(cfg *config.Config) { + nodeInfo.Lock() + defer nodeInfo.Unlock() + + nodeInfo.cfg = cfg + nodeInfo.nodeID = cfg.Node.ID + nodeInfo.nodeIP = cfg.Node.IP + nodeInfo.country = cfg.Node.Country + nodeInfo.province = cfg.Node.Province + nodeInfo.city = cfg.Node.City + nodeInfo.isp = cfg.Node.ISP + nodeInfo.initialized = true } // GetNodeID 获取节点ID @@ -36,6 +57,13 @@ func GetNodeIP() string { return nodeInfo.nodeIP } +// GetNodeLocation 获取节点位置信息 +func GetNodeLocation() (country, province, city, isp string) { + nodeInfo.RLock() + defer nodeInfo.RUnlock() + return nodeInfo.country, nodeInfo.province, nodeInfo.city, nodeInfo.isp +} + type Reporter struct { cfg *config.Config client *http.Client @@ -45,6 +73,10 @@ type Reporter struct { func NewReporter(cfg *config.Config) *Reporter { logger, _ := zap.NewProduction() + + // 初始化节点信息(从配置文件读取) + InitNodeInfo(cfg) + return &Reporter{ cfg: cfg, client: &http.Client{ @@ -78,8 +110,76 @@ func (r *Reporter) Stop() { close(r.stopCh) } +// RegisterNode 注册节点(安装时或首次启动时调用) +func RegisterNode(cfg *config.Config) error { + url := fmt.Sprintf("%s/api/node/heartbeat", cfg.Backend.URL) + req, err := http.NewRequest("POST", url, bytes.NewBufferString("type=pingServer")) + if err != nil { + return fmt.Errorf("创建心跳请求失败: %w", err) + } + + req.Header.Set("Content-Type", "application/x-www-form-urlencoded") + + client := &http.Client{Timeout: 10 * time.Second} + resp, err := client.Do(req) + if err != nil { + return fmt.Errorf("发送心跳失败: %w", err) + } + defer resp.Body.Close() + + if resp.StatusCode == http.StatusOK { + body, err := io.ReadAll(resp.Body) + if err != nil { + return fmt.Errorf("读取响应失败: %w", err) + } + + // 尝试解析 JSON 响应 + var result struct { + Status string `json:"status"` + NodeID uint `json:"node_id"` + NodeIP string `json:"node_ip"` + Country string `json:"country"` + Province string `json:"province"` + City string `json:"city"` + ISP string `json:"isp"` + } + if err := json.Unmarshal(body, &result); err == nil { + // 成功解析 JSON,更新配置文件和内存 + if result.NodeID > 0 && result.NodeIP != "" { + cfg.Node.ID = result.NodeID + cfg.Node.IP = result.NodeIP + cfg.Node.Country = result.Country + cfg.Node.Province = result.Province + cfg.Node.City = result.City + cfg.Node.ISP = result.ISP + + // 保存到配置文件 + if err := cfg.Save(); err != nil { + return fmt.Errorf("保存配置文件失败: %w", err) + } + + // 更新内存中的节点信息 + nodeInfo.Lock() + nodeInfo.nodeID = result.NodeID + nodeInfo.nodeIP = result.NodeIP + nodeInfo.country = result.Country + nodeInfo.province = result.Province + nodeInfo.city = result.City + nodeInfo.isp = result.ISP + nodeInfo.cfg = cfg + nodeInfo.initialized = true + nodeInfo.Unlock() + + return nil + } + } + return fmt.Errorf("心跳响应格式无效或节点信息不完整") + } + + return fmt.Errorf("心跳请求失败,状态码: %d", resp.StatusCode) +} + func (r *Reporter) sendHeartbeat() { - // 新节点不发送IP,让后端服务器从请求中获取 // 发送心跳(使用Form格式,兼容旧接口) url := fmt.Sprintf("%s/api/node/heartbeat", r.cfg.Backend.URL) req, err := http.NewRequest("POST", url, bytes.NewBufferString("type=pingServer")) @@ -103,20 +203,55 @@ func (r *Reporter) sendHeartbeat() { if err == nil && len(body) > 0 { // 尝试解析 JSON 响应 var result struct { - Status string `json:"status"` - NodeID uint `json:"node_id"` - NodeIP string `json:"node_ip"` + Status string `json:"status"` + NodeID uint `json:"node_id"` + NodeIP string `json:"node_ip"` + Country string `json:"country"` + Province string `json:"province"` + City string `json:"city"` + ISP string `json:"isp"` } if err := json.Unmarshal(body, &result); err == nil { - // 成功解析 JSON,更新节点信息 + // 成功解析 JSON,检查是否有更新 if result.NodeID > 0 && result.NodeIP != "" { nodeInfo.Lock() - nodeInfo.nodeID = result.NodeID - nodeInfo.nodeIP = result.NodeIP - nodeInfo.Unlock() - r.logger.Debug("心跳响应解析成功,已更新节点信息", - zap.Uint("node_id", result.NodeID), - zap.String("node_ip", result.NodeIP)) + needUpdate := false + if nodeInfo.nodeID != result.NodeID || nodeInfo.nodeIP != result.NodeIP || + nodeInfo.country != result.Country || nodeInfo.province != result.Province || + nodeInfo.city != result.City || nodeInfo.isp != result.ISP { + needUpdate = true + } + + if needUpdate { + // 更新内存 + nodeInfo.nodeID = result.NodeID + nodeInfo.nodeIP = result.NodeIP + nodeInfo.country = result.Country + nodeInfo.province = result.Province + nodeInfo.city = result.City + nodeInfo.isp = result.ISP + + // 更新配置文件 + if nodeInfo.cfg != nil { + nodeInfo.cfg.Node.ID = result.NodeID + nodeInfo.cfg.Node.IP = result.NodeIP + nodeInfo.cfg.Node.Country = result.Country + nodeInfo.cfg.Node.Province = result.Province + nodeInfo.cfg.Node.City = result.City + nodeInfo.cfg.Node.ISP = result.ISP + if err := nodeInfo.cfg.Save(); err != nil { + r.logger.Warn("保存节点信息到配置文件失败", zap.Error(err)) + } + } + nodeInfo.Unlock() + + r.logger.Info("节点信息已更新", + zap.Uint("node_id", result.NodeID), + zap.String("node_ip", result.NodeIP), + zap.String("location", fmt.Sprintf("%s/%s/%s", result.Country, result.Province, result.City))) + } else { + nodeInfo.Unlock() + } } } else { // 不是 JSON 格式,可能是旧格式的 "done",忽略