当前位置:编程学堂 > 如何基于Spark Streaming搭建实时计算平台

如何基于Spark Streaming搭建实时计算平台

  • 发布:2023-10-06 01:23

随着互联网技术的快速发展,用户对数据处理的及时性、准确性和稳定性的要求越来越高。如何构建一个稳定、易用、具有全面监控和预警功能的实时计算平台也成为很多企业面临的一大挑战。 1 简介 随着互联网技术的快速发展,用户对数据处理的及时性、准确性和稳定性的要求越来越高。如何搭建一个稳定易用、提供完善监控预警功能的实时计算平台,成为了很多企业面临的难题。一个很大的挑战。 携程实时计算平台自2015年成立以来,经过两年多的不断技术演进,目前实时集群规模已达数百个。该平台涵盖了各个SBU和公共部门的数百个实时应用程序。 JStorm集群全年稳定性达到100%。目前的实时平台主要基于JStorm和Spark Streaming构建。相信关注携程实时平台的朋友去年都看过一篇关于携程实时平台的分享:携程实时大数据平台实践分享。 本次分享将重点介绍携程如何基于Spark Streaming构建实时计算平台。文章将从以下几个方面详细阐述该平台的建设和应用: Spark Streaming 与 JStorm Spark Streaming设计和封装 Spark Streaming在携程的实践 我曾经踩过的坑 未来展望 2.Spark Streaming vsJStorm 在携程实时平台接入Spark Streaming之前,JStorm已经稳定运行了一年半的时间,基本可以满足大部分应用场景。接入Spark Streaming主要考虑以下几点: 首先,携程使用的JStorm版本是2.1.1版本。该版本的JStorm封装和抽象程度较低,没有提供High Level的抽象方法和对windows、status、Sql等功能的支持,这大大提高了用户使用JStorm实现实时的门槛应用程序以及开发复杂的实时应用场景的难度。在这些方面,SparkStreaming表现得比较好。它不仅提供了高度集成的抽象方法(各种运算符),用户还可以将其与SparkSQL结合起来,直接使用SQL来处理数据。 其次,用户在处理数据时往往需要维护两套数据处理逻辑。实时计算使用JStorm,离线计算使用Hive或Spark。为了降低开发和维护成本,实现流式和离线计算引擎的统一,Spark对此提供了很好的支持。最后,在介绍Spark Streaming之前,我们重点分析了Spark和Flink技术的引入成本。当时Flink的版本是1.2版本,Spark的版本是2.0.1。与 Spark 相比,Flink 对 SQL 和 MLlib 的支持相对较弱,公司很多部门基于 Spark SQL 和 MLlib 开发离线任务和算法模型,大大降低了用户使用 Spark 的学习成本。 下图简单展示了Spark Streaming和JStorm目前的使用情况对比: 3.Spark Streaming设计与封装 在接入Spark Streaming的初期,首先要考虑的是如何在现有实时平台的基础上无缝嵌入Spark Streaming。原来的实时平台已经包含了很多功能:元数据管理、监控报警等,所以第一步我们封装了SparkStreaming,提供了丰富的功能。整个系统包括Muise Spark Core、Muise Portal以及外部系统。 3.1 Muise Spark 核心 MuiseSpark Core是我们基于Spark Streaming的二次封装,用于支持携程的各种消息队列。 HermesKafka和元盛Kafka基于Direct Approach消费数据,Hermes Mysql和Qmq基于Receiver消费数据。接下来要讨论的很多特性主要是针对Kafka类型的数据源的。 Muisespark核心主要包括以下功能: Kafka Offset自动管理 支持 Exactly Once 和 At Least Once 语义 提供Metric注册系统,用户可以注册自定义的Metric 基于系统和用户定义指标的预警 长时间运行在Yarn上,提供容错机制 3.1.1 Kafka Offset自动管理 封装muise Spark Core的首要目标是简单易用,让用户能够以最简单的方式开始使用SparkStreaming。首先,我们实现了帮助用户自动读取并存储Kafka Offset的功能。用户无需关心Offset是如何处理的。其次,我们还验证了Kafka Offset的有效性。部分用户的作业长时间停止后重新运行时可能会出现Offset失败的情况。我们也采取了相应的操作。当前操作是将过期的Offset设置为当前最旧的有效Offset。下图展示了用户基于muise Spark Core编写Spark Streaming作业的简单示例。用户只需要几行代码就可以完成代码的初始化并创建相应的DStream: 默认情况下,作业每次都会根据上次存储的Kafka Offset继续消费,但用户也可以决定Offset消费的起点。下图展示了三种设置消费起点的方法: 3.1.2 Exactly Once的实现 实时作业想要实现端到端的Exactly Once,数据源、数据处理和数据存储三个阶段都需要保证Exactly Once语义。目前,基于Kafka Direct API和Spark RDD算子的Exactly Once保证可以实现端到端的Exactly Once语义。在数据存储阶段,一般需要保证存储过程是幂等操作或者是事务操作实现Exactly Once。很多系统本身就支持幂等操作,比如将相同的数据写入同一个HDFS文件。这本身就是一个幂等操作,保证多次操作得到的最终值仍然相同; HBase、ElasticSearch、redis等都可以实现幂等操作。 。对关系数据库的操作一般都支持事务性操作。 在创建DirectKafkaInputStream时,官方只需要输入消费Kafka的From Offset,就可以获取本次消费的End Offset,也就是最新的Offset。保存的Offset是本批次的End Offset,下次消费从最后一个End Offset开始。程序崩溃或重新启动任务时会出现一些问题。如果在数据处理完成之前存储偏移量,可能会出现作业处理数据失败、作业停机等情况。重启后无法追溯上次处理的数据,导致数据丢失。如果数据处理完成后存储Offset,但在存储Offset的过程中发生故障或作业崩溃,重启后会再次消耗上次消费的数据。而且,不保证重启后消耗的数据与停机前的数据相同。这会引入另一个问题。如果根据聚合统计指标进行更新操作,将无法判断最后一次数据是否更新成功。 所以在muise Spark Core中我们添加了自己的实现来保证Exactly Once的语义。具体实现是,我们对Spark源码进行了改造,保证在创建DirectKafkaInputStream时可以同时输入From Offset和End Offset,并且在存储Kafka Offset时保存每个batch的起始Offset和结束Offset 。具体格式如下: 这样做的目的是为了保证无论是停机还是手动重启,重启后的第一批与重启前的最后一批一模一样。这样的设计使得后续用户在处理第一批数据时非常灵活。如果用户直接忽略第一批数据,那么此时最多一次的语义就得到了保证,因为我们无法知道重启前的最后一批数据操作是否成功完成;如果用户按照原来的逻辑处理第一批数据,不进行去重操作,那么此时至少一次的语义得到了保证,而最终的结果可能存在重复数据;最后,如果用户想要实现exactonce,muise Spark core提供了基于topic、partition、offset生成UID的功能。只要两个batch消耗的Offset相同,那么最终生成的UID也会相同。这个UID可以作为判断上一批数据是否存储成功的依据。下面简要展示了重启后第一个批处理操作的行为。 3.1.3 指标体系 Musiespark core基于Spark自己的metrics系统进行了修改,添加了很多自定义的metrics,并将metrics注册接口暴露给用户。用户可以轻松地在程序中注册自己的指标并更新指标的值。最后,所有指标都会按照作业设定的批次间隔写入Graphite,并根据公司定制的预警系统发出警报。前端可以通过Grafana展示各种指标。Muisespark core 本身定制的指标包括以下三类: 失败,如果一个batch内spark任务失败超过4次,就会发出警报,监控程序的运行状态。 Ack,如果一个batch时间内Spark Streaming处理的数据量较小,则会发出警报。用于监控程序是否正常消费数据。 滞后,如果批量时间内数据消耗延迟大于设定值,就会产生报警。 由于我们大部分作业都开启了BackPressure功能,所以在SparkUI中每批数据都可以在正常时间内被消耗掉。但是,此时kafka中可能已经积累了大量的数据,因此每个批次,我们都会计算当前消耗时间与数据本身之间的平均差值。如果这个差值大于批处理时间,则说明数据消耗已经存在延迟。 下图所示,预警系统中,预警是基于用户自定义注册的Metrics和系统自定义的Metrics进行的。 3.1.4 容错 其实,在上面的Exactly Once章节中,我们已经详细描述了muise Spark core如何保证程序崩溃后数据的正确处理。但为了让Spark Sreaming能够在Yarn集群上长期稳定运行,需要添加很多配置。感兴趣的朋友可以查看:Long running Spark Streaming Jobs on YarnCluster。 除了上述容错保证之外,Muise Portal(稍后讨论)还提供了 Spark Streaming 作业的定时检测功能。目前,所有数据库中标记为正在运行的 Spark Streaming 作业的状态每 5 分钟检查一次。通过Yarn提供的REST API,可以根据每个作业的Application ID查询Yarn上作业的状态。如果状态为非运行状态,则会尝试重新启动作业。 3.2 缪斯门户 封装完所有Spark Streaming之后,我们需要一个平台来管理配置作业,MuisePortal就是这样存在的。 Muise Portal目前主要支持Storm和Spark Streaming两类作业,并支持创建新作业、发布Jar包、运行和停止作业等一系列功能。创建新作业的界面如下图所示: SparkStreaming作业基于Yarn Cluster模式运行。所有作业都提交到 Yarn 集群,以便通过 Muise Portal 上的 Spark 客户端运行。具体的Job运行流程如下图所示: 3.3 整体架构 最后,这是目前携程实时平台的整体架构。 4、Spark Streaming在携程的实践 目前Spark Streaming在携程的业务场景主要可以分为以下几个方面:ETL、实时报表统计、个性化推荐营销场景以及风控和安全应用。抽象地说,主要可以分为数据过滤和提取、数据指标统计和模型算法的使用。 4.1 提取和加载 目前市面上有各种工具可以实时消费来自Kafka的数据,进行过滤和清洗,最后实现到相应的存储系统中,例如Camus、Flume等。与此类产品相比,Spark的优势在于流式传输在于它可以支持更复杂的处理逻辑。其次,基于Yarn系统的资源调度使得Spark Streaming的资源配置更加灵活。最后,用户可以将Spark RDD数据转换为Spark Dataframe数据。 ,这样就可以和Spark SQL结合起来,当数据最终输出到HDFS、Alluxio等分布式文件系统时,可以存储为Parquet等格式化数据,方便用户后续使用Spark SQL处理数据。 目前,典型的ETL使用场景是假期部门的Data Lake应用。假期部门使用Spark Streaming对数据进行ETL操作,最终将数据存储在Alluxio中。期间数据是基于muise-spark-core自定义的metric函数进行处理的。对数据量、字段数、数据格式和重复数据进行了检查和数据质量监控。具体监测和预警上面已经提到了。 4.2 实时报表统计 实时报表统计和展示也是Spark Streaming经常使用的场景。数据可以基于处理时间统计或事件时间统计。由于Spark Streaming中不同批次的作业可以视为滚动窗口,一个独立的窗口包含多个时间段的数据,这给基于Event Time统计的Spark Streaming的使用带来了一定的限制。比较常用的方法是统计每个batch中不同时间维度的累计值,导入到外部系统,比如ES;然后在报表展示时根据时间进行二次聚合,得到完整的累计值,最终得到聚合值。下图是携程IBU基于Spark Streaming实现的实时仪表板。 4.3 个性化推荐与风控安全 这两类应用的共同点是它们都需要基于算法模型来预测或分类用户行为。携程目前的所有模型都是每天根据线下数据定期进行线下训练。 Spark Streaming引入后,很多部门开始积极尝试实时特征提取和模型的在线训练。并且Spark Streaming可以和Spark MLlib很好的结合。最成功的案例是安全部门用来根据各种过滤条件来捕获攻击请求。后来他们采用离线模型训练,Spark Streaming加上Spark MLlib为用户做实时预测。 ,相比JStorm(基于大量正则表达式匹配用户,消耗大量CPU)性能提升十倍,漏报率降低20%。 5、我踩过的坑 目前,携程Spark Streaming作业运行的YARN集群与离线作业属于同一个集群,这对作业的性能和稳定性都有很大影响。尤其是当YARN或Hadoop集群需要更新、维护和服务重启时,很大程度上会导致Spark Streaming作业出错并挂起。虽然有很多容错保证,但也会导致数据积压和数据处理延迟。未来Hadoop和Yarn集群将独立部署。所有实时作业都将运行在独立的集群上,不会受到外部因素的影响。这也有利于未来Flink作业的开发和维护。后来使用Alluxio来实现主集群和子集群之间的数据共享。 在使用过程中,我也遇到了各种Bug。这里我简单介绍一下一些比较严重的问题。首先,第一个问题是,每批Spark Streaming作业都会通过DirectKafkaInputStream的comput方法获取消费的Kafka Topic的最新offset。如果此时kafka集群由于某种原因不稳定,就会引发java.lang.RuntimeException。 :找不到分区 xx 问题的领导者。由于这段代码运行在Driver端,如果不做配置和处理,程序会直接挂掉。对应的解决方案是配置spark.streaming.kafka.maxRetries大于1,并且可以通过配置www.sychzs.cn参数来设置每次重试的时间间隔。 其次,Spark Streaming与Spark Sql结合的过程中会出现很多问题。例如,使用过程中可能会出现Out of Memory: PermGen space。这是因为Spark sql使用了代码生成器,导致大量的PermGen空间被使用。可以通过在spark.driver.extraJavaOptions中添加-XX:MaxPermSize=1024m-XX:PermSize=512m来解决。 。 Spark Sql还需要创建Spark Warehouse。如果是基于Yarn运行的话,HDFS上可能会默认创建相应的目录。如果没有权限,会报Permission Denied问题。用户可以通过配置 config("spark.sql.warehouse .dir","file:${system:user.dir}/spark-warehouse") 来解决问题。 六、未来展望 以上主要详细介绍了Spark Streaming在携程实时平台中的应用。 Spark Streaming在使用过程中还存在一些痛点,比如窗口功能比较单一,基于Event Time的统计指标过于繁琐,而新版本中官方又缺少了这些痛点。新功能的加入让我们更倾向于尝试Flink。 Flink 基本上实现了 Google 提出的各种实时处理概念,并引入了 WaterMark 的实现。感兴趣的朋友可以查看Google的官方文档:The world Beyond Batch: Streaming 102。 目前,Flink 1.4 Release 版本即将发布,基于 kafka 数据源的 Spark 2.2.0 Structured Streaming 也支持更多功能。我们前期对Flink做了充分的研究,下半年的主要工作将在Flink的对接上。在提供了很多实时计算框架的支持之后,带来了更多的学习成本。未来我们的重点将是如何让用户更方便的实现实时计算逻辑。其中,Apache Beam为各种实时场景提供了良好的封装,支持多种实时计算引擎。其次,基于Stream Sql实现复杂的实时应用场景将是我们的主要研究方向。

相关文章