// Priority Scheduler Queue
// Priority-based task scheduler with concurrency control and task dependency management.
// Code
/**
 * A unit of asynchronous work handled by the scheduler.
 *
 * `T` is the value produced by `execute` and handed to `onSuccess`.
 */
interface Task<T = any> {
  /** Identifier used to look the task up and to reference it from `dependencies`. */
  id: string;

  /** Scheduling weight; a higher number means a higher priority. */
  priority: number;

  /** Performs the work and resolves with its result. */
  execute: () => Promise<T>;

  /** IDs of tasks that must complete before this one may start. */
  dependencies?: Array<string>;

  /** Timeout in milliseconds. */
  timeout?: number;

  /** Number of retry attempts. */
  retries?: number;

  /** Called with the result when the task succeeds. */
  onSuccess?: (result: T) => void;

  /** Called with the error when the task fails. */
  onError?: (error: any) => void;

  /** Called once the task has finished. */
  onComplete?: () => void;
}
/**
 * Outcome record for a task.
 *
 * `T` is the type of the value carried in `result`.
 */
interface TaskResult<T = any> {
  /** ID of the task this record belongs to. */
  id: string;

  /** Whether the task succeeded. */
  success: boolean;

  /** Value produced by the task, when there is one. */
  result?: T;

  /** Error associated with the task, when there is one. */
  error?: any;

  /** Duration of the task. */
  duration: number;

  /** Number of retries recorded for the task. */
  retries: number;
}
/** Configuration for a scheduler instance. */
interface SchedulerOptions {
  /** Concurrency limit for the scheduler. */
  maxConcurrent: number;

  /** Priority to fall back on when a task does not specify one. */
  defaultPriority?: number;

  /** Timeout to fall back on when a task does not specify one. */
  defaultTimeout?: number;

  /** Retry count to fall back on when a task does not specify one. */
  defaultRetries?: number;

  /** Observer that receives the queue size and the number of running tasks. */
  onQueueChange?: (queueSize: number, running: number) => void;
}
/**
 * Task payload accepted by the scheduler: a `Task` without its `id`, and with
 * `priority` optional because the scheduler supplies `defaultPriority`.
 */
type SchedulableTask<T = any> = Omit<Task<T>, 'id' | 'priority'> & {
  priority?: number;
};

/**
 * Priority-based task scheduler with a concurrency limit and task dependencies.
 *
 * Pending tasks are kept sorted by descending priority. A task starts when a
 * concurrency slot is free and every one of its dependencies has an entry in
 * the completed-task map.
 */
class PriorityScheduler {
  /** Polling period, in milliseconds, used by `waitForTask` and `waitForAll`. */
  private static readonly POLL_INTERVAL_MS = 100;

  private pendingTasks: Array<Task> = [];
  private runningTasks = new Map<string, { task: Task; startTime: number }>();
  private completedTasks = new Map<string, TaskResult>();
  /** Outstanding dependency IDs, keyed by the ID of the task that waits on them. */
  private taskDependencies = new Map<string, Set<string>>();
  private options: Required<Omit<SchedulerOptions, 'onQueueChange'>> & {
    onQueueChange?: (queueSize: number, running: number) => void;
  };
  /** Re-entrancy guard for `processQueue`; unrelated to pausing. */
  private isProcessing = false;
  /** True between `pause()` and `resume()`; no new task starts while set. */
  private isPaused = false;
  private idCounter = 0;

  /**
   * @param options Scheduler configuration. Missing (or `undefined`) defaults
   *   resolve to priority 0, timeout 30000 ms and 0 retries.
   */
  constructor(options: SchedulerOptions) {
    // Resolve each default with `??` so an explicit `undefined` cannot
    // overwrite it, which a trailing object spread would allow.
    this.options = {
      maxConcurrent: options.maxConcurrent,
      defaultPriority: options.defaultPriority ?? 0,
      defaultTimeout: options.defaultTimeout ?? 30000,
      defaultRetries: options.defaultRetries ?? 0,
      onQueueChange: options.onQueueChange
    };
  }

