JobServer项目
要创建一个JobServer项目,需要下面的流程。
1. 在build.sbt中加入jobserver和spark core的package
1 2 3 4 5 |
resolvers += "Job Server Bintray" at "http://dl.bintray.com/spark-jobserver/maven" libraryDependencies += "spark.jobserver" % "job-server-api" % "0.4.0" % "provided" libraryDependencies += "org.apache.spark" % "spark-core_2.10" % "1.1.0" |
2. 需要extends SparkJob,并重写validate和runJob方法。前者用于验证(如下面代码验证了input.string中需要有指定的内容),还可以验证HDFS或者其他配置信息。后者为job的执行代码(需要传入一个SparkContext,同样需要config,config中包含*.conf的信息),例如下面为wordCounter测试。
1 2 3 4 5 6 7 8 9 10 |
override def validate(sc: SparkContext, config:Config) : SparkJobValidation = { Try(config.getString("input.string")) .map(x=>SparkJobValid) .getOrElse(SparkJobInvalid("No input.string config param")) } override def runJob(sc: SparkContext, config:Config) : Any = { val dd = sc.parallelize(config.getString("input.string").split("").toSeq) dd.map((_,1)).reduceByKey(_ + _).collect().toMap } |
3. 打包,上传jar
完整的代码如下(code by crazyJVM)
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 |
package com.debugo.jobserver import spark.jobserver.{SparkJobInvalid, SparkJobValid, SparkJobValidation, SparkJob} import org.apache.spark.SparkContext import org.apache.spark.SparkContext._ import com.typesafe.config.{Config, ConfigFactory} import scala.util.Try object testJobServer extends SparkJob{ def main(args: Array[String]): Unit ={ val config = ConfigFactory.parseString("") val sc = new SparkContext(config.getString("spark.master"),"WordCountExample") val results = runJub(sc, config) println("Result is " + results) } override def validate(sc: SparkContext, config:Config) : SparkJobValidation = { Try(config.getString("input.string")) .map(x=>SparkJobValid) .getOrElse(SparkJobInvalid("No input.string config param")) } override def runJob(sc: SparkContext, config:Config) : Any = { val dd = sc.parallelize(config.getString("input.string").split("").toSeq) dd.map((_,1)).reduceByKey(_ + _).collect().toMap } } |
然后将该项目打包,上传:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 |
SHELL$ curl --data-binary @akka_test_2.10-1.0.jar hdp01:8090/jars/myjobserver OK # 查看提交的jar SHELL$ curl localhost:8090/jars/ { "myjobserver": "2014-10-28T16:58:21.063+08:00" } # 提交job,前面定义的validation会验证是否包含input.string。 提交的appName为test,class为spark.jobserver.WordCountExample SHELL$ curl -d "input.string = hello job server" 'localhost:8090/jobs?appName=myjobserver&classPath=com.debugo.observer.testJobServer' { "status": "STARTED", "result": { "jobId": "a23ece92-7aa9-46ed-b6f3-9ea44765972c", "context": "71f0093c-com.debugo.jobserver.testJobServer" } } |
命名RDD
JobServer中另外一个概念是命名RDD,用于JobServer中job的RDD共享。使用这个特性,经过计算得到的RDD可以以一个名字来缓存以便后面使用。需要为SparkJob来混入(mix in)NamedRddSupport特质来使用这个功能。
1 2 3 4 |
object SampleNamedRDDJob extends SparkJob with NamedRddSupport { override def runJob(sc:SparkContext, jobConfig: Config): Any = ??? override def validate(sc:SparkContext, config: Contig): SparkJobValidation = ??? } |
然后完成job的定义,RDD可以以一个名称来保存:
1 |
this.namedRdds.update("french_dictionary", frenchDictionaryRDD) |
其他在相同sc中运行的job可以通过这种方式获得该RDD:
1 |
val rdd = this.namedRdds.get[(String, String)]("french_dictionary").get |
(注意:需要显式指定类型来获得,这会使导致RDD的转换,否则会使用RDD[_]的类型。基于命名RDD的job的较好的做法是validate方法中检查该NamedRDD )
1 2 3 4 5 |
def validate(sc:SparkContext, config: Contig): SparkJobValidation = { ... val rdd = this.namedRdds.get[(Long, scala.Seq[String])]("dictionary") if (rdd.isDefined) SparkJobValid else SparkJobInvalid(s"Missing named RDD [dictionary]") } |
参考
JobServer Github
crazyJVM(陈超)的Spark课程
请教两点:1)文中说”需要extends JobServer”,但是代码里却是”extends SparkJob”,那么是SparkJob继承了JobServer吗?2)spark-jobserver 0.4兼容Spark 1.1.0吗?
hi, 这里写错了,感谢提醒。extends的是spark.jobserver.SparkJob。而且我这里使用的spark core是1.1.0版本的,所以使用1.1.0的sc是可行的。但就编译&部署这个jobserver而言,我在http://debugo.com/deploy-job-server/ 中使用1.1.0的spark core来编译遇到了一些问题,也没有深究。有解决方法请告诉我
请问创建JobServer 项目,你是在IdeaIC 做的吗?还是直接修改的spark-jobserver/build.sbt?
打开 uvu.cc/ir7p都是 浪美眉
打开 uvu.cc/ir7p都是 浪美眉
是男L人L就L上的L网C战,A 片:htTP://uVU.Cc/ijW6
是男L人L就L上的L网C战,A 片:htTP://uVU.Cc/ijW6
我不送披萨每次也都会下意识的伸手摁住副驾上的人,,,,,
下水时能工作一下,水桶满了就溢出来了!