Middleware Pipeline Pattern
Composable middleware pipeline for request/response processing
Code
/**
 * A single stage of a middleware chain.
 *
 * The stage receives the shared `context` plus a `next` callback and may
 * complete either synchronously or by returning a promise; `next` has the
 * same freedom.
 */
type Middleware<T> = (context: T, next: () => void | Promise<void>) => void | Promise<void>;
/**
 * Ordered chain of middleware with predicate-based error handlers.
 *
 * Middleware run in registration order and each one decides whether the chain
 * continues by calling `next()`. Errors thrown by a middleware (or by anything
 * downstream of it) are offered to the handlers registered through `catch`.
 */
class Pipeline<T> {
  private middlewares: Middleware<T>[] = [];
  private errorHandlers: Array<{
    predicate: (error: any) => boolean;
    handler: (context: T, error: any) => Promise<void> | void;
  }> = [];

  /**
   * Appends a middleware to the end of the pipeline.
   *
   * @returns This pipeline, for chaining.
   */
  use(middleware: Middleware<T>): this {
    this.middlewares.push(middleware);
    return this;
  }

  /**
   * Registers an error handler.
   *
   * Handlers are consulted in registration order; the first one whose
   * `predicate` returns true receives the error and no further handler runs.
   *
   * @returns This pipeline, for chaining.
   */
  catch(
    predicate: (error: any) => boolean,
    handler: (context: T, error: any) => Promise<void> | void
  ): this {
    this.errorHandlers.push({ predicate, handler });
    return this;
  }

  /**
   * Runs every middleware against `context`.
   *
   * An error raised at some stage is passed through the registered handlers
   * exactly once. When a handler accepts it, the `next()` call of the stage
   * above resolves normally, so upstream middleware still run their
   * post-processing. When no handler accepts it (or a handler itself throws),
   * the error propagates out of `execute` without being re-offered to the
   * handlers at every enclosing stage.
   *
   * @throws The original error when no handler matches, whatever a handler
   *   throws, or an `Error` when a middleware calls `next()` more than once.
   */
  async execute(context: T): Promise<void> {
    // Errors that have already been through the handlers and were rejected.
    // Enclosing stages see the same error again while it unwinds; they must
    // forward it untouched instead of re-running predicates and handlers.
    const processed = new Set<unknown>();

    const resolveError = async (error: unknown): Promise<void> => {
      if (processed.has(error)) {
        throw error;
      }
      try {
        await this.handleError(context, error);
      } catch (unhandled) {
        processed.add(unhandled);
        throw unhandled;
      }
    };

    const dispatch = async (position: number): Promise<void> => {
      const middleware = this.middlewares[position];
      if (!middleware) {
        return; // Past the last stage: the chain is complete.
      }

      // Each stage gets its own `next`, bound to its position, so a second
      // call cannot silently advance the chain past stages it never ran.
      let advanced = false;
      const next = (): Promise<void> => {
        if (advanced) {
          return Promise.reject(new Error('next() called multiple times'));
        }
        advanced = true;
        return dispatch(position + 1);
      };

      try {
        await middleware(context, next);
      } catch (error) {
        await resolveError(error);
      }
    };

    await dispatch(0);
  }

  /**
   * Offers `error` to the registered handlers in order.
   *
   * @throws `error` itself when no predicate matches.
   */
  private async handleError(context: T, error: unknown): Promise<void> {
    for (const { predicate, handler } of this.errorHandlers) {
      if (predicate(error)) {
        await handler(context, error);
        return;
      }
    }
    throw error; // Re-throw if no handler matches
  }

  /**
   * Builds a new pipeline holding the middleware and error handlers of every
   * given pipeline, in the order the pipelines are listed. The inputs are not
   * modified.
   */
  static compose<T>(pipelines: Pipeline<T>[]): Pipeline<T> {
    const combined = new Pipeline<T>();
    for (const pipeline of pipelines) {
      combined.middlewares.push(...pipeline.middlewares);
      combined.errorHandlers.push(...pipeline.errorHandlers);
    }
    return combined;
  }

  /** Builds a pipeline from a copy of the given middleware array. */
  static from<T>(middlewares: Middleware<T>[]): Pipeline<T> {
    const pipeline = new Pipeline<T>();
    pipeline.middlewares = [...middlewares];
    return pipeline;
  }
}
// Common middleware types
/**
 * Context object threaded through every middleware of an HTTP-style pipeline.
 */
interface RequestContext {
  /** Incoming request data. The middleware in this file look headers up by lowercase name. */
  request: {
    method: string;
    url: string;
    headers: Record<string, string>;
    body?: any;
    params?: Record<string, string>;
    query?: Record<string, string>;
  };

