Go 基于泛化调用与 Nacos 实现 Dubbo 代理
前言
由于工作中使用的 rpc 框架是 dubbo,经常需要调试不同环境的 dubbo 接口,例如本地环境、开发环境和测试环境。而为了同一管理 http 接口和 dubbo 接口,希望使用统一的调试工具,例如 PostMan 或 ApiPost 等,因此萌生了开发一个 dubbo 的 http 代理工具的念头。
准备
由于是通用的 dubbo 代理,因此肯定需要使用泛化调用。而我们使用的注册中心是 nacos,因此也需要使用 nacos-sdk 来获取 provider 的实例信息。
实现
项目结构
│
├── dubbo/
│ ├─ generic.go # 泛化调用 dubbo 接口
│ ├─ models.go # 数据模型
│ └─ nacos.go # 获取 nacos 元信息
├── web/
│ └─ server.go # 对外 http 接口
│
├── main.go # main 入口函数
└── go.mod # 模块描述文件
go.mod
1module dubbo-proxy
2
3go 1.20
4
5require (
6 dubbo.apache.org/dubbo-go/v3 v3.0.5
7 github.com/apache/dubbo-go-hessian2 v1.12.0
8 github.com/gin-gonic/gin v1.9.0
9 github.com/nacos-group/nacos-sdk-go/v2 v2.1.2
10)
返回数据格式
dubbo/models.go:
1type DataResult struct {
2 Env string `json:"env,omitempty"` // 当前调用环境
3 Code string `json:"code,omitempty"` // 返回结果码
4 Data any `json:"data,omitempty"` // 返回结果
5 Message string `json:"message,omitempty"` // 返回消息
6}
获取 nacos 元信息
根据环境创建 nacos client
1func buildClient(env string, serverCfgs []constant.ServerConfig) naming_client.INamingClient {
2 client, _ := clients.NewNamingClient(
3 vo.NacosClientParam{
4 ClientConfig: constant.NewClientConfig(
5 constant.WithNamespaceId(env),
6 constant.WithNotLoadCacheAtStart(true),
7 ),
8 ServerConfigs: serverCfgs,
9 },
10 )
11 return client
12}
获取服务实例
1func SelectInstance(env, servName string) (string, bool) {
2 cli, ok := cliMap[env]
3 if !ok {
4 return "client not found from " + env, false
5 }
6 instances, e := cli.SelectInstances(vo.SelectInstancesParam{
7 ServiceName: fmt.Sprintf("providers:%s:1.0.0:", servName),
8 HealthyOnly: true,
9 })
10 if e != nil {
11 return "instance not found, " + e.Error(), false
12 }
13 if len(instances) <= 0 {
14 return "instance not found", false
15 }
16 return fmt.Sprintf("dubbo://%s:%d", instances[0].Ip, instances[0].Port), true
17}
完整代码
dubbo/nacos.go:
1package dubbo
2
3import (
4 "fmt"
5 "github.com/nacos-group/nacos-sdk-go/v2/clients"
6 "github.com/nacos-group/nacos-sdk-go/v2/clients/naming_client"
7 "github.com/nacos-group/nacos-sdk-go/v2/common/constant"
8 "github.com/nacos-group/nacos-sdk-go/v2/vo"
9)
10
11var cliMap = make(map[string]naming_client.INamingClient)
12
13func init() {
14 serverCfgs := []constant.ServerConfig{
15 *constant.NewServerConfig("127.0.0.1", 6801, constant.WithContextPath("/nacos")),
16 }
17 cliMap["local"] = buildClient("local", serverCfgs)
18 cliMap["dev"] = buildClient("develop", serverCfgs)
19 cliMap["test"] = buildClient("test", serverCfgs)
20}
21
22func buildClient(env string, serverCfgs []constant.ServerConfig) naming_client.INamingClient {
23 client, _ := clients.NewNamingClient(
24 vo.NacosClientParam{
25 ClientConfig: constant.NewClientConfig(
26 constant.WithNamespaceId(env),
27 constant.WithNotLoadCacheAtStart(true),
28 ),
29 ServerConfigs: serverCfgs,
30 },
31 )
32 return client
33}
34
35func SelectInstance(env, servName string) (string, bool) {
36 cli, ok := cliMap[env]
37 if !ok {
38 return "client not found from " + env, false
39 }
40 instances, e := cli.SelectInstances(vo.SelectInstancesParam{
41 ServiceName: fmt.Sprintf("providers:%s:1.0.0:", servName),
42 HealthyOnly: true,
43 })
44 if e != nil {
45 return "instance not found, " + e.Error(), false
46 }
47 if len(instances) <= 0 {
48 return "instance not found", false
49 }
50 return fmt.Sprintf("dubbo://%s:%d", instances[0].Ip, instances[0].Port), true
51}
泛化调用
dubbo root 配置
1var dubboRoot = cfg.NewRootConfigBuilder().SetProtocols(map[string]*cfg.ProtocolConfig{
2 dubbo.DUBBO: {
3 Params: map[string]interface{}{
4 "getty-session-param": map[string]interface{}{
5 "max-msg-len": 1024000,
6 },
7 },
8 },
9}).Build()
泛化调用
1func GenericInvoke(iName, method, env string, req []byte) DataResult {
2 instance, ok := SelectInstance(env, iName)
3 if !ok {
4 return DataResult{
5 Code: "ERROR",
6 Message: instance,
7 }
8 }
9 cfg.Load(cfg.WithRootConfig(dubboRoot))
10 refConf := cfg.ReferenceConfig{
11 InterfaceName: iName,
12 Cluster: "failover",
13 Protocol: dubbo.DUBBO,
14 Generic: "true",
15 Version: "1.0.0",
16 URL: instance,
17 }
18 refConf.Init(dubboRoot)
19 refConf.GenericLoad("dubbo-proxy")
20 var args = utils.Unmarshal(req, &map[string]hessian.Object{})
21 raw, err := refConf.GetRPCService().(*generic.GenericService).Invoke(context.Background(), method, nil, []hessian.Object{args})
22 if err != nil {
23 panic(err)
24 }
25 rawResult := raw.(map[interface{}]interface{})
26 result := DataResult{
27 Code: rawResult["code"].(string),
28 Message: rawResult["message"].(string),
29 Data: utils.ConvertAs(rawResult["data"], map[string]interface{}{}),
30 }
31 return result
32}
注意25-30行要根据业务自身的返回数据格式包装结果:
1/*
2 这个例子的 dubbo 调用都会返回通过的结构:
3 {
4 "code": "",
5 "message": "",
6 "data": // 真正的调用结果
7 }
8*/
9rawResult := raw.(map[interface{}]interface{})
10result := DataResult{
11 Code: rawResult["code"].(string),
12 Message: rawResult["message"].(string),
13 Data: rawResult["data"],
14}
完整代码
dubbo/generic.go:
1package dubbo
2
3import (
4 "context"
5 "dubbo-proxy/utils"
6 cfg "dubbo.apache.org/dubbo-go/v3/config"
7 "dubbo.apache.org/dubbo-go/v3/config/generic"
8 _ "dubbo.apache.org/dubbo-go/v3/imports"
9 "dubbo.apache.org/dubbo-go/v3/protocol/dubbo"
10 hessian "github.com/apache/dubbo-go-hessian2"
11)
12
13var dubboRoot = cfg.NewRootConfigBuilder().SetProtocols(map[string]*cfg.ProtocolConfig{
14 dubbo.DUBBO: {
15 Params: map[string]interface{}{
16 "getty-session-param": map[string]interface{}{
17 "max-msg-len": 1024000,
18 },
19 },
20 },
21}).Build()
22
23func GenericInvoke(iName, method, env string, req []byte) DataResult {
24 instance, ok := SelectInstance(env, iName)
25 if !ok {
26 return DataResult{
27 Code: "ERROR",
28 Message: instance,
29 }
30 }
31 cfg.Load(cfg.WithRootConfig(dubboRoot))
32 refConf := cfg.ReferenceConfig{
33 InterfaceName: iName,
34 Cluster: "failover",
35 Protocol: dubbo.DUBBO,
36 Generic: "true",
37 Version: "1.0.0",
38 URL: instance,
39 }
40 refConf.Init(dubboRoot)
41 refConf.GenericLoad("dubbo-proxy")
42 var args = utils.Unmarshal(req, &map[string]hessian.Object{})
43 raw, err := refConf.GetRPCService().(*generic.GenericService).Invoke(context.Background(), method, nil, []hessian.Object{args})
44 if err != nil {
45 panic(err)
46 }
47 rawResult := raw.(map[interface{}]interface{})
48 result := DataResult{
49 Code: rawResult["code"].(string),
50 Message: rawResult["message"].(string),
51 Data: utils.ConvertAs(rawResult["data"], map[string]interface{}{}),
52 }
53 return result
54}
提供 http 服务
dubbo/generic.go:
1package web
2
3import (
4 "dubbo-proxy/dubbo"
5 "github.com/gin-gonic/gin"
6 "net/http"
7)
8
9func Run() {
10 router := gin.Default()
11 router.POST("/:intf/:method", func(c *gin.Context) {
12 intf := c.Param("intf")
13 method := c.Param("method")
14 env := c.Query("env")
15 data, err := c.GetRawData()
16 if err != nil {
17 panic(err)
18 }
19 res := dubbo.GenericInvoke(intf, method, env, data)
20 res.Env = env
21 c.JSON(http.StatusOK, res)
22 })
23 panic(router.Run(":7788"))
24}
启动
main.go:
1package main
2
3import "dubbo-proxy/web"
4
5func main() {
6 web.Run()
7}
效果

