Skip to content
Snippets Groups Projects
Commit 00e67807 authored by François Grand's avatar François Grand
Browse files

feat: add bidirectionnal subject (à la RxJS)

refs #604
parent 904d24be
No related branches found
No related tags found
2 merge requests!225Release v4.17.0,!206Resolve "PWA: l'application ne se met pas à jour"
import { Observer, Subject, firstValueFrom, lastValueFrom } from "rxjs";
/**
* bi-directional subject (see RxJS Subject)
* Allows two objects to exchange messages in both directions. Each object has to choose a posting channel
* (messages will receive from the other one).
*
* source1 ----post-----> | channel 0 | --subscribe--> source2
* <--subscribe-- | channel 1 | <-----post----
*
* EventEmitter is not used since it is reserved to properties in Angular component with @Output annotation
*/
export class BidirectionalSubject<T> {
// communication channels
private _channel0: Subject<T>;
private _channel1: Subject<T>;
// array of "who chose which posting channel"
private _channel0Posters: any[] = [];
private _channel1Posters: any[] = [];
constructor() {
this._channel0 = new Subject();
this._channel1 = new Subject();
}
/**
* get posting channel index
* @param source object that chose one of the channels
*/
private getPostingChannelIndex(source: any) {
if (this._channel0Posters.indexOf(source) !== -1) {
return 0;
}
if (this._channel1Posters.indexOf(source) !== -1) {
return 1;
}
return -1;
}
/**
* choose a posting channel
* @param source object that chooses the channel
* @param chan channel number
*/
public selectPostingChannel(source: any, chan: number) {
switch (chan) {
case 0:
if (this.getPostingChannelIndex(source) !== -1) {
throw new Error("object already has a selected channel");
}
this._channel0Posters.push(source);
break;
case 1:
if (this.getPostingChannelIndex(source) !== -1) {
throw new Error("object already has a selected channel");
}
this._channel1Posters.push(source);
break;
default:
throw new Error(`invalid channel number ${chan}`);
}
}
/**
* remove a source from its channel
*/
public unselectPostingChannel(source: any) {
this._channel0Posters = this._channel0Posters.filter(o => o != source);
this._channel1Posters = this._channel1Posters.filter(o => o != source);
}
/**
* used by a source to post a message to communication channel
*/
public post(source: any, msg: T) {
switch (this.getPostingChannelIndex(source)) {
case 0:
this._channel0.next(msg);
break;
case 1:
this._channel1.next(msg);
break;
case -1:
throw new Error("must select a channel first");
}
}
/**
* create a Promise representing a received message (when posted by another source)
* @param source object that will use the Promise
*/
public getReceivePromise(source: any): Promise<T> {
switch (this.getPostingChannelIndex(source)) {
case 0:
return firstValueFrom(this._channel1);
case 1:
return firstValueFrom(this._channel0);
case -1:
throw new Error("must select a channel first");
}
}
/**
* Add a message handler (provided by source) to process received messages
* (alternative to getReceivePromise())
* @param source object providing handler
* @param handler message processing function
*/
public addHandler(source: any, handler: Observer<T>) {
switch (this.getPostingChannelIndex(source)) {
case 0:
this._channel1.subscribe(handler);
break;
case 1:
this._channel0.subscribe(handler);
break;
case -1:
throw new Error("must select a channel first");
}
}
}
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment