Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Monorepo 中的任务调度机制 #11

Open
worldzhao opened this issue Jun 21, 2021 · 0 comments
Open

Monorepo 中的任务调度机制 #11

worldzhao opened this issue Jun 21, 2021 · 0 comments

Comments

@worldzhao
Copy link
Owner

worldzhao commented Jun 21, 2021

前言

Monorepo 中的一个项目称为 project,对 project 进行的具体操作称为任务 task,比如 buildtest,可以狭义地理解为 npm scripts 中注册的操作,Monorepo 管理工具应当有能力调度这些任务。

项目依赖图

先看一个 🌰,如上图所示,存在一个依赖关系较为复杂的 Monorepo,此时需要执行某个任务,例如 build,如何同时保证任务执行顺序以及任务执行效率(假设最大任务并行数为 N)?

接下来就是枯燥乏味的做题过程,咱们先把上面那张项目依赖图抽象成代码。

问题

interface Project {
  name: string;
  actions: { name: string; fn: () => Promise<void> }[];
  dependencyProjects: Project[];
}

const sleep = (s: number): Promise<void> =>
  new Promise((r) => setTimeout(r, s));

// Monorepo 中注册的所有项目
const projects: Project[] = [
  "@monorepo/a",
  "@monorepo/b",
  "@monorepo/c",
  "@monorepo/d",
  "@monorepo/x",
  "@monorepo/y",
  "@monorepo/z",
].map((name) => ({
  name,
  actions: [{ name: "build", fn: () => sleep(Math.random() * 1000) }],
  dependencyProjects: [],
}));

const [A, B, C, D, X, Y, Z] = projects;

A.dependencyProjects = [B];
B.dependencyProjects = [D];
C.dependencyProjects = [D, X, Y];
X.dependencyProjects = [Y, Z];

/**
 * 实现本方法,使得 build 行为按照正确的顺序执行,且保证执行效率
 * @param projects 需要执行任务的 project 集合
 * @param actionName 具体操作名称
 * @param limit 任务最大并行数
 */
function run(projects: Project[], actionName: string, limit: number) {
  // todo
}

run(projects, "build", 12);

解题

很明显,project 之间存在依赖关系,那么任务之间也存在依赖关系,那么可以得到以下结论:

  1. 当前任务作为下游任务时,当前任务完成后,需要更新其上游任务的依赖任务,从其内移除当前任务
  2. 当前任务作为上游任务时,只有当前任务的下游任务都被清空(完成)时,当前任务才可以执行

于是 task 定义如下:

interface Task {
  // 任务名 `${projectName}:{actionName}`
  name: string;
  // 当前任务依赖的任务,即当前任务的下游任务,当该 dependenciesSet 被清空,说明当前任务可以被执行
  dependencies: Set<Task>;
  // 依赖当前任务的任务,即当前任务的上游任务,当前任务完成后,需要更新其上游任务的 dependenciesSet(从其内移除当前任务)
  dependents: Set<Task>;
  // 具体任务执行函数
  fn: () => Promise<void>;
}

初始化任务

根据 projects 参数,构造出项目对应的任务。

function run(projects: Project[], actionName: string, limit: number) {
  // 任务名与任务的映射
  const tasks = new Map<string, Task>();
  projects.forEach((project) =>
    tasks.set(getTaskName(project, actionName), {
      name: getTaskName(project, actionName),
      dependencies: new Set(),
      dependents: new Set(),
      fn: project.actions.find((a) => a.name === actionName)?.fn ?? noop,
    })
  );
}

// 获取任务名
function getTaskName(project: Project, actionName: string) {
  return `${project.name}:${actionName}`;
}

function noop(): Promise<void> {
  return new Promise((r) => r());
}

补充 dependencies 与 dependents

假设存在 project1,对其进行以下操作:

  1. 取到当前项目对应的任务 task1
  2. 获取当前任务对应的下游任务名 dependencyTaskNames(基于 project1.dependencyProjects)
  3. 遍历下游任务名 dependencyTaskName
  4. 取到下游任务(上一步初始化而来) dependencyTask
  5. 补充 task1 的 dependencies
  6. 补充 dependencyTask 的 dependents
function run(projects: Project[], actionName: string, limit: number) {
  // ...
  // project 与 project 对应 task 的下游任务名称
  function getDependencyTaskNames(project: Project): Set<string> {
    const dependencyTaskNames: Set<string> = new Set();
    // 遍历下游项目
    for (const dep of project.dependencyProjects) {
      // 搜集下游任务名
      dependencyTaskNames.add(getTaskName(dep, actionName));
    }

    return dependencyTaskNames;
  }

  projects.forEach((project) => {
    // 1. 获取当前项目对应的任务
    const task = tasks.get(getTaskName(project, actionName))!;
    // 2. 获取当前任务对应的下游任务名
    const dependencyTaskNames = getDependencyTaskNames(project);
    // 3. 遍历下游任务名
    for (const dependencyName of dependencyTaskNames) {
      // 4. 取到下游任务(上一步初始化而来)
      const dependency: Task = tasks.get(dependencyName)!;
      // 5. 补充当前任务的 dependencies
      task.dependencies.add(dependency);
      // 6. 补充下游任务的 dependents
      dependency.dependents.add(task);
    }
  });
}

任务依赖图

并行执行任务

function run(projects: Project[], actionName: string, limit: number) {
  // ...
  const taskQueue: Task[] = [];
  for (const [, task] of tasks) {
    taskQueue.push(task);
  }
  runTasks(taskQueue, limit);
}

