b2c信息网

您现在的位置是:首页 > 昨日新闻 > 正文

昨日新闻

sparkpi源码(spark源码剖析)

hacker2022-06-12 14:58:15昨日新闻68
本文目录一览:1、

本文目录一览:

大家对spark的源码了解多少,sparkshuffle,调度,sparkstreaming的源码?

流(Streaming),在大数据时代为数据流处理,就像水流一样,是数据流;既然是数据流处理,就会想到数据的流入、数据的加工、数据的流出。

日常工作、生活中数据来源很多不同的地方。例如:工业时代的汽车制造、监控设备、工业设备会产生很多源数据;信息时代的电商网站、日志服务器、社交网络、金融交易系统、黑客攻击、垃圾邮件、交通监控等;通信时代的手机、平板、智能设备、物联网等会产生很多实时数据,数据流无处不在。

在大数据时代Spark Streaming能做什么?

平时用户都有网上购物的经历,用户在网站上进行的各种操作通过Spark Streaming流处理技术可以被监控,用户的购买爱好、关注度、交易等可以进行行为分析。在金融领域,通过Spark Streaming流处理技术可以对交易量很大的账号进行监控,防止罪犯洗钱、财产转移、防欺诈等。在网络安全性方面,黑客攻击时有发生,通过Spark Streaming流处理技术可以将某类可疑IP进行监控并结合机器学习训练模型匹配出当前请求是否属于黑客攻击。其他方面,如:垃圾邮件监控过滤、交通监控、网络监控、工业设备监控的背后都是Spark Streaming发挥强大流处理的地方。

大数据时代,数据价值一般怎么定义?

所有没经过流处理的数据都是无效数据或没有价值的数据;数据产生之后立即处理产生的价值是最大的,数据放置越久或越滞后其使用价值越低。以前绝大多数电商网站盈利走的是网络流量(即用户的访问量),如今,电商网站不仅仅需要关注流量、交易量,更重要的是要通过数据流技术让电商网站的各种数据流动起来,通过实时流动的数据及时分析、挖掘出各种有价值的数据;比如:对不同交易量的用户指定用户画像,从而提供不同服务质量;准对用户访问电商网站板块爱好及时推荐相关的信息。

SparkStreaming VS Hadoop MR:

Spark Streaming是一个准实时流处理框架,而Hadoop MR是一个离线、批处理框架;很显然,在数据的价值性角度,Spark Streaming完胜于Hadoop MR。

SparkStreaming VS Storm:

Spark Streaming是一个准实时流处理框架,处理响应时间一般以分钟为单位,也就是说处理实时数据的延迟时间是秒级别的;Storm是一个实时流处理框架,处理响应是毫秒级的。所以在流框架选型方面要看具体业务场景。需要澄清的是现在很多人认为Spark Streaming流处理运行不稳定、数据丢失、事务性支持不好等等,那是因为很多人不会驾驭Spark Streaming及Spark本身。在Spark Streaming流处理的延迟时间方面,Spark定制版本,会将Spark Streaming的延迟从秒级别推进到100毫秒之内甚至更少。

SparkStreaming优点:

1、提供了丰富的API,企业中能快速实现各种复杂的业务逻辑。

2、流入Spark Streaming的数据流通过和机器学习算法结合,完成机器模拟和图计算。

3、Spark Streaming基于Spark优秀的血统。

SparkStreaming能不能像Storm一样,一条一条处理数据?

Storm处理数据的方式是以条为单位来一条一条处理的,而Spark Streaming基于单位时间处理数据的,SparkStreaming能不能像Storm一样呢?答案是:可以的。

业界一般的做法是Spark Streaming和Kafka搭档即可达到这种效果,入下图:

Kafka业界认同最主流的分布式消息框架,此框架即符合消息广播模式又符合消息队列模式。

Kafka内部使用的技术:

1、  Cache

2、  Interface

3、  Persistence(默认最大持久化一周)

4、  Zero-Copy技术让Kafka每秒吞吐量几百兆,而且数据只需要加载一次到内核提供其他应用程序使用

外部各种源数据推进(Push)Kafka,然后再通过Spark Streaming抓取(Pull)数据,抓取的数据量可以根据自己的实际情况确定每一秒中要处理多少数据。

通过Spark Streaming动手实战wordCount实例

这里是运行一个Spark Streaming的程序:统计这个时间段内流进来的单词出现的次数. 它计算的是:他规定的时间段内每个单词出现了多少次。

1、先启动下Spark集群:

我们从集群里面打开下官方网站

接受这个数据进行加工,就是流处理的过程,刚才那个WordCount就是以1s做一个单位。

刚才运行的时候,为什么没有结果呢?因为需要数据源。

2、获取数据源:

新开一个命令终端,然后输入:

$ nc -lk 9999

现在我们拷贝数据源进入运行:

然后按回车运行

DStream和RDD关系:

没有输入数据会打印的是空结果:

