chore: unify transports, serialize messages into tasks

This commit is contained in:
Pavel Feldman 2019-12-12 22:51:06 -08:00
parent bae8cd3fae
commit 4fb707b558
5 changed files with 103 additions and 159 deletions

View file

@ -17,7 +17,7 @@
import * as debug from 'debug';
import { EventEmitter } from 'events';
import { ConnectionTransport } from '../transport';
import { ConnectionTransport, SerializingTransport } from '../transport';
import { assert } from '../helper';
import { Protocol } from './protocol';
@ -30,7 +30,6 @@ export const ConnectionEvents = {
export class Connection extends EventEmitter {
private _url: string;
private _lastId = 0;
private _delay: number;
private _transport: ConnectionTransport;
private _sessions = new Map<string, CDPSession>();
readonly rootSession: CDPSession;
@ -39,9 +38,8 @@ export class Connection extends EventEmitter {
constructor(url: string, transport: ConnectionTransport, delay: number | undefined = 0) {
super();
this._url = url;
this._delay = delay;
this._transport = transport;
this._transport = new SerializingTransport(transport, delay);
this._transport.onmessage = this._onMessage.bind(this);
this._transport.onclose = this._onClose.bind(this);
this.rootSession = new CDPSession(this, 'browser', '');
@ -72,8 +70,6 @@ export class Connection extends EventEmitter {
}
async _onMessage(message: string) {
if (this._delay)
await new Promise(f => setTimeout(f, this._delay));
debugProtocol('◀ RECV ' + message);
const object = JSON.parse(message);
if (object.method === 'Target.attachedToTarget') {

View file

@ -17,14 +17,14 @@
import * as os from 'os';
import * as path from 'path';
import {Connection} from './Connection';
import {Browser} from './Browser';
import {BrowserFetcher, BrowserFetcherOptions} from '../browserFetcher';
import { Connection } from './Connection';
import { Browser } from './Browser';
import { BrowserFetcher, BrowserFetcherOptions } from '../browserFetcher';
import * as fs from 'fs';
import * as util from 'util';
import {debugError, assert} from '../helper';
import {TimeoutError} from '../errors';
import {WebSocketTransport} from './WebSocketTransport';
import { debugError, assert } from '../helper';
import { TimeoutError } from '../errors';
import { SerializingTransport, WebSocketTransport } from '../transport';
import { launchProcess, waitForLine } from '../processLauncher';
const mkdtempAsync = util.promisify(fs.mkdtemp);
@ -129,7 +129,7 @@ export class Launcher {
const timeoutError = new TimeoutError(`Timed out after ${timeout} ms while trying to connect to Firefox!`);
const match = await waitForLine(launched.process, launched.process.stdout, /^Juggler listening on (ws:\/\/.*)$/, timeout, timeoutError);
const url = match[1];
const transport = await WebSocketTransport.create(url);
const transport = new SerializingTransport(await WebSocketTransport.create(url));
connection = new Connection(url, transport, slowMo);
const browser = await Browser.create(connection, defaultViewport, launched.process, launched.gracefullyClose);
if (ignoreHTTPSErrors)

View file

@ -1,89 +0,0 @@
/**
* Copyright 2018 Google Inc. All rights reserved.
* Modifications copyright (c) Microsoft Corporation.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
import { ConnectionTransport } from '../transport';
import * as WebSocket from 'ws';
export class WebSocketTransport implements ConnectionTransport {
_ws: WebSocket;
_dispatchQueue: DispatchQueue;
onclose?: () => void;
onmessage?: (message: string) => void;
static create(url: string): Promise<WebSocketTransport> {
return new Promise((resolve, reject) => {
const ws = new WebSocket(url, [], { perMessageDeflate: false });
ws.addEventListener('open', () => resolve(new WebSocketTransport(ws)));
ws.addEventListener('error', reject);
});
}
constructor(ws: WebSocket) {
this._ws = ws;
this._dispatchQueue = new DispatchQueue(this);
this._ws.addEventListener('message', event => {
this._dispatchQueue.enqueue(event.data);
});
this._ws.addEventListener('close', event => {
if (this.onclose)
this.onclose.call(null);
});
// Silently ignore all errors - we don't know what to do with them.
this._ws.addEventListener('error', () => {});
}
send(message: string) {
this._ws.send(message);
}
close() {
this._ws.close();
}
}
// We want to dispatch all "message" events in separate tasks
// to make sure all message-related promises are resolved first
// before dispatching next message.
//
// We cannot just use setTimeout() in Node.js here like we would
// do in Browser - see https://github.com/nodejs/node/issues/23773
// Thus implement a dispatch queue that enforces new tasks manually.
class DispatchQueue {
_transport: ConnectionTransport;
_timeoutId: NodeJS.Timer = null;
_queue: string[] = [];
constructor(transport : ConnectionTransport) {
this._transport = transport;
this._dispatch = this._dispatch.bind(this);
}
enqueue(message: string) {
this._queue.push(message);
if (!this._timeoutId)
this._timeoutId = setTimeout(this._dispatch, 0);
}
_dispatch() {
const message = this._queue.shift();
if (this._queue.length)
this._timeoutId = setTimeout(this._dispatch, 0);
else
this._timeoutId = null;
if (this._transport.onmessage)
this._transport.onmessage.call(null, message);
}
}

View file

@ -117,3 +117,92 @@ export class PipeTransport implements ConnectionTransport {
helper.removeEventListeners(this._eventListeners);
}
}
export class SerializingTransport {
private readonly _delay: number;
private readonly _delegate: ConnectionTransport;
private _incomingMessageQueue: string[] = [];
private _dispatchTimerId?: NodeJS.Timer;
private _sameDispatchTask: boolean = false;
private _closed = false;
onmessage?: (message: string) => void;
onclose?: () => void;
constructor(transport: ConnectionTransport, delay: number | undefined = 0) {
this._delay = delay;
this._delegate = transport;
this._delegate.onmessage = this._onMessage.bind(this);
this._delegate.onclose = this._onClose.bind(this);
}
private _onMessage(message: string) {
if (this._sameDispatchTask || this._incomingMessageQueue.length || this._delay) {
this._enqueueMessage(message);
} else {
this._sameDispatchTask = true;
// This is for the case when several messages come in a batch and read
// in a loop by transport ending up in the same task.
Promise.resolve().then(() => this._sameDispatchTask = false);
this._dispatchMessage(message);
}
}
private _enqueueMessage(message: string) {
this._incomingMessageQueue.push(message);
this._scheduleQueueDispatch();
}
injectMessagesInPlace(messages: string[]) {
// Insert provisional messages at the point of "Target.didCommitProvisionalTarget" message.
this._incomingMessageQueue = messages.concat(this._incomingMessageQueue);
this._scheduleQueueDispatch();
}
private _scheduleQueueDispatch() {
if (this._dispatchTimerId)
return;
if (!this._incomingMessageQueue.length)
return;
const delay = this._delay || 0;
this._dispatchTimerId = setTimeout(() => {
this._dispatchTimerId = undefined;
this._dispatchOneMessageFromQueue();
}, delay);
}
private _dispatchOneMessageFromQueue() {
if (this._closed)
return;
const message = this._incomingMessageQueue.shift();
try {
this._dispatchMessage(message);
} finally {
this._scheduleQueueDispatch();
}
}
private _dispatchMessage(message: string) {
if (this.onmessage)
this.onmessage(message);
}
private _onClose() {
if (this._closed)
return;
if (this.onclose)
this.onclose();
this._closed = true;
this._delegate.onmessage = null;
this._delegate.onclose = null;
}
send(s: string) {
this._delegate.send(s);
}
close() {
this._closed = true;
this._delegate.close();
}
}

View file

@ -18,7 +18,7 @@
import {assert, debugError} from '../helper';
import * as debug from 'debug';
import { EventEmitter } from 'events';
import { ConnectionTransport } from '../transport';
import { ConnectionTransport, SerializingTransport } from '../transport';
import { Protocol } from './protocol';
const debugProtocol = debug('playwright:protocol');
@ -31,21 +31,15 @@ export const ConnectionEvents = {
export class Connection extends EventEmitter {
_lastId = 0;
private readonly _callbacks = new Map<number, {resolve:(o: any) => void, reject: (e: Error) => void, error: Error, method: string}>();
private readonly _delay: number;
private readonly _transport: ConnectionTransport;
private readonly _transport: SerializingTransport;
private readonly _sessions = new Map<string, TargetSession>();
private _incomingMessageQueue: string[] = [];
private _dispatchTimerId?: NodeJS.Timer;
private _sameDispatchTask: boolean = false;
_closed = false;
constructor(transport: ConnectionTransport, delay: number | undefined = 0) {
super();
this._delay = delay;
this._transport = transport;
this._transport.onmessage = this._onMessage.bind(this);
this._transport = new SerializingTransport(transport);
this._transport.onmessage = this._dispatchMessage.bind(this);
this._transport.onclose = this._onClose.bind(this);
}
@ -71,52 +65,6 @@ export class Connection extends EventEmitter {
return id;
}
private _onMessage(message: string) {
if (this._sameDispatchTask || this._incomingMessageQueue.length || this._delay) {
this._enqueueMessage(message);
} else {
this._sameDispatchTask = true;
// This is for the case when several messages come in a batch and read
// in a loop by transport ending up in the same task.
Promise.resolve().then(() => this._sameDispatchTask = false);
this._dispatchMessage(message);
}
}
private _enqueueMessage(message: string) {
this._incomingMessageQueue.push(message);
this._scheduleQueueDispatch();
}
private _enqueueProvisionalMessages(messages: string[]) {
// Insert provisional messages at the point of "Target.didCommitProvisionalTarget" message.
this._incomingMessageQueue = messages.concat(this._incomingMessageQueue);
this._scheduleQueueDispatch();
}
private _scheduleQueueDispatch() {
if (this._dispatchTimerId)
return;
if (!this._incomingMessageQueue.length)
return;
const delay = this._delay || 0;
this._dispatchTimerId = setTimeout(() => {
this._dispatchTimerId = undefined;
this._dispatchOneMessageFromQueue();
}, delay);
}
private _dispatchOneMessageFromQueue() {
if (this._closed)
return;
const message = this._incomingMessageQueue.shift();
try {
this._dispatchMessage(message);
} finally {
this._scheduleQueueDispatch();
}
}
private _dispatchMessage(message: string) {
debugProtocol('◀ RECV ' + message);
const object = JSON.parse(message);
@ -170,7 +118,7 @@ export class Connection extends EventEmitter {
if (!oldSession)
throw new Error('Unknown old target: ' + oldTargetId);
oldSession._swappedOut = true;
this._enqueueProvisionalMessages(newSession._takeProvisionalMessagesAndCommit());
this._transport.injectMessagesInPlace(newSession._takeProvisionalMessagesAndCommit());
}
}