RPC的原理及其简单实现

RPC的原理及其简单实现

什么是RPC

RPC,全称为Remote Procedure Call,即远程过程调用。它的目的是使调用远程服务像调用本地服务一样。

RPC可基于HTTP或TCP协议,Web Service就是基于HTTP协议的RPC,它具有良好的跨平台性,但其性能却不如基于 TCP 协议的 RPC。会两方面会直接影响 RPC 的性能,一是传输方式,二是序列化。

RPC简易架构

RPC可简化为如下架构:

rpc简易架构

调用步骤为:

  1. 客户端函数调用客户端句柄,执行传送参数
  2. 调用本地系统内核发送网络消息
  3. 消息通过tcp或者自定义协议或者其他协议传送到服务端
  4. 服务端句柄获取消息及参数
  5. 执行远程过程
  6. 将远程过程的结果返回给服务器句柄
  7. 服务器句柄返回结果,调用远程系统内核发送网络消息
  8. 本地主机获取返回消息
  9. 客户端句柄通过内核接受消息
  10. 客户端接收句柄返回的数据

RPC可按照角色分为3类:

  • 服务提供者:由框架提供,运行在服务端,发布服务、接收客户端消息、返回消息
  • 服务调用者:由框架提供,运行在客户端,生成代理对象、调用远程服务
  • 服务具体实现:由使用框架者提供,运行在服务器,服务接口、接口实现类

RPC简易实现

Server端

Server

在服务端利用Socket监听指定端口,并启用线程池来执行客户端的请求。

注册服务service.registerService(StudentService.class, StudentServiceImpl.class);

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
public class Server {
private ServerSocket serverSocket;
private int servPort;

public Server(int port) throws IOException {
this.servPort = port;
serverSocket = new ServerSocket(this.servPort);

}
public void start(){
ThreadPoolExecutor threadPool =new ThreadPoolExecutor(5, 10,
200, TimeUnit.SECONDS, new ArrayBlockingQueue<Runnable>(10));
while (true) {
try {
Socket sock = serverSocket.accept();
ServerService service = new ServerService(sock);
service.registerService(StudentService.class, StudentServiceImpl.class);
threadPool.execute(service);
} catch (IOException e){
System.out.println(e.getMessage());
}
}
}
}

ServerService

ServerService实现了Runnable接口,便于提交给线程池执行

而后从Socket读取数据,转为Request对象

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
public class ServerService implements Runnable {
private Socket sockClient;
private Map<String, Class<?>> serviceRegistry = new HashMap<String, Class<?>>();
private Response response = new Response();

public ServerService(Socket sock) {
super();
this.sockClient = sock;
}

public void run() {
try {
OutputStream out = sockClient.getOutputStream();
// 建立好连接后,从socket中获取输入流,并建立缓冲区进行读取
InputStream in = sockClient.getInputStream();

ObjectInputStream objIn = new ObjectInputStream(in);
ObjectOutputStream objOut = new ObjectOutputStream(out);

// 2. 获取请求数据,强转参数类型
Object param = objIn.readObject();
Request request = null;
if (!(param instanceof Request)) {
response.setMessage("参数错误");
objOut.writeObject(response);
objOut.flush();
return;
} else {
request = (Request) param;
}

// 3. 查找并执行服务方法
System.out.println("要执行的类型为:" + request.getClassName());
Class<?> service = serviceRegistry.get(request.getClassName());
if (service != null) {
Method method = service.getMethod(request.getMethodName(), request.getParamTypes());
Object result = method.invoke(service.newInstance(), request.getParams());
// 4. 得到结果并返回
response.setObj(result);
}
objOut.writeObject(response);
objOut.flush();
out.close();
in.close();
sockClient.close();
} catch (Exception e) {
System.out.println(e.getMessage());
}
}

public void registerService(Class<?> iface, Class<?> Imp) {
this.serviceRegistry.put(iface.getName(), Imp);
}
}

StudentService

1
2
3
4
5
public interface StudentService {
public Student getInfo();

public void print(Student stu);
}

StudentServiceImpl

StudentServiceImpl是StudentService的实现类

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
public class StudentServiceImpl implements StudentService {
@Override
public Student getInfo() {
Student stu = new Student();
stu.setID("123456");
stu.setAge(10);
stu.setName("xxxx");
return stu;
}

@Override
public void print(Student stu) {

}
}

Client端