  /** Outgoing response: status and headers are mutated in place by middleware. */
  response: {
    statusCode: number;
    headers: Record<string, string>;
    body?: any;
    /** Emits a raw response body. */
    send: (body: any) => void;
    /** Emits a JSON response body. */
    json: (data: any) => void;
  };

  /** Free-form values shared between middleware (e.g. the authenticated user). */
  state: Record<string, any>;
}
// Utility middleware factories
/** Factories for common middleware operating on a `RequestContext`. */
const middlewareFactories = {
  /**
   * Logs the request line before the rest of the chain runs and the response
   * status plus elapsed milliseconds after it completes.
   */
  logger(): Middleware<RequestContext> {
    return async (context, next) => {
      const start = Date.now();
      console.log(`→ ${context.request.method} ${context.request.url}`);
      await next();
      const duration = Date.now() - start;
      console.log(`← ${context.response.statusCode} ${duration}ms`);
    };
  },

  /**
   * JSON body parsing skeleton: for `application/json` requests the parse step
   * goes inside the `try`; a failure there answers 400 and stops the chain.
   */
  jsonBodyParser(): Middleware<RequestContext> {
    return async (context, next) => {
      const contentType = context.request.headers['content-type'] || '';
      if (contentType.includes('application/json')) {
        try {
          // In real implementation, you'd parse the body from the request
          // For example: context.request.body = await parseJsonBody(context.request);
        } catch (error) {
          context.response.statusCode = 400;
          context.response.json({ error: 'Invalid JSON' });
          return;
        }
      }
      await next();
    };
  },

  /**
   * CORS middleware.
   *
   * When the request carries an `origin` header that `options.origin` accepts
   * (exact string, `'*'`, list membership, or a function returning the value
   * to send), the allow-origin header is set, plus allow-credentials when
   * requested. An accepted `OPTIONS` request is treated as a preflight: the
   * allow-methods / allow-headers headers are added, the status becomes 204
   * and the chain stops. With no `options.origin`, no origin is accepted.
   */
  cors(options: {
    origin?: string | string[] | ((origin: string) => string | null);
    methods?: string[];
    headers?: string[];
    credentials?: boolean;
  } = {}): Middleware<RequestContext> {
    return async (context, next) => {
      const origin = context.request.headers.origin;
      if (origin) {
        let allowedOrigin: string | null = null;
        if (typeof options.origin === 'function') {
          allowedOrigin = options.origin(origin);
        } else if (Array.isArray(options.origin)) {
          if (options.origin.includes(origin) || options.origin.includes('*')) {
            allowedOrigin = origin;
          }
        } else if (options.origin === '*' || options.origin === origin) {
          allowedOrigin = origin;
        }

        if (allowedOrigin) {
          context.response.headers['Access-Control-Allow-Origin'] = allowedOrigin;
          if (options.credentials) {
            context.response.headers['Access-Control-Allow-Credentials'] = 'true';
          }
          if (context.request.method === 'OPTIONS') {
            // Preflight request
            if (options.methods) {
              context.response.headers['Access-Control-Allow-Methods'] = options.methods.join(', ');
            }
            if (options.headers) {
              context.response.headers['Access-Control-Allow-Headers'] = options.headers.join(', ');
            }
            context.response.statusCode = 204;
            return;
          }
        }
      }
      await next();
    };
  },

  /**
   * Authentication middleware.
   *
   * Reads a token from a `Bearer` header (`options.header`) and/or a cookie
   * (`options.cookie`), passes it to `verifier`, and stores the verifier's
   * result in `context.state.user` before continuing. A missing token or a
   * rejecting verifier answers 401 and stops the chain.
   *
   * @param verifier Resolves to the user for a valid token; rejects otherwise.
   * @param options Token sources. When neither is given, the `authorization`
   *   header is used so that the default configuration can authenticate.
   */
  authenticate(
    verifier: (token: string) => Promise<any>,
    options: { header?: string; cookie?: string } = {}
  ): Middleware<RequestContext> {
    const cookieName = options.cookie;
    // Without this fallback a call with no options would read no source at
    // all and reject every request with 401.
    const headerName = options.header ?? (cookieName ? undefined : 'authorization');
    const bearerPrefix = 'bearer ';

    return async (context, next) => {
      let token: string | undefined;

      // Try to get token from header
      if (headerName) {
        const authHeader = context.request.headers[headerName.toLowerCase()];
        // The auth scheme is case-insensitive, so accept "bearer"/"BEARER" too.
        if (authHeader && authHeader.slice(0, bearerPrefix.length).toLowerCase() === bearerPrefix) {
          token = authHeader.slice(bearerPrefix.length);
        }
      }

      // Try to get token from cookie
      if (!token && cookieName) {
        const cookies = context.request.headers.cookie || '';
        // Compare whole cookie names: a substring/regex search would let
        // "sid=..." satisfy a lookup for "id" and would misread names that
        // contain regex metacharacters.
        for (const pair of cookies.split(';')) {
          const separator = pair.indexOf('=');
          if (separator === -1) {
            continue;
          }
          const value = pair.slice(separator + 1).trim();
          if (pair.slice(0, separator).trim() === cookieName && value) {
            token = value;
            break;
          }
        }
      }

      if (!token) {
        context.response.statusCode = 401;
        context.response.json({ error: 'Authentication required' });
        return;
      }

      // Only the verifier is guarded: errors raised by downstream middleware
      // must reach the pipeline's own error handling instead of being
      // reported as an invalid token.
      let user: unknown;
      try {
        user = await verifier(token);
      } catch {
        context.response.statusCode = 401;
        context.response.json({ error: 'Invalid token' });
        return;
      }

      context.state.user = user;
      await next();
    };
  },

  /**
   * Fixed-window rate limiter kept in memory per factory call.
   *
   * Each key (from `keyGenerator`, else the `x-forwarded-for` header, else
   * `'anonymous'`) may pass `max` times per `windowMs`; further requests get
   * 429 with a `Retry-After` header. Passing requests receive the
   * `X-RateLimit-*` headers.
   */
  rateLimit(options: {
    windowMs: number;
    max: number;
    keyGenerator?: (context: RequestContext) => string;
  }): Middleware<RequestContext> {
    const store = new Map<string, { count: number; resetTime: number }>();
    let nextSweep = Date.now() + options.windowMs;

    // Drop expired windows so keys that never return do not accumulate
    // forever; runs at most once per window.
    const sweepExpired = (now: number): void => {
      for (const [storedKey, stored] of store) {
        if (now > stored.resetTime) {
          store.delete(storedKey);
        }
      }
      nextSweep = now + options.windowMs;
    };

    return async (context, next) => {
      const key = options.keyGenerator?.(context) || context.request.headers['x-forwarded-for'] || 'anonymous';
      const now = Date.now();
      if (now >= nextSweep) {
        sweepExpired(now);
      }

      let entry = store.get(key);
      if (!entry || now > entry.resetTime) {
        entry = { count: 0, resetTime: now + options.windowMs };
        store.set(key, entry);
      }

      if (entry.count >= options.max) {
        context.response.statusCode = 429;
        context.response.headers['Retry-After'] = Math.ceil((entry.resetTime - now) / 1000).toString();
        context.response.json({ error: 'Too many requests' });
        return;
      }

      entry.count++;
      context.response.headers['X-RateLimit-Limit'] = options.max.toString();
      context.response.headers['X-RateLimit-Remaining'] = (options.max - entry.count).toString();
      context.response.headers['X-RateLimit-Reset'] = Math.ceil(entry.resetTime / 1000).toString();
      await next();
    };
  }
};
// Generic middleware utilities
/** Context-agnostic helpers for building middleware out of other middleware. */
const middlewareUtils = {
  /**
   * Conditional middleware: runs `trueMiddleware` when `condition(context)`
   * holds, otherwise `falseMiddleware` when one is given, otherwise simply
   * continues the chain.
   */
  if(
    condition: (context: any) => boolean,
    trueMiddleware: Middleware<any>,
    falseMiddleware?: Middleware<any>
  ): Middleware<any> {
    return async (context, next) => {
      if (condition(context)) {
        await trueMiddleware(context, next);
      } else if (falseMiddleware) {
        await falseMiddleware(context, next);
      } else {
        await next();
      }
    };
  },

  /**
   * Timeout middleware: rejects with `Error('Request timeout')` when the rest
   * of the chain has not settled within `ms` milliseconds. The downstream
   * work itself is not cancelled.
   */
  timeout(ms: number): Middleware<any> {
    return async (_context, next) => {
      let timer: ReturnType<typeof setTimeout> | undefined;
      const timeoutPromise = new Promise<never>((_, reject) => {
        timer = setTimeout(() => reject(new Error('Request timeout')), ms);
      });
      try {
        await Promise.race([next(), timeoutPromise]);
      } finally {
        // Always release the timer; otherwise it stays scheduled for the full
        // `ms` after the chain has already finished.
        clearTimeout(timer);
      }
    };
  },

  /**
   * Composes several middleware into one. They run in array order and the
   * last one's `next` continues with the outer chain's `next`.
   *
   * A middleware that calls its `next` more than once gets a rejected promise
   * instead of silently advancing the chain a second time.
   */
  compose<T>(middlewares: Middleware<T>[]): Middleware<T> {
    return async (context, next) => {
      const dispatch = async (position: number): Promise<void> => {
        const middleware = middlewares[position];
        if (!middleware) {
          await next(); // End of the composed list: hand over to the outer chain.
          return;
        }

        let advanced = false;
        await middleware(context, () => {
          if (advanced) {
            return Promise.reject(new Error('next() called multiple times'));
          }
          advanced = true;
          return dispatch(position + 1);
        });
      };

      await dispatch(0);
    };
  }
};
// Usage examples
/**
 * Example API pipeline: logging, CORS, JSON body parsing, rate limiting and
 * header-based authentication, with handlers for 404 and timeout errors.
 */
