diff --git a/INSTALL.md b/INSTALL.md new file mode 100644 index 0000000..b2accf5 --- /dev/null +++ b/INSTALL.md @@ -0,0 +1,262 @@ +# LinkMaster 节点端安装指南 + +## 一句话安装 + +### 从 GitHub 安装(推荐) + +```bash +curl -fsSL https://raw.githubusercontent.com/yourbask/linkmaster-node/main/install.sh | bash -s -- http://your-backend-server:8080 +``` + +**替换说明:** +- `yourbask/linkmaster-node` - 独立的 node 项目 GitHub 仓库地址 +- `http://your-backend-server:8080` - 替换为实际的后端服务器地址 + +**重要提示:** +- ⚠️ 节点端需要直接连接后端服务器(端口 8080),不是前端地址 +- 前端通过 `/api` 路径代理到后端,但节点端不使用前端代理 +- 如果节点和后端在同一服务器:使用 `http://localhost:8080` +- 如果节点和后端在不同服务器:使用 `http://backend-ip:8080` 或 `http://backend-domain:8080` +- **本项目是独立的 GitHub 仓库**,与前后端项目分离 + +**示例:** +```bash +# 如果后端服务器在 192.168.1.100:8080 +curl -fsSL https://raw.githubusercontent.com/yourbask/linkmaster-node/main/install.sh | bash -s -- http://192.168.1.100:8080 +``` + +### 指定分支安装 + +```bash +GITHUB_BRANCH=develop curl -fsSL https://raw.githubusercontent.com/yourbask/linkmaster-node/main/install.sh | bash -s -- http://your-backend-server:8080 +``` + +## 安装步骤说明 + +安装脚本会自动完成以下步骤: + +1. **检测系统** - 自动识别 Linux 发行版和 CPU 架构 +2. **安装依赖** - 自动安装 Git、Go、ping、traceroute、dnsutils 等工具 +3. **克隆源码** - 从 GitHub 克隆 node 项目源码到 `/opt/linkmaster-node` +4. **编译安装** - 自动编译源码并安装二进制文件 +5. **创建服务** - 自动创建 systemd 服务文件(使用 run.sh 启动) +6. **启动服务** - 自动启动并设置开机自启 +7. **验证安装** - 检查服务状态和健康检查 + +**注意:** 每次服务启动时会自动拉取最新代码并重新编译,确保使用最新版本。 + +## 安装后管理 + +### 查看服务状态 + +```bash +sudo systemctl status linkmaster-node +``` + +### 查看日志 + +```bash +# 实时日志 +sudo journalctl -u linkmaster-node -f + +# 最近50行日志 +sudo journalctl -u linkmaster-node -n 50 +``` + +### 重启服务 + +```bash +sudo systemctl restart linkmaster-node +``` + +### 停止服务 + +```bash +sudo systemctl stop linkmaster-node +``` + +### 禁用开机自启 + +```bash +sudo systemctl disable linkmaster-node +``` + +## 验证安装 + +### 检查进程 + +```bash +ps aux | grep linkmaster-node +``` + +### 检查端口 + +```bash +netstat -tlnp | grep 2200 +# 或 +ss -tlnp | grep 2200 +``` + +### 健康检查 + +```bash +curl http://localhost:2200/api/health +``` + +应该返回:`{"status":"ok"}` + +## 手动安装(不使用脚本) + +如果无法使用一键安装脚本,可以手动安装: + +### 1. 克隆源码并编译 + +```bash +# 克隆仓库 +git clone https://github.com/yourbask/linkmaster-node.git /opt/linkmaster-node +cd /opt/linkmaster-node + +# 安装 Go 环境(如果未安装) +# Ubuntu/Debian +sudo apt-get install -y golang-go + +# CentOS/RHEL +sudo yum install -y golang + +# 编译 +go build -o agent ./cmd/agent + +# 安装到系统目录 +sudo cp agent /usr/local/bin/linkmaster-node +sudo chmod +x /usr/local/bin/linkmaster-node +``` + +### 2. 安装系统依赖 + +```bash +# Ubuntu/Debian +sudo apt-get update +sudo apt-get install -y ping traceroute dnsutils curl + +# CentOS/RHEL +sudo yum install -y iputils traceroute bind-utils curl +``` + +### 3. 创建 systemd 服务 + +```bash +# 确保 run.sh 有执行权限 +sudo chmod +x /opt/linkmaster-node/run.sh + +sudo tee /etc/systemd/system/linkmaster-node.service > /dev/null < +# 示例: curl -fsSL https://raw.githubusercontent.com/yourbask/linkmaster-node/main/install.sh | bash -s -- http://192.168.1.100:8080 +# ============================================ + +set -e + +# 颜色输出 +RED='\033[0;31m' +GREEN='\033[0;32m' +YELLOW='\033[1;33m' +BLUE='\033[0;34m' +NC='\033[0m' + +# 配置 +# 尝试从脚本URL自动提取仓库信息(如果通过curl下载) +SCRIPT_URL="${SCRIPT_URL:-}" +if [ -z "$SCRIPT_URL" ] && [ -n "${BASH_SOURCE[0]}" ]; then + # 如果脚本是通过 curl 下载的,尝试从环境变量获取 + SCRIPT_URL="${SCRIPT_URL:-}" +fi + +# 默认配置(如果无法自动提取,使用这些默认值) +GITHUB_REPO="${GITHUB_REPO:-yourbask/linkmaster-node}" # 默认仓库(独立的 node 项目) +GITHUB_BRANCH="${GITHUB_BRANCH:-main}" # 默认分支 +SOURCE_DIR="/opt/linkmaster-node" # 源码目录 +BINARY_NAME="linkmaster-node" +INSTALL_DIR="/usr/local/bin" +SERVICE_NAME="linkmaster-node" + +# 获取后端地址参数 +BACKEND_URL="${1:-}" +if [ -z "$BACKEND_URL" ]; then + echo -e "${RED}错误: 请提供后端服务器地址${NC}" + echo -e "${YELLOW}使用方法:${NC}" + echo " curl -fsSL https://raw.githubusercontent.com/${GITHUB_REPO}/${GITHUB_BRANCH}/install.sh | bash -s -- http://your-backend-server:8080" + echo "" + echo -e "${YELLOW}注意:${NC}" + echo " - 节点端需要直接连接后端服务器,不是前端地址" + echo " - 后端默认端口: 8080" + echo " - 如果节点和后端在同一服务器: http://localhost:8080" + echo " - 如果节点和后端在不同服务器: http://backend-ip:8080 或 http://backend-domain:8080" + exit 1 +fi + +# 检测系统类型和架构 +detect_system() { + if [ -f /etc/os-release ]; then + . /etc/os-release + OS=$ID + OS_VERSION=$VERSION_ID + else + echo -e "${RED}无法检测系统类型${NC}" + exit 1 + fi + + ARCH=$(uname -m) + case $ARCH in + x86_64) + ARCH="amd64" + ;; + aarch64|arm64) + ARCH="arm64" + ;; + *) + echo -e "${RED}不支持的架构: $ARCH${NC}" + exit 1 + ;; + esac + + echo -e "${BLUE}检测到系统: $OS $OS_VERSION ($ARCH)${NC}" +} + +# 安装系统依赖 +install_dependencies() { + echo -e "${BLUE}安装系统依赖...${NC}" + + if [ "$OS" = "ubuntu" ] || [ "$OS" = "debian" ]; then + sudo apt-get update -qq + sudo apt-get install -y -qq curl wget ping traceroute dnsutils git > /dev/null 2>&1 + elif [ "$OS" = "centos" ] || [ "$OS" = "rhel" ] || [ "$OS" = "rocky" ] || [ "$OS" = "almalinux" ]; then + sudo yum install -y -q curl wget iputils traceroute bind-utils git > /dev/null 2>&1 + else + echo -e "${YELLOW}警告: 未知系统类型,跳过依赖安装${NC}" + fi + + echo -e "${GREEN}✓ 依赖安装完成${NC}" +} + +# 安装 Go 环境 +install_go() { + echo -e "${BLUE}安装 Go 环境...${NC}" + + if [ "$OS" = "ubuntu" ] || [ "$OS" = "debian" ]; then + sudo apt-get update -qq + sudo apt-get install -y -qq golang-go > /dev/null 2>&1 + elif [ "$OS" = "centos" ] || [ "$OS" = "rhel" ] || [ "$OS" = "rocky" ] || [ "$OS" = "almalinux" ]; then + sudo yum install -y -q golang > /dev/null 2>&1 + else + echo -e "${YELLOW}无法自动安装 Go,请手动安装: https://golang.org/dl/${NC}" + show_build_alternatives + exit 1 + fi + + if command -v go > /dev/null 2>&1; then + GO_VERSION=$(go version 2>/dev/null | head -1) + echo -e "${GREEN}✓ Go 安装完成: ${GO_VERSION}${NC}" + else + echo -e "${RED}Go 安装失败${NC}" + show_build_alternatives + exit 1 + fi +} + +# 显示替代方案 +show_build_alternatives() { + echo "" + echo -e "${YELLOW}═══════════════════════════════════════════════════════════${NC}" + echo -e "${YELLOW} 安装失败,请使用以下替代方案:${NC}" + echo -e "${YELLOW}═══════════════════════════════════════════════════════════${NC}" + echo "" + echo -e "${GREEN}手动编译安装:${NC}" + echo " git clone https://github.com/${GITHUB_REPO}.git ${SOURCE_DIR}" + echo " cd ${SOURCE_DIR}" + echo " go build -o agent ./cmd/agent" + echo " sudo cp agent /usr/local/bin/linkmaster-node" + echo " sudo chmod +x /usr/local/bin/linkmaster-node" + echo "" +} + +# 从源码编译安装 +build_from_source() { + echo -e "${BLUE}从源码编译安装节点端...${NC}" + + # 检查 Go 环境 + if ! command -v go > /dev/null 2>&1; then + echo -e "${BLUE}未检测到 Go 环境,开始安装...${NC}" + install_go + fi + + # 检查 Go 版本 + GO_VERSION=$(go version 2>/dev/null | head -1 || echo "") + if [ -z "$GO_VERSION" ]; then + echo -e "${RED}无法获取 Go 版本信息${NC}" + show_build_alternatives + exit 1 + fi + + echo -e "${BLUE}检测到 Go 版本: ${GO_VERSION}${NC}" + + # 如果源码目录已存在,先备份或删除 + if [ -d "$SOURCE_DIR" ]; then + echo -e "${YELLOW}源码目录已存在,将更新代码...${NC}" + sudo rm -rf "$SOURCE_DIR" + fi + + # 克隆仓库到源码目录 + echo -e "${BLUE}克隆仓库到 ${SOURCE_DIR}...${NC}" + if ! sudo git clone --branch "${GITHUB_BRANCH}" "https://github.com/${GITHUB_REPO}.git" "$SOURCE_DIR" 2>&1; then + echo -e "${RED}克隆仓库失败,请检查网络连接和仓库地址${NC}" + echo -e "${YELLOW}仓库地址: https://github.com/${GITHUB_REPO}.git${NC}" + show_build_alternatives + exit 1 + fi + + # 设置目录权限 + sudo chown -R $USER:$USER "$SOURCE_DIR" 2>/dev/null || true + + cd "$SOURCE_DIR" + + # 下载依赖 + echo -e "${BLUE}下载 Go 依赖...${NC}" + if ! go mod download 2>&1; then + echo -e "${RED}下载依赖失败${NC}" + show_build_alternatives + exit 1 + fi + + # 编译 + echo -e "${BLUE}编译二进制文件...${NC}" + BINARY_PATH="$SOURCE_DIR/agent" + if GOOS=linux GOARCH=${ARCH} CGO_ENABLED=0 go build -ldflags="-w -s" -o "$BINARY_PATH" ./cmd/agent 2>&1; then + if [ -f "$BINARY_PATH" ] && [ -s "$BINARY_PATH" ]; then + echo -e "${GREEN}✓ 编译成功${NC}" + else + echo -e "${RED}编译失败:未生成二进制文件${NC}" + show_build_alternatives + exit 1 + fi + else + echo -e "${RED}编译失败${NC}" + show_build_alternatives + exit 1 + fi + + # 复制到安装目录(可选,保留在源码目录供 run.sh 使用) + sudo mkdir -p "$INSTALL_DIR" + sudo cp "$BINARY_PATH" "$INSTALL_DIR/$BINARY_NAME" + sudo chmod +x "$INSTALL_DIR/$BINARY_NAME" + + echo -e "${GREEN}✓ 编译安装完成${NC}" + echo -e "${BLUE}源码目录: ${SOURCE_DIR}${NC}" + echo -e "${BLUE}二进制文件: ${INSTALL_DIR}/${BINARY_NAME}${NC}" +} + +# 创建 systemd 服务 +create_service() { + echo -e "${BLUE}创建 systemd 服务...${NC}" + + # 确保 run.sh 有执行权限 + sudo chmod +x "$SOURCE_DIR/run.sh" + + sudo tee /etc/systemd/system/${SERVICE_NAME}.service > /dev/null < /dev/null 2>&1 + sudo systemctl restart ${SERVICE_NAME} + + # 等待服务启动 + sleep 3 + + # 检查服务状态 + if sudo systemctl is-active --quiet ${SERVICE_NAME}; then + echo -e "${GREEN}✓ 服务启动成功${NC}" + else + echo -e "${RED}✗ 服务启动失败${NC}" + echo -e "${YELLOW}查看日志: sudo journalctl -u ${SERVICE_NAME} -n 50${NC}" + exit 1 + fi +} + +# 验证安装 +verify_installation() { + echo -e "${BLUE}验证安装...${NC}" + + # 检查进程 + if pgrep -f "$BINARY_NAME" > /dev/null; then + echo -e "${GREEN}✓ 进程运行中${NC}" + else + echo -e "${YELLOW}⚠ 进程未运行${NC}" + fi + + # 检查端口 + if command -v netstat > /dev/null 2>&1; then + if netstat -tlnp 2>/dev/null | grep -q ":2200"; then + echo -e "${GREEN}✓ 端口 2200 已监听${NC}" + fi + elif command -v ss > /dev/null 2>&1; then + if ss -tlnp 2>/dev/null | grep -q ":2200"; then + echo -e "${GREEN}✓ 端口 2200 已监听${NC}" + fi + fi + + # 健康检查 + sleep 2 + if curl -sf http://localhost:2200/api/health > /dev/null; then + echo -e "${GREEN}✓ 健康检查通过${NC}" + else + echo -e "${YELLOW}⚠ 健康检查未通过,请稍后重试${NC}" + fi +} + +# 主安装流程 +main() { + echo -e "${GREEN}========================================${NC}" + echo -e "${GREEN} LinkMaster 节点端安装程序${NC}" + echo -e "${GREEN}========================================${NC}" + echo "" + + detect_system + install_dependencies + build_from_source + create_service + start_service + verify_installation + + echo "" + echo -e "${GREEN}========================================${NC}" + echo -e "${GREEN} 安装完成!${NC}" + echo -e "${GREEN}========================================${NC}" + echo "" + echo -e "${BLUE}服务管理命令:${NC}" + echo " 查看状态: sudo systemctl status ${SERVICE_NAME}" + echo " 查看日志: sudo journalctl -u ${SERVICE_NAME} -f" + echo " 重启服务: sudo systemctl restart ${SERVICE_NAME}" + echo " 停止服务: sudo systemctl stop ${SERVICE_NAME}" + echo "" + echo -e "${BLUE}后端地址: ${BACKEND_URL}${NC}" + echo -e "${BLUE}节点端口: 2200${NC}" + echo "" + echo -e "${YELLOW}重要提示:${NC}" + echo " - 节点端直接连接后端服务器,不使用前端代理" + echo " - 确保后端地址可访问: curl ${BACKEND_URL}/api/public/nodes/online" +} + +# 执行安装 +main diff --git a/internal/config/config.go b/internal/config/config.go new file mode 100644 index 0000000..c3d0b05 --- /dev/null +++ b/internal/config/config.go @@ -0,0 +1,60 @@ +package config + +import ( + "fmt" + "os" + + "gopkg.in/yaml.v3" +) + +type Config struct { + Server struct { + Port int `yaml:"port"` + } `yaml:"server"` + + Backend struct { + URL string `yaml:"url"` + } `yaml:"backend"` + + Heartbeat struct { + Interval int `yaml:"interval"` // 心跳间隔(秒) + } `yaml:"heartbeat"` + + Debug bool `yaml:"debug"` +} + +func Load() (*Config, error) { + cfg := &Config{} + + // 默认配置 + cfg.Server.Port = 2200 + cfg.Heartbeat.Interval = 60 + cfg.Debug = false + + // 从环境变量读取后端URL + backendURL := os.Getenv("BACKEND_URL") + if backendURL == "" { + backendURL = "http://localhost:8080" + } + cfg.Backend.URL = backendURL + + // 尝试从配置文件读取 + configPath := os.Getenv("CONFIG_PATH") + if configPath == "" { + configPath = "config.yaml" + } + + if _, err := os.Stat(configPath); err == nil { + data, err := os.ReadFile(configPath) + if err != nil { + return nil, fmt.Errorf("读取配置文件失败: %w", err) + } + + if err := yaml.Unmarshal(data, cfg); err != nil { + return nil, fmt.Errorf("解析配置文件失败: %w", err) + } + } + + return cfg, nil +} + diff --git a/internal/continuous/ping.go b/internal/continuous/ping.go new file mode 100644 index 0000000..f3208b1 --- /dev/null +++ b/internal/continuous/ping.go @@ -0,0 +1,143 @@ +package continuous + +import ( + "context" + "os/exec" + "strconv" + "strings" + "sync" + "time" + + "go.uber.org/zap" +) + +type PingTask struct { + TaskID string + Target string + Interval time.Duration + MaxDuration time.Duration + StartTime time.Time + LastRequest time.Time + StopCh chan struct{} + IsRunning bool + mu sync.RWMutex + logger *zap.Logger +} + +func NewPingTask(taskID, target string, interval, maxDuration time.Duration) *PingTask { + logger, _ := zap.NewProduction() + return &PingTask{ + TaskID: taskID, + Target: target, + Interval: interval, + MaxDuration: maxDuration, + StartTime: time.Now(), + LastRequest: time.Now(), + StopCh: make(chan struct{}), + IsRunning: true, + logger: logger, + } +} + +func (t *PingTask) Start(ctx context.Context, resultCallback func(result map[string]interface{})) { + ticker := time.NewTicker(t.Interval) + defer ticker.Stop() + + for { + select { + case <-ctx.Done(): + return + case <-t.StopCh: + return + case <-ticker.C: + // 检查是否超过最大运行时长 + t.mu.RLock() + if time.Since(t.StartTime) > t.MaxDuration { + t.mu.RUnlock() + t.Stop() + return + } + t.mu.RUnlock() + + // 执行ping测试 + result := t.executePing() + if resultCallback != nil { + resultCallback(result) + } + } + } +} + +func (t *PingTask) Stop() { + t.mu.Lock() + defer t.mu.Unlock() + if t.IsRunning { + t.IsRunning = false + close(t.StopCh) + } +} + +func (t *PingTask) UpdateLastRequest() { + t.mu.Lock() + defer t.mu.Unlock() + t.LastRequest = time.Now() +} + +func (t *PingTask) executePing() map[string]interface{} { + cmd := exec.Command("ping", "-c", "4", t.Target) + output, err := cmd.CombinedOutput() + if err != nil { + return map[string]interface{}{ + "timestamp": time.Now().Unix(), + "latency": -1, + "success": false, + "packet_loss": true, + "error": err.Error(), + } + } + + // 解析ping输出 + result := parsePingOutput(string(output)) + result["timestamp"] = time.Now().Unix() + return result +} + +func parsePingOutput(output string) map[string]interface{} { + result := map[string]interface{}{ + "latency": 0.0, + "success": true, + "packet_loss": false, + } + + lines := strings.Split(output, "\n") + for _, line := range lines { + if strings.Contains(line, "packets transmitted") { + // 解析丢包率 + parts := strings.Fields(line) + for i, part := range parts { + if part == "packet" && i+2 < len(parts) { + if loss, err := strconv.ParseFloat(strings.Trim(parts[i+1], "%"), 64); err == nil { + result["packet_loss"] = loss > 0 + } + } + } + } + if strings.Contains(line, "min/avg/max") { + // 解析平均延迟 + parts := strings.Fields(line) + for _, part := range parts { + if strings.Contains(part, "/") { + times := strings.Split(part, "/") + if len(times) >= 2 { + if avg, err := strconv.ParseFloat(times[1], 64); err == nil { + result["latency"] = avg + } + } + } + } + } + } + + return result +} + diff --git a/internal/continuous/tcping.go b/internal/continuous/tcping.go new file mode 100644 index 0000000..31253ca --- /dev/null +++ b/internal/continuous/tcping.go @@ -0,0 +1,126 @@ +package continuous + +import ( + "context" + "fmt" + "net" + "strconv" + "strings" + "sync" + "time" + + "go.uber.org/zap" +) + +type TCPingTask struct { + TaskID string + Target string + Host string + Port int + Interval time.Duration + MaxDuration time.Duration + StartTime time.Time + LastRequest time.Time + StopCh chan struct{} + IsRunning bool + mu sync.RWMutex + logger *zap.Logger +} + +func NewTCPingTask(taskID, target string, interval, maxDuration time.Duration) (*TCPingTask, error) { + // 解析host:port + parts := strings.Split(target, ":") + if len(parts) != 2 { + return nil, fmt.Errorf("无效的target格式,需要 host:port") + } + + host := parts[0] + port, err := strconv.Atoi(parts[1]) + if err != nil { + return nil, fmt.Errorf("无效的端口: %v", err) + } + + logger, _ := zap.NewProduction() + return &TCPingTask{ + TaskID: taskID, + Target: target, + Host: host, + Port: port, + Interval: interval, + MaxDuration: maxDuration, + StartTime: time.Now(), + LastRequest: time.Now(), + StopCh: make(chan struct{}), + IsRunning: true, + logger: logger, + }, nil +} + +func (t *TCPingTask) Start(ctx context.Context, resultCallback func(result map[string]interface{})) { + ticker := time.NewTicker(t.Interval) + defer ticker.Stop() + + for { + select { + case <-ctx.Done(): + return + case <-t.StopCh: + return + case <-ticker.C: + // 检查是否超过最大运行时长 + t.mu.RLock() + if time.Since(t.StartTime) > t.MaxDuration { + t.mu.RUnlock() + t.Stop() + return + } + t.mu.RUnlock() + + // 执行tcping测试 + result := t.executeTCPing() + if resultCallback != nil { + resultCallback(result) + } + } + } +} + +func (t *TCPingTask) Stop() { + t.mu.Lock() + defer t.mu.Unlock() + if t.IsRunning { + t.IsRunning = false + close(t.StopCh) + } +} + +func (t *TCPingTask) UpdateLastRequest() { + t.mu.Lock() + defer t.mu.Unlock() + t.LastRequest = time.Now() +} + +func (t *TCPingTask) executeTCPing() map[string]interface{} { + start := time.Now() + conn, err := net.DialTimeout("tcp", net.JoinHostPort(t.Host, strconv.Itoa(t.Port)), 5*time.Second) + latency := time.Since(start).Milliseconds() + + if err != nil { + return map[string]interface{}{ + "timestamp": time.Now().Unix(), + "latency": -1, + "success": false, + "packet_loss": true, + "error": err.Error(), + } + } + defer conn.Close() + + return map[string]interface{}{ + "timestamp": time.Now().Unix(), + "latency": float64(latency), + "success": true, + "packet_loss": false, + } +} + diff --git a/internal/handler/continuous.go b/internal/handler/continuous.go new file mode 100644 index 0000000..1095fae --- /dev/null +++ b/internal/handler/continuous.go @@ -0,0 +1,315 @@ +package handler + +import ( + "bytes" + "context" + "encoding/json" + "fmt" + "net" + "net/http" + "sync" + "time" + + "linkmaster-node/internal/config" + "linkmaster-node/internal/continuous" + + "github.com/gin-gonic/gin" + "go.uber.org/zap" +) + +var continuousTasks = make(map[string]*ContinuousTask) +var taskMutex sync.RWMutex +var backendURL string +var logger *zap.Logger + +func InitContinuousHandler(cfg *config.Config) { + backendURL = cfg.Backend.URL + logger, _ = zap.NewProduction() +} + +type ContinuousTask struct { + TaskID string + Type string + Target string + Interval time.Duration + MaxDuration time.Duration + StartTime time.Time + LastRequest time.Time + StopCh chan struct{} + IsRunning bool + pingTask *continuous.PingTask + tcpingTask *continuous.TCPingTask +} + +func HandleContinuousStart(c *gin.Context) { + var req struct { + Type string `json:"type" binding:"required"` + Target string `json:"target" binding:"required"` + Interval int `json:"interval"` // 秒 + MaxDuration int `json:"max_duration"` // 分钟 + } + + if err := c.ShouldBindJSON(&req); err != nil { + c.JSON(http.StatusBadRequest, gin.H{"error": err.Error()}) + return + } + + // 生成任务ID + taskID := generateTaskID() + + // 设置默认值 + interval := 10 * time.Second + if req.Interval > 0 { + interval = time.Duration(req.Interval) * time.Second + } + + maxDuration := 60 * time.Minute + if req.MaxDuration > 0 { + maxDuration = time.Duration(req.MaxDuration) * time.Minute + } + + // 创建任务 + task := &ContinuousTask{ + TaskID: taskID, + Type: req.Type, + Target: req.Target, + Interval: interval, + MaxDuration: maxDuration, + StartTime: time.Now(), + LastRequest: time.Now(), + StopCh: make(chan struct{}), + IsRunning: true, + } + + // 根据类型创建对应的任务 + if req.Type == "ping" { + pingTask := continuous.NewPingTask(taskID, req.Target, interval, maxDuration) + task.pingTask = pingTask + } else if req.Type == "tcping" { + tcpingTask, err := continuous.NewTCPingTask(taskID, req.Target, interval, maxDuration) + if err != nil { + c.JSON(http.StatusBadRequest, gin.H{"error": err.Error()}) + return + } + task.tcpingTask = tcpingTask + } else { + c.JSON(http.StatusBadRequest, gin.H{"error": "不支持的持续测试类型"}) + return + } + + taskMutex.Lock() + continuousTasks[taskID] = task + taskMutex.Unlock() + + // 启动持续测试goroutine + ctx := context.Background() + if task.pingTask != nil { + go task.pingTask.Start(ctx, func(result map[string]interface{}) { + pushResultToBackend(taskID, result) + }) + } else if task.tcpingTask != nil { + go task.tcpingTask.Start(ctx, func(result map[string]interface{}) { + pushResultToBackend(taskID, result) + }) + } + + c.JSON(http.StatusOK, gin.H{ + "task_id": taskID, + }) +} + +func HandleContinuousStop(c *gin.Context) { + var req struct { + TaskID string `json:"task_id" binding:"required"` + } + + if err := c.ShouldBindJSON(&req); err != nil { + c.JSON(http.StatusBadRequest, gin.H{"error": err.Error()}) + return + } + + taskMutex.Lock() + task, exists := continuousTasks[req.TaskID] + if exists { + task.IsRunning = false + if task.pingTask != nil { + task.pingTask.Stop() + } + if task.tcpingTask != nil { + task.tcpingTask.Stop() + } + close(task.StopCh) + delete(continuousTasks, req.TaskID) + } + taskMutex.Unlock() + + if !exists { + c.JSON(http.StatusNotFound, gin.H{"error": "任务不存在"}) + return + } + + c.JSON(http.StatusOK, gin.H{"message": "任务已停止"}) +} + +func HandleContinuousStatus(c *gin.Context) { + taskID := c.Query("task_id") + if taskID == "" { + c.JSON(http.StatusBadRequest, gin.H{"error": "task_id参数缺失"}) + return + } + + taskMutex.RLock() + task, exists := continuousTasks[taskID] + if exists { + // 更新LastRequest时间 + task.LastRequest = time.Now() + if task.pingTask != nil { + task.pingTask.UpdateLastRequest() + } + if task.tcpingTask != nil { + task.tcpingTask.UpdateLastRequest() + } + } + taskMutex.RUnlock() + + if !exists { + c.JSON(http.StatusNotFound, gin.H{"error": "任务不存在"}) + return + } + + c.JSON(http.StatusOK, gin.H{ + "task_id": task.TaskID, + "is_running": task.IsRunning, + "start_time": task.StartTime, + "last_request": task.LastRequest, + }) +} + +func pushResultToBackend(taskID string, result map[string]interface{}) { + // 推送结果到后端 + url := fmt.Sprintf("%s/api/public/node/continuous/result", backendURL) + + // 获取本机IP + nodeIP := getLocalIP() + + data := map[string]interface{}{ + "task_id": taskID, + "node_ip": nodeIP, + "result": result, + } + + jsonData, err := json.Marshal(data) + if err != nil { + logger.Error("序列化结果失败", zap.Error(err)) + return + } + + req, err := http.NewRequest("POST", url, bytes.NewBuffer(jsonData)) + if err != nil { + logger.Error("创建请求失败", zap.Error(err)) + return + } + + req.Header.Set("Content-Type", "application/json") + + client := &http.Client{Timeout: 5 * time.Second} + resp, err := client.Do(req) + if err != nil { + logger.Warn("推送结果失败", zap.Error(err)) + // 如果推送失败,停止任务 + taskMutex.Lock() + if task, exists := continuousTasks[taskID]; exists { + task.IsRunning = false + if task.pingTask != nil { + task.pingTask.Stop() + } + if task.tcpingTask != nil { + task.tcpingTask.Stop() + } + delete(continuousTasks, taskID) + } + taskMutex.Unlock() + return + } + defer resp.Body.Close() + + if resp.StatusCode != http.StatusOK { + logger.Warn("推送结果失败", zap.Int("status", resp.StatusCode)) + // 如果推送失败,停止任务 + taskMutex.Lock() + if task, exists := continuousTasks[taskID]; exists { + task.IsRunning = false + if task.pingTask != nil { + task.pingTask.Stop() + } + if task.tcpingTask != nil { + task.tcpingTask.Stop() + } + delete(continuousTasks, taskID) + } + taskMutex.Unlock() + } +} + +func getLocalIP() string { + // 简化实现:返回第一个非回环IP + // 实际应该获取外网IP + addrs, err := net.InterfaceAddrs() + if err != nil { + return "127.0.0.1" + } + + for _, addr := range addrs { + if ipNet, ok := addr.(*net.IPNet); ok && !ipNet.IP.IsLoopback() { + if ipNet.IP.To4() != nil { + return ipNet.IP.String() + } + } + } + + return "127.0.0.1" +} + +func generateTaskID() string { + return fmt.Sprintf("task_%d", time.Now().UnixNano()) +} + +// 定期清理超时任务 +func StartTaskCleanup() { + ticker := time.NewTicker(1 * time.Minute) + go func() { + for range ticker.C { + now := time.Now() + taskMutex.Lock() + for taskID, task := range continuousTasks { + // 检查最大运行时长 + if now.Sub(task.StartTime) > task.MaxDuration { + logger.Info("任务达到最大运行时长,自动停止", zap.String("task_id", taskID)) + task.IsRunning = false + if task.pingTask != nil { + task.pingTask.Stop() + } + if task.tcpingTask != nil { + task.tcpingTask.Stop() + } + delete(continuousTasks, taskID) + continue + } + // 检查无客户端连接(30分钟无请求) + if now.Sub(task.LastRequest) > 30*time.Minute { + logger.Info("任务无客户端连接,自动停止", zap.String("task_id", taskID)) + task.IsRunning = false + if task.pingTask != nil { + task.pingTask.Stop() + } + if task.tcpingTask != nil { + task.tcpingTask.Stop() + } + delete(continuousTasks, taskID) + } + } + taskMutex.Unlock() + } + }() +} + diff --git a/internal/handler/dns.go b/internal/handler/dns.go new file mode 100644 index 0000000..f542956 --- /dev/null +++ b/internal/handler/dns.go @@ -0,0 +1,45 @@ +package handler + +import ( + "net" + "time" + + "github.com/gin-gonic/gin" +) + +func handleDns(c *gin.Context, url string, params map[string]interface{}) { + // 执行DNS查询 + start := time.Now() + ips, err := net.LookupIP(url) + lookupTime := time.Since(start).Milliseconds() + + if err != nil { + c.JSON(200, gin.H{ + "type": "ceDns", + "url": url, + "error": err.Error(), + }) + return + } + + // 格式化IP列表 + ipList := make([]map[string]interface{}, 0) + for _, ip := range ips { + ipType := "A" + if ip.To4() == nil { + ipType = "AAAA" + } + ipList = append(ipList, map[string]interface{}{ + "type": ipType, + "ip": ip.String(), + }) + } + + c.JSON(200, gin.H{ + "type": "ceDns", + "url": url, + "ips": ipList, + "lookup_time": lookupTime, + }) +} + diff --git a/internal/handler/findping.go b/internal/handler/findping.go new file mode 100644 index 0000000..ac8d30d --- /dev/null +++ b/internal/handler/findping.go @@ -0,0 +1,84 @@ +package handler + +import ( + "net" + "os/exec" + "sync" + + "github.com/gin-gonic/gin" +) + +func handleFindPing(c *gin.Context, url string, params map[string]interface{}) { + // url应该是CIDR格式,如 8.8.8.0/24 + cidr := url + if cidrParam, ok := params["cidr"].(string); ok && cidrParam != "" { + cidr = cidrParam + } + + // 解析CIDR + _, ipNet, err := net.ParseCIDR(cidr) + if err != nil { + c.JSON(200, gin.H{ + "type": "ceFindPing", + "error": "无效的CIDR格式", + }) + return + } + + // 生成IP列表 + var ipList []string + for ip := ipNet.IP.Mask(ipNet.Mask); ipNet.Contains(ip); incIP(ip) { + ipList = append(ipList, ip.String()) + } + + // 移除网络地址和广播地址 + if len(ipList) > 2 { + ipList = ipList[1 : len(ipList)-1] + } + + // 并发ping测试 + var wg sync.WaitGroup + var mu sync.Mutex + aliveIPs := make([]string, 0) + + // 限制并发数 + semaphore := make(chan struct{}, 50) + + for _, ip := range ipList { + wg.Add(1) + semaphore <- struct{}{} + go func(ipAddr string) { + defer wg.Done() + defer func() { <-semaphore }() + + // 执行ping(只ping一次,快速检测) + cmd := exec.Command("ping", "-c", "1", "-W", "1", ipAddr) + err := cmd.Run() + if err == nil { + mu.Lock() + aliveIPs = append(aliveIPs, ipAddr) + mu.Unlock() + } + }(ip) + } + + wg.Wait() + + c.JSON(200, gin.H{ + "type": "ceFindPing", + "cidr": cidr, + "alive_ips": aliveIPs, + "alive_count": len(aliveIPs), + "total_ips": len(ipList), + }) +} + +func incIP(ip net.IP) { + for j := len(ip) - 1; j >= 0; j-- { + ip[j]++ + if ip[j] > 0 { + break + } + } +} + diff --git a/internal/handler/get.go b/internal/handler/get.go new file mode 100644 index 0000000..4ca27cb --- /dev/null +++ b/internal/handler/get.go @@ -0,0 +1,32 @@ +package handler + +import ( + "net/http" + "time" + + "github.com/gin-gonic/gin" +) + +func handleGet(c *gin.Context, url string, params map[string]interface{}) { + // TODO: 实现HTTP GET测试 + // 这里先返回一个简单的响应 + c.JSON(http.StatusOK, gin.H{ + "type": "ceGet", + "url": url, + "statuscode": 200, + "totaltime": time.Since(time.Now()).Milliseconds(), + "response": "OK", + }) +} + +func handlePost(c *gin.Context, url string, params map[string]interface{}) { + // TODO: 实现HTTP POST测试 + c.JSON(http.StatusOK, gin.H{ + "type": "cePost", + "url": url, + "statuscode": 200, + "totaltime": time.Since(time.Now()).Milliseconds(), + "response": "OK", + }) +} + diff --git a/internal/handler/ping.go b/internal/handler/ping.go new file mode 100644 index 0000000..562600e --- /dev/null +++ b/internal/handler/ping.go @@ -0,0 +1,85 @@ +package handler + +import ( + "net" + "os/exec" + "strconv" + "strings" + + "github.com/gin-gonic/gin" +) + +func handlePing(c *gin.Context, url string, params map[string]interface{}) { + // 执行ping命令 + cmd := exec.Command("ping", "-c", "4", url) + output, err := cmd.CombinedOutput() + if err != nil { + c.JSON(200, gin.H{ + "type": "cePing", + "url": url, + "error": err.Error(), + }) + return + } + + // 解析ping输出 + result := parsePingOutput(string(output), url) + c.JSON(200, result) +} + +func parsePingOutput(output, url string) map[string]interface{} { + result := map[string]interface{}{ + "type": "cePing", + "url": url, + "ip": "", + } + + // 解析IP地址 + lines := strings.Split(output, "\n") + for _, line := range lines { + if strings.Contains(line, "PING") { + // 提取IP地址 + parts := strings.Fields(line) + for _, part := range parts { + if net.ParseIP(part) != nil { + result["ip"] = part + break + } + } + } + if strings.Contains(line, "packets transmitted") { + // 解析丢包率 + parts := strings.Fields(line) + for i, part := range parts { + if part == "packet" && i+2 < len(parts) { + if loss, err := strconv.ParseFloat(strings.Trim(parts[i+1], "%"), 64); err == nil { + result["packets_losrat"] = loss + } + } + } + } + if strings.Contains(line, "min/avg/max") { + // 解析延迟统计 + parts := strings.Fields(line) + for _, part := range parts { + if strings.Contains(part, "/") { + times := strings.Split(part, "/") + if len(times) >= 3 { + if min, err := strconv.ParseFloat(times[0], 64); err == nil { + result["time_min"] = min + } + if avg, err := strconv.ParseFloat(times[1], 64); err == nil { + result["time_avg"] = avg + } + if max, err := strconv.ParseFloat(times[2], 64); err == nil { + result["time_max"] = max + } + } + } + } + } + } + + return result +} + diff --git a/internal/handler/socket.go b/internal/handler/socket.go new file mode 100644 index 0000000..f9dd909 --- /dev/null +++ b/internal/handler/socket.go @@ -0,0 +1,59 @@ +package handler + +import ( + "net" + "strconv" + "strings" + "time" + + "github.com/gin-gonic/gin" +) + +func handleSocket(c *gin.Context, url string, params map[string]interface{}) { + // 解析host:port格式 + parts := strings.Split(url, ":") + if len(parts) != 2 { + c.JSON(200, gin.H{ + "type": "ceSocket", + "url": url, + "error": "格式错误,需要 host:port", + }) + return + } + + host := parts[0] + portStr := parts[1] + port, err := strconv.Atoi(portStr) + if err != nil { + c.JSON(200, gin.H{ + "type": "ceSocket", + "url": url, + "error": "端口格式错误", + }) + return + } + + // 执行TCP连接测试 + conn, err := net.DialTimeout("tcp", net.JoinHostPort(host, portStr), 5*time.Second) + if err != nil { + c.JSON(200, gin.H{ + "type": "ceSocket", + "url": url, + "host": host, + "port": port, + "result": "false", + "error": err.Error(), + }) + return + } + defer conn.Close() + + c.JSON(200, gin.H{ + "type": "ceSocket", + "url": url, + "host": host, + "port": port, + "result": "true", + }) +} + diff --git a/internal/handler/tcping.go b/internal/handler/tcping.go new file mode 100644 index 0000000..8b99366 --- /dev/null +++ b/internal/handler/tcping.go @@ -0,0 +1,63 @@ +package handler + +import ( + "net" + "strconv" + "strings" + "time" + + "github.com/gin-gonic/gin" +) + +func handleTCPing(c *gin.Context, url string, params map[string]interface{}) { + // 解析host:port格式 + parts := strings.Split(url, ":") + if len(parts) != 2 { + c.JSON(200, gin.H{ + "type": "ceTCPing", + "url": url, + "error": "格式错误,需要 host:port", + }) + return + } + + host := parts[0] + portStr := parts[1] + port, err := strconv.Atoi(portStr) + if err != nil { + c.JSON(200, gin.H{ + "type": "ceTCPing", + "url": url, + "error": "端口格式错误", + }) + return + } + + // 执行TCP连接测试 + start := time.Now() + conn, err := net.DialTimeout("tcp", net.JoinHostPort(host, portStr), 5*time.Second) + latency := time.Since(start).Milliseconds() + + if err != nil { + c.JSON(200, gin.H{ + "type": "ceTCPing", + "url": url, + "host": host, + "port": port, + "latency": -1, + "error": err.Error(), + }) + return + } + defer conn.Close() + + c.JSON(200, gin.H{ + "type": "ceTCPing", + "url": url, + "host": host, + "port": port, + "latency": latency, + "success": true, + }) +} + diff --git a/internal/handler/test.go b/internal/handler/test.go new file mode 100644 index 0000000..027d0d3 --- /dev/null +++ b/internal/handler/test.go @@ -0,0 +1,49 @@ +package handler + +import ( + "net/http" + + "github.com/gin-gonic/gin" +) + +// HandleTest 统一测试接口 +func HandleTest(c *gin.Context) { + var req struct { + Type string `json:"type" binding:"required"` + URL string `json:"url" binding:"required"` + Params map[string]interface{} `json:"params"` + } + + if err := c.ShouldBindJSON(&req); err != nil { + c.JSON(http.StatusBadRequest, gin.H{"error": err.Error()}) + return + } + + // 根据类型分发到不同的处理器 + switch req.Type { + case "ceGet": + handleGet(c, req.URL, req.Params) + case "cePost": + handlePost(c, req.URL, req.Params) + case "cePing": + handlePing(c, req.URL, req.Params) + case "ceDns": + handleDns(c, req.URL, req.Params) + case "ceTrace": + handleTrace(c, req.URL, req.Params) + case "ceSocket": + handleSocket(c, req.URL, req.Params) + case "ceTCPing": + handleTCPing(c, req.URL, req.Params) + case "ceFindPing": + handleFindPing(c, req.URL, req.Params) + default: + c.JSON(http.StatusBadRequest, gin.H{"error": "不支持的测试类型"}) + } +} + +// HandleHealth 健康检查 +func HandleHealth(c *gin.Context) { + c.JSON(http.StatusOK, gin.H{"status": "ok"}) +} + diff --git a/internal/handler/trace.go b/internal/handler/trace.go new file mode 100644 index 0000000..9285f89 --- /dev/null +++ b/internal/handler/trace.go @@ -0,0 +1,39 @@ +package handler + +import ( + "os/exec" + "strings" + + "github.com/gin-gonic/gin" +) + +func handleTrace(c *gin.Context, url string, params map[string]interface{}) { + // 执行traceroute命令 + cmd := exec.Command("traceroute", url) + output, err := cmd.CombinedOutput() + if err != nil { + c.JSON(200, gin.H{ + "type": "ceTrace", + "url": url, + "error": err.Error(), + }) + return + } + + // 解析输出 + lines := strings.Split(string(output), "\n") + traceResult := make([]string, 0) + for _, line := range lines { + line = strings.TrimSpace(line) + if line != "" { + traceResult = append(traceResult, line) + } + } + + c.JSON(200, gin.H{ + "type": "ceTrace", + "url": url, + "trace_result": traceResult, + }) +} + diff --git a/internal/heartbeat/reporter.go b/internal/heartbeat/reporter.go new file mode 100644 index 0000000..20540c2 --- /dev/null +++ b/internal/heartbeat/reporter.go @@ -0,0 +1,103 @@ +package heartbeat + +import ( + "bytes" + "context" + "fmt" + "net" + "net/http" + "time" + + "linkmaster-node/internal/config" + + "go.uber.org/zap" +) + +type Reporter struct { + cfg *config.Config + client *http.Client + logger *zap.Logger + stopCh chan struct{} +} + +func NewReporter(cfg *config.Config) *Reporter { + logger, _ := zap.NewProduction() + return &Reporter{ + cfg: cfg, + client: &http.Client{ + Timeout: 10 * time.Second, + }, + logger: logger, + stopCh: make(chan struct{}), + } +} + +func (r *Reporter) Start(ctx context.Context) { + ticker := time.NewTicker(time.Duration(r.cfg.Heartbeat.Interval) * time.Second) + defer ticker.Stop() + + // 立即发送一次心跳 + r.sendHeartbeat() + + for { + select { + case <-ctx.Done(): + return + case <-r.stopCh: + return + case <-ticker.C: + r.sendHeartbeat() + } + } +} + +func (r *Reporter) Stop() { + close(r.stopCh) +} + +func (r *Reporter) sendHeartbeat() { + // 获取本机IP + localIP := getLocalIP() + + // 发送心跳(使用Form格式,兼容旧接口) + url := fmt.Sprintf("%s/api/node/heartbeat", r.cfg.Backend.URL) + req, err := http.NewRequest("POST", url, bytes.NewBufferString(fmt.Sprintf("realNodeIP=%s&type=pingServer", localIP))) + if err != nil { + r.logger.Error("创建心跳请求失败", zap.Error(err)) + return + } + + req.Header.Set("Content-Type", "application/x-www-form-urlencoded") + + resp, err := r.client.Do(req) + if err != nil { + r.logger.Warn("发送心跳失败", zap.Error(err)) + return + } + defer resp.Body.Close() + + if resp.StatusCode == http.StatusOK { + r.logger.Debug("心跳发送成功", zap.String("ip", localIP)) + } else { + r.logger.Warn("心跳发送失败", zap.Int("status", resp.StatusCode)) + } +} + +func getLocalIP() string { + // 获取第一个非回环IP + addrs, err := net.InterfaceAddrs() + if err != nil { + return "127.0.0.1" + } + + for _, addr := range addrs { + if ipNet, ok := addr.(*net.IPNet); ok && !ipNet.IP.IsLoopback() { + if ipNet.IP.To4() != nil { + return ipNet.IP.String() + } + } + } + + return "127.0.0.1" +} + diff --git a/internal/recovery/recovery.go b/internal/recovery/recovery.go new file mode 100644 index 0000000..7d266fd --- /dev/null +++ b/internal/recovery/recovery.go @@ -0,0 +1,25 @@ +package recovery + +import ( + "runtime/debug" + + "go.uber.org/zap" +) + +var logger *zap.Logger + +func Init() { + // 初始化logger(这里简化处理,实际应该从外部传入) + logger, _ = zap.NewProduction() +} + +// Recover 恢复panic +func Recover() { + if r := recover(); r != nil { + logger.Error("发生panic", + zap.Any("panic", r), + zap.String("stack", string(debug.Stack())), + ) + } +} + diff --git a/internal/server/http.go b/internal/server/http.go new file mode 100644 index 0000000..40b120a --- /dev/null +++ b/internal/server/http.go @@ -0,0 +1,72 @@ +package server + +import ( + "context" + "fmt" + "net/http" + + "linkmaster-node/internal/config" + "linkmaster-node/internal/handler" + "linkmaster-node/internal/recovery" + + "github.com/gin-gonic/gin" + "go.uber.org/zap" +) + +type HTTPServer struct { + server *http.Server + logger *zap.Logger +} + +func NewHTTPServer(cfg *config.Config) *HTTPServer { + if !cfg.Debug { + gin.SetMode(gin.ReleaseMode) + } + + router := gin.New() + router.Use(gin.Recovery()) + router.Use(recoveryMiddleware) + + // 初始化持续测试处理器 + handler.InitContinuousHandler(cfg) + + // 启动任务清理goroutine + handler.StartTaskCleanup() + + // 注册路由 + api := router.Group("/api") + { + api.POST("/test", handler.HandleTest) + api.POST("/continuous/start", handler.HandleContinuousStart) + api.POST("/continuous/stop", handler.HandleContinuousStop) + api.GET("/continuous/status", handler.HandleContinuousStatus) + api.GET("/health", handler.HandleHealth) + } + + server := &http.Server{ + Addr: fmt.Sprintf(":%d", cfg.Server.Port), + Handler: router, + } + + logger, _ := zap.NewProduction() + + return &HTTPServer{ + server: server, + logger: logger, + } +} + +func (s *HTTPServer) Start() error { + s.logger.Info("HTTP服务器启动", zap.String("addr", s.server.Addr)) + return s.server.ListenAndServe() +} + +func (s *HTTPServer) Shutdown(ctx context.Context) error { + return s.server.Shutdown(ctx) +} + +func recoveryMiddleware(c *gin.Context) { + defer recovery.Recover() + c.Next() +} + diff --git a/node.log b/node.log new file mode 100644 index 0000000..ba33d76 --- /dev/null +++ b/node.log @@ -0,0 +1,5 @@ +{"level":"info","ts":1763025207.120181,"caller":"agent/main.go:35","msg":"节点服务启动","version":"1.0.0"} +{"level":"info","ts":1763025207.1208231,"caller":"server/http.go:60","msg":"HTTP服务器启动","addr":":2200"} +{"level":"info","ts":1763653448.720011,"caller":"agent/main.go:57","msg":"收到停止信号,正在关闭服务..."} +{"level":"fatal","ts":1763653448.720453,"caller":"agent/main.go:48","msg":"HTTP服务器启动失败","error":"http: Server closed","stacktrace":"main.main.func1\n\t/Users/yoyo/Desktop/newLinkMaster/node/cmd/agent/main.go:48"} +{"level":"info","ts":1763653448.720591,"caller":"agent/main.go:66","msg":"服务已关闭"} diff --git a/node.pid b/node.pid new file mode 100644 index 0000000..163a49f --- /dev/null +++ b/node.pid @@ -0,0 +1 @@ +48748 diff --git a/run.sh b/run.sh new file mode 100755 index 0000000..950faf1 --- /dev/null +++ b/run.sh @@ -0,0 +1,339 @@ +#!/bin/bash + +# ============================================ +# LinkMaster 节点端运行脚本 +# 用途:启动、停止、重启节点端服务 +# ============================================ + +set -e + +# 颜色输出 +RED='\033[0;31m' +GREEN='\033[0;32m' +YELLOW='\033[1;33m' +BLUE='\033[0;34m' +NC='\033[0m' # No Color + +# 脚本目录 +SCRIPT_DIR="$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd)" +cd "$SCRIPT_DIR" + +# 配置 +BINARY_NAME="agent" +LOG_FILE="node.log" +PID_FILE="node.pid" +BACKEND_URL="${BACKEND_URL:-http://localhost:8080}" + +# 获取PID +get_pid() { + if [ -f "$PID_FILE" ]; then + PID=$(cat "$PID_FILE") + if ps -p "$PID" > /dev/null 2>&1; then + echo "$PID" + else + rm -f "$PID_FILE" + echo "" + fi + else + echo "" + fi +} + +# 拉取最新源码并编译 +update_and_build() { + echo -e "${BLUE}拉取最新源码...${NC}" + + # 检查是否在 Git 仓库中 + if [ ! -d ".git" ]; then + echo -e "${YELLOW}警告: 当前目录不是 Git 仓库,跳过代码更新${NC}" + return 0 + fi + + # 拉取最新代码 + if git pull 2>&1; then + echo -e "${GREEN}✓ 代码更新完成${NC}" + else + PULL_EXIT_CODE=$? + echo -e "${YELLOW}警告: Git 拉取失败(退出码: $PULL_EXIT_CODE),将使用当前代码继续${NC}" + echo -e "${YELLOW}可能原因: 网络问题、权限问题或本地有未提交的更改${NC}" + fi + + # 检查 Go 环境 + if ! command -v go > /dev/null 2>&1; then + echo -e "${RED}错误: 未找到 Go 环境,无法编译${NC}" + exit 1 + fi + + # 更新依赖 + echo -e "${BLUE}更新 Go 依赖...${NC}" + if ! go mod download 2>&1; then + echo -e "${YELLOW}警告: 依赖更新失败,尝试继续编译${NC}" + else + echo -e "${GREEN}✓ 依赖更新完成${NC}" + fi + + # 编译 + echo -e "${BLUE}编译二进制文件...${NC}" + ARCH=$(uname -m) + case $ARCH in + x86_64) + ARCH="amd64" + ;; + aarch64|arm64) + ARCH="arm64" + ;; + *) + ARCH="amd64" + ;; + esac + + if GOOS=linux GOARCH=${ARCH} CGO_ENABLED=0 go build -ldflags="-w -s" -o "$BINARY_NAME" ./cmd/agent 2>&1; then + if [ -f "$BINARY_NAME" ] && [ -s "$BINARY_NAME" ]; then + chmod +x "$BINARY_NAME" + echo -e "${GREEN}✓ 编译成功${NC}" + else + echo -e "${RED}错误: 编译失败,未生成二进制文件${NC}" + exit 1 + fi + else + echo -e "${RED}错误: 编译失败${NC}" + exit 1 + fi +} + +# 检查二进制文件 +check_binary() { + if [ ! -f "$BINARY_NAME" ]; then + echo -e "${RED}错误: 找不到二进制文件 $BINARY_NAME${NC}" + echo -e "${YELLOW}尝试编译...${NC}" + update_and_build + return + fi + + if [ ! -x "$BINARY_NAME" ]; then + chmod +x "$BINARY_NAME" + fi +} + +# 检查端口占用 +check_port() { + if command -v lsof > /dev/null 2>&1; then + PORT_PID=$(lsof -ti :2200 2>/dev/null || echo "") + if [ -n "$PORT_PID" ]; then + # 检查是否是我们的进程 + if [ -f "$PID_FILE" ] && [ "$PORT_PID" = "$(cat "$PID_FILE")" ]; then + return 0 + fi + echo -e "${YELLOW}警告: 端口2200已被占用 (PID: $PORT_PID)${NC}" + echo -e "${YELLOW}是否要停止该进程? (y/n)${NC}" + read -r answer + if [ "$answer" = "y" ] || [ "$answer" = "Y" ]; then + kill "$PORT_PID" 2>/dev/null || true + sleep 1 + else + echo -e "${RED}取消启动${NC}" + exit 1 + fi + fi + fi +} + +# 启动服务 +start() { + PID=$(get_pid) + if [ -n "$PID" ]; then + echo -e "${YELLOW}节点端已在运行 (PID: $PID)${NC}" + return 0 + fi + + # 拉取最新源码并编译 + update_and_build + + check_port + + echo -e "${BLUE}启动节点端服务...${NC}" + echo -e "${BLUE}后端地址: $BACKEND_URL${NC}" + + # 设置环境变量 + export BACKEND_URL="$BACKEND_URL" + + # 后台运行 + nohup ./"$BINARY_NAME" > "$LOG_FILE" 2>&1 & + NEW_PID=$! + + # 保存PID + echo "$NEW_PID" > "$PID_FILE" + + # 等待启动 + sleep 2 + + # 检查是否启动成功 + if ps -p "$NEW_PID" > /dev/null 2>&1; then + # 再次检查健康状态 + sleep 1 + if curl -s http://localhost:2200/api/health > /dev/null 2>&1; then + echo -e "${GREEN}✓ 节点端已启动 (PID: $NEW_PID)${NC}" + echo -e "${BLUE}日志文件: $LOG_FILE${NC}" + echo -e "${BLUE}查看日志: tail -f $LOG_FILE${NC}" + else + echo -e "${GREEN}✓ 节点端进程已启动 (PID: $NEW_PID)${NC}" + echo -e "${YELLOW}⚠ 健康检查未通过,请稍后查看日志${NC}" + fi + else + echo -e "${RED}✗ 节点端启动失败${NC}" + echo -e "${YELLOW}请查看日志: cat $LOG_FILE${NC}" + rm -f "$PID_FILE" + exit 1 + fi +} + +# 停止服务 +stop() { + PID=$(get_pid) + + # 如果没有PID文件,尝试通过端口查找 + if [ -z "$PID" ]; then + if command -v lsof > /dev/null 2>&1; then + PORT_PID=$(lsof -ti :2200 2>/dev/null || echo "") + if [ -n "$PORT_PID" ]; then + PID="$PORT_PID" + echo -e "${YELLOW}通过端口找到进程 (PID: $PID)${NC}" + fi + fi + fi + + if [ -z "$PID" ]; then + echo -e "${YELLOW}节点端未运行${NC}" + rm -f "$PID_FILE" + return 0 + fi + + echo -e "${BLUE}停止节点端服务 (PID: $PID)...${NC}" + + # 发送TERM信号 + kill "$PID" 2>/dev/null || true + + # 等待进程结束 + for i in {1..10}; do + if ! ps -p "$PID" > /dev/null 2>&1; then + break + fi + sleep 1 + done + + # 如果还在运行,强制杀死 + if ps -p "$PID" > /dev/null 2>&1; then + echo -e "${YELLOW}强制停止节点端...${NC}" + kill -9 "$PID" 2>/dev/null || true + sleep 1 + fi + + rm -f "$PID_FILE" + echo -e "${GREEN}✓ 节点端已停止${NC}" +} + +# 重启服务 +restart() { + echo -e "${BLUE}重启节点端服务...${NC}" + stop + sleep 1 + start +} + +# 查看状态 +status() { + PID=$(get_pid) + if [ -n "$PID" ]; then + echo -e "${GREEN}节点端运行中 (PID: $PID)${NC}" + + # 检查健康状态 + if command -v curl > /dev/null 2>&1; then + HEALTH=$(curl -s http://localhost:2200/api/health 2>/dev/null || echo "failed") + if [ "$HEALTH" = '{"status":"ok"}' ]; then + echo -e "${GREEN}✓ 健康检查: 正常${NC}" + else + echo -e "${YELLOW}⚠ 健康检查: 异常${NC}" + fi + fi + + # 显示进程信息 + ps -p "$PID" -o pid,ppid,cmd,%mem,%cpu,etime 2>/dev/null || true + else + echo -e "${RED}节点端未运行${NC}" + fi +} + +# 查看日志 +logs() { + if [ -f "$LOG_FILE" ]; then + tail -f "$LOG_FILE" + else + echo -e "${YELLOW}日志文件不存在: $LOG_FILE${NC}" + fi +} + +# 查看完整日志 +logs_all() { + if [ -f "$LOG_FILE" ]; then + cat "$LOG_FILE" + else + echo -e "${YELLOW}日志文件不存在: $LOG_FILE${NC}" + fi +} + +# 显示帮助 +help() { + echo "LinkMaster 节点端运行脚本" + echo "" + echo "使用方法:" + echo " $0 {start|stop|restart|status|logs|logs-all|help}" + echo "" + echo "命令说明:" + echo " start - 启动节点端服务(会自动拉取最新代码并编译)" + echo " stop - 停止节点端服务" + echo " restart - 重启节点端服务" + echo " status - 查看运行状态" + echo " logs - 实时查看日志" + echo " logs-all - 查看完整日志" + echo " help - 显示帮助信息" + echo "" + echo "环境变量:" + echo " BACKEND_URL - 后端服务地址 (默认: http://localhost:8080)" + echo "" + echo "示例:" + echo " BACKEND_URL=http://192.168.1.100:8080 $0 start" + echo " $0 status" + echo " $0 logs" +} + +# 主逻辑 +case "${1:-help}" in + start) + start + ;; + stop) + stop + ;; + restart) + restart + ;; + status) + status + ;; + logs) + logs + ;; + logs-all) + logs_all + ;; + help|--help|-h) + help + ;; + *) + echo -e "${RED}未知命令: $1${NC}" + echo "" + help + exit 1 + ;; +esac +