Spark is a parallel, distributed computing framework, so exchanging data and control messages between nodes had to be part of its design from the start. So how does Spark communicate? Spark is written in Scala, and in this post we will use Scala (with Akka) to design a simple RPC communication service of the same shape.
Design
The akka.actor.Actor component is somewhat similar to a servlet: you can think of it as one, since it too has its own lifecycle. preStart is called once the constructor has run, and receive is called whenever a message arrives.
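As a minimal sketch of that lifecycle (assuming akka-actor 2.3.x on the classpath; the class name here is made up for illustration):

```scala
import akka.actor.Actor

// Hypothetical actor showing the two lifecycle hooks used in this post.
class LifecycleDemo extends Actor {
  // Runs once, after the constructor and before any message is handled.
  override def preStart(): Unit = println("starting")

  // Runs once per message pulled from this actor's mailbox.
  override def receive: Receive = {
    case msg => println(s"got: $msg")
  }
}
```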
We split our actors into two roles: one called Master and one called Worker.
The Master is the boss and the Workers are its subordinates, just like ResourceManager and NodeManager in YARN, or NameNode and DataNode in HDFS. Nothing works without rules, and the code follows the same ones:
1. When a Worker starts, it connects to the Master in its preStart method and sends a registration message (the Worker's info is wrapped in a case class and sent to the Master).
2. When the Master receives the registration message, it stores the Worker's info and replies to the Worker that registration succeeded.
3. The Worker then sends heartbeats to the Master at a fixed interval (so the Master can detect when a Worker dies).
4. If a Worker stops reporting for too long, the Master removes that Worker's info from its in-memory state, so that tasks are never assigned to a dead Worker.
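The four steps above map one-to-one onto a small set of immutable messages; as a sketch (these mirror the MessageBox definitions given later in the post):

```scala
case class WorkerStartedMessage(workerId: String, workerInfo: WorkerInfo) // step 1: Worker -> Master
case object RegisterSuccessMessage                                        // step 2: Master -> Worker
case class HeartBeatSendMessage(workerId: String)                         // step 3: Worker -> Master
case object CheckTimeOutWorker                                            // step 4: Master -> itself
```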
Creating the Maven project
Create an empty Maven project.
pom.xml:
```xml
<properties>
  ...
  <scala.version>2.10.6</scala.version>
  <scala.compat.version>2.10</scala.compat.version>
</properties>

<dependencies>
  <dependency>
    <groupId>org.scala-lang</groupId>
    <artifactId>scala-library</artifactId>
    <version>${scala.version}</version>
  </dependency>
  <dependency>
    <groupId>com.typesafe.akka</groupId>
    <artifactId>akka-actor_2.10</artifactId>
    <version>2.3.14</version>
  </dependency>
  <dependency>
    <groupId>com.typesafe.akka</groupId>
    <artifactId>akka-remote_2.10</artifactId>
    <version>2.3.14</version>
  </dependency>
</dependencies>

<build>
  <sourceDirectory>src/main/scala</sourceDirectory>
  <testSourceDirectory>src/test/scala</testSourceDirectory>
  <plugins>
    <plugin>
      <groupId>net.alchim31.maven</groupId>
      <artifactId>scala-maven-plugin</artifactId>
      <version>3.2.2</version>
      <executions>
        <execution>
          <goals>
            <goal>compile</goal>
            <goal>testCompile</goal>
          </goals>
          <configuration>
            <args>
              <arg>-make:transitive</arg>
              <arg>-dependencyfile</arg>
              <arg>${project.build.directory}/.scala_dependencies</arg>
            </args>
          </configuration>
        </execution>
      </executions>
    </plugin>
    <plugin>
      <groupId>org.apache.maven.plugins</groupId>
      <artifactId>maven-shade-plugin</artifactId>
      <version>2.4.3</version>
      <executions>
        <execution>
          <phase>package</phase>
          <goals>
            <goal>shade</goal>
          </goals>
          <configuration>
            <filters>
              <filter>
                <artifact>*:*</artifact>
                <excludes>
                  <exclude>META-INF/*.SF</exclude>
                  <exclude>META-INF/*.DSA</exclude>
                  <exclude>META-INF/*.RSA</exclude>
                </excludes>
              </filter>
            </filters>
            <transformers>
              <transformer implementation="org.apache.maven.plugins.shade.resource.AppendingTransformer">
                <resource>reference.conf</resource>
              </transformer>
              <transformer implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer">
                <mainClass>com.zonegood.Master</mainClass>
              </transformer>
            </transformers>
          </configuration>
        </execution>
      </executions>
    </plugin>
  </plugins>
</build>
```
If src/main/scala does not exist, create it by hand.
Writing the Master
```scala
package com.zonegood

import akka.actor.{Actor, ActorSystem, Props}
import com.typesafe.config.ConfigFactory
import com.zonegood.MessageBox._

import scala.collection.mutable
import scala.concurrent.duration._

class Master extends Actor {

  // Registered workers, keyed by workerId.
  private val workers = new mutable.HashMap[String, WorkerInfo]()
  private val CHECK_INTERVAL = 15000

  override def preStart(): Unit = {
    println("master run...")
    // Periodically check for workers whose heartbeat has gone stale.
    import context.dispatcher
    context.system.scheduler.schedule(0 millis, CHECK_INTERVAL millis,
      self, CheckTimeOutWorker)
  }

  override def receive: Receive = {
    // A worker registers itself on startup.
    case WorkerStartedMessage(workerId, workerInfo) =>
      println("worker connect")
      workers.put(workerId, workerInfo)
      sender ! RegisterSuccessMessage

    // A worker reports a heartbeat: refresh its timestamp.
    // Guard with contains so an unregistered worker cannot crash us.
    case HeartBeatSendMessage(workerId) =>
      if (workers.contains(workerId)) {
        workers(workerId).flashTime = System.currentTimeMillis()
      }

    // Evict workers that have not reported within CHECK_INTERVAL.
    case CheckTimeOutWorker =>
      val currentTime = System.currentTimeMillis()
      val dead = workers.filter(t => (currentTime - t._2.flashTime) > CHECK_INTERVAL)
      for (e <- dead) {
        workers -= e._1
      }
      println(workers.size)
  }
}

object Master {
  def main(args: Array[String]): Unit = {
    val host = args(0)
    val port = args(1).toInt
    // Enable Akka remoting and bind to the given host/port.
    val configStr =
      s"""
         |akka.actor.provider = "akka.remote.RemoteActorRefProvider"
         |akka.remote.netty.tcp.hostname = "$host"
         |akka.remote.netty.tcp.port = "$port"
       """.stripMargin
    val conf = ConfigFactory.parseString(configStr)
    val masterSystem = ActorSystem("masterSystem", conf)
    masterSystem.actorOf(Props[Master], "masterActor")
    masterSystem.awaitTermination()
  }
}
```
Writing the Worker
```scala
package com.zonegood

import java.util.UUID

import akka.actor.{Actor, ActorSelection, ActorSystem, Props}
import com.typesafe.config.ConfigFactory
import com.zonegood.MessageBox._

import scala.concurrent.duration._

class Worker(id: String, masterHost: String, masterPort: Int) extends Actor {

  var master: ActorSelection = _
  private[this] val HEART_BEAT_INTERVAL = 10000

  val workerInfo = new WorkerInfo()

  override def preStart(): Unit = {
    // Look up the remote Master actor and register with it.
    master = context.actorSelection(
      s"akka.tcp://masterSystem@$masterHost:$masterPort/user/masterActor")
    master ! WorkerStartedMessage(id, workerInfo)
  }

  override def receive: Receive = {
    // Registration acknowledged: start the heartbeat timer.
    case RegisterSuccessMessage =>
      import context.dispatcher
      context.system.scheduler.schedule(0 millis, HEART_BEAT_INTERVAL millis,
        self, HeartBeatMessage(id))

    // Timer tick: forward a heartbeat to the Master.
    case HeartBeatMessage(workerId) =>
      master ! HeartBeatSendMessage(workerId)
  }
}

object Worker {
  def main(args: Array[String]): Unit = {
    val host = args(0)
    val port = args(1).toInt
    val masterHost = args(2)
    val masterPort = args(3).toInt
    val configStr =
      s"""
         |akka.actor.provider = "akka.remote.RemoteActorRefProvider"
         |akka.remote.netty.tcp.hostname = "$host"
         |akka.remote.netty.tcp.port = "$port"
       """.stripMargin
    val conf = ConfigFactory.parseString(configStr)
    val workerSystem = ActorSystem("workerSystem", conf)
    workerSystem.actorOf(
      Props(new Worker(UUID.randomUUID().toString, masterHost, masterPort)),
      "workerActor")
    workerSystem.awaitTermination()
  }
}
```
The message case classes
```scala
package com.zonegood

// Messages exchanged between Master and Worker; case classes/objects
// are serializable, so they can travel over the network.
object MessageBox {
  // Worker -> Master: register on startup.
  case class WorkerStartedMessage(workerId: String, workerInfo: WorkerInfo)
  // Master -> Worker: registration acknowledged.
  case object RegisterSuccessMessage
  // Master -> itself: trigger for the periodic timeout check.
  case object CheckTimeOutWorker
  // Worker -> itself: heartbeat timer tick.
  case class HeartBeatMessage(workerId: String)
  // Worker -> Master: heartbeat report.
  case class HeartBeatSendMessage(workerId: String)
}
```
The data-holder class
```scala
package com.zonegood

// Per-worker state kept on the Master; it is sent over the wire at
// registration, hence Serializable.
class WorkerInfo extends Serializable {
  // Timestamp of the most recent heartbeat from this worker.
  var flashTime = System.currentTimeMillis()
}
```
Packaging with the maven-shade-plugin
1. Build the Master executable jar
Edit pom.xml:
```xml
<transformer implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer">
  <mainClass>com.zonegood.Master</mainClass>
</transformer>
```
Then run:

```
$ mvn clean package
$ mv target/my-scala-rpc-1.0-SNAPSHOT.jar ~/workspace/master.jar
```
2. Build the Worker executable jar
Do the same, with mainClass changed to com.zonegood.Worker, to produce worker.jar.
Go into ~/workspace and you will see the two new jar files.
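The rebuild amounts to nothing more than the following (jar name and paths are illustrative, matching the Master steps above):

```
# in pom.xml, change the shade plugin's mainClass to com.zonegood.Worker, then:
mvn clean package
mv target/my-scala-rpc-1.0-SNAPSHOT.jar ~/workspace/worker.jar
```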
Running
Start the Master first, then the Workers, each in its own terminal:

```
$ cd ~/workspace
$ java -jar master.jar 127.0.0.1 8888
$ java -jar worker.jar 127.0.0.1 7001 127.0.0.1 8888
$ java -jar worker.jar 127.0.0.1 7002 127.0.0.1 8888
$ java -jar worker.jar 127.0.0.1 7003 127.0.0.1 8888
```
Checking the result
Kill one of the Workers and watch the Master's console.
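With three Workers registered and then one killed, the Master's console prints something along these lines (illustrative output; the count drops once the dead Worker's last heartbeat is older than CHECK_INTERVAL):

```
master run...
worker connect
worker connect
worker connect
3
3
2
2
```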