Files
linkmaster-node/internal/continuous/ping.go
2025-11-23 03:43:53 +08:00

424 lines
10 KiB
Go
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
package continuous
import (
"bufio"
"context"
"os/exec"
"strconv"
"strings"
"sync"
"time"
"go.uber.org/zap"
)
type PingTask struct {
TaskID string
Target string
Interval time.Duration
MaxDuration time.Duration
StartTime time.Time
LastRequest time.Time
StopCh chan struct{}
IsRunning bool
mu sync.RWMutex
logger *zap.Logger
targetIP string // 存储目标IP从ping输出中提取
currentCmd *exec.Cmd // 当前正在执行的命令,用于停止时取消
cmdMu sync.Mutex // 保护 currentCmd 的锁
}
func NewPingTask(taskID, target string, interval, maxDuration time.Duration) *PingTask {
logger, _ := zap.NewProduction()
return &PingTask{
TaskID: taskID,
Target: target,
Interval: interval,
MaxDuration: maxDuration,
StartTime: time.Now(),
LastRequest: time.Now(),
StopCh: make(chan struct{}),
IsRunning: true,
logger: logger,
}
}
func (t *PingTask) Start(ctx context.Context, resultCallback func(result map[string]interface{})) {
for {
select {
case <-ctx.Done():
return
case <-t.StopCh:
return
default:
// 检查是否超过最大运行时长
t.mu.RLock()
if time.Since(t.StartTime) > t.MaxDuration {
t.mu.RUnlock()
t.Stop()
return
}
t.mu.RUnlock()
// 执行多个ping包测试每个包完成后立即返回结果
// 使用 -c 10 -i 0.5 发送10个包间隔0.5秒,实时解析每个包的延迟
t.executePingWithRealtimeCallback(resultCallback)
// 等待间隔时间后继续下一次测试缩短间隔比如1秒
select {
case <-ctx.Done():
return
case <-t.StopCh:
return
case <-time.After(t.Interval):
// 继续下一次循环
}
}
}
}
func (t *PingTask) Stop() {
t.mu.Lock()
if !t.IsRunning {
t.mu.Unlock()
return
}
t.IsRunning = false
t.mu.Unlock()
// 取消正在执行的命令
t.cmdMu.Lock()
if t.currentCmd != nil && t.currentCmd.Process != nil {
t.logger.Debug("取消正在执行的ping命令", zap.String("task_id", t.TaskID))
t.currentCmd.Process.Kill()
t.currentCmd = nil
}
t.cmdMu.Unlock()
// 关闭停止通道
select {
case <-t.StopCh:
// 已经关闭
default:
close(t.StopCh)
}
t.logger.Info("Ping任务已停止", zap.String("task_id", t.TaskID))
}
func (t *PingTask) UpdateLastRequest() {
t.mu.Lock()
defer t.mu.Unlock()
t.LastRequest = time.Now()
}
func (t *PingTask) executePingWithRealtimeCallback(resultCallback func(result map[string]interface{})) {
// 检查任务是否已停止
t.mu.RLock()
isRunning := t.IsRunning
t.mu.RUnlock()
if !isRunning {
return
}
// 发送10个ping包间隔0.5秒,实时解析每个包的延迟
// 使用 -c 10 -i 0.5 发送10个包
cmd := exec.Command("ping", "-c", "10", "-i", "0.5", t.Target)
// 保存命令引用,以便停止时取消
t.cmdMu.Lock()
t.currentCmd = cmd
t.cmdMu.Unlock()
// 确保在函数退出时清理命令引用
defer func() {
t.cmdMu.Lock()
t.currentCmd = nil
t.cmdMu.Unlock()
}()
// 获取标准输出管道,实时读取
stdout, err := cmd.StdoutPipe()
if err != nil {
if resultCallback != nil {
resultCallback(map[string]interface{}{
"timestamp": time.Now().Unix(),
"latency": -1,
"success": false,
"packet_loss": true,
"error": err.Error(),
})
}
return
}
// 启动命令
if err := cmd.Start(); err != nil {
if resultCallback != nil {
resultCallback(map[string]interface{}{
"timestamp": time.Now().Unix(),
"latency": -1,
"success": false,
"packet_loss": true,
"error": err.Error(),
})
}
return
}
// 检查任务是否已停止(在启动命令后)
t.mu.RLock()
isRunning = t.IsRunning
t.mu.RUnlock()
if !isRunning {
// 任务已停止,取消命令
cmd.Process.Kill()
return
}
// 使用bufio.Scanner实时读取每一行
scanner := bufio.NewScanner(stdout)
processedPackets := make(map[int]bool) // 用于去重避免重复处理同一个包通过icmp_seq
// 在goroutine中读取输出避免阻塞
go func() {
for scanner.Scan() {
// 检查任务是否已停止
t.mu.RLock()
isRunning := t.IsRunning
t.mu.RUnlock()
if !isRunning {
break
}
line := strings.TrimSpace(scanner.Text())
if line == "" {
continue
}
// 从PING行提取目标IPPING example.com (8.8.8.8) 56(84) bytes of data.
if strings.HasPrefix(line, "PING") {
t.mu.RLock()
currentTargetIP := t.targetIP
t.mu.RUnlock()
if currentTargetIP == "" {
// 尝试从括号中提取IPPING example.com (8.8.8.8)
startIdx := strings.Index(line, "(")
endIdx := strings.Index(line, ")")
if startIdx != -1 && endIdx != -1 && endIdx > startIdx {
t.mu.Lock()
t.targetIP = line[startIdx+1 : endIdx]
t.mu.Unlock()
} else {
// 如果没有括号,尝试从"from"后提取64 bytes from 8.8.8.8:
fromIdx := strings.Index(line, "from")
if fromIdx != -1 {
parts := strings.Fields(line[fromIdx+4:])
if len(parts) > 0 {
ipPart := strings.TrimSuffix(parts[0], ":")
t.mu.Lock()
t.targetIP = ipPart
t.mu.Unlock()
}
}
}
}
}
// 解析单个ping包的响应时间64 bytes from 8.8.8.8: icmp_seq=0 ttl=64 time=10.123 ms
if strings.Contains(line, "time=") && strings.Contains(line, "icmp_seq") {
// 如果还没有提取到目标IP从这一行提取
t.mu.RLock()
currentTargetIP := t.targetIP
t.mu.RUnlock()
if currentTargetIP == "" {
// 格式64 bytes from 8.8.8.8: icmp_seq=0
fromIdx := strings.Index(line, "from")
if fromIdx != -1 {
afterFrom := line[fromIdx+4:]
colonIdx := strings.Index(afterFrom, ":")
if colonIdx != -1 {
t.mu.Lock()
t.targetIP = strings.TrimSpace(afterFrom[:colonIdx])
t.mu.Unlock()
}
}
}
// 提取icmp_seq用于去重
seqIndex := strings.Index(line, "icmp_seq=")
seq := -1
if seqIndex != -1 {
seqPart := line[seqIndex+9:]
spaceIndex := strings.Index(seqPart, " ")
if spaceIndex == -1 {
spaceIndex = len(seqPart)
}
if s, err := strconv.Atoi(seqPart[:spaceIndex]); err == nil {
seq = s
}
}
// 检查是否已处理过这个包
if seq >= 0 && processedPackets[seq] {
continue
}
if seq >= 0 {
processedPackets[seq] = true
}
latency := parseSinglePacketLatency(line)
if latency >= 0 && resultCallback != nil {
t.mu.RLock()
currentTargetIP := t.targetIP
t.mu.RUnlock()
result := map[string]interface{}{
"timestamp": time.Now().Unix(),
"latency": latency,
"success": true,
"packet_loss": false,
}
if currentTargetIP != "" {
result["ip"] = currentTargetIP
}
resultCallback(result)
}
} else if strings.Contains(line, "Request timeout") || strings.Contains(line, "no answer") {
// 处理超时的包
if resultCallback != nil {
t.mu.RLock()
currentTargetIP := t.targetIP
t.mu.RUnlock()
result := map[string]interface{}{
"timestamp": time.Now().Unix(),
"latency": -1,
"success": false,
"packet_loss": true,
}
if currentTargetIP != "" {
result["ip"] = currentTargetIP
}
resultCallback(result)
}
}
}
}()
// 等待命令完成
cmd.Wait()
}
// parseSinglePacketLatency 解析单个ping包的延迟时间
func parseSinglePacketLatency(line string) float64 {
// 格式64 bytes from 8.8.8.8: icmp_seq=0 ttl=64 time=10.123 ms
timeIndex := strings.Index(line, "time=")
if timeIndex == -1 {
return -1
}
timePart := line[timeIndex+5:]
spaceIndex := strings.Index(timePart, " ")
if spaceIndex == -1 {
return -1
}
timeStr := timePart[:spaceIndex]
if latency, err := strconv.ParseFloat(timeStr, 64); err == nil {
return latency
}
return -1
}
func (t *PingTask) executePing() map[string]interface{} {
// 发送单个ping包-c 1每个包完成后立即返回结果
cmd := exec.Command("ping", "-c", "1", t.Target)
output, err := cmd.CombinedOutput()
if err != nil {
return map[string]interface{}{
"timestamp": time.Now().Unix(),
"latency": -1,
"success": false,
"packet_loss": true,
"error": err.Error(),
}
}
// 解析ping输出单个包的结果
result := parsePingOutput(string(output))
result["timestamp"] = time.Now().Unix()
return result
}
func parsePingOutput(output string) map[string]interface{} {
result := map[string]interface{}{
"latency": float64(0),
"success": true,
"packet_loss": false,
}
lines := strings.Split(output, "\n")
for _, line := range lines {
line = strings.TrimSpace(line)
// 解析单个ping包的响应时间64 bytes from 8.8.8.8: icmp_seq=0 ttl=64 time=10.123 ms
if strings.Contains(line, "time=") && strings.Contains(line, "icmp_seq") {
// 提取time=后面的数值
timeIndex := strings.Index(line, "time=")
if timeIndex != -1 {
timePart := line[timeIndex+5:]
spaceIndex := strings.Index(timePart, " ")
if spaceIndex != -1 {
timeStr := timePart[:spaceIndex]
if latency, err := strconv.ParseFloat(timeStr, 64); err == nil {
result["latency"] = latency
result["success"] = true
result["packet_loss"] = false
return result
}
}
}
}
// 解析丢包率1 packets transmitted, 1 received, 0% packet loss
if strings.Contains(line, "packets transmitted") {
parts := strings.Fields(line)
for i, part := range parts {
if part == "packet" && i+2 < len(parts) {
// 查找百分比
lossStr := strings.Trim(parts[i+1], "%")
if loss, err := strconv.ParseFloat(lossStr, 64); err == nil {
result["packet_loss"] = loss > 0
if loss > 0 {
result["success"] = false
result["latency"] = -1
}
}
}
}
}
// 解析延迟rtt min/avg/max/mdev = 10.123/10.123/10.123/0.000 ms单个包时min=avg=max
if strings.Contains(line, "min/avg/max") || strings.Contains(line, "rtt") {
parts := strings.Fields(line)
for _, part := range parts {
if strings.Contains(part, "/") && !strings.Contains(part, "=") {
times := strings.Split(part, "/")
if len(times) >= 2 {
if avg, err := strconv.ParseFloat(times[1], 64); err == nil {
result["latency"] = avg
break
}
}
}
}
}
}
return result
}