TYPESCRIPT

Priority Scheduler Queue

Priority-based task scheduler with concurrency control and task dependency management

TypeScriptSchedulerQueueConcurrency

Code

interface Task<T = any> {
  id: string;
  priority: number; // Higher number = higher priority
  execute: () => Promise<T>;
  dependencies?: string[]; // IDs of tasks that must complete first
  timeout?: number; // Timeout in milliseconds
  retries?: number; // Number of retry attempts
  onSuccess?: (result: T) => void;
  onError?: (error: any) => void;
  onComplete?: () => void;
}

interface TaskResult<T = any> {
  id: string;
  success: boolean;
  result?: T;
  error?: any;
  duration: number;
  retries: number;
}

interface SchedulerOptions {
  maxConcurrent: number;
  defaultPriority?: number;
  defaultTimeout?: number;
  defaultRetries?: number;
  onQueueChange?: (queueSize: number, running: number) => void;
}

class PriorityScheduler {
  private pendingTasks: Array<Task> = [];
  private runningTasks = new Map<string, { task: Task; startTime: number }>();
  private completedTasks = new Map<string, TaskResult>();
  private taskDependencies = new Map<string, Set<string>>();
  private options: Required<Omit<SchedulerOptions, 'onQueueChange'>> & {
    onQueueChange?: (queueSize: number, running: number) => void;
  };
  private isProcessing = false;
  private idCounter = 0;
  
  constructor(options: SchedulerOptions) {
    this.options = {
      defaultPriority: 0,
      defaultTimeout: 30000,
      defaultRetries: 0,
      ...options
    };
  }
  
  // Add a task to the scheduler
  addTask<T>(task: Omit<Task<T>, 'id'>, id?: string): string {
    const taskId = id || `task_${++this.idCounter}`;
    
    const fullTask: Task<T> = {
      id: taskId,
      priority: task.priority ?? this.options.defaultPriority,
      timeout: task.timeout ?? this.options.defaultTimeout,
      retries: task.retries ?? this.options.defaultRetries,
      ...task
    };
    
    // Set up dependencies
    __TOKEN_116__ (fullTask.dependencies && fullTask.dependencies.length > 0) {
      this.taskDependencies.set(taskId, new Set(fullTask.dependencies));
    }
    
    this.pendingTasks.push(fullTask);
    this.sortTasks();
    this.notifyQueueChange();
    this.processQueue();
    
    return taskId;
  }
  
  // Add multiple tasks
  addTasks(tasks: Array<Omit<Task, 'id'>>): string[] {
    return tasks.map(task => this.addTask(task));
  }
  
  // Remove a task (if not running)
  removeTask(taskId: string): boolean {
    const index = this.pendingTasks.findIndex(task => task.id === taskId);
    __TOKEN_128__ (index !== -1) {
      this.pendingTasks.splice(index, 1);
      this.taskDependencies.delete(taskId);
      this.notifyQueueChange();
      return true;
    }
    
    // Clean up dependencies pointing to this task
    __TOKEN_133__ (const [id, deps] of this.taskDependencies.entries()) {
      __TOKEN_137__ (deps.has(taskId)) {
        deps.delete(taskId);
      }
    }
    
    return false;
  }
  
  // Get task status
  getTaskStatus(taskId: string): 'pending' | 'running' | 'completed' | 'not-found' {
    __TOKEN_139__ (this.runningTasks.has(taskId)) return 'running';
    __TOKEN_142__ (this.completedTasks.has(taskId)) return 'completed';
    __TOKEN_145__ (this.pendingTasks.some(task => task.id === taskId)) return 'pending';
    return 'not-found';
  }
  
  // Get task result
  getTaskResult<T>(taskId: string): TaskResult<T> | undefined {
    return this.completedTasks.get(taskId) as TaskResult<T> | undefined;
  }
  
  // Wait for a specific task to complete
  async waitForTask<T>(taskId: string, timeout?: number): Promise<TaskResult<T>> {
    return new __TOKEN_391__((resolve, reject) => {
      const startTime = Date.now();
      
      const checkInterval = setInterval(() => {
        const result = this.completedTasks.get(taskId);
        
        __TOKEN_158__ (result) {
          clearInterval(checkInterval);
          resolve(result as TaskResult<T>);
          return;
        }
        
        __TOKEN_160__ (timeout && Date.now() - startTime > timeout) {
          clearInterval(checkInterval);
          reject(new Error(`Timeout waiting for task ${taskId}`));
        }
        
        // Check if task doesn't exist
        __TOKEN_162__ (this.getTaskStatus(taskId) === 'not-found') {
          clearInterval(checkInterval);
          reject(new Error(`Task ${taskId} not found`));
        }
      }, 100);
    });
  }
  