async function runTasks(taskQueue: Task[], limit: number) {
  let currentActiveTasks = 0;
  function getNextTask() {
    for (let i = 0; i < taskQueue.length; i++) {
      const task: Task = taskQueue[i];
      // 返回准备好执行的任务
      if (task.dependencies.size === 0) {
        return taskQueue.splice(i, 1)[0];
      }
    }
    return null;
  }

  function _run(task: Task): Promise<void> {
    return task.fn().then(() => {
      console.log("act success", task.name);
      currentActiveTasks--;
      // 当前任务执行完成,从其上游任务的 dependencies 中移除当前任务
      task.dependents.forEach((dependent: Task) => {
        dependent.dependencies.delete(task);
      });
      // 继续执行
      start();
    });
  }

  async function start() {
    let ctask: Task | null = null;
    const taskPromises: Promise<void>[] = [];
    while (currentActiveTasks < limit && (ctask = getNextTask())) {
      currentActiveTasks++;
      const task: Task = ctask;
      taskPromises.push(_run(task));
    }

    await Promise.all(taskPromises);
  }

  start();
}

执行 run(projects, "build", 12),可以按照正确顺序输出结果。

act success @monorepo/z:build
act success @monorepo/y:build
act success @monorepo/x:build
act success @monorepo/d:build
act success @monorepo/b:build
act success @monorepo/a:build
act success @monorepo/c:build

关键路径长度

上文中的实现使得任务可以按照正确的顺序执行,但是在实际任务执行过程中,最长的任务链限制了整个任务树的执行速度,效率不能得到保证。

关键路径长度:任务距离最远的根节点的距离。

interface Task {
  name: string;
  dependencies: Set<Task>;
  dependents: Set<Task>;
  // 关联路径长度
  criticalPathLength?: number;
  fn: () => Promise<void>;
}

function run(projects: Project[], actionName: string, limit: number) {
  // ...
  const taskQueue: Task[] = [];
  for (const [, task] of tasks) {
    // 计算关键路径长度
    task.criticalPathLength = calculateCriticalPaths(task);
    taskQueue.push(task);
  }
  // 基于关键路径长度对任务进行降序排序
  taskQueue.sort((a, b) => b.criticalPathLength! - a.criticalPathLength!);
  runTasks(taskQueue, limit);
}

// 计算关键路径长度
function calculateCriticalPaths(task: Task): number {
  // 重复走到某一个任务了 直接返回值
  if (task.criticalPathLength !== undefined) {
    return task.criticalPathLength;
  }

  // 如果没有 dependents, 说明我们是 "root",即 app 此类不被依赖的 project
  if (task.dependents.size === 0) {
    task.criticalPathLength = 0;
    return task.criticalPathLength;
  }

  // 递归向上取最大值 每次 +1
  const depsLengths: number[] = [];
  task.dependents.forEach((dep) =>
    depsLengths.push(calculateCriticalPaths(dep))
  );
  task.criticalPathLength = Math.max(...depsLengths) + 1;
  return task.criticalPathLength;
}

criticalPathLength

Selecting subsets of projects

实际业务开发中,一般不需要构建 Monorepo 内全部的项目,在 应用级 Monorepo 优化方案 一文中介绍了使用 Monorepo 方式管理业务项目可能遇到的一些坑点以及相关解决方案,其中有这样一个问题:

发布速度慢

monorepo-1
若需要发布 app1,则所有 package 均会被构建,而非仅有 app1 依赖的 package1 与 package2 的 build 脚本被执行。

最终通过 Rush 提供的 Selecting subsets of projects 能力解决了以上问题。

具体命令如下:

# 只会执行 @monorepo/app1 及其依赖 package 的 build script
$ rush build -t @monorepo/app1

-t PROJECT--to PROJECT,后面PROJECT参数为此次任务的终点项目包名,若不想包含终点项目,可以改为-T参数,即--to-except PROJECT,与之类似的可挑选出项目子集的参数还有:

  • -f PROJECT, --from PROJECT
  • -F PROJECT, --from-except PROJECT
  • -i PROJECT, --impacted-by PROJECT
  • -I PROJECT, --impacted-by-except PROJECT
  • -o PROJECT, --only PROJECT

如果不指定这些参数,那么默认会执行所有项目(rush.json 中注册过的项目)的对应的 npm scripts

当然,也可以指定多个参数,最差情况获取到的 subsets 是 projects 本身,与不指定参数表现一致(一般不会)。

实际应用场景:在 CI 阶段,会筛选出所有发生变更的项目及其会影响到的项目进行一次 build check,比如一次提交改动了 @monorepo/a 以及 @monorepo/b的源码,CI 就会执行以下命令:

$ rush build -t @monorepo/a -f @monorepo/a -t @monorepo/b -f @monorepo/b

不用担心某些 project 的任务会被重复执行,这种任务只是图里的一个入度不为零的点,挑选出需要执行任务的 subsets 后,按照前面的任务调度机制执行任务即可。

除了内置的rush build等命令支持以上参数(默认执行 package.json 中的 build script),Rush 也将此能力开放给了自定义命令,也就是说你可以自定义一个 rush foo 命令,用于调用指定范围项目 package.json 中的 foo script,配合 Rush 的任务调度机制,任务可以保证执行顺序(如果需要的话),具体可以参考 custom_commands 一节。

Selecting subsets of projects 的具体实现不在本文讨论范围内。

结语

拓扑排序与关键路径,做了道题。🌚

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Projects
None yet
Development

No branches or pull requests

1 participant