first commit
This commit is contained in:
315
internal/handler/continuous.go
Normal file
315
internal/handler/continuous.go
Normal file
@@ -0,0 +1,315 @@
|
||||
package handler
|
||||
|
||||
import (
|
||||
"bytes"
|
||||
"context"
|
||||
"encoding/json"
|
||||
"fmt"
|
||||
"net"
|
||||
"net/http"
|
||||
"sync"
|
||||
"time"
|
||||
|
||||
"linkmaster-node/internal/config"
|
||||
"linkmaster-node/internal/continuous"
|
||||
|
||||
"github.com/gin-gonic/gin"
|
||||
"go.uber.org/zap"
|
||||
)
|
||||
|
||||
var continuousTasks = make(map[string]*ContinuousTask)
|
||||
var taskMutex sync.RWMutex
|
||||
var backendURL string
|
||||
var logger *zap.Logger
|
||||
|
||||
func InitContinuousHandler(cfg *config.Config) {
|
||||
backendURL = cfg.Backend.URL
|
||||
logger, _ = zap.NewProduction()
|
||||
}
|
||||
|
||||
type ContinuousTask struct {
|
||||
TaskID string
|
||||
Type string
|
||||
Target string
|
||||
Interval time.Duration
|
||||
MaxDuration time.Duration
|
||||
StartTime time.Time
|
||||
LastRequest time.Time
|
||||
StopCh chan struct{}
|
||||
IsRunning bool
|
||||
pingTask *continuous.PingTask
|
||||
tcpingTask *continuous.TCPingTask
|
||||
}
|
||||
|
||||
func HandleContinuousStart(c *gin.Context) {
|
||||
var req struct {
|
||||
Type string `json:"type" binding:"required"`
|
||||
Target string `json:"target" binding:"required"`
|
||||
Interval int `json:"interval"` // 秒
|
||||
MaxDuration int `json:"max_duration"` // 分钟
|
||||
}
|
||||
|
||||
if err := c.ShouldBindJSON(&req); err != nil {
|
||||
c.JSON(http.StatusBadRequest, gin.H{"error": err.Error()})
|
||||
return
|
||||
}
|
||||
|
||||
// 生成任务ID
|
||||
taskID := generateTaskID()
|
||||
|
||||
// 设置默认值
|
||||
interval := 10 * time.Second
|
||||
if req.Interval > 0 {
|
||||
interval = time.Duration(req.Interval) * time.Second
|
||||
}
|
||||
|
||||
maxDuration := 60 * time.Minute
|
||||
if req.MaxDuration > 0 {
|
||||
maxDuration = time.Duration(req.MaxDuration) * time.Minute
|
||||
}
|
||||
|
||||
// 创建任务
|
||||
task := &ContinuousTask{
|
||||
TaskID: taskID,
|
||||
Type: req.Type,
|
||||
Target: req.Target,
|
||||
Interval: interval,
|
||||
MaxDuration: maxDuration,
|
||||
StartTime: time.Now(),
|
||||
LastRequest: time.Now(),
|
||||
StopCh: make(chan struct{}),
|
||||
IsRunning: true,
|
||||
}
|
||||
|
||||
// 根据类型创建对应的任务
|
||||
if req.Type == "ping" {
|
||||
pingTask := continuous.NewPingTask(taskID, req.Target, interval, maxDuration)
|
||||
task.pingTask = pingTask
|
||||
} else if req.Type == "tcping" {
|
||||
tcpingTask, err := continuous.NewTCPingTask(taskID, req.Target, interval, maxDuration)
|
||||
if err != nil {
|
||||
c.JSON(http.StatusBadRequest, gin.H{"error": err.Error()})
|
||||
return
|
||||
}
|
||||
task.tcpingTask = tcpingTask
|
||||
} else {
|
||||
c.JSON(http.StatusBadRequest, gin.H{"error": "不支持的持续测试类型"})
|
||||
return
|
||||
}
|
||||
|
||||
taskMutex.Lock()
|
||||
continuousTasks[taskID] = task
|
||||
taskMutex.Unlock()
|
||||
|
||||
// 启动持续测试goroutine
|
||||
ctx := context.Background()
|
||||
if task.pingTask != nil {
|
||||
go task.pingTask.Start(ctx, func(result map[string]interface{}) {
|
||||
pushResultToBackend(taskID, result)
|
||||
})
|
||||
} else if task.tcpingTask != nil {
|
||||
go task.tcpingTask.Start(ctx, func(result map[string]interface{}) {
|
||||
pushResultToBackend(taskID, result)
|
||||
})
|
||||
}
|
||||
|
||||
c.JSON(http.StatusOK, gin.H{
|
||||
"task_id": taskID,
|
||||
})
|
||||
}
|
||||
|
||||
func HandleContinuousStop(c *gin.Context) {
|
||||
var req struct {
|
||||
TaskID string `json:"task_id" binding:"required"`
|
||||
}
|
||||
|
||||
if err := c.ShouldBindJSON(&req); err != nil {
|
||||
c.JSON(http.StatusBadRequest, gin.H{"error": err.Error()})
|
||||
return
|
||||
}
|
||||
|
||||
taskMutex.Lock()
|
||||
task, exists := continuousTasks[req.TaskID]
|
||||
if exists {
|
||||
task.IsRunning = false
|
||||
if task.pingTask != nil {
|
||||
task.pingTask.Stop()
|
||||
}
|
||||
if task.tcpingTask != nil {
|
||||
task.tcpingTask.Stop()
|
||||
}
|
||||
close(task.StopCh)
|
||||
delete(continuousTasks, req.TaskID)
|
||||
}
|
||||
taskMutex.Unlock()
|
||||
|
||||
if !exists {
|
||||
c.JSON(http.StatusNotFound, gin.H{"error": "任务不存在"})
|
||||
return
|
||||
}
|
||||
|
||||
c.JSON(http.StatusOK, gin.H{"message": "任务已停止"})
|
||||
}
|
||||
|
||||
func HandleContinuousStatus(c *gin.Context) {
|
||||
taskID := c.Query("task_id")
|
||||
if taskID == "" {
|
||||
c.JSON(http.StatusBadRequest, gin.H{"error": "task_id参数缺失"})
|
||||
return
|
||||
}
|
||||
|
||||
taskMutex.RLock()
|
||||
task, exists := continuousTasks[taskID]
|
||||
if exists {
|
||||
// 更新LastRequest时间
|
||||
task.LastRequest = time.Now()
|
||||
if task.pingTask != nil {
|
||||
task.pingTask.UpdateLastRequest()
|
||||
}
|
||||
if task.tcpingTask != nil {
|
||||
task.tcpingTask.UpdateLastRequest()
|
||||
}
|
||||
}
|
||||
taskMutex.RUnlock()
|
||||
|
||||
if !exists {
|
||||
c.JSON(http.StatusNotFound, gin.H{"error": "任务不存在"})
|
||||
return
|
||||
}
|
||||
|
||||
c.JSON(http.StatusOK, gin.H{
|
||||
"task_id": task.TaskID,
|
||||
"is_running": task.IsRunning,
|
||||
"start_time": task.StartTime,
|
||||
"last_request": task.LastRequest,
|
||||
})
|
||||
}
|
||||
|
||||
func pushResultToBackend(taskID string, result map[string]interface{}) {
|
||||
// 推送结果到后端
|
||||
url := fmt.Sprintf("%s/api/public/node/continuous/result", backendURL)
|
||||
|
||||
// 获取本机IP
|
||||
nodeIP := getLocalIP()
|
||||
|
||||
data := map[string]interface{}{
|
||||
"task_id": taskID,
|
||||
"node_ip": nodeIP,
|
||||
"result": result,
|
||||
}
|
||||
|
||||
jsonData, err := json.Marshal(data)
|
||||
if err != nil {
|
||||
logger.Error("序列化结果失败", zap.Error(err))
|
||||
return
|
||||
}
|
||||
|
||||
req, err := http.NewRequest("POST", url, bytes.NewBuffer(jsonData))
|
||||
if err != nil {
|
||||
logger.Error("创建请求失败", zap.Error(err))
|
||||
return
|
||||
}
|
||||
|
||||
req.Header.Set("Content-Type", "application/json")
|
||||
|
||||
client := &http.Client{Timeout: 5 * time.Second}
|
||||
resp, err := client.Do(req)
|
||||
if err != nil {
|
||||
logger.Warn("推送结果失败", zap.Error(err))
|
||||
// 如果推送失败,停止任务
|
||||
taskMutex.Lock()
|
||||
if task, exists := continuousTasks[taskID]; exists {
|
||||
task.IsRunning = false
|
||||
if task.pingTask != nil {
|
||||
task.pingTask.Stop()
|
||||
}
|
||||
if task.tcpingTask != nil {
|
||||
task.tcpingTask.Stop()
|
||||
}
|
||||
delete(continuousTasks, taskID)
|
||||
}
|
||||
taskMutex.Unlock()
|
||||
return
|
||||
}
|
||||
defer resp.Body.Close()
|
||||
|
||||
if resp.StatusCode != http.StatusOK {
|
||||
logger.Warn("推送结果失败", zap.Int("status", resp.StatusCode))
|
||||
// 如果推送失败,停止任务
|
||||
taskMutex.Lock()
|
||||
if task, exists := continuousTasks[taskID]; exists {
|
||||
task.IsRunning = false
|
||||
if task.pingTask != nil {
|
||||
task.pingTask.Stop()
|
||||
}
|
||||
if task.tcpingTask != nil {
|
||||
task.tcpingTask.Stop()
|
||||
}
|
||||
delete(continuousTasks, taskID)
|
||||
}
|
||||
taskMutex.Unlock()
|
||||
}
|
||||
}
|
||||
|
||||
func getLocalIP() string {
|
||||
// 简化实现:返回第一个非回环IP
|
||||
// 实际应该获取外网IP
|
||||
addrs, err := net.InterfaceAddrs()
|
||||
if err != nil {
|
||||
return "127.0.0.1"
|
||||
}
|
||||
|
||||
for _, addr := range addrs {
|
||||
if ipNet, ok := addr.(*net.IPNet); ok && !ipNet.IP.IsLoopback() {
|
||||
if ipNet.IP.To4() != nil {
|
||||
return ipNet.IP.String()
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
return "127.0.0.1"
|
||||
}
|
||||
|
||||
func generateTaskID() string {
|
||||
return fmt.Sprintf("task_%d", time.Now().UnixNano())
|
||||
}
|
||||
|
||||
// 定期清理超时任务
|
||||
func StartTaskCleanup() {
|
||||
ticker := time.NewTicker(1 * time.Minute)
|
||||
go func() {
|
||||
for range ticker.C {
|
||||
now := time.Now()
|
||||
taskMutex.Lock()
|
||||
for taskID, task := range continuousTasks {
|
||||
// 检查最大运行时长
|
||||
if now.Sub(task.StartTime) > task.MaxDuration {
|
||||
logger.Info("任务达到最大运行时长,自动停止", zap.String("task_id", taskID))
|
||||
task.IsRunning = false
|
||||
if task.pingTask != nil {
|
||||
task.pingTask.Stop()
|
||||
}
|
||||
if task.tcpingTask != nil {
|
||||
task.tcpingTask.Stop()
|
||||
}
|
||||
delete(continuousTasks, taskID)
|
||||
continue
|
||||
}
|
||||
// 检查无客户端连接(30分钟无请求)
|
||||
if now.Sub(task.LastRequest) > 30*time.Minute {
|
||||
logger.Info("任务无客户端连接,自动停止", zap.String("task_id", taskID))
|
||||
task.IsRunning = false
|
||||
if task.pingTask != nil {
|
||||
task.pingTask.Stop()
|
||||
}
|
||||
if task.tcpingTask != nil {
|
||||
task.tcpingTask.Stop()
|
||||
}
|
||||
delete(continuousTasks, taskID)
|
||||
}
|
||||
}
|
||||
taskMutex.Unlock()
|
||||
}
|
||||
}()
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user