VUE

useTaskQueue Composable

Composable that queues async tasks and runs them in FIFO order with a configurable concurrency limit

Vue3, Composables, Async, TaskQueue, Concurrency

Code

import { ref, onUnmounted } from 'vue';
import type { Ref } from 'vue';

/**
 * A deferred unit of work: a zero-argument function that, when invoked,
 * returns a promise for its result (`void` when no type argument is given).
 */
type Task<T = void> = () => Promise<T>;

/** Shape of the object returned by `useTaskQueue`. */
interface TaskQueueReturn {
  /** Tasks that have been added but not started yet; entries are removed as they begin running. */
  queue: Ref<Task[]>;
  /** Flag intended to signal that the queue is currently executing tasks. */
  isProcessing: Ref<boolean>;
  /** Counter mirrored from the length of `queue`. */
  pendingTasks: Ref<number>;
  /** Enqueues a task; the returned promise settles with that task's own result or error. */
  addTask: <T>(task: Task<T>) => Promise<T>;
  /** Discards every task that has not started yet. */
  clearQueue: () => void;
}

/**
 * Vue composable that queues async tasks and runs them in FIFO order with at
 * most `concurrency` tasks in flight at the same time.
 *
 * The queue is cleared when the owning component unmounts. Tasks that are
 * already running at that point are left to finish on their own.
 *
 * @param concurrency - Maximum number of tasks allowed to run simultaneously.
 *   Values below 1 (and `NaN`) are treated as 1 so that queued tasks can
 *   always start.
 * @returns The reactive queue state (`queue`, `isProcessing`, `pendingTasks`)
 *   together with the `addTask` and `clearQueue` operations.
 */
export function useTaskQueue(concurrency = 1): TaskQueueReturn {
  // A limit of 0, a negative number or NaN would leave every task waiting forever.
  const maxWorkers = Number.isNaN(concurrency) ? 1 : Math.max(1, concurrency);

  const queue = ref<Task[]>([]);
  const isProcessing = ref(false);
  const pendingTasks = ref(0);
  let activeWorkers = 0;

  /** Recomputes the derived reactive state from the queue and the worker count. */
  function syncState(): void {
    // Only tasks that have not started yet count as pending.
    pendingTasks.value = queue.value.length;
    // "Processing" means at least one task is in flight, regardless of queue length.
    isProcessing.value = activeWorkers > 0;
  }

  /** Runs one dequeued task to completion, then frees its slot and refills the pool. */
  async function runTask(task: Task): Promise<void> {
    try {
      await task();
    } catch (error: unknown) {
      // The caller of `addTask` already receives this error through its own
      // promise. Rethrowing from this un-awaited worker would only surface as
      // an unhandled promise rejection, so the failure is logged and absorbed.
      console.error('Task failed:', error);
    } finally {
      activeWorkers--;
      processQueue();
    }
  }

  /** Starts queued tasks until the concurrency limit is reached or the queue is empty. */
  function processQueue(): void {
    while (activeWorkers < maxWorkers && queue.value.length > 0) {
      const task = queue.value.shift();
      // `queue` is exposed to callers, so tolerate empty entries instead of crashing.
      if (!task) continue;
      activeWorkers++;
      void runTask(task);
    }
    syncState();
  }

  /**
   * Adds a task to the end of the queue and starts it as soon as a slot is free.
   *
   * @param task - Function producing the promise to wait for.
   * @returns A promise that resolves with the task's result or rejects with its
   *   error. It stays pending if the task is dropped by `clearQueue` before it starts.
   */
  const addTask = <T>(task: Task<T>): Promise<T> =>
    new Promise<T>((resolve, reject) => {
      // The queue stores `Task<void>`, so the wrapper forwards the outcome to
      // the caller's promise instead of returning it.
      const wrappedTask: Task = async () => {
        try {
          resolve(await task());
        } catch (error: unknown) {
          reject(error);
          // Rethrow so the worker still logs the failure.
          throw error;
        }
      };

      queue.value.push(wrappedTask);
      processQueue();
    });

  /**
   * Drops every task that has not started yet. Running tasks are unaffected,
   * and the promises of dropped tasks are left pending.
   */
  const clearQueue = (): void => {
    queue.value = [];
    syncState();
  };

  onUnmounted(() => {
    // Do not reset `activeWorkers` here: tasks still in flight decrement it
    // when they settle, and zeroing it first would drive the counter negative.
    clearQueue();
  });

  return {
    queue,
    isProcessing,
    pendingTasks,
    addTask,
    clearQueue
  };
}

// Usage example
// const { addTask, pendingTasks, isProcessing } = useTaskQueue(2);
// 
// // Add tasks
// const task1 = () => fetch('/api/data1').then(r => r.json());
// const task2 = () => fetch('/api/data2').then(r => r.json());
// const task3 = () => fetch('/api/data3').then(r => r.json());
// 
// addTask(task1).then(data => console.log('Task 1 done:', data));
// addTask(task2).then(data => console.log('Task 2 done:', data));
// addTask(task3).then(data => console.log('Task 3 done:', data));