TYPESCRIPT

Middleware Pipeline Pattern

Composable middleware pipeline for request/response processing

TypeScriptMiddlewarePipelinePatterns

Code

type Middleware<T> = (
  context: T,
  next: () => Promise<void> | void
) => Promise<void> | void;

class Pipeline<T> {
  private middlewares: Middleware<T>[] = [];
  private errorHandlers: Array<{
    predicate: (error: any) => boolean;
    handler: (context: T, error: any) => Promise<void> | void;
  }> = [];
  
  // Add middleware to the pipeline
  use(middleware: Middleware<T>): this {
    this.middlewares.push(middleware);
    return this;
  }
  
  // Add error handler
  __TOKEN_82__(
    predicate: (error: any) => boolean,
    handler: (context: T, error: any) => Promise<void> | void
  ): this {
    this.errorHandlers.push({ predicate, handler });
    return this;
  }
  
  // Execute the pipeline
  async execute(context: T): Promise<void> {
    let index = 0;
    
    const next = __TOKEN_90__ (): Promise<void> => {
      __TOKEN_91__ (index < this.middlewares.length) {
        const middleware = this.middlewares[index++];
        try {
          await middleware(context, next);
        } __TOKEN_97__ (error) {
          await this.handleError(context, error);
        }
      }
    };
    
    try {
      await next();
    } __TOKEN_102__ (error) {
      await this.handleError(context, error);
    }
  }
  
  private async handleError(context: T, error: any): Promise<void> {
    __TOKEN_107__ (const { predicate, handler } of this.errorHandlers) {
      __TOKEN_111__ (predicate(error)) {
        await handler(context, error);
        return;
      }
    }
    throw error; // Re-throw if no handler matches
  }
  
  // Compose multiple pipelines
  static compose<T>(pipelines: Pipeline<T>[]): Pipeline<T> {
    const combined = new Pipeline<T>();
    
    __TOKEN_118__ (const pipeline of pipelines) {
      combined.middlewares.push(...pipeline.middlewares);
      combined.errorHandlers.push(...pipeline.errorHandlers);
    }
    
    return combined;
  }
  
  // Create a pipeline from middleware array
  static from<T>(middlewares: Middleware<T>[]): Pipeline<T> {
    const pipeline = new Pipeline<T>();
    pipeline.middlewares = [...middlewares];
    return pipeline;
  }
}

// Common middleware types
interface RequestContext {
  request: {
    method: string;
    url: string;
    headers: Record<string, string>;
    body?: any;
    params?: Record<string, string>;
    query?: Record<string, string>;
  };
  response: {
    statusCode: number;
    headers: Record<string, string>;
    body?: any;
    send: (body: any) => void;
    json: (data: any) => void;
  };
  state: Record<string, any>;
}

