fix(test runner): keep track of remaining tests on the runner side (#7486)

This fixes two issues:
- Sudden worker process exit is properly accounted for.
- We can stop() workers willy-nilly, e.g. after reaching maxFailures.

Details:
- DonePayload does not send `reamining` anymore, and worker does not track it.
- Instead, `Dispatcher._runJob` track remaining tests and acts accordingly.
- Upon worker exit, we emulate a fatal error for all remaining tests.

Drive-by:
- Do not report onTestBegin after reaching maxFailures to avoid confusion.
  Before, we did report onTestBegin, but not onTestEnd.
- List reporter aligned between "running" and "finished" state - it was
  one character misplaced.
- Added a SIGINT test.
This commit is contained in:
Dmitry Gozman 2021-07-07 12:04:43 -07:00 committed by GitHub
parent 98bcf26656
commit 2073193c36
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
8 changed files with 168 additions and 26 deletions

View file

@ -149,34 +149,58 @@ export class Dispatcher {
worker.run(entry.runPayload);
let doneCallback = () => {};
const result = new Promise<void>(f => doneCallback = f);
worker.once('done', (params: DonePayload) => {
const doneWithJob = () => {
worker.removeListener('testBegin', onTestBegin);
worker.removeListener('testEnd', onTestEnd);
worker.removeListener('done', onDone);
worker.removeListener('exit', onExit);
doneCallback();
};
const remainingByTestId = new Map(entry.runPayload.entries.map(e => [ e.testId, e ]));
let lastStartedTestId: string | undefined;
const onTestBegin = (params: TestBeginPayload) => {
lastStartedTestId = params.testId;
};
worker.addListener('testBegin', onTestBegin);
const onTestEnd = (params: TestEndPayload) => {
remainingByTestId.delete(params.testId);
};
worker.addListener('testEnd', onTestEnd);
const onDone = (params: DonePayload) => {
let remaining = [...remainingByTestId.values()];
// We won't file remaining if:
// - there are no remaining
// - we are here not because something failed
// - no unrecoverable worker error
if (!params.remaining.length && !params.failedTestId && !params.fatalError) {
if (!remaining.length && !params.failedTestId && !params.fatalError) {
this._freeWorkers.push(worker);
this._notifyWorkerClaimer();
doneCallback();
doneWithJob();
return;
}
// When worker encounters error, we will stop it and create a new one.
worker.stop();
let remaining = params.remaining;
const failedTestIds = new Set<string>();
// In case of fatal error, report all remaining tests as failing with this error.
if (params.fatalError) {
for (const { testId } of remaining) {
const { test, result } = this._testById.get(testId)!;
this._reporter.onTestBegin(test);
// There might be a single test that has started but has not finished yet.
if (testId !== lastStartedTestId)
this._reportTestBegin(test);
result.error = params.fatalError;
this._reportTestEnd(test, result, 'failed');
failedTestIds.add(testId);
}
// Since we pretent that all remaining tests failed, there is nothing else to run,
// Since we pretend that all remaining tests failed, there is nothing else to run,
// except for possible retries.
remaining = [];
}
@ -199,8 +223,15 @@ export class Dispatcher {
this._queue.unshift({ ...entry, runPayload: { ...entry.runPayload, entries: remaining } });
// This job is over, we just scheduled another one.
doneCallback();
});
doneWithJob();
};
worker.on('done', onDone);
const onExit = () => {
onDone({ fatalError: { value: 'Worker process exited unexpectedly' } });
};
worker.on('exit', onExit);
return result;
}
@ -238,7 +269,7 @@ export class Dispatcher {
worker.on('testBegin', (params: TestBeginPayload) => {
const { test, result: testRun } = this._testById.get(params.testId)!;
testRun.workerIndex = params.workerIndex;
this._reporter.onTestBegin(test);
this._reportTestBegin(test);
});
worker.on('testEnd', (params: TestEndPayload) => {
const { test, result } = this._testById.get(params.testId)!;
@ -289,6 +320,14 @@ export class Dispatcher {
}
}
private _reportTestBegin(test: Test) {
if (this._isStopped)
return;
const maxFailures = this._loader.fullConfig().maxFailures;
if (!maxFailures || this._failureCount < maxFailures)
this._reporter.onTestBegin(test);
}
private _reportTestEnd(test: Test, result: TestResult, status: TestStatus) {
if (this._isStopped)
return;
@ -299,7 +338,7 @@ export class Dispatcher {
if (!maxFailures || this._failureCount <= maxFailures)
this._reporter.onTestEnd(test, result);
if (maxFailures && this._failureCount === maxFailures)
this._isStopped = true;
this.stop().catch(e => {});
}
hasWorkerErrors(): boolean {
@ -314,6 +353,7 @@ class Worker extends EventEmitter {
runner: Dispatcher;
hash = '';
index: number;
private didSendStop = false;
constructor(runner: Dispatcher) {
super();
@ -356,7 +396,9 @@ class Worker extends EventEmitter {
}
stop() {
this.process.send({ method: 'stop' });
if (!this.didSendStop)
this.process.send({ method: 'stop' });
this.didSendStop = true;
}
}

View file

@ -56,8 +56,7 @@ export type RunPayload = {
export type DonePayload = {
failedTestId?: string;
fatalError?: any;
remaining: TestEntry[];
fatalError?: TestError;
};
export type TestOutputPayload = {

View file

@ -226,7 +226,7 @@ function indent(lines: string, tab: string) {
return lines.replace(/^(?=.+$)/gm, tab);
}
function positionInFile(stack: string, file: string): { column: number; line: number; } {
function positionInFile(stack: string, file: string): { column: number; line: number; } | undefined {
// Stack will have /private/var/folders instead of /var/folders on Mac.
file = fs.realpathSync(file);
for (const line of stack.split('\n')) {
@ -236,7 +236,6 @@ function positionInFile(stack: string, file: string): { column: number; line: nu
if (path.resolve(process.cwd(), parsed.file) === file)
return {column: parsed.column || 0, line: parsed.line || 0};
}
return { column: 0, line: 0 };
}
function monotonicTime(): number {

View file

@ -45,7 +45,7 @@ class ListReporter extends BaseReporter {
process.stdout.write('\n');
this._lastRow++;
}
process.stdout.write(' ' + colors.gray(formatTestTitle(this.config, test) + ': ') + '\n');
process.stdout.write(' ' + colors.gray(formatTestTitle(this.config, test) + ': ') + '\n');
}
this._testRows.set(test, this._lastRow++);
}

View file

@ -24,7 +24,7 @@ import { TestBeginPayload, TestEndPayload, RunPayload, TestEntry, DonePayload, W
import { setCurrentTestInfo } from './globals';
import { Loader } from './loader';
import { Modifier, Spec, Suite, Test } from './test';
import { Annotations, TestInfo, WorkerInfo } from './types';
import { Annotations, TestError, TestInfo, WorkerInfo } from './types';
import { ProjectImpl } from './project';
import { FixtureRunner } from './fixtures';
@ -40,9 +40,8 @@ export class WorkerRunner extends EventEmitter {
private _fixtureRunner: FixtureRunner;
private _failedTestId: string | undefined;
private _fatalError: any | undefined;
private _fatalError: TestError | undefined;
private _entries = new Map<string, TestEntry>();
private _remaining = new Map<string, TestEntry>();
private _isStopped: any;
_currentTest: { testId: string, testInfo: TestInfo } | null = null;
@ -58,6 +57,8 @@ export class WorkerRunner extends EventEmitter {
}
async cleanup() {
// We have to load the project to get the right deadline below.
this._loadIfNeeded();
// TODO: separate timeout for teardown?
const result = await raceAgainstDeadline((async () => {
await this._fixtureRunner.teardownScope('test');
@ -111,7 +112,6 @@ export class WorkerRunner extends EventEmitter {
async run(runPayload: RunPayload) {
this._entries = new Map(runPayload.entries.map(e => [ e.testId, e ]));
this._remaining = new Map(runPayload.entries.map(e => [ e.testId, e ]));
this._loadIfNeeded();
const fileSuite = await this._loader.loadTestFile(runPayload.file);
@ -194,7 +194,6 @@ export class WorkerRunner extends EventEmitter {
const entry = this._entries.get(test._id);
if (!entry)
return;
this._remaining.delete(test._id);
const startTime = monotonicTime();
let deadlineRunner: DeadlineRunner<any> | undefined;
@ -448,7 +447,6 @@ export class WorkerRunner extends EventEmitter {
const donePayload: DonePayload = {
failedTestId: this._failedTestId,
fatalError: this._fatalError,
remaining: [...this._remaining.values()],
};
this.emit('done', donePayload);
}

View file

@ -79,3 +79,36 @@ test('max-failures should work with retries', async ({ runInlineTest }) => {
expect(result.failed).toBe(1);
expect(result.output.split('\n').filter(l => l.includes('Received:')).length).toBe(2);
});
test('max-failures should stop workers', async ({ runInlineTest }) => {
const result = await runInlineTest({
'a.spec.js': `
const { test } = pwt;
test('passed', async () => {
await new Promise(f => setTimeout(f, 500));
});
test('failed', async () => {
test.expect(1).toBe(2);
});
`,
'b.spec.js': `
const { test } = pwt;
test('passed short', async () => {
await new Promise(f => setTimeout(f, 1));
});
test('interrupted counts as skipped', async () => {
console.log('\\n%%interrupted');
await new Promise(f => setTimeout(f, 2000));
});
test('skipped', async () => {
console.log('\\n%%skipped');
});
`,
}, { 'max-failures': 1, 'workers': 2 });
expect(result.exitCode).toBe(1);
expect(result.passed).toBe(2);
expect(result.failed).toBe(1);
expect(result.skipped).toBe(2);
expect(result.output).toContain('%%interrupted');
expect(result.output).not.toContain('%%skipped');
});

View file

@ -112,7 +112,7 @@ async function runTSC(baseDir: string): Promise<TSCResult> {
};
}
async function runPlaywrightTest(baseDir: string, params: any, env: Env): Promise<RunResult> {
async function runPlaywrightTest(baseDir: string, params: any, env: Env, options: RunOptions): Promise<RunResult> {
const paramList = [];
let additionalArgs = '';
for (const key of Object.keys(params)) {
@ -151,6 +151,7 @@ async function runPlaywrightTest(baseDir: string, params: any, env: Env): Promis
cwd: baseDir
});
let output = '';
let didSendSigint = false;
testProcess.stderr.on('data', chunk => {
output += String(chunk);
if (process.env.PW_RUNNER_DEBUG)
@ -158,6 +159,10 @@ async function runPlaywrightTest(baseDir: string, params: any, env: Env): Promis
});
testProcess.stdout.on('data', chunk => {
output += String(chunk);
if (options.sendSIGINTAfter && !didSendSigint && countTimes(output, '%%SEND-SIGINT%%') >= options.sendSIGINTAfter) {
didSendSigint = true;
process.kill(testProcess.pid, 'SIGINT');
}
if (process.env.PW_RUNNER_DEBUG)
process.stdout.write(String(chunk));
});
@ -212,9 +217,12 @@ async function runPlaywrightTest(baseDir: string, params: any, env: Env): Promis
};
}
type RunOptions = {
sendSIGINTAfter?: number;
};
type Fixtures = {
writeFiles: (files: Files) => Promise<string>;
runInlineTest: (files: Files, params?: Params, env?: Env) => Promise<RunResult>;
runInlineTest: (files: Files, params?: Params, env?: Env, options?: RunOptions) => Promise<RunResult>;
runTSC: (files: Files) => Promise<TSCResult>;
};
@ -225,9 +233,9 @@ export const test = base.extend<Fixtures>({
runInlineTest: async ({}, use, testInfo: TestInfo) => {
let runResult: RunResult | undefined;
await use(async (files: Files, params: Params = {}, env: Env = {}) => {
await use(async (files: Files, params: Params = {}, env: Env = {}, options: RunOptions = {}) => {
const baseDir = await writeFiles(testInfo, files);
runResult = await runPlaywrightTest(baseDir, params, env);
runResult = await runPlaywrightTest(baseDir, params, env, options);
return runResult;
});
if (testInfo.status !== testInfo.expectedStatus && runResult && !process.env.PW_RUNNER_DEBUG)
@ -268,3 +276,15 @@ const asciiRegex = new RegExp('[\\u001B\\u009B][[\\]()#;?]*(?:(?:(?:[a-zA-Z\\d]*
export function stripAscii(str: string): string {
return str.replace(asciiRegex, '');
}
function countTimes(s: string, sub: string): number {
let result = 0;
for (let index = 0; index !== -1;) {
index = s.indexOf(sub, index);
if (index !== -1) {
result++;
index += sub.length;
}
}
return result;
}

View file

@ -59,3 +59,54 @@ test('it should not allow a focused test when forbid-only is used', async ({ run
expect(result.output).toContain('--forbid-only found a focused test.');
expect(result.output).toContain(`- tests${path.sep}focused-test.spec.js:6 > i-am-focused`);
});
test('it should not hang and report results when worker process suddenly exits', async ({ runInlineTest }) => {
const result = await runInlineTest({
'a.spec.js': `
const { test } = pwt;
test('passed1', () => {});
test('passed2', () => {});
test('failed1', () => { process.exit(0); });
test('failed2', () => {});
`
});
expect(result.exitCode).toBe(1);
expect(result.passed).toBe(2);
expect(result.failed).toBe(2);
expect(result.output).toContain('Worker process exited unexpectedly');
});
test('sigint should stop workers', async ({ runInlineTest }) => {
test.skip(process.platform === 'win32', 'No sending SIGINT on Windows');
const result = await runInlineTest({
'a.spec.js': `
const { test } = pwt;
test('interrupted1', async () => {
console.log('\\n%%SEND-SIGINT%%1');
await new Promise(f => setTimeout(f, 1000));
});
test('skipped1', async () => {
console.log('\\n%%skipped1');
});
`,
'b.spec.js': `
const { test } = pwt;
test('interrupted2', async () => {
console.log('\\n%%SEND-SIGINT%%2');
await new Promise(f => setTimeout(f, 1000));
});
test('skipped2', async () => {
console.log('\\n%%skipped2');
});
`,
}, { 'workers': 2 }, {}, { sendSIGINTAfter: 2 });
expect(result.exitCode).toBe(130);
expect(result.passed).toBe(0);
expect(result.failed).toBe(0);
expect(result.skipped).toBe(4);
expect(result.output).toContain('%%SEND-SIGINT%%1');
expect(result.output).toContain('%%SEND-SIGINT%%2');
expect(result.output).not.toContain('%%skipped1');
expect(result.output).not.toContain('%%skipped2');
});