From fccc14cdfaa1389af4d56f1cf4fdb62a2d248558 Mon Sep 17 00:00:00 2001 From: Yury Semikhatsky Date: Wed, 6 Apr 2022 18:03:27 -0700 Subject: [PATCH] feat: github agent, browser worker (#13336) --- packages/playwright-core/src/grid/DEPS.list | 1 + .../src/grid/githubGridFactory.ts | 67 +++++++++++++++++ .../playwright-core/src/grid/gridAgent.ts | 10 ++- .../src/grid/gridBrowserWorker.ts | 35 +++++++++ .../playwright-core/src/grid/gridClient.ts | 2 +- .../playwright-core/src/grid/gridServer.ts | 72 +++++++++++++------ .../playwright-core/src/grid/gridWorker.ts | 2 +- 7 files changed, 163 insertions(+), 26 deletions(-) create mode 100644 packages/playwright-core/src/grid/githubGridFactory.ts create mode 100644 packages/playwright-core/src/grid/gridBrowserWorker.ts diff --git a/packages/playwright-core/src/grid/DEPS.list b/packages/playwright-core/src/grid/DEPS.list index 4874d0e91a..2ad866e7b1 100644 --- a/packages/playwright-core/src/grid/DEPS.list +++ b/packages/playwright-core/src/grid/DEPS.list @@ -1,5 +1,6 @@ [*] ../client/ ../dispatchers/ +../remote/ ../server/ ../utils/ diff --git a/packages/playwright-core/src/grid/githubGridFactory.ts b/packages/playwright-core/src/grid/githubGridFactory.ts new file mode 100644 index 0000000000..3e6b8b3904 --- /dev/null +++ b/packages/playwright-core/src/grid/githubGridFactory.ts @@ -0,0 +1,67 @@ +/** + * Copyright (c) Microsoft Corporation. + * + * Licensed under the Apache License, Version 2.0 (the 'License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +import type { GridAgentLaunchOptions, GridFactory } from './gridServer'; +import https from 'https'; +import debug from 'debug'; + +const repoName = process.env.GITHUB_AGENT_REPO; +if (!repoName) + throw new Error('GITHUB_AGENT_REPO is not specified.'); + +const repoAccessToken = process.env.GITHUB_AGENT_REPO_ACCESS_TOKEN; +if (!repoAccessToken) + throw new Error('GITHUB_AGENT_REPO_ACCESS_TOKEN is not specified.'); + +const log = debug(`pw:grid:server`); + +const githubFactory: GridFactory = { + name: 'Agents hosted on Github', + capacity: 10, + launchTimeout: 30000, + retireTimeout: 600000, + launch: async (options: GridAgentLaunchOptions) => { + await createWorkflow(options); + }, +}; + +async function createWorkflow(inputs: GridAgentLaunchOptions): Promise { + return new Promise(fulfill => { + log(`triggering workflow ${JSON.stringify(inputs)}`); + const req = https.request(`https://api.github.com/repos/${repoName}/actions/workflows/agent.yml/dispatches`, { + method: 'POST', + headers: { + 'User-Agent': 'request', + 'Accept': 'application/vnd.github.v3+json', + 'Authorization': `token ${repoAccessToken}`, + } + }, response => { + log(`workflow ${inputs.agentId} response: ${response.statusCode} ${response.statusMessage}`); + const success = !!response.statusCode && 200 <= response.statusCode && response.statusCode < 300; + fulfill(success); + }); + req.on('error', e => { + log(`failed to create workflow ${inputs.agentId}`); + fulfill(false); + }); + req.end(JSON.stringify({ + 'ref': 'refs/heads/main', + inputs + })); + }); +} + +export default githubFactory; diff --git a/packages/playwright-core/src/grid/gridAgent.ts b/packages/playwright-core/src/grid/gridAgent.ts index 7bce0f3e45..62774ce5f6 100644 --- a/packages/playwright-core/src/grid/gridAgent.ts +++ b/packages/playwright-core/src/grid/gridAgent.ts @@ -26,9 +26,13 @@ export function launchGridAgent(agentId: string, gridURL: string) { params.set('pwVersion', getPlaywrightVersion(true /* majorMinorOnly */)); params.set('agentId', agentId); const ws = new WebSocket(gridURL.replace('http://', 'ws://') + `/registerAgent?` + params.toString()); - ws.on('message', (workerId: string) => { - log('Worker requested ' + workerId); - fork(require.resolve('./gridWorker.js'), [gridURL, agentId, workerId], { detached: true }); + ws.on('message', (message: string) => { + log('worker requested ' + message); + const { workerId, browserAlias } = JSON.parse(message); + if (browserAlias) + fork(require.resolve('./gridBrowserWorker.js'), [gridURL, agentId, workerId, browserAlias], { detached: true }); + else + fork(require.resolve('./gridWorker.js'), [gridURL, agentId, workerId], { detached: true }); }); ws.on('close', () => process.exit(0)); } diff --git a/packages/playwright-core/src/grid/gridBrowserWorker.ts b/packages/playwright-core/src/grid/gridBrowserWorker.ts new file mode 100644 index 0000000000..93bbbbf676 --- /dev/null +++ b/packages/playwright-core/src/grid/gridBrowserWorker.ts @@ -0,0 +1,35 @@ +/** + * Copyright (c) Microsoft Corporation. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +import debug from 'debug'; +import WebSocket from 'ws'; +import { PlaywrightConnection } from '../remote/playwrightConnection'; +import { gracefullyCloseAll } from '../utils/processLauncher'; + +function launchGridBrowserWorker(gridURL: string, agentId: string, workerId: string, browserAlias: string) { + const log = debug(`pw:grid:worker:${workerId}`); + log('created'); + const ws = new WebSocket(gridURL.replace('http://', 'ws://') + `/registerWorker?agentId=${agentId}&workerId=${workerId}`); + new PlaywrightConnection(ws, true, browserAlias, undefined, log, async () => { + log('exiting process'); + setTimeout(() => process.exit(0), 30000); + // Meanwhile, try to gracefully close all browsers. + await gracefullyCloseAll(); + process.exit(0); + }); +} + +launchGridBrowserWorker(process.argv[2], process.argv[3], process.argv[4], process.argv[5]); diff --git a/packages/playwright-core/src/grid/gridClient.ts b/packages/playwright-core/src/grid/gridClient.ts index ad29c2336a..a5c535ca01 100644 --- a/packages/playwright-core/src/grid/gridClient.ts +++ b/packages/playwright-core/src/grid/gridClient.ts @@ -28,7 +28,7 @@ export class GridClient { params.set('pwVersion', getPlaywrightVersion(true /* majorMinorOnly */)); const ws = new WebSocket(`${gridURL}/claimWorker?` + params.toString()); const errorText = await Promise.race([ - new Promise(f => ws.once('message', () => f(undefined))), + new Promise(f => ws.once('open', () => f(undefined))), new Promise(f => ws.once('close', (code, reason) => f(reason))), ]); if (errorText) diff --git a/packages/playwright-core/src/grid/gridServer.ts b/packages/playwright-core/src/grid/gridServer.ts index 05644167ab..4b6ed294f7 100644 --- a/packages/playwright-core/src/grid/gridServer.ts +++ b/packages/playwright-core/src/grid/gridServer.ts @@ -15,13 +15,12 @@ */ import debug from 'debug'; -import assert from 'assert'; import { EventEmitter } from 'events'; import { URL } from 'url'; import type { Server as WebSocketServer } from 'ws'; import type WebSocket from 'ws'; import { HttpServer } from '../utils/httpServer'; -import { createGuid, getPlaywrightVersion } from '../utils/utils'; +import { assert, createGuid, getPlaywrightVersion } from '../utils/utils'; export type GridAgentLaunchOptions = { agentId: string, @@ -58,18 +57,35 @@ const WSErrors = { AGENT_MANUALLY_STOPPED: { code: 1000, reason: 'Grid agent was manually stopped' }, }; + +type GridWorkerParams = { + browserAlias?: string; + headless?: boolean; +}; + class GridWorker extends EventEmitter { readonly workerId = createGuid(); + readonly params: GridWorkerParams; private _workerSocket: WebSocket | undefined; private _clientSocket: WebSocket; private _log: debug.Debugger; + private _bufferedMessages: WebSocket.RawData[] = []; - constructor(clientSocket: WebSocket) { + constructor(clientSocket: WebSocket, params: GridWorkerParams) { super(); - this._log = debug(`pw:grid:worker${this.workerId}`); + this._log = debug(`pw:grid:worker:${this.workerId}`); this._clientSocket = clientSocket; + this.params = params; clientSocket.on('close', (code: number, reason: string) => this.closeWorker(WSErrors.NO_ERROR)); clientSocket.on('error', (error: Error) => this.closeWorker(WSErrors.CLIENT_SOCKET_ERROR)); + // clientSocket.pause() would be preferrable but according to the docs " Some events can still be + // emitted after it is called, until all buffered data is consumed." + this._clientSocket.on('message', data => { + if (this._workerSocket) + this._workerSocket.send(data); + else + this._bufferedMessages.push(data); + }); } workerConnected(workerSocket: WebSocket) { @@ -77,13 +93,14 @@ class GridWorker extends EventEmitter { this._workerSocket = workerSocket; workerSocket.on('close', (code: number, reason: string) => this.closeWorker(WSErrors.NO_ERROR)); workerSocket.on('error', (error: Error) => this.closeWorker(WSErrors.WORKER_SOCKET_ERROR)); - this._clientSocket.on('message', data => workerSocket!.send(data)); workerSocket.on('message', data => this._clientSocket!.send(data)); - this._clientSocket.send('run'); + for (const data of this._bufferedMessages) + workerSocket.send(data); + this._bufferedMessages = []; } closeWorker(errorCode: ErrorCode) { - this._log('close'); + this._log(`close ${errorCode.reason}`); this._workerSocket?.close(errorCode.code, errorCode.reason); this._clientSocket.close(errorCode.code, errorCode.reason); this.emit('close'); @@ -111,7 +128,7 @@ class GridAgent extends EventEmitter { constructor(capacity = Infinity, creationTimeout = 5 * 60000, retireTimeout = 30000) { super(); this._capacity = capacity; - this._log = debug(`pw:grid:agent${this.agentId}`); + this._log = debug(`pw:grid:agent:${this.agentId}`); this.setStatus('created'); this._retireTimeout = retireTimeout; this._agentCreationTimeoutId = setTimeout(() => { @@ -132,10 +149,8 @@ class GridAgent extends EventEmitter { clearTimeout(this._agentCreationTimeoutId); this.setStatus('connected'); this._ws = ws; - for (const worker of this._workersWaitingForAgentConnected) { - this._log(`send worker id: ${worker.workerId}`); - ws.send(worker.workerId); - } + for (const worker of this._workersWaitingForAgentConnected) + this._sendStartWorkerMessage(worker); this._workersWaitingForAgentConnected.clear(); } @@ -143,12 +158,12 @@ class GridAgent extends EventEmitter { return this._workers.size < this._capacity; } - async createWorker(clientSocket: WebSocket) { + async createWorker(clientSocket: WebSocket, params: GridWorkerParams) { if (this._retireTimeoutId) clearTimeout(this._retireTimeoutId); if (this._ws) this.setStatus('connected'); - const worker = new GridWorker(clientSocket); + const worker = new GridWorker(clientSocket, params); this._log(`create worker: ${worker.workerId}`); this._workers.set(worker.workerId, worker); worker.on('close', () => { @@ -162,12 +177,10 @@ class GridAgent extends EventEmitter { this._retireTimeoutId = setTimeout(() => this.closeAgent(WSErrors.AGENT_RETIRED), this._retireTimeout); } }); - if (this._ws) { - this._log(`send worker id: ${worker.workerId}`); - this._ws.send(worker.workerId); - } else { + if (this._ws) + this._sendStartWorkerMessage(worker); + else this._workersWaitingForAgentConnected.add(worker); - } } workerConnected(workerId: string, ws: WebSocket) { @@ -185,6 +198,16 @@ class GridAgent extends EventEmitter { this._ws?.close(errorCode.code, errorCode.reason); this.emit('close'); } + + private _sendStartWorkerMessage(worker: GridWorker) { + const message = JSON.stringify({ + ...worker.params, + 'workerId': worker.workerId, + }); + this._log(`start worker message: ${message}`); + assert(this._ws); + this._ws.send(message); + } } export class GridServer { @@ -198,6 +221,7 @@ export class GridServer { constructor(factory: GridFactory, authToken: string = '') { this._log = debug(`pw:grid:server`); + this._log(`using factory ${factory.name}`); this._authToken = authToken || ''; this._server = new HttpServer(); this._factory = factory; @@ -240,17 +264,23 @@ export class GridServer { this._wsServer.on('connection', async (ws, request) => { if (request.url?.startsWith(this._securePath('/claimWorker'))) { const params = new URL('http://localhost/' + request.url).searchParams; - if (params.get('pwVersion') !== this._pwVersion && !process.env.PWTEST_UNSAFE_GRID_VERSION) { + const version = params.get('pwVersion'); + if (version !== this._pwVersion && !process.env.PWTEST_UNSAFE_GRID_VERSION) { + this._log(`version mismatch: ${version} !== ${this._pwVersion}`); ws.close(WSErrors.CLIENT_PLAYWRIGHT_VERSION_MISMATCH.code, WSErrors.CLIENT_PLAYWRIGHT_VERSION_MISMATCH.reason); return; } const agent = [...this._agents.values()].find(w => w.canCreateWorker()) || this._createAgent()?.agent; if (!agent) { + this._log(`failed to get agent`); ws.close(WSErrors.AGENT_CREATION_FAILED.code, WSErrors.AGENT_CREATION_FAILED.reason); return; } - agent.createWorker(ws); + agent.createWorker(ws, { + browserAlias: request.headers['x-playwright-browser'] as string | undefined, + headless: request.headers['x-playwright-headless'] !== '0', + }); return; } diff --git a/packages/playwright-core/src/grid/gridWorker.ts b/packages/playwright-core/src/grid/gridWorker.ts index dddf042a08..5e1e6b9f6f 100644 --- a/packages/playwright-core/src/grid/gridWorker.ts +++ b/packages/playwright-core/src/grid/gridWorker.ts @@ -23,7 +23,7 @@ import { gracefullyCloseAll } from '../utils/processLauncher'; import { SocksProxy } from '../utils/socksProxy'; function launchGridWorker(gridURL: string, agentId: string, workerId: string) { - const log = debug(`pw:grid:worker${workerId}`); + const log = debug(`pw:grid:worker:${workerId}`); log('created'); const ws = new WebSocket(gridURL.replace('http://', 'ws://') + `/registerWorker?agentId=${agentId}&workerId=${workerId}`); const dispatcherConnection = new DispatcherConnection();