diff --git a/internal/handler/continuous.go b/internal/handler/continuous.go index 72eb8f8..8cf9d2b 100644 --- a/internal/handler/continuous.go +++ b/internal/handler/continuous.go @@ -13,6 +13,7 @@ import ( "linkmaster-node/internal/config" "linkmaster-node/internal/continuous" + "linkmaster-node/internal/heartbeat" "github.com/gin-gonic/gin" "go.uber.org/zap" @@ -204,15 +205,33 @@ func pushResultToBackend(taskID string, result map[string]interface{}) { // 推送结果到后端 url := fmt.Sprintf("%s/api/public/node/continuous/result", backendURL) - // 获取节点IP - nodeIP := getLocalIP() + // 优先使用心跳返回的节点信息 + nodeID := heartbeat.GetNodeID() + nodeIP := heartbeat.GetNodeIP() - // 发送node_ip,后端可以通过node_ip查询node_id进行匹配 + // 如果心跳还没有返回节点信息,使用本地IP作为后备 + if nodeIP == "" { + nodeIP = getLocalIP() + logger.Debug("使用本地IP作为后备", zap.String("node_ip", nodeIP)) + } + + // 发送 node_id 和 node_ip,后端可以通过这些信息精准匹配 data := map[string]interface{}{ "task_id": taskID, - "node_ip": nodeIP, "result": result, } + + // 如果 node_id 存在,优先发送 node_id + if nodeID > 0 { + data["node_id"] = nodeID + logger.Debug("推送结果时使用存储的node_id", zap.Uint("node_id", nodeID)) + } + + // 如果 node_ip 存在,发送 node_ip + if nodeIP != "" { + data["node_ip"] = nodeIP + logger.Debug("推送结果时使用存储的node_ip", zap.String("node_ip", nodeIP)) + } jsonData, err := json.Marshal(data) if err != nil { diff --git a/internal/heartbeat/reporter.go b/internal/heartbeat/reporter.go index b26db91..fde1287 100644 --- a/internal/heartbeat/reporter.go +++ b/internal/heartbeat/reporter.go @@ -3,8 +3,11 @@ package heartbeat import ( "bytes" "context" + "encoding/json" "fmt" + "io" "net/http" + "sync" "time" "linkmaster-node/internal/config" @@ -12,6 +15,27 @@ import ( "go.uber.org/zap" ) +// 节点信息存储(通过心跳更新) +var nodeInfo struct { + sync.RWMutex + nodeID uint + nodeIP string +} + +// GetNodeID 获取节点ID +func GetNodeID() uint { + nodeInfo.RLock() + defer nodeInfo.RUnlock() + return nodeInfo.nodeID +} + +// GetNodeIP 获取节点IP +func GetNodeIP() string { + nodeInfo.RLock() + defer nodeInfo.RUnlock() + return nodeInfo.nodeIP +} + type Reporter struct { cfg *config.Config client *http.Client @@ -74,7 +98,32 @@ func (r *Reporter) sendHeartbeat() { defer resp.Body.Close() if resp.StatusCode == http.StatusOK { - r.logger.Debug("心跳发送成功,后端将从请求中获取节点IP") + // 尝试解析响应,获取 node_id 和 node_ip + body, err := io.ReadAll(resp.Body) + if err == nil && len(body) > 0 { + // 尝试解析 JSON 响应 + var result struct { + Status string `json:"status"` + NodeID uint `json:"node_id"` + NodeIP string `json:"node_ip"` + } + if err := json.Unmarshal(body, &result); err == nil { + // 成功解析 JSON,更新节点信息 + if result.NodeID > 0 && result.NodeIP != "" { + nodeInfo.Lock() + nodeInfo.nodeID = result.NodeID + nodeInfo.nodeIP = result.NodeIP + nodeInfo.Unlock() + r.logger.Debug("心跳响应解析成功,已更新节点信息", + zap.Uint("node_id", result.NodeID), + zap.String("node_ip", result.NodeIP)) + } + } else { + // 不是 JSON 格式,可能是旧格式的 "done",忽略 + r.logger.Debug("心跳响应为旧格式,跳过解析") + } + } + r.logger.Debug("心跳发送成功") } else { r.logger.Warn("心跳发送失败", zap.Int("status", resp.StatusCode)) }