package continuous

import (
	"context"
	"fmt"
	"net"
	"strconv"
	"strings"
	"sync"
	"time"

	"go.uber.org/zap"
)

// TCPingTask is a repeating TCP connect probe against Host:Port.
//
// A task is created with NewTCPingTask, driven by Start and ended by Stop.
// IsRunning and LastRequest are written under mu; the remaining exported
// fields are set once by the constructor.
type TCPingTask struct {
	// TaskID identifies the task; it is attached to the log entry written by Stop.
	TaskID string
	// Target is the address string exactly as passed to NewTCPingTask.
	Target string
	// Host and Port are the parsed components of Target.
	Host string
	Port int
	// Interval is the pause between the end of one probe and the start of the next.
	Interval time.Duration
	// MaxDuration is the longest the task may run, measured from StartTime.
	MaxDuration time.Duration
	// StartTime is the moment the task was constructed.
	StartTime time.Time
	// LastRequest is refreshed by UpdateLastRequest.
	// NOTE(review): nothing in this file reads it — presumably the owner uses
	// it to detect idle tasks; confirm against the caller.
	LastRequest time.Time
	// StopCh is closed by Stop to wake a Start loop that is waiting.
	StopCh chan struct{}
	// IsRunning is true from construction until Stop is called.
	IsRunning bool

	mu     sync.RWMutex
	logger *zap.Logger
}

// NewTCPingTask parses target and returns a task that is ready for Start.
//
// target may be "host", "host:port", "[ipv6]" or "[ipv6]:port"; a missing or
// empty port defaults to 80. An error is returned when the bracket of an IPv6
// target is not closed, when the host is empty, or when the port is not an
// integer in the range 1-65535.
func NewTCPingTask(taskID, target string, interval, maxDuration time.Duration) (*TCPingTask, error) {
	host, portStr, err := splitTCPingTarget(target)
	if err != nil {
		return nil, err
	}
	// An empty host would make the dialer connect to the local machine,
	// which is never what a probe target means.
	if host == "" {
		return nil, fmt.Errorf("无效的target格式: 主机不能为空")
	}

	port, err := strconv.Atoi(portStr)
	if err != nil {
		return nil, fmt.Errorf("无效的端口: %w", err)
	}
	if port < 1 || port > 65535 {
		return nil, fmt.Errorf("无效的端口: %d (有效范围 1-65535)", port)
	}

	// Logging is best-effort: if the production logger cannot be built, fall
	// back to a no-op logger so that Stop never dereferences a nil logger.
	logger, err := zap.NewProduction()
	if err != nil {
		logger = zap.NewNop()
	}

	now := time.Now()
	return &TCPingTask{
		TaskID:      taskID,
		Target:      target,
		Host:        host,
		Port:        port,
		Interval:    interval,
		MaxDuration: maxDuration,
		StartTime:   now,
		LastRequest: now,
		StopCh:      make(chan struct{}),
		IsRunning:   true,
		logger:      logger,
	}, nil
}

// splitTCPingTarget splits target into a host and a port string, substituting
// port "80" when the port is absent or empty. It does not validate the port.
func splitTCPingTarget(target string) (host, port string, err error) {
	const defaultPort = "80"

	// Bracketed form, e.g. "[::1]:8080".
	if strings.HasPrefix(target, "[") {
		// The first closing bracket ends the address part.
		end := strings.Index(target, "]")
		if end == -1 {
			return "", "", fmt.Errorf("无效的target格式,IPv6地址格式应为 [host]:port")
		}
		host = target[1:end]
		port = defaultPort
		// Only a ":" directly after the bracket introduces a port; any other
		// trailing text is ignored and the default port is used.
		if rest := target[end+1:]; strings.HasPrefix(rest, ":") && len(rest) > 1 {
			port = rest[1:]
		}
		return host, port, nil
	}

	// Plain form, "host" or "host:port". The text after the LAST colon is
	// taken as the port, so an IPv6 literal must be bracketed to be parsed
	// as a host.
	sep := strings.LastIndex(target, ":")
	if sep == -1 {
		return target, defaultPort, nil
	}
	host, port = target[:sep], target[sep+1:]
	if port == "" {
		port = defaultPort
	}
	return host, port, nil
}