  /**
   * Adds a task to the scheduler and starts it right away when possible.
   *
   * @param task Task definition; `priority`, `timeout` and `retries` fall back
   *   to the scheduler defaults.
   * @param id Optional explicit ID. When omitted (or empty) an ID of the form
   *   `task_<n>` is generated.
   * @returns The ID under which the task is tracked.
   */
  addTask<T>(task: SchedulableTask<T>, id?: string): string {
    const taskId = id || `task_${++this.idCounter}`;
    // Spread the caller's fields first: the resolved id and defaults must win
    // over a stray `id` or an explicit `undefined` in the payload, otherwise
    // the returned ID and the stored ID can differ.
    const fullTask: Task<T> = {
      ...task,
      id: taskId,
      priority: task.priority ?? this.options.defaultPriority,
      timeout: task.timeout ?? this.options.defaultTimeout,
      retries: task.retries ?? this.options.defaultRetries
    };

    if (fullTask.dependencies && fullTask.dependencies.length > 0) {
      this.taskDependencies.set(taskId, new Set(fullTask.dependencies));
    }

    this.pendingTasks.push(fullTask);
    this.sortTasks();
    this.notifyQueueChange();
    this.processQueue();
    return taskId;
  }

  /**
   * Adds several tasks, each with a generated ID.
   *
   * @returns The generated IDs, in the order of `tasks`.
   */
  addTasks(tasks: Array<SchedulableTask>): string[] {
    return tasks.map(task => this.addTask(task));
  }

  /**
   * Removes a task that is still pending.
   *
   * Other tasks stop waiting on the removed task, so they are not blocked
   * forever by a dependency that will never complete.
   *
   * @returns `true` when a pending task was removed; `false` when the ID is
   *   unknown, running or already completed (nothing is changed then).
   */
  removeTask(taskId: string): boolean {
    const index = this.pendingTasks.findIndex(task => task.id === taskId);
    if (index === -1) {
      return false;
    }

    this.pendingTasks.splice(index, 1);
    this.taskDependencies.delete(taskId);
    // Release every task that was waiting on the removed one.
    for (const deps of this.taskDependencies.values()) {
      deps.delete(taskId);
    }

    this.notifyQueueChange();
    // Released dependents may be runnable now.
    this.processQueue();
    return true;
  }

  /** Reports which of the scheduler's collections currently holds the task. */
  getTaskStatus(taskId: string): 'pending' | 'running' | 'completed' | 'not-found' {
    if (this.runningTasks.has(taskId)) return 'running';
    if (this.completedTasks.has(taskId)) return 'completed';
    if (this.pendingTasks.some(task => task.id === taskId)) return 'pending';
    return 'not-found';
  }

  /** Returns the stored result of a completed task, or `undefined`. */
  getTaskResult<T>(taskId: string): TaskResult<T> | undefined {
    return this.completedTasks.get(taskId) as TaskResult<T> | undefined;
  }

  /**
   * Waits until the given task has a stored result.
   *
   * Resolves immediately when the task has already completed; otherwise the
   * scheduler state is polled every `POLL_INTERVAL_MS` milliseconds.
   *
   * @param timeout Optional limit in milliseconds; `0` or omitted disables it.
   * @returns The task's result record (also for failed tasks).
   * @throws Rejects with an `Error` on timeout, or when a poll finds the task
   *   neither pending, running nor completed.
   */
  waitForTask<T>(taskId: string, timeout?: number): Promise<TaskResult<T>> {
    return new Promise<TaskResult<T>>((resolve, reject) => {
      const finished = this.completedTasks.get(taskId);
      if (finished) {
        resolve(finished as TaskResult<T>);
        return;
      }

      const startTime = Date.now();
      const checkInterval = setInterval(() => {
        const result = this.completedTasks.get(taskId);
        if (result) {
          clearInterval(checkInterval);
          resolve(result as TaskResult<T>);
          return;
        }
        if (timeout && Date.now() - startTime > timeout) {
          clearInterval(checkInterval);
          reject(new Error(`Timeout waiting for task ${taskId}`));
          return;
        }
        if (this.getTaskStatus(taskId) === 'not-found') {
          clearInterval(checkInterval);
          reject(new Error(`Task ${taskId} not found`));
        }
      }, PriorityScheduler.POLL_INTERVAL_MS);
    });
  }

