diff --git a/packages/playwright-test/src/runner/dispatcher.ts b/packages/playwright-test/src/runner/dispatcher.ts index 42b8a2fa55..76ad8a0819 100644 --- a/packages/playwright-test/src/runner/dispatcher.ts +++ b/packages/playwright-test/src/runner/dispatcher.ts @@ -136,7 +136,21 @@ export class Dispatcher { } // 3. Run the job. - await this._runJob(worker, job); + const result = await this._runJob(worker, job); + this._updateCounterForWorkerHash(job.workerHash, -1); + + // 4. When worker encounters error, we stop it and create a new one. + // We also do not keep the worker alive if it cannot serve any more jobs. + if (result.didFail) + void worker.stop(true /* didFail */); + else if (this._isWorkerRedundant(worker)) + void worker.stop(); + + // 5. Possibly schedule a new job with leftover tests and/or retries. + if (!this._isStopped && result.newJob) { + this._queue.unshift(result.newJob); + this._updateCounterForWorkerHash(job.workerHash, +1); + } } private _checkFinished() { @@ -168,11 +182,15 @@ export class Dispatcher { return workersWithSameHash > this._queuedOrRunningHashCount.get(worker.hash())!; } + private _updateCounterForWorkerHash(hash: string, delta: number) { + this._queuedOrRunningHashCount.set(hash, delta + (this._queuedOrRunningHashCount.get(hash) || 0)); + } + async run(testGroups: TestGroup[], extraEnvByProjectId: EnvByProjectId) { this._extraEnvByProjectId = extraEnvByProjectId; this._queue = testGroups; for (const group of testGroups) { - this._queuedOrRunningHashCount.set(group.workerHash, 1 + (this._queuedOrRunningHashCount.get(group.workerHash) || 0)); + this._updateCounterForWorkerHash(group.workerHash, +1); for (const test of group.tests) this._testById.set(test.id, { test, resultByWorkerIndex: new Map() }); } @@ -185,12 +203,12 @@ export class Dispatcher { for (let i = 0; i < this._workerSlots.length; i++) void this._scheduleJob(); this._checkFinished(); - // 3. More jobs are scheduled when the worker becomes free, or a new job is added. + // 3. More jobs are scheduled when the worker becomes free. // 4. Wait for all jobs to finish. await this._finished; } - async _runJob(worker: WorkerHost, testGroup: TestGroup) { + async _runJob(worker: WorkerHost, testGroup: TestGroup): Promise<{ newJob?: TestGroup, didFail: boolean }> { const runPayload: RunPayload = { file: testGroup.requireFile, entries: testGroup.tests.map(test => { @@ -199,8 +217,7 @@ export class Dispatcher { }; worker.runTestGroup(runPayload); - let doneCallback = () => {}; - const result = new Promise(f => doneCallback = f); + const result = new ManualPromise<{ newJob?: TestGroup, didFail: boolean }>(); const doneWithJob = () => { worker.removeListener('testBegin', onTestBegin); worker.removeListener('testEnd', onTestEnd); @@ -209,7 +226,6 @@ export class Dispatcher { worker.removeListener('attach', onAttach); worker.removeListener('done', onDone); worker.removeListener('exit', onExit); - doneCallback(); }; const remainingByTestId = new Map(testGroup.tests.map(e => [e.id, e])); @@ -320,7 +336,6 @@ export class Dispatcher { worker.on('attach', onAttach); const onDone = (params: DonePayload & { unexpectedExitError?: TestError }) => { - this._queuedOrRunningHashCount.set(worker.hash(), this._queuedOrRunningHashCount.get(worker.hash())! - 1); let remaining = [...remainingByTestId.values()]; // We won't file remaining if: @@ -328,15 +343,11 @@ export class Dispatcher { // - we are here not because something failed // - no unrecoverable worker error if (!remaining.length && !failedTestIds.size && !params.fatalErrors.length && !params.skipTestsDueToSetupFailure.length && !params.fatalUnknownTestIds && !params.unexpectedExitError) { - if (this._isWorkerRedundant(worker)) - void worker.stop(); doneWithJob(); + result.resolve({ didFail: false }); return; } - // When worker encounters error, we will stop it and create a new one. - void worker.stop(true /* didFail */); - const massSkipTestsFromRemaining = (testIds: Set, errors: TestError[], onlyStartedTests?: boolean) => { remaining = remaining.filter(test => { if (!testIds.has(test.id)) @@ -367,7 +378,7 @@ export class Dispatcher { // We had fatal errors after all tests have passed - most likely in some teardown. // Let's just fail the test run. this._hasWorkerErrors = true; - for (const error of params.fatalErrors) + for (const error of errors) this._reporter.onError(error); } }; @@ -435,19 +446,14 @@ export class Dispatcher { for (const testId of retryCandidates) { const pair = this._testById.get(testId)!; - if (!this._isStopped && pair.test.results.length < pair.test.retries + 1) + if (pair.test.results.length < pair.test.retries + 1) remaining.push(pair.test); } - if (remaining.length) { - this._queue.unshift({ ...testGroup, tests: remaining }); - this._queuedOrRunningHashCount.set(testGroup.workerHash, this._queuedOrRunningHashCount.get(testGroup.workerHash)! + 1); - // Perhaps we can immediately start the new job if there is a worker available? - void this._scheduleJob(); - } - - // This job is over, we just scheduled another one. + // This job is over, we will schedule another one. doneWithJob(); + const newJob = remaining.length ? { ...testGroup, tests: remaining } : undefined; + result.resolve({ didFail: true, newJob }); }; worker.on('done', onDone);