From 9814c592d27370e85636edc75184cbcbc4a707ab Mon Sep 17 00:00:00 2001 From: Dmitry Gozman Date: Mon, 14 Feb 2022 10:57:15 -0800 Subject: [PATCH] fix(test runner): shutdown redundant workers (#12062) --- packages/playwright-test/src/dispatcher.ts | 15 +++++++++ tests/playwright-test/runner.spec.ts | 38 ++++++++++++++++++++++ 2 files changed, 53 insertions(+) diff --git a/packages/playwright-test/src/dispatcher.ts b/packages/playwright-test/src/dispatcher.ts index 10cef1dbea..bb8737d43d 100644 --- a/packages/playwright-test/src/dispatcher.ts +++ b/packages/playwright-test/src/dispatcher.ts @@ -43,6 +43,7 @@ type TestData = { export class Dispatcher { private _workerSlots: { busy: boolean, worker?: Worker }[] = []; private _queue: TestGroup[] = []; + private _queueHashCount = new Map(); private _finished = new ManualPromise(); private _isStopped = false; @@ -57,6 +58,7 @@ export class Dispatcher { this._reporter = reporter; this._queue = testGroups; for (const group of testGroups) { + this._queueHashCount.set(group.workerHash, 1 + (this._queueHashCount.get(group.workerHash) || 0)); for (const test of group.tests) { this._testById.set(test._id, { test, resultByWorkerIndex: new Map() }); for (let suite: Suite | undefined = test.parent; suite; suite = suite.parent) { @@ -85,6 +87,7 @@ export class Dispatcher { // 3. Claim both the job and the worker, run the job and release the worker. this._queue.shift(); + this._queueHashCount.set(job.workerHash, this._queueHashCount.get(job.workerHash)! - 1); this._workerSlots[index].busy = true; await this._startJobInWorker(index, job); this._workerSlots[index].busy = false; @@ -141,6 +144,15 @@ export class Dispatcher { this._finished.resolve(); } + private _isWorkerRedundant(worker: Worker) { + let workersWithSameHash = 0; + for (const slot of this._workerSlots) { + if (slot.worker && !slot.worker.didSendStop() && slot.worker.hash() === worker.hash()) + workersWithSameHash++; + } + return workersWithSameHash > this._queueHashCount.get(worker.hash())!; + } + async run() { this._workerSlots = []; // 1. Allocate workers. @@ -279,6 +291,8 @@ export class Dispatcher { // - we are here not because something failed // - no unrecoverable worker error if (!remaining.length && !failedTestIds.size && !params.fatalError) { + if (this._isWorkerRedundant(worker)) + worker.stop(); doneWithJob(); return; } @@ -382,6 +396,7 @@ export class Dispatcher { if (remaining.length) { this._queue.unshift({ ...testGroup, tests: remaining }); + this._queueHashCount.set(testGroup.workerHash, this._queueHashCount.get(testGroup.workerHash)! + 1); // Perhaps we can immediately start the new job if there is a worker available? this._scheduleJob(); } diff --git a/tests/playwright-test/runner.spec.ts b/tests/playwright-test/runner.spec.ts index 143cd84fbe..fd8c789259 100644 --- a/tests/playwright-test/runner.spec.ts +++ b/tests/playwright-test/runner.spec.ts @@ -202,3 +202,41 @@ test('should not stall when workers are available', async ({ runInlineTest }) => '%%passes-2-done', ]); }); + +test('should teardown workers that are redundant', async ({ runInlineTest }) => { + const result = await runInlineTest({ + 'helper.js': ` + module.exports = pwt.test.extend({ + w: [async ({}, use) => { + console.log('\\n%%worker setup'); + await use('worker'); + console.log('\\n%%worker teardown'); + }, { scope: 'worker' }], + }); + `, + 'a.spec.js': ` + const test = require('./helper'); + test('test1', async ({ w }) => { + await new Promise(f => setTimeout(f, 1500)); + console.log('\\n%%test-done'); + }); + `, + 'b.spec.js': ` + const test = require('./helper'); + test('test2', async ({ w }) => { + await new Promise(f => setTimeout(f, 3000)); + console.log('\\n%%test-done'); + }); + `, + }, { workers: 2 }); + expect(result.exitCode).toBe(0); + expect(result.passed).toBe(2); + expect(stripAnsi(result.output).split('\n').filter(line => line.startsWith('%%'))).toEqual([ + '%%worker setup', + '%%worker setup', + '%%test-done', + '%%worker teardown', + '%%test-done', + '%%worker teardown', + ]); +});