Files
linkmaster-node/internal/heartbeat/reporter.go
yoyo e0d97c4486 feat: 添加时间同步服务和北京时间支持
- 在主程序中集成时间同步服务,每30分钟同步一次时间。
- 在心跳报告中加载并使用北京时间,确保心跳在每分钟的第1秒发送。
- 增强了错误处理,确保在加载时区失败时使用默认时区。
2025-12-24 02:34:05 +08:00

332 lines
8.6 KiB
Go
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
package heartbeat
import (
"bytes"
"context"
"encoding/json"
"fmt"
"io"
"net/http"
"net/url"
"os"
"sync"
"time"
"linkmaster-node/internal/config"
"go.uber.org/zap"
)
// 节点信息存储(通过心跳更新,优先从配置文件读取)
var nodeInfo struct {
sync.RWMutex
nodeID uint
nodeIP string
country string
province string
city string
isp string
cfg *config.Config
initialized bool
}
// InitNodeInfo 初始化节点信息(从配置文件读取)
func InitNodeInfo(cfg *config.Config) {
nodeInfo.Lock()
defer nodeInfo.Unlock()
nodeInfo.cfg = cfg
nodeInfo.nodeID = cfg.Node.ID
nodeInfo.nodeIP = cfg.Node.IP
nodeInfo.country = cfg.Node.Country
nodeInfo.province = cfg.Node.Province
nodeInfo.city = cfg.Node.City
nodeInfo.isp = cfg.Node.ISP
nodeInfo.initialized = true
}
// GetNodeID 获取节点ID
func GetNodeID() uint {
nodeInfo.RLock()
defer nodeInfo.RUnlock()
return nodeInfo.nodeID
}
// GetNodeIP 获取节点IP
func GetNodeIP() string {
nodeInfo.RLock()
defer nodeInfo.RUnlock()
return nodeInfo.nodeIP
}
// GetNodeLocation 获取节点位置信息
func GetNodeLocation() (country, province, city, isp string) {
nodeInfo.RLock()
defer nodeInfo.RUnlock()
return nodeInfo.country, nodeInfo.province, nodeInfo.city, nodeInfo.isp
}
type Reporter struct {
cfg *config.Config
client *http.Client
logger *zap.Logger
stopCh chan struct{}
beijingTZ *time.Location
}
func NewReporter(cfg *config.Config) *Reporter {
logger, _ := zap.NewProduction()
// 初始化节点信息(从配置文件读取)
InitNodeInfo(cfg)
// 加载北京时间时区
beijingTZ, err := time.LoadLocation("Asia/Shanghai")
if err != nil {
// 如果加载失败使用UTC+8手动创建
beijingTZ = time.FixedZone("CST", 8*60*60)
logger.Warn("加载时区失败使用UTC+8", zap.Error(err))
}
return &Reporter{
cfg: cfg,
client: &http.Client{
Timeout: 10 * time.Second,
},
logger: logger,
stopCh: make(chan struct{}),
beijingTZ: beijingTZ,
}
}
func (r *Reporter) Start(ctx context.Context) {
// 立即发送一次心跳
r.sendHeartbeat()
for {
// 获取当前北京时间
now := time.Now().In(r.beijingTZ)
// 计算到下一分钟第1秒的时间基于北京时间
nextMinute := now.Truncate(time.Minute).Add(time.Minute)
nextHeartbeatTime := nextMinute.Add(1 * time.Second)
durationUntilNext := nextHeartbeatTime.Sub(now)
// 等待到下一分钟的第1秒
timer := time.NewTimer(durationUntilNext)
select {
case <-ctx.Done():
timer.Stop()
return
case <-r.stopCh:
timer.Stop()
return
case <-timer.C:
// 在每分钟的第1秒发送心跳北京时间
r.sendHeartbeat()
}
timer.Stop()
}
}
func (r *Reporter) Stop() {
close(r.stopCh)
}
// buildHeartbeatBody 构建心跳请求体
func buildHeartbeatBody() string {
hostname, err := os.Hostname()
if err != nil {
hostname = "unknown"
}
values := url.Values{}
values.Set("type", "pingServer")
values.Set("version", "2")
values.Set("host_name", hostname)
return values.Encode()
}
// RegisterNode 注册节点(安装时或首次启动时调用)
func RegisterNode(cfg *config.Config) error {
url := fmt.Sprintf("%s/api/node/heartbeat", cfg.Backend.URL)
req, err := http.NewRequest("POST", url, bytes.NewBufferString(buildHeartbeatBody()))
if err != nil {
return fmt.Errorf("创建心跳请求失败: %w", err)
}
req.Header.Set("Content-Type", "application/x-www-form-urlencoded")
client := &http.Client{Timeout: 10 * time.Second}
resp, err := client.Do(req)
if err != nil {
return fmt.Errorf("发送心跳失败 (URL: %s): %w", url, err)
}
defer resp.Body.Close()
if resp.StatusCode == http.StatusOK {
body, err := io.ReadAll(resp.Body)
if err != nil {
return fmt.Errorf("读取响应失败: %w", err)
}
// 尝试解析 JSON 响应
var result struct {
Status string `json:"status"`
NodeID uint `json:"node_id"`
NodeIP string `json:"node_ip"`
Country string `json:"country"`
Province string `json:"province"`
City string `json:"city"`
ISP string `json:"isp"`
}
if err := json.Unmarshal(body, &result); err == nil {
// 成功解析 JSON更新配置文件和内存
if result.NodeID > 0 && result.NodeIP != "" {
cfg.Node.ID = result.NodeID
cfg.Node.IP = result.NodeIP
cfg.Node.Country = result.Country
cfg.Node.Province = result.Province
cfg.Node.City = result.City
cfg.Node.ISP = result.ISP
// 保存到配置文件
if err := cfg.Save(); err != nil {
return fmt.Errorf("保存配置文件失败: %w", err)
}
// 更新内存中的节点信息
nodeInfo.Lock()
nodeInfo.nodeID = result.NodeID
nodeInfo.nodeIP = result.NodeIP
nodeInfo.country = result.Country
nodeInfo.province = result.Province
nodeInfo.city = result.City
nodeInfo.isp = result.ISP
nodeInfo.cfg = cfg
nodeInfo.initialized = true
nodeInfo.Unlock()
return nil
}
}
return fmt.Errorf("心跳响应格式无效或节点信息不完整 (响应体: %s)", string(body))
}
// 读取响应体以便记录错误详情
body, err := io.ReadAll(resp.Body)
bodyStr := ""
if err == nil && len(body) > 0 {
// 限制响应体长度,避免错误信息过长
if len(body) > 500 {
bodyStr = string(body[:500]) + "..."
} else {
bodyStr = string(body)
}
}
return fmt.Errorf("心跳请求失败,状态码: %d, URL: %s, 响应体: %s", resp.StatusCode, url, bodyStr)
}
func (r *Reporter) sendHeartbeat() {
// 发送心跳使用Form格式兼容旧接口
url := fmt.Sprintf("%s/api/node/heartbeat", r.cfg.Backend.URL)
req, err := http.NewRequest("POST", url, bytes.NewBufferString(buildHeartbeatBody()))
if err != nil {
r.logger.Error("创建心跳请求失败", zap.Error(err))
return
}
req.Header.Set("Content-Type", "application/x-www-form-urlencoded")
resp, err := r.client.Do(req)
if err != nil {
r.logger.Warn("发送心跳失败",
zap.String("url", url),
zap.Error(err))
return
}
defer resp.Body.Close()
if resp.StatusCode == http.StatusOK {
// 尝试解析响应,获取 node_id 和 node_ip
body, err := io.ReadAll(resp.Body)
if err == nil && len(body) > 0 {
// 尝试解析 JSON 响应
var result struct {
Status string `json:"status"`
NodeID uint `json:"node_id"`
NodeIP string `json:"node_ip"`
Country string `json:"country"`
Province string `json:"province"`
City string `json:"city"`
ISP string `json:"isp"`
}
if err := json.Unmarshal(body, &result); err == nil {
// 成功解析 JSON检查是否有更新
if result.NodeID > 0 && result.NodeIP != "" {
nodeInfo.Lock()
needUpdate := false
if nodeInfo.nodeID != result.NodeID || nodeInfo.nodeIP != result.NodeIP ||
nodeInfo.country != result.Country || nodeInfo.province != result.Province ||
nodeInfo.city != result.City || nodeInfo.isp != result.ISP {
needUpdate = true
}
if needUpdate {
// 更新内存
nodeInfo.nodeID = result.NodeID
nodeInfo.nodeIP = result.NodeIP
nodeInfo.country = result.Country
nodeInfo.province = result.Province
nodeInfo.city = result.City
nodeInfo.isp = result.ISP
// 更新配置文件
if nodeInfo.cfg != nil {
nodeInfo.cfg.Node.ID = result.NodeID
nodeInfo.cfg.Node.IP = result.NodeIP
nodeInfo.cfg.Node.Country = result.Country
nodeInfo.cfg.Node.Province = result.Province
nodeInfo.cfg.Node.City = result.City
nodeInfo.cfg.Node.ISP = result.ISP
if err := nodeInfo.cfg.Save(); err != nil {
r.logger.Warn("保存节点信息到配置文件失败", zap.Error(err))
}
}
nodeInfo.Unlock()
r.logger.Info("节点信息已更新",
zap.Uint("node_id", result.NodeID),
zap.String("node_ip", result.NodeIP),
zap.String("location", fmt.Sprintf("%s/%s/%s", result.Country, result.Province, result.City)))
} else {
nodeInfo.Unlock()
}
}
} else {
// 不是 JSON 格式,可能是旧格式的 "done",忽略
r.logger.Debug("心跳响应为旧格式,跳过解析")
}
}
r.logger.Debug("心跳发送成功")
} else {
// 读取响应体以便记录错误详情
body, err := io.ReadAll(resp.Body)
bodyStr := ""
if err == nil && len(body) > 0 {
// 限制响应体长度,避免日志过长
if len(body) > 500 {
bodyStr = string(body[:500]) + "..."
} else {
bodyStr = string(body)
}
}
r.logger.Warn("心跳发送失败",
zap.Int("status", resp.StatusCode),
zap.String("url", url),
zap.String("response_body", bodyStr))
}
}