但是实际上,Job的执行是Spark Streaming框架帮我们产生的和开发者自己写的Spark代码业务逻辑没有关系,而且Spark Streaming框架的执行时间间隔可以手动配置,如:每隔一秒钟就会产生一次Job的调用。所以在开发者编写好的Spark代码时(如:flatmap、map、collect),不会导致job的运行,job运行是Spark Streaming框架产生的,可以配置成每隔一秒中都会产生一次job调用。

Spark Streaming流进来的数据是DStream,但Spark Core框架只认RDD,这就产生矛盾了?

Spark Streaming框架中,作业实例的产生都是基于rdd实例来产生,你写的代码是作业的模板,即rdd是作业的模板,模板一运行rdd就会被执行,此时action必须处理数据。RDD的模板就是DStream离散流,RDD之间存在依赖关系,DStream就有了依赖关系,也就构成了DStream 有向无环图。这个DAG图,是模板。Spark Streaming只不过是在附在RDD上面一层薄薄的封装而已。你写的代码不能产生Job,只有框架才能产生Job.

如果一秒内计算不完数据,就只能调优了.

总结:

使用Spark Streaming可以处理各种数据来源类型,如:数据库、HDFS,服务器log日志、网络流,其强大超越了你想象不到的场景,只是很多时候大家不会用,其真正原因是对Spark、spark streaming本身不了解。

如何使用intellij搭建spark开发环境

注意,客户端和虚拟集群中hadoop、spark、scala的安装目录是一致的,这样开发的spark应用程序的时候不需要打包spark开发包和scala的库文件,减少不必要的网络IO和磁盘IO。当然也可以不一样,不过在使用部署工具spark-submit的时候需要参数指明classpath。

1:IDEA的安装

官网jetbrains.com下载IntelliJ IDEA,有Community Editions 和 Ultimate Editions,前者免费,用户可以选择合适的版本使用。

根据安装指导安装IDEA后,需要安装scala插件,有两种途径可以安装scala插件:

启动IDEA - Welcome to IntelliJ IDEA - Configure - Plugins - Install JetBrains plugin... - 找到scala后安装。

启动IDEA - Welcome to IntelliJ IDEA - Open Project - File - Settings - plugins - Install JetBrains plugin... - 找到scala后安装。

如果你想使用那种酷酷的黑底界面,在File - Settings - Appearance - Theme选择Darcula,同时需要修改默认字体,不然菜单中的中文字体不能正常显示。

2:建立Spark应用程序

下面讲述如何建立一个Spark项目week2(,正在录制视频),该项目包含3个object:

取自spark examples源码中的SparkPi

计词程序WordCount1

计词排序程序WordCount2

A:建立新项目

创建名为dataguru的project:启动IDEA - Welcome to IntelliJ IDEA - Create New Project - Scala - Non-SBT - 创建一个名为week2的project(注意这里选择自己安装的JDK和scala编译器) - Finish。

设置week2的project structure

增加源码目录:File - Project Structure - Medules - week2,给week2创建源代码目录和资源目录,注意用上面的按钮标注新增加的目录的用途。

增加开发包:File - Project Structure - Libraries - + - java - 选择

/app/hadoop/spark100/lib/spark-assembly-1.0.0-hadoop2.2.0.jar

/app/scala2104/lib/scala-library.jar可能会提示错误,可以根据fix提示进行处理

B:编写代码

在源代码scala目录下创建1个名为week2的package,并增加3个object(SparkPi、WordCoun1、WordCount2):

SparkPi代码

package week2

import scala.math.random

import org.apache.spark._

/** Computes an approximation to pi */

object SparkPi {

def main(args: Array[String]) {

val conf = new SparkConf().setAppName("Spark Pi")

val spark = new SparkContext(conf)

val slices = if (args.length 0) args(0).toInt else 2

val n = 100000 * slices

val count = spark.parallelize(1 to n, slices).map { i =

val x = random * 2 - 1

val y = random * 2 - 1

if (x*x + y*y 1) 1 else 0

}.reduce(_ + _)

println("Pi is roughly " + 4.0 * count / n)

spark.stop()

}

}

复制代码

WordCount1代码

package week2

import org.apache.spark.{SparkContext, SparkConf}

import org.apache.spark.SparkContext._

object WordCount1 {

def main(args: Array[String]) {

if (args.length == 0) {

System.err.println("Usage: WordCount1 file1")

System.exit(1)

}

val conf = new SparkConf().setAppName("WordCount1")

val sc = new SparkContext(conf)

sc.textFile(args(0)).flatMap(_.split(" ")).map(x = (x, 1)).reduceByKey(_ + _).take(10).foreach(println)

sc.stop()

}

}

复制代码

WordCount2代码

package week2

import org.apache.spark.{SparkContext, SparkConf}

import org.apache.spark.SparkContext._

