Compare commits

..

9 Commits

Author SHA1 Message Date
b5fc83065c feat: 更新文档和配置逻辑,增强心跳机制和持续测试功能
- 在 INSTALL.md 和 README.md 中添加配置优先级说明,确保环境变量优先级最高。
- 增强心跳机制,新增字段以传递节点信息。
- 持续测试功能优化,支持批量推送和自动清理。
- 更新版本号至 v1.1.4,完善文档以反映新功能和改进。
2025-12-24 01:21:45 +08:00
ef31a054c0 chore: 更新版本号至 v1.1.3
增加 version host_name 2个新字段传递
2025-12-23 23:09:55 +08:00
ff35510ef0 修复 2025-12-17 21:12:49 +08:00
21592ae8a0 fix: 修复 IPv6 地址解析中的端口处理逻辑
- 将 LastIndex 替换为 Index,以正确找到第一个闭合括号。
- 添加逻辑以在端口部分为空时使用默认端口 80,解决了潜在的连接问题。
2025-12-17 20:09:51 +08:00
f01547df35 refactor: 优化 HTTP 请求处理逻辑
- 改进了对重定向的处理,确保在 CheckRedirect 返回 ErrUseLastResponse 时能够正确处理响应。
- 移除了不必要的空行以提升代码可读性。
- 增强了错误处理逻辑,确保在没有响应的情况下返回适当的错误信息。
2025-12-17 20:08:31 +08:00
4a2532a83b 强壮安装脚本 2025-12-17 19:56:16 +08:00
b962265168 refactor: 优化 TCPing 任务的目标解析逻辑
- 改进了 TCPing 任务中对 host:port 格式的解析,支持 IPv6 地址格式并默认使用端口 80。
- 移除了不必要的空行以提升代码可读性。
- 更新了安装脚本,移除了不再使用的镜像源。
2025-12-17 19:16:39 +08:00
38acca6484 fix: 改进心跳报告中的错误处理和日志记录
- 增强了 RegisterNode 和 sendHeartbeat 函数中的错误消息,包含 URL 和响应体详情以便更好地调试。
- 移除了不必要的空行以使代码结构更清晰。
2025-12-07 18:37:17 +08:00
8d36ef495d 修复 2025-12-07 18:09:49 +08:00
15 changed files with 635 additions and 138 deletions

1
.gitignore vendored
View File

@@ -4,3 +4,4 @@ agent
node.log
node.pid
config.yaml
.DS_Store

View File

