0%

Go微服务与云原生学习(二)

gRPC

gRPC是一种现代化开源的RPC框架,由Google进行研发,能够运行于任何环境之中,最初由谷歌进行开发,它使用HTTP/2作为传输协议。

在gRPC中,客户端可以像调用本地方法一样直接调用其他机器上的服务端应用程序的方法,帮助你更容易创建分布式应用程序和服务。gRPC是基于定义一个服务,制定一个可以远程调用的带有参数和返回类型的方法。在服务端程序中实现这个接口并且运行gRPC服务处理客户端调用,在客户端,有一个stub提供和服务端相同的方法

img

为什么要用gRPC

gRPC可以帮助我们一次性的在一个.proto文件中定义服务并使用任何支持它的语言去实现客户端和服务端,也就是说gRPC解决了不同语言以及环境间通信的复杂性。使用protocol buffer还能获得其他好处,包括高效的序列化,简单的IDL以及容易进行接口更新。总之,使用gRPC能够帮助我们更容易编写跨语言的分布式代码

IDL(Interface description Language) 是指接口描述语言,是用来描述软件组件接口的一种计算机语言,是跨平台开发的基础。IDL通过一种中立的方式来描述接口,使得在不同平台上运行的对象和用不同语言编写的程序可以相互通信交流;比如,一个组件用C++写成,另一个组件用Go写成

使用gRPC进行开发的步骤

1.编写.proto文件定义服务

默认情况下gRPC使用protocol buffers作为接口定义语言(IDL)来描述服务接口和有效负载消息的结构

在gRPC中可以定义四种类型的服务方法

普通rpc,客户端向服务器发送一个请求,然后得到一个响应,就像普通的函数调用一样

1
rpc SayHello(HelloRequest) returns (HelloResponse);

服务器流式rpc,其中客户端向服务器发送请求,并获得一个流来读取一系列消息。客户端从返回的流中读取,知道没有更多的消息,gRPC保证在单个RPC调用的消息是有序的

1
rpc LotsOfReplies(HelloRequest) returns (stream HelloResponse);

客户端流式rpc,其中客户端写入一系列消息并将其发送到服务器,同样使用提供的流。一旦客户端完成了消息的写入,它就等待服务器读取消息并返回响应,同样,gRPC保证单个RPC调用中的消息是有序的

1
rpc LostsOfGreetings(stream HelloRequest) returns(HelloResponse);

双向流式rpc,其中双方使用读写流发送一系列消息,这两个流独立运行,因此客户端和服务器可以按照自己指定的顺序读写;例如,服务器可以等待接受所有客户端消息后再写响应,或者可以交替读取消息然后写入消息,或者其他读写组合。每个流中的消息是有序的

1
rpc StreamGreetings(stream HelloRequest) returns(stream HelloResponse);

2.生成指定语言的代码(客户端一份、服务端一份)

.proto文件中定义好服务之后,gRPC提供了生成客户端和服务端代码的protocol buffers编译器插件。

我们使用这些插件可以根据需要生成Java Go C++ Python等语言的代码,我们通常会在客户端调用这些API,并且在服务器端实现对应的API

  • 在服务器端,服务器实现服务声明的方法,并运行一个gRPC服务器来处理客户端发来的调用请求。gRPC底层会对传入的请求进行编码,执行被调用的服务方法,并对服务响应进行编码
  • 在客户端,客户端有一个称为存根(stub)的本地对象,它实现了与服务相同的方法。然后,客户端可以在本地对象上调用这些方法,将调用的参数包装在适当的protocol buffers消息类型中–gRPC在向服务器发送请求并返回服务器的protocol buffers响应之后进行处理

3.编写业务逻辑代码

proto文件生成pb.go以及grpc.pb.go的命令

不指定proto路径

1
protoc --go_out=. --go_opt=paths=source_relative --go-grpc_out=. --go-grpc_opt=paths=source_relative xxx.proto

指定proto路径

1
protoc --proto_path=xx --go_out=pb --go_opt=paths=source_relative --go-grpc_out=pb --go-grpc_opt=paths=source_relative xxx.proto

使用grpc实现一个简单的hello服务

Server

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
type server struct {
pb.UnimplementedGreeterServer //当没有完全实现proto中的所有方法时依旧可以运行起来
}

