diff --git a/cmd/agent/main.go b/cmd/agent/main.go index ef24821..c1d9436 100644 --- a/cmd/agent/main.go +++ b/cmd/agent/main.go @@ -13,6 +13,7 @@ import ( "linkmaster-node/internal/heartbeat" "linkmaster-node/internal/recovery" "linkmaster-node/internal/server" + "linkmaster-node/internal/timesync" "go.uber.org/zap" "go.uber.org/zap/zapcore" @@ -54,6 +55,17 @@ func main() { } } + // 启动时间同步服务(每30分钟同步一次) + var timeSync *timesync.TimeSync + timeSync, err = timesync.NewTimeSync(logger) + if err != nil { + logger.Warn("创建时间同步器失败", zap.Error(err)) + timeSync = nil + } else { + go timeSync.Start(context.Background(), 30*time.Minute) + logger.Info("时间同步服务已启动") + } + // 启动心跳上报 heartbeatReporter := heartbeat.NewReporter(cfg) go heartbeatReporter.Start(context.Background()) @@ -79,6 +91,9 @@ func main() { httpServer.Shutdown(ctx) heartbeatReporter.Stop() + if timeSync != nil { + timeSync.Stop() + } logger.Info("服务已关闭") } diff --git a/internal/heartbeat/reporter.go b/internal/heartbeat/reporter.go index 6f8c6f0..5bacb53 100644 --- a/internal/heartbeat/reporter.go +++ b/internal/heartbeat/reporter.go @@ -67,10 +67,11 @@ func GetNodeLocation() (country, province, city, isp string) { } type Reporter struct { - cfg *config.Config - client *http.Client - logger *zap.Logger - stopCh chan struct{} + cfg *config.Config + client *http.Client + logger *zap.Logger + stopCh chan struct{} + beijingTZ *time.Location } func NewReporter(cfg *config.Config) *Reporter { @@ -79,32 +80,52 @@ func NewReporter(cfg *config.Config) *Reporter { // 初始化节点信息(从配置文件读取) InitNodeInfo(cfg) + // 加载北京时间时区 + beijingTZ, err := time.LoadLocation("Asia/Shanghai") + if err != nil { + // 如果加载失败,使用UTC+8手动创建 + beijingTZ = time.FixedZone("CST", 8*60*60) + logger.Warn("加载时区失败,使用UTC+8", zap.Error(err)) + } + return &Reporter{ cfg: cfg, client: &http.Client{ Timeout: 10 * time.Second, }, - logger: logger, - stopCh: make(chan struct{}), + logger: logger, + stopCh: make(chan struct{}), + beijingTZ: beijingTZ, } } func (r *Reporter) Start(ctx context.Context) { - ticker := time.NewTicker(time.Duration(r.cfg.Heartbeat.Interval) * time.Second) - defer ticker.Stop() - // 立即发送一次心跳 r.sendHeartbeat() for { + // 获取当前北京时间 + now := time.Now().In(r.beijingTZ) + // 计算到下一分钟第1秒的时间(基于北京时间) + nextMinute := now.Truncate(time.Minute).Add(time.Minute) + nextHeartbeatTime := nextMinute.Add(1 * time.Second) + durationUntilNext := nextHeartbeatTime.Sub(now) + + // 等待到下一分钟的第1秒 + timer := time.NewTimer(durationUntilNext) + select { case <-ctx.Done(): + timer.Stop() return case <-r.stopCh: + timer.Stop() return - case <-ticker.C: + case <-timer.C: + // 在每分钟的第1秒发送心跳(北京时间) r.sendHeartbeat() } + timer.Stop() } } diff --git a/internal/timesync/sync.go b/internal/timesync/sync.go new file mode 100644 index 0000000..7dff5a0 --- /dev/null +++ b/internal/timesync/sync.go @@ -0,0 +1,190 @@ +package timesync + +import ( + "context" + "encoding/json" + "fmt" + "net/http" + "sync" + "time" + + "go.uber.org/zap" +) + +// TimeSync 时间同步器 +type TimeSync struct { + logger *zap.Logger + stopCh chan struct{} + beijingTZ *time.Location + lastSyncTime time.Time + lastSyncError error + mu sync.RWMutex +} + +// ChinaTimeAPIResponse 中国时间API响应结构(type=1返回时间戳) +type ChinaTimeAPIResponse struct { + Code int `json:"code"` + Msg string `json:"msg"` // 时间戳字符串(秒) +} + +// NewTimeSync 创建时间同步器 +func NewTimeSync(logger *zap.Logger) (*TimeSync, error) { + // 加载北京时间时区 + beijingTZ, err := time.LoadLocation("Asia/Shanghai") + if err != nil { + return nil, fmt.Errorf("加载时区失败: %w", err) + } + + return &TimeSync{ + logger: logger, + stopCh: make(chan struct{}), + beijingTZ: beijingTZ, + }, nil +} + +// syncTime 同步时间(从HTTP API获取北京时间) +func (ts *TimeSync) syncTime() error { + // 使用中国时间API(必须使用,大陆无法访问海外API) + chinaAPIs := []string{ + "http://101.35.2.25/api/time/getapi.php", + "http://124.222.204.22/api/time/getapi.php", + "http://81.68.149.132/api/time/getapi.php", + "https://cn.apihz.cn/api/time/getapi.php", + } + + var lastErr error + + // 尝试所有中国时间API + for _, baseURL := range chinaAPIs { + apiURL := baseURL + "?id=88888888&key=88888888&type=1" + + client := &http.Client{ + Timeout: 5 * time.Second, + } + + resp, err := client.Get(apiURL) + if err != nil { + lastErr = err + ts.logger.Debug("API请求失败", zap.String("api", baseURL), zap.Error(err)) + continue + } + + if resp.StatusCode != http.StatusOK { + resp.Body.Close() + lastErr = fmt.Errorf("API返回状态码: %d", resp.StatusCode) + ts.logger.Debug("API返回错误状态码", zap.String("api", baseURL), zap.Int("status", resp.StatusCode)) + continue + } + + var result ChinaTimeAPIResponse + if err := json.NewDecoder(resp.Body).Decode(&result); err != nil { + resp.Body.Close() + lastErr = err + ts.logger.Debug("解析API响应失败", zap.String("api", baseURL), zap.Error(err)) + continue + } + resp.Body.Close() + + // 检查返回码 + if result.Code != 200 { + lastErr = fmt.Errorf("API返回错误: %s", result.Msg) + ts.logger.Debug("API返回错误码", zap.String("api", baseURL), zap.Int("code", result.Code), zap.String("msg", result.Msg)) + continue + } + + // 从msg字段解析时间戳字符串 + if result.Msg == "" { + lastErr = fmt.Errorf("API返回的时间戳为空") + continue + } + + // 解析时间戳字符串为int64 + var timestamp int64 + if _, err := fmt.Sscanf(result.Msg, "%d", ×tamp); err != nil { + lastErr = fmt.Errorf("解析时间戳失败: %w", err) + ts.logger.Debug("解析时间戳失败", zap.String("api", baseURL), zap.String("msg", result.Msg), zap.Error(err)) + continue + } + + // 使用时间戳转换为北京时间 + beijingTime := time.Unix(timestamp, 0).In(ts.beijingTZ) + + // 计算时间差 + localTime := time.Now() + timeDiff := beijingTime.Sub(localTime) + + ts.mu.Lock() + ts.lastSyncTime = beijingTime + ts.lastSyncError = nil + ts.mu.Unlock() + + ts.logger.Info("时间同步成功", + zap.String("api", baseURL), + zap.String("remote_time", beijingTime.Format("2006-01-02 15:04:05")), + zap.String("local_time", localTime.Format("2006-01-02 15:04:05")), + zap.Duration("time_diff", timeDiff), + zap.Int64("timestamp", timestamp)) + + return nil + } + + ts.mu.Lock() + ts.lastSyncError = lastErr + ts.mu.Unlock() + + return fmt.Errorf("所有中国时间API同步失败: %w", lastErr) +} + +// Start 启动定期时间同步 +func (ts *TimeSync) Start(ctx context.Context, interval time.Duration) { + // 立即同步一次 + if err := ts.syncTime(); err != nil { + ts.logger.Warn("初始时间同步失败", zap.Error(err)) + } + + // 定期同步 + ticker := time.NewTicker(interval) + defer ticker.Stop() + + for { + select { + case <-ctx.Done(): + return + case <-ts.stopCh: + return + case <-ticker.C: + if err := ts.syncTime(); err != nil { + ts.logger.Warn("时间同步失败", zap.Error(err)) + } + } + } +} + +// Stop 停止时间同步 +func (ts *TimeSync) Stop() { + close(ts.stopCh) +} + +// GetBeijingTime 获取当前北京时间 +func (ts *TimeSync) GetBeijingTime() time.Time { + return time.Now().In(ts.beijingTZ) +} + +// GetLocation 获取北京时区 +func (ts *TimeSync) GetLocation() *time.Location { + return ts.beijingTZ +} + +// GetLastSyncTime 获取最后一次同步的时间 +func (ts *TimeSync) GetLastSyncTime() time.Time { + ts.mu.RLock() + defer ts.mu.RUnlock() + return ts.lastSyncTime +} + +// GetLastSyncError 获取最后一次同步的错误 +func (ts *TimeSync) GetLastSyncError() error { + ts.mu.RLock() + defer ts.mu.RUnlock() + return ts.lastSyncError +}