@@ -169,6 +169,43 @@ EOF
**注意:** 使用 `run.sh` 启动的好处是每次启动会自动拉取最新代码并重新编译。
### 3.1. 配置说明
**配置优先级(从高到低):**
1. 环境变量 `BACKEND_URL`(最高优先级)
2. 配置文件 `config.yaml` 中的 `backend.url`
3. 默认值
**重要说明:**
- 环境变量 `BACKEND_URL` 会**覆盖**配置文件中的设置
- 即使配置文件存在,设置环境变量后也会优先使用环境变量的值
- 这确保了编译后的二进制文件不会硬编码后端地址
- 配置文件不会被编译进二进制文件,是运行时读取的
**使用环境变量(推荐):**
```bash
# 在 systemd 服务文件中设置
Environment="BACKEND_URL=http://your-backend-server:8080"
# 或在命令行中设置
BACKEND_URL=http://your-backend-server:8080 ./run.sh start
```
**使用配置文件:**
创建 `/opt/linkmaster-node/config.yaml`
```yaml
server:
port: 2200
backend:
url: http://your-backend-server:8080
heartbeat:
interval: 60
log:
file: node.log
level: info
debug: false
```
### 4. 启动服务
```bash
@@ -177,7 +214,11 @@ sudo systemctl enable linkmaster-node
sudo systemctl start linkmaster-node
```
**注意** 确保 `BACKEND_URL` 环境变量指向后端服务器的实际地址和端口(默认 8080不是前端地址。
**重要说明**
- 确保 `BACKEND_URL` 环境变量指向后端服务器的实际地址和端口(默认 8080不是前端地址
- `BACKEND_URL` 环境变量会**覆盖**配置文件中的 `backend.url` 设置(优先级最高)
- 即使配置文件存在,设置环境变量后也会优先使用环境变量的值
- 这确保了编译后的二进制文件不会硬编码后端地址
## 防火墙配置
@@ -238,12 +279,19 @@ sudo lsof -i :2200
**解决:**
- 检查后端地址是否正确(应该是 `http://backend-server:8080`,不是前端地址)
- 检查环境变量 `BACKEND_URL` 是否设置正确(优先级最高)
- 检查配置文件 `config.yaml` 中的 `backend.url` 是否正确
- 检查网络连通性:`ping your-backend-server`
- 检查端口是否开放:`telnet your-backend-server 8080``nc -zv your-backend-server 8080`
- 检查防火墙规则(确保后端服务器的 8080 端口开放)
- 检查后端服务是否运行:`curl http://your-backend-server:8080/api/public/nodes/online`
- 如果使用前端代理,节点端仍需要直接连接后端,不能使用前端地址
**配置优先级说明:**
- 环境变量 `BACKEND_URL` 优先级最高,会覆盖配置文件中的设置
- 如果同时设置了环境变量和配置文件,优先使用环境变量的值
- 这确保了编译后的二进制文件不会硬编码后端地址
## 卸载
```bash

View File

@@ -85,12 +85,25 @@ BACKEND_URL=http://your-backend-server:8080 ./run.sh start
## 配置
### 配置优先级
配置按以下优先级加载(高优先级会覆盖低优先级):
1. **环境变量**(最高优先级)
2. **配置文件** `config.yaml`
3. **默认值**
### 环境变量
- `BACKEND_URL`: 后端服务地址(必需,默认: http://localhost:8080
- `BACKEND_URL`: 后端服务地址(**优先级最高**,会覆盖配置文件中的设置
- `CONFIG_PATH`: 配置文件路径(可选,默认: config.yaml
- `LOG_FILE`: 日志文件路径(可选,默认: node.log
**重要说明:**
- `BACKEND_URL` 环境变量会**覆盖**配置文件中的 `backend.url` 设置
- 即使配置文件存在,设置环境变量后也会优先使用环境变量的值
- 这确保了编译后的二进制文件不会硬编码后端地址
### 配置文件(可选)
创建 `config.yaml` 文件:
@@ -99,20 +112,28 @@ BACKEND_URL=http://your-backend-server:8080 ./run.sh start
server:
port: 2200
backend:
url: http://your-backend-server:8080
url: http://your-backend-server:8080 # 会被 BACKEND_URL 环境变量覆盖
heartbeat:
interval: 60
log:
file: node.log # 日志文件路径(默认: node.log空则输出到标准错误
level: info # 日志级别: debug, info, warn, error默认: info
debug: false
node:
id: 0 # 节点ID通过心跳自动获取
ip: "" # 节点IP通过心跳自动获取
country: "" # 国家(通过心跳自动获取)
province: "" # 省份(通过心跳自动获取)
city: "" # 城市(通过心跳自动获取)
isp: "" # ISP通过心跳自动获取
```
**日志配置说明:**
**配置说明:**
- `backend.url`: 后端服务地址,会被 `BACKEND_URL` 环境变量覆盖
- `log.file`: 日志文件路径。如果为空日志将输出到标准错误stderr
- `log.level`: 日志级别,支持 `debug``info``warn``error`
- 也可以通过环境变量 `LOG_FILE` 设置日志文件路径
- 日志文件会自动创建,如果目录不存在会自动创建
- `node.*`: 节点信息通过心跳自动获取并保存,无需手动配置
- 配置文件不会被编译进二进制文件,是运行时读取的
## 运行脚本
@@ -337,8 +358,8 @@ BACKEND_URL=http://192.168.1.100:8080 ./run.sh start
版本号统一从 `version.json` 文件读取:
```json
{
"version": "1.1.0",
"tag": "v1.1.0"
"version": "1.1.3",
"tag": "v1.1.3"
}
```
@@ -414,8 +435,8 @@ BACKEND_URL=http://192.168.1.100:8080 ./run.sh start
版本号和标签统一从 `version.json` 文件读取:
```json
{
"version": "1.1.0",
"tag": "v1.1.0"
"version": "1.1.3",
"tag": "v1.1.3"
}
```
@@ -603,9 +624,67 @@ grep -i "error" node.log
tail -n 100 node.log
```
## 心跳机制
节点会定期向后端发送心跳,上报节点状态和获取节点信息。
### 心跳请求字段
心跳请求包含以下字段:
- `type`: 固定值 `pingServer`
- `version`: 协议版本号,固定值 `2`
- `host_name`: 节点主机名(自动读取系统主机名)
### 心跳响应
心跳响应包含以下节点信息:
- `node_id`: 节点ID
- `node_ip`: 节点外网IP
- `country`: 国家
- `province`: 省份
- `city`: 城市
- `isp`: ISP
这些信息会自动保存到配置文件中,用于后续的数据推送。
## 持续测试功能
节点支持持续 Ping 和 TCPing 测试,测试结果会自动推送到后端服务器。
### 功能特性
- ✅ 实时推送测试结果到后端
- ✅ 批量推送优化减少HTTP请求频率
- ✅ 自动清理超时任务
- ✅ 资源自动清理(防止内存泄漏)
- ✅ 详细的调试日志debug模式
### 数据推送
- 测试结果会自动推送到后端 `/api/public/node/continuous/result` 接口
- 推送包含节点ID、IP、位置信息和测试结果
- 如果后端任务不存在,节点端会自动停止对应任务
## 更新日志
### v1.1.0 (最新)
### v1.1.4 (最新)
**新增功能:**
- ✨ 心跳请求新增 `version` 字段协议版本号默认值2
- ✨ 心跳请求新增 `host_name` 字段(自动读取系统主机名)
- ✨ 支持环境变量 `BACKEND_URL` 覆盖配置文件中的后端地址
- ✨ 持续测试功能增强,支持批量推送和自动清理
**改进:**
- 🔧 修复持续测试数据推送的锁管理问题
- 🔧 修复任务停止时未清理推送缓冲的内存泄漏问题
- 🔧 优化配置加载逻辑,环境变量优先级最高
- 🔧 增强日志记录,添加详细的调试信息
- 📝 完善文档,添加配置优先级和心跳机制说明
### v1.1.3
**新增功能:**
- ✨ 添加日志文件输出功能,支持配置日志文件路径和级别

View File

@@ -19,6 +19,8 @@ BUILD_DIR="bin"
RELEASE_DIR="release"
TEMP_DIR=$(mktemp -d)
# Gitea Token (硬编码)
GITEA_TOKEN="3becb08eee31b422481ce1b8986de1cd645b468e"

View File

@@ -8,6 +8,30 @@
set -e
# 错误处理函数
error_handler() {
local line_number=$1
local command=$2
echo ""
echo -e "${RED}========================================${NC}"
echo -e "${RED} 脚本执行出错!${NC}"
echo -e "${RED}========================================${NC}"
echo -e "${YELLOW}错误位置: 第 ${line_number}${NC}"
echo -e "${YELLOW}失败命令: ${command}${NC}"
echo ""
echo -e "${YELLOW}故障排查建议:${NC}"
echo " 1. 检查网络连接是否正常"
echo " 2. 检查后端地址是否正确: ${BACKEND_URL:-未设置}"
echo " 3. 检查是否有足够的磁盘空间和权限"
echo " 4. 查看上面的详细错误信息"
echo ""
echo -e "${YELLOW}查看服务日志: sudo journalctl -u ${SERVICE_NAME:-linkmaster-node} -n 50${NC}"
exit 1
}
# 设置错误陷阱
trap 'error_handler ${LINENO} "${BASH_COMMAND}"' ERR
# 颜色输出
RED='\033[0;31m'
GREEN='\033[0;32m'
@@ -96,7 +120,6 @@ detect_fastest_mirror() {
# Ubuntu/Debian 镜像源列表
UBUNTU_MIRRORS=(
"mirrors.tuna.tsinghua.edu.cn"
"mirrors.huaweicloud.com"
"mirrors.163.com"
"archive.ubuntu.com"
@@ -104,7 +127,6 @@ detect_fastest_mirror() {
# CentOS/RHEL 镜像源列表
CENTOS_MIRRORS=(
"mirrors.tuna.tsinghua.edu.cn"
"mirrors.huaweicloud.com"
)
@@ -1264,11 +1286,18 @@ EOF
# 调用心跳API获取节点信息
echo -e "${BLUE}发送心跳请求获取节点信息...${NC}"
RESPONSE=$(curl -s -X POST "${BACKEND_URL}/api/node/heartbeat" \
echo -e "${BLUE}后端地址: ${BACKEND_URL}${NC}"
# 添加超时设置,避免长时间卡住
# 使用 set +e 临时禁用错误退出,因为心跳失败不应该阻止安装
set +e
RESPONSE=$(curl -s --connect-timeout 10 --max-time 30 -X POST "${BACKEND_URL}/api/node/heartbeat" \
-H "Content-Type: application/x-www-form-urlencoded" \
-d "type=pingServer" 2>&1)
CURL_EXIT_CODE=$?
set -e # 重新启用错误退出
if [ $? -eq 0 ]; then
if [ $CURL_EXIT_CODE -eq 0 ]; then
# 尝试解析JSON响应
NODE_ID=$(echo "$RESPONSE" | grep -o '"node_id":[0-9]*' | grep -o '[0-9]*' | head -1)
NODE_IP=$(echo "$RESPONSE" | grep -o '"node_ip":"[^"]*"' | cut -d'"' -f4 | head -1)
@@ -1306,8 +1335,10 @@ EOF
echo -e "${YELLOW} 响应: ${RESPONSE}${NC}"
fi
else
echo -e "${YELLOW}⚠ 心跳请求失败,将在服务启动时重试${NC}"
echo -e "${YELLOW} 错误: ${RESPONSE}${NC}"
echo -e "${YELLOW}⚠ 心跳请求失败 (退出码: ${CURL_EXIT_CODE}),将在服务启动时重试${NC}"
echo -e "${YELLOW} 错误信息: ${RESPONSE}${NC}"
echo -e "${YELLOW} 提示: 请检查后端地址是否正确: ${BACKEND_URL}${NC}"
echo -e "${YELLOW} 测试连接: curl -v ${BACKEND_URL}/api/public/nodes/online${NC}"
fi
# 设置配置文件权限
@@ -1318,18 +1349,52 @@ EOF
start_service() {
echo -e "${BLUE}启动服务...${NC}"
sudo systemctl enable ${SERVICE_NAME} > /dev/null 2>&1
sudo systemctl restart ${SERVICE_NAME}
# 先检查服务文件是否存在
if [ ! -f "/etc/systemd/system/${SERVICE_NAME}.service" ]; then
echo -e "${RED}✗ 错误: 服务文件不存在${NC}"
echo -e "${YELLOW} 路径: /etc/systemd/system/${SERVICE_NAME}.service${NC}"
exit 1
fi
# 检查二进制文件是否存在
if [ ! -f "$SOURCE_DIR/agent" ] && [ ! -f "$INSTALL_DIR/$BINARY_NAME" ]; then
echo -e "${RED}✗ 错误: 二进制文件不存在${NC}"
echo -e "${YELLOW} 检查路径: $SOURCE_DIR/agent 或 $INSTALL_DIR/$BINARY_NAME${NC}"
exit 1
fi
# 启用服务(显示输出以便调试)
echo -e "${BLUE}启用服务...${NC}"
if ! sudo systemctl enable ${SERVICE_NAME} 2>&1; then
echo -e "${RED}✗ 启用服务失败${NC}"
exit 1
fi
# 重新加载 systemd
echo -e "${BLUE}重新加载 systemd...${NC}"
sudo systemctl daemon-reload
# 启动服务(显示输出以便调试)
echo -e "${BLUE}启动服务...${NC}"
if ! sudo systemctl restart ${SERVICE_NAME} 2>&1; then
echo -e "${RED}✗ 启动服务失败${NC}"
echo -e "${YELLOW}查看详细日志: sudo journalctl -u ${SERVICE_NAME} -n 100 --no-pager${NC}"
echo -e "${YELLOW}查看服务状态: sudo systemctl status ${SERVICE_NAME}${NC}"
exit 1
fi
# 等待服务启动
echo -e "${BLUE}等待服务启动...${NC}"
sleep 3
# 检查服务状态
if sudo systemctl is-active --quiet ${SERVICE_NAME}; then
if sudo systemctl is-active --quiet ${SERVICE_NAME} 2>/dev/null; then
echo -e "${GREEN}✓ 服务启动成功${NC}"
else
echo -e "${RED}✗ 服务启动失败${NC}"
echo -e "${YELLOW}查看日志: sudo journalctl -u ${SERVICE_NAME} -n 50${NC}"
echo -e "${YELLOW}服务状态:${NC}"
sudo systemctl status ${SERVICE_NAME} --no-pager -l || true
echo -e "${YELLOW}查看详细日志: sudo journalctl -u ${SERVICE_NAME} -n 100 --no-pager${NC}"
exit 1
fi
}
@@ -1387,20 +1452,29 @@ main() {
echo -e "${GREEN} LinkMaster 节点端安装程序${NC}"
echo -e "${GREEN}========================================${NC}"
echo ""
echo -e "${BLUE}后端地址: ${BACKEND_URL}${NC}"
echo ""
echo -e "${BLUE}[1/8] 检测系统类型...${NC}"
detect_system
# 检查是否已安装,如果已安装则先卸载
if check_installed; then
echo -e "${BLUE}[2/8] 卸载已存在的服务...${NC}"
uninstall_service
else
echo -e "${BLUE}[2/8] 检查已安装服务...${NC}"
echo -e "${GREEN}✓ 未检测到已安装的服务${NC}"
fi
# 检测并配置最快的镜像源(在安装依赖之前)
echo -e "${BLUE}[3/8] 检测并配置镜像源...${NC}"
detect_fastest_mirror
echo -e "${BLUE}[4/8] 安装系统依赖...${NC}"
install_dependencies
# 优先尝试从 Releases 下载二进制文件
echo -e "${BLUE}[5/8] 下载或编译二进制文件...${NC}"
if ! download_binary_from_releases; then
echo -e "${BLUE}从 Releases 下载失败,开始从源码编译...${NC}"
build_from_source
@@ -1408,10 +1482,19 @@ main() {
echo -e "${GREEN}✓ 使用预编译二进制文件,跳过编译步骤${NC}"
fi
echo -e "${BLUE}[6/8] 创建 systemd 服务...${NC}"
create_service
echo -e "${BLUE}[7/8] 配置防火墙规则...${NC}"
configure_firewall
echo -e "${BLUE}[8/8] 登记节点到后端服务器...${NC}"
register_node
echo -e "${BLUE}[9/9] 启动服务...${NC}"
start_service
echo -e "${BLUE}[10/10] 验证安装...${NC}"
verify_installation
echo ""

View File

@@ -72,6 +72,12 @@ func Load() (*Config, error) {
}
}
// 环境变量优先级最高,覆盖配置文件中的设置
// 支持 BACKEND_URL 环境变量覆盖后端地址
if backendURL := os.Getenv("BACKEND_URL"); backendURL != "" {
cfg.Backend.URL = backendURL
}
// 如果配置文件中没有设置日志文件,使用环境变量或默认值
if cfg.Log.File == "" {
logFile := os.Getenv("LOG_FILE")

View File

@@ -28,14 +28,47 @@ type TCPingTask struct {
}
func NewTCPingTask(taskID, target string, interval, maxDuration time.Duration) (*TCPingTask, error) {
// 解析host:port
parts := strings.Split(target, ":")
if len(parts) != 2 {
return nil, fmt.Errorf("无效的target格式需要 host:port")
// 解析host:port如果没有端口则默认80
var host string
var portStr string
var port int
// 检查是否是IPv6格式如 [::1]:8080
if strings.HasPrefix(target, "[") {
// IPv6格式 - 使用 Index 而不是 LastIndex 来找到第一个闭合括号
closeBracket := strings.Index(target, "]")
if closeBracket == -1 {
return nil, fmt.Errorf("无效的target格式IPv6地址格式应为 [host]:port")
}
host = target[1:closeBracket]
if closeBracket+1 < len(target) && target[closeBracket+1] == ':' {
portStr = target[closeBracket+2:]
// 如果端口部分为空使用默认端口80修复 Bug 1
if portStr == "" {
portStr = "80"
}
} else {
portStr = "80" // 默认端口
}
} else {
// 普通格式 host:port 或 host
lastColonIndex := strings.LastIndex(target, ":")
if lastColonIndex == -1 {
// 没有冒号使用默认端口80
host = target
portStr = "80"
} else {
host = target[:lastColonIndex]
portStr = target[lastColonIndex+1:]
// 如果端口部分为空使用默认端口80
if portStr == "" {
portStr = "80"
}
}
}
host := parts[0]
port, err := strconv.Atoi(parts[1])
var err error
port, err = strconv.Atoi(portStr)
if err != nil {
return nil, fmt.Errorf("无效的端口: %v", err)
}
@@ -80,10 +113,10 @@ func (t *TCPingTask) Start(ctx context.Context, resultCallback func(result map[s
if !isRunning {
return
}
// 执行tcping测试每次测试完成后立即返回结果
result := t.executeTCPing()
// 再次检查任务是否已停止(执行完成后)
t.mu.RLock()
isRunning = t.IsRunning
@@ -91,7 +124,7 @@ func (t *TCPingTask) Start(ctx context.Context, resultCallback func(result map[s
if !isRunning {
return
}
if resultCallback != nil {
resultCallback(result)
}
@@ -117,7 +150,7 @@ func (t *TCPingTask) Stop() {
}
t.IsRunning = false
t.mu.Unlock()
// 关闭停止通道
select {
case <-t.StopCh:
@@ -125,7 +158,7 @@ func (t *TCPingTask) Stop() {
default:
close(t.StopCh)
}
t.logger.Info("TCPing任务已停止", zap.String("task_id", t.TaskID))
}
@@ -185,4 +218,3 @@ func (t *TCPingTask) executeTCPing() map[string]interface{} {
"ip": targetIP,
}
}

View File

@@ -8,6 +8,7 @@ import (
"io"
"net"
"net/http"
"os"
"strings"
"sync"
"time"
@@ -18,6 +19,7 @@ import (
"github.com/gin-gonic/gin"
"go.uber.org/zap"
"go.uber.org/zap/zapcore"
)
var continuousTasks = make(map[string]*ContinuousTask)
@@ -46,7 +48,52 @@ const (
func InitContinuousHandler(cfg *config.Config) {
backendURL = cfg.Backend.URL
logger, _ = zap.NewProduction()
// 根据配置创建logger
var level zapcore.Level
logLevel := cfg.Log.Level
if logLevel == "" {
if cfg.Debug {
logLevel = "debug"
} else {
logLevel = "info"
}
}
switch logLevel {
case "debug":
level = zapcore.DebugLevel
case "info":
level = zapcore.InfoLevel
case "warn":
level = zapcore.WarnLevel
case "error":
level = zapcore.ErrorLevel
default:
level = zapcore.InfoLevel
}
// 创建编码器配置
encoderConfig := zap.NewProductionEncoderConfig()
if cfg.Debug {
encoderConfig = zap.NewDevelopmentEncoderConfig()
}
encoderConfig.EncodeTime = zapcore.ISO8601TimeEncoder
encoderConfig.EncodeLevel = zapcore.CapitalLevelEncoder
// 创建核心 - 输出到标准错误日志文件由main.go统一管理这里输出到stderr便于调试
core := zapcore.NewCore(
zapcore.NewJSONEncoder(encoderConfig),
zapcore.AddSync(os.Stderr),
level,
)
// 创建logger
logger = zap.New(core, zap.AddCaller(), zap.AddStacktrace(zapcore.ErrorLevel))
logger.Info("持续测试处理器已初始化",
zap.String("backend_url", backendURL),
zap.String("log_level", logLevel))
}
type ContinuousTask struct {
@@ -160,7 +207,15 @@ func HandleContinuousStop(c *gin.Context) {
if task.tcpingTask != nil {
task.tcpingTask.Stop()
}
close(task.StopCh)
// 关闭停止通道
select {
case <-task.StopCh:
// 已经关闭
default:
close(task.StopCh)
}
delete(continuousTasks, req.TaskID)
}
taskMutex.Unlock()
@@ -170,6 +225,17 @@ func HandleContinuousStop(c *gin.Context) {
return
}
// 清理推送缓冲
bufferMutex.Lock()
if buffer, exists := pushBuffers[req.TaskID]; exists {
if buffer.pushTimer != nil {
buffer.pushTimer.Stop()
}
delete(pushBuffers, req.TaskID)
logger.Debug("已清理任务推送缓冲", zap.String("task_id", req.TaskID))
}
bufferMutex.Unlock()
c.JSON(http.StatusOK, gin.H{"message": "任务已停止"})
}
@@ -237,7 +303,8 @@ func pushResultToBackend(taskID string, result map[string]interface{}) {
logger.Warn("节点ID未获取跳过推送结果",
zap.String("task_id", taskID),
zap.String("node_ip", nodeIP),
zap.String("hint", "等待心跳返回node_id后再推送"))
zap.String("hint", "等待心跳返回node_id后再推送"),
zap.Any("result", result))
return
}
@@ -246,10 +313,18 @@ func pushResultToBackend(taskID string, result map[string]interface{}) {
logger.Warn("节点IP未获取跳过推送结果",
zap.String("task_id", taskID),
zap.Uint("node_id", nodeID),
zap.String("hint", "等待心跳返回node_ip后再推送"))
zap.String("hint", "等待心跳返回node_ip后再推送"),
zap.Any("result", result))
return
}
// 记录调试信息
logger.Debug("准备推送结果到后端",
zap.String("task_id", taskID),
zap.Uint("node_id", nodeID),
zap.String("node_ip", nodeIP),
zap.Any("result", result))
// 添加到批量推送缓冲
addToPushBuffer(taskID, nodeID, nodeIP, result)
}
@@ -269,28 +344,43 @@ func addToPushBuffer(taskID string, nodeID uint, nodeIP string, result map[strin
bufferMutex.Unlock()
buffer.mu.Lock()
defer buffer.mu.Unlock()
// 添加结果到缓冲
buffer.results = append(buffer.results, result)
// 如果缓冲已满,立即推送
shouldFlush := len(buffer.results) >= batchPushMaxSize
buffer.mu.Unlock()
if shouldFlush {
flushPushBuffer(taskID, nodeID, nodeIP)
// 复制结果列表
results := make([]map[string]interface{}, len(buffer.results))
copy(results, buffer.results)
buffer.results = buffer.results[:0] // 清空缓冲
// 停止定时器
if buffer.pushTimer != nil {
buffer.pushTimer.Stop()
buffer.pushTimer = nil
}
buffer.lastPush = time.Now()
buffer.mu.Unlock()
// 批量推送结果
for _, r := range results {
pushSingleResult(taskID, nodeID, nodeIP, r)
}
return
}
buffer.mu.Lock()
// 如果距离上次推送超过间隔时间,启动定时器推送
if buffer.pushTimer == nil {
buffer.pushTimer = time.AfterFunc(batchPushInterval, func() {
flushPushBuffer(taskID, nodeID, nodeIP)
})
}
buffer.mu.Unlock()
}
// flushPushBuffer 刷新并推送缓冲中的结果
@@ -362,13 +452,21 @@ func pushSingleResult(taskID string, nodeID uint, nodeIP string, result map[stri
jsonData, err := json.Marshal(data)
if err != nil {
logger.Error("序列化结果失败", zap.Error(err), zap.String("task_id", taskID))
logger.Error("序列化结果失败",
zap.Error(err),
zap.String("task_id", taskID),
zap.Uint("node_id", nodeID),
zap.String("node_ip", nodeIP),
zap.Any("data", data))
return
}
req, err := http.NewRequest("POST", url, bytes.NewBuffer(jsonData))
if err != nil {
logger.Error("创建请求失败", zap.Error(err), zap.String("task_id", taskID))
logger.Error("创建请求失败",
zap.Error(err),
zap.String("task_id", taskID),
zap.String("url", url))
return
}
@@ -380,7 +478,9 @@ func pushSingleResult(taskID string, nodeID uint, nodeIP string, result map[stri
logger.Warn("推送结果失败,继续运行",
zap.Error(err),
zap.String("task_id", taskID),
zap.String("url", url))
zap.String("url", url),
zap.Uint("node_id", nodeID),
zap.String("node_ip", nodeIP))
// 推送失败不停止任务,继续运行
return
}
@@ -394,7 +494,9 @@ func pushSingleResult(taskID string, nodeID uint, nodeIP string, result map[stri
if containsTaskNotFoundError(bodyStr) {
logger.Warn("后端任务不存在,停止节点端任务",
zap.String("task_id", taskID),
zap.String("response", bodyStr))
zap.String("response", bodyStr),
zap.Uint("node_id", nodeID),
zap.String("node_ip", nodeIP))
// 停止对应的持续测试任务
stopTaskByTaskID(taskID)
return
@@ -404,12 +506,18 @@ func pushSingleResult(taskID string, nodeID uint, nodeIP string, result map[stri
zap.Int("status", resp.StatusCode),
zap.String("task_id", taskID),
zap.String("url", url),
zap.String("response", bodyStr))
zap.String("response", bodyStr),
zap.Uint("node_id", nodeID),
zap.String("node_ip", nodeIP))
// 其他错误不停止任务,继续运行
return
}
logger.Debug("推送结果成功", zap.String("task_id", taskID))
logger.Debug("推送结果成功",
zap.String("task_id", taskID),
zap.Uint("node_id", nodeID),
zap.String("node_ip", nodeIP),
zap.Any("result", result))
}
// containsTaskNotFoundError 检查响应中是否包含任务不存在的错误
@@ -522,23 +630,20 @@ func StartTaskCleanup() {
for range ticker.C {
now := time.Now()
taskMutex.Lock()
var tasksToDelete []string
for taskID, task := range continuousTasks {
shouldDelete := false
// 检查最大运行时长
if now.Sub(task.StartTime) > task.MaxDuration {
logger.Info("任务达到最大运行时长,自动停止", zap.String("task_id", taskID))
task.IsRunning = false
if task.pingTask != nil {
task.pingTask.Stop()
}
if task.tcpingTask != nil {
task.tcpingTask.Stop()
}
delete(continuousTasks, taskID)
continue
}
// 检查无客户端连接30分钟无请求
if now.Sub(task.LastRequest) > 30*time.Minute {
shouldDelete = true
} else if now.Sub(task.LastRequest) > 30*time.Minute {
// 检查无客户端连接30分钟无请求
logger.Info("任务无客户端连接,自动停止", zap.String("task_id", taskID))
shouldDelete = true
}
if shouldDelete {
task.IsRunning = false
if task.pingTask != nil {
task.pingTask.Stop()
@@ -546,10 +651,41 @@ func StartTaskCleanup() {
if task.tcpingTask != nil {
task.tcpingTask.Stop()
}
delete(continuousTasks, taskID)
// 关闭停止通道
select {
case <-task.StopCh:
// 已经关闭
default:
close(task.StopCh)
}
tasksToDelete = append(tasksToDelete, taskID)
}
}
taskMutex.Unlock()
// 清理任务和推送缓冲
if len(tasksToDelete) > 0 {
taskMutex.Lock()
for _, taskID := range tasksToDelete {
delete(continuousTasks, taskID)
}
taskMutex.Unlock()
// 清理推送缓冲
bufferMutex.Lock()
for _, taskID := range tasksToDelete {
if buffer, exists := pushBuffers[taskID]; exists {
if buffer.pushTimer != nil {
buffer.pushTimer.Stop()
}
delete(pushBuffers, taskID)
logger.Debug("已清理任务推送缓冲", zap.String("task_id", taskID))
}
}
bufferMutex.Unlock()
}
}
}()
}

View File

@@ -44,12 +44,12 @@ func (t *timingTransport) RoundTrip(req *http.Request) (*http.Response, error) {
port = "80"
}
}
// DNS查询时间
dnsStart := time.Now()
ips, err := net.LookupIP(host)
dnsTime := time.Since(dnsStart)
t.mu.Lock()
t.nameLookup = dnsTime
if len(ips) > 0 {
@@ -65,11 +65,11 @@ func (t *timingTransport) RoundTrip(req *http.Request) (*http.Response, error) {
}
}
t.mu.Unlock()
if err != nil {
return nil, err
}
// TCP连接时间如果已知IP
var connectTime time.Duration
if t.primaryIP != "" {
@@ -80,13 +80,13 @@ func (t *timingTransport) RoundTrip(req *http.Request) (*http.Response, error) {
conn.Close()
}
}
// 执行HTTP请求
httpStart := time.Now()
resp, err := t.transport.RoundTrip(req)
httpTime := time.Since(httpStart)
totalTime := time.Since(start)
t.mu.Lock()
if connectTime > 0 {
t.connect = connectTime
@@ -103,7 +103,7 @@ func (t *timingTransport) RoundTrip(req *http.Request) (*http.Response, error) {
}
}
t.mu.Unlock()
return resp, err
}
@@ -139,17 +139,14 @@ func handleGet(c *gin.Context, urlStr string, params map[string]interface{}) {
// 创建自定义Transport用于时间跟踪
timingTransport := newTimingTransport()
// 创建HTTP客户端
client := &http.Client{
Transport: timingTransport,
Timeout: 15 * time.Second,
CheckRedirect: func(req *http.Request, via []*http.Request) error {
// 跟随重定向,最多20次
if len(via) >= 20 {
return fmt.Errorf("重定向次数过多")
}
return nil
// 跟随重定向,返回第一个状态码和 header
return http.ErrUseLastResponse
},
}
@@ -181,8 +178,11 @@ func handleGet(c *gin.Context, urlStr string, params map[string]interface{}) {
// 执行请求
startTime := time.Now()
resp, err := client.Do(req)
if err != nil {
// 错误处理
// 处理重定向错误:当 CheckRedirect 返回 ErrUseLastResponse 时,
// client.Do 会返回响应和错误,但响应仍然有效(包含重定向状态码和 header
if err != nil && resp == nil {
// 真正的错误,没有响应
errMsg := err.Error()
if strings.Contains(errMsg, "no such host") {
result["ip"] = "域名无法解析"
@@ -204,7 +204,24 @@ func handleGet(c *gin.Context, urlStr string, params map[string]interface{}) {
c.JSON(200, result)
return
}
defer resp.Body.Close()
// 如果有响应(包括重定向响应),继续处理
if resp != nil {
defer resp.Body.Close()
} else {
// 没有响应也没有错误,不应该发生
result["error"] = "未知错误"
result["ip"] = "访问失败"
result["totaltime"] = "*"
result["downtime"] = "*"
result["downsize"] = "*"
result["downspeed"] = "*"
result["firstbytetime"] = "*"
result["conntime"] = "*"
result["size"] = "*"
c.JSON(200, result)
return
}
// 获取时间信息
timingTransport.mu.Lock()
@@ -237,19 +254,19 @@ func handleGet(c *gin.Context, urlStr string, params map[string]interface{}) {
bodyReader := io.LimitReader(resp.Body, 1024*1024) // 限制1MB
bodyStartTime := time.Now()
body, err := io.ReadAll(bodyReader)
bodyReadTime := time.Now().Sub(bodyStartTime)
bodyReadTime := time.Since(bodyStartTime)
if err != nil && err != io.EOF {
result["error"] = err.Error()
}
downloadSize := int64(len(body))
statusCode := resp.StatusCode
// 如果首字节时间为0使用连接时间
if firstByteTime == 0 {
firstByteTime = connectTime
}
// 总时间 = 实际请求时间
if totalTime == 0 {
totalTime = time.Since(startTime)
@@ -327,16 +344,14 @@ func handlePost(c *gin.Context, urlStr string, params map[string]interface{}) {
// 创建自定义Transport用于时间跟踪
timingTransport := newTimingTransport()
// 创建HTTP客户端
client := &http.Client{
Transport: timingTransport,
Timeout: 15 * time.Second,
CheckRedirect: func(req *http.Request, via []*http.Request) error {
if len(via) >= 20 {
return fmt.Errorf("重定向次数过多")
}
return nil
// 不跟随重定向,返回第一个状态码和 header
return http.ErrUseLastResponse
},
}
@@ -363,7 +378,11 @@ func handlePost(c *gin.Context, urlStr string, params map[string]interface{}) {
// 执行请求
startTime := time.Now()
resp, err := client.Do(req)
if err != nil {
// 处理重定向错误:当 CheckRedirect 返回 ErrUseLastResponse 时,
// client.Do 会返回响应和错误,但响应仍然有效(包含重定向状态码和 header
if err != nil && resp == nil {
// 真正的错误,没有响应
errMsg := err.Error()
if strings.Contains(errMsg, "no such host") {
result["ip"] = "域名无法解析"
@@ -385,7 +404,24 @@ func handlePost(c *gin.Context, urlStr string, params map[string]interface{}) {
c.JSON(200, result)
return
}
defer resp.Body.Close()
// 如果有响应(包括重定向响应),继续处理
if resp != nil {
defer resp.Body.Close()
} else {
// 没有响应也没有错误,不应该发生
result["error"] = "未知错误"
result["ip"] = "访问失败"
result["totaltime"] = "*"
result["downtime"] = "*"
result["downsize"] = "*"
result["downspeed"] = "*"
result["firstbytetime"] = "*"
result["conntime"] = "*"
result["size"] = "*"
c.JSON(200, result)
return
}
// 获取时间信息
timingTransport.mu.Lock()
@@ -425,12 +461,12 @@ func handlePost(c *gin.Context, urlStr string, params map[string]interface{}) {
downloadSize := int64(len(body))
statusCode := resp.StatusCode
// 如果首字节时间为0使用连接时间
if firstByteTime == 0 {
firstByteTime = connectTime
}
// 总时间 = 实际请求时间
if totalTime == 0 {
totalTime = time.Since(startTime)

View File

@@ -16,27 +16,59 @@ func handleTCPing(c *gin.Context, url string, params map[string]interface{}) {
seq = seqVal
}
// 解析host:port格式
parts := strings.Split(url, ":")
if len(parts) != 2 {
c.JSON(200, gin.H{
"seq": seq,
"type": "ceTCPing",
"url": url,
"error": "格式错误,需要 host:port",
})
return
// 解析host:port格式如果没有端口则默认80
var host string
var portStr string
var port int
// 检查是否是IPv6格式如 [::1]:8080
if strings.HasPrefix(url, "[") {
// IPv6格式 - 使用 Index 而不是 LastIndex 来找到第一个闭合括号
closeBracket := strings.Index(url, "]")
if closeBracket == -1 {
c.JSON(200, gin.H{
"seq": seq,
"type": "ceTCPing",
"url": url,
"error": "格式错误IPv6地址格式应为 [host]:port",
})
return
}
host = url[1:closeBracket]
if closeBracket+1 < len(url) && url[closeBracket+1] == ':' {
portStr = url[closeBracket+2:]
// 如果端口部分为空使用默认端口80修复 Bug 1
if portStr == "" {
portStr = "80"
}
} else {
portStr = "80" // 默认端口
}
} else {
// 普通格式 host:port 或 host
lastColonIndex := strings.LastIndex(url, ":")
if lastColonIndex == -1 {
// 没有冒号使用默认端口80
host = url
portStr = "80"
} else {
host = url[:lastColonIndex]
portStr = url[lastColonIndex+1:]
// 如果端口部分为空使用默认端口80
if portStr == "" {
portStr = "80"
}
}
}
host := parts[0]
portStr := parts[1]
port, err := strconv.Atoi(portStr)
var err error
port, err = strconv.Atoi(portStr)
if err != nil {
c.JSON(200, gin.H{
"seq": seq,
"type": "ceTCPing",
"url": url,
"error": "端口格式错误",
"seq": seq,
"type": "ceTCPing",
"url": url,
"error": "端口格式错误",
})
return
}
@@ -131,17 +163,17 @@ func handleTCPing(c *gin.Context, url string, params map[string]interface{}) {
// 返回格式和PING一致
result := gin.H{
"seq": seq,
"type": "ceTCPing",
"url": url,
"ip": primaryIP,
"host": host,
"port": port,
"seq": seq,
"type": "ceTCPing",
"url": url,
"ip": primaryIP,
"host": host,
"port": port,
"packets_total": strconv.Itoa(packetsTotal),
"packets_recv": strconv.Itoa(packetsRecv),
"packets_losrat": packetsLosrat, // float64类型百分比值如10.5表示10.5%
}
// 时间字段:如果是-1全部失败返回字符串"-"否则返回float64
if timeMin < 0 {
result["time_min"] = "-"
@@ -160,4 +192,3 @@ func handleTCPing(c *gin.Context, url string, params map[string]interface{}) {
c.JSON(200, result)
}

View File

@@ -7,6 +7,8 @@ import (
"fmt"
"io"
"net/http"
"net/url"
"os"
"sync"
"time"
@@ -18,13 +20,13 @@ import (
// 节点信息存储(通过心跳更新,优先从配置文件读取)
var nodeInfo struct {
sync.RWMutex
nodeID uint
nodeIP string
country string
province string
city string
isp string
cfg *config.Config
nodeID uint
nodeIP string
country string
province string
city string
isp string
cfg *config.Config
initialized bool
}
@@ -32,7 +34,7 @@ var nodeInfo struct {
func InitNodeInfo(cfg *config.Config) {
nodeInfo.Lock()
defer nodeInfo.Unlock()
nodeInfo.cfg = cfg
nodeInfo.nodeID = cfg.Node.ID
nodeInfo.nodeIP = cfg.Node.IP
@@ -73,10 +75,10 @@ type Reporter struct {
func NewReporter(cfg *config.Config) *Reporter {
logger, _ := zap.NewProduction()
// 初始化节点信息(从配置文件读取)
InitNodeInfo(cfg)
return &Reporter{
cfg: cfg,
client: &http.Client{
@@ -110,10 +112,25 @@ func (r *Reporter) Stop() {
close(r.stopCh)
}
// buildHeartbeatBody 构建心跳请求体
func buildHeartbeatBody() string {
hostname, err := os.Hostname()
if err != nil {
hostname = "unknown"
}
values := url.Values{}
values.Set("type", "pingServer")
values.Set("version", "2")
values.Set("host_name", hostname)
return values.Encode()
}
// RegisterNode 注册节点(安装时或首次启动时调用)
func RegisterNode(cfg *config.Config) error {
url := fmt.Sprintf("%s/api/node/heartbeat", cfg.Backend.URL)
req, err := http.NewRequest("POST", url, bytes.NewBufferString("type=pingServer"))
req, err := http.NewRequest("POST", url, bytes.NewBufferString(buildHeartbeatBody()))
if err != nil {
return fmt.Errorf("创建心跳请求失败: %w", err)
}
@@ -123,7 +140,7 @@ func RegisterNode(cfg *config.Config) error {
client := &http.Client{Timeout: 10 * time.Second}
resp, err := client.Do(req)
if err != nil {
return fmt.Errorf("发送心跳失败: %w", err)
return fmt.Errorf("发送心跳失败 (URL: %s): %w", url, err)
}
defer resp.Body.Close()
@@ -173,16 +190,27 @@ func RegisterNode(cfg *config.Config) error {
return nil
}
}
return fmt.Errorf("心跳响应格式无效或节点信息不完整")
return fmt.Errorf("心跳响应格式无效或节点信息不完整 (响应体: %s)", string(body))
}
return fmt.Errorf("心跳请求失败,状态码: %d", resp.StatusCode)
// 读取响应体以便记录错误详情
body, err := io.ReadAll(resp.Body)
bodyStr := ""
if err == nil && len(body) > 0 {
// 限制响应体长度,避免错误信息过长
if len(body) > 500 {
bodyStr = string(body[:500]) + "..."
} else {
bodyStr = string(body)
}
}
return fmt.Errorf("心跳请求失败,状态码: %d, URL: %s, 响应体: %s", resp.StatusCode, url, bodyStr)
}
func (r *Reporter) sendHeartbeat() {
// 发送心跳使用Form格式兼容旧接口
url := fmt.Sprintf("%s/api/node/heartbeat", r.cfg.Backend.URL)
req, err := http.NewRequest("POST", url, bytes.NewBufferString("type=pingServer"))
req, err := http.NewRequest("POST", url, bytes.NewBufferString(buildHeartbeatBody()))
if err != nil {
r.logger.Error("创建心跳请求失败", zap.Error(err))
return
@@ -192,7 +220,9 @@ func (r *Reporter) sendHeartbeat() {
resp, err := r.client.Do(req)
if err != nil {
r.logger.Warn("发送心跳失败", zap.Error(err))
r.logger.Warn("发送心跳失败",
zap.String("url", url),
zap.Error(err))
return
}
defer resp.Body.Close()
@@ -260,7 +290,21 @@ func (r *Reporter) sendHeartbeat() {
}
r.logger.Debug("心跳发送成功")
} else {
r.logger.Warn("心跳发送失败", zap.Int("status", resp.StatusCode))
// 读取响应体以便记录错误详情
body, err := io.ReadAll(resp.Body)
bodyStr := ""
if err == nil && len(body) > 0 {
// 限制响应体长度,避免日志过长
if len(body) > 500 {
bodyStr = string(body[:500]) + "..."
} else {
bodyStr = string(body)
}
}
r.logger.Warn("心跳发送失败",
zap.Int("status", resp.StatusCode),
zap.String("url", url),
zap.String("response_body", bodyStr))
}
}

View File

@@ -1 +0,0 @@
48748

View File

@@ -61,7 +61,7 @@ v1.2.0 (2019-09-26)
and `errors.Is`.
v1.1.0 (2017-06-30)
v1.1.2 (2017-06-30)
===================
- Added an `Errors(error) []error` function to extract the underlying list of

View File

@@ -489,7 +489,7 @@ Enhancements:
[#402]: https://github.com/uber-go/zap/pull/402
## v1.1.0 (31 Mar 2017)
## v1.1.2 (31 Mar 2017)
This release fixes two bugs and adds some enhancements to zap's testing helpers.
It is fully backward-compatible.

View File

@@ -1,4 +1,4 @@
{
"version": "1.1.0",
"tag": "v1.1.0"
"version": "1.1.4",
"tag": "v1.1.4"
}