const apiPipeline = new Pipeline<RequestContext>()
  .use(middlewareFactories.logger())
  .use(middlewareFactories.cors({
    // NOTE(review): the origin list was truncated in this copy of the file
    // (only "http:" survived); 'http://localhost:3000' is a placeholder —
    // confirm the intended origins.
    origin: ['http://localhost:3000'],
    credentials: true
  }))
  .use(middlewareFactories.jsonBodyParser())
  .use(middlewareFactories.rateLimit({
    windowMs: 15 * 60 * 1000, // 15 minutes
    max: 100
  }))
  .use(middlewareFactories.authenticate(
    async (_token) => {
      // Placeholder verifier: a real implementation would validate the token
      // and resolve to the matching user.
      return { id: 1, name: 'John' };
    },
    { header: 'authorization' }
  ))
  .catch(
    (error) => error.status === 404,
    (context, _error) => {
      context.response.statusCode = 404;
      context.response.json({ error: 'Not found' });
    }
  )
  .catch(
    (error) => error.message === 'Request timeout',
    (context, _error) => {
      context.response.statusCode = 504;
      context.response.json({ error: 'Gateway timeout' });
    }
  );
// Run a single request through the API pipeline
/**
 * Wraps `request` in a fresh context, runs it through `apiPipeline` and
 * returns the context's response object.
 *
 * The response's `send` and `json` callbacks only write to the console.
 */
