自从 ES 中的 Promise 普及后,多个异步任务的编排处理是一项棘手的时期,这里不仅仅是 “地狱回调” 的问题,这在后期的 async/await 已经解决了,但是遇到负责异步依赖的问题,依然要处理并发,错误,重试等业务问题。
这篇文章是介绍如何从业务的角度去解决多任务异步编排的处理方式,只着眼于“异步”。
Promise 多任务处理方法
Promise 类提供了四个静态方法来促进异步任务的并发:
为了方便模拟延迟,我们编写一个 delay
函数。
const delay = (time) => {
return new Promise((resolve) => {
setTimeout(resolve, time * 1000);
});
};
// 阻塞 1s
// await deply(1)
同样为了方便调试具体的延迟时间,我们为以下程序都执行一个全局的时间打印
let time = 1;
setInterval(() => {
console.log(`time: ${time}`);
time++;
}, 990); // 比 1s 少一些,不然程序执行太快,打印不出来执行时的时间
以下的完成指的是 Promise reject 或 resolve (异步有结果)了。
Promise.all()
在所有传入的 Promise 都被完成时完成;在任意一个 Promise 被拒绝时拒绝。
const task1 = new Promise(async (resolve, reject) => {
await delay(4);
resolve(1);
});
const task2 = new Promise(async (resolve, reject) => {
await delay(3);
reject(new Error("task2 error"));
});
const task3 = new Promise(async (resolve, reject) => {
await delay(2);
resolve(3);
});
Promise.all([task1, task2, task3]).then(console.log).catch(console.error);
/*
time: 1
time: 2
time: 3
Error: task2 error
*/
我们发现,即使任务三在 2s 时成功,当任务二在 3s 时错误,all 会被中断,执行 catch
特点
- 返回所有 Promise 的结果:Promise.all 返回一个新的 Promise,该 Promise 在所有传入的 Promise 都解决后被解决。它会等待所有 Promise 完成,而不管它们是解决还是拒绝。
- 取消其他 Promise:一旦一个 Promise 被拒绝,Promise.all 将立即取消其他未完成的 Promise。这意味着只要有一个 Promise 被拒绝,返回的 Promise 将立即被拒绝,并提供拒绝的原因。
- 处理所有 Promise 的结果:返回的 Promise 将解决为一个数组,其中包含每个 Promise 的结果。数组中的元素按照传入 Promise 的顺序排列,与 Promise 的解决顺序无关。
- 参数可以是可迭代对象:Promise.all 接受一个可迭代对象作为参数,这意味着你可以传递一个包含多个 Promise 的数组或类似数组的对象(如 Set)。这样可以方便地处理多个并发操作。
- 返回一个新的 Promise:Promise.all 返回一个新的 Promise 对象,因此你可以使用 then 方法来处理它的解决情况,并获取包含每个 Promise 结果的数组。
Promise.allSettled()
在所有的 Promise 都被完成时会被完成,没有拒绝的情况,如果有拒绝的任务,会在完成结果中为拒绝状态。
then 的返回值
type ThenDataInfo =
| {
status: "fulfilled";
value: unknow;
}
| {
status: "rejected";
reason: Error;
};
type ThenData = ThenDataInfo[];
const task1 = new Promise(async (resolve, reject) => {
await delay(4);
resolve(1);
});
const task2 = new Promise(async (resolve, reject) => {
await delay(3);
reject(new Error("task2 error"));
});
const task3 = new Promise(async (resolve, reject) => {
await delay(2);
resolve(3);
});
Promise.allSettled([task1, task2, task3])
.then(console.log)
.catch(console.error);
/*
times: 1
times: 2
times: 3
times: 4
[
{ status: 'fulfilled', value: 1 },
{
status: 'rejected',
reason: Error: task2 error
at /home/runner/NonstopCyberRedundancy/index.js:14:10
},
{ status: 'fulfilled', value: 3 }
]
*/
特点
- 返回所有 Promise 的结果:Promise.allSettled 返回一个新的 Promise,该 Promise 在所有传入的 Promise 都解决或拒绝后被解决。它不会在第一个拒绝的 Promise 出现时立即返回,而是等待所有 Promise 完成。
- 不会取消其他 Promise:一旦一个 Promise 解决或拒绝,Promise.allSettled 将不会取消其他未解决的 Promise。即使其他 Promise 后续解决或拒绝,它们的结果也不会影响已经返回的 Promise。
- 处理所有 Promise 的结果:返回的 Promise 将解决为一个包含每个 Promise 结果的数组。数组中的每个元素都是一个对象,该对象具有以下属性:
status
:表示 Promise 的状态,可能的取值为 "fulfilled"(已解决)或 "rejected"(已拒绝)。value
(仅当状态为 "fulfilled" 时存在):表示解决的值。reason
(仅当状态为 "rejected" 时存在):表示拒绝的原因。
- 参数可以是可迭代对象:Promise.allSettled 接受一个可迭代对象作为参数,这意味着你可以传递一个包含多个 Promise 的数组或类似数组的对象(如 Set)。这样可以方便地处理多个并发操作。
- 返回一个新的 Promise:Promise.allSettled 返回一个新的 Promise 对象,因此你可以使用 then 方法来处理它的解决情况,并获取包含每个 Promise 结果的数组。
Promise.any()
在任意一个 Promise 被完成时完成,仅在所有的 Promise 都被拒绝时才会拒绝。
const task1 = new Promise(async (resolve, reject) => {
await delay(4);
resolve(1);
});
const task2 = new Promise(async (resolve, reject) => {
await delay(3);
reject(new Error("task2 error"));
});
const task3 = new Promise(async (resolve, reject) => {
await delay(2);
resolve(3);
});
Promise.any([task1, task2, task3]).then(console.log).catch(console.error);
/*
time: 1
time: 2
3
*/
特点:
- 返回最快解决的 Promise:Promise.any 方法会返回最先解决(即状态变为 resolved)的 Promise。它会忽略拒绝的 Promise,并在所有 Promise 都被拒绝时才会返回拒绝状态的 Promise。
- 不会取消其他 Promise:一旦第一个 Promise 解决,Promise.any 将不会取消其他未解决的 Promise。即使其他 Promise 后续解决或拒绝,它们的结果也不会影响已经返回的 Promise。
- 处理第一个解决结果:Promise.any 只关注第一个解决的 Promise,忽略拒绝的 Promise。它将返回第一个解决的 Promise 的结果。
- 参数可以是可迭代对象:Promise.any 接受一个可迭代对象作为参数,这意味着你可以传递一个包含多个 Promise 的数组或类似数组的对象(如 Set)。这样可以方便地处理多个并发操作。
- 返回一个新的 Promise:Promise.any 返回一个新的 Promise 对象,因此你可以使用 then 和 catch 方法来处理它的解决和拒绝情况。
- 当所有 Promise 都被拒绝时返回拒绝状态:如果传递给 Promise.any 的所有 Promise 都被拒绝,它将返回一个拒绝状态的 Promise,并提供一个 AggregateError 实例,其中包含所有拒绝的原因。如果没有拒绝的 Promise,那么返回的 Promise 将解决为第一个解决的 Promise。
Promise.race()
在任意一个 Promise 被完成时完成。换句话说,在任意一个 Promise 被 resolve 时 resolve;在任意一个的 Promise 被 reject 时 reject。
const task1 = new Promise(async (resolve, reject) => {
await delay(4);
resolve(1);
});
const task2 = new Promise(async (resolve, reject) => {
await delay(3);
reject(new Error("task2 error"));
});
const task3 = new Promise(async (resolve, reject) => {
await delay(2);
resolve(3);
});
Promise.race([task1, task2, task3]).then(console.log).catch(console.error);
/*
times: 1
times: 2
3
*/
特点:
- 返回最快解决的 Promise:Promise.race 方法会返回最先解决(即状态变为 resolved)或拒绝(即状态变为 rejected)的 Promise。无论是解决还是拒绝,只要有一个 Promise 先完成,返回的 Promise 将会与之相关联。
- 不会取消其他 Promise:一旦第一个 Promise 解决或拒绝,Promise.race 将不会取消其他未解决的 Promise。即使其他 Promise 后续解决或拒绝,它们的结果也不会影响已经返回的 Promise。
- 只处理第一个结果:Promise.race 只关注第一个完成的 Promise,不管它的状态是解决还是拒绝。它将忽略其他 Promise 的结果。
- 参数可以是可迭代对象:Promise.race 接受一个可迭代对象作为参数,这意味着你可以传递一个包含多个 Promise 的数组或类似数组的对象(如 Set)。这样可以方便地处理多个并发操作。
- 返回一个新的 Promise:Promise.race 返回一个新的 Promise 对象,因此你可以使用 then 和 catch 方法来处理它的解决和拒绝情况。
请注意,JavaScript 的本质上是单线程的,因此在任何时刻,只有一个任务会被执行,尽管控制权可以在不同的 Promise 之间切换,从而使 Promise 的执行看起来是并发的。在 JavaScript 中,并行执行只能通过 worker 线程实现。
场景
超时处理
当需要执行一个操作,但希望在一定时间内完成,否则视为超时时,可以使用 Promise.race 来创建一个超时机制。将操作的 Promise 和一个定时器 Promise(在指定时间后拒绝)传递给 Promise.race,这样,如果操作的 Promise 在超时之前解决,就可以获取其结果;否则,如果超时定时器先拒绝,就可以执行相应的超时处理逻辑。
function timeout(ms) {
return new Promise((resolve, reject) => {
setTimeout(() => {
reject(new Error("Operation timed out."));
}, ms);
});
}
function fetchData() {
return new Promise((resolve, reject) => {
// 执行异步操作
// 根据结果解决或拒绝
});
}
Promise.race([fetchData(), timeout(5000)])
.then((result) => {
// 结果
})
.catch((error) => {
// 超时
});
function timeoutPromise<T>(
promise: Promise<T>,
timeout: number
): Promise<T> {
return Promise.race([
promise,
new Promise<T>((_, reject) => {
setTimeout(() => {
reject(new Error('Timeout'));
}, timeout);
})
]);
}
// example
unction fetchData(): Promise<string> {
return new Promise((resolve) => {
setTimeout(() => {
resolve('Data fetched successfully!');
}, 2000);
});
}
const timeout = 1500;
const fetchPromise = fetchData();
const timeoutPromise = timeoutPromise(fetchPromise, timeout);
timeoutPromise.then((result) => {
console.log(result);
}).catch((error) => {
console.error(error);
});
错误重试
function retry<T>(
fn: () => Promise<T>,
retries: number,
delay: number
): Promise<T> {
return new Promise((resolve, reject) => {
const attempt = () => {
fn()
.then(resolve)
.catch((error) => {
if (retries === 0) {
reject(error);
} else {
setTimeout(() => {
attempt();
retries--;
}, delay);
}
});
};
attempt();
});
}
// example
function fetchData(): Promise<string> {
return new Promise((resolve, reject) => {
const randomNumber = Math.random();
// 模拟异步操作,有 70% 的概率成功,30% 的概率失败
setTimeout(() => {
if (randomNumber < 0.7) {
resolve("Data fetched successfully!");
} else {
reject(new Error("Failed to fetch data"));
}
}, 1000);
});
}
const maxRetries = 3;
const retryDelay = 500;
const fetchPromise = () => fetchData();
const retryPromise = retry(fetchPromise, maxRetries, retryDelay);
retryPromise
.then((result) => {
console.log(result);
})
.catch((error) => {
console.error(error);
});
这个 retry
函数接受一个返回 Promise 的函数 fn
,重试次数 retries
,以及重试之间的延迟时间 delay
(以毫秒为单位)。它返回一个新的 Promise,该 Promise 将在 fn
成功解决时解决,并在达到最大重试次数时拒绝。
在 retry
函数内部,我们使用递归来实现错误重试的逻辑。在每次重试之前,我们使用 setTimeout
来延迟一段时间。如果达到最大重试次数而 fn
仍然抛出错误,则拒绝返回的 Promise,并将错误传递给调用者。
并行限流执行
export type Options = {
// 可选参数,最大并发数量,默认为无限大
maxConcurrent?: number;
// 可选参数,是否重试失败的任务,默认为true
retry?: boolean;
};
// 定义处理任务的回调函数类型
export type ProcessCallback<TProcessArgs extends any[]> = (
job: string,
...args: TProcessArgs
) => void | Promise<void>;
export class PromiseQueue<TProcessArgs extends any[]> {
// 处理任务的回调函数
process: ProcessCallback<TProcessArgs>;
// 最大并发数量
maxConcurrent: number;
// 是否重试失败的任务
retry: boolean;
// 任务队列,存储待处理的任务及参数
queue: Array<[string, TProcessArgs]>;
// 正在处理中的任务集合
processing: Set<string>;
// 已处理完成的任务集合
processed: Set<string>;
// 当前正在运行的任务数量
numRunning: number;
// 运行队列的 Promise 对象
runPromise: Promise<Set<string>> | null;
// Promise 的 resolve 函数
resolve: ((processed: Set<string>) => void) | null;
// Promise 的 reject 函数
reject: ((error: Error) => void) | null;
constructor(callback: ProcessCallback<TProcessArgs>, options: Options = {}) {
this.process = callback;
this.maxConcurrent = options.maxConcurrent || Infinity;
this.retry = options.retry !== false;
this.queue = [];
this.processing = new Set();
this.processed = new Set();
this.numRunning = 0;
this.runPromise = null;
this.resolve = null;
this.reject = null;
}
add(job: string, ...args: TProcessArgs) {
if (this.processing.has(job) || this.processed.has(job)) {
// 如果任务已经在处理中或已完成,则直接返回
return;
}
if (this.runPromise && this.numRunning < this.maxConcurrent) {
// 如果有空闲线程,则立即处理任务
this._runJob(job, args);
} else {
// 否则将任务加入队列
this.queue.push([job, args]);
}
// 将任务标记为处理中
this.processing.add(job);
}
run() {
if (this.runPromise) {
// 如果队列已经在运行中,则直接返回 Promise 对象
return this.runPromise;
}
const runPromise = new Promise<Set<string>>((resolve, reject) => {
this.resolve = resolve;
this.reject = reject;
});
this.runPromise = runPromise;
// 开始处理队列中的任务
this._next();
return runPromise;
}
private async _runJob(job: string, args: TProcessArgs) {
try {
this.numRunning++;
// 调用处理任务的回调函数
await this.process(job, ...args);
// 从正在处理中的任务集合中删除已完成的任务
this.processing.delete(job);
// 将任务标记为已处理完成
this.processed.add(job);
this.numRunning--;
// 处理下一个任务
this._next();
} catch (err) {
this.numRunning--;
if (this.retry) {
// 如果设置了重试,并且任务失败,则将任务重新加入队列
this.queue.push([job, args]);
} else {
// 否则将任务从正在处理中的集合中删除
this.processing.delete(job);
}
if (this.reject) {
// 如果有 reject 函数,则调用该函数,并传入错误信息
this.reject(err as any);
}
// 重置队列状态
this._reset();
}
}
private _next() {
if (!this.runPromise) {
// 如果队列已经结束,则直接返回
return;
}
if (this.queue.length > 0) {
while (this.queue.length > 0 && this.numRunning < this.maxConcurrent) {
const item = this.queue.shift()!;
// 处理队列中的任务
this._runJob(item[0], item[1]);
}
} else if (this.processing.size === 0) {
// 如果队列已空且没有正在处理中的任务,则执行 resolve 函数,传入已处理完成的任务集合
this.resolve!(this.processed);
// 重置队列状态
this._reset();
}
}
private _reset() {
this.processed = new Set(); // 重置已处理完成的任务集合
this.runPromise = null; // 清空 Promise 对象
this.resolve = null; // 清空 resolve 函数
this.reject = null; // 清空 reject 函数
}
clear() {
this._reset(); // 清空队列状态
}
}
使用案例
const exportProcessor = async (job: string, type: "pdf" | "zip") => {
console.log(`export ${type} job: [${job}]`);
};
const exportQueue = new PromiseQueue(exportProcessor, {
maxConcurrent: 5,
});
async function handleExport(type: "pdf" | "zip") {
try {
// loading start
await exportQueue.run();
// all success
} catch (err) {
// has error
console.log(err);
} finally {
// loading end
}
}
如果你有较复杂的并发任务管理需求,建议使用 sindresorhus/p-queue