fix(test runner): shutdown redundant workers (#12062)
This commit is contained in:
parent
6c89f160e8
commit
9814c592d2
|
|
@ -43,6 +43,7 @@ type TestData = {
|
||||||
export class Dispatcher {
|
export class Dispatcher {
|
||||||
private _workerSlots: { busy: boolean, worker?: Worker }[] = [];
|
private _workerSlots: { busy: boolean, worker?: Worker }[] = [];
|
||||||
private _queue: TestGroup[] = [];
|
private _queue: TestGroup[] = [];
|
||||||
|
private _queueHashCount = new Map<string, number>();
|
||||||
private _finished = new ManualPromise<void>();
|
private _finished = new ManualPromise<void>();
|
||||||
private _isStopped = false;
|
private _isStopped = false;
|
||||||
|
|
||||||
|
|
@ -57,6 +58,7 @@ export class Dispatcher {
|
||||||
this._reporter = reporter;
|
this._reporter = reporter;
|
||||||
this._queue = testGroups;
|
this._queue = testGroups;
|
||||||
for (const group of testGroups) {
|
for (const group of testGroups) {
|
||||||
|
this._queueHashCount.set(group.workerHash, 1 + (this._queueHashCount.get(group.workerHash) || 0));
|
||||||
for (const test of group.tests) {
|
for (const test of group.tests) {
|
||||||
this._testById.set(test._id, { test, resultByWorkerIndex: new Map() });
|
this._testById.set(test._id, { test, resultByWorkerIndex: new Map() });
|
||||||
for (let suite: Suite | undefined = test.parent; suite; suite = suite.parent) {
|
for (let suite: Suite | undefined = test.parent; suite; suite = suite.parent) {
|
||||||
|
|
@ -85,6 +87,7 @@ export class Dispatcher {
|
||||||
|
|
||||||
// 3. Claim both the job and the worker, run the job and release the worker.
|
// 3. Claim both the job and the worker, run the job and release the worker.
|
||||||
this._queue.shift();
|
this._queue.shift();
|
||||||
|
this._queueHashCount.set(job.workerHash, this._queueHashCount.get(job.workerHash)! - 1);
|
||||||
this._workerSlots[index].busy = true;
|
this._workerSlots[index].busy = true;
|
||||||
await this._startJobInWorker(index, job);
|
await this._startJobInWorker(index, job);
|
||||||
this._workerSlots[index].busy = false;
|
this._workerSlots[index].busy = false;
|
||||||
|
|
@ -141,6 +144,15 @@ export class Dispatcher {
|
||||||
this._finished.resolve();
|
this._finished.resolve();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
private _isWorkerRedundant(worker: Worker) {
|
||||||
|
let workersWithSameHash = 0;
|
||||||
|
for (const slot of this._workerSlots) {
|
||||||
|
if (slot.worker && !slot.worker.didSendStop() && slot.worker.hash() === worker.hash())
|
||||||
|
workersWithSameHash++;
|
||||||
|
}
|
||||||
|
return workersWithSameHash > this._queueHashCount.get(worker.hash())!;
|
||||||
|
}
|
||||||
|
|
||||||
async run() {
|
async run() {
|
||||||
this._workerSlots = [];
|
this._workerSlots = [];
|
||||||
// 1. Allocate workers.
|
// 1. Allocate workers.
|
||||||
|
|
@ -279,6 +291,8 @@ export class Dispatcher {
|
||||||
// - we are here not because something failed
|
// - we are here not because something failed
|
||||||
// - no unrecoverable worker error
|
// - no unrecoverable worker error
|
||||||
if (!remaining.length && !failedTestIds.size && !params.fatalError) {
|
if (!remaining.length && !failedTestIds.size && !params.fatalError) {
|
||||||
|
if (this._isWorkerRedundant(worker))
|
||||||
|
worker.stop();
|
||||||
doneWithJob();
|
doneWithJob();
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
|
|
@ -382,6 +396,7 @@ export class Dispatcher {
|
||||||
|
|
||||||
if (remaining.length) {
|
if (remaining.length) {
|
||||||
this._queue.unshift({ ...testGroup, tests: remaining });
|
this._queue.unshift({ ...testGroup, tests: remaining });
|
||||||
|
this._queueHashCount.set(testGroup.workerHash, this._queueHashCount.get(testGroup.workerHash)! + 1);
|
||||||
// Perhaps we can immediately start the new job if there is a worker available?
|
// Perhaps we can immediately start the new job if there is a worker available?
|
||||||
this._scheduleJob();
|
this._scheduleJob();
|
||||||
}
|
}
|
||||||
|
|
|
||||||
|
|
@ -202,3 +202,41 @@ test('should not stall when workers are available', async ({ runInlineTest }) =>
|
||||||
'%%passes-2-done',
|
'%%passes-2-done',
|
||||||
]);
|
]);
|
||||||
});
|
});
|
||||||
|
|
||||||
|
test('should teardown workers that are redundant', async ({ runInlineTest }) => {
|
||||||
|
const result = await runInlineTest({
|
||||||
|
'helper.js': `
|
||||||
|
module.exports = pwt.test.extend({
|
||||||
|
w: [async ({}, use) => {
|
||||||
|
console.log('\\n%%worker setup');
|
||||||
|
await use('worker');
|
||||||
|
console.log('\\n%%worker teardown');
|
||||||
|
}, { scope: 'worker' }],
|
||||||
|
});
|
||||||
|
`,
|
||||||
|
'a.spec.js': `
|
||||||
|
const test = require('./helper');
|
||||||
|
test('test1', async ({ w }) => {
|
||||||
|
await new Promise(f => setTimeout(f, 1500));
|
||||||
|
console.log('\\n%%test-done');
|
||||||
|
});
|
||||||
|
`,
|
||||||
|
'b.spec.js': `
|
||||||
|
const test = require('./helper');
|
||||||
|
test('test2', async ({ w }) => {
|
||||||
|
await new Promise(f => setTimeout(f, 3000));
|
||||||
|
console.log('\\n%%test-done');
|
||||||
|
});
|
||||||
|
`,
|
||||||
|
}, { workers: 2 });
|
||||||
|
expect(result.exitCode).toBe(0);
|
||||||
|
expect(result.passed).toBe(2);
|
||||||
|
expect(stripAnsi(result.output).split('\n').filter(line => line.startsWith('%%'))).toEqual([
|
||||||
|
'%%worker setup',
|
||||||
|
'%%worker setup',
|
||||||
|
'%%test-done',
|
||||||
|
'%%worker teardown',
|
||||||
|
'%%test-done',
|
||||||
|
'%%worker teardown',
|
||||||
|
]);
|
||||||
|
});
|
||||||
|
|
|
||||||
Loading…
Reference in a new issue