// gRPC通过.proto文件自动生成的SayHello方法
func (s *server) SayHello(ctx context.Context, in *pb.HelloRequest) (*pb.HelloResponse, error) {
reply := "hello" + in.GetName()
return &pb.HelloResponse{Reply: reply}, nil
}

func main() {
// 启动服务
l, err := net.Listen("tcp", ":8972")
if err != nil {
fmt.Println("failed to listen,err:", err)
return
}
// 注册服务
s := grpc.NewServer()
pb.RegisterGreeterServer(s, &server{})
// 启动服务
err = s.Serve(l)
if err != nil {
fmt.Println("failed to server,err:", err)
}
}
syntax = "proto3"; //版本声明

option go_package="hello_server/pb"; // 项目中import导入生成go代码的模块

package pb; //proto文件模块

// 定义服务
service Greeter {
// 定义方法
rpc SayHello (HelloRequest) returns (HelloResponse) {}
}

// 定义消息
message HelloRequest {
string name = 1; //字段的序号
}

message HelloResponse {
string reply = 1;
}

Client

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
func main() {
//连接server 带加密连接
conn, err := grpc.Dial("127.0.0.1:8972", grpc.WithTransportCredentials(insecure.NewCredentials()))
if err != nil {
log.Fatalf("grpc.Dial failed,err:%v", err)
return
}
defer conn.Close()
//创建客户端
c := proto.NewGreeterClient(conn)
//使用context进行控制,传入background和超时时间一秒钟
ctx, cancel := context.WithTimeout(context.Background(), time.Second*1)
defer cancel()
name := "xiaocheng"
resp, err := c.SayHello(ctx, &proto.HelloRequest{Name: name})
if err != nil {
log.Printf("c.SayHello failed, err:%v", err)
return
}
// 拿到RPC响应
log.Printf("resp:%v", resp.GetReply())
}
// 应该是同一份proto文件
syntax = "proto3"; //版本声明

option go_package="hello_client/proto"; // 项目中import导入生成go代码的模块

package pb; //proto文件模块 必须与server端一致

// 定义服务
service Greeter {
// 定义方法
rpc SayHello (HelloRequest) returns (HelloResponse) {}
}

// 定义消息
message HelloRequest {
string name = 1; //字段的序号
}

message HelloResponse {
string reply = 1;
}

使用grpc实现一个简单的add服务

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
type server struct {
pb.UnimplementedAddMethodServer
}

func (s *server) Add(ctx context.Context, in *pb.AddRequest) (*pb.AddResponse, error) {
reply := in.GetArgs1() + in.GetArgs2()
return &pb.AddResponse{Number: reply}, nil
}

func main() {
//启动服务
l, err := net.Listen("tcp", ":9999")
if err != nil {
fmt.Println("net listen failed,err:", err)
return
}
s := grpc.NewServer()
pb.RegisterAddMethodServer(s, &server{})
err = s.Serve(l)
if err != nil {
fmt.Println("failed to server,err:", err)
}
}

proto文件应该在客户端和服务端都有一份

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
syntax="proto3";
option go_package="server/pb";
package pb;

service AddMethod {
rpc Add(AddRequest) returns (AddResponse) {}
}

message AddRequest {
int32 args1 =1;
int32 args2 =2;
}

