Wenzi

JavaScript 中的 Promise 异步并发控制

蚊子前端博客
发布于 2022/08/03 14:26
前端开发或Node.js开发中,经常会遇到并发请求的场景,针对这些场景,我们怎么进行限制呢?

我们在开发的过程中,经常会遇到一些并发的情况,而如果并发量比较大时,需要进行限制。比如可能出现的场景:

  1. 传入多个异步请求,但最多只能触发 limit 个请求;额外的功能,所有的请求都执行完后,返回成功;
  2. 生成一个新函数,调用多次发起请求,但有并发限制;
  3. 多个 Promise 按顺序执行,这个其实可以认为并发数限制是 1,但我们可以用另一种方式来实现;

1. 多个异步请求的并发限制 #

有一系列的异步请求,比如爬虫抓取、后端接口请求、图片加载等场景,需要限制下并发请求的数量。这里要考虑下结果的处理,是每个请求完成后就可以了,还是要收集到所有的结果,类似于 Promise.all() 的效果。

1.1 递归的方式 #

思路大概是:首先发起 limit 个的请求,哪个完成了就递归发起下一个异步请求,所有的请求都完成后,则整体返回一个 Promise。如果不需要收集所有的数据,则不用写这个 Promise。

/**
 * 递归方式实现异步并发控制
 * @param arr 所有的数据集合,如请求的url等
 * @param limit 限制并发的个数
 * @param iteratorFn 对每个数据的处理
 */
const promiseLimitByDepth = <T>(arr: T[], limit: number, iteratorFn: (item: T, urls?: T[]) => Promise<any>) => {
  const { length } = arr;
  const result = [];
  let i = 0;
  let finishedNum = 0; // 完成的个数

  // 若不考虑最后数据的收集,可以不写这个Promise
  return new Promise((resolve) => {
    const request = async (index: number) => {
      kk.show(arr[index], index);
      const p = Promise.resolve().then(iteratorFn(arr[index]));
      const res = await p;

      result[index] = res;
      finishedNum++;
      if (finishedNum === length) {
        resolve(result);
      }
      if (i < length) {
        request(i);
        i++;
      }
    };

    for (; i < limit; i++) {
      request(i);
    }
  });
};

这里我们用 setTimeout 来模拟下异步请求。

const newFetch = (delay) => {
  return new Promise((resolve) => {
    setTimeout(() => {
      resolve(delay);
    }, delay);
  });
};

调用方式:

promiseLimitByDepth([2000, 1000, 3000, 2500, 1200, 5000, 3500, 2300], 2, (num) => {
  return newFetch(num);
}).then(console.log);

您可以查看 demo:递归实现的异步并发控制。在 demo 中可以看到,控制着同一时刻的请求个数,某一个请求结束后,再启动下一个请求。

上面的代码还可以用来控制图片的加载:

const arr = [];
let i = 10;
while (i--) {
  arr.push(`https://www.xiabingbao.com/upload/368662d904df5cbe4.jpg?t=${Math.random()}`);
}
promiseLimitByDepth(arr, 2, (url) => {
  return new Promise((resolve) => {
    const img = new Image();
    img.src = url;
    // 这里暂时只考虑成功的情况
    img.onload = resolve;
  });
}).then(console.log);

我们从图片加载的瀑布流里可以看到,每次最多只加载 2 张图片:

每次最多只加载2张图片

1.2 循环的方式 #

使用循环的方式,肯定得用到 async-await 了。