  // Wait for all tasks to complete
  async waitForAll(timeout?: number): Promise<Map<string, TaskResult>> {
    return new __TOKEN_395__((resolve, reject) => {
      const startTime = Date.now();
      
      const checkInterval = setInterval(() => {
        __TOKEN_170__ (this.pendingTasks.length === 0 && this.runningTasks.size === 0) {
          clearInterval(checkInterval);
          resolve(new Map(this.completedTasks));
          return;
        }
        
        __TOKEN_176__ (timeout && Date.now() - startTime > timeout) {
          clearInterval(checkInterval);
          reject(new Error('Timeout waiting for all tasks'));
        }
      }, 100);
    });
  }
  
  // Get queue statistics
  getStats() {
    return {
      pending: this.pendingTasks.length,
      running: this.runningTasks.size,
      completed: this.completedTasks.size,
      maxConcurrent: this.options.maxConcurrent
    };
  }
  
  // Pause scheduler (won't start new tasks)
  pause(): void {
    this.isProcessing = false;
  }
  
  // Resume scheduler
  resume(): void {
    __TOKEN_184__ (!this.isProcessing) {
      this.isProcessing = true;
      this.processQueue();
    }
  }
  
  // Clear all pending tasks
  clearPending(): void {
    this.pendingTasks = [];
    this.taskDependencies.clear();
    this.notifyQueueChange();
  }
  
  // Cancel all tasks
  async cancelAll(): Promise<void> {
    this.clearPending();
    
    // Wait for running tasks to complete
    await this.waitForAll();
  }
  
  private async processQueue(): Promise<void> {
    __TOKEN_197__ (this.isProcessing || this.pendingTasks.length === 0) {
      return;
    }
    
    this.isProcessing = true;
    
    __TOKEN_202__ (
      this.pendingTasks.length > 0 &&
      this.runningTasks.size < this.options.maxConcurrent
    ) {
      // Find next runnable task (considering dependencies)
      const nextTaskIndex = this.findNextRunnableTask();
      
      __TOKEN_208__ (nextTaskIndex === -1) {
        // No runnable tasks at the moment
        break;
      }
      
      const task = this.pendingTasks.splice(nextTaskIndex, 1)[0];
      this.runTask(task);
    }
    
    this.isProcessing = false;
    this.notifyQueueChange();
  }
  
  private findNextRunnableTask(): number {
    __TOKEN_215__ (let i = 0; i < this.pendingTasks.length; i++) {
      const task = this.pendingTasks[i];
      
      // Check dependencies
      const deps = this.taskDependencies.get(task.id);
      __TOKEN_222__ (deps && deps.size > 0) {
        // Check if all dependencies are completed
        const allDepsCompleted = Array.__TOKEN_224__(deps).every(
          depId => this.completedTasks.has(depId)
        );
        
        __TOKEN_226__ (!allDepsCompleted) {
          continue;
        }
      }
      
      return i;
    }
    
    return -1;
  }
  
  private async runTask<T>(task: Task<T>): Promise<void> {
    const startTime = Date.now();
    this.runningTasks.set(task.id, { task, startTime });
    
    let retries = 0;
    const maxRetries = task.retries ?? 0;
    
    const executeWithRetry = __TOKEN_236__ (): Promise<TaskResult<T>> => {
      try {
        const result = await this.executeWithTimeout(task);
        
        return {
          id: task.id,
          success: true,
          result,
          duration: Date.now() - startTime,
          retries
        };
      } __TOKEN_242__ (error) {
        __TOKEN_243__ (retries < maxRetries) {
          retries++;
          return executeWithRetry();
        }
        
        return {
          id: task.id,
          success: false,
          error,
          duration: Date.now() - startTime,
          retries
        };
      }
    };
    
    const result = await executeWithRetry();
    
    // Clean up
    this.runningTasks.delete(task.id);
    this.taskDependencies.delete(task.id);
    this.completedTasks.set(task.id, result);
    
    // Call callbacks
    __TOKEN_251__ (result.success) {
      task.onSuccess?.(result.result!);
    } else {
      task.onError?.(result.error);
    }
    
    task.onComplete?.();
    
    // Process next tasks
    this.processQueue();
  }
  
  private async executeWithTimeout<T>(task: Task<T>): Promise<T> {
    const timeout = task.timeout ?? this.options.defaultTimeout;
    
    __TOKEN_258__ (timeout <= 0) {
      return task.execute();
    }
    
    const timeoutPromise = new Promise<never>((_, reject) => {
      setTimeout(() => reject(new Error(`Task ${task.id} timeout`)), timeout);
    });
    
    return Promise.race([task.execute(), timeoutPromise]);
  }
  
  private sortTasks(): void {
    this.pendingTasks.sort((a, b) => {
      // Higher priority first
      __TOKEN_266__ (b.priority !== a.priority) {
        return b.priority - a.priority;
      }
      
      // If priorities are equal, maintain insertion order
      return 0;
    });
  }
  
  private notifyQueueChange(): void {
    this.options.onQueueChange?.(
      this.pendingTasks.length,
      this.runningTasks.size
    );
  }
  
