feat(websocket): use proxy web socket on chromium (#1573)

This commit is contained in:
Pavel Feldman 2020-03-27 15:18:34 -07:00 committed by GitHub
parent 4e89939ece
commit 48516ed7ed
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
8 changed files with 189 additions and 70 deletions

View file

@ -17,7 +17,7 @@
import { assert } from '../helper';
import * as platform from '../platform';
import { ConnectionTransport, ProtocolMessage } from '../transport';
import { ConnectionTransport, ProtocolRequest, ProtocolResponse } from '../transport';
import { Protocol } from './protocol';
export const ConnectionEvents = {
@ -55,9 +55,9 @@ export class CRConnection extends platform.EventEmitter {
return this._sessions.get(sessionId) || null;
}
_rawSend(sessionId: string, message: ProtocolMessage): number {
_rawSend(sessionId: string, method: string, params: any): number {
const id = ++this._lastId;
message.id = id;
const message: ProtocolRequest = { id, method, params };
if (sessionId)
message.sessionId = sessionId;
if (this._debugProtocol.enabled)
@ -66,9 +66,9 @@ export class CRConnection extends platform.EventEmitter {
return id;
}
async _onMessage(message: ProtocolMessage) {
async _onMessage(message: ProtocolResponse) {
if (this._debugProtocol.enabled)
this._debugProtocol('◀ RECV ' + rewriteInjectedScriptEvaluationLog(message));
this._debugProtocol('◀ RECV ' + JSON.stringify(message));
if (message.id === kBrowserCloseMessageId)
return;
if (message.method === 'Target.attachedToTarget') {
@ -150,13 +150,13 @@ export class CRSession extends platform.EventEmitter {
): Promise<Protocol.CommandReturnValues[T]> {
if (!this._connection)
throw new Error(`Protocol error (${method}): Session closed. Most likely the ${this._targetType} has been closed.`);
const id = this._connection._rawSend(this._sessionId, { method, params });
const id = this._connection._rawSend(this._sessionId, method, params);
return new Promise((resolve, reject) => {
this._callbacks.set(id, {resolve, reject, error: new Error(), method});
});
}
_onMessage(object: ProtocolMessage) {
_onMessage(object: ProtocolResponse) {
if (object.id && this._callbacks.has(object.id)) {
const callback = this._callbacks.get(object.id)!;
this._callbacks.delete(object.id);
@ -166,7 +166,7 @@ export class CRSession extends platform.EventEmitter {
callback.resolve(object.result);
} else {
assert(!object.id);
Promise.resolve().then(() => this.emit(object.method, object.params));
Promise.resolve().then(() => this.emit(object.method!, object.params));
}
}
@ -200,7 +200,7 @@ function rewriteError(error: Error, message: string): Error {
return error;
}
function rewriteInjectedScriptEvaluationLog(message: ProtocolMessage): string {
function rewriteInjectedScriptEvaluationLog(message: ProtocolRequest): string {
// Injected script is very long and clutters protocol logs.
// To increase development velocity, we skip replace it with short description in the log.
if (message.method === 'Runtime.evaluate' && message.params && message.params.expression && message.params.expression.includes('src/injected/injected.ts'))

View file

@ -17,7 +17,7 @@
import {assert} from '../helper';
import * as platform from '../platform';
import { ConnectionTransport, ProtocolMessage } from '../transport';
import { ConnectionTransport, ProtocolRequest, ProtocolResponse } from '../transport';
import { Protocol } from './protocol';
export const ConnectionEvents = {
@ -76,15 +76,15 @@ export class FFConnection extends platform.EventEmitter {
return ++this._lastId;
}
_rawSend(message: ProtocolMessage) {
_rawSend(message: ProtocolRequest) {
if (this._debugProtocol.enabled)
this._debugProtocol('SEND ► ' + rewriteInjectedScriptEvaluationLog(message));
this._transport.send(message);
}
async _onMessage(message: ProtocolMessage) {
async _onMessage(message: ProtocolResponse) {
if (this._debugProtocol.enabled)
this._debugProtocol('◀ RECV ' + message);
this._debugProtocol('◀ RECV ' + JSON.stringify(message));
if (message.id === kBrowserCloseMessageId)
return;
if (message.sessionId) {
@ -102,7 +102,7 @@ export class FFConnection extends platform.EventEmitter {
callback.resolve(message.result);
}
} else {
Promise.resolve().then(() => this.emit(message.method, message.params));
Promise.resolve().then(() => this.emit(message.method!, message.params));
}
}
@ -176,7 +176,7 @@ export class FFSession extends platform.EventEmitter {
});
}
dispatchMessage(object: ProtocolMessage) {
dispatchMessage(object: ProtocolResponse) {
if (object.id && this._callbacks.has(object.id)) {
const callback = this._callbacks.get(object.id)!;
this._callbacks.delete(object.id);
@ -186,7 +186,7 @@ export class FFSession extends platform.EventEmitter {
callback.resolve(object.result);
} else {
assert(!object.id);
Promise.resolve().then(() => this.emit(object.method, object.params));
Promise.resolve().then(() => this.emit(object.method!, object.params));
}
}
@ -212,7 +212,7 @@ function rewriteError(error: Error, message: string): Error {
return error;
}
function rewriteInjectedScriptEvaluationLog(message: ProtocolMessage): string {
function rewriteInjectedScriptEvaluationLog(message: ProtocolRequest): string {
// Injected script is very long and clutters protocol logs.
// To increase development velocity, we skip replace it with short description in the log.
if (message.method === 'Runtime.evaluate' && message.params && message.params.expression && message.params.expression.includes('src/injected/injected.ts'))

View file

@ -29,7 +29,7 @@ import * as NodeWebSocket from 'ws';
import * as crypto from 'crypto';
import { assert, helper } from './helper';
import { ConnectionTransport, ProtocolMessage } from './transport';
import { ConnectionTransport, ProtocolRequest, ProtocolResponse } from './transport';
export const isNode = typeof process === 'object' && !!process && typeof process.versions === 'object' && !!process.versions && !!process.versions.node;
@ -323,7 +323,7 @@ export async function connectToWebsocket<T>(url: string, onopen: (transport: Con
class WebSocketTransport implements ConnectionTransport {
_ws: WebSocket;
onmessage?: (message: ProtocolMessage) => void;
onmessage?: (message: ProtocolResponse) => void;
onclose?: () => void;
constructor(url: string) {
@ -352,7 +352,7 @@ class WebSocketTransport implements ConnectionTransport {
this._ws.addEventListener('error', () => {});
}
send(message: ProtocolMessage) {
send(message: ProtocolRequest) {
this._ws.send(JSON.stringify(message));
}

View file

@ -21,15 +21,15 @@ import * as path from 'path';
import { helper } from '../helper';
import { CRBrowser } from '../chromium/crBrowser';
import * as platform from '../platform';
import { TimeoutError } from '../errors';
import { launchProcess, waitForLine } from '../server/processLauncher';
import * as ws from 'ws';
import { launchProcess } from '../server/processLauncher';
import { kBrowserCloseMessageId } from '../chromium/crConnection';
import { PipeTransport } from './pipeTransport';
import { LaunchOptions, BrowserArgOptions, BrowserType } from './browserType';
import { ConnectOptions, LaunchType } from '../browser';
import { BrowserServer } from './browserServer';
import { Events } from '../events';
import { ConnectionTransport } from '../transport';
import { ConnectionTransport, ProtocolRequest } from '../transport';
import { BrowserContext } from '../browserContext';
export class Chromium implements BrowserType<CRBrowser> {
@ -76,7 +76,6 @@ export class Chromium implements BrowserType<CRBrowser> {
handleSIGINT = true,
handleSIGTERM = true,
handleSIGHUP = true,
timeout = 30000
} = options;
let temporaryUserDataDir: string | null = null;
@ -87,9 +86,9 @@ export class Chromium implements BrowserType<CRBrowser> {
const chromeArguments = [];
if (!ignoreDefaultArgs)
chromeArguments.push(...this._defaultArgs(options, launchType, userDataDir!, port || 0));
chromeArguments.push(...this._defaultArgs(options, launchType, userDataDir!));
else if (Array.isArray(ignoreDefaultArgs))
chromeArguments.push(...this._defaultArgs(options, launchType, userDataDir!, port || 0).filter(arg => ignoreDefaultArgs.indexOf(arg) === -1));
chromeArguments.push(...this._defaultArgs(options, launchType, userDataDir!).filter(arg => ignoreDefaultArgs.indexOf(arg) === -1));
else
chromeArguments.push(...args);
@ -105,7 +104,7 @@ export class Chromium implements BrowserType<CRBrowser> {
handleSIGTERM,
handleSIGHUP,
dumpio,
pipe: launchType !== 'server',
pipe: true,
tempDir: temporaryUserDataDir || undefined,
attemptToGracefullyClose: async () => {
if (!browserServer)
@ -113,9 +112,9 @@ export class Chromium implements BrowserType<CRBrowser> {
// We try to gracefully close to prevent crash reporting and core dumps.
// Note that it's fine to reuse the pipe transport, since
// our connection ignores kBrowserCloseMessageId.
const t = transport || await platform.connectToWebsocket(browserWSEndpoint!, async transport => transport);
const message = { method: 'Browser.close', id: kBrowserCloseMessageId };
await t.send(message);
const t = transport!;
const message: ProtocolRequest = { method: 'Browser.close', id: kBrowserCloseMessageId, params: {} };
t.send(message);
},
onkill: (exitCode, signal) => {
if (browserServer)
@ -123,20 +122,10 @@ export class Chromium implements BrowserType<CRBrowser> {
},
});
let transport: PipeTransport | undefined;
let browserWSEndpoint: string | undefined;
if (launchType === 'server') {
const timeoutError = new TimeoutError(`Timed out after ${timeout} ms while trying to connect to Chromium!`);
const match = await waitForLine(launchedProcess, launchedProcess.stderr, /^DevTools listening on (ws:\/\/.*)$/, timeout, timeoutError);
browserWSEndpoint = match[1];
browserServer = new BrowserServer(launchedProcess, gracefullyClose, browserWSEndpoint);
return { browserServer };
} else {
// For local launch scenario close will terminate the browser process.
transport = new PipeTransport(launchedProcess.stdio[3] as NodeJS.WritableStream, launchedProcess.stdio[4] as NodeJS.ReadableStream, () => browserServer!.close());
browserServer = new BrowserServer(launchedProcess, gracefullyClose, null);
return { browserServer, transport };
}
let transport: PipeTransport | undefined = undefined;
transport = new PipeTransport(launchedProcess.stdio[3] as NodeJS.WritableStream, launchedProcess.stdio[4] as NodeJS.ReadableStream, () => browserServer!.close());
browserServer = new BrowserServer(launchedProcess, gracefullyClose, launchType === 'server' ? wrapTransportWithWebSocket(transport, port || 0) : null);
return { browserServer, transport };
}
async connect(options: ConnectOptions): Promise<CRBrowser> {
@ -145,7 +134,7 @@ export class Chromium implements BrowserType<CRBrowser> {
});
}
private _defaultArgs(options: BrowserArgOptions = {}, launchType: LaunchType, userDataDir: string, port: number): string[] {
private _defaultArgs(options: BrowserArgOptions = {}, launchType: LaunchType, userDataDir: string): string[] {
const {
devtools = false,
headless = !devtools,
@ -161,7 +150,7 @@ export class Chromium implements BrowserType<CRBrowser> {
const chromeArguments = [...DEFAULT_ARGS];
chromeArguments.push(`--user-data-dir=${userDataDir}`);
chromeArguments.push(launchType === 'server' ? `--remote-debugging-port=${port || 0}` : '--remote-debugging-pipe');
chromeArguments.push('--remote-debugging-pipe');
if (devtools)
chromeArguments.push('--auto-open-devtools-for-tabs');
if (headless) {
@ -183,6 +172,130 @@ export class Chromium implements BrowserType<CRBrowser> {
}
}
function wrapTransportWithWebSocket(transport: ConnectionTransport, port: number): string {
const server = new ws.Server({ port });
const guid = platform.guid();
const awaitingBrowserTarget = new Map<number, ws>();
const sessionToSocket = new Map<string, ws>();
const socketToBrowserSession = new Map<ws, { sessionId?: string, queue?: ProtocolRequest[] }>();
const browserSessions = new Set<string>();
let lastSequenceNumber = 1;
transport.onmessage = message => {
if (typeof message.id === 'number' && awaitingBrowserTarget.has(message.id)) {
const freshSocket = awaitingBrowserTarget.get(message.id)!;
awaitingBrowserTarget.delete(message.id);
const sessionId = message.result.sessionId;
if (freshSocket.readyState !== ws.CLOSED && freshSocket.readyState !== ws.CLOSING) {
sessionToSocket.set(sessionId, freshSocket);
const { queue } = socketToBrowserSession.get(freshSocket)!;
for (const item of queue!) {
item.sessionId = sessionId;
transport.send(item);
}
socketToBrowserSession.set(freshSocket, { sessionId });
browserSessions.add(sessionId);
} else {
transport.send({
id: ++lastSequenceNumber,
method: 'Target.detachFromTarget',
params: { sessionId }
});
socketToBrowserSession.delete(freshSocket);
}
return;
}
// At this point everything we care about has sessionId.
if (!message.sessionId)
return;
const socket = sessionToSocket.get(message.sessionId);
if (socket && socket.readyState !== ws.CLOSING) {
if (message.method === 'Target.attachedToTarget')
sessionToSocket.set(message.params.sessionId, socket);
if (message.method === 'Target.detachedFromTarget')
sessionToSocket.delete(message.params.sessionId);
// Strip session ids from the browser sessions.
if (browserSessions.has(message.sessionId))
delete message.sessionId;
socket.send(JSON.stringify(message));
}
};
transport.onclose = () => {
for (const socket of socketToBrowserSession.keys()) {
socket.removeListener('close', (socket as any).__closeListener);
socket.close(undefined, 'Browser disconnected');
}
server.close();
transport.onmessage = undefined;
transport.onclose = undefined;
};
server.on('connection', (socket: ws, req) => {
if (req.url !== '/' + guid) {
socket.close();
return;
}
socketToBrowserSession.set(socket, { queue: [] });
transport.send({
id: ++lastSequenceNumber,
method: 'Target.attachToBrowserTarget',
params: {}
});
awaitingBrowserTarget.set(lastSequenceNumber, socket);
socket.on('message', (message: string) => {
const parsedMessage = JSON.parse(Buffer.from(message).toString()) as ProtocolRequest;
if (parsedMessage.method.startsWith('Backend')) {
// Add backend domain handler here.
return;
}
// If message has sessionId, pass through.
if (parsedMessage.sessionId) {
transport.send(parsedMessage);
return;
}
// If message has no sessionId, look it up.
const session = socketToBrowserSession.get(socket)!;
if (session.sessionId) {
// We have it, use it.
parsedMessage.sessionId = session.sessionId;
transport.send(parsedMessage);
return;
}
// Pending session id, queue the message.
session.queue!.push(parsedMessage);
});
socket.on('close', (socket as any).__closeListener = () => {
const session = socketToBrowserSession.get(socket);
if (!session || !session.sessionId)
return;
sessionToSocket.delete(session.sessionId);
browserSessions.delete(session.sessionId);
socketToBrowserSession.delete(socket);
transport.send({
id: ++lastSequenceNumber,
method: 'Target.detachFromTarget',
params: { sessionId: session.sessionId }
});
});
});
const address = server.address();
if (typeof address === 'string')
return address + '/' + guid;
return 'ws://127.0.0.1:' + address.port + '/' + guid;
}
const mkdtempAsync = platform.promisify(fs.mkdtemp);
const CHROMIUM_PROFILE_PATH = path.join(os.tmpdir(), 'playwright_dev_profile-');

View file

@ -16,7 +16,7 @@
*/
import { debugError, helper, RegisteredListener } from '../helper';
import { ConnectionTransport, ProtocolMessage } from '../transport';
import { ConnectionTransport, ProtocolRequest, ProtocolResponse } from '../transport';
import { makeWaitForNextTask } from '../platform';
export class PipeTransport implements ConnectionTransport {
@ -26,7 +26,7 @@ export class PipeTransport implements ConnectionTransport {
private _waitForNextTask = makeWaitForNextTask();
private readonly _closeCallback: () => void;
onmessage?: (message: ProtocolMessage) => void;
onmessage?: (message: ProtocolResponse) => void;
onclose?: () => void;
constructor(pipeWrite: NodeJS.WritableStream, pipeRead: NodeJS.ReadableStream, closeCallback: () => void) {
@ -46,7 +46,7 @@ export class PipeTransport implements ConnectionTransport {
this.onclose = undefined;
}
send(message: ProtocolMessage) {
send(message: ProtocolRequest) {
this._pipeWrite!.write(JSON.stringify(message));
this._pipeWrite!.write('\0');
}

View file

@ -96,8 +96,6 @@ export class WebKit implements BrowserType<WKBrowser> {
if (!webkitExecutable)
throw new Error(`No executable path is specified.`);
let transport: ConnectionTransport | undefined = undefined;
let browserServer: BrowserServer | undefined = undefined;
const { launchedProcess, gracefullyClose } = await launchProcess({
executablePath: webkitExecutable,
args: webkitArguments,
@ -123,10 +121,10 @@ export class WebKit implements BrowserType<WKBrowser> {
});
// For local launch scenario close will terminate the browser process.
let transport: ConnectionTransport | undefined = undefined;
let browserServer: BrowserServer | undefined = undefined;
transport = new PipeTransport(launchedProcess.stdio[3] as NodeJS.WritableStream, launchedProcess.stdio[4] as NodeJS.ReadableStream, () => browserServer!.close());
browserServer = new BrowserServer(launchedProcess, gracefullyClose, launchType === 'server' ? await wrapTransportWithWebSocket(transport, port || 0) : null);
if (launchType === 'server')
return { browserServer };
browserServer = new BrowserServer(launchedProcess, gracefullyClose, launchType === 'server' ? wrapTransportWithWebSocket(transport, port || 0) : null);
return { browserServer, transport };
}
@ -182,7 +180,7 @@ class SequenceNumberMixer<V> {
}
}
function wrapTransportWithWebSocket(transport: ConnectionTransport, port: number) {
function wrapTransportWithWebSocket(transport: ConnectionTransport, port: number): string {
const server = new ws.Server({ port });
const guid = platform.guid();
const idMixer = new SequenceNumberMixer<{id: number, socket: ws}>();
@ -202,7 +200,7 @@ function wrapTransportWithWebSocket(transport: ConnectionTransport, port: number
return;
const { id, socket } = value;
if (!socket || socket.readyState === ws.CLOSING) {
if (socket.readyState === ws.CLOSING) {
if (pendingBrowserContextCreations.has(id)) {
transport.send({
id: ++SequenceNumberMixer._lastSequenceNumber,

View file

@ -15,20 +15,28 @@
* limitations under the License.
*/
export type ProtocolMessage = {
id?: number;
export type ProtocolRequest = {
id: number;
method: string;
params?: any;
params: any;
sessionId?: string;
pageProxyId?: string;
};
export type ProtocolResponse = {
id?: number;
method?: string;
sessionId?: string;
error?: { message: string; data: any; };
params?: any;
result?: any;
pageProxyId?: string;
};
export interface ConnectionTransport {
send(s: ProtocolMessage): void;
send(s: ProtocolRequest): void;
close(): void; // Note: calling close is expected to issue onclose at some point.
onmessage?: (message: ProtocolMessage) => void,
onmessage?: (message: ProtocolResponse) => void,
onclose?: () => void,
}
@ -36,7 +44,7 @@ export class SlowMoTransport {
private readonly _delay: number;
private readonly _delegate: ConnectionTransport;
onmessage?: (message: ProtocolMessage) => void;
onmessage?: (message: ProtocolResponse) => void;
onclose?: () => void;
static wrap(transport: ConnectionTransport, delay?: number): ConnectionTransport {
@ -50,7 +58,7 @@ export class SlowMoTransport {
this._delegate.onclose = this._onClose.bind(this);
}
private _onmessage(message: ProtocolMessage) {
private _onmessage(message: ProtocolResponse) {
if (this.onmessage)
this.onmessage(message);
}
@ -62,7 +70,7 @@ export class SlowMoTransport {
this._delegate.onclose = undefined;
}
send(s: ProtocolMessage) {
send(s: ProtocolRequest) {
setTimeout(() => {
if (this._delegate.onmessage)
this._delegate.send(s);
@ -78,14 +86,14 @@ export class DeferWriteTransport implements ConnectionTransport {
private _delegate: ConnectionTransport;
private _readPromise: Promise<void>;
onmessage?: (message: ProtocolMessage) => void;
onmessage?: (message: ProtocolResponse) => void;
onclose?: () => void;
constructor(transport: ConnectionTransport) {
this._delegate = transport;
let callback: () => void;
this._readPromise = new Promise(f => callback = f);
this._delegate.onmessage = (s: ProtocolMessage) => {
this._delegate.onmessage = (s: ProtocolResponse) => {
callback();
if (this.onmessage)
this.onmessage(s);
@ -96,7 +104,7 @@ export class DeferWriteTransport implements ConnectionTransport {
};
}
async send(s: ProtocolMessage) {
async send(s: ProtocolRequest) {
await this._readPromise;
this._delegate.send(s);
}

View file

@ -17,7 +17,7 @@
import { assert } from '../helper';
import * as platform from '../platform';
import { ConnectionTransport, ProtocolMessage } from '../transport';
import { ConnectionTransport, ProtocolRequest, ProtocolResponse } from '../transport';
import { Protocol } from './protocol';
// WKPlaywright uses this special id to issue Browser.close command which we
@ -53,13 +53,13 @@ export class WKConnection {
return ++this._lastId;
}
rawSend(message: ProtocolMessage) {
rawSend(message: ProtocolRequest) {
if (this._debugProtocol.enabled)
this._debugProtocol('SEND ► ' + rewriteInjectedScriptEvaluationLog(message));
this._transport.send(message);
}
private _dispatchMessage(message: ProtocolMessage) {
private _dispatchMessage(message: ProtocolResponse) {
if (this._debugProtocol.enabled)
this._debugProtocol('◀ RECV ' + message);
if (message.id === kBrowserCloseMessageId)