基于Hadoop和Spark的证券交易数据分析
上海证券交易所
黄寅飞
摘要:
本文对Hive、Impala、Spark SQL三个开源数据仓库产品进行性能测试,对分区技术和列存算法进行比较,并对Spark SQL进行参数配置调优。结果表明Sql on Hadoop开源产品在加载性能、查询性能等方面已达到企业级数据仓库使用要求。本文还结合iPython Notebook对证券数据进行探索式分析和可视化展现。
1 技术概述
证券交易数据属于典型的结构化数据,交易记录存放简短的数字和文本信息,记录长度在数百字节。记录条数较多,按照每天5000万条记录估算,10年的记录总量可超过1000亿条,存储量可达到100TB。采用Sql on Hadoop[1]技术,既可用廉价PC服务器获得良好的容量线性扩展能力,又可提供便于统计分析的SQL接口方便数据应用开发。
1.1 产品概述
Hadoop由Yahoo公司在2006年发布,目标是利用廉价服务器集群搭建面向海量数据的开源计算引擎,同时保证容忍硬件故障的数据高可用性和计算高可靠性,编程语言为Java。Hadoop框架包含Common公共库、HDFS文件系统、YARN调度引擎、MapReduce计算引擎,Hadoop生态圈包含Hive、HBase、Spark、ZooKeeper、Storm等大量开源软件。目前已有Cloudera、Hortonworks、MapR等多家大型企业围绕Hadoop发行版开展经营服务。
Sql on Hadoop的三个代表性开源产品是Hive、Impala[2]和Spark SQL[3],共同特点是数据存储在Hadoop的HDFS分布式文件系统中,通过Hive管理元数据。产品属于OLAP范畴,不建索引,读操作远多于写操作,不支持事务,写操作采用批量执行方式。
Hive是Hadoop生态圈中最早的数据仓库产品,由Facebook在2008年贡献给Apache基金会,编程语言为Java。Hive将SQL语句转换为MapReduce作业提交到Hadoop集群进行分布式计算,再返回执行结果。受限于Hadoop的作业调度开销,Hive性能较差。
Impala是Cloudera公司在2012年发布的MPP(Massively Parallel Processing)数据库产品,目标是为Hadoop提供原生SQL查询引擎,编程语言为C++。Impala借鉴Google的Dremel产品设计思想,使用多层查询树结构,将SQL查询层层分解到下层节点并行执行,在返回结果时各层节点可进行局部聚合。Impala不用MapReduce任务机制,改用MPP机制对数据进行处理,性能较优。
Spark由加州大学伯克利分校AMP实验室(UC Berkeley AMPLab)在2013年贡献给Apache基金会,编程语言为Scala(基于JVM)。Spark SQL是Spark生态圈中的SQL数据引擎,前身为Shark,2014年替代为Spark SQL。借助Spark的RDD内存计算模型和丰富算子,Spark SQL提供对海量数据的SQL查询接口。Spark SQL社区活跃,新版本性能有较大提升。
1.2 技术概述
在Sql on Hadoop产品中应用的关键技术包括分区和列存压缩[4]。分区技术根据特定字段对数据进行划分,缩小扫描范围,辅助查询。列存压缩技术按照列存储并压缩数据,大幅减少IO量。分区参数和压缩算法的合理选择,可有效提升整体性能。
Hive、Impala和Spark SQL依赖HDFS文件系统存储数据文件。HDFS支持行存格式TextFile(文本行)、SequenceFile(键值数组),也支持列存格式RCFile、ORC、Parquet。列存技术将数据表垂直划分,同一列的所有数据连续存储在一起。如SQL查询只涉及个别列,则只需读取对应列内容,可有效减少IO量。因数据按列存储,所以可以针对每列数据的类型采取具有针对性的数据压缩算法,令整体压缩效率大幅提升。
RCFile是Hive推出的列存格式,ORC是针对RCFile提出的优化文件格式。ORC文件包含若干数据带,每个数据带包含若干列记录,以及每列的压缩算法和统计信息。通过数据带中记录信息,可以在查找时跳过不满足条件的列,提高执行效率。
Parquet是Twitter和Cloudera合作推出的列存格式。Parquet文件包含若干行组,每个行组包含若干列块,每个列块包含若干数据页,每个数据页内连续存放列数据,文件尾存储格式信息。Parquet格式对于宽表的查询性能提升明显。
本文对上述产品开展一系列性能试验,评估数据加载性能、单表查询性能、多表查询性能,并对参数配置进行调优。此外,对如何进行探索式分析做了介绍。
2 性能试验
2.1环境参数
试验环境为6台PC服务器(DL380,32核,128GB内存,1TB本地硬盘)构成的千兆网络集群。操作系统为64位RHEL 6.3,安装CDH 5.4.5(Hadoop 2.6.0),包含Hive 1.1.0,Impala 2.2.0,单独安装Spark 1.5.2,共用同一套HDFS文件系统。Spark采用StandAlone集群模式。HDFS副本数配置为2。
数据源为2015年6月至8月间上海证券市场的脱敏交易日志,共65天,30.6亿条订单记录,38.1亿条成交记录,其中订单记录长635字节,成交记录长269字节,总共2.70 TB数据量。Impala内存设为48GB,Spark内存设为48GB。
2.2 加载与查询
交易日志上传到服务器本地磁盘后,经历两步数据加载。第一步是文件加载,通过 hdfs dfs -put 命令,将日志文件加载到HDFS文件系统中,在这一过程中,数据被分块后通过网络进行分布式存储。第二步是表加载,将日志文件映射为外部表,通过Hive的insert overwrite table指令,将交易记录转换为关系表,以Parquet列式压缩格式和日期字段分区存入HDFS文件系统。
加载测试使用65天的成交日志文件,数据总量为955GB,38.1亿条记录。记录不同用例的加载时间和压缩比,见表1。
表1 文件加载与表加载性能
编号 |
LF1 |
LT1 |
LT2 |
LT3 |
LT4 |
LT5 |
用例 |
文件加载 |
表加载TextFile |
表加载RcFile |
表加载ORC |
表加载Parquet 日期分区 |
表加载Parquet 不分区 |
耗时 |
3624秒 |
4448秒 |
3264秒 |
3239秒 |
5155秒 |
4819秒 |
压缩比 |
n/a |
48.7% |
43.3% |
8.99% |
15.18% |
15.28% |
单个文件(9.9GB)加载时长1分钟44秒,加载速度97.4 MB/s。将分布在6台服务器硬盘上的文件进行并行加载,总耗时1小时24秒,总加载速度269MB/s。并行加载技术可将文件加载速度提升到2.76倍。
表加载阶段, 进一步按照Hive元数据进行ETL转换,数据按列进行压缩,按指定字段进行分区存储。TextFile和RcFile格式简单地滤掉空白字符仅有45%左右的压缩比,ORC和Parquet进行列存压缩可达到8.99%的压缩比,即955GB成交日志压缩后只占用85.9GB的存储空间。
在分区配置方面,按日期分为65个分区。如分区太多会对内存产生较高要求,需小心配置以避免出现OOM(Out Of Memory)错误。
数据导入HDFS并在Hive建立元数据后,使用不同产品进行查询,见表2。单表查询用例包括:个数统计、唯一性统计、排序TOP N、分类汇总、随机查询。多表查询用例包括:维度统计。以Parquet列存压缩、 交易日期分区作为基准。
表2 Sql on Hadoop产品查询性能比较
编号 |
用例 |
Hive |
Impala |
Spark SQL |
QB1 |
个数统计 |
92.207秒 |
13.93秒 |
37.17秒 |
QB2 |
唯一性统计 |
119.343秒 |
45.00秒 |
26.832秒 |
QB3 |
排序TOP N |
n/a |
614.19秒 |
201.02秒 |
QB4 |
分类汇总 |
573.282秒 |
58.72秒 |
66.433秒 |
QB5 |
随机查询 |
84.231秒 |
2.80秒 |
2.307秒 |
QB6 |
维度统计 |
314.982秒 |
39.33秒 |
44.895秒 |
从表中可见,三个产品中,Hive性能较差(其中QB3用例跑不出来),Impala和Spark SQL性能相当。考虑到Spark SQL在数据格式方面兼容性更好,后续主要基于Spark SQL进行不同参数配置下的性能试验。
2.3 数据格式与分区试验
首先进行数据格式试验,分析列存压缩算法对于Spark SQL查询性能的影响。以交易日期分区作为基准。从表3可以看到,Rcfile列存相对Textfile行存,访问列相比访问整行IO量减少到1/20,查询效率可提高。ORC和Parquet列存压缩将数据量进一步减少到十分之一,查询效率可进一步提高。
表3 不同数据格式下查询性能比较
编号 |
用例 |
Textfile |
Rcfile |
ORC |
Parquet |
QC1 |
个数统计 |
271.534秒 |
115.29秒 |
46.528秒 |
37.17秒 |
QC2 |
唯一性统计 |
185.483秒 |
134.317秒 |
40.513秒 |
26.832秒 |
QC3 |
排序TOP N |
541.186秒 |
511.268秒 |
657.166秒 |
201.02秒 |
QC4 |
分类汇总 |
289.402秒 |
140.898秒 |
157.899秒 |
66.433秒 |
QC5 |
随机查询 |
2.296秒 |
2.824秒 |
1.877秒 |
2.307秒 |
QC6 |
维度统计 |
227.818秒 |
175.412秒 |
138.633秒 |
44.895秒 |
其次试验不同分区策略下Impala和Spark SQL的性能,以Parquet作为基准。从表4可以看到,数据分区可对查询性能带来相当大的提升。
表4 不同分区策略下查询性能比较
编号 |
用例 |
Impala 不分区 |
Impala 日期分区 |
Sparksql 不分区 |
Sparksql 日期分区 |
QP1 |
个数统计 |
13.46秒 |
13.93秒 |
46.715秒 |
37.17秒 |
QP2 |
唯一性统计 |
45.04秒 |
45.00秒 |
30.675秒 |
26.832秒 |
QP3 |
排序TOP N |
629.62秒 |
614.19秒 |
346.104秒 |
201.02秒 |
QP4 |
分类汇总 |
60.22秒 |
58.72秒 |
117.112秒 |
66.433秒 |
QP5 |
随机查询 |
3.11秒 |
2.80秒 |
2.371秒 |
2.307秒 |
QP6 |
维度统计 |
39.36秒 |
39.33秒 |
94.173秒 |
44.895秒 |
2.4 参数配置试验
Spark配置参数较多,对性能影响较大的有内存参数、实例数参数和总核数参数[5]。对不同参数配置下Spark SQL性能进行试验,寻找最优的配置策略。
首先调整SPARK_WORKER_MEMORY参数,将实例数固定为2,总核数固定为32。当实例内存设为6G时,QM3用例会因OOM错误而中止。从表5可以看到,实例内存从9G到12G时,性能提升较大,内存再增加,对于单个任务的执行效率影响不大。
表5 Spark内存参数配置试验
编号 |
用例 |
9G |
12G |
18G |
QM1 |
个数统计 |
36.258秒 |
37.17秒 |
35.086秒 |
QM2 |
唯一性统计 |
25.563秒 |
26.832秒 |
26.699秒 |
QM3 |
排序TOP N |
408.309秒 |
201.02秒 |
196.43秒 |
QM4 |
分类汇总 |
66.807秒 |
66.433秒 |
65.215秒 |
QM5 |
随机查询 |
3.2秒 |
2.307秒 |
2.751秒 |
QM6 |
维度统计 |
43.504秒 |
44.895秒 |
44.362秒 |
实例内存固定为12G,总核数固定为32,调整实例数参数SPARK_WORKER_INSTANCES。从表6可以看到,实例数从1增至2时,性能有一定提升,再增加实例数,对于单个任务性能影响不大。
表6 Spark实例数参数配置试验
编号 |
用例 |
1实例 |
2实例 |
3实例 |
QI1 |
个数统计 |
33.921秒 |
37.17秒 |
36.631秒 |
QI2 |
唯一性统计 |
27.0秒 |
26.832秒 |
29.097秒 |
QI3 |
排序TOP N |
246.428秒 |
201.02秒 |
198.137秒 |
QI4 |
分类汇总 |
85.213秒 |
66.433秒 |
73.671秒 |
QI5 |
随机查询 |
2.523秒 |
2.307秒 |
2.797秒 |
QI6 |
维度统计 |
45.811秒 |
44.895秒 |
51.382秒 |
将实例内存固定为12G,实例数固定为2,调整总核数参数SPARK_WORKER_CORES。从表7可以看到,从4核到16核,性能有显著提升,核数再增加,对单个任务的执行速度影响不大。
表7 Spark总核数参数配置试验
编号 |
用例 |
4核 |
16核 |
32核 |
QO1 |
个数统计 |
43.106秒 |
33.683秒 |
37.17秒 |
QO2 |
唯一性统计 |
31.791秒 |
30.402秒 |
26.832秒 |
QO3 |
排序TOP N |
352.349秒 |
194.08秒 |
201.02秒 |
QO4 |
分类汇总 |
122.85秒 |
71.682秒 |
66.433秒 |
QO5 |
随机查询 |
2.11秒 |
2.197秒 |
2.307秒 |
QO6 |
维度统计 |
74.853秒 |
44.34秒 |
44.895秒 |
2.5 试验小结
经过一系列性能试验,倾向于选择Impala和Spark SQL作为Hadoop系统上用SQL分析结构化数据的工具。数据格式方面Parquet相对更优,数据分区比不分区更优。对Spark SQL,需合理配置内存参数、实例数和总核数参数,以获得优化性能。
3 探索式分析
Impala和Spark SQL可提供基于Hadoop分布式数据上的秒级查询回应。从更多数据源获取产品、席位、会员、营业部信息等基础数据,结合证券交易数据可开展探索式分析,分析数据特征,展现可视化图像,进行数据挖掘。采用PySpark+iPython+MatplotLib技术[6]的组合,可通过网页远程交互并可视化展示。
上证股票共1076支,可按照行业、地域、流通市值等划分板块。从图1中可看到不同板块的成交额饼图,从行业看,制造业、金融业、交通运输业股票交易最为活跃,从地域看,北京、上海、广东省上市股票交易最为活跃,从流通市值看,50-100亿、100-150亿、150-200亿流通市值股票交易最为活跃。相关脚本执行时间分别为64、58、256秒。
图1 产品交易额分布:行业板块、地域、流通市值
从逐笔成交中可以还原出行情曲线。在逐笔成交数据基础上,结合Spark RDD开发,可生成秒行情,向市场提供历史行情的按需回放功能,如图2所示。相关脚本执行时间为6秒。
图2 某股票6月5日逐秒行情
在历史数据中可以根据用户账户查询全部历史行为,绘制用户画像,选取某用户,统计该用户买卖股票的分布,计算用户的买入卖出变化曲线,如图3所示。相关脚本执行时间为75、38秒。
图3 某账户用户画像
在用户交易行为历史数据基础上,可使用Spark MLLib机器学习库[7]对用户进行分类,根据不同类别用户的行为特征,实施差异化的监管风控策略。以股票板块成交金额特征为例,每个账户对应16条特征,5250万活跃账户共计生成特征向量4.7GB,用K均值算法聚类,设K值为5。特征抽取和聚类算法执行时间分别为299、42秒。
4 小结
本文根据证券行业数据特点,提出基于Sql on Hadoop的行业大数据解决方案。通过一系列性能试验,表明Impala、Spark SQL等开源技术在加载性能、查询性能等方面已达到企业级数据仓库使用要求,同时开源社区提供的可视化和机器学习工具可满足数据科学家的探索性分析需要。
致谢
感谢信息中心罗列和樊小泊的数据服务支持。感谢上市公司监管一部刘海波提供的上市股票分类数据。感谢市场监察部李旭和陈雷提供的业务信息。
参考文献
[1] Michael Frampton. Big Data Made Easy: A Working Guide to the Complete Hadoop Toolset. Apress Berkely, CA, USA. 2015
[2] 贾传青. 开源大数据分析引擎:Impala实战. 清华大学出版社. 2015
[3] 王家林. 大数据Spark企业级实战. 电子工业出版社. 2015
[4] 张俊林. 大数据日知录:架构与算法. 电子工业出版社. 2014
[5] H. Karau, A. Konwinski, P. Wendell and M. Zaharia. Learning Spark: Lightning-Fast Big Data Analysis. O’Reilly Media, Inc. 2015
[6] Wes McKinney著,唐学韬等译. 利用Python进行数据分析. 机械工业出版社. 2014
[7] Nick Pentreath著,蔡立宇等译. Spark机器学习. 人民邮电出版社. 2015