你好,我是温铭。
今天这节课,微服务 API 网关搭建就到了最后的环节了。让我们用一个最小的示例来把之前选型的组件,按照设计的蓝图,拼装运行起来吧!
Nginx 配置和初始化
我们知道,API 网关是用来处理流量入口的,所以我们首先需要在 Nginx.conf 中做简单的配置,让所有的流量都通过网关的 Lua 代码来处理。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
|
server {
listen 9080;
init_worker_by_lua_block {
apisix.http_init_worker()
}
location / {
access_by_lua_block {
apisix.http_access_phase()
}
header_filter_by_lua_block {
apisix.http_header_filter_phase()
}
body_filter_by_lua_block {
apisix.http_body_filter_phase()
}
log_by_lua_block {
apisix.http_log_phase()
}
}
}
|
这里我们使用开源 API 网关 APISIX 为例,所以上面的代码示例中带有 apisix
的关键字。在这个示例中,我们监听了 9080 端口,并通过 location /
的方式,把这个端口的所有请求都拦截下来,并依次通过 access
、rewrite
、header filter
、body filter
和 log
这几个阶段进行处理,在每个阶段中都会去调用对应的插件函数。其中, rewrite
阶段便是在 apisix.http_access_phase
函数中合并处理的。
而对于系统初始化的工作,我们放在了 init_worker
阶段来处理,这其中包含了读取各项配置参数、预制 etcd 中的目录、从 etcd 中获取插件列表、对于插件按照优先级进行排序等。我这里列出了关键部分的代码并进行讲解,当然,你可以在 GitHub 上看到更完整的初始化函数。
1
2
3
4
5
6
7
8
9
10
11
12
|
function _M.http_init_worker()
-- 分别初始化路由、服务和插件这三个最重要的部分
router.init_worker()
require("apisix.http.service").init_worker()
require("apisix.plugin").init_worker()
end
|
通过阅读这段代码,你可以发现,router
和 plugin
这两部分的初始化相对复杂一些,主要涉及到读取配置参数,并根据参数的不同做一些选择。因为这里会涉及到从 etcd 中读取数据,所以我们使用的是 ngx.timer
的方式,来绕过“不能在 init_worker
阶段使用 cosocket”的这个限制。如果你对这部分很感兴趣并且学有余力,建议一定要去读读源码,加深理解。
匹配路由
在最开始的 access
阶段里面,我们首先需要做的就是匹配路由,根据请求中携带 uri、host、args、cookie 等,来和已经设置好的路由规则进行匹配:
1
2
|
router.router_http.match(api_ctx)
|
对外暴露的,其实只有上面一行代码,这里的api_ctx
中存放的就是 uri、host、args、cookie 这些请求的信息。而具体的 match
函数的实现,就用到了我们前面提到过的 lua-resty-radixtree
。如果没有命中,就说明这个请求并没有设置与之对应的上游,就会直接返回 404。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
|
local router = require("resty.radixtree")
local match_opts = {}
function _M.match(api_ctx)
-- 从 ctx 中获取请求的参数,作为路由的判断条件
match_opts.method = api_ctx.var.method
match_opts.host = api_ctx.var.host
match_opts.remote_addr = api_ctx.var.remote_addr
match_opts.vars = api_ctx.var
-- 调用路由的判断函数
local ok = uri_router:dispatch(api_ctx.var.uri, match_opts, api_ctx)
-- 没有命中路由就直接返回 404
if not ok then
core.log.info("not find any matched route")
return core.response.exit(404)
end
return true
end
|
加载插件
当然,如果路由可以命中,就会走到过滤插件和加载插件的步骤,这也是 API 网关的核心所在。我们先来看下面这段代码:
1
2
3
4
5
6
7
8
9
10
11
12
|
local plugins = core.tablepool.fetch("plugins", 32, 0)
-- etcd 中的插件列表和本地配置文件中的插件列表进行交集运算
api_ctx.plugins = plugin.filter(route, plugins)
-- 依次运行插件在 rewrite 和 access 阶段挂载的函数
run_plugin("rewrite", plugins, api_ctx)
run_plugin("access", plugins, api_ctx)
|
在这段代码中,我们首先通过 table pool 的方式,申请了一个长度为 32 的 table,这是我们之前介绍过的性能优化技巧。然后便是插件的过滤函数。你可能疑惑,为什么需要这一步呢?在插件的 init worker
阶段,我们不是已经从 etcd 中获取插件列表并完成排序了吗?
事实上,这里的过滤是和本地配置文件来做对比的,主要有下面两个原因。
- 第一,新开发的插件需要灰度来发布,这时候新插件在 etcd 的列表中存在,但只在部分网关节点中处于开启状态。所以,我们需要额外做一次交集的运算。
- 第二,为了支持 debug 模式。终端的请求经过了哪些插件的处理?这些插件的加载顺序是什么?这些信息在调试的时候会很有用,所以在过滤函数中也会判断其是否处于 debug 模式,并在响应头中记录下这些信息。
因此,在 access 阶段的最后,我们会把这些过滤好的插件,按照优先级逐个运行,如下面这段代码所示:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
|
local function run_plugin(phase, plugins, api_ctx)
for i = 1, #plugins, 2 do
local phase_fun = plugins[i][phase]
if phase_fun then
-- 最核心的调用代码
phase_fun(plugins[i + 1], api_ctx)
end
end
return api_ctx
end
|
你可以看到,在遍历插件的时候,我们是以 2
为间隔进行的,这是因为每个插件都会有两个部分组成:插件对象和插件的配置参数。现在,我们来看上面示例代码中最核心的那一行代码:
1
2
|
phase_fun(plugins[i + 1], api_ctx)
|
单独看这行代码会有些抽象,我们用一个具体的 limit_count
插件来替换一下,就会清楚很多:
1
2
|
limit_count_plugin_rewrite_function(conf_of_plugin, api_ctx)
|
到这里,API 网关的整体流程,我们就实现得差不多了。这些代码都在同一个代码文件中,它里面有 400 多行代码,但核心的代码就是我们上面所介绍的这短短几十行。
编写插件
现在,距离一个完整的 demo 还差一件事情,那就是编写一个插件,让它可以跑起来。我们以 limit-count
这个限制请求数的插件为例,它的完整实现只有 60 多行代码,你可以点击链接查看。下面,我来详细讲解下其中的关键代码。
首先,我们要引入 lua-resty-limit-traffic
,作为限制请求数的基础库:
1
2
|
local limit_count_new = require("resty.limit.count").new
|
然后,使用 rapidjson 中的 json schema,来定义这个插件的参数有哪些:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
|
local schema = {
type = "object",
properties = {
count = {type = "integer", minimum = 0},
time_window = {type = "integer", minimum = 0},
key = {type = "string",
enum = {"remote_addr", "server_addr"},
},
rejected_code = {type = "integer", minimum = 200, maximum = 600},
},
additionalProperties = false,
required = {"count", "time_window", "key", "rejected_code"},
}
|
插件的这些参数,和大部分 resty.limit.count
的参数是对应的,其中包含了限制的 key、时间窗口的大小、限制的请求数。另外,插件中增加了一个参数:rejected_code
,在请求被限速的时候返回指定的状态码。
最后一步,我们把插件的处理函数挂载到 rewrite
阶段:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
|
function _M.rewrite(conf, ctx)
-- 从缓存中获取 limit count 的对象,如果没有就使用 `create_limit_obj` 函数新建并缓存
local lim, err = core.lrucache.plugin_ctx(plugin_name, ctx, create_limit_obj, conf)
-- 从 ctx.var 中获取 key 的值,并和配置类型和配置版本号一起组成新的 key
local key = (ctx.var[conf.key] or "") .. ctx.conf_type .. ctx.conf_version
-- 进入限制的判断函数
local delay, remaining = lim:incoming(key, true)
if not delay then
local err = remaining
-- 如果超过阈值,就返回指定的状态码
if err == "rejected" then
return conf.rejected_code
end
core.log.error("failed to limit req: ", err)
return 500
end
-- 如果没有超过阈值,就放行,并设置对应响应头
core.response.set_header("X-RateLimit-Limit", conf.count,
"X-RateLimit-Remaining", remaining)
end
|
上面的代码中,进行限制判断的逻辑只有一行,其他的都是来做准备工作和设置响应头的。如果没有超过阈值,就会继续按照优先级运行下一个插件。
写在最后
今天这节课,通过整体框架和插件的编写,我们就完成了一个 API 网关的 demo。更进一步,利用本专栏学到的 OpenResty 知识,你可以在上面继续添砖加瓦,搭建更丰富的功能。
最后,给你留一个思考题。我们知道,API 网关不仅可以处理七层的流量,也可以处理四层的流量,基于此,你能想到它的一些使用场景吗?欢迎留言说说你的看法,也欢迎你把这篇文章分享出去,和更多的人一起学习、交流。