SocketClient

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
public class SocketClient {
private Socket sock;

public SocketClient() {
}

public Object invoke(Request request, String Ip, int port) throws IOException {
sock = new Socket(Ip, port);
InputStream in = sock.getInputStream();
OutputStream out = sock.getOutputStream();
ObjectOutputStream objOut;
ObjectInputStream objIn;
Response response = null;
try {
objOut = new ObjectOutputStream(out);
objOut.writeObject(request);
objOut.flush();
objIn = new ObjectInputStream(in);
Object res = objIn.readObject();
if (res instanceof Response) {
response = (Response) res;
} else {
throw new RuntimeException("返回不正确!!!");
}
} catch (Exception e) {
System.out.println("error: " + e.getMessage());
} finally {
out.close();
in.close();
sock.close();
return response.getObj();
}
}
}

SocketClientProxy

SocketClientProxy创建代理对象,拦截代理对象执行方法

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
public class SocketClientProxy {
private SocketClient sock = new SocketClient();

public <T> T getProxy(Class<T> clazz) {
return (T) Proxy.newProxyInstance(clazz.getClassLoader(), new Class<?>[] { clazz }, new InvocationHandler() {
@Override
public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {
Request request = new Request();
request.setClassName(method.getDeclaringClass().getName());
request.setMethodName(method.getName());
request.setParamTypes(method.getParameterTypes());
request.setParams(args);
return sock.invoke(request, "127.0.0.1", 12000);
}
});
}
}

公共

Request

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
public class Request implements Serializable {
private static final long serialVersionUID = 1L;

private String className;

private String methodName;

private Object[] params;

private Class<?>[] paramTypes;

public String getClassName() {
return className;
}

public void setClassName(String className) {
this.className = className;
}

public String getMethodName() {
return methodName;
}

public void setMethodName(String methodName) {
this.methodName = methodName;
}

public Object[] getParams() {
return params;
}

public void setParams(Object[] params) {
this.params = params;
}

public Class<?>[] getParamTypes() {
return paramTypes;
}

public void setParamTypes(Class<?>[] paramTypes) {
this.paramTypes = paramTypes;
}


}

Response

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
public class Response implements Serializable {
private static final long serialVersionUID = 1L;

private String message;

private Object obj;

public String getMessage() {
return message;
}

public void setMessage(String message) {
this.message = message;
}

public Object getObj() {
return obj;
}

public void setObj(Object obj) {
this.obj = obj;
}
}

Student

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
public class Student implements Serializable {
private static final long serialVersionUID = 3L;
private String ID;

public String getID() {
return ID;
}

public void setID(String ID) {
this.ID = ID;
}

private String name;
private int age;
private String school;

public String getName() {
return name;
}

public void setName(String name) {
this.name = name;
}

public int getAge() {
return age;
}

public void setAge(int age) {
this.age = age;
}

public String getSchool() {
return school;
}

public void setSchool(String school) {
this.school = school;
}
}

测试代码

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
public class Test {
public static void main(String[] arg) {
new Thread(() -> {
try {
Server server = new Server(12000);
server.start();
} catch (IOException e) {
System.out.println(e.getMessage());
}
}).start();

SocketClientProxy proxy = new SocketClientProxy();
StudentService studentService = proxy.getProxy(StudentService.class);
Student student = studentService.getInfo();
System.out.println(student.getName());
}
}

RMI实现简易RPC

定义RMI对外服务接口HelloService

HelloService

1
2
3
public interface HelloService extends Remote {
String sayHello(String someone) throws RemoteException;
}

RMI接口方法定义必须显式声明抛出RemoteException异常

HelloServiceImpl

服务端接口实现HelloServiceImpl

1
2
3
4
5
6
7
8
9
10
11
12
13
public class HelloServiceImpl extends UnicastRemoteObject implements HelloService {

private static final long serialVersionUID = 1L;

protected HelloServiceImpl() throws RemoteException {
super();
}

@Override
public String sayHello(String someone) throws RemoteException {
return "Hello " + someone;
}
}

服务端方法实现必须继承UnicastRemoteObject类,该类定义了服务调用方法与服务提供方对象实例,并建立一对一连接。

ServerMain

服务端RMI服务启动代码

1
2
3
4
5
6
7
8
public class ServerMain {
public static void main(String[] args) throws Exception {
HelloService helloService = new HelloServiceImpl();
LocateRegistry.createRegistry(12000);
Naming.bind("rmi://localhost:12000/helloService", helloService);
System.out.println("Server main provide RPC service now.");
}
}

ClientMain

客户端远程调用RMI服务代码

1
2
3
4
5
6
public class ClientMain {
public static void main(String[] args) throws Exception {
HelloService helloService = (HelloService) Naming.lookup("rmi://localhost:12000/helloService");
System.out.println("RMI 服务器返回的结果是:" + helloService.sayHello("haha"));
}
}
0%