  /**
   * Waits until no task is pending or running.
   *
   * Resolves immediately when the scheduler is already idle; otherwise the
   * state is polled every `POLL_INTERVAL_MS` milliseconds. Without a timeout
   * the promise stays pending for as long as a task remains pending (for
   * example while paused or blocked on a dependency) or running.
   *
   * @param timeout Optional limit in milliseconds; `0` or omitted disables it.
   * @returns A copy of the completed-task map at the moment of resolution.
   * @throws Rejects with an `Error` when the timeout elapses first.
   */
  waitForAll(timeout?: number): Promise<Map<string, TaskResult>> {
    return new Promise<Map<string, TaskResult>>((resolve, reject) => {
      const isIdle = (): boolean =>
        this.pendingTasks.length === 0 && this.runningTasks.size === 0;

      if (isIdle()) {
        resolve(new Map(this.completedTasks));
        return;
      }

      const startTime = Date.now();
      const checkInterval = setInterval(() => {
        if (isIdle()) {
          clearInterval(checkInterval);
          resolve(new Map(this.completedTasks));
          return;
        }
        if (timeout && Date.now() - startTime > timeout) {
          clearInterval(checkInterval);
          reject(new Error('Timeout waiting for all tasks'));
        }
      }, PriorityScheduler.POLL_INTERVAL_MS);
    });
  }

  /** Returns the current queue counters and the configured concurrency limit. */
  getStats() {
    return {
      pending: this.pendingTasks.length,
      running: this.runningTasks.size,
      completed: this.completedTasks.size,
      maxConcurrent: this.options.maxConcurrent
    };
  }

  /** Stops new tasks from starting; tasks that are already running continue. */
  pause(): void {
    this.isPaused = true;
  }

  /** Lifts a previous `pause()` and starts whatever is runnable. */
  resume(): void {
    this.isPaused = false;
    this.processQueue();
  }

  /** Drops every pending task together with all recorded dependencies. */
  clearPending(): void {
    this.pendingTasks = [];
    this.taskDependencies.clear();
    this.notifyQueueChange();
  }

  /**
   * Drops every pending task, then waits for the running ones to finish.
   * Running tasks are not interrupted.
   */
  async cancelAll(): Promise<void> {
    this.clearPending();
    await this.waitForAll();
  }

  /**
   * Starts runnable pending tasks until the concurrency limit is reached or
   * nothing runnable is left. Does nothing while paused.
   */
  private processQueue(): void {
    if (this.isPaused || this.isProcessing) {
      return;
    }

    this.isProcessing = true;
    let started = 0;
    try {
      while (
        this.pendingTasks.length > 0 &&
        this.runningTasks.size < this.options.maxConcurrent
      ) {
        const nextTaskIndex = this.findNextRunnableTask();
        if (nextTaskIndex === -1) {
          // Everything still pending is blocked on a dependency.
          break;
        }
        const [task] = this.pendingTasks.splice(nextTaskIndex, 1);
        // runTask registers the task as running before its first await, so
        // the size check above sees it on the next iteration.
        void this.runTask(task);
        started++;
      }
    } finally {
      this.isProcessing = false;
    }

    if (started > 0) {
      this.notifyQueueChange();
    }
  }

  /**
   * Finds the first pending task (highest priority first) whose dependencies
   * all have an entry in the completed-task map. Failed dependencies count as
   * completed because `runTask` stores a result for failures too; a dependency
   * ID that never gets a result keeps the task blocked.
   *
   * @returns Index into `pendingTasks`, or -1 when nothing is runnable.
   */
  private findNextRunnableTask(): number {
    return this.pendingTasks.findIndex(task => {
      const deps = this.taskDependencies.get(task.id);
      if (!deps || deps.size === 0) {
        return true;
      }
      return Array.from(deps).every(depId => this.completedTasks.has(depId));
    });
  }

  /**
   * Runs one task to completion (including retries), records its result,
   * fires its callbacks and then lets the queue continue.
   */
  private async runTask<T>(task: Task<T>): Promise<void> {
    const startTime = Date.now();
    this.runningTasks.set(task.id, { task, startTime });

    const outcome = await this.executeWithRetries(task, startTime);

    this.runningTasks.delete(task.id);
    this.taskDependencies.delete(task.id);
    this.completedTasks.set(task.id, outcome);
    // Report the freed slot even when nothing is left to start.
    this.notifyQueueChange();

    // A throwing callback must not prevent the remaining callbacks or the
    // queue from proceeding, so each one is isolated.
    if (outcome.success) {
      this.invokeCallback(task.id, 'onSuccess', () =>
        task.onSuccess?.(outcome.result as T)
      );
    } else {
      this.invokeCallback(task.id, 'onError', () => task.onError?.(outcome.error));
    }
    this.invokeCallback(task.id, 'onComplete', () => task.onComplete?.());

    this.processQueue();
  }

