chore(test runner): reuse TestGroup instead of DispatcherEntry (#7924)

This commit is contained in:
Dmitry Gozman 2021-07-29 21:41:06 -07:00 committed by GitHub
parent 4163cec93b
commit 2e387b3a3a
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23

View file

@ -22,14 +22,6 @@ import type { TestResult, Reporter } from '../../types/testReporter';
import { TestCase } from './test';
import { Loader } from './loader';
// TODO: use TestGroup instead of DispatcherEntry
type DispatcherEntry = {
runPayload: RunPayload;
hash: string;
repeatEachIndex: number;
projectIndex: number;
};
export type TestGroup = {
workerHash: string;
requireFile: string;
@ -44,7 +36,7 @@ export class Dispatcher {
private _workerClaimers: (() => void)[] = [];
private _testById = new Map<string, { test: TestCase, result: TestResult }>();
private _queue: DispatcherEntry[] = [];
private _queue: TestGroup[] = [];
private _stopCallback = () => {};
readonly _loader: Loader;
private _reporter: Reporter;
@ -55,27 +47,12 @@ export class Dispatcher {
constructor(loader: Loader, testGroups: TestGroup[], reporter: Reporter) {
this._loader = loader;
this._reporter = reporter;
this._queue = [];
this._queue = testGroups;
for (const group of testGroups) {
const entry: DispatcherEntry = {
runPayload: {
file: group.requireFile,
entries: []
},
hash: group.workerHash,
repeatEachIndex: group.repeatEachIndex,
projectIndex: group.projectIndex,
};
for (const test of group.tests) {
const result = test._appendTestResult();
this._testById.set(test._id, { test, result });
entry.runPayload.entries.push({
retry: result.retry,
testId: test._id,
});
}
this._queue.push(entry);
}
}
@ -90,22 +67,23 @@ export class Dispatcher {
while (this._queue.length) {
if (this._isStopped)
break;
const entry = this._queue.shift()!;
const requiredHash = entry.hash;
let worker = await this._obtainWorker(entry);
const testGroup = this._queue.shift()!;
const requiredHash = testGroup.workerHash;
let worker = await this._obtainWorker(testGroup);
while (!this._isStopped && worker.hash && worker.hash !== requiredHash) {
worker.stop();
worker = await this._obtainWorker(entry);
worker = await this._obtainWorker(testGroup);
}
if (this._isStopped)
break;
jobs.push(this._runJob(worker, entry));
jobs.push(this._runJob(worker, testGroup));
}
await Promise.all(jobs);
}
async _runJob(worker: Worker, entry: DispatcherEntry) {
worker.run(entry.runPayload);
async _runJob(worker: Worker, testGroup: TestGroup) {
worker.run(testGroup);
let doneCallback = () => {};
const result = new Promise<void>(f => doneCallback = f);
const doneWithJob = () => {
@ -116,7 +94,7 @@ export class Dispatcher {
doneCallback();
};
const remainingByTestId = new Map(entry.runPayload.entries.map(e => [ e.testId, e ]));
const remainingByTestId = new Map(testGroup.tests.map(e => [ e._id, e ]));
let lastStartedTestId: string | undefined;
const onTestBegin = (params: TestBeginPayload) => {
@ -152,17 +130,17 @@ export class Dispatcher {
// and all others as skipped.
if (params.fatalError) {
let first = true;
for (const { testId } of remaining) {
const { test, result } = this._testById.get(testId)!;
for (const test of remaining) {
const { result } = this._testById.get(test._id)!;
if (this._hasReachedMaxFailures())
break;
// There might be a single test that has started but has not finished yet.
if (testId !== lastStartedTestId)
if (test._id !== lastStartedTestId)
this._reporter.onTestBegin?.(test);
result.error = params.fatalError;
result.status = first ? 'failed' : 'skipped';
this._reportTestEnd(test, result);
failedTestIds.add(testId);
failedTestIds.add(test._id);
first = false;
}
// Since we pretend that all remaining tests failed, there is nothing else to run,
@ -177,15 +155,12 @@ export class Dispatcher {
const pair = this._testById.get(testId)!;
if (!this._isStopped && pair.test.expectedStatus === 'passed' && pair.test.results.length < pair.test.retries + 1) {
pair.result = pair.test._appendTestResult();
remaining.unshift({
retry: pair.result.retry,
testId: pair.test._id,
});
remaining.unshift(pair.test);
}
}
if (remaining.length)
this._queue.unshift({ ...entry, runPayload: { ...entry.runPayload, entries: remaining } });
this._queue.unshift({ ...testGroup, tests: remaining });
// This job is over, we just scheduled another one.
doneWithJob();
@ -203,14 +178,14 @@ export class Dispatcher {
return result;
}
async _obtainWorker(entry: DispatcherEntry) {
async _obtainWorker(testGroup: TestGroup) {
const claimWorker = (): Promise<Worker> | null => {
// Use available worker.
if (this._freeWorkers.length)
return Promise.resolve(this._freeWorkers.pop()!);
// Create a new worker.
if (this._workers.size < this._loader.fullConfig().workers)
return this._createWorker(entry);
return this._createWorker(testGroup);
return null;
};
@ -232,7 +207,7 @@ export class Dispatcher {
callback();
}
_createWorker(entry: DispatcherEntry) {
_createWorker(testGroup: TestGroup) {
const worker = new Worker(this);
worker.on('testBegin', (params: TestBeginPayload) => {
if (this._hasReachedMaxFailures())
@ -285,7 +260,7 @@ export class Dispatcher {
this._stopCallback();
});
this._workers.add(worker);
return worker.init(entry).then(() => worker);
return worker.init(testGroup).then(() => worker);
}
async stop() {
@ -350,19 +325,25 @@ class Worker extends EventEmitter {
});
}
async init(entry: DispatcherEntry) {
this.hash = entry.hash;
async init(testGroup: TestGroup) {
this.hash = testGroup.workerHash;
const params: WorkerInitParams = {
workerIndex: this.index,
repeatEachIndex: entry.repeatEachIndex,
projectIndex: entry.projectIndex,
repeatEachIndex: testGroup.repeatEachIndex,
projectIndex: testGroup.projectIndex,
loader: this.runner._loader.serialize(),
};
this.process.send({ method: 'init', params });
await new Promise(f => this.process.once('message', f)); // Ready ack
}
run(runPayload: RunPayload) {
run(testGroup: TestGroup) {
const runPayload: RunPayload = {
file: testGroup.requireFile,
entries: testGroup.tests.map(test => {
return { testId: test._id, retry: test.results.length - 1 };
}),
};
this.process.send({ method: 'run', params: runPayload });
}