diff --git a/src/chromium/Connection.ts b/src/chromium/Connection.ts index 59a2780236..26dd5a8afc 100644 --- a/src/chromium/Connection.ts +++ b/src/chromium/Connection.ts @@ -17,7 +17,7 @@ import * as debug from 'debug'; import { EventEmitter } from 'events'; -import { ConnectionTransport } from '../transport'; +import { ConnectionTransport, SlowMoTransport } from '../transport'; import { assert } from '../helper'; import { Protocol } from './protocol'; @@ -30,7 +30,6 @@ export const ConnectionEvents = { export class Connection extends EventEmitter { private _url: string; private _lastId = 0; - private _delay: number; private _transport: ConnectionTransport; private _sessions = new Map(); readonly rootSession: CDPSession; @@ -39,9 +38,8 @@ export class Connection extends EventEmitter { constructor(url: string, transport: ConnectionTransport, delay: number | undefined = 0) { super(); this._url = url; - this._delay = delay; - this._transport = transport; + this._transport = SlowMoTransport.wrap(transport, delay); this._transport.onmessage = this._onMessage.bind(this); this._transport.onclose = this._onClose.bind(this); this.rootSession = new CDPSession(this, 'browser', ''); @@ -72,8 +70,6 @@ export class Connection extends EventEmitter { } async _onMessage(message: string) { - if (this._delay) - await new Promise(f => setTimeout(f, this._delay)); debugProtocol('◀ RECV ' + message); const object = JSON.parse(message); if (object.method === 'Target.attachedToTarget') { @@ -101,7 +97,7 @@ export class Connection extends EventEmitter { for (const session of this._sessions.values()) session._onClosed(); this._sessions.clear(); - this.emit(ConnectionEvents.Disconnected); + Promise.resolve().then(() => this.emit(ConnectionEvents.Disconnected)); } dispose() { @@ -164,7 +160,7 @@ export class CDPSession extends EventEmitter { callback.resolve(object.result); } else { assert(!object.id); - this.emit(object.method, object.params); + Promise.resolve().then(() => this.emit(object.method, object.params)); } } @@ -179,7 +175,7 @@ export class CDPSession extends EventEmitter { callback.reject(rewriteError(callback.error, `Protocol error (${callback.method}): Target closed.`)); this._callbacks.clear(); this._connection = null; - this.emit(CDPSessionEvents.Disconnected); + Promise.resolve().then(() => this.emit(CDPSessionEvents.Disconnected)); } } diff --git a/src/firefox/Connection.ts b/src/firefox/Connection.ts index 447c6a5243..46d1e43def 100644 --- a/src/firefox/Connection.ts +++ b/src/firefox/Connection.ts @@ -18,7 +18,7 @@ import {assert} from '../helper'; import {EventEmitter} from 'events'; import * as debug from 'debug'; -import { ConnectionTransport } from '../transport'; +import { ConnectionTransport, SlowMoTransport } from '../transport'; import { Protocol } from './protocol'; const debugProtocol = debug('playwright:protocol'); @@ -30,18 +30,17 @@ export class Connection extends EventEmitter { private _url: string; private _lastId: number; private _callbacks: Map; - private _delay: number; private _transport: ConnectionTransport; private _sessions: Map; _closed: boolean; + constructor(url: string, transport: ConnectionTransport, delay: number | undefined = 0) { super(); this._url = url; this._lastId = 0; this._callbacks = new Map(); - this._delay = delay; - this._transport = transport; + this._transport = SlowMoTransport.wrap(transport, delay); this._transport.onmessage = this._onMessage.bind(this); this._transport.onclose = this._onClose.bind(this); this._sessions = new Map(); @@ -76,8 +75,6 @@ export class Connection extends EventEmitter { } async _onMessage(message: string) { - if (this._delay) - await new Promise(f => setTimeout(f, this._delay)); debugProtocol('◀ RECV ' + message); const object = JSON.parse(message); if (object.method === 'Target.attachedToTarget') { @@ -106,7 +103,7 @@ export class Connection extends EventEmitter { callback.resolve(object.result); } } else { - this.emit(object.method, object.params); + Promise.resolve().then(() => this.emit(object.method, object.params)); } } @@ -122,7 +119,7 @@ export class Connection extends EventEmitter { for (const session of this._sessions.values()) session._onClosed(); this._sessions.clear(); - this.emit(ConnectionEvents.Disconnected); + Promise.resolve().then(() => this.emit(ConnectionEvents.Disconnected)); } dispose() { @@ -181,7 +178,7 @@ export class JugglerSession extends EventEmitter { callback.resolve(object.result); } else { assert(!object.id); - this.emit(object.method, object.params); + Promise.resolve().then(() => this.emit(object.method, object.params)); } } @@ -196,7 +193,7 @@ export class JugglerSession extends EventEmitter { callback.reject(rewriteError(callback.error, `Protocol error (${callback.method}): Target closed.`)); this._callbacks.clear(); this._connection = null; - this.emit(JugglerSessionEvents.Disconnected); + Promise.resolve().then(() => this.emit(JugglerSessionEvents.Disconnected)); } } diff --git a/src/firefox/Launcher.ts b/src/firefox/Launcher.ts index 7088154aba..71bd7b49ce 100644 --- a/src/firefox/Launcher.ts +++ b/src/firefox/Launcher.ts @@ -17,14 +17,14 @@ import * as os from 'os'; import * as path from 'path'; -import {Connection} from './Connection'; -import {Browser} from './Browser'; -import {BrowserFetcher, BrowserFetcherOptions} from '../browserFetcher'; +import { Connection } from './Connection'; +import { Browser } from './Browser'; +import { BrowserFetcher, BrowserFetcherOptions } from '../browserFetcher'; import * as fs from 'fs'; import * as util from 'util'; -import {debugError, assert} from '../helper'; -import {TimeoutError} from '../errors'; -import {WebSocketTransport} from './WebSocketTransport'; +import { debugError, assert } from '../helper'; +import { TimeoutError } from '../errors'; +import { WebSocketTransport } from '../transport'; import { launchProcess, waitForLine } from '../processLauncher'; const mkdtempAsync = util.promisify(fs.mkdtemp); diff --git a/src/firefox/WebSocketTransport.ts b/src/firefox/WebSocketTransport.ts deleted file mode 100644 index 8846b7bf81..0000000000 --- a/src/firefox/WebSocketTransport.ts +++ /dev/null @@ -1,89 +0,0 @@ -/** - * Copyright 2018 Google Inc. All rights reserved. - * Modifications copyright (c) Microsoft Corporation. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -import { ConnectionTransport } from '../transport'; -import * as WebSocket from 'ws'; - -export class WebSocketTransport implements ConnectionTransport { - _ws: WebSocket; - _dispatchQueue: DispatchQueue; - onclose?: () => void; - onmessage?: (message: string) => void; - static create(url: string): Promise { - return new Promise((resolve, reject) => { - const ws = new WebSocket(url, [], { perMessageDeflate: false }); - ws.addEventListener('open', () => resolve(new WebSocketTransport(ws))); - ws.addEventListener('error', reject); - }); - } - - constructor(ws: WebSocket) { - this._ws = ws; - this._dispatchQueue = new DispatchQueue(this); - this._ws.addEventListener('message', event => { - this._dispatchQueue.enqueue(event.data); - }); - this._ws.addEventListener('close', event => { - if (this.onclose) - this.onclose.call(null); - }); - // Silently ignore all errors - we don't know what to do with them. - this._ws.addEventListener('error', () => {}); - } - - send(message: string) { - this._ws.send(message); - } - - close() { - this._ws.close(); - } -} - -// We want to dispatch all "message" events in separate tasks -// to make sure all message-related promises are resolved first -// before dispatching next message. -// -// We cannot just use setTimeout() in Node.js here like we would -// do in Browser - see https://github.com/nodejs/node/issues/23773 -// Thus implement a dispatch queue that enforces new tasks manually. -class DispatchQueue { - _transport: ConnectionTransport; - _timeoutId: NodeJS.Timer = null; - _queue: string[] = []; - constructor(transport : ConnectionTransport) { - this._transport = transport; - this._dispatch = this._dispatch.bind(this); - } - - enqueue(message: string) { - this._queue.push(message); - if (!this._timeoutId) - this._timeoutId = setTimeout(this._dispatch, 0); - } - - _dispatch() { - const message = this._queue.shift(); - if (this._queue.length) - this._timeoutId = setTimeout(this._dispatch, 0); - else - this._timeoutId = null; - - if (this._transport.onmessage) - this._transport.onmessage.call(null, message); - } -} - diff --git a/src/transport.ts b/src/transport.ts index a3374bf310..884464a82f 100644 --- a/src/transport.ts +++ b/src/transport.ts @@ -117,3 +117,72 @@ export class PipeTransport implements ConnectionTransport { helper.removeEventListeners(this._eventListeners); } } + +export class SlowMoTransport { + private readonly _delay: number; + private readonly _delegate: ConnectionTransport; + private _incomingMessageQueue: string[] = []; + private _dispatchTimerId?: NodeJS.Timer; + private _closed = false; + + onmessage?: (message: string) => void; + onclose?: () => void; + + static wrap(transport: ConnectionTransport, delay?: number): ConnectionTransport { + return delay ? new SlowMoTransport(transport, delay) : transport; + } + + constructor(transport: ConnectionTransport, delay: number) { + this._delay = delay; + this._delegate = transport; + this._delegate.onmessage = this._enqueueMessage.bind(this); + this._delegate.onclose = this._onClose.bind(this); + } + + private _enqueueMessage(message: string) { + this._incomingMessageQueue.push(message); + this._scheduleQueueDispatch(); + } + + private _scheduleQueueDispatch() { + if (this._dispatchTimerId) + return; + if (!this._incomingMessageQueue.length) + return; + this._dispatchTimerId = setTimeout(() => { + this._dispatchTimerId = undefined; + this._dispatchOneMessageFromQueue(); + }, this._delay); + } + + private _dispatchOneMessageFromQueue() { + if (this._closed) + return; + const message = this._incomingMessageQueue.shift(); + try { + if (this.onmessage) + this.onmessage(message); + } finally { + this._scheduleQueueDispatch(); + } + } + + private _onClose() { + if (this._closed) + return; + if (this.onclose) + this.onclose(); + this._closed = true; + this._delegate.onmessage = null; + this._delegate.onclose = null; + } + + send(s: string) { + this._delegate.send(s); + } + + close() { + this._closed = true; + this._delegate.close(); + } +} diff --git a/src/webkit/Browser.ts b/src/webkit/Browser.ts index 7d33f74aa3..c4af04008a 100644 --- a/src/webkit/Browser.ts +++ b/src/webkit/Browser.ts @@ -58,8 +58,8 @@ export class Browser extends EventEmitter implements BrowserInterface { this._eventListeners = [ helper.addEventListener(this._connection, ConnectionEvents.TargetCreated, this._onTargetCreated.bind(this)), - helper.addEventListener(this._connection, 'Target.targetDestroyed', this._onTargetDestroyed.bind(this)), - helper.addEventListener(this._connection, 'Target.didCommitProvisionalTarget', this._onProvisionalTargetCommitted.bind(this)), + helper.addEventListener(this._connection, ConnectionEvents.TargetDestroyed, this._onTargetDestroyed.bind(this)), + helper.addEventListener(this._connection, ConnectionEvents.DidCommitProvisionalTarget, this._onProvisionalTargetCommitted.bind(this)), ]; // Intercept provisional targets during cross-process navigation. @@ -142,7 +142,7 @@ export class Browser extends EventEmitter implements BrowserInterface { return contextPages.reduce((acc, x) => acc.concat(x), []); } - async _onTargetCreated(session: TargetSession, targetInfo: Protocol.Target.TargetInfo) { + _onTargetCreated(session: TargetSession, targetInfo: Protocol.Target.TargetInfo) { let context = null; if (targetInfo.browserContextId) { // FIXME: we don't know about the default context id, so assume that all targets from @@ -170,9 +170,10 @@ export class Browser extends EventEmitter implements BrowserInterface { const openerPage = opener._page; if (!openerPage || !openerPage.listenerCount(Events.Page.Popup)) return; - const page = await target.page(); - openerPage.emit(Events.Page.Popup, page); + target.page().then(page => openerPage.emit(Events.Page.Popup, page)); } + if (targetInfo.isPaused) + this._connection.send('Target.resume', { targetId: targetInfo.targetId }).catch(debugError); } _onTargetDestroyed({targetId}) { diff --git a/src/webkit/Connection.ts b/src/webkit/Connection.ts index e946aa5400..2211602462 100644 --- a/src/webkit/Connection.ts +++ b/src/webkit/Connection.ts @@ -15,37 +15,33 @@ * limitations under the License. */ -import {assert, debugError} from '../helper'; +import { assert } from '../helper'; import * as debug from 'debug'; import { EventEmitter } from 'events'; -import { ConnectionTransport } from '../transport'; +import { ConnectionTransport, SlowMoTransport } from '../transport'; import { Protocol } from './protocol'; const debugProtocol = debug('playwright:protocol'); const debugWrappedMessage = require('debug')('wrapped'); export const ConnectionEvents = { - TargetCreated: Symbol('ConnectionEvents.TargetCreated') + TargetCreated: Symbol('ConnectionEvents.TargetCreated'), + TargetDestroyed: Symbol('Connection.TargetDestroyed'), + DidCommitProvisionalTarget: Symbol('Connection.DidCommitProvisionalTarget') }; export class Connection extends EventEmitter { _lastId = 0; private readonly _callbacks = new Map void, reject: (e: Error) => void, error: Error, method: string}>(); - private readonly _delay: number; private readonly _transport: ConnectionTransport; private readonly _sessions = new Map(); - private _incomingMessageQueue: string[] = []; - private _dispatchTimerId?: NodeJS.Timer; - private _sameDispatchTask: boolean = false; _closed = false; constructor(transport: ConnectionTransport, delay: number | undefined = 0) { super(); - this._delay = delay; - - this._transport = transport; - this._transport.onmessage = this._onMessage.bind(this); + this._transport = SlowMoTransport.wrap(transport, delay); + this._transport.onmessage = this._dispatchMessage.bind(this); this._transport.onclose = this._onClose.bind(this); } @@ -71,52 +67,6 @@ export class Connection extends EventEmitter { return id; } - private _onMessage(message: string) { - if (this._sameDispatchTask || this._incomingMessageQueue.length || this._delay) { - this._enqueueMessage(message); - } else { - this._sameDispatchTask = true; - // This is for the case when several messages come in a batch and read - // in a loop by transport ending up in the same task. - Promise.resolve().then(() => this._sameDispatchTask = false); - this._dispatchMessage(message); - } - } - - private _enqueueMessage(message: string) { - this._incomingMessageQueue.push(message); - this._scheduleQueueDispatch(); - } - - private _enqueueProvisionalMessages(messages: string[]) { - // Insert provisional messages at the point of "Target.didCommitProvisionalTarget" message. - this._incomingMessageQueue = messages.concat(this._incomingMessageQueue); - this._scheduleQueueDispatch(); - } - - private _scheduleQueueDispatch() { - if (this._dispatchTimerId) - return; - if (!this._incomingMessageQueue.length) - return; - const delay = this._delay || 0; - this._dispatchTimerId = setTimeout(() => { - this._dispatchTimerId = undefined; - this._dispatchOneMessageFromQueue(); - }, delay); - } - - private _dispatchOneMessageFromQueue() { - if (this._closed) - return; - const message = this._incomingMessageQueue.shift(); - try { - this._dispatchMessage(message); - } finally { - this._scheduleQueueDispatch(); - } - } - private _dispatchMessage(message: string) { debugProtocol('◀ RECV ' + message); const object = JSON.parse(message); @@ -134,7 +84,7 @@ export class Connection extends EventEmitter { assert(this._closed, 'Received response for unknown callback: ' + object.id); } } else { - this.emit(object.method, object.params); + Promise.resolve().then(() => this.emit(object.method, object.params)); } } @@ -143,26 +93,26 @@ export class Connection extends EventEmitter { const targetInfo = object.params.targetInfo as Protocol.Target.TargetInfo; const session = new TargetSession(this, targetInfo); this._sessions.set(session._sessionId, session); - this.emit(ConnectionEvents.TargetCreated, session, object.params.targetInfo); - if (targetInfo.isPaused) - this.send('Target.resume', { targetId: targetInfo.targetId }).catch(debugError); + Promise.resolve().then(() => this.emit(ConnectionEvents.TargetCreated, session, object.params.targetInfo)); } else if (object.method === 'Target.targetDestroyed') { const session = this._sessions.get(object.params.targetId); if (session) { session._onClosed(); this._sessions.delete(object.params.targetId); } + Promise.resolve().then(() => this.emit(ConnectionEvents.TargetDestroyed, { targetId: object.params.targetId })); } else if (object.method === 'Target.dispatchMessageFromTarget') { const {targetId, message} = object.params as Protocol.Target.dispatchMessageFromTargetPayload; const session = this._sessions.get(targetId); if (!session) throw new Error('Unknown target: ' + targetId); if (session.isProvisional()) - session._addProvisionalMessage(wrappedMessage); + session._addProvisionalMessage(message); else session._dispatchMessageFromTarget(message); } else if (object.method === 'Target.didCommitProvisionalTarget') { const {oldTargetId, newTargetId} = object.params as Protocol.Target.didCommitProvisionalTargetPayload; + Promise.resolve().then(() => this.emit(ConnectionEvents.DidCommitProvisionalTarget, { oldTargetId, newTargetId })); const newSession = this._sessions.get(newTargetId); if (!newSession) throw new Error('Unknown new target: ' + newTargetId); @@ -170,7 +120,8 @@ export class Connection extends EventEmitter { if (!oldSession) throw new Error('Unknown old target: ' + oldTargetId); oldSession._swappedOut = true; - this._enqueueProvisionalMessages(newSession._takeProvisionalMessagesAndCommit()); + for (const message of newSession._takeProvisionalMessagesAndCommit()) + newSession._dispatchMessageFromTarget(message); } } @@ -278,7 +229,7 @@ export class TargetSession extends EventEmitter { callback.resolve(object.result); } else { assert(!object.id); - this.emit(object.method, object.params); + Promise.resolve().then(() => this.emit(object.method, object.params)); } } @@ -292,7 +243,7 @@ export class TargetSession extends EventEmitter { } this._callbacks.clear(); this._connection = null; - this.emit(TargetSessionEvents.Disconnected); + Promise.resolve().then(() => this.emit(TargetSessionEvents.Disconnected)); } }