Files
yoyo c9c4da01b6 feat: 添加时间同步配置功能至安装脚本
- 在 install.sh 中新增 sync_time 函数,配置系统时间同步,设置时区为 Asia/Shanghai,并安装 chrony。
- 配置 NTP 服务器为阿里云和腾讯云,确保时间同步的准确性。
- 更新主函数以调用时间同步配置,优化安装流程。
2025-12-24 03:31:35 +08:00

321 lines
8.2 KiB
Go
Raw Permalink Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
package heartbeat
import (
"bytes"
"context"
"encoding/json"
"fmt"
"io"
"net/http"
"net/url"
"os"
"sync"
"time"
"linkmaster-node/internal/config"
"go.uber.org/zap"
)
// 节点信息存储(通过心跳更新,优先从配置文件读取)
var nodeInfo struct {
sync.RWMutex
nodeID uint
nodeIP string
country string
province string
city string
isp string
cfg *config.Config
initialized bool
}
// InitNodeInfo 初始化节点信息(从配置文件读取)
func InitNodeInfo(cfg *config.Config) {
nodeInfo.Lock()
defer nodeInfo.Unlock()
nodeInfo.cfg = cfg
nodeInfo.nodeID = cfg.Node.ID
nodeInfo.nodeIP = cfg.Node.IP
nodeInfo.country = cfg.Node.Country
nodeInfo.province = cfg.Node.Province
nodeInfo.city = cfg.Node.City
nodeInfo.isp = cfg.Node.ISP
nodeInfo.initialized = true
}
// GetNodeID 获取节点ID
func GetNodeID() uint {
nodeInfo.RLock()
defer nodeInfo.RUnlock()
return nodeInfo.nodeID
}
// GetNodeIP 获取节点IP
func GetNodeIP() string {
nodeInfo.RLock()
defer nodeInfo.RUnlock()
return nodeInfo.nodeIP
}
// GetNodeLocation 获取节点位置信息
func GetNodeLocation() (country, province, city, isp string) {
nodeInfo.RLock()
defer nodeInfo.RUnlock()
return nodeInfo.country, nodeInfo.province, nodeInfo.city, nodeInfo.isp
}
type Reporter struct {
cfg *config.Config
client *http.Client
logger *zap.Logger
stopCh chan struct{}
}
func NewReporter(cfg *config.Config) *Reporter {
logger, _ := zap.NewProduction()
// 初始化节点信息(从配置文件读取)
InitNodeInfo(cfg)
return &Reporter{
cfg: cfg,
client: &http.Client{
Timeout: 10 * time.Second,
},
logger: logger,
stopCh: make(chan struct{}),
}
}
func (r *Reporter) Start(ctx context.Context) {
// 立即发送一次心跳
r.sendHeartbeat()
for {
// 计算到下一分钟第1秒的时间
now := time.Now()
nextMinute := now.Truncate(time.Minute).Add(time.Minute)
nextHeartbeatTime := nextMinute.Add(1 * time.Second)
durationUntilNext := nextHeartbeatTime.Sub(now)
// 等待到下一分钟的第1秒
timer := time.NewTimer(durationUntilNext)
select {
case <-ctx.Done():
timer.Stop()
return
case <-r.stopCh:
timer.Stop()
return
case <-timer.C:
// 在每分钟的第1秒发送心跳
r.sendHeartbeat()
}
timer.Stop()
}
}
func (r *Reporter) Stop() {
close(r.stopCh)
}
// buildHeartbeatBody 构建心跳请求体
func buildHeartbeatBody() string {
hostname, err := os.Hostname()
if err != nil {
hostname = "unknown"
}
values := url.Values{}
values.Set("type", "pingServer")
values.Set("version", "2")
values.Set("host_name", hostname)
return values.Encode()
}
// RegisterNode 注册节点(安装时或首次启动时调用)
func RegisterNode(cfg *config.Config) error {
url := fmt.Sprintf("%s/api/node/heartbeat", cfg.Backend.URL)
req, err := http.NewRequest("POST", url, bytes.NewBufferString(buildHeartbeatBody()))
if err != nil {
return fmt.Errorf("创建心跳请求失败: %w", err)
}
req.Header.Set("Content-Type", "application/x-www-form-urlencoded")
client := &http.Client{Timeout: 10 * time.Second}
resp, err := client.Do(req)
if err != nil {
return fmt.Errorf("发送心跳失败 (URL: %s): %w", url, err)
}
defer resp.Body.Close()
if resp.StatusCode == http.StatusOK {
body, err := io.ReadAll(resp.Body)
if err != nil {
return fmt.Errorf("读取响应失败: %w", err)
}
// 尝试解析 JSON 响应
var result struct {
Status string `json:"status"`
NodeID uint `json:"node_id"`
NodeIP string `json:"node_ip"`
Country string `json:"country"`
Province string `json:"province"`
City string `json:"city"`
ISP string `json:"isp"`
}
if err := json.Unmarshal(body, &result); err == nil {
// 成功解析 JSON更新配置文件和内存
if result.NodeID > 0 && result.NodeIP != "" {
cfg.Node.ID = result.NodeID
cfg.Node.IP = result.NodeIP
cfg.Node.Country = result.Country
cfg.Node.Province = result.Province
cfg.Node.City = result.City
cfg.Node.ISP = result.ISP
// 保存到配置文件
if err := cfg.Save(); err != nil {
return fmt.Errorf("保存配置文件失败: %w", err)
}
// 更新内存中的节点信息
nodeInfo.Lock()
nodeInfo.nodeID = result.NodeID
nodeInfo.nodeIP = result.NodeIP
nodeInfo.country = result.Country
nodeInfo.province = result.Province
nodeInfo.city = result.City
nodeInfo.isp = result.ISP
nodeInfo.cfg = cfg
nodeInfo.initialized = true
nodeInfo.Unlock()
return nil
}
}
return fmt.Errorf("心跳响应格式无效或节点信息不完整 (响应体: %s)", string(body))
}
// 读取响应体以便记录错误详情
body, err := io.ReadAll(resp.Body)
bodyStr := ""
if err == nil && len(body) > 0 {
// 限制响应体长度,避免错误信息过长
if len(body) > 500 {
bodyStr = string(body[:500]) + "..."
} else {
bodyStr = string(body)
}
}
return fmt.Errorf("心跳请求失败,状态码: %d, URL: %s, 响应体: %s", resp.StatusCode, url, bodyStr)
}
func (r *Reporter) sendHeartbeat() {
// 发送心跳使用Form格式兼容旧接口
url := fmt.Sprintf("%s/api/node/heartbeat", r.cfg.Backend.URL)
req, err := http.NewRequest("POST", url, bytes.NewBufferString(buildHeartbeatBody()))
if err != nil {
r.logger.Error("创建心跳请求失败", zap.Error(err))
return
}
req.Header.Set("Content-Type", "application/x-www-form-urlencoded")
resp, err := r.client.Do(req)
if err != nil {
r.logger.Warn("发送心跳失败",
zap.String("url", url),
zap.Error(err))
return
}
defer resp.Body.Close()
if resp.StatusCode == http.StatusOK {
// 尝试解析响应,获取 node_id 和 node_ip
body, err := io.ReadAll(resp.Body)
if err == nil && len(body) > 0 {
// 尝试解析 JSON 响应
var result struct {
Status string `json:"status"`
NodeID uint `json:"node_id"`
NodeIP string `json:"node_ip"`
Country string `json:"country"`
Province string `json:"province"`
City string `json:"city"`
ISP string `json:"isp"`
}
if err := json.Unmarshal(body, &result); err == nil {
// 成功解析 JSON检查是否有更新
if result.NodeID > 0 && result.NodeIP != "" {
nodeInfo.Lock()
needUpdate := false
if nodeInfo.nodeID != result.NodeID || nodeInfo.nodeIP != result.NodeIP ||
nodeInfo.country != result.Country || nodeInfo.province != result.Province ||
nodeInfo.city != result.City || nodeInfo.isp != result.ISP {
needUpdate = true
}
if needUpdate {
// 更新内存
nodeInfo.nodeID = result.NodeID
nodeInfo.nodeIP = result.NodeIP
nodeInfo.country = result.Country
nodeInfo.province = result.Province
nodeInfo.city = result.City
nodeInfo.isp = result.ISP
// 更新配置文件
if nodeInfo.cfg != nil {
nodeInfo.cfg.Node.ID = result.NodeID
nodeInfo.cfg.Node.IP = result.NodeIP
nodeInfo.cfg.Node.Country = result.Country
nodeInfo.cfg.Node.Province = result.Province
nodeInfo.cfg.Node.City = result.City
nodeInfo.cfg.Node.ISP = result.ISP
if err := nodeInfo.cfg.Save(); err != nil {
r.logger.Warn("保存节点信息到配置文件失败", zap.Error(err))
}
}
nodeInfo.Unlock()
r.logger.Info("节点信息已更新",
zap.Uint("node_id", result.NodeID),
zap.String("node_ip", result.NodeIP),
zap.String("location", fmt.Sprintf("%s/%s/%s", result.Country, result.Province, result.City)))
} else {
nodeInfo.Unlock()
}
}
} else {
// 不是 JSON 格式,可能是旧格式的 "done",忽略
r.logger.Debug("心跳响应为旧格式,跳过解析")
}
}
r.logger.Debug("心跳发送成功")
} else {
// 读取响应体以便记录错误详情
body, err := io.ReadAll(resp.Body)
bodyStr := ""
if err == nil && len(body) > 0 {
// 限制响应体长度,避免日志过长
if len(body) > 500 {
bodyStr = string(body[:500]) + "..."
} else {
bodyStr = string(body)
}
}
r.logger.Warn("心跳发送失败",
zap.Int("status", resp.StatusCode),
zap.String("url", url),
zap.String("response_body", bodyStr))
}
}