Get a better understanding of the RxJS Observable by implementing one that's similar from the ground up.
class SafeObserver {
constructor(destination) {
this.destination = destination;
} next(value) {
const destination = this.destination;
if (destination.next && !this.isUnsubscribed) {
destination.next && destination.next(value);
}
} error(err) {
const destination = this.destination;
if (!this.isUnsubscribed) {
if (destination.error) {
destination.error(error);
}
this.unsubscribe();
}
} complete() {
const destination = this.destination;
if (!this.isUnsubscribed) {
if (destination.complete) {
destination.complete();
}
this.unsubscribe();
}
} unsubscribe() {
this.isUnsubscribed = true;
if (this._unsubscribe) {
this._unsubscribe();
}
}
} class Observable {
constructor(_subscribe) {
this._subscribe = _subscribe;
} subscribe(observer) {
const safeObserver = new SafeObserver(observer);
safeObserver._unsubscribe = this._subscribe(safeObserver);
return () => safeObserver.unsubscribe();
}
} const myObservable = new Observable((observer) => {
let i = 0;
const id = setInterval(() => {
if (i < 10) {
observer.next(i++);
} else {
observer.complete();
}
}, 100); return () => {
console.log('unsubbed');
clearInterval(id);
};
}); const observer = {
next(value) { console.log('next -> ' + value); },
error(err) { },
complete() { console.log('complete'); }
}; const foo = myObservable.subscribe(observer); foo.unsubscribe();