TYPESCRIPT

Observable Pattern

Simple Observable implementation with operators

TypeScriptReactiveObservable

Code

type Observer<T> = {
  next: (value: T) => void;
  error?: (error: any) => void;
  complete?: () => void;
};

type Unsubscribe = () => void;

type Operator<T, R> = (source: Observable<T>) => Observable<R>;

class Observable<T> {
  private subscribers: Observer<T>[] = [];
  private isCompleted = false;
  
  constructor(private executor: (observer: Observer<T>) => Unsubscribe | void) {}
  
  // Subscribe to observable
  subscribe(observer: Observer<T> | ((value: T) => void)): Unsubscribe {
    const normalizedObserver: Observer<T> = 
      typeof observer === 'function' 
        ? { next: observer }
        : observer;
    
    this.subscribers.push(normalizedObserver);
    
    // Execute the observable
    const unsubscribe = this.executor({
      next: (value) => {
        __TOKEN_32__ (this.isCompleted) return;
        normalizedObserver.next(value);
      },
      error: (error) => {
        __TOKEN_35__ (this.isCompleted) return;
        normalizedObserver.error?.(error);
        this.complete();
      },
      complete: () => {
        __TOKEN_39__ (this.isCompleted) return;
        normalizedObserver.complete?.();
        this.complete();
      }
    });
    
    __TOKEN_43__ () => {
      const index = this.subscribers.indexOf(normalizedObserver);
      __TOKEN_46__ (index > -1) {
        this.subscribers.splice(index, 1);
      }
      __TOKEN_48__ (typeof unsubscribe === 'function') {
        unsubscribe();
      }
    };
  }
  
  // Pipe operators
  pipe<R>(operator: Operator<T, R>): Observable<R> {
    return operator(this);
  }
  
  // Helper method to emit values
  private emit(value: T): void {
    __TOKEN_53__ (this.isCompleted) return;
    this.subscribers.forEach(subscriber => subscriber.next(value));
  }
  
  private error(err: any): void {
    __TOKEN_58__ (this.isCompleted) return;
    this.subscribers.forEach(subscriber => subscriber.error?.(err));
    this.complete();
  }
  
  private complete(): void {
    __TOKEN_64__ (this.isCompleted) return;
    this.isCompleted = true;
    this.subscribers.forEach(subscriber => subscriber.complete?.());
    this.subscribers = [];
  }
  
  // Static creation methods
  static of<T>(...values: T[]): Observable<T> {
    return new Observable(observer => {
      values.forEach(value => observer.next(value));
      observer.complete?.();
    });
  }
  
  static from<T>(iterable: Iterable<T> | ArrayLike<T>): Observable<T> {
    return new Observable(observer => {
      try {
        __TOKEN_79__ (const value of iterable as any) {
          observer.next(value);
        }
        observer.complete?.();
      } __TOKEN_82__ (error) {
        observer.error?.(error);
      }
    });
  }
  
  static interval(period: number): Observable<number> {
    return new Observable(observer => {
      let count = 0;
      const intervalId = setInterval(() => {
        observer.next(count++);
      }, period);
      
      __TOKEN_88__ () => clearInterval(intervalId);
    });
  }
  
  static fromEvent<T extends Event>(element: EventTarget, eventName: string): Observable<T> {
    return new Observable(observer => {
      const handler = (event: Event) => observer.next(event as T);
      element.addEventListener(eventName, handler);
      
      __TOKEN_94__ () => element.removeEventListener(eventName, handler);
    });
  }
}

// Operators
function map<T, R>(transform: (value: T) => R): Operator<T, R> {
  __TOKEN_96__ (source: Observable<T>) => 
    new Observable<R>(observer => {
      return source.subscribe({
        next: value => observer.next(transform(value)),
        error: err => observer.error?.(err),
        complete: () => observer.complete?.()
      });
    });
}

function filter<T>(predicate: (value: T) => boolean): Operator<T, T> {
  __TOKEN_100__ (source: Observable<T>) => 
    new Observable<T>(observer => {
      return source.subscribe({
        next: value => {
          __TOKEN_103__ (predicate(value)) {
            observer.next(value);
          }
        },
        error: err => observer.error?.(err),
        complete: () => observer.complete?.()
      });
    });
}

function debounceTime<T>(delay: number): Operator<T, T> {
  __TOKEN_105__ (source: Observable<T>) => 
    new Observable<T>(observer => {
      let timeoutId: NodeJS.Timeout;
      
      const subscription = source.subscribe({
        next: value => {
          clearTimeout(timeoutId);
          timeoutId = setTimeout(() => observer.next(value), delay);
        },
        error: err => observer.error?.(err),
        complete: () => {
          clearTimeout(timeoutId);
          observer.complete?.();
        }
      });
      
      __TOKEN_109__ () => {
        clearTimeout(timeoutId);
        subscription();
      };
    });
}

function take<T>(count: number): Operator<T, T> {
  __TOKEN_111__ (source: Observable<T>) => 
    new Observable<T>(observer => {
      let taken = 0;
      
      const subscription = source.subscribe({
        next: value => {
          __TOKEN_115__ (taken++ < count) {
            observer.next(value);
            __TOKEN_116__ (taken === count) {
              observer.complete?.();
              subscription();
            }
          }
        },
        error: err => observer.error?.(err),
        complete: () => observer.complete?.()
      });
      
      return subscription;
    });
}

// Usage examples
// Create observables
const numbers$ = Observable.__TOKEN_119__(1, 2, 3, 4, 5);
const interval$ = Observable.interval(1000);

// Subscribe
const unsubscribe = numbers$.subscribe({
  next: value => console.log('Number:', value),
  complete: () => console.log('Complete!')
});

// Using operators
const processed$ = numbers$.pipe(
  filter(x => x % 2 === 0),
  map(x => x * 10),
  take(2)
);

processed$.subscribe(value => console.log('Processed:', value));

// From DOM events
const button = document.querySelector('button');
__TOKEN_124__ (button) {
  const clicks$ = Observable.fromEvent<MouseEvent>(button, 'click');
  
  clicks$
    .pipe(
      debounceTime(300),
      map(event => ({ x: event.clientX, y: event.clientY }))
    )
    .subscribe(coords => {
      console.log('Clicked at:', coords);
    });
}

// Custom observable
const custom$ = new Observable<number>(observer => {
  let count = 0;
  const interval = setInterval(() => {
    observer.next(count++);
    __TOKEN_130__ (count === 5) {
      observer.complete?.();
    }
  }, 500);
  
  __TOKEN_131__ () => clearInterval(interval);
});