// Start runs probes in a loop on the calling goroutine until ctx is done, the
// task is stopped, or MaxDuration has elapsed since StartTime (in which case
// Start calls Stop itself).
//
// After each probe the result map from executeTCPing is passed to
// resultCallback when it is non-nil, then the loop waits Interval before the
// next probe. A result is not delivered if the task was stopped or ctx was
// cancelled while the probe was in flight.
func (t *TCPingTask) Start(ctx context.Context, resultCallback func(result map[string]interface{})) {
	for {
		// Non-blocking check for cancellation before starting a probe.
		select {
		case <-ctx.Done():
			return
		case <-t.StopCh:
			return
		default:
		}

		t.mu.RLock()
		expired := time.Since(t.StartTime) > t.MaxDuration
		running := t.IsRunning
		t.mu.RUnlock()

		if expired {
			t.Stop()
			return
		}
		if !running {
			return
		}

		// One probe; the result is reported as soon as it completes.
		result := t.executeTCPing()

		// The probe can block for seconds, so re-check before reporting.
		t.mu.RLock()
		running = t.IsRunning
		t.mu.RUnlock()
		if !running || ctx.Err() != nil {
			return
		}

		if resultCallback != nil {
			resultCallback(result)
		}

		// Wait for the interval, waking early on cancellation or Stop. The
		// timer is stopped explicitly so it is released right away on exit.
		wait := time.NewTimer(t.Interval)
		select {
		case <-ctx.Done():
			wait.Stop()
			return
		case <-t.StopCh:
			wait.Stop()
			return
		case <-wait.C:
		}
	}
}

// Stop marks the task as not running, closes StopCh and logs the stop.
// Calls after the first one return without doing anything.
func (t *TCPingTask) Stop() {
	t.mu.Lock()
	if !t.IsRunning {
		t.mu.Unlock()
		return
	}
	t.IsRunning = false
	t.mu.Unlock()

	// StopCh is exported, so tolerate it having been closed elsewhere.
	select {
	case <-t.StopCh:
	default:
		close(t.StopCh)
	}

	// A task built without NewTCPingTask has no logger.
	if t.logger != nil {
		t.logger.Info("TCPing任务已停止", zap.String("task_id", t.TaskID))
	}
}

// UpdateLastRequest sets LastRequest to the current time.
func (t *TCPingTask) UpdateLastRequest() {
	t.mu.Lock()
	defer t.mu.Unlock()
	t.LastRequest = time.Now()
}

// executeTCPing performs one TCP connect to Host:Port with a 5 second timeout
// and returns the outcome as a map.
//
// Keys always present: "timestamp" (Unix seconds), "latency", "success",
// "packet_loss" and "ip". On success "latency" is the connect time in
// milliseconds as a float64 with microsecond resolution. On failure "latency"
// is the int -1 and an additional "error" key carries the dial error text.
// "ip" is the remote address of the connection, or, when there is no
// connection, a resolved address of Host (IPv4 preferred); it is empty when
// neither is available.
func (t *TCPingTask) executeTCPing() map[string]interface{} {
	const (
		dialTimeout   = 5 * time.Second
		lookupTimeout = 5 * time.Second
	)

	address := net.JoinHostPort(t.Host, strconv.Itoa(t.Port))

	start := time.Now()
	conn, dialErr := net.DialTimeout("tcp", address, dialTimeout)
	elapsed := time.Since(start)

	var targetIP string
	if conn != nil {
		if tcpAddr, ok := conn.RemoteAddr().(*net.TCPAddr); ok {
			targetIP = tcpAddr.IP.String()
		}
		// Only the handshake is measured; the connection is not used further.
		_ = conn.Close()
	}

	// Without a connection, resolve the host so the result still names an IP.
	// The lookup is bounded so a slow resolver cannot stall the probe loop.
	if targetIP == "" {
		lookupCtx, cancel := context.WithTimeout(context.Background(), lookupTimeout)
		addrs, lookupErr := net.DefaultResolver.LookupIPAddr(lookupCtx, t.Host)
		cancel()
		if lookupErr == nil && len(addrs) > 0 {
			// Default to the first address, but prefer the first IPv4 one.
			targetIP = addrs[0].IP.String()
			for _, addr := range addrs {
				if addr.IP.To4() != nil {
					targetIP = addr.IP.String()
					break
				}
			}
		}
	}

	if dialErr != nil {
		return map[string]interface{}{
			"timestamp":   time.Now().Unix(),
			"latency":     -1,
			"success":     false,
			"packet_loss": true,
			"ip":          targetIP,
			"error":       dialErr.Error(),
		}
	}

	return map[string]interface{}{
		"timestamp": time.Now().Unix(),
		// Microsecond resolution keeps sub-millisecond connects from reading as 0.
		"latency":     float64(elapsed.Microseconds()) / 1000,
		"success":     true,
		"packet_loss": false,
		"ip":          targetIP,
	}
}