chore(code):Use the speedCalculator to estimate network quality

This commit is contained in:
david_bai
2025-08-31 22:09:23 +08:00
parent c0317211e7
commit 0562e8a3a8
+175 -22
View File
@@ -26,6 +26,17 @@ class FileSender {
private pendingFolerMeta: Record<string, FolderMeta>;
private speedCalculator: SpeedCalculator;
// 自适应性能监控
private networkPerformance: Map<
string,
{
avgClearingRate: number; // 平均网络清理速度 KB/s
optimalThreshold: number; // 动态优化的阈值
avgWaitTime: number; // 平均等待时间
sampleCount: number; // 采样次数
}
> = new Map();
// 混合优化配置 - FileReader大块 + 网络小包策略(修复sendData failed
private static readonly OPTIMIZED_CONFIG = {
CHUNK_SIZE: 4194304, // 4MB - 极致大块,最大化减少FileReader调用次数
@@ -321,6 +332,9 @@ class FileSender {
const speed = this.speedCalculator.getSendSpeed(peerId);
const progress = totalSize > 0 ? currentBytes / totalSize : 0;
// 持续更新网络性能(从传输速度学习)
this.updateNetworkFromSpeed(peerId);
peerState.progressCallback?.(progressFileId, progress, speed);
}
@@ -383,8 +397,7 @@ class FileSender {
}
// 智能发送控制 - 根据缓冲区状态决定发送策略
const threshold = FileSender.OPTIMIZED_CONFIG.BUFFER_THRESHOLD;
await this.smartBufferControl(dataChannel, threshold);
await this.smartBufferControl(dataChannel, peerId);
// 发送数据
if (!this.webrtcConnection.sendData(data, peerId)) {
@@ -392,26 +405,154 @@ class FileSender {
}
}
// 智能发送控制策略 - 根据缓冲区状态决定发送策略
// 初始化网络性能监控(传输开始时调用)
private initializeNetworkPerformance(peerId: string): void {
if (!this.networkPerformance.has(peerId)) {
// 使用保守的初始值
this.networkPerformance.set(peerId, {
avgClearingRate: 5000, // 5MB/s初始估计
optimalThreshold: FileSender.OPTIMIZED_CONFIG.BUFFER_THRESHOLD,
avgWaitTime: 50, // 50ms初始估计
sampleCount: 0,
});
postLogToBackend(
`[NetworkInit] Initialized network performance for peer ${peerId}`
);
}
}
// 从SpeedCalculator获取当前传输速度并更新网络性能
private updateNetworkFromSpeed(peerId: string): void {
const currentSpeed = this.speedCalculator.getSendSpeed(peerId); // KB/s
if (currentSpeed > 0) {
const perf = this.networkPerformance.get(peerId);
if (perf) {
perf.avgClearingRate = currentSpeed;
perf.sampleCount++;
// 每10次速度更新时调整阈值
if (perf.sampleCount % 10 === 0) {
this.adjustOptimalThreshold(perf);
postLogToBackend(
`[SpeedAdapt] Updated from speed: ${Math.round(
currentSpeed
)}KB/s, avgRate: ${Math.round(perf.avgClearingRate)}KB/s`
);
}
}
}
}
// 调整最优阈值的共用逻辑
private adjustOptimalThreshold(perf: {
avgClearingRate: number;
optimalThreshold: number;
avgWaitTime: number;
sampleCount: number;
}): void {
if (perf.avgClearingRate > 8000) {
// >8MB/s网络很好
perf.optimalThreshold = Math.max(
FileSender.OPTIMIZED_CONFIG.BUFFER_THRESHOLD,
6291456
); // 6MB
} else if (perf.avgClearingRate > 4000) {
// >4MB/s网络一般
perf.optimalThreshold = FileSender.OPTIMIZED_CONFIG.BUFFER_THRESHOLD; // 3MB
} else {
// 网络较差
perf.optimalThreshold = Math.min(
FileSender.OPTIMIZED_CONFIG.BUFFER_THRESHOLD,
1572864
); // 1.5MB
}
}
// 自适应网络性能学习(从背压等待中学习)
private updateNetworkPerformance(
peerId: string,
clearingRate: number,
waitTime: number
): void {
if (!this.networkPerformance.has(peerId)) {
this.initializeNetworkPerformance(peerId);
}
const perf = this.networkPerformance.get(peerId)!;
perf.sampleCount++;
// 指数移动平均,新数据权重更高
const alpha = 0.3;
perf.avgClearingRate =
perf.avgClearingRate * (1 - alpha) + clearingRate * alpha;
perf.avgWaitTime = perf.avgWaitTime * (1 - alpha) + waitTime * alpha;
// 调整最优阈值
this.adjustOptimalThreshold(perf);
postLogToBackend(
`[BackpressureAdapt] clearingRate=${Math.round(
clearingRate
)}KB/s, avgRate=${Math.round(
perf.avgClearingRate
)}KB/s, threshold=${Math.round(perf.optimalThreshold / 1024)}KB`
);
}
// 获取自适应阈值
private getAdaptiveThreshold(peerId: string): number {
const perf = this.networkPerformance.get(peerId);
return perf
? perf.optimalThreshold
: FileSender.OPTIMIZED_CONFIG.BUFFER_THRESHOLD;
}
// 自适应智能发送控制策略
private async intelligentSendControl(
dataChannel: RTCDataChannel,
threshold: number
peerId: string
): Promise<"AGGRESSIVE" | "NORMAL" | "CAUTIOUS" | "WAIT"> {
const bufferedAmount = dataChannel.bufferedAmount;
const utilizationRate = bufferedAmount / threshold;
postLogToBackend(`[utilizationRate] ${utilizationRate}`);
// 多级缓冲区控制策略
if (utilizationRate < 0.3) {
// 缓冲区使用率 < 30% - 积极发送
const adaptiveThreshold = this.getAdaptiveThreshold(peerId);
const utilizationRate = bufferedAmount / adaptiveThreshold;
// 动态调整策略阈值:基于网络性能
const perf = this.networkPerformance.get(peerId);
const networkQuality = perf
? perf.avgClearingRate > 6000
? "good"
: "poor"
: "unknown";
let aggressiveThreshold = 0.3;
let normalThreshold = 0.6;
let cautiousThreshold = 0.9;
if (networkQuality === "good") {
// 网络好:更激进的策略
aggressiveThreshold = 0.4; // 40%以下积极发送
normalThreshold = 0.7; // 70%以下正常发送
} else if (networkQuality === "poor") {
// 网络差:更保守的策略
aggressiveThreshold = 0.2; // 20%以下才积极发送
normalThreshold = 0.5; // 50%以下正常发送
cautiousThreshold = 0.8; // 80%以上就要等待
}
postLogToBackend(
`[AdaptiveStrategy] utilizationRate=${utilizationRate.toFixed(
3
)}, threshold=${Math.round(
adaptiveThreshold / 1024
)}KB, quality=${networkQuality}`
);
if (utilizationRate < aggressiveThreshold) {
return "AGGRESSIVE";
} else if (utilizationRate < 0.6) {
// 缓冲区使用率 30-60% - 正常发送
} else if (utilizationRate < normalThreshold) {
return "NORMAL";
} else if (utilizationRate < 0.9) {
// 缓冲区使用率 60-90% - 谨慎发送
} else if (utilizationRate < cautiousThreshold) {
return "CAUTIOUS";
} else {
// 缓冲区使用率 > 90% - 必须等待
return "WAIT";
}
}
@@ -419,9 +560,9 @@ class FileSender {
// 智能等待策略 - 根据缓冲区状态调整发送控制
private async smartBufferControl(
dataChannel: RTCDataChannel,
threshold: number
peerId: string
): Promise<void> {
const strategy = await this.intelligentSendControl(dataChannel, threshold);
const strategy = await this.intelligentSendControl(dataChannel, peerId);
if (strategy === "AGGRESSIVE") {
// 积极模式:无需等待,立即发送
@@ -441,14 +582,15 @@ class FileSender {
const POLLING_INTERVAL = 5;
const MAX_WAIT_TIME = 3000;
const startTime = Date.now();
const threshold_75 = threshold * 0.75;
const adaptiveThreshold = this.getAdaptiveThreshold(peerId);
const threshold_75 = adaptiveThreshold * 0.75;
const initialBuffered = dataChannel.bufferedAmount;
let pollCount = 0;
postLogToBackend(
`[BackPressure] Start waiting - buffered: ${Math.round(
initialBuffered / 1024
)}KB, threshold: ${Math.round(threshold / 1024)}KB`
)}KB, threshold: ${Math.round(adaptiveThreshold / 1024)}KB`
);
while (dataChannel.bufferedAmount > threshold_75) {
@@ -457,7 +599,7 @@ class FileSender {
if (Date.now() - startTime > MAX_WAIT_TIME) {
this.log("warn", "Buffer wait timeout", {
bufferedAmount: dataChannel.bufferedAmount,
threshold,
threshold: adaptiveThreshold,
waitTime: Date.now() - startTime,
});
@@ -481,6 +623,11 @@ class FileSender {
const clearingRate =
waitTime > 0 ? clearedBytes / 1024 / (waitTime / 1000) : 0;
// 更新网络性能学习
if (clearingRate > 0) {
this.updateNetworkPerformance(peerId, clearingRate, waitTime);
}
postLogToBackend(
`[BackPressure] End waiting - time: ${waitTime}ms, polls: ${pollCount}, cleared: ${Math.round(
clearedBytes / 1024
@@ -577,12 +724,18 @@ class FileSender {
let totalSendTime = 0;
let batchCount = 0;
// 初始化网络性能监控
this.initializeNetworkPerformance(peerId);
const initialThreshold = this.getAdaptiveThreshold(peerId);
postLogToBackend(
`[Transfer] Start sending ${file.name} (${Math.round(
file.size / 1024
)}KB) from offset ${Math.round(offset / 1024)}KB, threshold: ${Math.round(
FileSender.OPTIMIZED_CONFIG.BUFFER_THRESHOLD / 1024
)}KB`
)}KB) from offset ${Math.round(
offset / 1024
)}KB, initial threshold: ${Math.round(
initialThreshold / 1024
)}KB (adaptive)`
);
try {