高磊,王佐文,梁金佑,陈光福,卢夏萍,李辉涵,琚小鹏
(1. 广西电网有限责任公司贵港供电局,广西省,贵港市,537100, 邮编: 537100)
摘要:电力行业计量数据有多种类、多来源、速率高等特性。并且随着该计量行业的不断发展,数据处理的复杂性逐渐升高,传统程序写入数据库的方法暴露了稳定性差、速率慢等问题,需要结合先进的大数据处理技术设计一套符合当下该领域的数据写入处理方法。本文针对数据写入这一点进行了研究,结合了先进的流处理技术设计了一套数据写入程序,程序采用了Storm技术设计了拓扑结构实现了关系型数据库的可靠写入。通过制定了Storm模块间数据元交互格式制定,结合数据分组设计和多源类数据表缓冲结构的应用,优化了程序写入数据库的稳定性、速率性以及抵御了数据复杂性带来的入库挫折,满足了电力计量数据写入数据库的高要求,同时该框架及相关模块也有很高的其它行业数据写入的适用性。
关键词:流处理;数据入库;抗挫折数据写入;类数据表缓冲结构;实时存储;Storm;Oracle
0 引言
随着大数据技术应用不断深入,各类大数据业务系统对写入数据库的能力提出了更高的要求,一个坚强、可靠、快速的写入数据库的框架对于日益增长的大数据业务显得尤为重要,这是后续业务统计分析以及机器学习预测的先决条件。针对电力行业的计量自动化领域,电动汽车充换电、分布式电源接入、四表合一采集等新型业务快速扩展,数据采集对现有系统的数据项、采集频度、数据完整性以及及时性都提出了更高的 。这就需要符合新挑战、新要求的数据写入架构,这也是一个大数据处理系统能够稳定运行的基石,本文主要从计量自动化系统出发结合流处理技术,探讨具有数据量大、采集频度高的电量数据写入系统实现。同时该系统也能为其它行业的大数据业务数据库写入提供借鉴。
1相关工作
实时数据处理应用于各个行业中,如在电子商务中对客户行为进行实时分析并提出修正购买建议[1,2];在运输行业中通过实时跟踪分析进而制定智能化交通管理方案以减轻路线拥堵情况[3,4];在医疗卫生与生命科学中实施传感医疗数据的实时监控及疫情预警等[5,6]。如表1所示,部分公司根据各自业务开发的数据流或者分布式数据流处理系统。数据规模的增加通常表现为并发度、数据量和处理复杂度的提升,而这些会给现有的处理技术带来可用性方面的问题,所以,如何用最小的或较优的资源保障大规模数据流处理系统的可用性是亟待解决和研究的热门问题。
表1:典型分布式数据流实时计算系统
其中Storm流数据处理架构是比较热门的分布式的实时计算技术,Storm可以方便地在一个计算机集群中编写与扩展复杂的实时计算,Storm用于实时处理,就好比?Hadoop?用于批处理。Storm保证每个消息都会得到处理,而且它很快——在一个小集群中,每秒可以处理数以百万计的消息。最大的特点是可以可靠且快速的处理无限的数据流,理论上其每分钟可以处理数以百万计的消息。Storm为分布式实时计算提供了一组通用原语,使用户可以实时处理消息并更新数据库,Storm有适用场景广、可伸缩性强、数据不丢失、健壮性强、高容错和
语言无关性等关键特征[9]。考虑到计量自动化系统领域的数据量大,设备持续向系统上送数据等特点,电量数据写入系统采用Storm框架。
2基于Storm的关系数据库的入库架构
2.1 数据源及存储要求描述
本文研究的是针对计量自动化系统的数据写入方法,有必要介绍一下数据的来源及特征以及数据存储的要求。数据源是电表的电量冻结值、电压的、电流等数据通过终端采集上送到系统网关服务器,前置机通过网关拉取到数据写入到kafka中,入库程序需要将kafka中的数据写入到关系型数据库Oracle 的各个表里,方便页面查询。
数据来源为采集电表终端的前置机上送,通过kafka中间件以json格式写到对应的topic里, json格式示例如下:
其中,deviceAddr项是终端编号,point是终端下的电表测量点号,作为唯一性的确认。数据项包括电能示值、电压、电流、功率因数等,数据有零点示值、有带时标的实时数据。数据量以一个省1000万块电表为例,包括上述提到的很多带时标的各种数据项,数据具有数量大、复杂性高、多样性、持续性的特点。这些数据项联系业务联系紧密,在后续的区域电量、负荷曲线等计算分析中了运用大量的关联计算,所以要存储在关系型数据库中,系统采用了oracle最新版12c作为存储系统,要求数据能够满足快速写入,便于查询等特点。表格2是部分入库表描述信息。
表格2:部分入库表描述信息
2.2数据写入框架设计
Storm框架由一个主节点和多个工作节点组成。主节点运行了一个名为“Nimbus”的守护进程,用于分配代码、布置任务及故障检测。每个工作节 点都运行了一个名为“Supervisor”的守护进程,用于监听工作,开始并终止工作进程。Nimbus和Supervisor都能快速失败,而且是无 状态的,这样一来它们就变得十分健壮,两者的协调工作是由ZooKeeper来完成的。
作为采用Storm框架的程序,主要工作就是设计一个良好的Topology结构和结合拓扑的细节实现,Storm框架里用户将符合自己需求的应用程序逻辑封装在Topology对象中,类似于Hadoop中的作业,在Storm中即为Topolog(拓扑)。一个拓扑通常可表示为有向无环图,其结构中的每一个节点都包含了数据的处理流程,节点间线段的指向则代表数据流在系统中的传递走向。数据写入系统的有向无环图如图1所示。
图1 电量信息入库程序拓扑结构
拓扑结构中,灰色部分是拓扑的内部结构,其中FetchSpout是组件Spout(喷口)具体实现,它从Kafka的名为DataObject的主题处拉取数据,这里我们的DataObject总共有5个分区,所以Spout的并行度设为5,达到线程最大使用率并且不浪费Spout主要作为数据拉取使用,过滤一些空值数据,不作业务处理,Spout的拉取速度很快,取到数据后封装成tuple1(元组)发送到AdapterBolt,Tuple1的格式为
AdapterBolt是数据处理模块Bolt(螺栓),这里对数据进行处理加工,添加电表相关的档案信息(如:唯一标识,供电单位编码等),档案信息取自redis,redis是缓存数据库,数据以KEY-VALUE形式存储在内存中,读写迅速,这里的KEY值由上文提到的终端号和测量点号组成,具体KEY字符串如公式(1),VALUE为Json格式,存放相关信息。
(1)
经过AdapterBolt处理的数据再以Tuple2发送到DataSaveBolt储存模块,Tuple2的格式为
Tuple2到达DataSaveBolt经过加工,调度写入数据库。其中TickBolt是提供心跳的组件,由于Storm的Bolt是消息触发的,根据业务需求添加了心跳模块,处理特定情况,具体会在下面章节提到。
通过图1总拓扑结构的设计和消息交互格式的设计,系统满足了电量数据入库业务的要求,同时对数据处理功能有效的拆分,模块化了步骤,类似流水化作业的结合了storm拓扑结构的设计原则,使得程序逻辑清晰,更易扩展。
2.3 数据写入难点
由计量采集终端、电表、网关、前置的结构复杂性及计量业务要求决定,上送的数据具有(1)数量大,(2)连续性高,(3)数据量有突增现象(比如:零点数据整点上送,各个数据项大量写入kafka中间件),(4)数据项不完整上送(比如:对于一个电表一个时间点的记录,一次消息有电压A,缺失电压B和电压C,这些数据要在后面的消息上送),(5)数据有误与数据库字段类型不匹配(5)偶尔伴随有重复数据上送的现象,这些数据质量都加大了数据写入的难度。
数据库的连接和调交是数据入库的瓶颈点,若逐记录入库,随着数据量的不断增大,数据处理延时也随之增大,下游Bolt模块尤其是DataSaveBolt模块的处理速度与FetchBolt的发送速度不匹配,导致数据堆积在Spout,不能实时处理,因而造成延时增加,速率大幅下降,如果持续积累或遇到零点数据整点上送,各个数据项大量写入kafka中间件时可能会造成内存溢出。所以批量执行是必须的,由于数据项不完整上送、重复数据等数据质量原因,批量提交又会带来无法获取稳定行、锁表、数据类型不匹配等问题,这会造成一批次提交数据库整体失败,大大降低成功率,这些问题需在程序实现前予以充分考虑。由数据特性和质量带来的写入困难是一种挫折,良好的入库架构设计应该是抗挫折的、健壮的。对于需要处理大量消息流的实时系统来说,消息处理始终是实时计算的基础,消息处理的最后就是对消息队列和消息处理者之间的组合。消息处理的核心是如何在消息处理的过程中不丢失数据,而且可以使整个处理系统具有很好的扩展性,以便能够处理更大的消息流。而Storm 正好可以满足这些要求。
2.4 关键设计
2.4.1分组处理
图1中FetchSpout到AdapterBolt采用的数据分组是ShuffleGrouping,随机分组(ShuffleGrouping)即将Tuple随机分配,使得同一级螺栓中每个任务处理的Tuple-样多。这里我们把Tuple1均分到了AdapterBolt里的每个Executer线程,executor是1个被worker进程启动的单独线程。每个executor只会运行1个topology的1个component(spout或bolt)的task(注:task可以是1个或多个,storm默认是1个component只生成1个task,executor线程里会在每次循环里顺序调用所有task实例。这里配置了每个Executer处理一个task,这样从FetchSpout传递来的信息就最大化的利用了并行度来的进行数据转换的处理。经过AdapterBolt处理的消息我们将它转换为下一个bolt解析的格式,也就是Tuple2中的msg,它是一种包含表信息的Json格式,如下:
其中operateFlag、operateTime、partition、uniqueIndex、是关键信息,分别是操作类型,操作时间,表分区字段以及唯一性索引。Tuple2中还携带了表名(tableName), 数据接收时间(dataJSSJ), 分组字段(groupingKeyword),分组字段作为AdapterBolt发送到DataSaveBolt的分组方式FieldGrouping的关键信息被使用。按字段分组是根据Tuple中字段的值来划分,即我们将数据流中此字段具有相同值的Tuple分发到一个任务中。例如单词计数例子中,如果采用按字段分组策略可单词名作为分组依据,后续Bolt将会根据不同单词名字进行计算,最终统计出各个单词的个数。我们的分组字段采用的是terminal_addr(终端),电表从属在终端下,这样做会把同一终端下的电表放到一个executor中处理,避免了不同Executor处理同一电表,同一时标的信息,从而避开了行记录锁的问题。TickBolt提供时钟心跳的tuple使用了具有广播特性的Allgrouping分组方案,这样保证被接受的Bolt的每个Executor都能接收到心跳。
2.4.2 类数据表缓冲结构及处理方法
类数据表缓冲结构是本系统设计的类似关系型数据库表的缓冲结构,它具备数据库中表的特性,数据唯一性,可增、可改,有索引等特性,下面叙述具体设计。
DataSaveBolt的数据源是多样性的,要处理不同的表,不同的上送字段,所以要为批量提交数据设计一些容纳这些信息的相关类,如图2所示
图2 数据缓存相关类
图2中包含了三个实现序列化接口类,分别是存储表信息及字段信息的TableInfoCache类和ColInfo类,存储接收数据的ValueInfoCache类,其中TableInfoCache类用来保存数据表表名、唯一索引等信息,复合了ColInfo类保存字段信息,ValueInfoCache类保存来源数据,通过HashMap,缓存一批次被配置好上限数量的待入库数据。DataSaveBolt的处理流程如图3所示。
图3 数据存储流程
图3中,Map<String, TableInfoCache>是以表名为KEY,TableInfoCache为VALUE的哈希表,用来存储不同表的相关信息,Map<String, ValuenfoCache>是以表名为KEY,ValueInfoCache为VALUE的哈希表,用来存储不同表的数据,存储大小通过配置在一个固定值(根据Oracle支持一次数据量提交大小,假设这里设置成500条),此值就是数据批量提交的数量值。
来自AdapterBolt的数据首先根据表名查询数据库,建立表相关信息。这个操作会在此类表第一次数据完成,后续再获取到该表的数据不需要查表建立该表的相关信息,避免了过多的数据库连接查询影响性能。Map<String, TableInfoCache>存储多表信息,在每种表的第一次数据来临时建立。数据加载到Map<String, ValuenfoCache>之前,后续的数据会根据Map<String, ValuenfoCache>已有数据的唯一索引字段来决定是先更新已有数据还是添加新的一条数据,在写入在写入Map<String, ValuenfoCache>之前,首先会根据建立的表信息查询数据值是否符合数据库字段类型,确保数据的有效性。这种设计和操作方式类似于插入更新一个类似数据库的表的缓存结构,但这些都是在写入数据库之前在内存中完成。这样就最大限度利用了CPU,内存的性能,在批量提交的一批数据前,每个表的一批数据里已经没有违反唯一性的数据和无效的数据,保证了数据安全有效、无遗失的批量提交到数据库。其中的TickBolt提供的心跳是为了定期全量提交Map<String, ValuenfoCache>里的数据,避免一些数据表阶段时间数据过少,没有及时提交。DataSaveBolt的实现具有完备性、通用性,实现方法可根据需求稍加修改复用于对写入关系数据库(如oracle、mysql等)要求高的各类业务。
根据上节设计的类数据表的缓冲结构结合该数据预处理方法,在每个线程中,数据提交数据库前是经过去除无效数据、更新并且唯一的记录,这样很大的提高了写入数据库的效率。
2.5 程序运行结果
该计量自动化入库入库程序运行日志及Storm运行状态如下图所示。
图3 入库程序运行日志及STORM运行指标
图3的上图为数据表批量到达设置值陆续提交数据库的日志,图中CJ_DYDLQX(电压电流曲线表)在各个Storm线程中执行了批量提交操作。下图为心跳触发的一些当前时段来自kafka流量过小的表的批量提交,其中CJ_DNSZQX(电能示值曲线表)线程中提交了2条,CJ_GJSJ_ZDDDSDJL(终端停电上电记录表) 提交了1条。这些表都是通过时钟触发入库,避免该时段对应表的数据上送少没有及时写入数据库。
图4 入库程序STORM运行指标
图4是Storm的监控页面结果,上图中是没有采用类数据表缓存结构和相关处理方法的入库程序,监控页面上inputBolt的线程数是400,Capacity(处理能力)是5.497已经超出了性能瓶颈,Execute latency(处理延迟)为25.048也很高,说明数据在入库提交阶段处理很慢,发生了数据拥堵。图4下图是采用类数据表缓存结构和相关处理方法的入库程序,第一行是类似AdapterBolt组件的性能指标,第二行是类DataSaveBolt的性能指标,线程数分别是30和60,总和少于上图中400线程数,Capacity分别是0.032和0.056,Execute latency(处理延迟)为3.458和4.423。该值处在很健康位置,表明数据处理延时很小,下游DataSaveBolt模块的处理速度与AdapterBolt、FetchBolt的发送速度匹配,能及时处理来自上游过来的数据。这样的设计使得整个流处理各个模块协调,数据能有序,快速的写入到数据库中。通过性能对比体现了该数据写入程序设计架构的优越性。
3 结束语
在针对计量自动化系统数据量大、采集频度、数据完整性以及及时性要求 高的复杂电量数据基础上,本文提出了基于Storm的数据写入架构及方法。该系统采用流处理分布式架构,分组策略的优化、一种类数据表的缓存器及处理方法。该设计结构及使用方法比起传统的入库方法提高了效率,并且有效的解决了复杂数据的写入关系型数据库的问题。。本文下一步工作将针对Storm流数据架构的参数配置及程序架构改进进一步分析,使该系统能够更具通用性,能适用于更广泛的业务场景及行业要求,使之具有更高的使用价值。
参考文献
[1][1] SONG Y, ZHUANG Z, LIH, et al. Real-time automatic tag recommendation [C]//SIGIR.Singapore: ACM, 2008: 515-522
[2][2] PHELAN O, MCCARTHY K, SMYTH B. Using twitter to recommend real-time topical news[C]/RecSys. Como: ACM, 2009 385-388
[3][3] ZHENG Y, ZHOU X. Computing with spatial trajectories[M]. Washington DC: Springer Science Business media, 2011
[4][4] JAIN N, AMINI L, ANDRADE H, et al. Design, implementation, and evaluation of the linear road bnchmark on the stream processing core [C]/SIGMOD. New York: ACM 2006:431-442.
[5][5] ABERER K, HAUSWIRTH M, SALEHI A. Infrastructure for data processing in large-Scale interconnected sensor Networks[C]/MDM. Mannheim: IEEE, 2007: 198-205
[6][6] ABADI D J, MADDEN S, LINDNER W. REED: robust, efficient filtering and event detection in sensor networks[C]/VLDB. Trondheim: ACM, 2005: 769-780
[7][7] BABCOCK B, BABU S, DATAR M, et al. Models and issues in data stream system S[C/PODS. Madison: ACM, 2002: 1-16.
[8][8] CHEN H, CHIANG R H, STOREY V C. Business intelligence and analytics: From big data to big impact.[J]. MIS quarterly, 2012, 36(4):1165-1188
[9][9] 孙大为,张广艳,郑绛民.大数据流式计算:关键技术及系统实例[J].软件学报,2014,04:839-862.
[10][10] 崔星灿,禹晓辉,刘洋,吕朝阳.分布式流处理技术综述[].计算机研究与发展,2015,02: 318-332.
[11][11] 张鹏,李鹏霄,任彦,林海伦,杨嵘,郑超.面向大数据的分布式流处理技术综述[J].计算机研究与发展,2014,S2:1-9.
[12][12] 宫宇,吕金壮.大数据挖掘分析在电力设备状态评估中的应用[J]南方电网技术,2014,06:74-77.
[13][13] 周蓉.电力系统实时数据管理系统的研究与开发[D].华北电力大学(北京) ,2003.
[14][14] 白洋.面向大数据的电力设备状态监测信息聚合研究[D].昆明理工大学,2014.
[15][15] DJ. Abadi, D. Carney, U. Cetintemel, et al. Aurora: A New Model and Architecture for Data Stream ' Management. VLDB Journal, vol.12 (2),Aug.2Q03:120-139.
[16][16] Daniel J. Abadi, Yanif Ahmad, Magdalena Balazinska,et al. The design of the Borealis stream processing engine. Proc. of the Second Biennial Conference on Innovative Data Systems Research (CIDR), Jan.2005.
[17][17] Krishnamurthy,S,Chandrasekaran, S?,Cooper, 0., et al. TelegraphCQ: An Architectural Status report. Bulletin of the IEEE Computer Society Technical Committee on Data Engineering,March 2003.
[18][18] Mehul A. Shah, Joseph M. Hellerstein, Eric Brewer. Highly Available,Faultrtolerant, Parallel' Dataflows. SIGMOD '04 Proceedings of the 2004 ACM SIGMOD international conference on Management of data, 2004:827-838.
[19][19] M. Zaharia, T. Das, H. Li, et al. Discretized Streams: An Eficient and Fault-Tolerant Model for Stream Processing on Large Clusters. Proceedings of the Fourth USENIX Conference on Hot Topics in Cloud Computing (HotCloud ’12),pp.1-6,Boston,Massachusetts, June 2012.