如何开发一个简单的Akka Java应用
Akka 是一个构建并发、分布式、可扩展的消息驱动应用程序的工具包与运行时。
要开发一个简单的Akka Java应用,可以按照以下步骤进行。
步骤一:添加依赖
在项目的 pom.xml 文件中添加以下依赖:
<dependencies>
<dependency>
<groupId>com.typesafe.akka</groupId>
<artifactId>akka-actor_2.12</artifactId>
<version>2.6.16</version>
</dependency>
</dependencies>
步骤二:编写Actor类
创建一个继承自 akka.actor.AbstractActor 的 Actor 类,一个简单的例子如下所示:
import akka.actor.AbstractActor;
import akka.actor.Props;
public class Greeter extends AbstractActor {
static public Props props(String message) {
return Props.create(Greeter.class, () -> new Greeter(message));
}
static public class Greeting {
public final String message;
public Greeting(String message) {
this.message = message;
}
}
private final String message;
public Greeter(String message) {
this.message = message;
}
@Override
public void preStart() throws Exception {
super.preStart();
System.out.println("Greeter actor started");
}
@Override
public void postStop() throws Exception {
super.postStop();
System.out.println("Greeter actor stopped");
}
@Override
public Receive createReceive() {
return receiveBuilder()
.match(Greeting.class, greeting -> {
System.out.println(message + ", " + greeting.message);
})
.build();
}
}
在这个例子中,我们定义了一个 Greeter 类,它继承自 AbstractActor 类,重写了其 createReceive 函数实现了消息的处理逻辑。其中,我们定义了一个 Greeting 类,来传入 Actor 的消息。重写了 preStart 和 postStop 函数用于在 Actor 启动和停止的时候打印相应的日志。
步骤三:创建 ActorSystem
创建一个 ActorSystem 来协调 Actor 的创建和调度。
import akka.actor.ActorRef;
import akka.actor.ActorSystem;
public class HelloWorld {
public static void main(String[] args) {
final ActorSystem system = ActorSystem.create("helloakka");
final ActorRef greeter = system.actorOf(Greeter.props("Hello"), "greeter");
greeter.tell(new Greeter.Greeting("World"), ActorRef.noSender());
system.terminate();
}
}
这个例子中,我们创建了一个名为 "helloakka" 的 ActorSystem,创建了一个带有 "Hello" 消息的 Greeter Actor,被命名为 "greeter",然后将一个新的 "World" 消息发送给它。
执行该例子后,将输出以下内容:
Greeter actor started
Hello, World
Greeter actor stopped
这个例子展示了如何创建一个基本的 Akka 应用程序。
示例:使用Actor进行并发计算
下面我们再演示一个使用Akka Actor进行并发计算的例子。当我们输入一个整数时,对这个整数进行两次平方操作,最后将操作结果两次相加。其中,每个平方操作和相加操作是由不同的Actor完成的,基本实现步骤与上面的案例类似。
- ComputeActor:计算平方并通知相加Actor
import akka.actor.AbstractActor;
public class ComputeActor extends AbstractActor {
static public Props props(Integer number, ActorRef addActor) {
return Props.create(ComputeActor.class, () -> new ComputeActor(number, addActor));
}
private final Integer number;
private final ActorRef addActor;
public ComputeActor(Integer number, ActorRef addActor) {
this.number = number;
this.addActor = addActor;
}
@Override
public void preStart() throws Exception {
super.preStart();
System.out.println("Compute actor started");
}
@Override
public void postStop() throws Exception {
super.postStop();
System.out.println("Compute actor stopped");
}
@Override
public Receive createReceive() {
return receiveBuilder()
.matchEquals("square", s -> {
Integer result = number * number;
System.out.println("Square of " + number + " is " + result);
addActor.tell(result, getSelf());
})
.build();
}
}
在上面的代码中,我们创建了 ComputeActor 类,它负责计算输入整数的平方并将结果通知给 AddActor。当该 Actor 接收到 "square" 消息时,它将计算平方并将结果发送给 AddActor。
- AddActor:计算累加结果
import akka.actor.AbstractActor;
public class AddActor extends AbstractActor {
static public Props props(Integer count, ActorRef sourceActor) {
return Props.create(AddActor.class, () -> new AddActor(count, sourceActor));
}
private final Integer count;
private final ActorRef sourceActor;
private Integer current = 0;
public AddActor(Integer count, ActorRef sourceActor) {
this.count = count;
this.sourceActor = sourceActor;
}
@Override
public void preStart() throws Exception {
super.preStart();
System.out.println("Add actor started");
}
@Override
public void postStop() throws Exception {
super.postStop();
System.out.println("Add actor stopped");
}
@Override
public Receive createReceive() {
return receiveBuilder()
.match(Integer.class, num -> {
current += num;
if (current.equals(count)) {
System.out.println("Result is " + current);
getContext().getSystem().terminate();
}
})
.build();
}
}
在上面的代码中,我们创建了 AddActor 类,它负责将 ComputeActor 计算出的平方结果累加起来,并在完成累加后终止 ActorSystem。当该 Actor 接收到包含 Integer 类型数字的消息时,将计数器 current 的值增加,并检查是否到达计算平方的次数 count,是则输出计算结果,并调用 ActorSystem 的 terminate 方法终止应用程序。
- Main:
import akka.actor.ActorRef;
import akka.actor.ActorSystem;
public class Main {
public static void main(String[] args) {
final ActorSystem system = ActorSystem.create("helloakka");
final ActorRef addActor = system.actorOf(AddActor.props(2, system.deadLetters()), "addActor");
final ActorRef computeActor1 = system.actorOf(ComputeActor.props(2, addActor), "computeActor1");
final ActorRef computeActor2 = system.actorOf(ComputeActor.props(3, addActor), "computeActor2");
computeActor1.tell("square", ActorRef.noSender());
computeActor2.tell("square", ActorRef.noSender());
system.awaitTermination();
}
}
在 Main 类中,我们创建 ActorSystem,并按照顺序创建 AddActor 和两个 ComputeActor,指定计算平方的次数。然后我们向每个 ComputeActor 发送消息,通知它们开始计算平方。
执行该例子后,将输出以下内容:
Add actor started
Compute actor started
Square of 3 is 9
Compute actor started
Square of 2 is 4
Result is 13
Compute actor stopped
Compute actor stopped
Add actor stopped
这个例子展示了如何使用 Akka Actor 进行并发计算。
本站文章如无特殊说明,均为本站原创,如若转载,请注明出处:如何开发一个简单的Akka Java应用 - Python技术站