  /**
   * Executes the task, retrying after a failure up to `task.retries` times.
   *
   * @param startTime Timestamp from which the reported duration is measured;
   *   the duration therefore spans all attempts.
   * @returns A result record; never rejects for task failures.
   */
  private async executeWithRetries<T>(
    task: Task<T>,
    startTime: number
  ): Promise<TaskResult<T>> {
    const maxRetries = task.retries ?? 0;
    let retries = 0;
    for (;;) {
      try {
        const result = await this.executeWithTimeout(task);
        return {
          id: task.id,
          success: true,
          result,
          duration: Date.now() - startTime,
          retries
        };
      } catch (error) {
        if (retries >= maxRetries) {
          return {
            id: task.id,
            success: false,
            error,
            duration: Date.now() - startTime,
            retries
          };
        }
        retries++;
      }
    }
  }

  /**
   * Runs `task.execute()` raced against a timeout. A timeout of zero or less
   * disables the race. The underlying work is not aborted on timeout; only
   * the wait for it ends.
   *
   * @throws Rejects with `Error("Task <id> timeout")` when the timeout wins,
   *   or with whatever `execute` throws or rejects with.
   */
  private async executeWithTimeout<T>(task: Task<T>): Promise<T> {
    const timeout = task.timeout ?? this.options.defaultTimeout;
    if (timeout <= 0) {
      return task.execute();
    }

    let timer: ReturnType<typeof setTimeout> | undefined;
    const timeoutPromise = new Promise<never>((_, reject) => {
      timer = setTimeout(() => reject(new Error(`Task ${task.id} timeout`)), timeout);
    });

    try {
      return await Promise.race([task.execute(), timeoutPromise]);
    } finally {
      // Always release the timer; a live timer would otherwise linger (and
      // keep a Node.js process alive) until the full timeout has elapsed.
      clearTimeout(timer);
    }
  }

  /** Calls a user callback and logs, rather than propagates, anything it throws. */
  private invokeCallback(taskId: string, hook: string, callback: () => void): void {
    try {
      callback();
    } catch (error) {
      console.error(`Task ${taskId}: ${hook} callback threw`, error);
    }
  }

  /**
   * Orders pending tasks by descending priority. `Array.prototype.sort` is
   * stable, so tasks of equal priority keep their insertion order.
   */
  private sortTasks(): void {
    this.pendingTasks.sort((a, b) => b.priority - a.priority);
  }

  /** Reports the current pending and running counts to `onQueueChange`. */
  private notifyQueueChange(): void {
    this.options.onQueueChange?.(
      this.pendingTasks.length,
      this.runningTasks.size
    );
  }

  /**
   * Adds a group of tasks.
   *
   * @param tasks Task definitions, in the order they should be added.
   * @param options `parallel: true` adds the tasks independently. Otherwise
   *   (the default) each task additionally depends on the previous task of the
   *   group, so the group runs sequentially. `groupId` is accepted but does
   *   not affect scheduling or the generated task IDs.
   * @returns The IDs of the added tasks, in the order of `tasks`.
   */
  addTaskGroup(
    tasks: Array<SchedulableTask>,
    options?: {
      parallel?: boolean; // true: tasks can run in parallel, false: sequential
      groupId?: string;
    }
  ): string[] {
    if (options?.parallel) {
      return this.addTasks(tasks);
    }

    const taskIds: string[] = [];
    for (const task of tasks) {
      const previousId = taskIds[taskIds.length - 1];
      // Chain onto the previous task while keeping the task's own dependencies.
      const dependencies =
        previousId === undefined
          ? task.dependencies
          : [...(task.dependencies ?? []), previousId];
      taskIds.push(this.addTask({ ...task, dependencies }));
    }
    return taskIds;
  }

  /**
   * Changes the priority of a pending task and re-sorts the queue.
   *
   * @returns `true` when the task was pending and has been updated.
   */
  adjustPriority(taskId: string, newPriority: number): boolean {
    const pendingTask = this.pendingTasks.find(t => t.id === taskId);
    if (!pendingTask) {
      return false;
    }
    pendingTask.priority = newPriority;
    this.sortTasks();
    return true;
  }
}
// Usage examples

