Octopus:基于矩阵模型的跨平台大数据机器学习系统及其性能优化
大数据时代,人们普遍地意识到大数据较之于小数据通常隐藏着更多、更深层次的价值与知识,对大数据的分析与挖掘能够获得非常大的社会效益和经济效益。UC Berkeley AMP实验室对现存的大数据机器学习系统,从易用性和计算性能两个维度进行考察,他们觉得现有的几乎全部的大数据机器学习系统都不能够同时具有良好的易用性和高效的性能。
在易用性方面,主流的大数据平台Hadoop、Spark等以及常用的数据分析平台R,针对大数据的处理提供的是分布式接口,易用性差,学习成本高。
在计算性能方面,机器学习并行算法库提供的是数量有限的、通用的机器学习算法,用户难以根据实际需求对算法进行定制化实现。因为矩阵运算是机器学习和数据分析时的重要分析建模方法,出现了很多高效的矩阵运算库,但是矩阵运算库只是提供了单个矩阵运算的高性能实现,并没有针对整个应用程序(即多矩阵操作)的计算流程进行相应的优化,以提升性能。此外,不同的分布式矩阵库对不同规模和不同矩阵操作的计算性能各有优劣,用户很难针对特定的问题去选择最佳的单个平台或多个平台的组合。
因此,一个良好的大数据机器学习系统需要满足以下特性:1)提供统一的编程抽象,提供良好的编程易用性;2)底层支持多种计算平台,实现跨平台的特性;3)支持应用的全局优化,提升执行性能。
因为矩阵运算是数据科学家最为常用、最接近于数学表示方式的一种数据分析建模方法,且绝大多数机器学习和数据分析算法都可以基于矩阵进行建模和算法设计。因此采用矩阵模型作为抽象的编程模型。Octopus系统的框架如图1所示:
图1 Octopus系统框架图示(黄色部分是本文工作)
4.1 OctMatrix和SymbolMatrix
OctMatrix提供的每个矩阵接口都封装底层不同平台的对应的矩阵操作的实现,使得程序在多种底层计算平台上都可以执行。OctMatrix的接口会立即调用底层平台相应的矩阵函数执行。
当矩阵操作涉及到不同计算平台的矩阵时,通过定义平台优先级Spark > MPI > Hadoop > R,将低优先级矩阵转换成高优先级矩阵再进行计算。矩阵在不同计算平台间的转换,通过读写Alluxio文件系统实现。
对于R中用户自定义函数在Spark等JVM平台上的执行通过apply(m, margin, func)函数实现,其实现流程图如图2所示:在R中将函数及依赖序列化传给Spark Driver广播到各个Worker上,每个Worker与本地的Rserve进行通信,Rserve反序列化得到函数并将其应用到矩阵数据上,最后将结果数据再传回Worker。
图2:Spark执行R函数示意图
SymbolMatrix矩阵接口会保存矩阵操作类型及其依赖矩阵的关系记录,并根据这些记录的关系形成矩阵计算流图,表示依赖关系的有向无环图(Directed Acyclic Graph, DAG)。针对计算流图可以进行逻辑优化和物理优化,最终转换成OctMatrix的接口进行计算。SymbolMatrix的求值是通过特定函数(如evaluate)触发执行。Octopus接口如表1所示。
表1: Octopus矩阵接口(API)列表
4.2 DAG逻辑优化
根据所求矩阵的DAG,可以进行一系列的逻辑执行方案优化形成等价的DAG(即不改变最终结果的正确性),使得优化后的DAG的执行时间低于原始DAG的执行时间。这类优化适用于所有的底层计算平台,能够提升应用的执行性能。逻辑执行方案优化如图3所示:
图3: DAG逻辑执行方案优化框架
因为逻辑优化会改变DAG的结构,对节点求值会造成错误的结果,因此需要根据DAG计算顺序依次对节点进行拷贝。
由于用户无意中写出重复代码,增加执行的时间,因此进行公共子表达式消除(Common Subexpression Elimination, CSE)优化。通过“依赖节点ID#矩阵函数#函数参数”进行标识每个节点,识别重复节点并进行替换。
对于多个矩阵相乘的情形,不同的乘法顺序导致总的计算量不同,执行时间差异较大。因此通过矩阵连乘优化(Multiply Chain Optimization, MCO)得到最优的乘法顺序,获得最佳的执行性能。其实现思路如下:
- 确定DAG中所有矩阵的规模。
- 通过BFS(第一个乘法节点)和中序遍历(所有乘法节点)寻找DAG中乘法链。
- 根据矩阵规模通过动态规划得到计算最佳乘法顺序。
- 根据最优的乘法顺序改变DAG结构。
4.3 DAG物理优化
4.3.1 Spark执行优化
当整个计算流图在Spark平台上运行时,会根据Spark的特性对DAG做分析处理来进一步地优化性能。Spark平台下的执行优化的流程图如图4所示:
图4:Spark平台的执行优化的流程图
因为Spark lazy执行机制,导致矩阵会被重复计算或重复读取文件。Cache优化将重复的耗时的矩阵操作进行Cache,减少运算时间。
因为矩阵间加减乘除运算涉及数据传输,矩阵多次参与运算导致数据的重复shuffle。Partition优化是对多次参与数据传输的矩阵进行co-partition,减少数据重复传输。
4.3.2 多平台自动调度
仅使用单个平台执行应用时其计算性能已经达到瓶颈,无法进一步提升。如果大数据系统底层支持多个分布式计算平台,可以对应用程序的每个矩阵操作选择最合适的底层计算平台,从而进一步提升应用的执行性能。多平台自动调度执行流程图如图5所示:
图5:多平台的自动调度执行的流程图
1)时间估计模型
因为特定的集群和计算平台,稠密矩阵操作的时间只和矩阵的规模相关。通过训练得到不同平台下不同规模的矩阵操作时间,根据样本通过普通最小二乘OLS线性回归方法建立模型。时间估计模型包含:1)矩阵操作的执行时间估计模型;2)矩阵数据在平台间的传输时间模型。
2)调度模块
建立时间估计模型后,DAG中的节点需要根据时间估计模型,选择最佳的执行平台,使得DAG的整体执行时间最优。调度模块就是针对DAG中的每个节点计算得到最佳的执行平台。
假设矩阵计算流图的DAG如图6 (a)所示,共有A、B、C、D四个节点,底层执行平台只有和两种,其搜索模型如图6 (b)所示。每个节点的搜索时间包含两部分:1)\(T^{c}_{\alpha}(C)\)节点C的操作在平台α上的计算时间;2)\(T^{t}_{\alpha - \beta}(A)\):依赖节点A的数据从到平台的传输时间(根据节点在平台和Alluxio中分为无传输开销、读开销、读加写开销三种情形)。
图6:搜索模型目标函数的定义
根据模型搜索最佳映射策略的方法如下:
- 基本思路:穷举法,对每个节点都搜索所有的计算平台,累加节点的计算时间和依赖节点的传输时间,搜索出最佳的映射策略;
- 剪枝:将最佳单个平台总时间作为当前最佳执行时间,排除了非最佳映射的搜索空间;
- 启发式:限制矩阵转换次数占DAG节点总数的比例(转换定义为矩阵在平台α上计算而在平台β(α≠β)中使用,即跨平台矩阵传输)。因为矩阵转换需要传输开销,因此最佳的映射不会出现很多矩阵转换。
3)执行模块
在执行时,会自动插入读写Alluxio函数进行矩阵传输并且节点会保存在Alluxio中存储的位置信息。
集群共有9个物理节点(1台Master, 8台Worker)。每台机器采用RedHat 7,Atlas 3.10.1,Java 1.7,Hadoop 2.6.0,Spark 1.4.1,Alluxio 0.6.4,R 3.2.1。Spark中分区数目被固定为100,MPI的ranks同样被固定为100。
5.1 物理执行方案优化实验
实验对DAG逻辑执行方案优化在单个平台下的实验,评估对高斯非负矩阵分解GNMF应用的性能提升,GNMF代码实现如图7所示。
图7:基于Octopus实现的GNMF代码
实验中采用5轮迭代,固定K为1000,M与N的值相同且分别取值为5000和50000,实验结果见表2,可以看出优化对GNMF应用在三个平台都获得了一定的性能提升。
表2:DAG逻辑执行方案优化对GNMF应用的结果(时间单位:秒)
5.2 Spark执行优化实验
针对GNMF应用,将矩阵V从内存随机初始化生成改成从HDFS上读取。实验中采用5轮迭代,固定K为1000,M与N相同,分别取值为5000、20000和50000,实验结果如表3所示。随着规模的增大,加速比也在上升,当矩阵规模为50000时,加速比达到了5。
表3:Spark执行优化实验结果(时间单位:秒)
5.3 多平台调度优化实验
选取矩阵乘法当作典型的测试用例来评估时间估计模型的精度,实验结果如表4所示,可以看出对于三个平台,模型误差基本在10%以内,模型比较精确。
表4:时间估算模型的乘法运算的误差率(执行时间单位:秒)
选取GNMF应用来测量穷尽搜索、剪枝搜索以及带转换率限制的剪枝搜索三种策略搜索到最佳调度策略的时间。GNMF算法中,M和N的值为30000,K的值为1000,设置迭代次数为7,DAG中节点总数为87个。剪枝搜索中,矩阵转换的次数占DAG节点数的比率在0.2左右。因此,限制最大转换率为剪枝搜索的转换率的一半,即在0.1以内。
图8展示了三种搜索策略的时间与节点数目的关系,从图中可以看出转换率限制的剪枝搜索具有最少的搜索时间以及良好的扩展性。
图8:三种搜索策略的时间与节点数目关系图示
采用GNMF算法和示例应用来测试多平台自动调度优化对应用的性能影响。GNMF应用的实验中的迭代次数为5轮,K的值为常数1000,M取值和N一样,分别为5000和50000。实验将自动调度方案和用户随机选择矩阵操作执行平台的方案中最好、最坏的选择方案进行比较,并给出单平台执行的时间开销。图9显示了不同方案的执行时间性能。从图中可以看出,自动调度框架会选择最佳的节点到平台的映射,获得了最优的执行时间。
图9:GNMF应用在不同选择方案下的执行时间
示例实验(代码如图10所示)被用来表明多个平台的组合使用,较之于单平台执行,多平台间的调度执行会进一步地提升性能。实验结果如图11所示,从图中可以看出,较之于单MPI平台和Spark平台执行应用,组合使用两个计算平台并通过自动调度策略执行应用能够分别获取62%与91%的性能提升。
图10:基于Octopus实现的示例程序
图11:示例程序在不同平台上的运行性能
Octopus提供基于矩阵模型的跨平台的大规模机器学习与数据分析编程模型和框架,用户基于提供的R语言的矩阵接口,能够进行机器学习和数据分析算法的快速设计实现,具有良好的易用性与可编程性。通过构建矩阵计算流图,并针对矩阵计算流图进行了逻辑执行方案优化和物理执行方案优化,进一步提升了执行的性能。此外,底层支持R、Spark、Hadoop和MPI等多种计算平台,实现了上层算法设计实现与底层分布式并行化实现细节解耦,具备了“Write Once, Run Anywhere”的跨平台特性以及良好的平台扩展性。