From 48516ed7ed2a84bc92c442818020d81ccc8df349 Mon Sep 17 00:00:00 2001 From: Pavel Feldman Date: Fri, 27 Mar 2020 15:18:34 -0700 Subject: [PATCH] feat(websocket): use proxy web socket on chromium (#1573) --- src/chromium/crConnection.ts | 18 ++-- src/firefox/ffConnection.ts | 16 ++-- src/platform.ts | 6 +- src/server/chromium.ts | 165 +++++++++++++++++++++++++++++------ src/server/pipeTransport.ts | 6 +- src/server/webkit.ts | 12 ++- src/transport.ts | 30 ++++--- src/webkit/wkConnection.ts | 6 +- 8 files changed, 189 insertions(+), 70 deletions(-) diff --git a/src/chromium/crConnection.ts b/src/chromium/crConnection.ts index 5a7e89024b..6a4f017edd 100644 --- a/src/chromium/crConnection.ts +++ b/src/chromium/crConnection.ts @@ -17,7 +17,7 @@ import { assert } from '../helper'; import * as platform from '../platform'; -import { ConnectionTransport, ProtocolMessage } from '../transport'; +import { ConnectionTransport, ProtocolRequest, ProtocolResponse } from '../transport'; import { Protocol } from './protocol'; export const ConnectionEvents = { @@ -55,9 +55,9 @@ export class CRConnection extends platform.EventEmitter { return this._sessions.get(sessionId) || null; } - _rawSend(sessionId: string, message: ProtocolMessage): number { + _rawSend(sessionId: string, method: string, params: any): number { const id = ++this._lastId; - message.id = id; + const message: ProtocolRequest = { id, method, params }; if (sessionId) message.sessionId = sessionId; if (this._debugProtocol.enabled) @@ -66,9 +66,9 @@ export class CRConnection extends platform.EventEmitter { return id; } - async _onMessage(message: ProtocolMessage) { + async _onMessage(message: ProtocolResponse) { if (this._debugProtocol.enabled) - this._debugProtocol('◀ RECV ' + rewriteInjectedScriptEvaluationLog(message)); + this._debugProtocol('◀ RECV ' + JSON.stringify(message)); if (message.id === kBrowserCloseMessageId) return; if (message.method === 'Target.attachedToTarget') { @@ -150,13 +150,13 @@ export class CRSession extends platform.EventEmitter { ): Promise { if (!this._connection) throw new Error(`Protocol error (${method}): Session closed. Most likely the ${this._targetType} has been closed.`); - const id = this._connection._rawSend(this._sessionId, { method, params }); + const id = this._connection._rawSend(this._sessionId, method, params); return new Promise((resolve, reject) => { this._callbacks.set(id, {resolve, reject, error: new Error(), method}); }); } - _onMessage(object: ProtocolMessage) { + _onMessage(object: ProtocolResponse) { if (object.id && this._callbacks.has(object.id)) { const callback = this._callbacks.get(object.id)!; this._callbacks.delete(object.id); @@ -166,7 +166,7 @@ export class CRSession extends platform.EventEmitter { callback.resolve(object.result); } else { assert(!object.id); - Promise.resolve().then(() => this.emit(object.method, object.params)); + Promise.resolve().then(() => this.emit(object.method!, object.params)); } } @@ -200,7 +200,7 @@ function rewriteError(error: Error, message: string): Error { return error; } -function rewriteInjectedScriptEvaluationLog(message: ProtocolMessage): string { +function rewriteInjectedScriptEvaluationLog(message: ProtocolRequest): string { // Injected script is very long and clutters protocol logs. // To increase development velocity, we skip replace it with short description in the log. if (message.method === 'Runtime.evaluate' && message.params && message.params.expression && message.params.expression.includes('src/injected/injected.ts')) diff --git a/src/firefox/ffConnection.ts b/src/firefox/ffConnection.ts index b571fa1d15..5cbf4df4a5 100644 --- a/src/firefox/ffConnection.ts +++ b/src/firefox/ffConnection.ts @@ -17,7 +17,7 @@ import {assert} from '../helper'; import * as platform from '../platform'; -import { ConnectionTransport, ProtocolMessage } from '../transport'; +import { ConnectionTransport, ProtocolRequest, ProtocolResponse } from '../transport'; import { Protocol } from './protocol'; export const ConnectionEvents = { @@ -76,15 +76,15 @@ export class FFConnection extends platform.EventEmitter { return ++this._lastId; } - _rawSend(message: ProtocolMessage) { + _rawSend(message: ProtocolRequest) { if (this._debugProtocol.enabled) this._debugProtocol('SEND ► ' + rewriteInjectedScriptEvaluationLog(message)); this._transport.send(message); } - async _onMessage(message: ProtocolMessage) { + async _onMessage(message: ProtocolResponse) { if (this._debugProtocol.enabled) - this._debugProtocol('◀ RECV ' + message); + this._debugProtocol('◀ RECV ' + JSON.stringify(message)); if (message.id === kBrowserCloseMessageId) return; if (message.sessionId) { @@ -102,7 +102,7 @@ export class FFConnection extends platform.EventEmitter { callback.resolve(message.result); } } else { - Promise.resolve().then(() => this.emit(message.method, message.params)); + Promise.resolve().then(() => this.emit(message.method!, message.params)); } } @@ -176,7 +176,7 @@ export class FFSession extends platform.EventEmitter { }); } - dispatchMessage(object: ProtocolMessage) { + dispatchMessage(object: ProtocolResponse) { if (object.id && this._callbacks.has(object.id)) { const callback = this._callbacks.get(object.id)!; this._callbacks.delete(object.id); @@ -186,7 +186,7 @@ export class FFSession extends platform.EventEmitter { callback.resolve(object.result); } else { assert(!object.id); - Promise.resolve().then(() => this.emit(object.method, object.params)); + Promise.resolve().then(() => this.emit(object.method!, object.params)); } } @@ -212,7 +212,7 @@ function rewriteError(error: Error, message: string): Error { return error; } -function rewriteInjectedScriptEvaluationLog(message: ProtocolMessage): string { +function rewriteInjectedScriptEvaluationLog(message: ProtocolRequest): string { // Injected script is very long and clutters protocol logs. // To increase development velocity, we skip replace it with short description in the log. if (message.method === 'Runtime.evaluate' && message.params && message.params.expression && message.params.expression.includes('src/injected/injected.ts')) diff --git a/src/platform.ts b/src/platform.ts index 129f16e879..b3c2451214 100644 --- a/src/platform.ts +++ b/src/platform.ts @@ -29,7 +29,7 @@ import * as NodeWebSocket from 'ws'; import * as crypto from 'crypto'; import { assert, helper } from './helper'; -import { ConnectionTransport, ProtocolMessage } from './transport'; +import { ConnectionTransport, ProtocolRequest, ProtocolResponse } from './transport'; export const isNode = typeof process === 'object' && !!process && typeof process.versions === 'object' && !!process.versions && !!process.versions.node; @@ -323,7 +323,7 @@ export async function connectToWebsocket(url: string, onopen: (transport: Con class WebSocketTransport implements ConnectionTransport { _ws: WebSocket; - onmessage?: (message: ProtocolMessage) => void; + onmessage?: (message: ProtocolResponse) => void; onclose?: () => void; constructor(url: string) { @@ -352,7 +352,7 @@ class WebSocketTransport implements ConnectionTransport { this._ws.addEventListener('error', () => {}); } - send(message: ProtocolMessage) { + send(message: ProtocolRequest) { this._ws.send(JSON.stringify(message)); } diff --git a/src/server/chromium.ts b/src/server/chromium.ts index f14ca75e88..07d4a2d791 100644 --- a/src/server/chromium.ts +++ b/src/server/chromium.ts @@ -21,15 +21,15 @@ import * as path from 'path'; import { helper } from '../helper'; import { CRBrowser } from '../chromium/crBrowser'; import * as platform from '../platform'; -import { TimeoutError } from '../errors'; -import { launchProcess, waitForLine } from '../server/processLauncher'; +import * as ws from 'ws'; +import { launchProcess } from '../server/processLauncher'; import { kBrowserCloseMessageId } from '../chromium/crConnection'; import { PipeTransport } from './pipeTransport'; import { LaunchOptions, BrowserArgOptions, BrowserType } from './browserType'; import { ConnectOptions, LaunchType } from '../browser'; import { BrowserServer } from './browserServer'; import { Events } from '../events'; -import { ConnectionTransport } from '../transport'; +import { ConnectionTransport, ProtocolRequest } from '../transport'; import { BrowserContext } from '../browserContext'; export class Chromium implements BrowserType { @@ -76,7 +76,6 @@ export class Chromium implements BrowserType { handleSIGINT = true, handleSIGTERM = true, handleSIGHUP = true, - timeout = 30000 } = options; let temporaryUserDataDir: string | null = null; @@ -87,9 +86,9 @@ export class Chromium implements BrowserType { const chromeArguments = []; if (!ignoreDefaultArgs) - chromeArguments.push(...this._defaultArgs(options, launchType, userDataDir!, port || 0)); + chromeArguments.push(...this._defaultArgs(options, launchType, userDataDir!)); else if (Array.isArray(ignoreDefaultArgs)) - chromeArguments.push(...this._defaultArgs(options, launchType, userDataDir!, port || 0).filter(arg => ignoreDefaultArgs.indexOf(arg) === -1)); + chromeArguments.push(...this._defaultArgs(options, launchType, userDataDir!).filter(arg => ignoreDefaultArgs.indexOf(arg) === -1)); else chromeArguments.push(...args); @@ -105,7 +104,7 @@ export class Chromium implements BrowserType { handleSIGTERM, handleSIGHUP, dumpio, - pipe: launchType !== 'server', + pipe: true, tempDir: temporaryUserDataDir || undefined, attemptToGracefullyClose: async () => { if (!browserServer) @@ -113,9 +112,9 @@ export class Chromium implements BrowserType { // We try to gracefully close to prevent crash reporting and core dumps. // Note that it's fine to reuse the pipe transport, since // our connection ignores kBrowserCloseMessageId. - const t = transport || await platform.connectToWebsocket(browserWSEndpoint!, async transport => transport); - const message = { method: 'Browser.close', id: kBrowserCloseMessageId }; - await t.send(message); + const t = transport!; + const message: ProtocolRequest = { method: 'Browser.close', id: kBrowserCloseMessageId, params: {} }; + t.send(message); }, onkill: (exitCode, signal) => { if (browserServer) @@ -123,20 +122,10 @@ export class Chromium implements BrowserType { }, }); - let transport: PipeTransport | undefined; - let browserWSEndpoint: string | undefined; - if (launchType === 'server') { - const timeoutError = new TimeoutError(`Timed out after ${timeout} ms while trying to connect to Chromium!`); - const match = await waitForLine(launchedProcess, launchedProcess.stderr, /^DevTools listening on (ws:\/\/.*)$/, timeout, timeoutError); - browserWSEndpoint = match[1]; - browserServer = new BrowserServer(launchedProcess, gracefullyClose, browserWSEndpoint); - return { browserServer }; - } else { - // For local launch scenario close will terminate the browser process. - transport = new PipeTransport(launchedProcess.stdio[3] as NodeJS.WritableStream, launchedProcess.stdio[4] as NodeJS.ReadableStream, () => browserServer!.close()); - browserServer = new BrowserServer(launchedProcess, gracefullyClose, null); - return { browserServer, transport }; - } + let transport: PipeTransport | undefined = undefined; + transport = new PipeTransport(launchedProcess.stdio[3] as NodeJS.WritableStream, launchedProcess.stdio[4] as NodeJS.ReadableStream, () => browserServer!.close()); + browserServer = new BrowserServer(launchedProcess, gracefullyClose, launchType === 'server' ? wrapTransportWithWebSocket(transport, port || 0) : null); + return { browserServer, transport }; } async connect(options: ConnectOptions): Promise { @@ -145,7 +134,7 @@ export class Chromium implements BrowserType { }); } - private _defaultArgs(options: BrowserArgOptions = {}, launchType: LaunchType, userDataDir: string, port: number): string[] { + private _defaultArgs(options: BrowserArgOptions = {}, launchType: LaunchType, userDataDir: string): string[] { const { devtools = false, headless = !devtools, @@ -161,7 +150,7 @@ export class Chromium implements BrowserType { const chromeArguments = [...DEFAULT_ARGS]; chromeArguments.push(`--user-data-dir=${userDataDir}`); - chromeArguments.push(launchType === 'server' ? `--remote-debugging-port=${port || 0}` : '--remote-debugging-pipe'); + chromeArguments.push('--remote-debugging-pipe'); if (devtools) chromeArguments.push('--auto-open-devtools-for-tabs'); if (headless) { @@ -183,6 +172,130 @@ export class Chromium implements BrowserType { } } +function wrapTransportWithWebSocket(transport: ConnectionTransport, port: number): string { + const server = new ws.Server({ port }); + const guid = platform.guid(); + + const awaitingBrowserTarget = new Map(); + const sessionToSocket = new Map(); + const socketToBrowserSession = new Map(); + const browserSessions = new Set(); + let lastSequenceNumber = 1; + + transport.onmessage = message => { + if (typeof message.id === 'number' && awaitingBrowserTarget.has(message.id)) { + const freshSocket = awaitingBrowserTarget.get(message.id)!; + awaitingBrowserTarget.delete(message.id); + + const sessionId = message.result.sessionId; + if (freshSocket.readyState !== ws.CLOSED && freshSocket.readyState !== ws.CLOSING) { + sessionToSocket.set(sessionId, freshSocket); + const { queue } = socketToBrowserSession.get(freshSocket)!; + for (const item of queue!) { + item.sessionId = sessionId; + transport.send(item); + } + socketToBrowserSession.set(freshSocket, { sessionId }); + browserSessions.add(sessionId); + } else { + transport.send({ + id: ++lastSequenceNumber, + method: 'Target.detachFromTarget', + params: { sessionId } + }); + socketToBrowserSession.delete(freshSocket); + } + return; + } + + // At this point everything we care about has sessionId. + if (!message.sessionId) + return; + + const socket = sessionToSocket.get(message.sessionId); + if (socket && socket.readyState !== ws.CLOSING) { + if (message.method === 'Target.attachedToTarget') + sessionToSocket.set(message.params.sessionId, socket); + if (message.method === 'Target.detachedFromTarget') + sessionToSocket.delete(message.params.sessionId); + // Strip session ids from the browser sessions. + if (browserSessions.has(message.sessionId)) + delete message.sessionId; + socket.send(JSON.stringify(message)); + } + }; + + transport.onclose = () => { + for (const socket of socketToBrowserSession.keys()) { + socket.removeListener('close', (socket as any).__closeListener); + socket.close(undefined, 'Browser disconnected'); + } + server.close(); + transport.onmessage = undefined; + transport.onclose = undefined; + }; + + server.on('connection', (socket: ws, req) => { + if (req.url !== '/' + guid) { + socket.close(); + return; + } + socketToBrowserSession.set(socket, { queue: [] }); + + transport.send({ + id: ++lastSequenceNumber, + method: 'Target.attachToBrowserTarget', + params: {} + }); + awaitingBrowserTarget.set(lastSequenceNumber, socket); + + socket.on('message', (message: string) => { + const parsedMessage = JSON.parse(Buffer.from(message).toString()) as ProtocolRequest; + if (parsedMessage.method.startsWith('Backend')) { + // Add backend domain handler here. + return; + } + + // If message has sessionId, pass through. + if (parsedMessage.sessionId) { + transport.send(parsedMessage); + return; + } + + // If message has no sessionId, look it up. + const session = socketToBrowserSession.get(socket)!; + if (session.sessionId) { + // We have it, use it. + parsedMessage.sessionId = session.sessionId; + transport.send(parsedMessage); + return; + } + // Pending session id, queue the message. + session.queue!.push(parsedMessage); + }); + + socket.on('close', (socket as any).__closeListener = () => { + const session = socketToBrowserSession.get(socket); + if (!session || !session.sessionId) + return; + sessionToSocket.delete(session.sessionId); + browserSessions.delete(session.sessionId); + socketToBrowserSession.delete(socket); + transport.send({ + id: ++lastSequenceNumber, + method: 'Target.detachFromTarget', + params: { sessionId: session.sessionId } + }); + }); + }); + + const address = server.address(); + if (typeof address === 'string') + return address + '/' + guid; + return 'ws://127.0.0.1:' + address.port + '/' + guid; +} + + const mkdtempAsync = platform.promisify(fs.mkdtemp); const CHROMIUM_PROFILE_PATH = path.join(os.tmpdir(), 'playwright_dev_profile-'); diff --git a/src/server/pipeTransport.ts b/src/server/pipeTransport.ts index 01c94bfa38..28f68949c1 100644 --- a/src/server/pipeTransport.ts +++ b/src/server/pipeTransport.ts @@ -16,7 +16,7 @@ */ import { debugError, helper, RegisteredListener } from '../helper'; -import { ConnectionTransport, ProtocolMessage } from '../transport'; +import { ConnectionTransport, ProtocolRequest, ProtocolResponse } from '../transport'; import { makeWaitForNextTask } from '../platform'; export class PipeTransport implements ConnectionTransport { @@ -26,7 +26,7 @@ export class PipeTransport implements ConnectionTransport { private _waitForNextTask = makeWaitForNextTask(); private readonly _closeCallback: () => void; - onmessage?: (message: ProtocolMessage) => void; + onmessage?: (message: ProtocolResponse) => void; onclose?: () => void; constructor(pipeWrite: NodeJS.WritableStream, pipeRead: NodeJS.ReadableStream, closeCallback: () => void) { @@ -46,7 +46,7 @@ export class PipeTransport implements ConnectionTransport { this.onclose = undefined; } - send(message: ProtocolMessage) { + send(message: ProtocolRequest) { this._pipeWrite!.write(JSON.stringify(message)); this._pipeWrite!.write('\0'); } diff --git a/src/server/webkit.ts b/src/server/webkit.ts index 700e774155..ac0ba1e4b3 100644 --- a/src/server/webkit.ts +++ b/src/server/webkit.ts @@ -96,8 +96,6 @@ export class WebKit implements BrowserType { if (!webkitExecutable) throw new Error(`No executable path is specified.`); - let transport: ConnectionTransport | undefined = undefined; - let browserServer: BrowserServer | undefined = undefined; const { launchedProcess, gracefullyClose } = await launchProcess({ executablePath: webkitExecutable, args: webkitArguments, @@ -123,10 +121,10 @@ export class WebKit implements BrowserType { }); // For local launch scenario close will terminate the browser process. + let transport: ConnectionTransport | undefined = undefined; + let browserServer: BrowserServer | undefined = undefined; transport = new PipeTransport(launchedProcess.stdio[3] as NodeJS.WritableStream, launchedProcess.stdio[4] as NodeJS.ReadableStream, () => browserServer!.close()); - browserServer = new BrowserServer(launchedProcess, gracefullyClose, launchType === 'server' ? await wrapTransportWithWebSocket(transport, port || 0) : null); - if (launchType === 'server') - return { browserServer }; + browserServer = new BrowserServer(launchedProcess, gracefullyClose, launchType === 'server' ? wrapTransportWithWebSocket(transport, port || 0) : null); return { browserServer, transport }; } @@ -182,7 +180,7 @@ class SequenceNumberMixer { } } -function wrapTransportWithWebSocket(transport: ConnectionTransport, port: number) { +function wrapTransportWithWebSocket(transport: ConnectionTransport, port: number): string { const server = new ws.Server({ port }); const guid = platform.guid(); const idMixer = new SequenceNumberMixer<{id: number, socket: ws}>(); @@ -202,7 +200,7 @@ function wrapTransportWithWebSocket(transport: ConnectionTransport, port: number return; const { id, socket } = value; - if (!socket || socket.readyState === ws.CLOSING) { + if (socket.readyState === ws.CLOSING) { if (pendingBrowserContextCreations.has(id)) { transport.send({ id: ++SequenceNumberMixer._lastSequenceNumber, diff --git a/src/transport.ts b/src/transport.ts index eb4ee484da..315ca41837 100644 --- a/src/transport.ts +++ b/src/transport.ts @@ -15,20 +15,28 @@ * limitations under the License. */ -export type ProtocolMessage = { - id?: number; +export type ProtocolRequest = { + id: number; method: string; - params?: any; + params: any; + sessionId?: string; + pageProxyId?: string; +}; + +export type ProtocolResponse = { + id?: number; + method?: string; sessionId?: string; error?: { message: string; data: any; }; + params?: any; result?: any; pageProxyId?: string; }; export interface ConnectionTransport { - send(s: ProtocolMessage): void; + send(s: ProtocolRequest): void; close(): void; // Note: calling close is expected to issue onclose at some point. - onmessage?: (message: ProtocolMessage) => void, + onmessage?: (message: ProtocolResponse) => void, onclose?: () => void, } @@ -36,7 +44,7 @@ export class SlowMoTransport { private readonly _delay: number; private readonly _delegate: ConnectionTransport; - onmessage?: (message: ProtocolMessage) => void; + onmessage?: (message: ProtocolResponse) => void; onclose?: () => void; static wrap(transport: ConnectionTransport, delay?: number): ConnectionTransport { @@ -50,7 +58,7 @@ export class SlowMoTransport { this._delegate.onclose = this._onClose.bind(this); } - private _onmessage(message: ProtocolMessage) { + private _onmessage(message: ProtocolResponse) { if (this.onmessage) this.onmessage(message); } @@ -62,7 +70,7 @@ export class SlowMoTransport { this._delegate.onclose = undefined; } - send(s: ProtocolMessage) { + send(s: ProtocolRequest) { setTimeout(() => { if (this._delegate.onmessage) this._delegate.send(s); @@ -78,14 +86,14 @@ export class DeferWriteTransport implements ConnectionTransport { private _delegate: ConnectionTransport; private _readPromise: Promise; - onmessage?: (message: ProtocolMessage) => void; + onmessage?: (message: ProtocolResponse) => void; onclose?: () => void; constructor(transport: ConnectionTransport) { this._delegate = transport; let callback: () => void; this._readPromise = new Promise(f => callback = f); - this._delegate.onmessage = (s: ProtocolMessage) => { + this._delegate.onmessage = (s: ProtocolResponse) => { callback(); if (this.onmessage) this.onmessage(s); @@ -96,7 +104,7 @@ export class DeferWriteTransport implements ConnectionTransport { }; } - async send(s: ProtocolMessage) { + async send(s: ProtocolRequest) { await this._readPromise; this._delegate.send(s); } diff --git a/src/webkit/wkConnection.ts b/src/webkit/wkConnection.ts index 6326171ed1..a4c78cd37e 100644 --- a/src/webkit/wkConnection.ts +++ b/src/webkit/wkConnection.ts @@ -17,7 +17,7 @@ import { assert } from '../helper'; import * as platform from '../platform'; -import { ConnectionTransport, ProtocolMessage } from '../transport'; +import { ConnectionTransport, ProtocolRequest, ProtocolResponse } from '../transport'; import { Protocol } from './protocol'; // WKPlaywright uses this special id to issue Browser.close command which we @@ -53,13 +53,13 @@ export class WKConnection { return ++this._lastId; } - rawSend(message: ProtocolMessage) { + rawSend(message: ProtocolRequest) { if (this._debugProtocol.enabled) this._debugProtocol('SEND ► ' + rewriteInjectedScriptEvaluationLog(message)); this._transport.send(message); } - private _dispatchMessage(message: ProtocolMessage) { + private _dispatchMessage(message: ProtocolResponse) { if (this._debugProtocol.enabled) this._debugProtocol('◀ RECV ' + message); if (message.id === kBrowserCloseMessageId)