serialize async operations

This commit is contained in:
Simon Knott 2024-12-12 10:48:47 +01:00
parent a526cb9c81
commit c6e8d209fb
No known key found for this signature in database
GPG key ID: 8CEDC00028084AEC

View file

@ -58,6 +58,7 @@ function parseHeader(buffer: Buffer) {
type TarHeader = NonNullable<ReturnType<typeof parseHeader>>;
export class TarExtractor extends Writable {
private queue = Promise.resolve();
private buffer = Buffer.alloc(0);
private currentHeader: TarHeader | null = null;
private remainingBytes = 0;
@ -67,6 +68,66 @@ export class TarExtractor extends Writable {
super();
}
override async _write(chunk: Buffer, _encoding: string, callback: (err?: Error) => void) {
this.queue = this.queue.then(() => this._writeImpl(chunk)).then(callback).catch(callback);
}
private async _writeImpl(chunk: Buffer): Promise<undefined> {
this.buffer = Buffer.concat([this.buffer, chunk]);
while (this.buffer.length >= 512) {
if (!this.currentHeader) {
// Check for end of archive (two consecutive zero blocks)
if (this.buffer.subarray(0, 512).every(byte => byte === 0)) {
this.buffer = this.buffer.subarray(512);
continue;
}
const header = parseHeader(this.buffer);
if (!header)
break;
this.currentHeader = header;
this.remainingBytes = header.size;
this.buffer = this.buffer.subarray(512);
if (header.size > 0)
this.currentFileStream = await this.processHeader(header);
else
await this.processHeader(header); // For symlinks and directories
continue;
}
const blockSize = Math.min(this.remainingBytes, this.buffer.length);
if (blockSize === 0) {
this.currentHeader = null;
this.currentFileStream = null;
continue;
}
const dataChunk = this.buffer.subarray(0, blockSize);
this.buffer = this.buffer.subarray(blockSize);
this.remainingBytes -= blockSize;
if (this.currentFileStream)
await new Promise<void>((resolve, reject) => this.currentFileStream!.write(dataChunk, err => err ? reject(err) : resolve()));
// Handle padding
if (this.remainingBytes === 0) {
const padding = 512 - (this.currentHeader.size % 512);
if (padding < 512)
this.buffer = this.buffer.subarray(padding);
this.currentHeader = null;
if (this.currentFileStream) {
await new Promise(resolve => this.currentFileStream!.end(resolve));
this.currentFileStream = null;
}
}
}
}
async mkdir(dir: string) {
try {
await fs.promises.mkdir(dir, { recursive: true });
@ -107,65 +168,4 @@ export class TarExtractor extends Writable {
await fs.promises.symlink(targetPath, symlinkPath);
}
override async _write(chunk: Buffer, _encoding: string, callback: (err?: Error) => void) {
try {
this.buffer = Buffer.concat([this.buffer, chunk]);
while (this.buffer.length >= 512) {
if (!this.currentHeader) {
// Check for end of archive (two consecutive zero blocks)
if (this.buffer.subarray(0, 512).every(byte => byte === 0)) {
this.buffer = this.buffer.subarray(512);
continue;
}
const header = parseHeader(this.buffer);
if (!header)
break;
this.currentHeader = header;
this.remainingBytes = header.size;
this.buffer = this.buffer.subarray(512);
if (header.size > 0)
this.currentFileStream = await this.processHeader(header);
else
await this.processHeader(header); // For symlinks and directories
continue;
}
const blockSize = Math.min(this.remainingBytes, this.buffer.length);
if (blockSize === 0) {
this.currentHeader = null;
this.currentFileStream = null;
continue;
}
const dataChunk = this.buffer.subarray(0, blockSize);
this.buffer = this.buffer.subarray(blockSize);
this.remainingBytes -= blockSize;
if (this.currentFileStream)
await new Promise<void>((resolve, reject) => this.currentFileStream!.write(dataChunk, err => err ? reject(err) : resolve()));
// Handle padding
if (this.remainingBytes === 0) {
const padding = 512 - (this.currentHeader.size % 512);
if (padding < 512)
this.buffer = this.buffer.subarray(padding);
this.currentHeader = null;
if (this.currentFileStream) {
await new Promise(resolve => this.currentFileStream!.end(resolve));
this.currentFileStream = null;
}
}
}
callback();
} catch (err) {
callback(err);
}
}
}