Scala并发编程【进阶】

时间:2023-03-09 20:43:43
Scala并发编程【进阶】
 package com.dingxin.entrance

 import java.text.SimpleDateFormat
import java.util.Date import scala.actors.Actor
import scala.actors.Actor._
/**
* Created by zhen on 2019/1/24.
*/
object My_Actor_Receive extends Actor{
def act(){
while(true){
receive{
case str : String => print(str + " ") // 模式匹配
case dat : Date => println(new SimpleDateFormat("yyyy").format(dat))
case _ => println("My heart will go on !")
}
}
}
}
object Actor_Receive {
def main(args: Array[String]) {
val getMessage = actor{
while(true){
receive{
case str : String => print(str) // 模式匹配
case dat : Date => println(new SimpleDateFormat("yyyy").format(dat))
case _ => My_Actor_Receive ! null // 消息转发
}
}
}
val sendMessage = actor{
while(true){
receive{
case str : String => getMessage ! str + " " // 消息转发
case dat : Date => getMessage ! dat
case _ => getMessage ! null
}
}
}
sendMessage ! "Scala"
sendMessage ! new Date()
sendMessage ! 2020 // 这种方式必须执行start开启,且都是并行执行,不确定先后顺序
My_Actor_Receive.start()
My_Actor_Receive ! "Spark"
}
}

结果1:

  Scala并发编程【进阶】

结果2:

  Scala并发编程【进阶】

信息交互

 package big.data.analyse.scala

 import scala.actors.Actor
import scala.actors.Actor._
/**
* 消息发送与接收,可用于流计算测试的输入
* Created by zhen on 2018/4/15.
*/
object ActorTest {
def main(args: Array[String]) {
val actor = new HelloActor
actor.start//启动actor消息机制
var counter = 0
while(counter<10){
actor ! "Step " + counter //发送消息
counter += 1
Thread.sleep(2000)
self.receive{case msg => println("返回结果:"+msg)} // 获取子线程的消息
}
}
}
class HelloActor extends Actor{
def act(): Unit ={
while(true){
receive{
case content : String => println("Message : " + content)
sender ! content.split(" ")(1) // 向主线程发送消息
}
}
}
}

结果3:

  Scala并发编程【进阶】

loop+react

 package big.data.analyse.scala.actor

 import java.net.{UnknownHostException, InetAddress}
import scala.actors.Actor
import scala.actors.Actor._ /**
* Created by zhen on 2019/6/19.
*/
object NameResolver extends Actor{
def act(){
loop {
react {
case Net (name, actor) => actor ! getIp(name)
case msg => println("Unhandled message : " + msg)
}
}
}
def getIp(name : String) : Option[InetAddress] = {
try{
println(name)
Some(InetAddress.getByName(name))
} catch {
case _ : UnknownHostException => None
}
}
} case class Net(name : String, actor: Actor) object Actor_More_Effective {
def main(args: Array[String]) {
NameResolver.start
NameResolver ! Net("www.baidu.com", self)
NameResolver ! "www.xiaomi.com" for(i <- 1 until 10){
NameResolver ! "小米" + i
}
println(self.receiveWithin(1000){case x => x})
}
}

结果4:

  Scala并发编程【进阶】

Actor详解

  1.Actor是一个通信模型,Scala提供了Actor的实现

  2.Spark1.6之前集群节点之间通信使用的是Akka,Akka底层是Actor实现的。Spark1.6之后,节点的通信变成Netty

  3.Actor相当于我们理解的Thread,Actor的出现主要解决的是代码锁的问题

  4.Actor底层通信实现用到了模式匹配