在上一课时,我们讲解了 RPC 的相关概念和常见的 RPC 框架。其中, Go RPC 是指 Go 语言原生支持的 RPC 框架,它虽然简单但却十分经典,非常适合作为你后续深入了解 RPC 框架时的研究对象。
在本课时,我们将先通过一个字符串服务为案例简单讲解 Go RPC 是如何进行通信的,然后再具体剖析 Go RPC 的底层原理和实现,对以服务端注册服务、接收并处理客户端请求和客户端发起请求等步骤分别进行详细介绍,相信你学习后,一定会对 Go RPC 有更加全面的了解和认识。
Go 语言 RPC 过程调用实践
Go 语言原生的 RPC 过程调用实现起来非常简单。服务端只需实现对外提供的远程过程方法和结构体,然后将其注册到 RPC 服务中,客户端就可以通过其服务名称和方法名称进行 RPC 方法调用。
本课时我们就使用字符串操作的服务来展示如何使用 Go 语言原生的 RPC 来进行过程调用。
第一步,定义远程过程调用相关接口传入参数和返回参数的数据结构。如下代码所示,调用字符串操作的请求包括两个参数:字符串 A 和字符串 B。
1
2
3
4
|
type StringRequest struct {
A string
B string
}
|
第二步,定义一个服务对象。这个服务对象可以很简单,比如类型是 int 或者是 interface{},重要的是它输出的方法。这里我们定义一个字符串服务类型的 interface,其名称为 Service,它有一个字符串拼接函数 Concat;然后定义一个名为 StringService 的结构体,实现 Service 接口,并给出 Concat 具体实现。代码如下:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
|
type Service interface {
// Concat a and b
Concat(req StringRequest, ret *string) error
}
type StringService struct {
}
func (s StringService) Concat(req StringRequest, ret *string) error {
// test for length overflow
if len(req.A)+len(req.B) > StrMaxSize {
*ret = ""
return ErrMaxSize
}
*ret = req.A + req.B
return nil
}
|
第三步,实现 RPC 服务器。这里我们生成了一个 StringSevice 结构体,并使用 rpc.Register 注册这个服务,然后通过 net.Listen 监听对应 socket 并对外提供服务。客户端可以访问服务 StringService 以及它的方法 Concat,代码如下:
1
2
3
4
5
6
7
8
9
10
|
func main() {
stringService := new(service.StringService)
rpc.Register(stringService)
rpc.HandleHTTP()
l, e := net.Listen("tcp", "127.0.0.1:1234")
if e != nil {
log.Fatal("listen error:", e)
}
http.Serve(l, nil)
}
|
第四步,建立 HTTP 客户端,然后通过 Call 方法调用远程 StringService 的对应方法,比如使用同步的方式,代码如下所示。这时客户端就可以进行远程调用了。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
|
func main() {
client, err := rpc.DialHTTP("tcp", "127.0.0.1:1234")
if err != nil {
log.Fatal("dialing:", err)
}
stringReq := &service.StringRequest{"A", "B"}
var reply string
err = client.Call("StringService.Concat", stringReq, &reply)
if err != nil {
log.Fatal("Concat error:", err)
}
// 异步的调用方式
call := client.Call("StringService.Concat", stringReq, &reply)
_ := <-call.Done
}
|
通过上述代码的编写就可以实现两个 Go 服务之间的 RPC 调用了。那 Go RPC 又是如何实现的呢?
Go RPC 原理解析
接下来我们将对 Go 语言的 RPC 原生实现进行源码分析,细致讲解其具体实现和原理。首先我们会对 RPC 的服务(Server)端代码进行分析,包括注册服务、反射处理和存根保存,然后讲解服务端处理 RPC 请求的流程,最后讲解客户(Client)端的 RPC 请求处理。
1. Go RPC 服务端原理
服务端的 RPC 代码主要分为两个部分:①服务方法注册,包括调用注册接口,通过反射处理将方法取出,并存到 map 中;②处理网络调用,主要是监听端口、读取数据包、解码请求和调用反射处理后的方法,将返回值编码,返回给客户端。
在上面的示例代码中,我们使用 rpc.Register 对 StringService 进行了注册,Register 是进行 RPC 服务注册的入口方法,其参数 interface{} 类型的 rcvr 就是要注册的 RPC 服务类型,该注册过程的流程如下图所示。
服务端注册 RPC 服务示意图
Register 方法中通过反射获取接口类型和值,并通过 suitableMethods 函数判断注册的 RPC 是否符合规范,最后调用 serviceMap 的 LoadOrStore(sname, s) 方法将对应 RPC 存根存放于 map 中,供之后查找。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
|
func (server *Server) register(rcvr interface{}, name string, useName bool) error {
// 如果服务为空,默认注册一个
if server.serviceMap == nil {
server.serviceMap = make(map[string]*service)
}
// 获取注册服务的反射信息
s := new(service)
s.typ = reflect.TypeOf(rcvr)
s.rcvr = reflect.ValueOf(rcvr)
// 可以使用自定义名称
sname := reflect.Indirect(s.rcvr).Type().Name()
if useName {
sname = name
}
// 方法必须是暴露的,既服务名首字符大写;不允许重复注册。代码有省略
if !isExported(sname) && !useName {
}
if _, present := server.serviceMap[sname]; present {
}
s.name = sname
// 开始注册 rpc struct 内部的方法存根
s.method = suitableMethods(s.typ, true)
if len(s.method) == 0 {
// 如果struct内部一个方法也没,那么直接报错,打印详细的错误信息
}
// 保存在server的serviceMap中
server.serviceMap[s.name] = s
return nil
}
|
接下来,我们来看一下服务端处理 RPC 请求的实现。如下图就展示了服务端 RPC 程序处理请求的过程,它会一直循环处理接收到的客户端 RPC 请求,将其交由 ReadRequestHandler 处理,然后从之前 Register 方法保存的 map 中获取到要调用的对应方法;接着从请求中解码出对应的参数,使用反射调用其方法,获取到结果后将结果编码成响应消息返回给客户端。
RPC 服务端处理流程示意图
下面,我们来看一下服务端接收并处理 RPC 请求的具体代码实现。
(1)接收请求
Server 的 Accept 函数会无限循环地调用 net.Listener 的 Accept 函数来获取客户端建立连接的请求,获取到连接请求后,再使用协程来处理请求。代码如下:
1
2
3
4
5
6
7
8
9
10
|
func (server *Server) Accept(lis net.Listener) {
for {
conn, err := lis.Accept()
if err != nil {
log.Fatal("rpc.Serve: accept:", err.Error())
}
// accept连接以后,打开一个goroutine处理请求
go server.ServeConn(conn)
}
}
|
(2)读取并解析请求
ServeConn 函数会从建立的连接中读取数据,然后创建一个 gobServerCodec,并将其交由 Server 的 ServeCodec 函数处理,如下所示:
1
2
3
4
5
6
7
8
9
10
11
|
func (server *Server) ServeConn(conn io.ReadWriteCloser) {
buf := bufio.NewWriter(conn)
srv := &gobServerCodec{
rwc: conn,
dec: gob.NewDecoder(conn),
enc: gob.NewEncoder(buf),
encBuf: buf,
}
// 根据指定的codec进行协议解析
server.ServeCodec(srv)
}
|
ServeCodec 函数会循环地调用 readRequest 函数,读取网络连接上的字节流,解析出请求,然后开启协程执行 Server 的 call 函数,处理对应的 RPC 调用。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
|
func (server *Server) ServeCodec(codec ServerCodec) {
sending := new(sync.Mutex)
for {
// 解析请求
service, mtype, req, argv, replyv, keepReading, err := server.readRequest (codec)
if err != nil {
if debugLog && err != io.EOF {
log.Println("rpc:", err)
}
if !keepReading {
break
}
// send a response if we actually managed to read a header.
// 如果当前请求错误了,我们应该返回信息,然后继续处理
if req != nil {
server.sendResponse(sending, req, invalidRequest, codec, err.Error())
server.freeRequest(req)
}
continue
}
// 因为需要继续处理后续请求,所以开一个gorutine处理rpc方法
go service.call(server, sending, mtype, req, argv, replyv, codec)
}
// 如果连接关闭了需要释放资源
codec.Close()
}
|
(3)执行远程方法并返回响应
Server 的 call 函数就是通过 Func.Call 反射调用对应 RPC 过程的方法,它还会调用 send Response 将返回值发送给 RPC 客户端,代码如下:
1
2
3
4
5
6
7
8
9
10
|
func (s *service) call(server *Server, sending *sync.Mutex, mtype *methodType, req *Request, argv, replyv reflect.Value, codec ServerCodec) {
function := mtype.method.Func
// 这里是真正调用rpc方法的地方
returnValues := function.Call([]reflect.Value{s.rcvr, argv, replyv})
errInter := returnValues[0].Interface()
errmsg := ""
// 处理返回请求了
server.sendResponse(sending, req, replyv.Interface(), codec, errmsg)
server.freeRequest(req)
}
|
2. 客户端发送 RPC 请求原理
无论是同步调用还是异步调用,每次 RPC 请求都会生成一个 Call 对象,并使用 seq 作为 key 保存在 map 中,服务端返回响应值时再根据响应值中的 seq 从 map 中取出 Call,进行相应处理。客户端发起 RPC 调用的过程大致如下图所示。
客户端发送和接收请求流程示意图
下面我们将依次讲解同步调用和异步调用、请求参数编码和接收服务器响应三个部分的具体实现。
(1)同步调用和异步调用
本课时的案例展示了 Go 原生 RPC 的客户端支持同步和异步两种调用,下面我们来介绍一下这两种调用的函数以及调用的数据结构。
1
2
3
4
|
func (client *Client) Call(serviceMethod string, args interface{}, reply interface{}) error {
call := <-client.Go(serviceMethod, args, reply, make(chan *Call, 1)).Done
return call.Error
}
|
Call 方法直接调用了 Go 方法,而 Go 方法则是先创建并初始化了 Call 对象,记录下此次调用的方法、参数和返回值,并生成 DoneChannel;然后调用 Client 的 send 方法进行真正的请求发送处理,代码如下:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
|
// 异步调用实现
func (client *Client) Go(serviceMethod string, args interface{}, reply interface{}, done chan *Call) *Call {
// 初始化 Call
call := new(Call)
call.ServiceMethod = serviceMethod
call.Args = args
call.Reply = reply
if done == nil {
done = make(chan *Call, 10) // buffered.
} else {
if cap(done) == 0 {
log.Panic("rpc: done channel is unbuffered")
}
}
call.Done = done
// 调用 Client 的 send 方法
client.send(call)
return call
}
type Call struct {
ServiceMethod string // 服务名及方法名 格式:服务.方法
Args interface{} // 函数的请求参数 (*struct).
Reply interface{} // 函数的响应参数 (*struct).
Error error // 方法完成后 error的状态.
Done chan *Call //
}
|
(2)请求参数编码
Client 的 send 函数首先会判断客户端实例的状态,如果处于关闭状态,则直接返回结果;否则会生成唯一的 seq 值,将 Call 保存到客户端的哈希表 pending 中,然后调用客户端编码器的WriteRequest 来编码请求并发送,代码如下:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
|
func (client *Client) send(call *Call) {
// ....
//生成seq,每次调用均生成唯一的seq,在服务端返回结果后会通过该值进行匹配
seq := client.seq
client.seq++
client.pending[seq] = call
client.mutex.Unlock()
// 请求并发送请求
client.request.Seq = seq
client.request.ServiceMethod = call.ServiceMethod
err := client.codec.WriteRequest(&client.request, call.Args)
if err != nil {
//发送请求错误时,将map中call对象删除.
client.mutex.Lock()
call = client.pending[seq]
delete(client.pending, seq)
client.mutex.Unlock()
}
}
|
(3)接收返回值
接下来我们来看一下客户端是如何接收并处理服务端返回值的。客户端的 input 函数接收服务端返回的响应值,它进行无限 for 循环,不断调用 codec 也就是 gobClientCodecd 的 ReadResponseHeader 函数,然后根据其返回数据中的 seq 来判断是否是本客户端发出请求的响应值。如果是,则获取对应的 Call 对象,并将其从 pending 哈希表中删除,继续调用 codec 的 ReadReponseBody 方法获取返回值 Reply 对象,并调用 Call 对象的 done 方法,代码如下:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
|
func (client *Client) input() {
var err error
var response Response
for err == nil {
response = Response{}
//通过response中的 Seq获取call对象
seq := response.Seq
client.mutex.Lock()
call := client.pending[seq]
delete(client.pending, seq)
client.mutex.Unlock()
switch {
case call == nil:
case response.Error != "":
//上述两个case,一个处理call为nil,另外处理服务端返回的错误,直接将错误返回
default:
//通过编码器,将Resonse的body部分解码成reply对象.
err = client.codec.ReadResponseBody(call.Reply)
if err != nil {
call.Error = errors.New("reading body " + err.Error())
}
call.done()
}
}
}
|
上述代码中,gobClientCodecd 的 ReadResponseHeader、ReadReponseBody 方法和上文中的 WriteRequest 类似,这里不做赘述。Call 对象的 done 方法则通过 Call 的 DoneChannel,将获得返回值的结果通知到调用层,代码如下:
1
2
3
4
5
6
7
8
9
10
|
func (call *Call) done() {
select {
case call.Done <- call:
// ok
default:
if debugLog {
log.Println("rpc: discarding Call reply due to insufficient Done chan capacity")
}
}
}
|
客户端接收到 RPC 请求的响应后会进行其他业务逻辑操作,RPC 框架则会对进行 RPC 请求所需要的资源进行回收,下次进行 RPC 请求时则需要再次建立相应的结构体并获取对应的资源。
小结
Go 语言原生 RPC 算是个基础版本的 RPC 框架,代码精简,可扩展性高,但是只实现了 RPC 最基本的网络通信,而超时熔断、链接管理(保活与重连)、服务注册发现等功能还是欠缺的。因此还是达不到生产环境“开箱即用”的水准,不过 GitHub 就有一个基于 RPC 的功能增强版本——rpcx,支持了大部分主流 RPC 的特性。
虽然目前官方已经宣布不再添加新功能,并推荐使用 gRPC,但是作为 Go 标准库中的 RPC 框架,还是有很多地方值得我们借鉴和学习,比如注册服务时如何保存反射信息等。本课时我们从源码角度分析了 Go 语言原生 RPC 框架,希望能给你带来对 RPC 框架的整体认知。
如果 Go 官方团队准备为 Go RPC 添加新功能,那么你最希望添加哪一项目前尚未支持的功能呢?欢迎你在留言区分享你的想法。
-– ### 精选评论 ##### **个咪的汪: > go官方的rpc文档地址:https://golang.org/pkg/net/rpc/, 异步方式应该是// 异步的调用方式reply, nil)-call.Done