// Utility middleware factories
const middlewareFactories = {
  // Logging middleware
  logger(): Middleware<RequestContext> {
    return __TOKEN_130__ (context, next) => {
      const start = Date.now();
      console.log(`→ ${context.request.method} ${context.request.url}`);
      
      await next();
      
      const duration = Date.now() - start;
      console.log(`← ${context.response.statusCode} ${duration}ms`);
    };
  },
  
  // JSON body parsing
  jsonBodyParser(): Middleware<RequestContext> {
    return __TOKEN_135__ (context, next) => {
      const contentType = context.request.headers['content-type'] || '';
      
      __TOKEN_137__ (contentType.includes('application/json')) {
        try {
          // In real implementation, you'd parse the body from the request
          // For example: context.request.body = await parseJsonBody(context.request);
        } __TOKEN_139__ (error) {
          context.response.statusCode = 400;
          context.response.json({ error: 'Invalid JSON' });
          return;
        }
      }
      
      await next();
    };
  },
  
  // CORS middleware
  cors(options: {
    origin?: string | string[] | ((origin: string) => string | null);
    methods?: string[];
    headers?: string[];
    credentials?: boolean;
  } = {}): Middleware<RequestContext> {
    return __TOKEN_143__ (context, next) => {
      const origin = context.request.headers.origin;
      
      __TOKEN_145__ (origin) {
        let allowedOrigin: string | null = null;
        
        __TOKEN_147__ (typeof options.origin === 'function') {
          allowedOrigin = options.origin(origin);
        } else __TOKEN_150__ (Array.isArray(options.origin)) {
          __TOKEN_151__ (options.origin.includes(origin) || options.origin.includes('*')) {
            allowedOrigin = origin;
          }
        } else __TOKEN_153__ (options.origin === '*' || options.origin === origin) {
          allowedOrigin = origin;
        }
        
        __TOKEN_154__ (allowedOrigin) {
          context.response.headers['Access-Control-Allow-Origin'] = allowedOrigin;
          
          __TOKEN_155__ (options.credentials) {
            context.response.headers['Access-Control-Allow-Credentials'] = 'true';
          }
          
          __TOKEN_156__ (context.request.method === 'OPTIONS') {
            // Preflight request
            __TOKEN_157__ (options.methods) {
              context.response.headers['Access-Control-Allow-Methods'] = options.methods.join(', ');
            }
            __TOKEN_158__ (options.headers) {
              context.response.headers['Access-Control-Allow-Headers'] = options.headers.join(', ');
            }
            context.response.statusCode = 204;
            return;
          }
        }
      }
      
      await next();
    };
  },
  
  // Authentication middleware
  authenticate(
    verifier: (token: string) => Promise<any>,
    options: { header?: string; cookie?: string } = {}
  ): Middleware<RequestContext> {
    return __TOKEN_162__ (context, next) => {
      let token: string | undefined;
      
      // Try to get token from header
      __TOKEN_164__ (options.header) {
        const authHeader = context.request.headers[options.header.toLowerCase()];
        __TOKEN_166__ (authHeader?.startsWith('Bearer ')) {
          token = authHeader.slice(7);
        }
      }
      
      // Try to get token from cookie
      __TOKEN_167__ (!token && options.cookie) {
        const cookies = context.request.headers.cookie || '';
        const cookieMatch = cookies.match(new RegExp(`${options.cookie}=([^;]+)`));
        __TOKEN_171__ (cookieMatch) {
          token = cookieMatch[1];
        }
      }
      
      __TOKEN_172__ (!token) {
        context.response.statusCode = 401;
        context.response.json({ error: 'Authentication required' });
        return;
      }
      
      try {
        const user = await verifier(token);
        context.state.user = user;
        await next();
      } __TOKEN_178__ (error) {
        context.response.statusCode = 401;
        context.response.json({ error: 'Invalid token' });
      }
    };
  },
  
  // Rate limiting middleware
  rateLimit(options: {
    windowMs: number;
    max: number;
    keyGenerator?: (context: RequestContext) => string;
  }): Middleware<RequestContext> {
    const store = new Map<string, { count: number; resetTime: number }>();
    
    return __TOKEN_182__ (context, next) => {
      const key = options.keyGenerator?.(context) || context.request.headers['x-forwarded-for'] || 'anonymous';
      const now = Date.now();
      
      let entry = store.get(key);
      
      __TOKEN_186__ (!entry || now > entry.resetTime) {
        entry = { count: 0, resetTime: now + options.windowMs };
        store.set(key, entry);
      }
      
      __TOKEN_187__ (entry.count >= options.max) {
        context.response.statusCode = 429;
        context.response.headers['Retry-After'] = Math.ceil((entry.resetTime - now) / 1000).toString();
        context.response.json({ error: 'Too many requests' });
        return;
      }
      
      entry.count++;
      context.response.headers['X-RateLimit-Limit'] = options.max.toString();
      context.response.headers['X-RateLimit-Remaining'] = (options.max - entry.count).toString();
      context.response.headers['X-RateLimit-Reset'] = Math.ceil(entry.resetTime / 1000).toString();
      
      await next();
    };
  }
};

