本文介绍akka的基本使用方法,由于属于基础功能,想不出一个很高大上的名称,此处就以基础模式命名。下文会介绍actor的使用方法,及其优劣点。

class SimpleActor(name:String) extends Actor {
  private def doWork(message:SayHello):Unit = {
    println(s"$name 收到 ${message.from.path.name} 的消息 [$message] ,工作进行中... 当前线程号 ${Thread.currentThread().getId}")
  }
  override def receive: Receive = {
    case msg @ SayHello(from,message) =>
      doWork(msg)
      val returnMsg = HelloSaid(s"嗨 ${from.path.name} ,${self.path.name} 收到了 $message 消息")
      println(s"$name 工作结束,准备返回消息[${returnMsg.message}]")
  }
}
object SimpleBasicPattern {
  def main(args: Array[String]): Unit = {
    val system = ActorSystem("BasicPattern",ConfigFactory.load())
    val person1 = system.actorOf(Props(new SimpleActor("person1")),"personActor1")
    println(s"Main thread Id ${Thread.currentThread().getId}")
    person1 ! SayHello(person1,"Hello World 1")
    person1 ! SayHello(person1,"Hello World 2")
  }
}

 

输出:

Main thread Id 1
person1 收到 personActor1 的消息 [SayHello(Actor[akka://BasicPattern/user/personActor1#1662593548],Hello World 1)] ,工作进行中... 当前线程号 13
person1 工作结束,准备返回消息[嗨 personActor1 ,personActor1 收到了 Hello World 1 消息]
person1 收到 personActor1 的消息 [SayHello(Actor[akka://BasicPattern/user/personActor1#1662593548],Hello World 2)] ,工作进行中... 当前线程号 13
person1 工作结束,准备返回消息[嗨 personActor1 ,personActor1 收到了 Hello World 2 消息]

 

   如上图,我设计了一个简单的actor:HelloWroldActor。它有两个方法,其中receive是收到消息之后处理消息的入口函数,定义了对消息的处理方式。收到SayHello之后,调用doWork同步处理消息

  我们可以跟上一篇博客进行对比,此处给person1发送了一条SayHello消息,在OOP中是直接调用函数,此处使用 ! 函数发送消息;person1收到消息后,同步调用doWork处理消息。这是最基本的actor使用方式:通过 ! 发消息给actor。从输出中可以看到,主线程和doWork所在线程是不同的线程。

  这是基础模式的最基本形式,给actor发送消息,actor对消息进行响应,发送和响应是异步的,同一个actor对所有的消息都是按照邮箱队列的顺序,串行调用的。下面是基础模式的另外一种高级形式。

class HelloWorldActor(other:ActorRef,name:String) extends Actor {
  private def doWork(message:String):HelloSaid = {
    println(s"$name 收到 ${other.path.name} 的消息 [$message] ,工作进行中...")
    HelloSaid("这是处理后返回的消息")
  }
  override def receive: Receive = {
    case DoWork(message) =>
      println(s"嗨 ${other.path.name} ,我正在为你工作")
      val returnMsg = doWork(message)
      other ! WorkDone(returnMsg.message)
    case WorkDone(message) =>
      println(s"$name 收到了 ${sender().path.name} 的回复消息:[$message]")
  }
}
object BasicPattern {
  def main(args: Array[String]): Unit = {
    val system = ActorSystem("BasicPattern",ConfigFactory.load())
    val person1 = system.actorOf(Props(new HelloWorldActor(null,"person1")),"personActor1")
    val person2 = system.actorOf(Props(new HelloWorldActor(person1,"person2")),"personActor2")
    person2 ! DoWork("Hello World")
  }
}

 

输出:

嗨 personActor1 ,我正在为你工作
person2 收到 personActor1 的消息 [Hello World] ,工作进行中...
person1 收到了 personActor2 的回复消息:[嗨 personActor1 工作已完成,这是返回消息 HelloSaid(这是处理后返回的消息)]

 

   在上面的模式中,我们首先给person2发送了开始工作的消息,person2收到消息后,开始为person1工作:调用doWork进行计算。计算结束后把消息发送给了person1,person1收到workDone的消息后,将结果打印了出来。这个例子稍微复杂点,涉及到了两个actor的通信。但这仍然是一种简单的形式,因为person1的actorRef引用是通过构造函数传递给person2的,这样person2就只能为person1工作。这非常不方便,因为actor创建的时候不一定能知道另外一个actor的地址。那么下面又是一种高级形式:

class HelloActor(name:String) extends Actor {
  private def doWork(message:String,forActor:ActorRef):HelloSaid = {
    println(s"$name 收到 ${forActor.path.name} 的消息 [$message] ,工作进行中...")
    HelloSaid("这是处理后返回的消息")
  }
  override def receive: Receive = {
    case DoWorkFor(message,forActor) =>
      println(s"嗨 ${forActor.path.name} ,我正在为你工作")
      val returnMsg = doWork(message,forActor)
      forActor ! WorkDone(returnMsg.message)
    case WorkDone(message) =>
      println(s"$name 收到了 ${sender().path.name} 的回复消息:[$message]")
  }
}
object BasicPattern3 {
  def main(args: Array[String]): Unit = {
    val system = ActorSystem("BasicPattern2",ConfigFactory.load())
    val person1 = system.actorOf(Props(new HelloActor("person1")),"personActor1")
    val person2 = system.actorOf(Props(new HelloActor("person2")),"personActor2")
    person2 ! DoWorkFor("Hello World",person1)
  }
}

 

输出:
嗨 personActor1 ,我正在为你工作
person2 收到 personActor1 的消息 [Hello World] ,工作进行中...
person1 收到了 personActor2 的回复消息:[这是处理后返回的消息]

 

   在上面的图中,我们把person1的actorRef通过消息的形式发送给了person2,这样person2就能为不同的person工作了,因为工作的对象是通过消息传递的。

  通过上面3个例子,我们可以看到,只能通过给actor发送消息与actor通信,调用其对应的函数,函数的返回结果也只能异步的发送给调用方。而在OOP中调用另一个对象的函数,看起来比这个简单多了,获取函数处理结果也非常简单。但读者要仔细思考这两者的区别,actor的通信全都是异步的。意味着person2给person1发送消息之后,可以立即进行其他的处理,而不需要等待person1的应答,即person1和person2功能做到了完全解耦。

  BasicPattern2和BasicPattern3的区别是调用方获取方式的不同,其实还有另外一种形式:

class Master(workerPath:String) extends Actor{
  override def receive: Receive = {
    case DoWork(message) =>
      println(s"master 收到 doWork消息:$message")
      val worker = context.actorSelection( s"/user/$workerPath")
      worker ! DoWorkFor(message,self)
    case WorkDone(message) =>
      println(s"master 收到 ${sender().path.name} 的返回消息 $message")
  }
}
class Worker extends Actor{
  private def doWork(message:String):String = {
    println(s"worker 收到了消息 $message")
    "这里是worker返回消息"
  }
  override def receive: Receive = {
    case DoWorkFor(message,forActor) =>
      val result = doWork(message)
      forActor ! WorkDone(result)

  }
}
object BasicPattern4 {
  def main(args: Array[String]): Unit = {
    val system = ActorSystem("BasicPattern2",ConfigFactory.load())
    system.actorOf(Props(new Worker()),"workerActor")
    val master = system.actorOf(Props(new Master("workerActor")),"masterWorker")
    master ! DoWork("Hello World")
  }
}

 

输出

master 收到 doWork消息:Hello World
worker 收到了消息 Hello World
master 收到 workerActor 的返回消息 这里是worker返回消息

 

   在这个形式中,master通过worker的actorPath,用actorSelection查询了worker的地址,然后发送消息给它。与BasicPattern3不同的是,master不需要知道worker的邮箱地址,它只需要知道worker的actorPath就可以发消息了。然后master和worker就可以按照前面的pattern互通消息了。

  请注意DoWork、WorkDone、DoWork这三个消息的处理完全是异步的,没有任何直接的关系。

  上面的4个例子我都将其认定为基础模式,因为这都是Akka的基础功能,没有涉及太高深的技术,也是在学习Akka的初期最容易理解的模式。虽然简单,还是有很多值得学习的地方的。在下一篇博客中,我们会针对BasicPattern4进行优化,讲解另外MasterWorkerBackend模式,这种模式比较复杂,希望读者深刻理解本博文的4个例子,再阅读后续文章。