fix(test-runner): avoid premature stop of worker in fullyParallel (#13584)

Prior to this change, we were pre-maturely stopping a worker (since it
was deemed redundant under a race condition), and then we immediately
created a new worker with the same hash to finish off the test run. The
worker creation is expensive, so this slowed down the overall test run
time.

See the following for logs of the old code illustrating the extra stops and starts: https://gist.github.com/rwoll/1c592ed9e8f9169274fa972674de6703
This commit is contained in:
Ross Wollman 2022-04-17 16:22:57 -07:00 committed by GitHub
parent 5f843c347d
commit 71fbd2454e
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23

View file

@ -43,7 +43,7 @@ type TestData = {
export class Dispatcher { export class Dispatcher {
private _workerSlots: { busy: boolean, worker?: Worker }[] = []; private _workerSlots: { busy: boolean, worker?: Worker }[] = [];
private _queue: TestGroup[] = []; private _queue: TestGroup[] = [];
private _queueHashCount = new Map<string, number>(); private _queuedOrRunningHashCount = new Map<string, number>();
private _finished = new ManualPromise<void>(); private _finished = new ManualPromise<void>();
private _isStopped = false; private _isStopped = false;
@ -58,7 +58,7 @@ export class Dispatcher {
this._reporter = reporter; this._reporter = reporter;
this._queue = testGroups; this._queue = testGroups;
for (const group of testGroups) { for (const group of testGroups) {
this._queueHashCount.set(group.workerHash, 1 + (this._queueHashCount.get(group.workerHash) || 0)); this._queuedOrRunningHashCount.set(group.workerHash, 1 + (this._queuedOrRunningHashCount.get(group.workerHash) || 0));
for (const test of group.tests) for (const test of group.tests)
this._testById.set(test._id, { test, resultByWorkerIndex: new Map() }); this._testById.set(test._id, { test, resultByWorkerIndex: new Map() });
} }
@ -80,7 +80,6 @@ export class Dispatcher {
// 3. Claim both the job and the worker, run the job and release the worker. // 3. Claim both the job and the worker, run the job and release the worker.
this._queue.shift(); this._queue.shift();
this._queueHashCount.set(job.workerHash, this._queueHashCount.get(job.workerHash)! - 1);
this._workerSlots[index].busy = true; this._workerSlots[index].busy = true;
await this._startJobInWorker(index, job); await this._startJobInWorker(index, job);
this._workerSlots[index].busy = false; this._workerSlots[index].busy = false;
@ -143,7 +142,7 @@ export class Dispatcher {
if (slot.worker && !slot.worker.didSendStop() && slot.worker.hash() === worker.hash()) if (slot.worker && !slot.worker.didSendStop() && slot.worker.hash() === worker.hash())
workersWithSameHash++; workersWithSameHash++;
} }
return workersWithSameHash > this._queueHashCount.get(worker.hash())!; return workersWithSameHash > this._queuedOrRunningHashCount.get(worker.hash())!;
} }
async run() { async run() {
@ -273,6 +272,7 @@ export class Dispatcher {
worker.on('stepEnd', onStepEnd); worker.on('stepEnd', onStepEnd);
const onDone = (params: DonePayload) => { const onDone = (params: DonePayload) => {
this._queuedOrRunningHashCount.set(worker.hash(), this._queuedOrRunningHashCount.get(worker.hash())! - 1);
let remaining = [...remainingByTestId.values()]; let remaining = [...remainingByTestId.values()];
// We won't file remaining if: // We won't file remaining if:
@ -379,7 +379,7 @@ export class Dispatcher {
if (remaining.length) { if (remaining.length) {
this._queue.unshift({ ...testGroup, tests: remaining }); this._queue.unshift({ ...testGroup, tests: remaining });
this._queueHashCount.set(testGroup.workerHash, this._queueHashCount.get(testGroup.workerHash)! + 1); this._queuedOrRunningHashCount.set(testGroup.workerHash, this._queuedOrRunningHashCount.get(testGroup.workerHash)! + 1);
// Perhaps we can immediately start the new job if there is a worker available? // Perhaps we can immediately start the new job if there is a worker available?
this._scheduleJob(); this._scheduleJob();
} }