牛逼!自己动手从0实现一个分布式RPC框架,成功拿下阿里offer
suiw9 2024-11-17 15:50 20 浏览 0 评论
写给大家的话
最近我收到很多读者的来信,对如何学习分布式、如何进行项目实践和提高编程能力,存在很多疑问。
分布式那么难,怎么学?为什么看了那么多书还是掌握不了?
开源的框架比如Dubbo代码太多了,完全一头雾水,应该怎么学习?
在学校没有项目经验,找工作在即,怎么把这块补起来?你还仅限于XXX管理系统么「面试官都疲劳了」?
我在学校的时候,也有和大家一样的困惑。
毕业去阿里工作了几年后,通过参与实际的项目开发,关于如何学习新知识、如何快速上手并应用有了一些体会,在这里给大家分享一下:
- 一定要动手,动手写代码,实现demo,去debug,在调试的过程中学习。没有必要抱着大块头的书看完了以后再动手写代码,在实践中学习是最快速的方法。
- 在学习框架的过程中,尽量先从框架的初始版本开始看,因为开源框架往往功能复杂,代码庞大,很容易劝退。比如学习linux内核,可以从早期版本开始看。
- 「造轮子」。掌握知识最好的办法是去做项目、实现它。关于项目,很多推荐XXX管理系统的,我认为,此类XXX管理系统些在简历上目前大厂是一点竞争力都没有的,面试官都疲了,培训机构清一色的XXX管理系统,springboot全家桶。必须得差异化竞争。
- 我在这里给大家推荐几个优秀的项目,后面我也会逐个实现给大家:自己实现spring ioc/aop、RPC框架、MQ框架、KV存储、分布式锁。这些项目和互联网大厂技术栈无缝结合,通过自己实现分布式组件「也就是大家平时说的造轮子」,为什么要造轮子?一方面是避免成为调包侠或CRUD工程师,另一方面是提高自己的技术深度,让自己的职业道路更宽。
这一系列文章我目前已经写了5篇,后面会在本公众号陆续分享给大家,大家可以关注我的宫伀号【编程学习指南】追更。
本文非常值得点赞+收藏,因为内容非常多,包含完整的代码实现,真的是手把手教你们怎么实现。要想让自己的简历让面试官眼前一亮,这些项目肯定是加分项。
Github地址: (欢迎star)
https://github.com/xiajunhust/tinywheel/tree/main/RPC%20framework
分布式RPC框架,WHY?
RPC是指远程过程调用(Remote Procedure Call)。可以使得我们在分布式环境下调用远程服务像调用本机服务一样方便。在分布式应用中使用非常广泛。
有人会问:“有了开源的RPC框架,为什么要自己去实现?”
RPC基本原理不难,但是在实际实现的过程中还是会遇到很多坑,涉及很多知识点:线程模型、通信协议设计、负载均衡、动态代理等。
通过自己动手实现的方式一个简易的RPC框架,包含RPC的核心功能「麻雀虽小五脏俱全」,可以检验自己对知识的掌握情况,学会在实际中灵活运用,加深理解。当然了,生产环境中,建议大家还是用成熟的开源框架。
RPC框架理论基础
BRUCE JAY NELSON在其1984年的论文《Implementing Remote Procedure Calls》中描述到,当我们在程序中发起RPC调用时,会涉及5个模块:
- user:发起调用的应用模块,发起rpc调用 会和发起本地调用一样,不感知rpc底层逻辑。
- user-stub:负责调用请求的打包以及结果的解包。
- RPCRuntime:RPC运行时,负责处理远程网络交互,如网络包重传、加密等。
- server-stub:负责请求的解包以及结果的打包。
- server:真正提供服务处理的应用模块。
这5部分的关系如下图所示:
主流开源RPC框架
(1)dubbo:阿里巴巴出品的RPC框架,经历了电商海量场景的考验,github 36.7star。支持java语言。
官网:https://dubbo.apache.org/zh/
github:https://github.com/apache/dubbo
(2)grpc:谷歌开源rpc框架,支持多种语言。github star 33.2k。
官网:https://grpc.io/
github:https://github.com/grpc/grpc
(3)motan:新浪开源的rpc框架,仅支持java语言。
github:https://github.com/weibocom/motan
(4)spring cloud:Pivotal公司2014年对外开源的RPC框架,仅支持Java。
(5)brpc:百度开源的rpc框架,C++实现。
github:https://github.com/apache/incubator-brpc
RPC框架设计
整体结构如下图,给大家展示了一个「麻雀虽小五脏俱全」的RPC框架,去除了管控平台等辅助功能。通过对核心功能进行设计和实现,理解整个RPC框架的设计原理。
涉及核心技术:
- 注册中心:服务端将发布的服务注册到注册中心,调用端从注册中心订阅服务,获得服务的地址,才能发起调用。
- 分布式环境不同服务器之间需要通过网络通信(RPC client)。
- 网络通信必然涉及到编解码。
- 避免每次寻址都需要调用注册中心,服务调用端还需要对服务信息进行缓存。
- 动态代理:方便对客户端调用透明化。
详细设计&技术实现
01
技术选型
- spring-boot,依赖管理,强大的配置化能力。可以方便制作RPC框架的starter,集成使用起来非常便捷。
- netty
- zookeeper
- protobuf
02
RPC调用流程分析
一次RPC调用整个过程,到底发生了什么事情呢?如下通过序列图的方式展示了详细步骤:
03
工程模块依赖
代码模块分层如下:
- util:基础工具类。
- model:基础领域模型。
- annotation:注解。提供注解功能,可以非常方便的发布RPC服务和引用RPC服务。
- registry:注册中心,给出了zk的实现。
- io:编码和解码实现。
- provider:服务提供者实现。
- consumer:服务消费者实现。
代码包详细情况:
04
代码详细介绍
采用spring-boot框架。将RPC框架实现为一个starter,方便集成使用。
注解
为了方便使用此RPC框架,我们通过定义注解,让使用者能直接通过一行注解进行服务的发布和引用。
/**
* RPC provider注解
*/
@Retention(RetentionPolicy.RUNTIME)
//注解打在类上
@Target(ElementType.TYPE)
@Component
public @interface SimpleRpcProvider {
Class<?> serviceInterface() default Object.class;
String serviceVersion() default "1.0.0";
}
/**
* RPC consumer
*
* @author summer
* @version $Id: SimpleRpcProviderBean.java, v 0.1 2022年01月16日 11:53 AM summer Exp $
*/
@Retention(RetentionPolicy.RUNTIME)
//注解打在属性上
@Target(ElementType.FIELD)
@Component
public @interface SimpleRpcConsumer {
/**
* 服务版本号
* @return
*/
String serviceVersion() default "1.0.0";
/**
* 注册中心类型-默认zk
* @return
*/
String registerType() default "zookeeper";
/**
* 注册中心地址
* @return
*/
String registerAddress() default "127.0.0.1:2181";
}
注册中心
常见的注册中心有很多种,比如zookepper、eureka、nacos、consul等。注册中心的原理不是本文的重点,因此不做详细描述。
此处采用zookeeper的实现,有兴趣的童鞋可以自行进行其他实现,只需要实现一个子类即可。
/**
* 注册中心服务接口定义
*/
public interface ServiceRegistry {
/**
* 注册服务
*
* @param serviceMetaConfig 服务元数据配置
* @throws Exception
*/
void register(ServiceMetaConfig serviceMetaConfig) throws Exception;
/**
* 取消注册服务
*
* @param serviceMetaConfig 服务元数据配置
* @throws Exception
*/
void unRegister(ServiceMetaConfig serviceMetaConfig) throws Exception;
/**
* 服务发现
*
* @param serviceName 服务名
* @return
* @throws Exception
*/
ServiceMetaConfig discovery(String serviceName) throws Exception;
}
zk实现(采用curator):
import com.summer.simplerpc.registry.cache.ServiceProviderCache;
import com.summer.simplerpc.registry.model.ServiceMetaConfig;
import com.summer.simplerpc.registry.ServiceRegistry;
import com.summer.simplerpc.util.ServiceUtils;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.retry.ExponentialBackoffRetry;
import org.apache.curator.x.discovery.*;
import org.apache.curator.x.discovery.details.JsonInstanceSerializer;
import org.apache.curator.x.discovery.strategies.RoundRobinStrategy;
/**
* 服务注册中心-zk实现
*/
public class ZkServiceRegistry implements ServiceRegistry {
/**
* zk base path
*/
private final static String ZK_BASE_PATH = "/simplerpc";
/**
* serviceProvider锁
*/
private final Object lock = new Object();
/**
* zk framework client
*/
private CuratorFramework client;
/**
* 服务发现
*/
private ServiceDiscovery<ServiceMetaConfig> serviceDiscovery;
/**
* serviceProvider缓存
*/
private ServiceProviderCache serviceProviderCache;
/**
* 构造函数
*
* @param address 地址
*/
public ZkServiceRegistry(String address, ServiceProviderCache serviceProviderCache) throws Exception {
this.client = CuratorFrameworkFactory.newClient(address, new ExponentialBackoffRetry(1000, 3));
this.client.start();
this.serviceProviderCache = serviceProviderCache;
JsonInstanceSerializer<ServiceMetaConfig> serializer = new JsonInstanceSerializer<>(ServiceMetaConfig.class);
serviceDiscovery = ServiceDiscoveryBuilder.builder(ServiceMetaConfig.class)
.client(client)
.serializer(serializer)
.basePath(ZK_BASE_PATH)
.build();
serviceDiscovery.start();
}
@Override
public void register(ServiceMetaConfig serviceMetaConfig) throws Exception {
ServiceInstanceBuilder<ServiceMetaConfig> serviceInstanceBuilder = ServiceInstance.builder();
ServiceInstance<ServiceMetaConfig> serviceInstance = serviceInstanceBuilder
.name(ServiceUtils.buildServiceKey(serviceMetaConfig.getName(), serviceMetaConfig.getVersion()))
.address(serviceMetaConfig.getAddress())
.port(serviceMetaConfig.getPort())
.payload(serviceMetaConfig)
.uriSpec(new UriSpec("{scheme}://{address}:{port}"))
.build();
serviceDiscovery.registerService(serviceInstance);
}
@Override
public void unRegister(ServiceMetaConfig serviceMetaConfig) throws Exception {
ServiceInstanceBuilder<ServiceMetaConfig> serviceInstanceBuilder = ServiceInstance.builder();
ServiceInstance<ServiceMetaConfig> serviceInstance = serviceInstanceBuilder
.name(ServiceUtils.buildServiceKey(serviceMetaConfig.getName(), serviceMetaConfig.getVersion()))
.address(serviceMetaConfig.getAddress())
.port(serviceMetaConfig.getPort())
.payload(serviceMetaConfig)
.uriSpec(new UriSpec("{scheme}://{address}:{port}"))
.build();
serviceDiscovery.unregisterService(serviceInstance);
}
@Override
public ServiceMetaConfig discovery(String serviceName) throws Exception {
//先读缓存
ServiceProvider<ServiceMetaConfig> serviceProvider = serviceProviderCache.queryCache(serviceName);
//缓存miss,需要调serviceDiscovery
if (serviceProvider == null) {
synchronized (lock) {
serviceProvider = serviceDiscovery.serviceProviderBuilder()
.serviceName(serviceName)
.providerStrategy(new RoundRobinStrategy<>())
.build();
serviceProvider.start();
//更新缓存
serviceProviderCache.updateCache(serviceName, serviceProvider);
}
}
ServiceInstance<ServiceMetaConfig> serviceInstance = serviceProvider.getInstance();
return serviceInstance != null ? serviceInstance.getPayload() : null;
}
}
核心领域模型和本地缓存:
/**
* 服务元数据配置领域模型
*/
@Data
public class ServiceMetaConfig {
/**
* 服务名
*/
private String name;
/**
* 服务版本
*/
private String version;
/**
* 服务地址
*/
private String address;
/**
* 服务端口
*/
private Integer port;
}
/**
*
* @author summer
* @version $Id: ServiceProviderCache.java, v 0.1 2022年01月16日 11:41 AM summer Exp $
*/
public interface ServiceProviderCache {
/**
* 查询缓存
* @param serviceName
* @return
*/
ServiceProvider<ServiceMetaConfig> queryCache(String serviceName);
/**
* 更新缓存
*
* @param serviceName 服务名
* @param serviceProvider 服务provider
* @return
*/
void updateCache(String serviceName, ServiceProvider<ServiceMetaConfig> serviceProvider);
}
/**
* 本地缓存实现
*
* @author summer
* @version $Id: ServiceProviderLocalCache.java, v 0.1 2022年01月16日 11:43 AM summer Exp $
*/
public class ServiceProviderLocalCache implements ServiceProviderCache {
/**
* 本地缓存map
*/
private Map<String, ServiceProvider<ServiceMetaConfig>> serviceProviderMap = new ConcurrentHashMap<>();
@Override
public ServiceProvider<ServiceMetaConfig> queryCache(String serviceName) {
return serviceProviderMap.get(serviceName);
}
@Override
public void updateCache(String serviceName, ServiceProvider<ServiceMetaConfig> serviceProvider) {
serviceProviderMap.put(serviceName, serviceProvider);
}
}
服务提供方
我前面提到过,在实际使用的时候会通过注解的方式来发布服务。那么,我们需要在bean初始化后去扫描带SimpleRpcProvider注解的bean,将服务注册到注册中心。另外,我们还需要在初始化后启动netty服务端。因此,我定义服务提供方bean实现SimpleRpcProviderBean,继承InitializingBean、BeanPostProcessor:
- 在postProcessAfterInitialization方法中判断bean是否带SimpleRpcProvider注解,如果是则解析服务信息,注册到注册中心。
- 在afterPropertiesSet方法中启动netty服务端。
- 接收服务调用请求,通过动态代理执行实际调用
import com.google.common.util.concurrent.ThreadFactoryBuilder;
import com.summer.simplerpc.annotation.SimpleRpcProvider;
import com.summer.simplerpc.io.RPCDecoder;
import com.summer.simplerpc.io.RPCEncoder;
import com.summer.simplerpc.registry.ServiceRegistry;
import com.summer.simplerpc.registry.model.ServiceMetaConfig;
import com.summer.simplerpc.util.ServiceUtils;
import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelOption;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.handler.codec.LengthFieldBasedFrameDecoder;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.BeansException;
import org.springframework.beans.factory.InitializingBean;
import org.springframework.beans.factory.config.BeanPostProcessor;
import java.util.Map;
import java.util.concurrent.*;
/**
* rpc provider功能实现。
*
* 负责扫描服务provider注解bean,注册服务到注册中心,启动netty监听。
* 提供RPC请求实际处理。
*/
@Slf4j
public class SimpleRpcProviderBean implements InitializingBean, BeanPostProcessor {
/**
* 地址
*/
private String address;
/**
* 服务注册中心
*/
private ServiceRegistry serviceRegistry;
/**
* 服务提供bean的缓存map
*/
private Map<String, Object> providerBeanMap = new ConcurrentHashMap<>(64);
/**
* 处理实际rpc请求的线程池
*/
private static ThreadPoolExecutor rpcThreadPoolExecutor;
private static ThreadFactory threadFactory = new ThreadFactoryBuilder().setNameFormat("simplerpc-provider-pool-%d").build();
/**
* netty相关
*/
private EventLoopGroup bossGroup = null;
private EventLoopGroup workerGroup = null;
/**
* 构造函数
*
* @param address 地址
* @param serviceRegistry 服务注册中心
*/
public SimpleRpcProviderBean(String address, ServiceRegistry serviceRegistry) {
this.address = address;
this.serviceRegistry = serviceRegistry;
}
@Override
public void afterPropertiesSet() throws Exception {
//启动netty服务监听
new Thread(() -> {
try {
startNettyServer();
} catch (InterruptedException e) {
log.error("startNettyServer exception,", e);
}
}).start();
}
/**
* 提交rpc处理任务
*
* @param task 任务
*/
public static void submit(Runnable task) {
if (rpcThreadPoolExecutor == null) {
synchronized (SimpleRpcProviderBean.class) {
if (rpcThreadPoolExecutor == null) {
rpcThreadPoolExecutor = new ThreadPoolExecutor(100, 100,
600L, TimeUnit.SECONDS, new ArrayBlockingQueue<>(1000),
threadFactory);
}
}
}
rpcThreadPoolExecutor.submit(task);
}
/**
* 启动netty服务监听
*
* @throws InterruptedException
*/
private void startNettyServer() throws InterruptedException {
if (workerGroup != null && bossGroup != null) {
return;
}
log.info("startNettyServer begin");
bossGroup = new NioEventLoopGroup();
workerGroup = new NioEventLoopGroup();
ServerBootstrap serverBootstrap = new ServerBootstrap();
serverBootstrap.group(bossGroup, workerGroup)
.channel(NioServerSocketChannel.class)
.childHandler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel socketChannel) {
socketChannel.pipeline()
.addLast(new LengthFieldBasedFrameDecoder(65535,0,4,0,0))
.addLast(new RPCDecoder())
.addLast(new RPCEncoder())
.addLast(new SimpleRpcProviderNettyHandler(providerBeanMap))
;
}
})
.option(ChannelOption.SO_BACKLOG, 512)
.childOption(ChannelOption.SO_KEEPALIVE, true);
String[] array = address.split(":");
String host = array[0];
int port = Integer.parseInt(array[1]);
//启动服务
ChannelFuture future = serverBootstrap.bind(host, port).sync();
log.info(String.format("startNettyServer,host=%s,port=%s", host, port));
future.channel().closeFuture().sync();
}
@Override
public Object postProcessBeforeInitialization(Object bean, String beanName) throws BeansException {
return bean;
}
@Override
public Object postProcessAfterInitialization(Object bean, String beanName) throws BeansException {
//获取bean上的注解
SimpleRpcProvider simpleRpcProvider = bean.getClass().getAnnotation(SimpleRpcProvider.class);
if (simpleRpcProvider == null) {
//无注解直接return原始的bean
return bean;
}
//缓存保存
String serviceName = simpleRpcProvider.serviceInterface().getName();
String version = simpleRpcProvider.serviceVersion();
providerBeanMap.put(ServiceUtils.buildServiceKey(serviceName, version), bean);
log.info("postProcessAfterInitialization find a simpleRpcProvider[" + serviceName + "," + version + "]");
//将服务注册到注册中心
String[] addressArray = address.split(ServiceUtils.SPLIT_CHAR);
String host = addressArray[0];
String port = addressArray[1];
ServiceMetaConfig serviceMetaConfig = new ServiceMetaConfig();
serviceMetaConfig.setAddress(host);
serviceMetaConfig.setName(serviceName);
serviceMetaConfig.setVersion(version);
serviceMetaConfig.setPort(Integer.parseInt(port));
try {
serviceRegistry.register(serviceMetaConfig);
log.info("register service success,serviceMetaConfig=" + serviceMetaConfig.toString());
} catch (Exception e) {
log.error("register service fail,serviceMetaConfig=" + serviceMetaConfig.toString(), e);
}
return bean;
}
}
netty ChannelPipeline设计:
- LengthFieldBasedFrameDecoder:解码器,解决自定义长度TCP粘包问题
- RPCDecoder:解码器,解析出RPC请求参数对象
- SimpleRpcProviderNettyHandler:实际的RPC请求处理逻辑,接收请求参数,返回RPC响应结果
- RPCEncoder:编码器,将RPC响应结果编码序列化,返回
RPC核心逻辑处理handler-SimpleRpcProviderNettyHandler
import com.summer.simplerpc.model.SimpleRpcRequest;
import com.summer.simplerpc.model.SimpleRpcResponse;
import com.summer.simplerpc.util.ServiceUtils;
import io.netty.channel.ChannelFutureListener;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.SimpleChannelInboundHandler;
import lombok.extern.slf4j.Slf4j;
import org.springframework.cglib.reflect.FastClass;
import java.util.Map;
@Slf4j
public class SimpleRpcProviderNettyHandler extends SimpleChannelInboundHandler<SimpleRpcRequest> {
/**
* 提供rpc服务的实例缓存map
*/
private Map<String, Object> handlerMap;
/**
* 构造函数
*
* @param handlerMap
*/
public SimpleRpcProviderNettyHandler(Map<String, Object> handlerMap) {
this.handlerMap = handlerMap;
}
@Override
protected void channelRead0(ChannelHandlerContext channelHandlerContext, SimpleRpcRequest simpleRpcRequest) throws Exception {
SimpleRpcProviderBean.submit(() -> {
log.debug("Receive rpc request {}", simpleRpcRequest.getBizNO());
SimpleRpcResponse simpleRpcResponse = new SimpleRpcResponse();
simpleRpcResponse.setBizNO(simpleRpcRequest.getBizNO());
try {
Object result = doHandle(simpleRpcRequest);
simpleRpcResponse.setData(result);
} catch (Throwable throwable) {
simpleRpcResponse.setMsg(throwable.toString());
log.error("handle rpc request error", throwable);
}
channelHandlerContext.writeAndFlush(simpleRpcResponse).addListener(
(ChannelFutureListener) channelFuture ->
log.info("return response for request " + simpleRpcRequest.getBizNO() + ",simpleRpcResponse=" + simpleRpcResponse));
});
}
/**
* 通过反射,执行实际的rpc请求
* @param simpleRpcRequest
* @return
*/
private Object doHandle(SimpleRpcRequest simpleRpcRequest) throws Exception {
String key = ServiceUtils.buildServiceKey(simpleRpcRequest.getClassName(), simpleRpcRequest.getServiceVersion());
if (handlerMap == null || handlerMap.get(key) == null) {
log.error("doHandle,the provider {0} not exist,", simpleRpcRequest.getClassName(), simpleRpcRequest.getServiceVersion());
throw new RuntimeException("the provider not exist");
}
log.info("doHandle,simpleRpcRequest=" + simpleRpcRequest.toString());
Object provider = handlerMap.get(key);
//通过动态代理执行实际的调用
FastClass fastClass = FastClass.create(provider.getClass());
return fastClass.invoke(fastClass.getIndex(simpleRpcRequest.getMethodName(), simpleRpcRequest.getParamTypes()),
provider, simpleRpcRequest.getParamValues());
}
}
前面我提到过,我实现的是一个框架,需要很方便被集成和使用,因此会实现为一个springboot的starter:
import com.summer.simplerpc.model.RpcCommonProperty;
import com.summer.simplerpc.registry.ServiceRegistry;
import com.summer.simplerpc.registry.cache.ServiceProviderCache;
import com.summer.simplerpc.registry.cache.ServiceProviderLocalCache;
import com.summer.simplerpc.registry.zk.ZkServiceRegistry;
import lombok.extern.slf4j.Slf4j;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
@Configuration
@Slf4j
public class SimplerRpcProviderAutoConfiguration {
@Bean
public SimpleRpcProviderBean initRpcProvider() throws Exception {
RpcCommonProperty rpcCommonProperty = new RpcCommonProperty();
rpcCommonProperty.setServiceAddress("127.0.0.1:50001");
rpcCommonProperty.setRegistryAddress("127.0.0.1:2181");
log.info("===================SimplerRpcProviderAutoConfiguration init,rpcCommonProperty=" + rpcCommonProperty.toString());
ServiceProviderCache serviceProviderCache = new ServiceProviderLocalCache();
ServiceRegistry zkServiceRegistry = new ZkServiceRegistry(rpcCommonProperty.getRegistryAddress(), serviceProviderCache);
return new SimpleRpcProviderBean(rpcCommonProperty.getServiceAddress(), zkServiceRegistry);
}
}
IO
IO主要是序列化和反序列化,常见的序列化工具有很多,这里采用Hessian,对于不同序列化工具的详细比对这里不做赘述,后续单独开章节讲述。
服务端和消费端分别会实现编码器和解码器,加入到netty的ChannelPipeline中,具体见服务端和消费端讲解。
服务消费方
使用此框架进行服务消费,同样是通过注解,将注解打在一个bean上,那么则完成了对一个服务的引用。可以像直接使用本地bean一样发起RPC调用。其他操作都由RPC框架来实现:
- 扫描所有带SimpleRpcConsumer注解的bean
- 重定义BeanDefinition,使用代理类重新注入spring容器
- 发起RPC服务调用,从本地缓存或注册中心拿到远端服务详情,发起网络调用
- 获取服务返回结果
SimpleRpcConsumer注解
import org.springframework.stereotype.Component;
import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;
/**
* RPC consumer注解
*/
@Retention(RetentionPolicy.RUNTIME)
//注解打在属性上
@Target(ElementType.FIELD)
@Component
public @interface SimpleRpcConsumer {
/**
* 服务版本号
* @return
*/
String serviceVersion() default "1.0.0";
/**
* 注册中心类型-默认zk
* @return
*/
String registerType() default "zookeeper";
/**
* 注册中心地址
* @return
*/
String registerAddress() default "127.0.0.1:2181";
}
生成代理类的FactoryBean:
import com.summer.simplerpc.registry.ServiceRegistry;
import com.summer.simplerpc.registry.cache.ServiceProviderCache;
import com.summer.simplerpc.registry.cache.ServiceProviderLocalCache;
import com.summer.simplerpc.registry.zk.ZkServiceRegistry;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.FactoryBean;
import java.lang.reflect.Proxy;
/**
* 生成rpc consumer代理bean的FactoryBean
*/
@Slf4j
public class SimpleRpcConsumerFactoryBean implements FactoryBean {
/**
* 调用的服务接口类
*/
private Class<?> interfaceClass;
/**
* 服务版本号
*/
private String serviceVersion;
/**
* 注册中心类型
*/
private String registryType;
/**
* 注册中心地址
*/
private String registryAddress;
/**
* 实际的bean
*/
private Object object;
/**
* init方法,通过动态代理生成bean
*
* @throws Exception
*/
public void init() throws Exception {
ServiceProviderCache serviceProviderCache = new ServiceProviderLocalCache();
ServiceRegistry zkServiceRegistry = new ZkServiceRegistry(registryAddress, serviceProviderCache);
//动态代理
this.object = Proxy.newProxyInstance(
interfaceClass.getClassLoader(),
new Class<?>[] {interfaceClass},
new SimpleRpcInvokeHandler<>(this.serviceVersion, zkServiceRegistry));
log.info("SimpleRpcConsumerFactoryBean getObject {}", interfaceClass.getName());
}
/**
* 返回创建的bean实例
*
* @return
* @throws Exception
*/
@Override
public Object getObject() throws Exception {
return this.object;
}
/**
* 创建的bean实例的类型
*
* @return
*/
@Override
public Class<?> getObjectType() {
return interfaceClass;
}
/**
* 创建的bean实例的作用域
*
* @return
*/
@Override
public boolean isSingleton() {
return true;
}
public void setInterfaceClass(Class<?> interfaceClass) {
this.interfaceClass = interfaceClass;
}
public void setServiceVersion(String serviceVersion) {
this.serviceVersion = serviceVersion;
}
public void setRegistryType(String registryType) {
this.registryType = registryType;
}
public void setRegistryAddress(String registryAddress) {
this.registryAddress = registryAddress;
}
}
SimpleRpcInvokeHandler-执行实际网络调用的Handler:
import com.summer.simplerpc.model.SimpleRpcRequest;
import com.summer.simplerpc.model.SimpleRpcResponse;
import com.summer.simplerpc.registry.ServiceRegistry;
import lombok.extern.slf4j.Slf4j;
import java.lang.reflect.InvocationHandler;
import java.lang.reflect.Method;
import java.util.UUID;
/**
* RPC调用动态代理handler实现
*/
@Slf4j
public class SimpleRpcInvokeHandler<T> implements InvocationHandler {
/**
* 服务版本号
*/
private String serviceVersion;
/**
* 注册中心
*/
private ServiceRegistry serviceRegistry;
/**
* 默认构造函数
*/
public SimpleRpcInvokeHandler() {
}
public SimpleRpcInvokeHandler(String serviceVersion, ServiceRegistry serviceRegistry) {
this.serviceVersion = serviceVersion;
this.serviceRegistry = serviceRegistry;
}
@Override
public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {
SimpleRpcRequest simpleRpcRequest = new SimpleRpcRequest();
simpleRpcRequest.setBizNO(UUID.randomUUID().toString());
simpleRpcRequest.setClassName(method.getDeclaringClass().getName());
simpleRpcRequest.setServiceVersion(this.serviceVersion);
simpleRpcRequest.setMethodName(method.getName());
simpleRpcRequest.setParamTypes(method.getParameterTypes());
simpleRpcRequest.setParamValues(args);
log.info("begin simpleRpcRequest=" + simpleRpcRequest.toString());
SimpleRpcConsumerNettyHandler simpleRpcConsumerNettyHandler = new SimpleRpcConsumerNettyHandler(this.serviceRegistry);
SimpleRpcResponse simpleRpcResponse = simpleRpcConsumerNettyHandler.sendRpcRequest(simpleRpcRequest);
log.info("result simpleRpcResponse=" + simpleRpcResponse);
return simpleRpcResponse.getData();
}
}
由SimpleRpcConsumerNettyHandler发起netty网络调用,客户端的netty ChannelPipeline比服务端简单:
核心在于SimpleRpcConsumerNettyHandler:
import com.summer.simplerpc.io.RPCDecoder;
import com.summer.simplerpc.io.RPCEncoder;
import com.summer.simplerpc.model.SimpleRpcRequest;
import com.summer.simplerpc.model.SimpleRpcResponse;
import com.summer.simplerpc.registry.ServiceRegistry;
import com.summer.simplerpc.registry.model.ServiceMetaConfig;
import com.summer.simplerpc.util.ServiceUtils;
import io.netty.bootstrap.Bootstrap;
import io.netty.channel.*;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioSocketChannel;
import lombok.extern.slf4j.Slf4j;
/**
* consumer netty handler
*/
@Slf4j
public class SimpleRpcConsumerNettyHandler extends SimpleChannelInboundHandler<SimpleRpcResponse> {
/**
* 注册中心
*/
private ServiceRegistry serviceRegistry;
/**
* netty EventLoopGroup
*/
private EventLoopGroup eventLoopGroup = new NioEventLoopGroup(4);
/**
* netty channel
*/
private Channel channel;
/**
* rpc response
*/
private SimpleRpcResponse rpcResponse;
/**
* lock
*/
private final Object lock = new Object();
/**
* 构造函数
*
* @param serviceRegistry
*/
public SimpleRpcConsumerNettyHandler(ServiceRegistry serviceRegistry) {
this.serviceRegistry = serviceRegistry;
}
/**
* 发起RPC网络调用请求
*
* @param simpleRpcRequest 请求参数
* @return
*/
public SimpleRpcResponse sendRpcRequest(SimpleRpcRequest simpleRpcRequest) {
try {
Bootstrap bootstrap = new Bootstrap();
bootstrap.group(eventLoopGroup)
.channel(NioSocketChannel.class)
.handler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel socketChannel) throws Exception {
socketChannel.pipeline()
.addLast(new RPCEncoder())
.addLast(new RPCDecoder())
//通过.class获取此类型的实例(https://www.cnblogs.com/penglee/p/3993033.html)
.addLast(SimpleRpcConsumerNettyHandler.this);
}
});
String key = ServiceUtils.buildServiceKey(simpleRpcRequest.getClassName(), simpleRpcRequest.getServiceVersion());
ServiceMetaConfig serviceMetaConfig = this.serviceRegistry.discovery(key);
if (serviceMetaConfig == null) {
log.error("sendRpcRequest fail,serviceMetaConfig not found");
throw new Exception("serviceMetaConfig not found in registry");
}
log.info("sendRpcRequest begin,serviceMetaConfig=" + serviceMetaConfig.toString() + ",key=" + key);
final ChannelFuture channelFuture = bootstrap.connect(serviceMetaConfig.getAddress(), serviceMetaConfig.getPort())
.sync();
channelFuture.addListener((ChannelFutureListener)args0 -> {
if (channelFuture.isSuccess()) {
log.info("rpc invoke success,");
} else {
log.info("rpc invoke fail," + channelFuture.cause().getStackTrace());
eventLoopGroup.shutdownGracefully();
}
});
this.channel = channelFuture.channel();
this.channel.writeAndFlush(simpleRpcRequest).sync();
synchronized (this.lock) {
log.info("sendRpcRequest lock.wait");
this.lock.wait();
}
log.info("get rpc response=" + rpcResponse.toString());
return this.rpcResponse;
} catch (Exception e) {
log.error("sendRpcRequest exception,", e);
return null;
} finally {
//关闭相关连接
if (this.channel != null) {
this.channel.close();
}
if (this.eventLoopGroup != null) {
this.eventLoopGroup.shutdownGracefully();
}
}
}
@Override
protected void channelRead0(ChannelHandlerContext channelHandlerContext, SimpleRpcResponse simpleRpcResponse) throws Exception {
this.rpcResponse = simpleRpcResponse;
log.info("rpc consumer netty handler,channelRead0,rpcResponse=" + rpcResponse);
//收到远程网络的rpc response,通知调用端
synchronized (lock) {
log.info("channelRead0 simpleRpcResponse lock.notifyAll");
lock.notifyAll();
}
}
}
starter定义:
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.config.BeanFactoryPostProcessor;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
/**
* rpc consumer starter
*/
@Configuration
@Slf4j
public class SimplerConsumerAutoConfiguration {
@Bean
public static BeanFactoryPostProcessor initRpcConsumer() throws Exception {
return new SimpleRpcConsumerPostProcessor();
}
}
05
RPC框架的集成和使用
上述RPC框架代码,通过springboot打包安装到本地mvn仓库,然后新建一个springboot工程来集成和测试。
mvn依赖:
<dependency>
<groupId>com.summer</groupId>
<artifactId>simplerpc-starter</artifactId>
<version>0.0.1-SNAPSHOT</version>
</dependency>
用来测试的服务很简单,参数是一个String,然后服务端会构造返回值:入参拼接上一个随机UUUID字符串。
服务定义和实现:
/**
* 服务接口定义
*/
public interface HelloworldService {
/**
* 示例方法
* @param param
* @return
*/
String buildHelloworld(String param);
}
服务实现:
import com.summer.simplerpc.annotation.SimpleRpcProvider;
import com.summer.simplerpctest.consumer.HelloworldService;
import lombok.extern.slf4j.Slf4j;
import java.util.UUID;
/**
* HelloworldService接口实现
*/
@SimpleRpcProvider(serviceInterface=HelloworldService.class)
@Slf4j
public class HelloworldServiceImpl implements HelloworldService {
@Override
public String buildHelloworld(String param) {
log.info("HelloworldServiceImpl begin");
return param + "_" + UUID.randomUUID().toString();
}
}
我们定义一个bean,在其中发起对RPC服务的调用:
import com.summer.simplerpc.annotation.SimpleRpcConsumer;
import com.summer.simplerpctest.consumer.HelloworldService;
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Component;
import javax.annotation.Resource;
/**
* 发起对HelloWorldService调用示例
*/
@Slf4j
@Component
public class ConsumerSample {
@SimpleRpcConsumer
@Resource
private HelloworldService helloworldService;
public String invokeHelloworldService() {
String result = helloworldService.buildHelloworld("qwert");
return result;
}
}
然后我们开一个Controller,启动springboot工程,这样我们在浏览器中直接发起测试即可:
import org.springframework.web.bind.annotation.*;
import javax.annotation.Resource;
/**
* 测试controller
*
* url:http://127.0.0.1:8004/helloworld/do
*/
@RestController
@RequestMapping("/helloworld")
public class TestController {
@Resource
private ConsumerSample consumerSample;
@GetMapping(value = "/do")
public String say(){
String helloServiceRes = consumerSample.invokeHelloworldService();
return helloServiceRes;
}
}
只需要在浏览器中输入如下url,则可以发起对rpc服务的调用:
http://127.0.0.1:8004/helloworld/do
注意一些前置工作:
- 需要启动zk。
IDEA控制台日志打印:
2022-01-25 09:27:22.581 INFO 30366 --- [nio-8004-exec-1] c.s.s.consumer.SimpleRpcInvokeHandler : begin simpleRpcRequest=SimpleRpcRequest(bizNO=46154373-2cf7-4731-b4c0-208d6ca28b87, className=com.summer.simplerpctest.consumer.HelloworldService, methodName=buildHelloworld, serviceVersion=1.0.0, paramTypes=[class java.lang.String], paramValues=[qwert])
2022-01-25 09:27:22.698 INFO 30366 --- [nio-8004-exec-1] c.s.s.c.SimpleRpcConsumerNettyHandler : sendRpcRequest begin,serviceMetaConfig=ServiceMetaConfig(name=com.summer.simplerpctest.consumer.HelloworldService, version=1.0.0, address=127.0.0.1, port=50001),key=com.summer.simplerpctest.consumer.HelloworldService:1.0.0
2022-01-25 09:27:22.715 INFO 30366 --- [ntLoopGroup-4-1] c.s.s.c.SimpleRpcConsumerNettyHandler : rpc invoke success,
2022-01-25 09:27:22.759 INFO 30366 --- [nio-8004-exec-1] c.s.s.c.SimpleRpcConsumerNettyHandler : sendRpcRequest lock.wait
2022-01-25 09:27:22.771 INFO 30366 --- [provider-pool-0] c.s.s.p.SimpleRpcProviderNettyHandler : doHandle,simpleRpcRequest=SimpleRpcRequest(bizNO=46154373-2cf7-4731-b4c0-208d6ca28b87, className=com.summer.simplerpctest.consumer.HelloworldService, methodName=buildHelloworld, serviceVersion=1.0.0, paramTypes=[class java.lang.String], paramValues=[qwert])
2022-01-25 09:27:22.772 INFO 30366 --- [provider-pool-0] c.s.s.provider.HelloworldServiceImpl : HelloworldServiceImpl begin
2022-01-25 09:27:22.774 INFO 30366 --- [ntLoopGroup-3-1] c.s.s.p.SimpleRpcProviderNettyHandler : return response for request 46154373-2cf7-4731-b4c0-208d6ca28b87,simpleRpcResponse=SimpleRpcResponse(bizNO=46154373-2cf7-4731-b4c0-208d6ca28b87, msg=null, data=qwert_3ca010cc-cb14-49d0-b9d4-23168e7786e4)
2022-01-25 09:27:22.774 INFO 30366 --- [ntLoopGroup-4-1] c.s.s.c.SimpleRpcConsumerNettyHandler : rpc consumer netty handler,channelRead0,rpcResponse=SimpleRpcResponse(bizNO=46154373-2cf7-4731-b4c0-208d6ca28b87, msg=null, data=qwert_3ca010cc-cb14-49d0-b9d4-23168e7786e4)
2022-01-25 09:27:22.775 INFO 30366 --- [ntLoopGroup-4-1] c.s.s.c.SimpleRpcConsumerNettyHandler : channelRead0 simpleRpcResponse lock.notifyAll
2022-01-25 09:27:22.775 INFO 30366 --- [nio-8004-exec-1] c.s.s.c.SimpleRpcConsumerNettyHandler : get rpc response=SimpleRpcResponse(bizNO=46154373-2cf7-4731-b4c0-208d6ca28b87, msg=null, data=qwert_3ca010cc-cb14-49d0-b9d4-23168e7786e4)
2022-01-25 09:27:22.776 INFO 30366 --- [nio-8004-exec-1] c.s.s.consumer.SimpleRpcInvokeHandler : result simpleRpcResponse=SimpleRpcResponse(bizNO=46154373-2cf7-4731-b4c0-208d6ca28b87, msg=null, data=qwert_3ca010cc-cb14-49d0-b9d4-23168e7786e4)
这篇文章花了我挺长时间写出来的,看看这些详细的代码实现,你们应该能感受到我想把你教会的诚意了吧~~~
如果觉得有用的话,点赞+转发+收藏,一键三连防止走丢哇~
相关推荐
- 俄罗斯的 HTTPS 也要被废了?(俄罗斯网站关闭)
-
发布该推文的ScottHelme是一名黑客,SecurityHeaders和ReportUri的创始人、Pluralsight作者、BBC常驻黑客。他表示,CAs现在似乎正在停止为俄罗斯域名颁发...
- 如何强制所有流量使用 HTTPS一网上用户
-
如何强制所有流量使用HTTPS一网上用户使用.htaccess强制流量到https的最常见方法可能是使用.htaccess重定向请求。.htaccess是一个简单的文本文件,简称为“.h...
- https和http的区别(https和http有何区别)
-
“HTTPS和HTTP都是数据传输的应用层协议,区别在于HTTPS比HTTP安全”。区别在哪里,我们接着往下看:...
- 快码住!带你十分钟搞懂HTTP与HTTPS协议及请求的区别
-
什么是协议?网络协议是计算机之间为了实现网络通信从而达成的一种“约定”或“规则”,正是因为这个“规则”的存在,不同厂商的生产设备、及不同操作系统组成的计算机之间,才可以实现通信。简单来说,计算机与网络...
- 简述HTTPS工作原理(简述https原理,以及与http的区别)
-
https是在http协议的基础上加了一层SSL(由网景公司开发),加密由ssl实现,它的目的是为用户提供对网站服务器的身份认证(需要CA),以至于保护交换数据的隐私和完整性,原理如图示。1、客户端发...
- 21、HTTPS 有几次握手和挥手?HTTPS 的原理什么是(高薪 常问)
-
HTTPS是3次握手和4次挥手,和HTTP是一样的。HTTPS的原理...
- 一次安全可靠的通信——HTTPS原理
-
为什么HTTPS协议就比HTTP安全呢?一次安全可靠的通信应该包含什么东西呢,这篇文章我会尝试讲清楚这些细节。Alice与Bob的通信...
- 为什么有的网站没有使用https(为什么有的网站点不开)
-
有的网站没有使用HTTPS的原因可能涉及多个方面,以下是.com、.top域名的一些见解:服务器性能限制:HTTPS使用公钥加密和私钥解密技术,这要求服务器具备足够的计算能力来处理加解密操作。如果服务...
- HTTPS是什么?加密原理和证书。SSL/TLS握手过程
-
秘钥的产生过程非对称加密...
- 图解HTTPS「转」(图解http 完整版 彩色版 pdf)
-
我们都知道HTTPS能够加密信息,以免敏感信息被第三方获取。所以很多银行网站或电子邮箱等等安全级别较高的服务都会采用HTTPS协议。...
- HTTP 和 HTTPS 有何不同?一文带你全面了解
-
随着互联网时代的高速发展,Web服务器和客户端之间的安全通信需求也越来越高。HTTP和HTTPS是两种广泛使用的Web通信协议。本文将介绍HTTP和HTTPS的区别,并探讨为什么HTTPS已成为We...
- HTTP与HTTPS的区别,详细介绍(http与https有什么区别)
-
HTTP与HTTPS介绍超文本传输协议HTTP协议被用于在Web浏览器和网站服务器之间传递信息,HTTP协议以明文方式发送内容,不提供任何方式的数据加密,如果攻击者截取了Web浏览器和网站服务器之间的...
- 一文让你轻松掌握 HTTPS(https详解)
-
一文让你轻松掌握HTTPS原文作者:UC国际研发泽原写在最前:欢迎你来到“UC国际技术”公众号,我们将为大家提供与客户端、服务端、算法、测试、数据、前端等相关的高质量技术文章,不限于原创与翻译。...
- 如何在Spring Boot应用程序上启用HTTPS?
-
HTTPS是HTTP的安全版本,旨在提供传输层安全性(TLS)[安全套接字层(SSL)的后继产品],这是地址栏中的挂锁图标,用于在Web服务器和浏览器之间建立加密连接。HTTPS加密每个数据包以安全方...
- 一文彻底搞明白Http以及Https(http0)
-
早期以信息发布为主的Web1.0时代,HTTP已可以满足绝大部分需要。证书费用、服务器的计算资源都比较昂贵,作为HTTP安全扩展的HTTPS,通常只应用在登录、交易等少数环境中。但随着越来越多的重要...
你 发表评论:
欢迎- 一周热门
-
-
Linux:Ubuntu22.04上安装python3.11,简单易上手
-
宝马阿布达比分公司推出独特M4升级套件,整套升级约在20万
-
MATLAB中图片保存的五种方法(一)(matlab中保存图片命令)
-
别再傻傻搞不清楚Workstation Player和Workstation Pro的区别了
-
Linux上使用tinyproxy快速搭建HTTP/HTTPS代理器
-
如何提取、修改、强刷A卡bios a卡刷bios工具
-
Element Plus 的 Dialog 组件实现点击遮罩层不关闭对话框
-
日本组合“岚”将于2020年12月31日停止团体活动
-
SpringCloud OpenFeign 使用 okhttp 发送 HTTP 请求与 HTTP/2 探索
-
tinymce 号称富文本编辑器世界第一,大家同意么?
-
- 最近发表
- 标签列表
-
- dialog.js (57)
- importnew (44)
- windows93网页版 (44)
- yii2框架的优缺点 (45)
- tinyeditor (45)
- qt5.5 (60)
- windowsserver2016镜像下载 (52)
- okhttputils (51)
- android-gif-drawable (53)
- 时间轴插件 (56)
- docker systemd (65)
- slider.js (47)
- android webview缓存 (46)
- pagination.js (59)
- loadjs (62)
- openssl1.0.2 (48)
- velocity模板引擎 (48)
- pcre library (47)
- zabbix微信报警脚本 (63)
- jnetpcap (49)
- pdfrenderer (43)
- fastutil (48)
- uinavigationcontroller (53)
- bitbucket.org (44)
- python websocket-client (47)