chore(test runner): decouple runJob from the Dispacther (#26756)

In preparation to extracting a class for it.
This commit is contained in:
Dmitry Gozman 2023-08-28 15:41:14 -07:00 committed by GitHub
parent 38f1e62642
commit 4bbd16d316
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23

View file

@ -136,7 +136,21 @@ export class Dispatcher {
}
// 3. Run the job.
await this._runJob(worker, job);
const result = await this._runJob(worker, job);
this._updateCounterForWorkerHash(job.workerHash, -1);
// 4. When worker encounters error, we stop it and create a new one.
// We also do not keep the worker alive if it cannot serve any more jobs.
if (result.didFail)
void worker.stop(true /* didFail */);
else if (this._isWorkerRedundant(worker))
void worker.stop();
// 5. Possibly schedule a new job with leftover tests and/or retries.
if (!this._isStopped && result.newJob) {
this._queue.unshift(result.newJob);
this._updateCounterForWorkerHash(job.workerHash, +1);
}
}
private _checkFinished() {
@ -168,11 +182,15 @@ export class Dispatcher {
return workersWithSameHash > this._queuedOrRunningHashCount.get(worker.hash())!;
}
private _updateCounterForWorkerHash(hash: string, delta: number) {
this._queuedOrRunningHashCount.set(hash, delta + (this._queuedOrRunningHashCount.get(hash) || 0));
}
async run(testGroups: TestGroup[], extraEnvByProjectId: EnvByProjectId) {
this._extraEnvByProjectId = extraEnvByProjectId;
this._queue = testGroups;
for (const group of testGroups) {
this._queuedOrRunningHashCount.set(group.workerHash, 1 + (this._queuedOrRunningHashCount.get(group.workerHash) || 0));
this._updateCounterForWorkerHash(group.workerHash, +1);
for (const test of group.tests)
this._testById.set(test.id, { test, resultByWorkerIndex: new Map() });
}
@ -185,12 +203,12 @@ export class Dispatcher {
for (let i = 0; i < this._workerSlots.length; i++)
void this._scheduleJob();
this._checkFinished();
// 3. More jobs are scheduled when the worker becomes free, or a new job is added.
// 3. More jobs are scheduled when the worker becomes free.
// 4. Wait for all jobs to finish.
await this._finished;
}
async _runJob(worker: WorkerHost, testGroup: TestGroup) {
async _runJob(worker: WorkerHost, testGroup: TestGroup): Promise<{ newJob?: TestGroup, didFail: boolean }> {
const runPayload: RunPayload = {
file: testGroup.requireFile,
entries: testGroup.tests.map(test => {
@ -199,8 +217,7 @@ export class Dispatcher {
};
worker.runTestGroup(runPayload);
let doneCallback = () => {};
const result = new Promise<void>(f => doneCallback = f);
const result = new ManualPromise<{ newJob?: TestGroup, didFail: boolean }>();
const doneWithJob = () => {
worker.removeListener('testBegin', onTestBegin);
worker.removeListener('testEnd', onTestEnd);
@ -209,7 +226,6 @@ export class Dispatcher {
worker.removeListener('attach', onAttach);
worker.removeListener('done', onDone);
worker.removeListener('exit', onExit);
doneCallback();
};
const remainingByTestId = new Map(testGroup.tests.map(e => [e.id, e]));
@ -320,7 +336,6 @@ export class Dispatcher {
worker.on('attach', onAttach);
const onDone = (params: DonePayload & { unexpectedExitError?: TestError }) => {
this._queuedOrRunningHashCount.set(worker.hash(), this._queuedOrRunningHashCount.get(worker.hash())! - 1);
let remaining = [...remainingByTestId.values()];
// We won't file remaining if:
@ -328,15 +343,11 @@ export class Dispatcher {
// - we are here not because something failed
// - no unrecoverable worker error
if (!remaining.length && !failedTestIds.size && !params.fatalErrors.length && !params.skipTestsDueToSetupFailure.length && !params.fatalUnknownTestIds && !params.unexpectedExitError) {
if (this._isWorkerRedundant(worker))
void worker.stop();
doneWithJob();
result.resolve({ didFail: false });
return;
}
// When worker encounters error, we will stop it and create a new one.
void worker.stop(true /* didFail */);
const massSkipTestsFromRemaining = (testIds: Set<string>, errors: TestError[], onlyStartedTests?: boolean) => {
remaining = remaining.filter(test => {
if (!testIds.has(test.id))
@ -367,7 +378,7 @@ export class Dispatcher {
// We had fatal errors after all tests have passed - most likely in some teardown.
// Let's just fail the test run.
this._hasWorkerErrors = true;
for (const error of params.fatalErrors)
for (const error of errors)
this._reporter.onError(error);
}
};
@ -435,19 +446,14 @@ export class Dispatcher {
for (const testId of retryCandidates) {
const pair = this._testById.get(testId)!;
if (!this._isStopped && pair.test.results.length < pair.test.retries + 1)
if (pair.test.results.length < pair.test.retries + 1)
remaining.push(pair.test);
}
if (remaining.length) {
this._queue.unshift({ ...testGroup, tests: remaining });
this._queuedOrRunningHashCount.set(testGroup.workerHash, this._queuedOrRunningHashCount.get(testGroup.workerHash)! + 1);
// Perhaps we can immediately start the new job if there is a worker available?
void this._scheduleJob();
}
// This job is over, we just scheduled another one.
// This job is over, we will schedule another one.
doneWithJob();
const newJob = remaining.length ? { ...testGroup, tests: remaining } : undefined;
result.resolve({ didFail: true, newJob });
};
worker.on('done', onDone);