current zone and run in zone

This commit is contained in:
Yury Semikhatsky 2024-11-05 16:56:12 -08:00
parent 8d9b9078b5
commit 62c23ce180
2 changed files with 52 additions and 42 deletions

View file

@ -17,7 +17,8 @@
import type { EventEmitter } from 'events'; import type { EventEmitter } from 'events';
import { rewriteErrorMessage } from '../utils/stackTrace'; import { rewriteErrorMessage } from '../utils/stackTrace';
import { TimeoutError } from './errors'; import { TimeoutError } from './errors';
import { createGuid } from '../utils'; import { createGuid, zones } from '../utils';
import type { Zone } from '../utils';
import type * as channels from '@protocol/channels'; import type * as channels from '@protocol/channels';
import type { ChannelOwner } from './channelOwner'; import type { ChannelOwner } from './channelOwner';
@ -29,10 +30,13 @@ export class Waiter {
private _channelOwner: ChannelOwner<channels.EventTargetChannel>; private _channelOwner: ChannelOwner<channels.EventTargetChannel>;
private _waitId: string; private _waitId: string;
private _error: string | undefined; private _error: string | undefined;
private _savedZone: Zone | undefined;
constructor(channelOwner: ChannelOwner<channels.EventTargetChannel>, event: string) { constructor(channelOwner: ChannelOwner<channels.EventTargetChannel>, event: string) {
this._waitId = createGuid(); this._waitId = createGuid();
this._channelOwner = channelOwner; this._channelOwner = channelOwner;
this._savedZone = zones.currentZone();
this._channelOwner._channel.waitForEventInfo({ info: { waitId: this._waitId, phase: 'before', event } }).catch(() => {}); this._channelOwner._channel.waitForEventInfo({ info: { waitId: this._waitId, phase: 'before', event } }).catch(() => {});
this._dispose = [ this._dispose = [
() => this._channelOwner._wrapApiCall(async () => { () => this._channelOwner._wrapApiCall(async () => {
@ -46,12 +50,12 @@ export class Waiter {
} }
async waitForEvent<T = void>(emitter: EventEmitter, event: string, predicate?: (arg: T) => boolean | Promise<boolean>): Promise<T> { async waitForEvent<T = void>(emitter: EventEmitter, event: string, predicate?: (arg: T) => boolean | Promise<boolean>): Promise<T> {
const { promise, dispose } = waitForEvent(emitter, event, predicate); const { promise, dispose } = waitForEvent(emitter, event, this._savedZone, predicate);
return await this.waitForPromise(promise, dispose); return await this.waitForPromise(promise, dispose);
} }
rejectOnEvent<T = void>(emitter: EventEmitter, event: string, error: Error | (() => Error), predicate?: (arg: T) => boolean | Promise<boolean>) { rejectOnEvent<T = void>(emitter: EventEmitter, event: string, error: Error | (() => Error), predicate?: (arg: T) => boolean | Promise<boolean>) {
const { promise, dispose } = waitForEvent(emitter, event, predicate); const { promise, dispose } = waitForEvent(emitter, event, this._savedZone, predicate);
this._rejectOn(promise.then(() => { throw (typeof error === 'function' ? error() : error); }), dispose); this._rejectOn(promise.then(() => { throw (typeof error === 'function' ? error() : error); }), dispose);
} }
@ -103,10 +107,12 @@ export class Waiter {
} }
} }
function waitForEvent<T = void>(emitter: EventEmitter, event: string, predicate?: (arg: T) => boolean | Promise<boolean>): { promise: Promise<T>, dispose: () => void } { function waitForEvent<T = void>(emitter: EventEmitter, event: string, savedZone: Zone | undefined, predicate?: (arg: T) => boolean | Promise<boolean>): { promise: Promise<T>, dispose: () => void } {
let listener: (eventArg: any) => void; let listener: (eventArg: any) => void;
const promise = new Promise<T>((resolve, reject) => { const promise = new Promise<T>((resolve, reject) => {
listener = async (eventArg: any) => { listener = async (eventArg: any) => {
// Reset apiZone and expectZone, but restore step data.
await zones.runInZone(savedZone?.copyWithoutTypes(['apiZone', 'expectZone']), async () => {
try { try {
if (predicate && !(await predicate(eventArg))) if (predicate && !(await predicate(eventArg)))
return; return;
@ -116,6 +122,7 @@ function waitForEvent<T = void>(emitter: EventEmitter, event: string, predicate?
emitter.removeListener(event, listener); emitter.removeListener(event, listener);
reject(e); reject(e);
} }
});
}; };
emitter.addListener(event, listener); emitter.addListener(event, listener);
}); });

View file

@ -19,49 +19,52 @@ import { AsyncLocalStorage } from 'async_hooks';
export type ZoneType = 'apiZone' | 'expectZone' | 'stepZone'; export type ZoneType = 'apiZone' | 'expectZone' | 'stepZone';
class ZoneManager { class ZoneManager {
private readonly _asyncLocalStorage = new AsyncLocalStorage<Zone<unknown>|undefined>(); private readonly _asyncLocalStorage = new AsyncLocalStorage<Zone|undefined>();
run<T, R>(type: ZoneType, data: T, func: () => R): R { run<T, R>(type: ZoneType, data: T, func: () => R): R {
const previous = this._asyncLocalStorage.getStore(); const current = this._asyncLocalStorage.getStore();
const zone = new Zone(previous, type, data); const zone = Zone.createWithData(current, type, data);
return this.runInZone(zone, func);
}
runInZone<R>(zone: Zone | undefined, func: () => R): R {
return this._asyncLocalStorage.run(zone, func); return this._asyncLocalStorage.run(zone, func);
} }
zoneData<T>(type: ZoneType): T | undefined { zoneData<T>(type: ZoneType): T | undefined {
for (let zone = this._asyncLocalStorage.getStore(); zone; zone = zone.previous) { const zone = this._asyncLocalStorage.getStore();
if (zone.type === type) return zone?.get(type);
return zone.data as T;
} }
return undefined;
currentZone(): Zone | undefined {
return this._asyncLocalStorage.getStore();
} }
exitZones<R>(func: () => R): R { exitZones<R>(func: () => R): R {
return this._asyncLocalStorage.run(undefined, func); return this._asyncLocalStorage.run(undefined, func);
} }
printZones() {
const zones = [];
for (let zone = this._asyncLocalStorage.getStore(); zone; zone = zone.previous) {
let str = zone.type;
if (zone.type === 'apiZone')
str += `(${(zone.data as any).apiName})`;
zones.push(str);
}
// eslint-disable-next-line no-console
console.log('zones: ', zones.join(' -> '));
}
} }
class Zone<T> { export class Zone {
readonly type: ZoneType; private readonly store: Map<ZoneType, unknown>;
readonly data: T;
readonly previous: Zone<unknown> | undefined;
constructor(previous: Zone<unknown> | undefined, type: ZoneType, data: T) { static createWithData(currentZone: Zone | undefined, type: ZoneType, data: unknown) {
this.type = type; const store = new Map(currentZone?.store.entries() ?? []);
this.data = data; store.set(type, data);
this.previous = previous; return new Zone(store);
}
private constructor(store: Map<ZoneType, unknown>) {
this.store = store;
}
copyWithoutTypes(types: ZoneType[]): Zone {
const store = new Map(this.store.entries().filter(([type]) => !types.includes(type)));
return new Zone(store);
}
get<T>(type: ZoneType): T | undefined {
return this.store.get(type) as T | undefined;
} }
} }