const promiseLimitByCycle = async <T>(arr: T[], limit: number, iteratorFn: (item: T, arr?: T[]) => Promise<any>) => {
  const { length } = arr;
  const result: Promise<any>[] = [];
  const runningList: Promise<any>[] = []; // 正在执行的异步任务

  for (const url of arr) {
    const p = Promise.resolve().then(iteratorFn(url)); // 转为promise
    result.push(p);

    // 若limit大于length,则不再进行控制,直接用Promise.all()即可
    if (limit <= length) {
      const e = p.then(() => {
        // promise p 执行完毕时,会触发这个,这个是后执行的,先执行的是下面的push操作
        const index = runningList.indexOf(e);
        // 当p执行成功的时候,从runningList中删除该Promise,同时也会触发下面的Promise.race()
        return runningList.splice(index, 1);
      });
      // promise e 是 p执行的过程,若p执行成功,则e.value就是p.then()里的return的值
      runningList.push(e);

      // 超过限制,则先存储起来
      if (runningList.length >= limit) {
        // 哪个先完成,都会触发race,然后进入下一层循环
        await Promise.race(runningList);
      }
    }
  }
  // 所有的都完成了,才最后返回结果
  return Promise.all(result);
};

上面有段代码比较绕,我们再单独拿出来讲解下:

// Promise是可以链式调用的,then()本身返回的就是Promise
// 因此e是p.then()的返回值,e自己也是Promise
// e.then()什么时候执行,取决于p.then()什么执行,又再取决于p什么时候执行
// const e = p.then()是同步执行的,因此先得到的变量e,再执行的p.then()里的操作
// 当p执行完成后,则就执行p.then()里的操作,找出e所在的位置并进行删除
// e.then()回调里的值据说splice()的返回值,其实就是e,但这里我们并不用关心他的返回值是什么
const e = p.then(() => {
  const index = runningList.indexOf(e);
  return runningList.splice(index, 1);
});
runningList.push(e);

// 这里监听的是runningList,即里面的某个e完成了,就会触发Promise.race()
// 若e完成了,必然p也是完成了的
await Promise.race(runningList);

这里充分用到了Promise.all()Promise.race()的特性,来实现的。

2. 新函数的并发限制 #

我们来简单描述下题目:创建返回一个新函数,在调用这个新函数产生异步请求时,有并发的限制。

// 创建返回一个新函数,在调用这个新函数产生异步请求时,限制并发的数量
// 问,如何实现这个create方法?
const createFetch = (limit) => {
  return () => {};
};

const newFetch = createFetch(2); // 最多只能并发2个
newFetch(url);
newFetch(url);
newFetch(url);
newFetch(url);

这里参考了 npm 包 p-queue 的源码,并对其进行了精简。新函数 newFetch() 每次都是要返回一个 Promise 的,就看什么时候执行 resolve(),并启动下一个。

const createFetch = (limit) => {
  let runningNum = 0; // 当前正在进行的数量
  const queue = []; // 所有将要执行的任务队列

  // 尝试启动下一个任务
  const tryNextOne = () => {
    if (queue.length === 0) {
      return false;
    }
    if (runningNum < limit) {
      // 若没有达到限制,则直接启动
      const job = queue.shift();
      if (!job) {
        return false;
      }
      job();
      return true;
    }
    return false;
  };

  // 返回一个新函数,新函数里直接返回一个Promise
  return (url, iteratorFn) => {
    return new Promise((resolve) => {
      // 定义一个函数,但不立即执行
      const run = async () => {
        runningNum++; // 启动一个任务,数量+1

        const result = await Promise.resolve(iteratorFn(url));
        resolve(result);

        runningNum--; // 完成一个任务,数量-1
        tryNextOne(); // 启动下一个任务
      };
      queue.push(run); // 将所有的任务,都推送到队列中
      tryNextOne(); // 启动队列中任务的入口
    });
  };
};

我们用 sleep() 函数模拟下:

const sleep = (delay) => {
  return new Promise((resolve) => {
    setTimeout(() => {
      resolve(delay);
    }, delay);
  });
};

const newFetch = createFetch(2);
for (let i = 0; i < 10; i++) {
  console.log(`${i} start`);
  newFetch(i, async (i) => {
    await sleep(600 + 10 * i);
    return `${i}`;
  }).then((i) => {
    console.log(`${i} end`);
  });
}

3. 多个异步任务的顺序执行 #

