From 00e678074e54cbc3a3e0c5116e95000b30575559 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Fran=C3=A7ois=20Grand?= <francois.grand@inrae.fr> Date: Thu, 23 Feb 2023 16:12:04 +0100 Subject: [PATCH] =?UTF-8?q?feat:=20add=20bidirectionnal=20subject=20(?= =?UTF-8?q?=C3=A0=20la=20RxJS)?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit refs #604 --- src/app/util/bidir_subject.ts | 131 ++++++++++++++++++++++++++++++++++ 1 file changed, 131 insertions(+) create mode 100644 src/app/util/bidir_subject.ts diff --git a/src/app/util/bidir_subject.ts b/src/app/util/bidir_subject.ts new file mode 100644 index 000000000..069e004e8 --- /dev/null +++ b/src/app/util/bidir_subject.ts @@ -0,0 +1,131 @@ +import { Observer, Subject, firstValueFrom, lastValueFrom } from "rxjs"; + +/** + * bi-directional subject (see RxJS Subject) + * Allows two objects to exchange messages in both directions. Each object has to choose a posting channel + * (messages will receive from the other one). + * + * source1 ----post-----> | channel 0 | --subscribe--> source2 + * <--subscribe-- | channel 1 | <-----post---- + * + * EventEmitter is not used since it is reserved to properties in Angular component with @Output annotation + */ +export class BidirectionalSubject<T> { + + // communication channels + private _channel0: Subject<T>; + private _channel1: Subject<T>; + + // array of "who chose which posting channel" + private _channel0Posters: any[] = []; + private _channel1Posters: any[] = []; + + constructor() { + this._channel0 = new Subject(); + this._channel1 = new Subject(); + } + + /** + * get posting channel index + * @param source object that chose one of the channels + */ + private getPostingChannelIndex(source: any) { + if (this._channel0Posters.indexOf(source) !== -1) { + return 0; + } + if (this._channel1Posters.indexOf(source) !== -1) { + return 1; + } + return -1; + } + + /** + * choose a posting channel + * @param source object that chooses the channel + * @param chan channel number + */ + public selectPostingChannel(source: any, chan: number) { + switch (chan) { + case 0: + if (this.getPostingChannelIndex(source) !== -1) { + throw new Error("object already has a selected channel"); + } + this._channel0Posters.push(source); + break; + + case 1: + if (this.getPostingChannelIndex(source) !== -1) { + throw new Error("object already has a selected channel"); + } + this._channel1Posters.push(source); + break; + + default: + throw new Error(`invalid channel number ${chan}`); + } + } + + /** + * remove a source from its channel + */ + public unselectPostingChannel(source: any) { + this._channel0Posters = this._channel0Posters.filter(o => o != source); + this._channel1Posters = this._channel1Posters.filter(o => o != source); + } + + /** + * used by a source to post a message to communication channel + */ + public post(source: any, msg: T) { + switch (this.getPostingChannelIndex(source)) { + case 0: + this._channel0.next(msg); + break; + + case 1: + this._channel1.next(msg); + break; + + case -1: + throw new Error("must select a channel first"); + } + } + + /** + * create a Promise representing a received message (when posted by another source) + * @param source object that will use the Promise + */ + public getReceivePromise(source: any): Promise<T> { + switch (this.getPostingChannelIndex(source)) { + case 0: + return firstValueFrom(this._channel1); + + case 1: + return firstValueFrom(this._channel0); + + case -1: + throw new Error("must select a channel first"); + } + } + + /** + * Add a message handler (provided by source) to process received messages + * (alternative to getReceivePromise()) + * @param source object providing handler + * @param handler message processing function + */ + public addHandler(source: any, handler: Observer<T>) { + switch (this.getPostingChannelIndex(source)) { + case 0: + this._channel1.subscribe(handler); + break; + + case 1: + this._channel0.subscribe(handler); + break; + + case -1: + throw new Error("must select a channel first"); + } + } +} -- GitLab