object WordCount2 {

def main(args: Array[String]) {

if (args.length == 0) {

System.err.println("Usage: WordCount2 file1")

System.exit(1)

}

val conf = new SparkConf().setAppName("WordCount2")

val sc = new SparkContext(conf)

sc.textFile(args(0)).flatMap(_.split(" ")).map(x = (x, 1)).reduceByKey(_ + _).map(x=(x._2,x._1)).sortByKey(false).map(x=(x._2,x._1)).take(10).foreach(println)

sc.stop()

}

}

复制代码

C:生成程序包

生成程序包之前要先建立一个artifacts,File - Project Structure - Artifacts - + - Jars - From moudles with dependencies,然后随便选一个class作为主class。

按OK后,对artifacts进行配置,修改Name为week2,删除Output Layout中week2.jar中的几个依赖包,只剩week2项目本身。

按OK后, Build - Build Artifacts - week2 - rebuild进行打包,经过编译后,程序包放置在out/artifacts/week2目录下,文件名为week2.jar。

3:Spark应用程序部署

将生成的程序包week2.jar复制到spark安装目录下,切换到用户hadoop,然后切换到/app/hadoop/spark100目录,进行程序包的部署。具体的部署参见应用程序部署工具spark-submit 。

如何对Spark 源码修改后在Eclipse中使用

Eclipse 下开发调试环境的配置

该小节中使用的各项工具分别为:Windows 7+Eclipse Java EE 4.4.2+Scala 2.10.4+Sbt 0.13.8+Maven3.3.3,测试的 Spark 版本为 1.4.0。

1.配置 IDE:

选择菜单项 Help-Install new software,添加站点 ,选择安装 Scala IDE for Eclipse 以及 Scala IDE Plugins。

对于标准版 Eclipse,还需要安装单独的 Maven 插件。

出于配置简便考虑,也可以使用 Scala 官方提供的已将所有依赖打包好的 Scala IDE。

特别的,由于项目本身存在一些错误,请先暂时关闭 Project-Build Automatically 的功能以节省时间。

2.下载 Spark 源代码:

创建空目录,执行如下语句:git clone

除了使用 git 指令之外,也可以从 Spark 的 Github 页面下载打包好的源代码。

3.将源码转化为 Eclipse 项目:

进入源代码根目录,执行如下语句:sbt eclipse。Sbt 执行期间会下载 Spark 所需要的所有 jar 包,因此该步骤会花费很长的时间。其中有一些 jar 包需要使用网络代理等方法才能下载。

4.导入项目至 Eclipse:

选择菜单项 File-Import,并选择 General-Existing Projects into Workspace,项目的根路径选择源代码的根路径,导入所有项目(共有 25 个)。

5.修改 Scala 版本:

进入 Preference-Scala-Installations,添加机器上安装的 Scala 2.10.4(选择 lib 目录)。由于该版本 Spark(1.4.0)是在 Scala 2.10.4 的环境下编写的,需要在 Eclipse 中修改项目使用的 Scala 版本。方法为:全选项目,右键选择 Scala-Set the Scala Installation 并选择相应的 Scala 版本。

6.为 old-deps 项目添加 Scala Library:

右键选择 old-deps 项目,选择 Scala-Add Scala Library to Build Path。

7.Maven install 以生成 spark-streaming-flume-sink 所需要的类:

首先将源代码根目录中的 scalastyle-config.xml 文件复制到 spark-streaming-flume-sink 项目根目录中,而后在 Eclipse 中打开该项目,右键选择 pom.xml 文件,选择 Run as-Maven install。

8.修改 spark-sql 与 spark-hive 的包错误:

由于源代码的包设置有错误,为此需要将类文件移至正确的包中

对于 spark-sql 项目,分别选择 src/test/java 中的 test.org.apache.spark.sql 以及 test.org.apache.spark.sql.sources 包中的所有类,右键选择 Refactor-Move,移动至 org.apache.spark.sql 以及 org.apache.spark.sql.sources 包。

对于 spark-hive 项目,分别选择 src/test/java 中的 test.org.apache.spark.sql.hive 以及 test.org.apache.spark.sql.hive.execution 包中的所有类,移动至 org.apache.spark.sql.hive 以及 org.apache.spark.sql.hive.execution 包。

9.编译所有项目:

打开 Project-Build Automatically 功能,等待所有项目编译成功。

10.检查是否安装成功:

将 core 项目中的 src-main-resources-org 文件夹拷贝到 examples 项目中的 target-scala-2.10-classes 中。而后执行 examples 项目中的 org.apache.spark.examples.SparkPi 程序,并设置其 jvm 参数为-Dspark.master=local

发表评论

评论列表

  • 酒奴掩吻(2022-06-12 16:54:47)回复取消回复

    来源很多不同的地方。例如:工业时代的汽车制造、监控设备、工业设备会产生很多源数据;信息时代的电商网站、日志服务器、社交网络、金融交易系统、黑客攻击、垃圾邮件、交

  • 嘻友怯慌(2022-06-12 19:31:12)回复取消回复

    x._2,x._1)).take(10).foreach(println)sc.stop()}}复制代码C:生成程序包生成程序包之前要先建立一个artifacts,File -