message AddResponse {
int32 number =1;
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
func main() {
conn, err := grpc.Dial("127.0.0.1:9999", grpc.WithTransportCredentials(insecure.NewCredentials()))
if err != nil {
fmt.Println("grpc dail failed,err:", err)
return
}
defer conn.Close()
c := pb.NewAddMethodClient(conn)
ctx, cancel := context.WithTimeout(context.Background(), time.Second*1)
defer cancel()
var args1, args2 int32
args1 = 1
args2 = 2
resp, err := c.Add(ctx, &pb.AddRequest{Args1: args1, Args2: args2})
if err != nil {
fmt.Println("c.Add failed,err:", err)
return
}
log.Println("Add Response:", resp)
}

protobuf语法

protobuf为什么体积小、解析快

protobuf是google提出的数据交换格式,同一条消息数据,使用Protobuf序列化之后占用空间是json的1/10,但是性能却是几十倍

原因

  • 编解码大多采用位运算,比JSON/XML的字符匹配效率更高
  • pb定义了varint类型,使用变长编码压缩数值类型。值越小的数字,使用的字节数就越少
  • 采用Tag-value类型,没有冗余字符

定义一个消息类型

1
2
3
4
5
6
7
8
9
10
11
syntax="proto3";

message SearchRequest {
string query =1;
int32 page_number=2;
}
//文件的第一行指定使用proto3语法,如果不这么写
//pb的编译器默认使用proto2

//SearchRequest定义了一个消息,使用了两个字段
//每个字段需要定义类型 名字 和编号

字段编号

消息定义中的每个字段都要有一个唯一的编号,这些编号用来在消息二进制格式中标识字段,在消息类型使用后就不能更改。在范围1到15中的字段需要一个字节进行编码,而16-2047的字段采用两个字节。所以应该为经常使用的消息元素保留数字1到15的编号,也要为将来可能添加的经常使用的元素留出一些编号

指定字段规则

消息字段可以是下列字段之一

  • singular:格式正确的消息可以有这个字段的0个或者一个,默认使用singular字段
  • repeated:该字段可以在格式正确的消息中重复任意次数(包括0次),重复值的顺序将被保留
  • optional:该字段在传递的时候可选也可不选

保留字段

如果你通过完全删除字段或者将其注释来更新消息类型,那么未来的用户在对该类型进行自己的更新的时候就可以重用字段号,如果其他人以后加载旧版本的相同.proto文件,这可能就会导致严重的问题,比如数据损坏、隐私漏洞等等。

解决方法是指定已经删除的字段的字段编号,如果将来有用户尝试使用这些字段标识符,protocol buffer编译器将发出提示

1
2
3
message Foo {
reserved 2,15,9 to 11;
}

值类型

.proto Type Notes C++ Type Java/Kotlin Type[1] Python Type[3] Go Type PHP Type
double double double float float64 float
float float float float float32 float
int32 使用可变长度编码。编码负数效率低下——如果你的字段可能有负值,则使用 sint32代替。 int32 int int int32 integer
int64 使用可变长度编码。编码负数效率低下——如果你的字段可能有负值,则使用 sint64代替。 int64 long int/long[4] int64 integer/string[6]
uint32 使用变长编码。 uint32 int[2] int/long[4] uint32 integer
uint64 使用变长编码。 uint64 long[2] int/long[4] uint64 integer/string[6]
sint32 使用可变长度编码。带符号的 int 值。这些编码比普通的 int32更有效地编码负数。 int32 int int int32 integer
sint64 使用可变长度编码。带符号的 int 值。这些编码比普通的 int64更有效地编码负数。 int64 long int/long[4] int64 integer/string[6]
fixed32 总是四个字节。如果值经常大于228,则比 uint32更有效率。 uint32 int[2] int/long[4] uint32 integer
fixed64 总是8字节。如果值经常大于256,则比 uint64更有效率。 uint64 integer/string[6]
sfixed32 总是四个字节。 int32 int int int32 integer
sfixed64 总是八个字节。 int64 integer/string[6]
bool bool boolean bool bool boolean
string 字符串必须始终包含 UTF-8编码的或7位 ASCII 文本,且不能长于232。 string String str/unicode[5] string string
bytes 可以包含任何不超过232字节的任意字节序列。 string ByteString str (Python 2) bytes (Python 3) []byte string

枚举

在定义消息类型的时候,可能希望其中的一个字段只能是预定义的值列表中的一个值。下面是一个栗子,Conrpus字段的值只能是其中的一个

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
message SearchRequest {
string query = 1;
int32 page_number = 2;
int32 result_per_page = 3;
enum Corpus {
UNIVERSAL = 0;
WEB = 1;
IMAGES = 2;
LOCAL = 3;
NEWS = 4;
PRODUCTS = 5;
VIDEO = 6;
}
Corpus corpus = 4;
}

嵌套消息类型

1
2
3
4
5
6
7
8
9
message SearchResponse {
repeated Result results = 1;
}

message Result {
string url = 1;
string title = 2;
repeated string snippets = 3;
}

Any

Any类型允许你将消息作为嵌入类型使用,使用Any类型需要导入google/protobuf/any.proto

1
2
3
4
5
6
import "google/protobuf/any.proto";

message ErrorStatus {
string message = 1;
repeated google.protobuf.Any details = 2;
}

oneof

如果你有一条包含多个字段的消息,并且同时最多设置其中的一个字段,那么可以通过oneof来实现并节省内存,可以通过case()或者WihchOneOf()来检查one of 中的哪个值被设置(如果有)

1
2
3
4
5
6
7
8
9
10
11
message SampleMessage {
oneof test_oneof {
string name = 4;
SubMessage sub_message = 9;
}
}
SampleMessage message;
message.set_name("name");
CHECK(message.has_name());
message.mutable_sub_message(); // Will clear name field.
CHECK(!message.has_name());

Maps

如果想创建一个关联映射作为数据定义的一部分,可以使用这个map

1
map<key_type, value_type> map_field = N;

protobuf实战

oneof字段

oneof中的值只能选择其中的一个

1
2
3
4
5
6
7
message NoticeReaderRequest {
string msg=1;
oneof notice_way{
string email=2;
string phone=3;
}
}

对应的服务端代码

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
func oneofDemo() {
req := &book.NoticeReaderRequest{
Msg: "here is chengxisheng",
NoticeWay: &book.NoticeReaderRequest_Email{
Email: "xxx",
},
}
req2 := &book.NoticeReaderRequest{
Msg: "here is xishengcheng",
NoticeWay: &book.NoticeReaderRequest_Phone{
Phone: "1008611",
},
}
switch v := req.NoticeWay.(type) {
case *book.NoticeReaderRequest_Email:
noticeWithEmail(v)
case *book.NoticeReaderRequest_Phone:
noticeWithPhone(v)
}
switch v := req2.NoticeWay.(type) {
case *book.NoticeReaderRequest_Email:
noticeWithEmail(v)
case *book.NoticeReaderRequest_Phone:
noticeWithPhone(v)
}
}
func noticeWithEmail(in *book.NoticeReaderRequest_Email) {
fmt.Printf("notice reader by email:%v\n", in.Email)
}
func noticeWithPhone(in *book.NoticeReaderRequest_Phone) {
fmt.Printf("notice reader by phone:%v\n", in.Phone)
}

//这里必须使用类型断言+switch case
//来进行one of 字段的确认

wrapvalue类型

首先让我们想一想Go中区分一个MySQL的int类型是默认值还是0值该怎么做?

其实就只有以下两种方法

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
price sql.NullInt64
price *int64
//第一种方式是一个定义好的结构体
//里面有一个字段是 该结构体是否被赋值
//第二种方式是直接用指针来做
//对指针解引用,如果为0则赋值,如果为Nil则是默认值
func wrapValueDemo() {
// client
book:=book.Book{
Title: "learning go language",
Price: &wrapperspb.Int64Value{Value: 600},
Memo: &wrapperspb.StringValue{Value: "学"},
}
// server
if book.GetPrice()==nil {
fmt.Println("is not assigned")
} else {
fmt.Println(book.GetPrice().GetValue())
}
if book.GetMemo()==nil {
fmt.Println("is not assigned")
} else {
fmt.Println(book.GetMemo().GetValue())
}
}

FieldMask类型

当我们更新的时候,定义了很多字段,不可能全部进行全量更新Book的每个字段,因为通常操作只会更新1到2个字段。

当我们想知道更新操作涉及到的具体字段,就需要使用到filedmask类型

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
message UpdateBookRequest {
// 操作人
string op =1;
// 要更新的书籍信息
Book book=2;
// 要更新的字段
google.protobuf.FieldMask update_mask=3;
}
func fieldMaskDemo() {
//client
paths := []string{"price"}
req := &book.UpdateBookRequest{
Op: "chengxisheng",
Book: &book.Book{
Price: &wrapperspb.Int64Value{Value: 8800},
},
UpdateMask: &fieldmaskpb.FieldMask{Paths: paths},
}
mask, _ := fieldmask_utils.MaskFromProtoFieldMask(req.UpdateMask, generator.CamelCase)
var bookDst = make(map[string]interface{})
fieldmask_utils.StructToMap(mask, book.UpdateBookRequest{}.Book, bookDst)
fmt.Printf("bookDst:%#v\n", bookDst)
}

服务端流式RPC

对应的proto(client和server)中添加一个流式方法

1
rpc LotsOfReplies(HelloRequest) returns (stream HelloResponse);

Server添加一个新的方法

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
func (s *server) LotsOfReplies(in *pb.HelloRequest, stream pb.Greeter_LotsOfRepliesServer) error {
words := []string{
"你好",
"hello",
"こんにちは",
"안녕하세요",
}
for _, word := range words {
data := &pb.HelloResponse{
Reply: word + in.GetName(),
}
// 使用Send方法发送多个数据 每当有一个data就send一次数据
if err := stream.Send(data); err != nil {
return err
}
}
return nil
}

Client端添加一个新的函数

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
func callLotsOfReplies(c proto.GreeterClient) {
ctx, cancel := context.WithTimeout(context.Background(), time.Second*1)
defer cancel()
stream, err := c.LotsOfReplies(ctx, &proto.HelloRequest{Name: *name})
if err != nil {
log.Fatalf("c.LotsOfReplies failed,err:%v", err)
}
for {
//依次从流式响应中读取返回的响应数据
res, err := stream.Recv()
if err == io.EOF {
break
}
if err != nil {
log.Fatalf("c.LotsOfReplies failed,err:%v", err)
}
log.Printf("got reply: %q\n", res.GetReply())
}
}

客户端流式RPC

hello.proto中添加这么一个新的方法

1
rpc LotsOfGreetings(stream HelloRequest) returns (HelloResponse);

在server端添加

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
func (s *server) LotsOfGreetings(stream pb.Greeter_LotsOfGreetingsServer) error {
reply := "你好:"
for {
//接受客户端发来的流式数据
res, err := stream.Recv()
if err == io.EOF {
return stream.SendAndClose(&pb.HelloResponse{
Reply: reply,
})
}
if err != nil {
return err
}
reply += res.GetName()
}
}

在Client端中添加

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
func runLotsOfGreeting(c proto.GreeterClient) {
ctx, cancel := context.WithTimeout(context.Background(), time.Second)
defer cancel()
// 客户端要流式的发送请求消息
stream, err := c.LotsOfGreetings(ctx)
if err != nil {
log.Printf("c.LotsOfGreetings failed,err:%v\n", err)
return
}
names := []string{"张三", "李四", "王五"}
for _, name := range names {
stream.Send(&proto.HelloRequest{Name: name})
time.Sleep(200 * time.Millisecond)
}
//关闭流
res, err := stream.CloseAndRecv()
log.Printf("res:%v\n", res)
}

双向流式RPC

在proto中添加

1
rpc BidiHello(stream HelloRequest) returns(stream HelloResponse);

在client中添加

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
func (s *server) BidiHello(stream pb.Greeter_BidiHelloServer) error {
for {
//接受流式请求
in, err := stream.Recv()
if err == io.EOF {
return nil
}
if err != nil {
return err
}
reply := magic(in.GetName())
//返回流式响应
if err := stream.SendAndClose(&pb.HelloResponse{Reply: reply}); err != nil {
return err
}

}
}

在Server端中添加

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
func (s *server) BidiHello(stream pb.Greeter_BidiHelloServer) error {
for {
//接受流式请求
in, err := stream.Recv()
if err == io.EOF {
return nil
}
if err != nil {
return err
}
reply := magic(in.GetName())
//返回流式响应
if err := stream.SendAndClose(&pb.HelloResponse{Reply: reply}); err != nil {
return err
}

}
}


// magic 一段价值连城的“人工智能”代码
func magic(s string) string {
s = strings.ReplaceAll(s, "吗", "")
s = strings.ReplaceAll(s, "吧", "")
s = strings.ReplaceAll(s, "你", "我")

s = strings.ReplaceAll(s, "?", "!")
s = strings.ReplaceAll(s, "?", "!")
return s
}

MetaData元数据

metadata是指在处理RPC请求和响应过程中需要但又不属于具体业务(例如身份验证详细信息)的信息,采用的是键值对列表 的形式,其中键是string类型,值通常是[]string类型,但也可以是二进制数据。gRPC中的metadata类似于我们在HTTP headers中的键值对,元数据可以包含认真token、请求标识和监控标签等等

metadata中的键是大小写不敏感的,不能以grpc-开头,并且二进制的简明必须以-bin结尾,元数据对gRPC本身是不可见的,通常在应用程序代码或者中间件中处理元数据,我们不需要再.proto中指定元数据

1
type MD map[string][]string