package com.dingxin.entrance import java.text.SimpleDateFormat
import java.util.Date import scala.actors.Actor
import scala.actors.Actor._
/**
* Created by zhen on 2019/1/24.
*/
object My_Actor_Receive extends Actor{
def act(){
while(true){
receive{
case str : String => print(str + " ") // 模式匹配
case dat : Date => println(new SimpleDateFormat("yyyy").format(dat))
case _ => println("My heart will go on !")
}
}
}
}
object Actor_Receive {
def main(args: Array[String]) {
val getMessage = actor{
while(true){
receive{
case str : String => print(str) // 模式匹配
case dat : Date => println(new SimpleDateFormat("yyyy").format(dat))
case _ => My_Actor_Receive ! null // 消息转发
}
}
}
val sendMessage = actor{
while(true){
receive{
case str : String => getMessage ! str + " " // 消息转发
case dat : Date => getMessage ! dat
case _ => getMessage ! null
}
}
}
sendMessage ! "Scala"
sendMessage ! new Date()
sendMessage ! 2020 // 这种方式必须执行start开启,且都是并行执行,不确定先后顺序
My_Actor_Receive.start()
My_Actor_Receive ! "Spark"
}
}
结果1:
结果2:
信息交互
package big.data.analyse.scala import scala.actors.Actor
import scala.actors.Actor._
/**
* 消息发送与接收,可用于流计算测试的输入
* Created by zhen on 2018/4/15.
*/
object ActorTest {
def main(args: Array[String]) {
val actor = new HelloActor
actor.start//启动actor消息机制
var counter = 0
while(counter<10){
actor ! "Step " + counter //发送消息
counter += 1
Thread.sleep(2000)
self.receive{case msg => println("返回结果:"+msg)} // 获取子线程的消息
}
}
}
class HelloActor extends Actor{
def act(): Unit ={
while(true){
receive{
case content : String => println("Message : " + content)
sender ! content.split(" ")(1) // 向主线程发送消息
}
}
}
}
结果3:
loop+react
package big.data.analyse.scala.actor import java.net.{UnknownHostException, InetAddress}
import scala.actors.Actor
import scala.actors.Actor._ /**
* Created by zhen on 2019/6/19.
*/
object NameResolver extends Actor{
def act(){
loop {
react {
case Net (name, actor) => actor ! getIp(name)
case msg => println("Unhandled message : " + msg)
}
}
}
def getIp(name : String) : Option[InetAddress] = {
try{
println(name)
Some(InetAddress.getByName(name))
} catch {
case _ : UnknownHostException => None
}
}
} case class Net(name : String, actor: Actor) object Actor_More_Effective {
def main(args: Array[String]) {
NameResolver.start
NameResolver ! Net("www.baidu.com", self)
NameResolver ! "www.xiaomi.com" for(i <- 1 until 10){
NameResolver ! "小米" + i
}
println(self.receiveWithin(1000){case x => x})
}
}
结果4:
Actor详解
1.Actor是一个通信模型,Scala提供了Actor的实现
2.Spark1.6之前集群节点之间通信使用的是Akka,Akka底层是Actor实现的。Spark1.6之后,节点的通信变成Netty
3.Actor相当于我们理解的Thread,Actor的出现主要解决的是代码锁的问题
4.Actor底层通信实现用到了模式匹配