Observable Pattern
Simple Observable implementation with operators
Code
/**
 * Set of callbacks a consumer hands to an observable.
 * Only `next` is required; `error` and `complete` are optional, so producers
 * invoke them with optional chaining (`observer.error?.(e)`).
 */
type Observer<T> = {
next: (value: T) => void;
error?: (error: any) => void;
complete?: () => void;
};
/** Zero-argument function that cancels a subscription / releases its resources. */
type Unsubscribe = () => void;
/** Function that takes a source observable of `T` and returns a new observable of `R`. */
type Operator<T, R> = (source: Observable<T>) => Observable<R>;
/**
 * Minimal cold observable.
 *
 * Each call to `subscribe` runs the executor again with its own independent
 * subscription state, so one subscriber finishing (or unsubscribing) never
 * affects another and the same observable can be subscribed to repeatedly.
 *
 * Guarantees enforced per subscription:
 * - nothing is delivered after `error`, `complete` or unsubscribe;
 * - `error` and `complete` are mutually exclusive and delivered at most once;
 * - the executor's teardown runs exactly once, on whichever of
 *   error / complete / unsubscribe happens first.
 */
class Observable<T> {
  /**
   * @param executor Producer invoked once per subscription. It receives a
   *   guarded observer (all three callbacks are always present) and may
   *   return a teardown function that releases whatever it acquired.
   */
  constructor(private readonly executor: (observer: Observer<T>) => Unsubscribe | void) {}

  /**
   * Starts the producer for the given consumer.
   *
   * @param observer Either a full observer object or just a `next` callback.
   * @returns Function that stops delivery and runs the producer's teardown.
   *   Calling it more than once is harmless.
   */
  subscribe(observer: Observer<T> | ((value: T) => void)): Unsubscribe {
    const target: Observer<T> =
      typeof observer === 'function' ? { next: observer } : observer;

    let closed = false;
    let executorReturned = false;
    let tornDown = false;
    let teardown: Unsubscribe | void;

    const release: Unsubscribe = () => {
      closed = true;
      // While the executor is still running its teardown does not exist yet;
      // it is run right after the executor returns (see below).
      if (!executorReturned || tornDown) return;
      tornDown = true;
      if (typeof teardown === 'function') {
        teardown();
      }
    };

    teardown = this.executor({
      next: (value) => {
        if (closed) return;
        target.next(value);
      },
      error: (err) => {
        if (closed) return;
        closed = true;
        try {
          target.error?.(err);
        } finally {
          release();
        }
      },
      complete: () => {
        if (closed) return;
        closed = true;
        try {
          target.complete?.();
        } finally {
          release();
        }
      }
    });
    executorReturned = true;

    // The producer finished (or was cancelled) synchronously: clean up now.
    if (closed) {
      release();
    }
    return release;
  }

  /**
   * Chains operators left to right: `a.pipe(f, g)` is `g(f(a))`.
   * With no operators the observable itself is returned.
   */
  pipe(): Observable<T>;
  pipe<R>(operator: Operator<T, R>): Observable<R>;
  pipe<A, B>(op1: Operator<T, A>, op2: Operator<A, B>): Observable<B>;
  pipe<A, B, C>(op1: Operator<T, A>, op2: Operator<A, B>, op3: Operator<B, C>): Observable<C>;
  pipe<A, B, C, D>(
    op1: Operator<T, A>,
    op2: Operator<A, B>,
    op3: Operator<B, C>,
    op4: Operator<C, D>
  ): Observable<D>;
  // Fallback for longer chains, where per-step typing is no longer tracked.
  // eslint-disable-next-line @typescript-eslint/no-explicit-any
  pipe(...operators: Array<Operator<any, any>>): Observable<any>;
  // eslint-disable-next-line @typescript-eslint/no-explicit-any
  pipe(...operators: Array<Operator<any, any>>): Observable<any> {
    // eslint-disable-next-line @typescript-eslint/no-explicit-any
    let current: Observable<any> = this;
    for (const operator of operators) {
      current = operator(current);
    }
    return current;
  }