// Create scheduler with max 3 concurrent tasks; every queue change is logged.
const scheduler = new PriorityScheduler({
  maxConcurrent: 3,
  defaultPriority: 0,
  defaultTimeout: 10000,
  onQueueChange: (pending, running) => {
    console.log(`Queue: ${pending} pending, ${running} running`);
  }
});

// Add tasks with different priorities
const task1Id = scheduler.addTask({
  priority: 10, // High priority
  execute: async () => {
    console.log('Running high priority task');
    await new Promise(resolve => setTimeout(resolve, 1000));
    return 'Task 1 result';
  },
  onSuccess: (result) => console.log('Task 1 succeeded:', result)
});

const task2Id = scheduler.addTask({
  priority: 5, // Medium priority
  execute: async () => {
    console.log('Running medium priority task');
    await new Promise(resolve => setTimeout(resolve, 2000));
    return 'Task 2 result';
  },
  retries: 2 // Will retry twice on failure
});

// Task with dependencies
const task3Id = scheduler.addTask({
  priority: 1,
  execute: async () => {
    console.log('Running dependent task');
    return 'Task 3 result';
  },
  dependencies: [task1Id], // Must wait for task1 to complete
  timeout: 5000
});

// Add multiple tasks
const taskIds = scheduler.addTasks([
  {
    priority: 2,
    execute: async () => {
      await new Promise(resolve => setTimeout(resolve, 500));
      return 'Batch task 1';
    }
  },
  {
    priority: 3,
    execute: async () => {
      await new Promise(resolve => setTimeout(resolve, 300));
      return 'Batch task 2';
    }
  }
]);

// Monitor task status
console.log('Task 1 status:', scheduler.getTaskStatus(task1Id));

// Wait for specific task
scheduler.waitForTask(task1Id, 15000)
  .then(result => console.log('Task 1 completed:', result))
  .catch(error => console.error('Task 1 failed:', error));

// Wait for all tasks
scheduler.waitForAll(30000)
  .then(results => {
    console.log('All tasks completed');
    console.log('Results:', results);
  })
  .catch(error => console.error('Timeout:', error));

// Advanced: Task groups
// `priority` is spelled out because the task type requires it; 0 matches the
// scheduler's defaultPriority configured above.
const groupTaskIds = scheduler.addTaskGroup(
  [
    {
      priority: 0,
      execute: async () => {
        console.log('Group task 1');
        return 'Group 1';
      }
    },
    {
      priority: 0,
      execute: async () => {
        console.log('Group task 2');
        return 'Group 2';
      }
    }
  ],
  { parallel: false } // Run sequentially
);

// Get statistics
const stats = scheduler.getStats();
console.log('Scheduler stats:', stats);
// Example: Image processing pipeline
/**
 * Example pipeline that runs the operations of an image one after another on
 * a dedicated scheduler (at most two tasks running at a time).
 */
class ImageProcessingScheduler {
  private scheduler = new PriorityScheduler({
    maxConcurrent: 2,
    defaultPriority: 5
  });

  /**
   * Queues one task per operation, each depending on the previous operation
   * of the same call, and waits for the scheduler to drain.
   *
   * @param imageId Identifier used in the task IDs and log messages.
   * @param operations Operations to apply, in order, with their priorities.
   * @returns The promise from the scheduler's `waitForAll()`; it covers every
   *   task on this scheduler, not only the ones queued by this call.
   */
  processImage(imageId: string, operations: Array<{
    type: 'resize' | 'crop' | 'filter';
    priority: number;
  }>) {
    const taskIds: string[] = [];

    for (let i = 0; i < operations.length; i++) {
      const operation = operations[i];
      const dependencies = i > 0 ? [taskIds[i - 1]] : undefined;

      // The custom ID goes in addTask's `id` argument. Placing it inside the
      // task object makes the stored ID differ from the returned one, so the
      // next operation would depend on an ID that never completes.
      const taskId = this.scheduler.addTask(
        {
          priority: operation.priority,
          dependencies,
          execute: async () => {
            console.log(`Processing ${imageId}: ${operation.type}`);
            await new Promise(resolve => setTimeout(resolve, 1000));
            return { imageId, operation: operation.type, success: true };
          },
          onError: (error) => {
            console.error(`Failed to process ${imageId}:`, error);
          }
        },
        `${imageId}_${operation.type}_${i}`
      );
      taskIds.push(taskId);
    }

    return this.scheduler.waitForAll();
  }
}