  // Advanced: Task groups
  addTaskGroup(
    tasks: Array<Omit<Task, 'id'>>,
    options?: {
      parallel?: boolean; // true: tasks can run in parallel, false: sequential
      groupId?: string;
    }
  ): string[] {
    const groupId = options?.groupId || `group_${++this.idCounter}`;
    const taskIds: string[] = [];
    
    __TOKEN_275__ (options?.parallel) {
      // All tasks can run in parallel
      taskIds.push(...this.addTasks(tasks));
    } else {
      // Tasks must run sequentially
      __TOKEN_278__ (let i = 0; i < tasks.length; i++) {
        const task = tasks[i];
        const dependencies = i > 0 ? [taskIds[i - 1]] : undefined;
        
        const taskId = this.addTask({
          ...task,
          dependencies
        });
        
        taskIds.push(taskId);
      }
    }
    
    return taskIds;
  }
  
  // Advanced: Priority adjustment
  adjustPriority(taskId: string, newPriority: number): boolean {
    const pendingIndex = this.pendingTasks.findIndex(t => t.id === taskId);
    
    __TOKEN_287__ (pendingIndex !== -1) {
      this.pendingTasks[pendingIndex].priority = newPriority;
      this.sortTasks();
      return true;
    }
    
    return false;
  }
}

// Usage examples
// Create scheduler with max 3 concurrent tasks
const scheduler = new PriorityScheduler({
  maxConcurrent: 3,
  defaultPriority: 0,
  defaultTimeout: 10000,
  onQueueChange: (pending, running) => {
    console.log(`Queue: ${pending} pending, ${running} running`);
  }
});

// Add tasks with different priorities
const task1Id = scheduler.addTask({
  priority: 10, // High priority
  execute: __TOKEN_295__ () => {
    console.log('Running high priority task');
    await new __TOKEN_431__(resolve => setTimeout(resolve, 1000));
    return 'Task 1 result';
  },
  onSuccess: (result) => console.log('Task 1 succeeded:', result)
});

const task2Id = scheduler.addTask({
  priority: 5, // Medium priority
  execute: __TOKEN_300__ () => {
    console.log('Running medium priority task');
    await new __TOKEN_432__(resolve => setTimeout(resolve, 2000));
    return 'Task 2 result';
  },
  retries: 2 // Will retry twice on failure
});

// Task with dependencies
const task3Id = scheduler.addTask({
  priority: 1,
  execute: __TOKEN_305__ () => {
    console.log('Running dependent task');
    return 'Task 3 result';
  },
  dependencies: [task1Id], // Must wait for task1 to complete
  timeout: 5000
});

// Add multiple tasks
const taskIds = scheduler.addTasks([
  {
    priority: 2,
    execute: __TOKEN_308__ () => {
      await new __TOKEN_433__(resolve => setTimeout(resolve, 500));
      return 'Batch task 1';
    }
  },
  {
    priority: 3,
    execute: __TOKEN_312__ () => {
      await new __TOKEN_434__(resolve => setTimeout(resolve, 300));
      return 'Batch task 2';
    }
  }
]);

// Monitor task status
console.log('Task 1 status:', scheduler.getTaskStatus(task1Id));

// Wait for specific task
scheduler.waitForTask(task1Id, 15000)
  .then(result => console.log('Task 1 completed:', result))
  .__TOKEN_316__(error => console.error('Task 1 failed:', error));

// Wait for all tasks
scheduler.waitForAll(30000)
  .then(results => {
    console.log('All tasks completed');
    console.log('Results:', results);
  })
  .__TOKEN_317__(error => console.error('Timeout:', error));

// Advanced: Task groups
const groupTaskIds = scheduler.addTaskGroup(
  [
    {
      execute: __TOKEN_319__ () => {
        console.log('Group task 1');
        return 'Group 1';
      }
    },
    {
      execute: __TOKEN_321__ () => {
        console.log('Group task 2');
        return 'Group 2';
      }
    }
  ],
  { parallel: false } // Run sequentially
);

// Get statistics
const stats = scheduler.getStats();
console.log('Scheduler stats:', stats);

// Example: Image processing pipeline
class ImageProcessingScheduler {
  private scheduler = new PriorityScheduler({
    maxConcurrent: 2,
    defaultPriority: 5
  });
  
  processImage(imageId: string, operations: Array<{
    type: 'resize' | 'crop' | 'filter';
    priority: number;
  }>) {
    const taskIds: string[] = [];
    
    __TOKEN_329__ (let i = 0; i < operations.length; i++) {
      const operation = operations[i];
      const dependencies = i > 0 ? [taskIds[i - 1]] : undefined;
      
      const taskId = this.scheduler.addTask({
        id: `${imageId}_${operation.type}_${i}`,
        priority: operation.priority,
        dependencies,
        execute: __TOKEN_335__ () => {
          console.log(`Processing ${imageId}: ${operation.type}`);
          await new __TOKEN_441__(resolve => setTimeout(resolve, 1000));
          return { imageId, operation: operation.type, success: true };
        },
        onError: (error) => {
          console.error(`Failed to process ${imageId}:`, error);
        }
      });
      
      taskIds.push(taskId);
    }
    
    return this.scheduler.waitForAll();
  }
}