  /** Emits the given values synchronously, in order, then completes. */
  static of<T>(...values: T[]): Observable<T> {
    return new Observable<T>(observer => {
      values.forEach(value => observer.next(value));
      observer.complete?.();
    });
  }

  /**
   * Emits every element of an iterable or array-like synchronously, then
   * completes. Anything thrown while iterating or delivering is forwarded
   * to the observer's `error` callback.
   */
  static from<T>(iterable: Iterable<T> | ArrayLike<T>): Observable<T> {
    return new Observable<T>(observer => {
      try {
        if (typeof (iterable as Iterable<T>)[Symbol.iterator] === 'function') {
          for (const value of iterable as Iterable<T>) {
            observer.next(value);
          }
        } else {
          // Plain array-likes ({ length, 0: ..., 1: ... }) are not iterable.
          const items = iterable as ArrayLike<T>;
          for (let i = 0; i < items.length; i++) {
            observer.next(items[i]);
          }
        }
        observer.complete?.();
      } catch (error) {
        observer.error?.(error);
      }
    });
  }

  /**
   * Emits 0, 1, 2, ... every `period` milliseconds until unsubscribed.
   * The timer is cleared by the subscription's teardown.
   */
  static interval(period: number): Observable<number> {
    return new Observable<number>(observer => {
      let count = 0;
      const intervalId = setInterval(() => {
        observer.next(count++);
      }, period);
      return () => clearInterval(intervalId);
    });
  }

  /**
   * Emits every `eventName` event dispatched on `element`.
   * The listener is removed by the subscription's teardown.
   */
  static fromEvent<T extends Event>(element: EventTarget, eventName: string): Observable<T> {
    return new Observable<T>(observer => {
      const handler = (event: Event) => observer.next(event as T);
      element.addEventListener(eventName, handler);
      return () => element.removeEventListener(eventName, handler);
    });
  }
}
// Operators
/**
 * Operator that applies `transform` to every value of the source and emits
 * the result. Error and completion notifications are forwarded unchanged.
 *
 * @param transform Mapping applied to each emitted value.
 * @returns Operator usable with `Observable.pipe`.
 */
function map<T, R>(transform: (value: T) => R): Operator<T, R> {
  return (source: Observable<T>) =>
    new Observable<R>(observer => {
      // Returning the source's unsubscribe makes it this observable's teardown.
      return source.subscribe({
        next: value => observer.next(transform(value)),
        error: err => observer.error?.(err),
        complete: () => observer.complete?.()
      });
    });
}
/**
 * Operator that only lets through the values for which `predicate` returns
 * true. Error and completion notifications are forwarded unchanged.
 *
 * @param predicate Test applied to each emitted value.
 * @returns Operator usable with `Observable.pipe`.
 */
function filter<T>(predicate: (value: T) => boolean): Operator<T, T> {
  return (source: Observable<T>) =>
    new Observable<T>(observer => {
      // Returning the source's unsubscribe makes it this observable's teardown.
      return source.subscribe({
        next: value => {
          if (predicate(value)) {
            observer.next(value);
          }
        },
        error: err => observer.error?.(err),
        complete: () => observer.complete?.()
      });
    });
}
/**
 * Operator that emits a value only after `delay` milliseconds have passed
 * without the source emitting another one; each new value replaces the
 * pending one and restarts the timer.
 *
 * - When the source completes while a value is pending, that value is
 *   emitted immediately and then completion is forwarded.
 * - When the source errors, the pending value is discarded and only the
 *   error is forwarded.
 * - Unsubscribing cancels the timer and unsubscribes from the source.
 *
 * @param delay Quiet period in milliseconds.
 * @returns Operator usable with `Observable.pipe`.
 */