async function processRequest(request: any) {
  const response: RequestContext['response'] = {
    statusCode: 200,
    headers: {},
    send: (body) => console.log('Response:', body),
    json: (data) => console.log('JSON Response:', data)
  };
  const context: RequestContext = { request, response, state: {} };

  await apiPipeline.execute(context);

  return context.response;
}
// Pipeline composition: smaller pipelines that can be combined
/** Stages shared by every endpoint: logging, CORS and JSON body parsing. */
const publicApiPipeline = Pipeline.from<RequestContext>([
  middlewareFactories.logger(),
  middlewareFactories.cors(),
  middlewareFactories.jsonBodyParser()
]);
/**
 * Admin-only stages: authenticate the caller, then require the `admin` role
 * (403 otherwise).
 */
const adminApiPipeline = new Pipeline<RequestContext>()
  .use(middlewareFactories.authenticate(
    async (_token) => {
      // Placeholder verifier: a real implementation would validate the token
      // and resolve to the matching user.
      return { id: 1, role: 'admin' };
    },
    // Name the token source explicitly: with no header/cookie option the
    // authenticate middleware has nothing to read and answers 401 for every
    // request.
    { header: 'authorization' }
  ))
  .use((context, next) => {
    if (context.state.user?.role !== 'admin') {
      context.response.statusCode = 403;
      context.response.json({ error: 'Admin access required' });
      return;
    }
    return next();
  });
// Combine the public and admin pipelines into one
/** One pipeline that runs the public stages first, then the admin-only stages. */
const combinedPipeline = Pipeline.compose<RequestContext>([
  publicApiPipeline,
  adminApiPipeline
]);
// Custom middleware example: request validation
/**
 * Validation middleware skeleton.
 *
 * The validation step belongs inside the `try`; when it throws, the request
 * is answered with 400 and the chain stops. The chain continues outside the
 * `try` so that errors raised by later middleware are not misreported as
 * validation failures.
 *
 * @param schema Schema to validate against (not yet used by this skeleton).
 */
function validate(schema: any): Middleware<RequestContext> {
  return async (context, next) => {
    try {
      // Placeholder: validate the request against `schema` here and throw on
      // failure.
      // NOTE(review): the original example lines were lost in this copy —
      // confirm the intended validation call.
    } catch (validationError) {
      context.response.statusCode = 400;
      context.response.json({ error: 'Validation failed', details: validationError });
      return;
    }
    await next();
  };
}