JGSK - Scala - 06.Actor

Table Of Contents

Actor

概念

Actor 是 Scala 的并发模型。在 2.10 之后的版本中,Scala 抛弃了自身的 Actor 而是使用了 Akka 作为其推荐的 Actor 实现。

使用 Actor

定义一个 Actor 只需要继承 Actor 特质并实现其中的 receive() 方法即可。除此之外,也可以实现其它的 Actor 的钩子方法 preStart(), postStop() 等。

创建一个 Actor

以下创建了一个回显的 Actor,其通过 receive() 接收到消息并且打印在控制台上。

class EchoServer extends Actor {
  def receive = {
    case msg: String => println("echo " + msg)
  }

  @throws[Exception](classOf[Exception])
  override def preStart(): Unit = {
    super.preStart()
    println("preStart")
  }
}

Actor 实例可以通过方法 system.actorOf 来进行创建。ActorSystem 可以用于创建多个基于同样配置的 Actor,也可以对 Actor 进行管理。

val system = ActorSystem()
val echoServer = system.actorOf(Props[EchoServer])

创建完 Actor 后可以通过 ! 向 Actor 内部的邮箱发消息。

echoServer ! "hi"

控制台输出

preStart
echo hi

通过隐式转换,Scala 可以使用 DSL 语句将以上创建 Actor 的语句进行进一步简化

implicit val system = ActorSystem()
val echoServer = actor(new Act {
  become {
    case msg => println("echo " + msg)
  }
  whenStarting {
    println("preStart")
  }
})
echoServer ! "hi"

Actor 和线程

  • Scala 中 Actor 和线程是不同的抽象,他们的对应关系是由 Dispatcher 决定的。
  • Actor 比线程轻量。在 Scala 中可以创建数以百万级的 Actor。奥秘在于 Actor 直接可以复用线程。
  • Actor 和线程之间没有一对一的对应关系。一个 Actor 可以使用多个线程,一个线程也会被多个 Actor 复用。

配置 Dispatcher

每一个 ActorSystem 都有一个默认的 Dispatcher,但是也可以通过配置文件进行更改使用。

在工程的资源目录下创建 application.conf 文件。内容如下:

my-dispatcher {
  # Dispatcher 的类型
  type = Dispatcher
  # ExecutionService 的类型
  executor = "fork-join-executor"
  # 配置 fork join 池
  fork-join-executor {
    # 最小线程数
    parallelism-min = 2
    # 线程增长因子
    parallelism-factor = 2.0
    # 最大线程数
    parallelism-max = 10
  }
  # 线程切换到另一个actor之前处理的消息数上限,1 为尽可能公平
  throughput = 100
}

以上就定义了一个名为 my-dispatcher 的 Dispatcher。

Dispatcher 共有四种类型:

  • Dispatcher 默认类型,底层为 java.util.concurrent.ExecutorService,为每个 Actor 创建一个邮箱
  • PinnedDispatcher 底层为 akka.dispatch.ThreadPoolExecutorConfigurator,为每个 Actor 创建一个邮箱
  • BalancingDispatcher 底层为 java.util.concurrent.ExecutorService,为所有 Actor 创建一个邮箱,只有同一类型的 Actor 可以进行共享
  • CallingThreadDispatcher 仅供测试

使用自定义的 Dispatcher

val echoServers = (1 to 10).map(x =>
  system.actorOf(Props(new EchoServer2(x.toString))
    .withDispatcher("my-dispatcher")))

(1 to 10).foreach(echoServers(Random.nextInt(10)) ! _)

以上例子中创建了 10 个 基于 my-dispatcher 的 Actor,每个 Actor 接收一个 0 到 10 的随机数作为消息。

除此之外,也可以通过 lookup() 方法获得已定义好的 Dispatcher 的实例

implicit val executionContext = system.dispatchers.lookup("my-dispatcher")

Actor 间通信

Actor 间使用样本类可以发送更丰富的消息内容也能够轻易完成 Actor 间的通信。

定义作为消息的样本类,最后一个参数为发送消息的 Actor 的引用

case class Message[T <: ActorRef](content: String, sender: T)

使用该 Actor

implicit val system = ActorSystem()
val pingActor = actor(new Act {
  become {
    case Message(msg: String, sender: ActorRef) =>
      println(s"$msg pang")
  }
})
pingActor ! Message("ping", pingActor)

获得处理结果

Actor 发送消息后可以获得处理该消息的 Actor 的处理结果。要实现这功能,只需要使用 ask() 代替 ! 发送消息就可以获得用于获得结果的 Future 对象。

import akka.actor.ActorDSL._
import akka.pattern.ask

implicit val ec = scala.concurrent.ExecutionContext.Implicits.global
implicit val system = akka.actor.ActorSystem()

val versionUrl = "https://github.com/SidneyXu"

val fromURL = actor(new Act {
  become {
    case url: String => sender ! scala.io.Source.fromURL(url)
      .getLines().mkString("\n")
  }
})

val versionFuture = fromURL.ask(versionUrl)(akka.util.Timeout(5, TimeUnit.SECONDS))

这个例子通过调用 ask 函数来获取一个 Futureask 内部也是用 ! 来传递消息,但是其可以同时设置超时时间。

通过调用 Future 的不同方法可以实现同步和异步操作:

获得同步结果

versionFuture.foreach(println)

获得异步结果

versionFuture onComplete {
  case msg => println(msg)
}

远程 Actor

Actor 可以进行远程调用,实现 RMI 的功能。

服务端

代码

import akka.actor.{ActorSystem, Props}

object RemoteServer extends App {

  implicit val system = ActorSystem("RemoteSystem")
  val remoteActor = system.actorOf(Props[EchoServer], name = "remoteServer")
  remoteActor ! "The RemoteActor is alive"

}

配置文件

配置文件需要放在 classpath 下, 系统默认读取的配置文件名为 application.conf

application.conf

akka {
  actor {
    provider = "akka.remote.RemoteActorRefProvider"
  }
  remote {
    transport = "akka.remote.netty.NettyRemoteTransport"
    netty.tcp {
      hostname = "127.0.0.1"
      port = 5150
    }
  }
}

客户端

代码

object Client extends App {

  implicit val system = ActorSystem("LocalSystem", ConfigFactory.load("client"))
  val remote = system.actorSelection("akka.tcp://RemoteSystem@127.0.0.1:5150/user/remoteServer")
  remote ! "Hello from the LocalActor"

}

配置文件

可以通过 ConfigFactory.load() 来读取指定的配置文件,文件名不包含后缀名。

只有服务器端需要知道端口号,所以客户端的端口号设为 0

client.conf

akka {
  actor {
    provider = "akka.remote.RemoteActorRefProvider"
  }
  remote {
    transport = "akka.remote.netty.NettyRemoteTransport"
    netty.tcp {
      hostname = "127.0.0.1"
      port = 0
    }
  }
}

运行

先运行服务器端,控制台输出 “echo The RemoteActor is alive”, 再运行客户端,服务器端控制台会接着输出 “echo Hello from the LocalActor”