chore: move fs operation in trace recorder to a separate class (#24383)

This commit is contained in:
Dmitry Gozman 2023-07-26 17:31:00 -07:00 committed by GitHub
parent ea6d127f28
commit c8f2fc392a
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
2 changed files with 180 additions and 109 deletions

View file

@ -25,7 +25,7 @@ import { ManualPromise } from '../../../utils/manualPromise';
import type { RegisteredListener } from '../../../utils/eventsHelper';
import { eventsHelper } from '../../../utils/eventsHelper';
import { assert, createGuid, monotonicTime } from '../../../utils';
import { mkdirIfNeeded, removeFolders } from '../../../utils/fileUtils';
import { removeFolders } from '../../../utils/fileUtils';
import { Artifact } from '../../artifact';
import { BrowserContext } from '../../browserContext';
import type { ElementHandle } from '../../dom';
@ -54,8 +54,8 @@ export type TracerOptions = {
type RecordingState = {
options: TracerOptions,
traceName: string,
networkFile: { file: string, buffer: trace.TraceEvent[] },
traceFile: { file: string, buffer: trace.TraceEvent[] },
networkFile: string,
traceFile: string,
tracesDir: string,
resourcesDir: string,
chunkOrdinal: number,
@ -68,7 +68,7 @@ type RecordingState = {
const kScreencastOptions = { width: 800, height: 600, quality: 90 };
export class Tracing extends SdkObject implements InstrumentationListener, SnapshotterDelegate, HarTracerDelegate {
private _writeChain = Promise.resolve();
private _fs = new SerializedFS();
private _snapshotter?: Snapshotter;
private _harTracer: HarTracer;
private _screencastListeners: RegisteredListener[] = [];
@ -137,8 +137,8 @@ export class Tracing extends SdkObject implements InstrumentationListener, Snaps
options,
traceName,
tracesDir,
traceFile: { file: path.join(tracesDir, traceName + '.trace'), buffer: [] },
networkFile: { file: path.join(tracesDir, traceName + '.network'), buffer: [] },
traceFile: path.join(tracesDir, traceName + '.trace'),
networkFile: path.join(tracesDir, traceName + '.network'),
resourcesDir: path.join(tracesDir, 'resources'),
chunkOrdinal: 0,
traceSha1s: new Set(),
@ -146,8 +146,8 @@ export class Tracing extends SdkObject implements InstrumentationListener, Snaps
recording: false,
callIds: new Set(),
};
const { resourcesDir, networkFile } = this._state;
this._writeChain = fs.promises.mkdir(resourcesDir, { recursive: true }).then(() => fs.promises.writeFile(networkFile.file, ''));
this._fs.mkdir(this._state.resourcesDir);
this._fs.writeFile(this._state.networkFile, '');
if (options.snapshots)
this._harTracer.start();
}
@ -169,12 +169,9 @@ export class Tracing extends SdkObject implements InstrumentationListener, Snaps
else
this._allocateNewTraceFile(this._state);
const { traceFile } = this._state;
this._appendTraceOperation(async () => {
await mkdirIfNeeded(traceFile.file);
const event: trace.TraceEvent = { ...this._contextCreatedEvent, title: options.title, wallTime: Date.now() };
await appendEventAndFlushIfNeeded(traceFile, event);
});
this._fs.mkdir(path.dirname(this._state.traceFile));
const event: trace.TraceEvent = { ...this._contextCreatedEvent, title: options.title, wallTime: Date.now() };
this._fs.appendFile(this._state.traceFile, JSON.stringify(event) + '\n');
this._context.instrumentation.addListener(this, this._context);
this._eventListeners.push(
@ -208,29 +205,18 @@ export class Tracing extends SdkObject implements InstrumentationListener, Snaps
private _allocateNewTraceFile(state: RecordingState) {
const suffix = state.chunkOrdinal ? `-chunk${state.chunkOrdinal}` : ``;
state.chunkOrdinal++;
state.traceFile = {
file: path.join(state.tracesDir, `${state.traceName}${suffix}.trace`),
buffer: [],
};
state.traceFile = path.join(state.tracesDir, `${state.traceName}${suffix}.trace`);
}
private _changeTraceName(state: RecordingState, name: string) {
const { traceFile: oldTraceFile, networkFile: oldNetworkFile } = state;
state.traceName = name;
state.chunkOrdinal = 0; // Reset ordinal for the new name.
this._allocateNewTraceFile(state);
state.networkFile = {
file: path.join(state.tracesDir, name + '.network'),
buffer: [],
};
const networkFile = state.networkFile;
this._appendTraceOperation(async () => {
await flushTraceFile(oldTraceFile);
await flushTraceFile(oldNetworkFile);
// Network file survives across chunks, so make a copy with the new name.
await fs.promises.copyFile(oldNetworkFile.file, networkFile.file);
});
// Network file survives across chunks, so make a copy with the new name.
const newNetworkFile = path.join(state.tracesDir, name + '.network');
this._fs.copyFile(state.networkFile, newNetworkFile);
state.networkFile = newNetworkFile;
}
async stop() {
@ -241,7 +227,7 @@ export class Tracing extends SdkObject implements InstrumentationListener, Snaps
if (this._state.recording)
throw new Error(`Must stop trace file before stopping tracing`);
this._harTracer.stop();
await this._writeChain;
await this._fs.syncAndGetError();
this._state = undefined;
}
@ -260,7 +246,7 @@ export class Tracing extends SdkObject implements InstrumentationListener, Snaps
async dispose() {
this._snapshotter?.dispose();
this._harTracer.stop();
await this._writeChain;
await this._fs.syncAndGetError();
}
async stopChunk(params: TracingTracingStopChunkParams): Promise<{ artifact?: Artifact, entries?: NameValue[] }> {
@ -290,11 +276,9 @@ export class Tracing extends SdkObject implements InstrumentationListener, Snaps
// We can use <traceName>-<guid>.network, but "-pwnetcopy-0" suffix is more readable
// and makes it easier to debug future issues.
const newNetworkFile = path.join(this._state.tracesDir, this._state.traceName + `-pwnetcopy-${this._state.chunkOrdinal}.network`);
const oldNetworkFile = this._state.networkFile;
const traceFile = this._state.traceFile;
const entries: NameValue[] = [];
entries.push({ name: 'trace.trace', value: traceFile.file });
entries.push({ name: 'trace.trace', value: this._state.traceFile });
entries.push({ name: 'trace.network', value: newNetworkFile });
for (const sha1 of new Set([...this._state.traceSha1s, ...this._state.networkSha1s]))
entries.push({ name: path.join('resources', sha1), value: path.join(this._state.resourcesDir, sha1) });
@ -302,46 +286,41 @@ export class Tracing extends SdkObject implements InstrumentationListener, Snaps
// Only reset trace sha1s, network resources are preserved between chunks.
this._state.traceSha1s = new Set();
// Chain the export operation against write operations,
// so that neither trace files nor resources change during the export.
let result: { artifact?: Artifact, entries?: NameValue[] } = {};
this._appendTraceOperation(async () => {
if (params.mode === 'discard')
return;
await flushTraceFile(traceFile);
await flushTraceFile(oldNetworkFile);
await fs.promises.copyFile(oldNetworkFile.file, newNetworkFile);
if (params.mode === 'entries')
result = { entries };
else
result = { artifact: await this._exportZip(entries, traceFile.file + '.zip').catch(() => undefined) };
});
try {
await this._writeChain;
return result;
} finally {
if (params.mode === 'discard') {
this._isStopping = false;
if (this._state)
this._state.recording = false;
this._state.recording = false;
return {};
}
}
private _exportZip(entries: NameValue[], zipFileName: string): Promise<Artifact | undefined> {
const zipFile = new yazl.ZipFile();
const result = new ManualPromise<Artifact | undefined>();
(zipFile as any as EventEmitter).on('error', error => result.reject(error));
for (const entry of entries)
zipFile.addFile(entry.value, entry.name);
zipFile.end();
zipFile.outputStream.pipe(fs.createWriteStream(zipFileName)).on('close', () => {
const artifact = new Artifact(this._context, zipFileName);
artifact.reportFinished();
result.resolve(artifact);
}).on('error', error => result.reject(error));
return result;
this._fs.copyFile(this._state.networkFile, newNetworkFile);
const zipFileName = this._state.traceFile + '.zip';
if (params.mode === 'archive')
this._fs.zip(entries, zipFileName);
// Make sure all file operations complete.
const error = await this._fs.syncAndGetError();
this._isStopping = false;
if (this._state)
this._state.recording = false;
// IMPORTANT: no awaits after this point, to make sure recording state is correct.
if (error) {
// This check is here because closing the browser removes the tracesDir and tracing
// cannot access removed files. Clients are ready for the missing artifact.
if (this._context instanceof BrowserContext && !this._context._browser.isConnected())
return {};
throw error;
}
if (params.mode === 'entries')
return { entries };
const artifact = new Artifact(this._context, zipFileName);
artifact.reportFinished();
return { artifact };
}
async _captureSnapshot(snapshotName: string, sdkObject: SdkObject, metadata: CallMetadata, element?: ElementHandle): Promise<void> {
@ -410,10 +389,7 @@ export class Tracing extends SdkObject implements InstrumentationListener, Snaps
onEntryFinished(entry: har.Entry) {
const event: trace.ResourceSnapshotTraceEvent = { type: 'resource-snapshot', snapshot: entry };
const visited = visitTraceEvent(event, this._state!.networkSha1s);
const { networkFile } = this._state!;
this._appendTraceOperation(async () => {
await appendEventAndFlushIfNeeded(networkFile, visited);
});
this._fs.appendFile(this._state!.networkFile, JSON.stringify(visited) + '\n', true /* flush */);
}
onContentBlob(sha1: string, buffer: Buffer) {
@ -476,10 +452,9 @@ export class Tracing extends SdkObject implements InstrumentationListener, Snaps
private _appendTraceEvent(event: trace.TraceEvent) {
const visited = visitTraceEvent(event, this._state!.traceSha1s);
const { traceFile } = this._state!;
this._appendTraceOperation(async () => {
await appendEventAndFlushIfNeeded(traceFile, visited);
});
// Do not flush events, they are too noisy.
const flush = event.type !== 'event' && event.type !== 'object';
this._fs.appendFile(this._state!.traceFile, JSON.stringify(visited) + '\n', flush);
}
private _appendResource(sha1: string, buffer: Buffer) {
@ -487,23 +462,7 @@ export class Tracing extends SdkObject implements InstrumentationListener, Snaps
return;
this._allResources.add(sha1);
const resourcePath = path.join(this._state!.resourcesDir, sha1);
this._appendTraceOperation(async () => {
// Note: 'wx' flag only writes when the file does not exist.
// See https://nodejs.org/api/fs.html#file-system-flags.
// This way tracing never have to write the same resource twice.
await fs.promises.writeFile(resourcePath, buffer, { flag: 'wx' }).catch(() => {});
});
}
private _appendTraceOperation(cb: () => Promise<unknown>): void {
// This method serializes all writes to the trace.
this._writeChain = this._writeChain.then(async () => {
// This check is here because closing the browser removes the tracesDir and tracing
// dies trying to archive.
if (this._context instanceof BrowserContext && !this._context._browser.isConnected())
return;
await cb();
});
this._fs.writeFile(resourcePath, buffer, true /* skipIfExists */);
}
}
@ -570,18 +529,91 @@ function createAfterActionTraceEvent(metadata: CallMetadata): trace.AfterActionT
};
}
async function appendEventAndFlushIfNeeded(file: { file: string, buffer: trace.TraceEvent[] }, event: trace.TraceEvent) {
file.buffer.push(event);
class SerializedFS {
private _writeChain = Promise.resolve();
private _buffers = new Map<string, string[]>(); // Should never be accessed from within appendOperation.
private _error: Error | undefined;
// Do not flush events, they are too noisy.
if (event.type === 'event' || event.type === 'object')
return;
mkdir(dir: string) {
this._appendOperation(() => fs.promises.mkdir(dir, { recursive: true }));
}
await flushTraceFile(file);
}
async function flushTraceFile(file: { file: string, buffer: trace.TraceEvent[] }) {
const data = file.buffer.map(e => Buffer.from(JSON.stringify(e) + '\n'));
await fs.promises.appendFile(file.file, Buffer.concat(data));
file.buffer = [];
writeFile(file: string, content: string | Buffer, skipIfExists?: boolean) {
this._buffers.delete(file); // No need to flush the buffer since we'll overwrite anyway.
// Note: 'wx' flag only writes when the file does not exist.
// See https://nodejs.org/api/fs.html#file-system-flags.
// This way tracing never have to write the same resource twice.
this._appendOperation(async () => {
if (skipIfExists)
await fs.promises.writeFile(file, content, { flag: 'wx' }).catch(() => {});
else
await fs.promises.writeFile(file, content);
});
}
appendFile(file: string, text: string, flush?: boolean) {
if (!this._buffers.has(file))
this._buffers.set(file, []);
this._buffers.get(file)!.push(text);
if (flush)
this._flushFile(file);
}
private _flushFile(file: string) {
const buffer = this._buffers.get(file);
if (buffer === undefined)
return;
const text = buffer.join('');
this._buffers.delete(file);
this._appendOperation(() => fs.promises.appendFile(file, text));
}
copyFile(from: string, to: string) {
this._flushFile(from);
this._buffers.delete(to); // No need to flush the buffer since we'll overwrite anyway.
this._appendOperation(() => fs.promises.copyFile(from, to));
}
async syncAndGetError() {
for (const file of this._buffers.keys())
this._flushFile(file);
await this._writeChain;
return this._error;
}
zip(entries: NameValue[], zipFileName: string) {
for (const file of this._buffers.keys())
this._flushFile(file);
// Chain the export operation against write operations,
// so that files do not change during the export.
this._appendOperation(async () => {
const zipFile = new yazl.ZipFile();
const result = new ManualPromise<void>();
(zipFile as any as EventEmitter).on('error', error => result.reject(error));
for (const entry of entries)
zipFile.addFile(entry.value, entry.name);
zipFile.end();
zipFile.outputStream
.pipe(fs.createWriteStream(zipFileName))
.on('close', () => result.resolve())
.on('error', error => result.reject(error));
await result;
});
}
private _appendOperation(cb: () => Promise<unknown>): void {
// This method serializes all writes to the trace.
this._writeChain = this._writeChain.then(async () => {
// Ignore all operations after the first error.
if (this._error)
return;
try {
await cb();
} catch (e) {
this._error = e;
}
});
}
}

View file

@ -320,6 +320,45 @@ test('should record network failures', async ({ context, page, server }, testInf
expect(requestEvent).toBeTruthy();
});
test('should not crash when browser closes mid-trace', async ({ browserType, server }, testInfo) => {
const browser = await browserType.launch();
const page = await browser.newPage();
await page.context().tracing.start({ snapshots: true, screenshots: true });
await page.goto(server.EMPTY_PAGE);
await browser.close();
await new Promise(f => setTimeout(f, 1000)); // Give it some time to throw errors
});
test('should survive browser.close with auto-created traces dir', async ({ browserType }, testInfo) => {
const oldTracesDir = (browserType as any)._defaultLaunchOptions.tracesDir;
(browserType as any)._defaultLaunchOptions.tracesDir = undefined;
const browser = await browserType.launch();
const page = await browser.newPage();
await page.context().tracing.start();
const done = { value: false };
async function go() {
while (!done.value) {
// Produce a lot of operations to make sure tracing operations are enqueued.
for (let i = 0; i < 100; i++)
page.evaluate('1 + 1').catch(() => {});
await page.waitForTimeout(10).catch(() => {});
}
}
void go();
await new Promise(f => setTimeout(f, 500));
// Close the browser and give it some time to fail.
await Promise.all([
browser.close(),
new Promise(f => setTimeout(f, 500)),
]);
done.value = true;
(browserType as any)._defaultLaunchOptions.tracesDir = oldTracesDir;
});
test('should not stall on dialogs', async ({ page, context, server }) => {
await context.tracing.start({ screenshots: true, snapshots: true });
await page.goto(server.EMPTY_PAGE);