下面详细讲解“Springboot整合Netty实现RPC服务器的示例代码”的完整攻略。
一、简介
RPC(Remote Procedure Call),即远程过程调用,是一种通过网络从远程计算机上请求服务,而不需要了解底层网络技术的协议,是一种基于客户端/服务端模式的通信协议。相信大家已经非常熟悉 SpringBoot,那么我们如何使用 SpringBoot 整合 Netty 实现 RPC 呢?下面就来介绍具体的操作步骤。
二、前置知识
在学习本示例代码前需要对 springboot、netty 和 rpc 等技术有一定的了解和掌握。
三、示例代码
首先,我们会建立一个 SpringBoot 项目,并添加相应的依赖。然后我们需要在项目中添加 netty 和 protobuf 相关的依赖。这里我们使用的是最新版本的 netty 和 protobuf,具体如下所示:
<dependency>
<groupId>io.netty</groupId>
<artifactId>netty-all</artifactId>
<version>4.1.58.Final</version>
</dependency>
<dependency>
<groupId>com.google.protobuf</groupId>
<artifactId>protobuf-java</artifactId>
<version>3.15.6</version>
</dependency>
接下来我们编写 RPC 协议相关的代码,如下:
syntax = "proto3";
package rpcdemo;
message RpcRequest {
int32 requestId = 1;
string className = 2;
string methodName = 3;
repeated string parameterTypeNames = 4;
repeated bytes parameterValues = 5;
}
message RpcResponse {
int32 requestId = 1;
bytes value = 2;
}
这里我们使用 protobuf 定义了 RPC 协议的请求和响应消息格式。
接下来我们编写 Netty 的服务端代码,如下:
@Service
public class RpcServer {
private static final Logger logger = LoggerFactory.getLogger(RpcServer.class);
private EventLoopGroup bossGroup = new NioEventLoopGroup(4);
private EventLoopGroup workerGroup = new NioEventLoopGroup(20);
@Value("${server.port}")
private int port;
@Autowired
private RpcServiceRegistry rpcServiceRegistry;
@PostConstruct
public void start() {
logger.info("Starting RPC server on port {}", port);
ServerBootstrap b = new ServerBootstrap();
b.group(bossGroup, workerGroup).channel(NioServerSocketChannel.class)
.option(ChannelOption.SO_BACKLOG, 1024).childOption(ChannelOption.SO_KEEPALIVE, true)
.childHandler(new ChannelInitializer<SocketChannel>() {
@Override
public void initChannel(SocketChannel channel) throws Exception {
ChannelPipeline p = channel.pipeline();
p.addLast(new RpcRequestDecoder());
p.addLast(new RpcResponseEncoder());
p.addLast(new RpcServerHandler(rpcServiceRegistry));
}
});
ChannelFuture f = b.bind(port).syncUninterruptibly();
logger.info("RPC server is listening on port {}", port);
f.channel().closeFuture().syncUninterruptibly();
}
@PreDestroy
public void shutdown() {
logger.info("Shutting down RPC server on port {}", port);
workerGroup.shutdownGracefully();
bossGroup.shutdownGracefully();
}
}
这里我们使用了 SpringBoot 注解将 RpcServer 类定义为一个 Spring Bean,并且通过 @PostConstruct 和 @PreDestroy 定义了服务启动时和关闭时的方法。我们在启动 RpcServer 的时候,在 ServerBootstrap 中配置了 Netty 的参数,以及添加了对应的 ChannelHandler,其中 RpcServerHandler 是我们自己实现的处理逻辑。
RpcServer 首先会启动 Netty 服务器,并且监听指定端口(默认为 8080)。RpcServerHandler 是我们自己实现的处理逻辑,它依次从 RpcRequestDecoder 中获取 RpcRequest,然后通过 RpcServiceRegistry 获取对应的服务实例,调用对应的方法并返回 RpcResponse。
接下来我们编写 RpcRequestDecoder 和 RpcResponseEncoder 的代码,如下:
public class RpcRequestDecoder extends ByteToMessageDecoder {
private static final Logger logger = LoggerFactory.getLogger(RpcRequestDecoder.class);
@Override
protected void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) throws Exception {
try {
RpcRequest rpcRequest = RpcRequest.parseFrom(in.array());
out.add(rpcRequest);
} catch (InvalidProtocolBufferException e) {
logger.error("Error decoding rpc request", e);
throw new RuntimeException(e);
}
}
}
public class RpcResponseEncoder extends MessageToByteEncoder<RpcResponse> {
private static final Logger logger = LoggerFactory.getLogger(RpcResponseEncoder.class);
@Override
protected void encode(ChannelHandlerContext ctx, RpcResponse msg, ByteBuf out) throws Exception {
try {
byte[] bytes = msg.toByteArray();
out.writeBytes(bytes);
} catch (Exception e) {
logger.error("Error encoding rpc response", e);
throw new RuntimeException(e);
}
}
}
其中,RpcRequestDecoder 继承了 ByteToMessageDecoder,实现了将 ByteBuf 解码为对象(RpcRequest)的逻辑,RpcResponseEncoder 继承了 MessageToByteEncoder,实现了将对象(RpcResponse)编码为 ByteBuf 的逻辑。
接下来我们编写 RpcServerHandler,如下:
public class RpcServerHandler extends SimpleChannelInboundHandler<RpcRequest> {
private static final Logger logger = LoggerFactory.getLogger(RpcServerHandler.class);
private RpcServiceRegistry rpcServiceRegistry;
public RpcServerHandler(RpcServiceRegistry rpcServiceRegistry) {
this.rpcServiceRegistry = rpcServiceRegistry;
}
@Override
protected void channelRead0(ChannelHandlerContext ctx, RpcRequest rpcRequest) throws Exception {
logger.info("Received RPC request: {}", rpcRequest);
RpcResponse rpcResponse = new RpcResponse();
rpcResponse.setRequestId(rpcRequest.getRequestId());
RpcService rpcService = rpcServiceRegistry.getRpcService(rpcRequest.getClassName());
if (rpcService == null) {
rpcResponse.setValue(ByteString.EMPTY);
} else {
try {
Method method = rpcService.getClass().getMethod(rpcRequest.getMethodName());
Object[] parameterValues = new Object[rpcRequest.getParameterValuesCount()];
for (int i = 0; i < rpcRequest.getParameterValuesCount(); i++) {
parameterValues[i] = rpcService.getParameterTypeNames()[i].equals("string")
? rpcRequest.getParameterValues(i).toStringUtf8()
: rpcRequest.getParameterValues(i).toByteArray();
}
Object result = method.invoke(rpcService, parameterValues);
rpcResponse.setValue(ByteString.copyFrom(result.toString().getBytes()));
} catch (Exception e) {
rpcResponse.setValue(ByteString.EMPTY);
logger.error("Error executing RPC service method: {}", e);
}
}
logger.info("Sending RPC response: {}", rpcResponse);
ctx.writeAndFlush(rpcResponse);
}
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
logger.error("Error during RPC request processing", cause);
ctx.close();
}
}
这里我们使用了 SimpleChannelInboundHandler 处理 RpcRequest,在 channelRead0 中处理所接收到的请求,并将处理结果写回客户端。其中,RpcServiceRegistry 是我们创建的服务注册表,它通过 Map 存储了服务实例,并提供了相应的查询服务方法。
至此,我们已经将所有的代码实现完成。我们可以编译并运行程序,验证 Netty 能够正常处理来自客户端的请求。
四、示例说明
下面我们提供两个示例说明:
示例一:添加新的 Service
假设我们需要增加一个新的 Service。假设这个新的 Service 的名称为 MyRpcService,需要暴露的方法为 hello,该方法的参数为一个字符串,并返回一个字符串。
我们可以按照如下步骤添加新的 Service:
- 在 rpcdemo 目录下创建 MyRpcService.java 文件,具体内容如下:
public class MyRpcService implements RpcService {
@Override
public String[] getParameterTypeNames() {
return new String[]{"string"};
}
public String hello(String name) {
return "Hello, " + name + "!";
}
}
- 在启动时将新 Service 注册到 RpcServiceRegistry 中:
@Autowired
private RpcServiceRegistry rpcServiceRegistry;
@PostConstruct
public void start() {
...
rpcServiceRegistry.registerRpcService("MyRpcService", new MyRpcService());
...
}
启动程序,并使用 telnet 连接到 RPC Server 的端口,发送以下请求:
0000002c 0a 0b 4d 79 52 70 63 53 65 72 76 69 63 65 12 05 |..MyRpcService..|
0000003c 68 65 6c 6c 6f 1a 07 70 61 72 61 6d 65 74 65 72 |hello..parameter|
0000004c 54 79 70 65 4e 61 6d 65 1d 00 00 00 0b 48 65 6c |TypeName......Hel|
0000005c 6c 6f 2c |lo,|
其中,请求的消息体为:
message RpcRequest {
int32 requestId = 1;
string className = 2;
string methodName = 3;
repeated string parameterTypeNames = 4;
repeated bytes parameterValues = 5;
}
RpcRequest {
requestId: 0
className: "MyRpcService"
methodName: "hello"
parameterTypeNames: "string"
parameterValues: "parameterTypeName"
}
发送之后,我们可以收到以下响应:
0000000f 08 00 12 07 48 65 6c 6c 6f 2c |....Hello,|
示例二:添加新的协议类型
假设我们需要支持一个新的协议类型:XML。这里我们使用 jaxb 库来完成 XML 的序列化和反序列化操作。
我们可以按照如下步骤添加对新协议的支持:
- 在 rpcdemo 目录下创建 XmlRpcRequestDecoder 和 XmlRpcResponseEncoder,具体内容如下:
public class XmlRpcRequestDecoder extends ByteToMessageDecoder {
private static final Logger logger = LoggerFactory.getLogger(XmlRpcRequestDecoder.class);
private JAXBContext jaxbContext;
public XmlRpcRequestDecoder() throws JAXBException {
jaxbContext = JAXBContext.newInstance(XmlRpcRequest.class);
}
@Override
protected void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) throws Exception {
Unmarshaller unmarshaller = jaxbContext.createUnmarshaller();
XmlRpcRequest xmlRpcRequest = (XmlRpcRequest) unmarshaller.unmarshal(new ByteBufInputStream(in));
out.add(xmlRpcRequest);
}
}
public class XmlRpcResponseEncoder extends MessageToByteEncoder<XmlRpcResponse> {
private static final Logger logger = LoggerFactory.getLogger(XmlRpcResponseEncoder.class);
private JAXBContext jaxbContext;
public XmlRpcResponseEncoder() throws JAXBException {
jaxbContext = JAXBContext.newInstance(XmlRpcResponse.class);
}
@Override
protected void encode(ChannelHandlerContext ctx, XmlRpcResponse msg, ByteBuf out) throws Exception {
Marshaller marshaller = jaxbContext.createMarshaller();
marshaller.marshal(msg, new ByteBufOutputStream(out));
}
}
其中,XmlRpcRequestDecoder 继承了 ByteToMessageDecoder,实现了将 ByteBuf 解码为对象(XmlRpcRequest)的逻辑,XmlRpcResponseEncoder 继承了 MessageToByteEncoder,实现了将对象(XmlRpcResponse)编码为 ByteBuf 的逻辑。
- 修改 RpcServerHandler 的代码,增加对新协议类型的支持:
public class RpcServerHandler extends SimpleChannelInboundHandler<Object> {
private static final Logger logger = LoggerFactory.getLogger(RpcServerHandler.class);
private RpcServiceRegistry rpcServiceRegistry;
private static final Map<String, BiFunction<byte[], String, XmlRpcResponse>> XML_RPC_FUNCTIONS = new HashMap<>();
static {
XML_RPC_FUNCTIONS.put("demo", (params, id) -> {
XmlRpcResponse response = new XmlRpcResponse();
response.setValue("<demo>Hello, World!</demo>");
response.setId(id);
return response;
});
}
public RpcServerHandler(RpcServiceRegistry rpcServiceRegistry) {
this.rpcServiceRegistry = rpcServiceRegistry;
}
@Override
protected void channelRead0(ChannelHandlerContext ctx, Object message) throws Exception {
logger.info("Received RPC request: {}", message);
RpcResponse rpcResponse = new RpcResponse();
if (message instanceof RpcRequest) {
RpcRequest rpcRequest = (RpcRequest) message;
rpcResponse.setRequestId(rpcRequest.getRequestId());
RpcService rpcService = rpcServiceRegistry.getRpcService(rpcRequest.getClassName());
if (rpcService == null) {
rpcResponse.setValue(ByteString.EMPTY);
} else {
try {
Method method = rpcService.getClass().getMethod(rpcRequest.getMethodName());
Object[] parameterValues = new Object[rpcRequest.getParameterValuesCount()];
for (int i = 0; i < rpcRequest.getParameterValuesCount(); i++) {
parameterValues[i] = rpcService.getParameterTypeNames()[i].equals("xml")
? xmlRpcToObject(rpcRequest.getParameterValues(i).toByteArray())
: rpcService.getParameterTypeNames()[i].equals("string")
? rpcRequest.getParameterValues(i).toStringUtf8()
: rpcRequest.getParameterValues(i).toByteArray();
}
Object result = method.invoke(rpcService, parameterValues);
rpcResponse.setValue(ByteString.copyFrom(result.toString().getBytes()));
} catch (Exception e) {
rpcResponse.setValue(ByteString.EMPTY);
logger.error("Error executing RPC service method: {}", e);
}
}
logger.info("Sending RPC response: {}", rpcResponse);
ctx.writeAndFlush(rpcResponse);
} else if (message instanceof XmlRpcRequest) {
XmlRpcRequest xmlRpcRequest = (XmlRpcRequest) message;
BiFunction<byte[], String, XmlRpcResponse> function = XML_RPC_FUNCTIONS.get(xmlRpcRequest.getMethodName());
if (function == null) {
logger.error("Error executing XML RPC function: {}", xmlRpcRequest.getMethodName());
ctx.close();
}
byte[] params = Base64.getDecoder().decode(xmlRpcRequest.getParams());
XmlRpcResponse xmlRpcResponse = function.apply(params, xmlRpcRequest.getId());
logger.info("Sending RPC response: {}", xmlRpcResponse);
ctx.writeAndFlush(xmlRpcResponse);
} else {
logger.error("Invalid RPC message type: {}", message.getClass());
ctx.close();
}
}
private Object xmlRpcToObject(byte[] xmlBytes) throws Exception {
JAXBContext jaxbContext = JAXBContext.newInstance(XmlRpcValue.class);
Unmarshaller unmarshaller = jaxbContext.createUnmarshaller();
XmlRpcValue xmlRpcValue = (XmlRpcValue) unmarshaller.unmarshal(new ByteArrayInputStream(xmlBytes));
Object result;
switch (xmlRpcValue.getType()) {
case "string":
result = xmlRpcValue.getString();
break;
case "int":
result = xmlRpcValue.getInt();
break;
case "boolean":
result = xmlRpcValue.getBoolean();
break;
// More types can be added here
default:
throw new UnsupportedOperationException("Unsupported XML-RPC data type: " + xmlRpcValue.getType());
}
return result;
}
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
logger.error("Error during RPC request processing", cause);
ctx.close();
}
}
这里我们将 RpcServerHandler 修改为了 SimpleChannelInboundHandler
这样,我们就给 Netty 添加了对 XML 协议的支持。
总结
本文主要是通过一个实例,来讲解 SpringBoot 整合 Netty 实现 RPC 服务器的完整操作过程,包含了各个组件的详细介绍和使用,希望可以帮助大家更好地理解和掌握相关的知识。
本站文章如无特殊说明,均为本站原创,如若转载,请注明出处:Springboot整合Netty实现RPC服务器的示例代码 - Python技术站