function debounceTime<T>(delay: number): Operator<T, T> {
  return (source: Observable<T>) =>
    new Observable<T>(observer => {
      // ReturnType keeps this portable between Node (Timeout) and browsers (number).
      let timer: ReturnType<typeof setTimeout> | undefined;
      // Boxed so that `undefined` / falsy values of T can still be pending.
      let pending: { value: T } | undefined;

      const cancelTimer = (): void => {
        if (timer !== undefined) {
          clearTimeout(timer);
          timer = undefined;
        }
      };

      const flush = (): void => {
        if (pending !== undefined) {
          const { value } = pending;
          pending = undefined;
          observer.next(value);
        }
      };

      const unsubscribeSource = source.subscribe({
        next: value => {
          cancelTimer();
          pending = { value };
          timer = setTimeout(() => {
            timer = undefined;
            flush();
          }, delay);
        },
        error: err => {
          cancelTimer();
          pending = undefined;
          observer.error?.(err);
        },
        complete: () => {
          cancelTimer();
          flush();
          observer.complete?.();
        }
      });

      return () => {
        cancelTimer();
        pending = undefined;
        unsubscribeSource();
      };
    });
}
/**
 * Operator that forwards at most the first `count` values of the source,
 * then completes and unsubscribes from the source.
 *
 * - `count <= 0` completes immediately without subscribing to the source.
 * - If the source errors or completes earlier, that notification is forwarded.
 * - Completion is forwarded at most once.
 *
 * @param count Maximum number of values to emit.
 * @returns Operator usable with `Observable.pipe`.
 */
function take<T>(count: number): Operator<T, T> {
  return (source: Observable<T>) =>
    new Observable<T>(observer => {
      if (count <= 0) {
        observer.complete?.();
        return;
      }

      let emitted = 0;
      let done = false;
      // Stays undefined while the source is still inside subscribe(); a
      // synchronous source can reach the limit before it is assigned.
      let upstream: Unsubscribe | undefined;

      upstream = source.subscribe({
        next: value => {
          if (done) return;
          emitted++;
          observer.next(value);
          if (emitted >= count) {
            done = true;
            observer.complete?.();
            upstream?.();
          }
        },
        error: err => {
          if (done) return;
          done = true;
          observer.error?.(err);
        },
        complete: () => {
          if (done) return;
          done = true;
          observer.complete?.();
        }
      });

      // The limit was reached synchronously, before `upstream` existed:
      // release the source now.
      if (done) {
        upstream();
      }
      return upstream;
    });
}
// Usage examples
// Create observables
// Synchronous source of five numbers, and a one-second ticker.
// Constructing an observable only stores its producer; nothing runs until subscribe().
const numbers$ = Observable.of(1, 2, 3, 4, 5);
const interval$ = Observable.interval(1000);
// Subscribe
const unsubscribe = numbers$.subscribe({
  next: value => console.log('Number:', value),
  complete: () => console.log('Complete!')
});
// Using operators: keep even numbers, multiply by ten, stop after two values
const processed$ = numbers$.pipe(
  filter(x => x % 2 === 0),
  map(x => x * 10),
  take(2)
);
processed$.subscribe(value => console.log('Processed:', value));
// From DOM events (skipped when there is no DOM, e.g. under Node)
const button = typeof document !== 'undefined' ? document.querySelector('button') : null;
if (button) {
  const clicks$ = Observable.fromEvent<MouseEvent>(button, 'click');
  clicks$
    .pipe(
      debounceTime(300),
      map(event => ({ x: event.clientX, y: event.clientY }))
    )
    .subscribe(coords => {
      console.log('Clicked at:', coords);
    });
}
// Custom observable: emits 0..4 at 500 ms intervals, then completes
const custom$ = new Observable<number>(observer => {
  let count = 0;
  const interval = setInterval(() => {
    observer.next(count++);
    if (count === 5) {
      observer.complete?.();
    }
  }, 500);
  // Teardown: stop the timer when the subscription ends
  return () => clearInterval(interval);
});