From 4fb707b558a30df3818f89f4a80594a44e914edf Mon Sep 17 00:00:00 2001 From: Pavel Feldman Date: Thu, 12 Dec 2019 22:51:06 -0800 Subject: [PATCH] chore: unify transports, serialize messages into tasks --- src/chromium/Connection.ts | 8 +-- src/firefox/Launcher.ts | 14 ++--- src/firefox/WebSocketTransport.ts | 89 ------------------------------- src/transport.ts | 89 +++++++++++++++++++++++++++++++ src/webkit/Connection.ts | 62 ++------------------- 5 files changed, 103 insertions(+), 159 deletions(-) delete mode 100644 src/firefox/WebSocketTransport.ts diff --git a/src/chromium/Connection.ts b/src/chromium/Connection.ts index 59a2780236..ef003889c0 100644 --- a/src/chromium/Connection.ts +++ b/src/chromium/Connection.ts @@ -17,7 +17,7 @@ import * as debug from 'debug'; import { EventEmitter } from 'events'; -import { ConnectionTransport } from '../transport'; +import { ConnectionTransport, SerializingTransport } from '../transport'; import { assert } from '../helper'; import { Protocol } from './protocol'; @@ -30,7 +30,6 @@ export const ConnectionEvents = { export class Connection extends EventEmitter { private _url: string; private _lastId = 0; - private _delay: number; private _transport: ConnectionTransport; private _sessions = new Map(); readonly rootSession: CDPSession; @@ -39,9 +38,8 @@ export class Connection extends EventEmitter { constructor(url: string, transport: ConnectionTransport, delay: number | undefined = 0) { super(); this._url = url; - this._delay = delay; - this._transport = transport; + this._transport = new SerializingTransport(transport, delay); this._transport.onmessage = this._onMessage.bind(this); this._transport.onclose = this._onClose.bind(this); this.rootSession = new CDPSession(this, 'browser', ''); @@ -72,8 +70,6 @@ export class Connection extends EventEmitter { } async _onMessage(message: string) { - if (this._delay) - await new Promise(f => setTimeout(f, this._delay)); debugProtocol('◀ RECV ' + message); const object = JSON.parse(message); if (object.method === 'Target.attachedToTarget') { diff --git a/src/firefox/Launcher.ts b/src/firefox/Launcher.ts index 7088154aba..5c9738c5e1 100644 --- a/src/firefox/Launcher.ts +++ b/src/firefox/Launcher.ts @@ -17,14 +17,14 @@ import * as os from 'os'; import * as path from 'path'; -import {Connection} from './Connection'; -import {Browser} from './Browser'; -import {BrowserFetcher, BrowserFetcherOptions} from '../browserFetcher'; +import { Connection } from './Connection'; +import { Browser } from './Browser'; +import { BrowserFetcher, BrowserFetcherOptions } from '../browserFetcher'; import * as fs from 'fs'; import * as util from 'util'; -import {debugError, assert} from '../helper'; -import {TimeoutError} from '../errors'; -import {WebSocketTransport} from './WebSocketTransport'; +import { debugError, assert } from '../helper'; +import { TimeoutError } from '../errors'; +import { SerializingTransport, WebSocketTransport } from '../transport'; import { launchProcess, waitForLine } from '../processLauncher'; const mkdtempAsync = util.promisify(fs.mkdtemp); @@ -129,7 +129,7 @@ export class Launcher { const timeoutError = new TimeoutError(`Timed out after ${timeout} ms while trying to connect to Firefox!`); const match = await waitForLine(launched.process, launched.process.stdout, /^Juggler listening on (ws:\/\/.*)$/, timeout, timeoutError); const url = match[1]; - const transport = await WebSocketTransport.create(url); + const transport = new SerializingTransport(await WebSocketTransport.create(url)); connection = new Connection(url, transport, slowMo); const browser = await Browser.create(connection, defaultViewport, launched.process, launched.gracefullyClose); if (ignoreHTTPSErrors) diff --git a/src/firefox/WebSocketTransport.ts b/src/firefox/WebSocketTransport.ts deleted file mode 100644 index 8846b7bf81..0000000000 --- a/src/firefox/WebSocketTransport.ts +++ /dev/null @@ -1,89 +0,0 @@ -/** - * Copyright 2018 Google Inc. All rights reserved. - * Modifications copyright (c) Microsoft Corporation. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -import { ConnectionTransport } from '../transport'; -import * as WebSocket from 'ws'; - -export class WebSocketTransport implements ConnectionTransport { - _ws: WebSocket; - _dispatchQueue: DispatchQueue; - onclose?: () => void; - onmessage?: (message: string) => void; - static create(url: string): Promise { - return new Promise((resolve, reject) => { - const ws = new WebSocket(url, [], { perMessageDeflate: false }); - ws.addEventListener('open', () => resolve(new WebSocketTransport(ws))); - ws.addEventListener('error', reject); - }); - } - - constructor(ws: WebSocket) { - this._ws = ws; - this._dispatchQueue = new DispatchQueue(this); - this._ws.addEventListener('message', event => { - this._dispatchQueue.enqueue(event.data); - }); - this._ws.addEventListener('close', event => { - if (this.onclose) - this.onclose.call(null); - }); - // Silently ignore all errors - we don't know what to do with them. - this._ws.addEventListener('error', () => {}); - } - - send(message: string) { - this._ws.send(message); - } - - close() { - this._ws.close(); - } -} - -// We want to dispatch all "message" events in separate tasks -// to make sure all message-related promises are resolved first -// before dispatching next message. -// -// We cannot just use setTimeout() in Node.js here like we would -// do in Browser - see https://github.com/nodejs/node/issues/23773 -// Thus implement a dispatch queue that enforces new tasks manually. -class DispatchQueue { - _transport: ConnectionTransport; - _timeoutId: NodeJS.Timer = null; - _queue: string[] = []; - constructor(transport : ConnectionTransport) { - this._transport = transport; - this._dispatch = this._dispatch.bind(this); - } - - enqueue(message: string) { - this._queue.push(message); - if (!this._timeoutId) - this._timeoutId = setTimeout(this._dispatch, 0); - } - - _dispatch() { - const message = this._queue.shift(); - if (this._queue.length) - this._timeoutId = setTimeout(this._dispatch, 0); - else - this._timeoutId = null; - - if (this._transport.onmessage) - this._transport.onmessage.call(null, message); - } -} - diff --git a/src/transport.ts b/src/transport.ts index a3374bf310..0d9aefb89e 100644 --- a/src/transport.ts +++ b/src/transport.ts @@ -117,3 +117,92 @@ export class PipeTransport implements ConnectionTransport { helper.removeEventListeners(this._eventListeners); } } + +export class SerializingTransport { + private readonly _delay: number; + private readonly _delegate: ConnectionTransport; + private _incomingMessageQueue: string[] = []; + private _dispatchTimerId?: NodeJS.Timer; + private _sameDispatchTask: boolean = false; + private _closed = false; + + onmessage?: (message: string) => void; + onclose?: () => void; + + constructor(transport: ConnectionTransport, delay: number | undefined = 0) { + this._delay = delay; + this._delegate = transport; + this._delegate.onmessage = this._onMessage.bind(this); + this._delegate.onclose = this._onClose.bind(this); + } + + private _onMessage(message: string) { + if (this._sameDispatchTask || this._incomingMessageQueue.length || this._delay) { + this._enqueueMessage(message); + } else { + this._sameDispatchTask = true; + // This is for the case when several messages come in a batch and read + // in a loop by transport ending up in the same task. + Promise.resolve().then(() => this._sameDispatchTask = false); + this._dispatchMessage(message); + } + } + + private _enqueueMessage(message: string) { + this._incomingMessageQueue.push(message); + this._scheduleQueueDispatch(); + } + + injectMessagesInPlace(messages: string[]) { + // Insert provisional messages at the point of "Target.didCommitProvisionalTarget" message. + this._incomingMessageQueue = messages.concat(this._incomingMessageQueue); + this._scheduleQueueDispatch(); + } + + private _scheduleQueueDispatch() { + if (this._dispatchTimerId) + return; + if (!this._incomingMessageQueue.length) + return; + const delay = this._delay || 0; + this._dispatchTimerId = setTimeout(() => { + this._dispatchTimerId = undefined; + this._dispatchOneMessageFromQueue(); + }, delay); + } + + private _dispatchOneMessageFromQueue() { + if (this._closed) + return; + const message = this._incomingMessageQueue.shift(); + try { + this._dispatchMessage(message); + } finally { + this._scheduleQueueDispatch(); + } + } + + private _dispatchMessage(message: string) { + if (this.onmessage) + this.onmessage(message); + } + + private _onClose() { + if (this._closed) + return; + if (this.onclose) + this.onclose(); + this._closed = true; + this._delegate.onmessage = null; + this._delegate.onclose = null; + } + + send(s: string) { + this._delegate.send(s); + } + + close() { + this._closed = true; + this._delegate.close(); + } +} diff --git a/src/webkit/Connection.ts b/src/webkit/Connection.ts index e946aa5400..46301824ef 100644 --- a/src/webkit/Connection.ts +++ b/src/webkit/Connection.ts @@ -18,7 +18,7 @@ import {assert, debugError} from '../helper'; import * as debug from 'debug'; import { EventEmitter } from 'events'; -import { ConnectionTransport } from '../transport'; +import { ConnectionTransport, SerializingTransport } from '../transport'; import { Protocol } from './protocol'; const debugProtocol = debug('playwright:protocol'); @@ -31,21 +31,15 @@ export const ConnectionEvents = { export class Connection extends EventEmitter { _lastId = 0; private readonly _callbacks = new Map void, reject: (e: Error) => void, error: Error, method: string}>(); - private readonly _delay: number; - private readonly _transport: ConnectionTransport; + private readonly _transport: SerializingTransport; private readonly _sessions = new Map(); - private _incomingMessageQueue: string[] = []; - private _dispatchTimerId?: NodeJS.Timer; - private _sameDispatchTask: boolean = false; _closed = false; constructor(transport: ConnectionTransport, delay: number | undefined = 0) { super(); - this._delay = delay; - - this._transport = transport; - this._transport.onmessage = this._onMessage.bind(this); + this._transport = new SerializingTransport(transport); + this._transport.onmessage = this._dispatchMessage.bind(this); this._transport.onclose = this._onClose.bind(this); } @@ -71,52 +65,6 @@ export class Connection extends EventEmitter { return id; } - private _onMessage(message: string) { - if (this._sameDispatchTask || this._incomingMessageQueue.length || this._delay) { - this._enqueueMessage(message); - } else { - this._sameDispatchTask = true; - // This is for the case when several messages come in a batch and read - // in a loop by transport ending up in the same task. - Promise.resolve().then(() => this._sameDispatchTask = false); - this._dispatchMessage(message); - } - } - - private _enqueueMessage(message: string) { - this._incomingMessageQueue.push(message); - this._scheduleQueueDispatch(); - } - - private _enqueueProvisionalMessages(messages: string[]) { - // Insert provisional messages at the point of "Target.didCommitProvisionalTarget" message. - this._incomingMessageQueue = messages.concat(this._incomingMessageQueue); - this._scheduleQueueDispatch(); - } - - private _scheduleQueueDispatch() { - if (this._dispatchTimerId) - return; - if (!this._incomingMessageQueue.length) - return; - const delay = this._delay || 0; - this._dispatchTimerId = setTimeout(() => { - this._dispatchTimerId = undefined; - this._dispatchOneMessageFromQueue(); - }, delay); - } - - private _dispatchOneMessageFromQueue() { - if (this._closed) - return; - const message = this._incomingMessageQueue.shift(); - try { - this._dispatchMessage(message); - } finally { - this._scheduleQueueDispatch(); - } - } - private _dispatchMessage(message: string) { debugProtocol('◀ RECV ' + message); const object = JSON.parse(message); @@ -170,7 +118,7 @@ export class Connection extends EventEmitter { if (!oldSession) throw new Error('Unknown old target: ' + oldTargetId); oldSession._swappedOut = true; - this._enqueueProvisionalMessages(newSession._takeProvisionalMessagesAndCommit()); + this._transport.injectMessagesInPlace(newSession._takeProvisionalMessagesAndCommit()); } }