diff --git a/src/app/util/bidir_subject.ts b/src/app/util/bidir_subject.ts new file mode 100644 index 0000000000000000000000000000000000000000..069e004e83e7040dff2b273fad42ba9aef3ddd0c --- /dev/null +++ b/src/app/util/bidir_subject.ts @@ -0,0 +1,131 @@ +import { Observer, Subject, firstValueFrom, lastValueFrom } from "rxjs"; + +/** + * bi-directional subject (see RxJS Subject) + * Allows two objects to exchange messages in both directions. Each object has to choose a posting channel + * (messages will receive from the other one). + * + * source1 ----post-----> | channel 0 | --subscribe--> source2 + * <--subscribe-- | channel 1 | <-----post---- + * + * EventEmitter is not used since it is reserved to properties in Angular component with @Output annotation + */ +export class BidirectionalSubject<T> { + + // communication channels + private _channel0: Subject<T>; + private _channel1: Subject<T>; + + // array of "who chose which posting channel" + private _channel0Posters: any[] = []; + private _channel1Posters: any[] = []; + + constructor() { + this._channel0 = new Subject(); + this._channel1 = new Subject(); + } + + /** + * get posting channel index + * @param source object that chose one of the channels + */ + private getPostingChannelIndex(source: any) { + if (this._channel0Posters.indexOf(source) !== -1) { + return 0; + } + if (this._channel1Posters.indexOf(source) !== -1) { + return 1; + } + return -1; + } + + /** + * choose a posting channel + * @param source object that chooses the channel + * @param chan channel number + */ + public selectPostingChannel(source: any, chan: number) { + switch (chan) { + case 0: + if (this.getPostingChannelIndex(source) !== -1) { + throw new Error("object already has a selected channel"); + } + this._channel0Posters.push(source); + break; + + case 1: + if (this.getPostingChannelIndex(source) !== -1) { + throw new Error("object already has a selected channel"); + } + this._channel1Posters.push(source); + break; + + default: + throw new Error(`invalid channel number ${chan}`); + } + } + + /** + * remove a source from its channel + */ + public unselectPostingChannel(source: any) { + this._channel0Posters = this._channel0Posters.filter(o => o != source); + this._channel1Posters = this._channel1Posters.filter(o => o != source); + } + + /** + * used by a source to post a message to communication channel + */ + public post(source: any, msg: T) { + switch (this.getPostingChannelIndex(source)) { + case 0: + this._channel0.next(msg); + break; + + case 1: + this._channel1.next(msg); + break; + + case -1: + throw new Error("must select a channel first"); + } + } + + /** + * create a Promise representing a received message (when posted by another source) + * @param source object that will use the Promise + */ + public getReceivePromise(source: any): Promise<T> { + switch (this.getPostingChannelIndex(source)) { + case 0: + return firstValueFrom(this._channel1); + + case 1: + return firstValueFrom(this._channel0); + + case -1: + throw new Error("must select a channel first"); + } + } + + /** + * Add a message handler (provided by source) to process received messages + * (alternative to getReceivePromise()) + * @param source object providing handler + * @param handler message processing function + */ + public addHandler(source: any, handler: Observer<T>) { + switch (this.getPostingChannelIndex(source)) { + case 0: + this._channel1.subscribe(handler); + break; + + case 1: + this._channel0.subscribe(handler); + break; + + case -1: + throw new Error("must select a channel first"); + } + } +}