chore(websocket): extract common socket part (#2506)

This commit is contained in:
Pavel Feldman 2020-06-10 16:33:27 -07:00 committed by GitHub
parent 1bb33650b0
commit 903de2582a
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
7 changed files with 328 additions and 326 deletions

View file

@ -16,42 +16,13 @@
import { ChildProcess } from 'child_process';
import { EventEmitter } from 'events';
export class WebSocketWrapper {
readonly wsEndpoint: string;
private _bindings: (Map<any, any> | Set<any>)[];
constructor(wsEndpoint: string, bindings: (Map<any, any>|Set<any>)[]) {
this.wsEndpoint = wsEndpoint;
this._bindings = bindings;
}
async checkLeaks() {
let counter = 0;
return new Promise((fulfill, reject) => {
const check = () => {
const filtered = this._bindings.filter(entry => entry.size);
if (!filtered.length) {
fulfill();
return;
}
if (++counter >= 50) {
reject(new Error('Web socket leak ' + filtered.map(entry => [...entry.keys()].join(':')).join('|')));
return;
}
setTimeout(check, 100);
};
check();
});
}
}
import { WebSocketServer } from './webSocketServer';
export class BrowserServer extends EventEmitter {
private _process: ChildProcess;
private _gracefullyClose: () => Promise<void>;
private _kill: () => Promise<void>;
_webSocketWrapper: WebSocketWrapper | null = null;
_webSocketServer: WebSocketServer | null = null;
constructor(process: ChildProcess, gracefullyClose: () => Promise<void>, kill: () => Promise<void>) {
super();
@ -65,7 +36,7 @@ export class BrowserServer extends EventEmitter {
}
wsEndpoint(): string {
return this._webSocketWrapper ? this._webSocketWrapper.wsEndpoint : '';
return this._webSocketServer ? this._webSocketServer.wsEndpoint : '';
}
async kill(): Promise<void> {
@ -77,8 +48,8 @@ export class BrowserServer extends EventEmitter {
}
async _checkLeaks(): Promise<void> {
if (this._webSocketWrapper)
await this._webSocketWrapper.checkLeaks();
if (this._webSocketServer)
await this._webSocketServer.checkLeaks();
}
async _closeOrKill(timeout: number): Promise<void> {

View file

@ -19,7 +19,7 @@ import * as os from 'os';
import * as path from 'path';
import * as util from 'util';
import { BrowserContext, PersistentContextOptions, verifyProxySettings, validateBrowserContextOptions } from '../browserContext';
import { BrowserServer, WebSocketWrapper } from './browserServer';
import { BrowserServer } from './browserServer';
import * as browserPaths from '../install/browserPaths';
import { Logger, InnerLogger } from '../logger';
import { ConnectionTransport, WebSocketTransport } from '../transport';
@ -31,6 +31,7 @@ import { PipeTransport } from './pipeTransport';
import { Progress, runAbortableTask } from '../progress';
import { ProxySettings } from '../types';
import { TimeoutSettings } from '../timeoutSettings';
import { WebSocketServer } from './webSocketServer';
export type FirefoxUserPrefsOptions = {
firefoxUserPrefs?: { [key: string]: string | number | boolean },
@ -150,7 +151,7 @@ export abstract class BrowserTypeBase implements BrowserType {
const logger = new InnerLogger(options.logger);
return runAbortableTask(async progress => {
const { browserServer, transport } = await this._launchServer(progress, options, false, logger);
browserServer._webSocketWrapper = this._wrapTransportWithWebSocket(transport, logger, port);
browserServer._webSocketServer = this._startWebSocketServer(transport, logger, port);
return browserServer;
}, logger, TimeoutSettings.timeout(options));
}
@ -248,7 +249,7 @@ export abstract class BrowserTypeBase implements BrowserType {
abstract _defaultArgs(options: LaunchOptionsBase, isPersistent: boolean, userDataDir: string): string[];
abstract _connectToTransport(transport: ConnectionTransport, options: BrowserOptions): Promise<BrowserBase>;
abstract _wrapTransportWithWebSocket(transport: ConnectionTransport, logger: InnerLogger, port: number): WebSocketWrapper;
abstract _startWebSocketServer(transport: ConnectionTransport, logger: InnerLogger, port: number): WebSocketServer;
abstract _amendEnvironment(env: Env, userDataDir: string, executable: string, browserArguments: string[]): Env;
abstract _attemptToGracefullyCloseBrowser(transport: ConnectionTransport): void;
}

View file

@ -16,19 +16,19 @@
*/
import * as path from 'path';
import { helper, assert, getFromENV, logPolitely } from '../helper';
import { assert, getFromENV, logPolitely } from '../helper';
import { CRBrowser } from '../chromium/crBrowser';
import * as ws from 'ws';
import { Env } from './processLauncher';
import { kBrowserCloseMessageId } from '../chromium/crConnection';
import { LaunchOptionsBase, BrowserTypeBase, processBrowserArgOptions } from './browserType';
import { WebSocketWrapper } from './browserServer';
import { ConnectionTransport, ProtocolRequest } from '../transport';
import { InnerLogger, logError } from '../logger';
import { ConnectionTransport, ProtocolRequest, ProtocolResponse } from '../transport';
import { InnerLogger } from '../logger';
import { BrowserDescriptor } from '../install/browserPaths';
import { CRDevTools } from '../debug/crDevTools';
import * as debugSupport from '../debug/debugSupport';
import { BrowserOptions } from '../browser';
import { WebSocketServer } from './webSocketServer';
export class Chromium extends BrowserTypeBase {
private _devtools: CRDevTools | undefined;
@ -73,8 +73,8 @@ export class Chromium extends BrowserTypeBase {
transport.send(message);
}
_wrapTransportWithWebSocket(transport: ConnectionTransport, logger: InnerLogger, port: number): WebSocketWrapper {
return wrapTransportWithWebSocket(transport, logger, port);
_startWebSocketServer(transport: ConnectionTransport, logger: InnerLogger, port: number): WebSocketServer {
return startWebSocketServer(transport, logger, port);
}
_defaultArgs(options: LaunchOptionsBase, isPersistent: boolean, userDataDir: string): string[] {
@ -132,21 +132,17 @@ type SessionData = {
parent?: string,
};
function wrapTransportWithWebSocket(transport: ConnectionTransport, logger: InnerLogger, port: number): WebSocketWrapper {
const server = new ws.Server({ port });
const guid = helper.guid();
function startWebSocketServer(transport: ConnectionTransport, logger: InnerLogger, port: number): WebSocketServer {
const awaitingBrowserTarget = new Map<number, ws>();
const sessionToData = new Map<string, SessionData>();
const socketToBrowserSession = new Map<ws, { sessionId?: string, queue?: ProtocolRequest[] }>();
let lastSequenceNumber = 1;
function addSession(sessionId: string, socket: ws, parentSessionId?: string) {
sessionToData.set(sessionId, {
socket,
children: new Set(),
isBrowserSession: !parentSessionId,
parent: parentSessionId
parent: parentSessionId,
});
if (parentSessionId)
sessionToData.get(parentSessionId)!.children.add(sessionId);
@ -161,77 +157,76 @@ function wrapTransportWithWebSocket(transport: ConnectionTransport, logger: Inne
sessionToData.delete(sessionId);
}
transport.onmessage = message => {
if (typeof message.id === 'number' && awaitingBrowserTarget.has(message.id)) {
const freshSocket = awaitingBrowserTarget.get(message.id)!;
awaitingBrowserTarget.delete(message.id);
const server = new WebSocketServer(transport, logger, port, {
onBrowserResponse(seqNum: number, source: ws, message: ProtocolResponse) {
if (awaitingBrowserTarget.has(seqNum)) {
const freshSocket = awaitingBrowserTarget.get(seqNum)!;
awaitingBrowserTarget.delete(seqNum);
const sessionId = message.result.sessionId;
if (freshSocket.readyState !== ws.CLOSED && freshSocket.readyState !== ws.CLOSING) {
const { queue } = socketToBrowserSession.get(freshSocket)!;
for (const item of queue!) {
item.sessionId = sessionId;
transport.send(item);
const sessionId = message.result.sessionId;
if (freshSocket.readyState !== ws.CLOSED && freshSocket.readyState !== ws.CLOSING) {
const { queue } = socketToBrowserSession.get(freshSocket)!;
for (const item of queue!) {
item.sessionId = sessionId;
server.sendMessageToBrowser(item, source);
}
socketToBrowserSession.set(freshSocket, { sessionId });
addSession(sessionId, freshSocket);
} else {
server.sendMessageToBrowserOneWay('Target.detachFromTarget', { sessionId });
socketToBrowserSession.delete(freshSocket);
}
socketToBrowserSession.set(freshSocket, { sessionId });
addSession(sessionId, freshSocket);
} else {
transport.send({
id: ++lastSequenceNumber,
method: 'Target.detachFromTarget',
params: { sessionId }
});
socketToBrowserSession.delete(freshSocket);
return;
}
return;
}
// At this point everything we care about has sessionId.
if (!message.sessionId)
return;
if (message.id === -1)
return;
const data = sessionToData.get(message.sessionId);
if (data && data.socket.readyState !== ws.CLOSING) {
if (message.method === 'Target.attachedToTarget')
addSession(message.params.sessionId, data.socket, message.sessionId);
if (message.method === 'Target.detachedFromTarget')
removeSession(message.params.sessionId);
// Strip session ids from the browser sessions.
if (data.isBrowserSession)
delete message.sessionId;
data.socket.send(JSON.stringify(message));
}
};
// At this point everything we care about has sessionId.
if (!message.sessionId)
return;
transport.onclose = () => {
for (const socket of socketToBrowserSession.keys()) {
socket.removeListener('close', (socket as any).__closeListener);
socket.close(undefined, 'Browser disconnected');
}
server.close();
transport.onmessage = undefined;
transport.onclose = undefined;
};
const data = sessionToData.get(message.sessionId);
if (data && data.socket.readyState !== ws.CLOSING) {
if (data.isBrowserSession)
delete message.sessionId;
data.socket.send(JSON.stringify(message));
}
},
server.on('connection', (socket: ws, req) => {
if (req.url !== '/' + guid) {
socket.close();
return;
}
socketToBrowserSession.set(socket, { queue: [] });
onBrowserNotification(message: ProtocolResponse) {
// At this point everything we care about has sessionId.
if (!message.sessionId)
return;
transport.send({
id: ++lastSequenceNumber,
method: 'Target.attachToBrowserTarget',
params: {}
});
awaitingBrowserTarget.set(lastSequenceNumber, socket);
const data = sessionToData.get(message.sessionId);
if (data && data.socket.readyState !== ws.CLOSING) {
if (message.method === 'Target.attachedToTarget')
addSession(message.params.sessionId, data.socket, message.sessionId);
if (message.method === 'Target.detachedFromTarget')
removeSession(message.params.sessionId);
// Strip session ids from the browser sessions.
if (data.isBrowserSession)
delete message.sessionId;
data.socket.send(JSON.stringify(message));
}
},
socket.on('message', (message: string) => {
const parsedMessage = JSON.parse(Buffer.from(message).toString()) as ProtocolRequest;
onClientAttached(socket: ws) {
socketToBrowserSession.set(socket, { queue: [] });
const seqNum = server.sendMessageToBrowser({
id: -1, // Proxy-initiated request.
method: 'Target.attachToBrowserTarget',
params: {}
}, socket);
awaitingBrowserTarget.set(seqNum, socket);
},
onClientRequest(socket: ws, message: ProtocolRequest) {
// If message has sessionId, pass through.
if (parsedMessage.sessionId) {
transport.send(parsedMessage);
if (message.sessionId) {
server.sendMessageToBrowser(message, socket);
return;
}
@ -239,33 +234,24 @@ function wrapTransportWithWebSocket(transport: ConnectionTransport, logger: Inne
const session = socketToBrowserSession.get(socket)!;
if (session.sessionId) {
// We have it, use it.
parsedMessage.sessionId = session.sessionId;
transport.send(parsedMessage);
message.sessionId = session.sessionId;
server.sendMessageToBrowser(message, socket);
return;
}
// Pending session id, queue the message.
session.queue!.push(parsedMessage);
});
session.queue!.push(message);
},
socket.on('error', logError(logger));
socket.on('close', (socket as any).__closeListener = () => {
onClientDetached(socket: ws) {
const session = socketToBrowserSession.get(socket);
if (!session || !session.sessionId)
return;
removeSession(session.sessionId);
socketToBrowserSession.delete(socket);
transport.send({
id: ++lastSequenceNumber,
method: 'Target.detachFromTarget',
params: { sessionId: session.sessionId }
});
});
server.sendMessageToBrowserOneWay('Target.detachFromTarget', { sessionId: session.sessionId });
}
});
const address = server.address();
const wsEndpoint = typeof address === 'string' ? `${address}/${guid}` : `ws://127.0.0.1:${address.port}/${guid}`;
return new WebSocketWrapper(wsEndpoint, [awaitingBrowserTarget, sessionToData, socketToBrowserSession]);
return server;
}

View file

@ -21,14 +21,13 @@ import * as path from 'path';
import * as ws from 'ws';
import { FFBrowser } from '../firefox/ffBrowser';
import { kBrowserCloseMessageId } from '../firefox/ffConnection';
import { helper } from '../helper';
import { WebSocketWrapper } from './browserServer';
import { LaunchOptionsBase, BrowserTypeBase, processBrowserArgOptions, FirefoxUserPrefsOptions } from './browserType';
import { Env } from './processLauncher';
import { ConnectionTransport, SequenceNumberMixer } from '../transport';
import { InnerLogger, logError } from '../logger';
import { ConnectionTransport, ProtocolResponse, ProtocolRequest } from '../transport';
import { InnerLogger } from '../logger';
import { BrowserOptions } from '../browser';
import { BrowserDescriptor } from '../install/browserPaths';
import { WebSocketServer } from './webSocketServer';
export class Firefox extends BrowserTypeBase {
constructor(packagePath: string, browser: BrowserDescriptor) {
@ -53,8 +52,8 @@ export class Firefox extends BrowserTypeBase {
transport.send(message);
}
_wrapTransportWithWebSocket(transport: ConnectionTransport, logger: InnerLogger, port: number): WebSocketWrapper {
return wrapTransportWithWebSocket(transport, logger, port);
_startWebSocketServer(transport: ConnectionTransport, logger: InnerLogger, port: number): WebSocketServer {
return startWebSocketServer(transport, logger, port);
}
_defaultArgs(options: LaunchOptionsBase & FirefoxUserPrefsOptions, isPersistent: boolean, userDataDir: string): string[] {
@ -108,39 +107,36 @@ export class Firefox extends BrowserTypeBase {
}
}
function wrapTransportWithWebSocket(transport: ConnectionTransport, logger: InnerLogger, port: number): WebSocketWrapper {
const server = new ws.Server({ port });
const guid = helper.guid();
const idMixer = new SequenceNumberMixer<{id: number, socket: ws}>();
type SessionData = {
socket: ws,
};
function startWebSocketServer(transport: ConnectionTransport, logger: InnerLogger, port: number): WebSocketServer {
const pendingBrowserContextCreations = new Set<number>();
const pendingBrowserContextDeletions = new Map<number, string>();
const browserContextIds = new Map<string, ws>();
const sessionToSocket = new Map<string, ws>();
const sockets = new Set<ws>();
const sessionToData = new Map<string, SessionData>();
transport.onmessage = message => {
if (typeof message.id === 'number') {
function removeSession(sessionId: string): SessionData | undefined {
const data = sessionToData.get(sessionId);
if (!data)
return;
sessionToData.delete(sessionId);
return data;
}
const server = new WebSocketServer(transport, logger, port, {
onBrowserResponse(seqNum: number, source: ws, message: ProtocolResponse) {
// Process command response.
const seqNum = message.id;
const value = idMixer.take(seqNum);
if (!value)
return;
const { id, socket } = value;
if (socket.readyState === ws.CLOSING) {
if (pendingBrowserContextCreations.has(id)) {
transport.send({
id: ++SequenceNumberMixer._lastSequenceNumber,
method: 'Browser.removeBrowserContext',
params: { browserContextId: message.result.browserContextId }
});
}
if (source.readyState === ws.CLOSING || source.readyState === ws.CLOSED) {
if (pendingBrowserContextCreations.has(seqNum))
server.sendMessageToBrowserOneWay('Browser.removeBrowserContext', { browserContextId: message.result.browserContextId });
return;
}
if (pendingBrowserContextCreations.has(seqNum)) {
// Browser.createBrowserContext response -> establish context attribution.
browserContextIds.set(message.result.browserContextId, socket);
browserContextIds.set(message.result.browserContextId, source);
pendingBrowserContextCreations.delete(seqNum);
}
@ -151,88 +147,59 @@ function wrapTransportWithWebSocket(transport: ConnectionTransport, logger: Inne
pendingBrowserContextDeletions.delete(seqNum);
}
message.id = id;
socket.send(JSON.stringify(message));
source.send(JSON.stringify(message));
return;
}
},
// Process notification response.
const { method, params, sessionId } = message;
if (sessionId) {
const socket = sessionToSocket.get(sessionId);
if (!socket || socket.readyState === ws.CLOSING) {
// Drop unattributed messages on the floor.
onBrowserNotification(message: ProtocolResponse) {
// Process notification response.
const { method, params, sessionId } = message;
if (sessionId) {
const data = sessionToData.get(sessionId);
if (!data || data.socket.readyState === ws.CLOSING) {
// Drop unattributed messages on the floor.
return;
}
data.socket.send(JSON.stringify(message));
return;
}
socket.send(JSON.stringify(message));
return;
}
if (method === 'Browser.attachedToTarget') {
const socket = browserContextIds.get(params.targetInfo.browserContextId);
if (!socket || socket.readyState === ws.CLOSING) {
// Drop unattributed messages on the floor.
return;
}
sessionToSocket.set(params.sessionId, socket);
socket.send(JSON.stringify(message));
return;
}
if (method === 'Browser.detachedFromTarget') {
const socket = sessionToSocket.get(params.sessionId);
sessionToSocket.delete(params.sessionId);
if (socket && socket.readyState !== ws.CLOSING)
if (method === 'Browser.attachedToTarget') {
const socket = browserContextIds.get(params.targetInfo.browserContextId);
if (!socket || socket.readyState === ws.CLOSING) {
// Drop unattributed messages on the floor.
return;
}
sessionToData.set(params.sessionId, { socket });
socket.send(JSON.stringify(message));
return;
}
};
return;
}
if (method === 'Browser.detachedFromTarget') {
const data = removeSession(params.sessionId);
if (data && data.socket.readyState !== ws.CLOSING)
data.socket.send(JSON.stringify(message));
return;
}
},
transport.onclose = () => {
for (const socket of sockets) {
socket.removeListener('close', (socket as any).__closeListener);
socket.close(undefined, 'Browser disconnected');
}
server.close();
transport.onmessage = undefined;
transport.onclose = undefined;
};
onClientAttached() {},
server.on('connection', (socket: ws, req) => {
if (req.url !== '/' + guid) {
socket.close();
return;
}
sockets.add(socket);
socket.on('message', (message: string) => {
const parsedMessage = JSON.parse(Buffer.from(message).toString());
const { id, method, params } = parsedMessage;
const seqNum = idMixer.generate({ id, socket });
transport.send({ ...parsedMessage, id: seqNum });
onClientRequest(socket: ws, message: ProtocolRequest) {
const { method, params } = message;
const seqNum = server.sendMessageToBrowser(message, socket);
if (method === 'Browser.createBrowserContext')
pendingBrowserContextCreations.add(seqNum);
if (method === 'Browser.removeBrowserContext')
pendingBrowserContextDeletions.set(seqNum, params.browserContextId);
});
},
socket.on('error', logError(logger));
socket.on('close', (socket as any).__closeListener = () => {
onClientDetached(socket: ws) {
for (const [browserContextId, s] of browserContextIds) {
if (s === socket) {
transport.send({
id: ++SequenceNumberMixer._lastSequenceNumber,
method: 'Browser.removeBrowserContext',
params: { browserContextId }
});
server.sendMessageToBrowserOneWay('Browser.removeBrowserContext', { browserContextId });
browserContextIds.delete(browserContextId);
}
}
sockets.delete(socket);
});
}
});
const address = server.address();
const wsEndpoint = typeof address === 'string' ? `${address}/${guid}` : `ws://127.0.0.1:${address.port}/${guid}`;
return new WebSocketWrapper(wsEndpoint,
[pendingBrowserContextCreations, pendingBrowserContextDeletions, browserContextIds, sessionToSocket, sockets]);
return server;
}

View file

@ -0,0 +1,139 @@
/**
* Copyright (c) Microsoft Corporation.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
import { IncomingMessage } from 'http';
import * as ws from 'ws';
import { helper } from '../helper';
import { InnerLogger, logError } from '../logger';
import { ConnectionTransport, ProtocolRequest, ProtocolResponse } from '../transport';
export interface WebSocketServerDelegate {
onClientAttached(socket: ws): void;
onClientRequest(socket: ws, message: ProtocolRequest): void;
onClientDetached(socket: ws): void;
onBrowserNotification(message: ProtocolResponse): void;
onBrowserResponse(seqNum: number, source: ws, message: ProtocolResponse): void;
}
export class WebSocketServer {
private _transport: ConnectionTransport;
private _logger: InnerLogger;
private _server: ws.Server;
private _guid: string;
readonly wsEndpoint: string;
private _bindings: (Map<any, any> | Set<any>)[] = [];
private _lastSeqNum = 0;
private _delegate: WebSocketServerDelegate;
private _sockets = new Set<ws>();
private _pendingRequests = new Map<number, { message: ProtocolRequest, source: ws | null }>();
constructor(transport: ConnectionTransport, logger: InnerLogger, port: number, delegate: WebSocketServerDelegate) {
this._guid = helper.guid();
this._transport = transport;
this._logger = logger;
this._server = new ws.Server({ port });
this._delegate = delegate;
transport.onmessage = message => this._browserMessage(message);
transport.onclose = () => this._browserClosed();
this._server.on('connection', (socket: ws, req) => this._clientAttached(socket, req));
const address = this._server.address();
this.wsEndpoint = typeof address === 'string' ? `${address}/${this._guid}` : `ws://127.0.0.1:${address.port}/${this._guid}`;
}
addBindings(bindings: (Map<any, any>|Set<any>)[]) {
this._bindings.push(...bindings);
}
sendMessageToBrowser(message: ProtocolRequest, source: ws): number {
const seqNum = ++this._lastSeqNum;
this._pendingRequests.set(seqNum, { message, source });
this._transport.send({ ...message, id: seqNum });
return seqNum;
}
sendMessageToBrowserOneWay(method: string, params: any) {
this._transport.send({ id: ++this._lastSeqNum, method, params });
}
close() {
this._server.close();
}
private _browserMessage(message: ProtocolResponse) {
const seqNum = message.id;
if (typeof seqNum === 'number') {
const request = this._pendingRequests.get(seqNum);
if (!request)
return;
this._pendingRequests.delete(seqNum);
message.id = request.message.id;
if (request.source)
this._delegate.onBrowserResponse(seqNum, request.source, message);
} else {
this._delegate.onBrowserNotification(message);
}
}
private _browserClosed() {
this._transport.onmessage = undefined;
this._transport.onclose = undefined;
for (const socket of this._sockets) {
socket.removeAllListeners('close');
socket.close(undefined, 'Browser disconnected');
}
this._server.close();
}
private _clientAttached(socket: ws, req: IncomingMessage) {
if (req.url !== '/' + this._guid) {
socket.close();
return;
}
this._sockets.add(socket);
this._delegate.onClientAttached(socket);
socket.on('message', (message: string) => {
const parsedMessage = JSON.parse(Buffer.from(message).toString()) as ProtocolRequest;
this._delegate.onClientRequest(socket, parsedMessage);
});
socket.on('error', logError(this._logger));
socket.on('close', () => {
this._delegate.onClientDetached(socket);
this._sockets.delete(socket);
});
}
async checkLeaks() {
let counter = 0;
return new Promise((fulfill, reject) => {
const check = () => {
const filtered = this._bindings.filter(entry => entry.size);
if (!filtered.length) {
fulfill();
return;
}
if (++counter >= 50) {
reject(new Error('Web socket leak ' + filtered.map(entry => [...entry.keys()].join(':')).join('|')));
return;
}
setTimeout(check, 100);
};
check();
});
}
}

View file

@ -18,15 +18,15 @@
import { WKBrowser } from '../webkit/wkBrowser';
import { Env } from './processLauncher';
import * as path from 'path';
import { helper } from '../helper';
import { kBrowserCloseMessageId } from '../webkit/wkConnection';
import { LaunchOptionsBase, BrowserTypeBase, processBrowserArgOptions } from './browserType';
import { ConnectionTransport, SequenceNumberMixer } from '../transport';
import { ConnectionTransport, ProtocolResponse, ProtocolRequest } from '../transport';
import * as ws from 'ws';
import { WebSocketWrapper } from './browserServer';
import { InnerLogger, logError } from '../logger';
import { InnerLogger } from '../logger';
import { BrowserOptions } from '../browser';
import { BrowserDescriptor } from '../install/browserPaths';
import { WebSocketServer } from './webSocketServer';
import { assert } from '../helper';
export class WebKit extends BrowserTypeBase {
constructor(packagePath: string, browser: BrowserDescriptor) {
@ -45,8 +45,8 @@ export class WebKit extends BrowserTypeBase {
transport.send({method: 'Playwright.close', params: {}, id: kBrowserCloseMessageId});
}
_wrapTransportWithWebSocket(transport: ConnectionTransport, logger: InnerLogger, port: number): WebSocketWrapper {
return wrapTransportWithWebSocket(transport, logger, port);
_startWebSocketServer(transport: ConnectionTransport, logger: InnerLogger, port: number): WebSocketServer {
return startWebSocketServer(transport, logger, port);
}
_defaultArgs(options: LaunchOptionsBase, isPersistent: boolean, userDataDir: string): string[] {
@ -88,114 +88,69 @@ export class WebKit extends BrowserTypeBase {
}
}
function wrapTransportWithWebSocket(transport: ConnectionTransport, logger: InnerLogger, port: number): WebSocketWrapper {
const server = new ws.Server({ port });
const guid = helper.guid();
const idMixer = new SequenceNumberMixer<{id: number, socket: ws}>();
function startWebSocketServer(transport: ConnectionTransport, logger: InnerLogger, port: number): WebSocketServer {
const pendingBrowserContextCreations = new Set<number>();
const pendingBrowserContextDeletions = new Map<number, string>();
const browserContextIds = new Map<string, ws>();
const sockets = new Set<ws>();
transport.onmessage = message => {
if (typeof message.id === 'number') {
if (message.id === -9999)
return;
// Process command response.
const value = idMixer.take(message.id);
if (!value)
return;
const { id, socket } = value;
if (socket.readyState === ws.CLOSED || socket.readyState === ws.CLOSING) {
if (pendingBrowserContextCreations.has(id)) {
transport.send({
id: ++SequenceNumberMixer._lastSequenceNumber,
method: 'Playwright.deleteContext',
params: { browserContextId: message.result.browserContextId }
});
}
const server = new WebSocketServer(transport, logger, port, {
onBrowserResponse(seqNum: number, source: ws, message: ProtocolResponse) {
if (source.readyState === ws.CLOSED || source.readyState === ws.CLOSING) {
if (pendingBrowserContextCreations.has(seqNum))
server.sendMessageToBrowserOneWay('Playwright.deleteContext', { browserContextId: message.result.browserContextId });
return;
}
if (pendingBrowserContextCreations.has(message.id)) {
if (pendingBrowserContextCreations.has(seqNum)) {
// Browser.createContext response -> establish context attribution.
browserContextIds.set(message.result.browserContextId, socket);
pendingBrowserContextCreations.delete(message.id);
browserContextIds.set(message.result.browserContextId, source);
pendingBrowserContextCreations.delete(seqNum);
}
const deletedContextId = pendingBrowserContextDeletions.get(message.id);
const deletedContextId = pendingBrowserContextDeletions.get(seqNum);
if (deletedContextId) {
// Browser.deleteContext response -> remove context attribution.
browserContextIds.delete(deletedContextId);
pendingBrowserContextDeletions.delete(message.id);
pendingBrowserContextDeletions.delete(seqNum);
}
message.id = id;
source.send(JSON.stringify(message));
return;
},
onBrowserNotification(message: ProtocolResponse) {
// Process notification response.
const { params, browserContextId } = message;
const contextId = browserContextId || params.browserContextId;
assert(contextId);
const socket = browserContextIds.get(contextId);
if (!socket || socket.readyState === ws.CLOSING) {
// Drop unattributed messages on the floor.
return;
}
socket.send(JSON.stringify(message));
return;
}
},
// Every notification either has a browserContextId top-level field or
// has a browserContextId parameter.
const { params, browserContextId } = message;
const contextId = browserContextId || params.browserContextId;
const socket = browserContextIds.get(contextId);
if (!socket || socket.readyState === ws.CLOSING) {
// Drop unattributed messages on the floor.
return;
}
socket.send(JSON.stringify(message));
};
onClientAttached(socket: ws) {
},
transport.onclose = () => {
for (const socket of sockets) {
socket.removeListener('close', (socket as any).__closeListener);
socket.close(undefined, 'Browser disconnected');
}
server.close();
transport.onmessage = undefined;
transport.onclose = undefined;
};
server.on('connection', (socket: ws, req) => {
if (req.url !== '/' + guid) {
socket.close();
return;
}
sockets.add(socket);
socket.on('message', (message: string) => {
const parsedMessage = JSON.parse(Buffer.from(message).toString());
const { id, method, params } = parsedMessage;
const seqNum = idMixer.generate({ id, socket });
transport.send({ ...parsedMessage, id: seqNum });
onClientRequest(socket: ws, message: ProtocolRequest) {
const { method, params } = message;
const seqNum = server.sendMessageToBrowser(message, socket);
if (method === 'Playwright.createContext')
pendingBrowserContextCreations.add(seqNum);
if (method === 'Playwright.deleteContext')
pendingBrowserContextDeletions.set(seqNum, params.browserContextId);
});
},
socket.on('error', logError(logger));
socket.on('close', (socket as any).__closeListener = () => {
onClientDetached(socket: ws) {
for (const [browserContextId, s] of browserContextIds) {
if (s === socket) {
transport.send({
id: ++SequenceNumberMixer._lastSequenceNumber,
method: 'Playwright.deleteContext',
params: { browserContextId }
});
server.sendMessageToBrowserOneWay('Playwright.deleteContext', { browserContextId });
browserContextIds.delete(browserContextId);
}
}
sockets.delete(socket);
});
}
});
const address = server.address();
const wsEndpoint = typeof address === 'string' ? `${address}/${guid}` : `ws://127.0.0.1:${address.port}/${guid}`;
return new WebSocketWrapper(wsEndpoint,
[pendingBrowserContextCreations, pendingBrowserContextDeletions, browserContextIds, sockets]);
return server;
}

View file

@ -195,23 +195,6 @@ export class WebSocketTransport implements ConnectionTransport {
}
}
export class SequenceNumberMixer<V> {
static _lastSequenceNumber = 1;
private _values = new Map<number, V>();
generate(value: V): number {
const sequenceNumber = ++SequenceNumberMixer._lastSequenceNumber;
this._values.set(sequenceNumber, value);
return sequenceNumber;
}
take(sequenceNumber: number): V | undefined {
const value = this._values.get(sequenceNumber);
this._values.delete(sequenceNumber);
return value;
}
}
export class InterceptingTransport implements ConnectionTransport {
private readonly _delegate: ConnectionTransport;
private _interceptor: (message: ProtocolRequest) => ProtocolRequest;