// Generic middleware utilities
const middlewareUtils = {
  // Conditional middleware
  __TOKEN_191__(
    condition: (context: any) => boolean,
    trueMiddleware: Middleware<any>,
    falseMiddleware?: Middleware<any>
  ): Middleware<any> {
    return __TOKEN_193__ (context, next) => {
      __TOKEN_194__ (condition(context)) {
        await trueMiddleware(context, next);
      } else __TOKEN_197__ (falseMiddleware) {
        await falseMiddleware(context, next);
      } else {
        await next();
      }
    };
  },
  
  // Timeout middleware
  timeout(ms: number): Middleware<any> {
    return __TOKEN_202__ (context, next) => {
      const timeoutPromise = new __TOKEN_302__((_, reject) => {
        setTimeout(() => reject(new Error('Request timeout')), ms);
      });
      
      await Promise.race([next(), timeoutPromise]);
    };
  },
  
  // Compose multiple middleware into one
  compose<T>(middlewares: Middleware<T>[]): Middleware<T> {
    return __TOKEN_208__ (context, next) => {
      let index = 0;
      const composedNext = __TOKEN_211__ () => {
        __TOKEN_212__ (index < middlewares.length) {
          const middleware = middlewares[index++];
          await middleware(context, composedNext);
        } else {
          await next();
        }
      };
      await composedNext();
    };
  }
};

// Usage examples
const apiPipeline = new Pipeline<RequestContext>()
  .use(middlewareFactories.logger())
  .use(middlewareFactories.cors({
    origin: ['http:__TOKEN_23__
    credentials: true
  }))
  .use(middlewareFactories.jsonBodyParser())
  .use(middlewareFactories.rateLimit({
    windowMs: 15 * 60 * 1000, __TOKEN_24__
    max: 100
  }))
  .use(middlewareFactories.authenticate(
    async (token) => {
      __TOKEN_25__
      return { id: 1, name: 'John' };
    },
    { header: 'authorization' }
  ))
  .catch(
    (error) => error.status === 404,
    (context, error) => {
      context.response.statusCode = 404;
      context.response.json({ error: 'Not found' });
    }
  )
  .catch(
    (error) => error.message === 'Request timeout',
    (context, error) => {
      context.response.statusCode = 504;
      context.response.json({ error: 'Gateway timeout' });
    }
  );

__TOKEN_26__
async function processRequest(request: any) {
  const context: RequestContext = {
    request,
    response: {
      statusCode: 200,
      headers: {},
      send: (body) => console.log('Response:', body),
      json: (data) => console.log('JSON Response:', data)
    },
    state: {}
  };
  
  await apiPipeline.execute(context);
  return context.response;
}

__TOKEN_27__
const publicApiPipeline = new Pipeline<RequestContext>()
  .use(middlewareFactories.logger())
  .use(middlewareFactories.cors())
  .use(middlewareFactories.jsonBodyParser());

const adminApiPipeline = new Pipeline<RequestContext>()
  .use(middlewareFactories.authenticate(
    async (token) => {
      __TOKEN_28__
      return { id: 1, role: 'admin' };
    }
  ))
  .use((context, next) => {
    if (context.state.user?.role !== 'admin') {
      context.response.statusCode = 403;
      context.response.json({ error: 'Admin access required' });
      return;
    }
    return next();
  });

__TOKEN_29__
const combinedPipeline = Pipeline.compose([publicApiPipeline, adminApiPipeline]);

__TOKEN_30__
function validate(schema: any): Middleware<RequestContext> {
  return async (context, next) => {
    try {
      __TOKEN_31__
      __TOKEN_32__
      await next();
    } catch (validationError) {
      context.response.statusCode = 400;
      context.response.json({ error: 'Validation failed', details: validationError });
    }
  };
}