JavaScript 中的 Promise 异步并发控制

蚊子前端博客
发布于 2022-08-03 14:26
前端开发或Node.js开发中,经常会遇到并发请求的场景,针对这些场景,我们怎么进行限制呢?

我们在开发的过程中,经常会遇到一些并发的情况,而如果并发量比较大时,需要进行限制。比如可能出现的场景:

  1. 传入多个异步请求,但最多只能触发 limit 个请求;额外的功能,所有的请求都执行完后,返回成功;

  2. 生成一个新函数,调用多次发起请求,但有并发限制;

  3. 多个 Promise 按顺序执行,这个其实可以认为并发数限制是 1,但我们可以用另一种方式来实现;

1. 多个异步请求的并发限制

有一系列的异步请求,比如爬虫抓取、后端接口请求、图片加载等场景,需要限制下并发请求的数量。这里要考虑下结果的处理,是每个请求完成后就可以了,还是要收集到所有的结果,类似于 Promise.all() 的效果。

1.1 递归的方式

思路大概是:首先发起 limit 个的请求,哪个完成了就递归发起下一个异步请求,所有的请求都完成后,则整体返回一个 Promise。如果不需要收集所有的数据,则不用写这个 Promise。

COPYTYPESCRIPT

/** * 递归方式实现异步并发控制 * @param arr 所有的数据集合,如请求的url等 * @param limit 限制并发的个数 * @param iteratorFn 对每个数据的处理 */ const promiseLimitByDepth = <T>(arr: T[], limit: number, iteratorFn: (item: T, urls?: T[]) => Promise<any>) => { const { length } = arr; const result = []; let i = 0; let finishedNum = 0; // 完成的个数 // 若不考虑最后数据的收集,可以不写这个Promise return new Promise((resolve) => { const request = async (index: number) => { kk.show(arr[index], index); const p = Promise.resolve().then(iteratorFn(arr[index])); const res = await p; result[index] = res; finishedNum++; if (finishedNum === length) { resolve(result); } if (i < length) { request(i); i++; } }; for (; i < limit; i++) { request(i); } }); };

这里我们用 setTimeout 来模拟下异步请求。

COPYJAVASCRIPT

const newFetch = (delay) => { return new Promise((resolve) => { setTimeout(() => { resolve(delay); }, delay); }); };

调用方式:

COPYJAVASCRIPT

promiseLimitByDepth([2000, 1000, 3000, 2500, 1200, 5000, 3500, 2300], 2, (num) => { return newFetch(num); }).then(console.log);

您可以查看 demo:递归实现的异步并发控制。在 demo 中可以看到,控制着同一时刻的请求个数,某一个请求结束后,再启动下一个请求。

上面的代码还可以用来控制图片的加载:

COPYJAVASCRIPT

const arr = []; let i = 10; while (i--) { arr.push(`https://www.xiabingbao.com/upload/368662d904df5cbe4.jpg?t=${Math.random()}`); } promiseLimitByDepth(arr, 2, (url) => { return new Promise((resolve) => { const img = new Image(); img.src = url; // 这里暂时只考虑成功的情况 img.onload = resolve; }); }).then(console.log);

我们从图片加载的瀑布流里可以看到,每次最多只加载 2 张图片:

每次最多只加载2张图片-蚊子的前端博客

1.2 循环的方式

使用循环的方式,肯定得用到 async-await 了。

COPYJAVASCRIPT

const promiseLimitByCycle = async <T>(arr: T[], limit: number, iteratorFn: (item: T, arr?: T[]) => Promise<any>) => { const { length } = arr; const result: Promise<any>[] = []; const runningList: Promise<any>[] = []; // 正在执行的异步任务 for (const url of arr) { const p = Promise.resolve().then(iteratorFn(url)); // 转为promise result.push(p); // 若limit大于length,则不再进行控制,直接用Promise.all()即可 if (limit <= length) { const e = p.then(() => { // promise p 执行完毕时,会触发这个,这个是后执行的,先执行的是下面的push操作 const index = runningList.indexOf(e); // 当p执行成功的时候,从runningList中删除该Promise,同时也会触发下面的Promise.race() return runningList.splice(index, 1); }); // promise e 是 p执行的过程,若p执行成功,则e.value就是p.then()里的return的值 runningList.push(e); // 超过限制,则先存储起来 if (runningList.length >= limit) { // 哪个先完成,都会触发race,然后进入下一层循环 await Promise.race(runningList); } } } // 所有的都完成了,才最后返回结果 return Promise.all(result); };

上面有段代码比较绕,我们再单独拿出来讲解下:

COPYJAVASCRIPT

// Promise是可以链式调用的,then()本身返回的就是Promise // 因此e是p.then()的返回值,e自己也是Promise // e.then()什么时候执行,取决于p.then()什么执行,又再取决于p什么时候执行 // const e = p.then()是同步执行的,因此先得到的变量e,再执行的p.then()里的操作 // 当p执行完成后,则就执行p.then()里的操作,找出e所在的位置并进行删除 // e.then()回调里的值据说splice()的返回值,其实就是e,但这里我们并不用关心他的返回值是什么 const e = p.then(() => { const index = runningList.indexOf(e); return runningList.splice(index, 1); }); runningList.push(e); // 这里监听的是runningList,即里面的某个e完成了,就会触发Promise.race() // 若e完成了,必然p也是完成了的 await Promise.race(runningList);

这里充分用到了Promise.all()Promise.race()的特性,来实现的。

2. 新函数的并发限制

我们来简单描述下题目:创建返回一个新函数,在调用这个新函数产生异步请求时,有并发的限制。

COPYJAVASCRIPT

// 创建返回一个新函数,在调用这个新函数产生异步请求时,限制并发的数量 // 问,如何实现这个create方法? const createFetch = (limit) => { return () => {}; }; const newFetch = createFetch(2); // 最多只能并发2个 newFetch(url); newFetch(url); newFetch(url); newFetch(url);

这里参考了 npm 包 p-queue 的源码,并对其进行了精简。新函数 newFetch() 每次都是要返回一个 Promise 的,就看什么时候执行 resolve(),并启动下一个。

COPYJAVASCRIPT

const createFetch = (limit) => { let runningNum = 0; // 当前正在进行的数量 const queue = []; // 所有将要执行的任务队列 // 尝试启动下一个任务 const tryNextOne = () => { if (queue.length === 0) { return false; } if (runningNum < limit) { // 若没有达到限制,则直接启动 const job = queue.shift(); if (!job) { return false; } job(); return true; } return false; }; // 返回一个新函数,新函数里直接返回一个Promise return (url, iteratorFn) => { return new Promise((resolve) => { // 定义一个函数,但不立即执行 const run = async () => { runningNum++; // 启动一个任务,数量+1 const result = await Promise.resolve(iteratorFn(url)); resolve(result); runningNum--; // 完成一个任务,数量-1 tryNextOne(); // 启动下一个任务 }; queue.push(run); // 将所有的任务,都推送到队列中 tryNextOne(); // 启动队列中任务的入口 }); }; };

我们用 sleep() 函数模拟下:

COPYJAVASCRIPT

const sleep = (delay) => { return new Promise((resolve) => { setTimeout(() => { resolve(delay); }, delay); }); }; const newFetch = createFetch(2); for (let i = 0; i < 10; i++) { console.log(`${i} start`); newFetch(i, async (i) => { await sleep(600 + 10 * i); return `${i}`; }).then((i) => { console.log(`${i} end`); }); }

3. 多个异步任务的顺序执行

我们其实把上面实现的一些函数,并发数量设置为 1,就是多个异步任务的顺序执行了。不过我们这里还有一些其他的方式。

3.1 async-wait

把所有的异步任务都放到数组中,然后用 async-wait 的方式来控制:

COPYJAVASCRIPT

const arr = [600, 500, 400, 700, 300, 450]; const asyncLoop = async (arr, iteratorFn) => { const result = []; for (const item of arr) { console.log(`${item} start`); const res = await Promise.resolve(iteratorFn(item)); console.log(`${res} end`); result.push(res); } return result; }; asyncLoop(arr, (item) => { return sleep(item); });

3.2 纯 Promise

如果不使用 async-await,用 Promise 可以实现吗?

Promise 是异步的,在一个同步流程中,是无法等待这个 Promise 完成的,因此这里我用递归的方式来实现的。

COPYJAVASCRIPT

const promiseLoop = (arr, iteratorFn) => { const result = []; return new Promise((allResolve) => { const run = (index = 0) => { if (index < arr.length) { return new Promise((resolve) => { const p = Promise.resolve(iteratorFn(arr[index])); p.then((res) => { console.log(res); result.push(res); resolve(res); if (index + 1 < arr.length) { // 上一个Promise完成后,启动下一个 run(index + 1); } else { // 若全部都完成了,则执行最外层的Promise allResolve(result); } }); }); } }; run(); }); };

使用方式与上面的一样:

COPYJAVASCRIPT

promiseLoop(arr, (item) => { return sleep(item); }).then(console.log);

4. 同时请求,但按顺序尽快输出

如并发请求一些数据,结果按照请求顺序依次输出,而且要尽可能早的输出结果。

如 a,b,c 三个请求并发请求:

  • a 需要 200ms;

  • b 需要 100ms;

  • c 需要 300ms;

即使 b 先完成,也得等着 a 完成输出结果后,b 再输出,c 稍后完成后,再输出 c 的结果。等所有的请求都执行完毕后,再整体按照顺序返回请求的结果。

我实现的思路是在后面的请求先完成的,则将结果先存储起来,等前面的请求完成后,再一并输出。

COPYJAVASCRIPT

// 并发请求但顺序输出 const concurrentAndSyncLog = (arr, iteratorFn) => { const { length } = arr; const list = new Array(length).fill({ fulfilled: false, value: null }); // fulfilled表示数据是否已准备好 let showStart = 0; // 开始输出的位置 let fulfilledNum = 0; // 完成的个数 return new Promise((resolve) => { for (let i = 0; i < length; i++) { const p = Promise.resolve(iteratorFn(arr[i])); p.then((result) => { list[i] = { fulfilled: true, value: result }; fulfilledNum++; if (i === showStart) { let j = showStart; while (j < length) { if (list[j].fulfilled) { // 输出所有完成的数据 console.log(list[j].value); } else { // 当前位置的数据还没准备好,直接停止,并设置下次输出的位置 showStart = j; break; } j++; } } if (fulfilledNum >= length) { resolve(list.map((item) => item.value)); } }); } }); };

调用:

COPYJAVASCRIPT

concurrentAndSyncLog([200, 100, 300], sleep).then(console.log); // 200, 100, 300 // [200, 100, 300]

5. 总结

JavaScript 中对 Promise 的异步并发的控制,更多地是考察我们对 Promise 中一些知识点的运用和和深刻理解。比如 Promise.race(),Promise.all()等方法的使用,还有 Promise 的链式调用、等待机制等。

我们之前在之前的文章实现 Promise 的 first 等各种变体中,也是运用了 Promise 的各种机制,来实现一些 Promise 本身不支持的功能。这篇文章希望能更加加深我们 Promise 的理解。

标签:
阅读(3336)

公众号:

qrcode

微信公众号:前端小茶馆

相关文章

公众号:

qrcode

微信公众号:前端小茶馆