Directly writing to disk was also tested and passed

This commit is contained in:
david_bai
2025-09-06 23:49:10 +08:00
parent e385389e1d
commit 3f18002cf0
2 changed files with 256 additions and 69 deletions
+230 -56
View File
@@ -23,6 +23,150 @@ import {
} from "@/types/webrtc";
import { postLogToBackend } from "@/app/config/api";
/**
* 🚀 严格按序缓冲写入管理器 - 优化大文件磁盘I/O性能
*/
class SequencedDiskWriter {
private writeQueue = new Map<number, ArrayBuffer>();
private nextWriteIndex = 0;
private readonly maxBufferSize = 100; // 最多缓冲100个chunk(约6.4MB
private readonly stream: FileSystemWritableFileStream;
private totalWritten = 0;
constructor(stream: FileSystemWritableFileStream, startIndex: number = 0) {
this.stream = stream;
this.nextWriteIndex = startIndex;
}
/**
* 写入一个chunk,自动管理顺序和缓冲
*/
async writeChunk(chunkIndex: number, chunk: ArrayBuffer): Promise<void> {
// 1. 如果是期待的下一个chunk,立即写入
if (chunkIndex === this.nextWriteIndex) {
await this.flushSequentialChunks(chunk);
return;
}
// 2. 如果是未来的chunk,缓冲起来
if (chunkIndex > this.nextWriteIndex) {
if (this.writeQueue.size < this.maxBufferSize) {
this.writeQueue.set(chunkIndex, chunk);
postLogToBackend(
`[DEBUG] 📦 BUFFERED chunk #${chunkIndex} (waiting for #${this.nextWriteIndex}), queue: ${this.writeQueue.size}/${this.maxBufferSize}`
);
} else {
// 缓冲区满,强制处理最早的chunk以释放空间
await this.forceFlushOldest();
this.writeQueue.set(chunkIndex, chunk);
postLogToBackend(
`[DEBUG] ⚠️ BUFFER_FULL, forced flush and buffered chunk #${chunkIndex}`
);
}
return;
}
// 3. 如果是过期的chunk,记录警告但忽略(已写入)
postLogToBackend(
`[DEBUG] ⚠️ DUPLICATE chunk #${chunkIndex} ignored (already written #${this.nextWriteIndex})`
);
}
/**
* 写入当前chunk并尝试连续写入后续的chunk
*/
private async flushSequentialChunks(firstChunk: ArrayBuffer): Promise<void> {
// 写入当前chunk
await this.stream.write(firstChunk);
this.totalWritten += firstChunk.byteLength;
postLogToBackend(
`[DEBUG] ✓ DISK_WRITE chunk #${this.nextWriteIndex} - ${firstChunk.byteLength} bytes, total: ${this.totalWritten}`
);
this.nextWriteIndex++;
// 尝试连续写入缓冲中的chunk
let flushCount = 0;
while (this.writeQueue.has(this.nextWriteIndex)) {
const chunk = this.writeQueue.get(this.nextWriteIndex)!;
await this.stream.write(chunk);
this.totalWritten += chunk.byteLength;
this.writeQueue.delete(this.nextWriteIndex);
flushCount++;
this.nextWriteIndex++;
}
if (flushCount > 0) {
postLogToBackend(
`[DEBUG] 🔥 SEQUENTIAL_FLUSH ${flushCount} chunks, now at #${this.nextWriteIndex}, queue: ${this.writeQueue.size}`
);
}
}
/**
* 强制刷新最早的chunk以释放缓冲区空间
*/
private async forceFlushOldest(): Promise<void> {
if (this.writeQueue.size === 0) return;
const oldestIndex = Math.min(...Array.from(this.writeQueue.keys()));
const chunk = this.writeQueue.get(oldestIndex)!;
// 警告:非序写入
postLogToBackend(
`[DEBUG] ⚠️ FORCE_FLUSH out-of-order chunk #${oldestIndex} (expected #${this.nextWriteIndex})`
);
// 使用seek在正确位置写入(降级处理)
const fileOffset = oldestIndex * 65536; // 假设每个chunk 64KB
await this.stream.seek(fileOffset);
await this.stream.write(chunk);
this.writeQueue.delete(oldestIndex);
// 恢复到当前位置
const currentOffset = this.nextWriteIndex * 65536;
await this.stream.seek(currentOffset);
}
/**
* 获取缓冲区状态
*/
getBufferStatus(): {
queueSize: number;
nextIndex: number;
totalWritten: number;
} {
return {
queueSize: this.writeQueue.size,
nextIndex: this.nextWriteIndex,
totalWritten: this.totalWritten,
};
}
/**
* 关闭并清理资源
*/
async close(): Promise<void> {
// 尝试刷新所有剩余的chunk
const remainingIndexes = Array.from(this.writeQueue.keys()).sort(
(a, b) => a - b
);
for (const chunkIndex of remainingIndexes) {
const chunk = this.writeQueue.get(chunkIndex)!;
const fileOffset = chunkIndex * 65536;
await this.stream.seek(fileOffset);
await this.stream.write(chunk);
postLogToBackend(
`[DEBUG] 💾 FINAL_FLUSH chunk #${chunkIndex} at cleanup`
);
}
this.writeQueue.clear();
}
}
/**
* 🚀 新版本:管理按序列化融合数据包的文件接收状态
*/
@@ -33,6 +177,7 @@ interface ActiveFileReception {
initialOffset: number; // For resuming downloads
fileHandle: FileSystemFileHandle | null; // Object related to writing to disk -- current file.
writeStream: FileSystemWritableFileStream | null; // Object related to writing to disk.
sequencedWriter: SequencedDiskWriter | null; // 🚀 新增:严格按序写入管理器
completionNotifier: {
resolve: () => void;
reject: (reason?: any) => void;
@@ -108,6 +253,17 @@ class FileReceiver {
}
if (this.activeFileReception) {
// 🚀 在错误时也要清理SequencedWriter
if (this.activeFileReception.sequencedWriter) {
this.activeFileReception.sequencedWriter.close().catch((err) => {
this.log(
"error",
"Error closing sequenced writer during error cleanup",
{ err }
);
});
}
this.activeFileReception.completionNotifier.reject(new Error(message));
this.activeFileReception = null;
}
@@ -196,6 +352,7 @@ class FileReceiver {
initialOffset: offset,
fileHandle: null,
writeStream: null,
sequencedWriter: null, // 🚀 新增:严格按序写入管理器
completionNotifier: { resolve, reject },
// 🚀 新版本:简化的按序接收管理
receivedChunksCount: 0,
@@ -355,18 +512,22 @@ class FileReceiver {
try {
// 1. 检查数据包最小长度
if (arrayBuffer.byteLength < 4) {
postLogToBackend(`[DEBUG] ❌ Invalid embedded packet - too small: ${arrayBuffer.byteLength}`);
postLogToBackend(
`[DEBUG] ❌ Invalid embedded packet - too small: ${arrayBuffer.byteLength}`
);
return null;
}
// 2. 读取元数据长度(4字节)
const lengthView = new Uint32Array(arrayBuffer, 0, 1);
const metaLength = lengthView[0];
// 3. 验证数据包的完整性
const expectedTotalLength = 4 + metaLength;
if (arrayBuffer.byteLength < expectedTotalLength) {
postLogToBackend(`[DEBUG] ❌ Incomplete embedded packet - expected: ${expectedTotalLength}, got: ${arrayBuffer.byteLength}`);
postLogToBackend(
`[DEBUG] ❌ Incomplete embedded packet - expected: ${expectedTotalLength}, got: ${arrayBuffer.byteLength}`
);
return null;
}
@@ -374,11 +535,11 @@ class FileReceiver {
const metaBytes = new Uint8Array(arrayBuffer, 4, metaLength);
const metaJson = new TextDecoder().decode(metaBytes);
const chunkMeta: EmbeddedChunkMeta = JSON.parse(metaJson);
// 5. 提取实际chunk数据部分
const chunkDataStart = 4 + metaLength;
const chunkData = arrayBuffer.slice(chunkDataStart);
// 6. 验证chunk数据大小
if (chunkData.byteLength !== chunkMeta.chunkSize) {
postLogToBackend(
@@ -397,7 +558,6 @@ class FileReceiver {
}
}
private async handleReceivedData(
data: string | ArrayBuffer | any,
peerId: string
@@ -507,11 +667,13 @@ class FileReceiver {
// endregion
// region File and Folder Processing
/**
* 🚀 新版本:处理融合数据包
*/
private async handleEmbeddedChunkPacket(arrayBuffer: ArrayBuffer): Promise<void> {
private async handleEmbeddedChunkPacket(
arrayBuffer: ArrayBuffer
): Promise<void> {
const parsed = this.parseEmbeddedChunkPacket(arrayBuffer);
if (!parsed) {
this.fireError("Failed to parse embedded chunk packet");
@@ -520,7 +682,7 @@ class FileReceiver {
const { chunkMeta, chunkData } = parsed;
const reception = this.activeFileReception!;
// 验证fileId匹配
if (chunkMeta.fileId !== reception.meta.fileId) {
postLogToBackend(
@@ -551,48 +713,30 @@ class FileReceiver {
reception.chunks[chunkIndex] = chunkData;
reception.chunkSequenceMap.set(chunkIndex, true);
reception.receivedChunksCount++;
postLogToBackend(
`[DEBUG] ✓ SEQUENCED chunk #${chunkIndex}/${chunkMeta.totalChunks} stored - size: ${chunkData.byteLength}, isLast: ${chunkMeta.isLastChunk}`
);
// 更新进度
this.updateProgress(chunkData.byteLength);
if (reception.writeStream) {
// 对于大文件直写模式,按序写入
await this.writeSequencedLargeFileChunk(chunkData, chunkIndex, chunkMeta.fileOffset);
if (reception.sequencedWriter) {
// 🚀 使用严格按序写入管理器
await reception.sequencedWriter.writeChunk(chunkIndex, chunkData);
} else {
postLogToBackend(`[DEBUG] ❌ Error - no sequencedWriter available`);
}
} else {
postLogToBackend(
`[DEBUG] ❌ Invalid chunk index - ${chunkIndex}, expected 0-${reception.chunks.length - 1}`
`[DEBUG] ❌ Invalid chunk index - ${chunkIndex}, expected 0-${
reception.chunks.length - 1
}`
);
}
await this.checkAndAutoFinalize();
}
/**
* 🚀 新增:按序写入大文件数据块
*/
private async writeSequencedLargeFileChunk(
chunk: ArrayBuffer,
chunkIndex: number,
fileOffset: number
): Promise<void> {
const stream = this.activeFileReception?.writeStream;
if (!stream) return;
try {
// 对于按序写入,可能需要seek到指定位置
// 这里简化处理,假设按序接收就直接写入
await stream.write(chunk);
this.activeFileReception!.chunks[chunkIndex] = null; // Mark as written
} catch (error) {
this.fireError("Error writing sequenced chunk to disk", { error, chunkIndex, fileOffset });
}
}
/**
* 🚀 新版本:统一的自动完成检查 - 支持融合数据包和旧格式
@@ -618,12 +762,16 @@ class FileReceiver {
}
}
const isSequencedComplete = sequencedCount === expectedChunks;
const sizeComplete = currentTotalSize >= expectedSize;
const isDataComplete = isSequencedComplete && sizeComplete;
// 更频繁的调试信息只在接近完成时显示
if (receivedChunks % 10 === 0 || receivedChunks >= expectedChunks - 5 || isDataComplete) {
if (
receivedChunks % 10 === 0 ||
receivedChunks >= expectedChunks - 5 ||
isDataComplete
) {
postLogToBackend(
`[DEBUG] 🔄 SEQUENCED progress - received: ${sequencedCount}/${expectedChunks}, total: ${currentTotalSize}/${expectedSize}, complete: ${isDataComplete}`
);
@@ -729,6 +877,17 @@ class FileReceiver {
this.activeFileReception.fileHandle = fileHandle;
this.activeFileReception.writeStream = writeStream;
// 🚀 创建严格按序写入管理器
const startChunkIndex = Math.floor(offset / 65536); // 计算起始块索引
this.activeFileReception.sequencedWriter = new SequencedDiskWriter(
writeStream,
startChunkIndex
);
postLogToBackend(
`[DEBUG] 📢 SEQUENCED_WRITER created - startIndex: ${startChunkIndex}, offset: ${offset}`
);
} catch (err) {
this.fireError("Failed to create file on disk", {
err,
@@ -758,29 +917,27 @@ class FileReceiver {
return currentDir;
}
private async writeLargeFileChunk(chunk: ArrayBuffer): Promise<void> {
const stream = this.activeFileReception?.writeStream;
if (!stream) {
// Fallback to memory if stream is not available for some reason
this.activeFileReception?.chunks.push(chunk);
return;
}
try {
await stream.write(chunk);
this.activeFileReception?.chunks.push(null); // Keep track of chunk count
} catch (error) {
this.fireError("Error writing chunk to disk", { error });
}
}
private async finalizeLargeFileReceive(): Promise<void> {
const reception = this.activeFileReception;
if (!reception?.writeStream || !reception.fileHandle) return;
try {
// 🚀 先关闭严格按序写入管理器(刷新所有缓冲)
if (reception.sequencedWriter) {
await reception.sequencedWriter.close();
const status = reception.sequencedWriter.getBufferStatus();
postLogToBackend(
`[DEBUG] 💾 SEQUENCED_WRITER closed - totalWritten: ${status.totalWritten}, finalQueue: ${status.queueSize}`
);
reception.sequencedWriter = null;
}
// 然后关闭文件流
await reception.writeStream.close();
postLogToBackend(`[DEBUG] ✅ LARGE_FILE finalized successfully`);
} catch (error) {
this.fireError("Error closing write stream", { error });
this.fireError("Error finalizing large file", { error });
}
}
// endregion
@@ -924,6 +1081,23 @@ class FileReceiver {
// endregion
public gracefulShutdown(): void {
if (this.activeFileReception?.sequencedWriter) {
this.log(
"log",
"Attempting to gracefully close sequenced writer on page unload."
);
// 🚀 先关闭严格按序写入管理器
this.activeFileReception.sequencedWriter.close().catch((err) => {
this.log(
"error",
"Error closing sequenced writer during graceful shutdown",
{
err,
}
);
});
}
if (this.activeFileReception?.writeStream) {
this.log(
"log",
+26 -13
View File
@@ -363,7 +363,7 @@ export default class BaseWebRTC {
// 增强的数据类型检测 - 支持Firefox的多种二进制数据格式
let dataType = "Unknown";
let dataSize = 0;
if (typeof event.data === "string") {
dataType = "String";
dataSize = event.data.length;
@@ -382,12 +382,13 @@ export default class BaseWebRTC {
} else {
// 详细的未知类型调试信息
dataType = `Unknown(${Object.prototype.toString.call(event.data)})`;
dataSize = event.data?.length || event.data?.size || event.data?.byteLength || 0;
dataSize =
event.data?.length || event.data?.size || event.data?.byteLength || 0;
}
postLogToBackend(
`[Firefox Debug] DataChannel onmessage - peer: ${peerId}, dataType: ${dataType}, size: ${dataSize}`
);
// postLogToBackend(
// `[Firefox Debug] DataChannel onmessage - peer: ${peerId}, dataType: ${dataType}, size: ${dataSize}`
// );
if (this.onDataReceived) {
this.onDataReceived(event.data, peerId);
@@ -483,14 +484,22 @@ export default class BaseWebRTC {
if (dataChannel?.readyState === "open") {
try {
// Firefox兼容性调试:记录发送详细信息
const dataType = typeof data === "string" ? "string" : data instanceof ArrayBuffer ? "ArrayBuffer" : typeof data;
const dataSize = typeof data === "string" ? data.length : data instanceof ArrayBuffer ? data.byteLength : 0;
postLogToBackend(`[Firefox Debug] sendToPeer - type: ${dataType}, size: ${dataSize}, bufferedAmount: ${dataChannel.bufferedAmount}`);
const dataType =
typeof data === "string"
? "string"
: data instanceof ArrayBuffer
? "ArrayBuffer"
: typeof data;
const dataSize =
typeof data === "string"
? data.length
: data instanceof ArrayBuffer
? data.byteLength
: 0;
// postLogToBackend(`[Firefox Debug] sendToPeer - type: ${dataType}, size: ${dataSize}, bufferedAmount: ${dataChannel.bufferedAmount}`);
dataChannel.send(data);
postLogToBackend(`[Firefox Debug] sendToPeer success - bufferedAmount after: ${dataChannel.bufferedAmount}`);
return true;
} catch (error) {
postLogToBackend(`[Firefox Debug] sendToPeer error: ${error}`);
@@ -499,7 +508,11 @@ export default class BaseWebRTC {
}
}
postLogToBackend(`[Firefox Debug] DataChannel not ready - peerId: ${peerId}, state: ${dataChannel?.readyState || 'undefined'}`);
postLogToBackend(
`[Firefox Debug] DataChannel not ready - peerId: ${peerId}, state: ${
dataChannel?.readyState || "undefined"
}`
);
this.log("warn", `Data channel not ready for peer ${peerId}. Retrying...`);
return this.retryDataSend(data, peerId);
}