我们其实把上面实现的一些函数,并发数量设置为 1,就是多个异步任务的顺序执行了。不过我们这里还有一些其他的方式。

3.1 async-wait #

把所有的异步任务都放到数组中,然后用 async-wait 的方式来控制:

const arr = [600, 500, 400, 700, 300, 450];

const asyncLoop = async (arr, iteratorFn) => {
  const result = [];
  for (const item of arr) {
    console.log(`${item} start`);
    const res = await Promise.resolve(iteratorFn(item));
    console.log(`${res} end`);
    result.push(res);
  }
  return result;
};
asyncLoop(arr, (item) => {
  return sleep(item);
});

3.2 纯 Promise #

如果不使用 async-await,用 Promise 可以实现吗?

Promise 是异步的,在一个同步流程中,是无法等待这个 Promise 完成的,因此这里我用递归的方式来实现的。

const promiseLoop = (arr, iteratorFn) => {
  const result = [];

  return new Promise((allResolve) => {
    const run = (index = 0) => {
      if (index < arr.length) {
        return new Promise((resolve) => {
          const p = Promise.resolve(iteratorFn(arr[index]));

          p.then((res) => {
            console.log(res);
            result.push(res);
            resolve(res);

            if (index + 1 < arr.length) {
              // 上一个Promise完成后,启动下一个
              run(index + 1);
            } else {
              // 若全部都完成了,则执行最外层的Promise
              allResolve(result);
            }
          });
        });
      }
    };
    run();
  });
};

使用方式与上面的一样:

promiseLoop(arr, (item) => {
  return sleep(item);
}).then(console.log);

4. 同时请求,但按顺序尽快输出 #

如并发请求一些数据,结果按照请求顺序依次输出,而且要尽可能早的输出结果。

如 a,b,c 三个请求并发请求:

  • a 需要 200ms;
  • b 需要 100ms;
  • c 需要 300ms;

即使 b 先完成,也得等着 a 完成输出结果后,b 再输出,c 稍后完成后,再输出 c 的结果。等所有的请求都执行完毕后,再整体按照顺序返回请求的结果。

我实现的思路是在后面的请求先完成的,则将结果先存储起来,等前面的请求完成后,再一并输出。

// 并发请求但顺序输出
const concurrentAndSyncLog = (arr, iteratorFn) => {
  const { length } = arr;
  const list = new Array(length).fill({ fulfilled: false, value: null }); // fulfilled表示数据是否已准备好
  let showStart = 0; // 开始输出的位置
  let fulfilledNum = 0; // 完成的个数

  return new Promise((resolve) => {
    for (let i = 0; i < length; i++) {
      const p = Promise.resolve(iteratorFn(arr[i]));
      p.then((result) => {
        list[i] = { fulfilled: true, value: result };
        fulfilledNum++;

        if (i === showStart) {
          let j = showStart;
          while (j < length) {
            if (list[j].fulfilled) {
              // 输出所有完成的数据
              console.log(list[j].value);
            } else {
              // 当前位置的数据还没准备好,直接停止,并设置下次输出的位置
              showStart = j;
              break;
            }
            j++;
          }
        }
        if (fulfilledNum >= length) {
          resolve(list.map((item) => item.value));
        }
      });
    }
  });
};

调用:

concurrentAndSyncLog([200, 100, 300], sleep).then(console.log);
// 200, 100, 300
// [200, 100, 300]

5. 总结 #

JavaScript 中对 Promise 的异步并发的控制,更多地是考察我们对 Promise 中一些知识点的运用和和深刻理解。比如 Promise.race(),Promise.all()等方法的使用,还有 Promise 的链式调用、等待机制等。

我们之前在之前的文章实现 Promise 的 first 等各种变体中,也是运用了 Promise 的各种机制,来实现一些 Promise 本身不支持的功能。这篇文章希望能更加加深我们 Promise 的理解。

标签:promise
阅读(5227)
Simple Empty
No data