史上最细gRPC(Go)入门教程(三)---gRPC流式传输--Streaming
发布日期:2021-05-07 17:42:58 浏览次数:22 分类:精选文章

本文共 6854 字,大约阅读时间需要 22 分钟。

gRPC Stream API 开发指南

1. 概述

gRPC 是一种高效的网络通信协议,支持多种类型的 API。其中,Stream API 是用于处理大量数据传输和长时间数据交互的核心功能。以下将详细介绍 gRPC 中的四种 Stream API:UnaryAPI、ServerStreaming、ClientStreaming 和 BidirectionalStreaming。

2. proto 文件定义

在开始使用 Stream API 之前,首先需要定义一个 proto 文件,用于描述服务接口。以下是 echo.proto 文件的定义:

syntax = "proto3";
option go_package = "github.com/lixd/grpc-go-example/features/proto/echo";
package echo;
service Echo {
// UnaryAPI
rpc UnaryEcho(EchoRequest) returns (EchoResponse) {}
// ServerStreaming
rpc ServerStreamingEcho(EchoRequest) returns (stream EchoResponse) {}
// ClientStreaming
rpc ClientStreamingEcho(stream EchoRequest) returns (EchoResponse) {}
// BidirectionalStreaming
rpc BidirectionalStreamingEcho(stream EchoRequest) returns (stream EchoResponse) {};
}
message EchoRequest {
string message = 1;
}
message EchoResponse {
string message = 1;
}

3. UnaryAPI

UnaryAPI 是最简单的 RPC 调用类型,类似于传统的 RESTful API。它不涉及流数据传输,只是简单的请求-响应模式。以下是 UnaryAPI 的实现示例:

服务器实现

type Echo struct {
pb.UnimplementedEchoServer
}
func (e *Echo) UnaryEcho(ctx context.Context, req *pb.EchoRequest) (*pb.EchoResponse, error) {
log.Printf("Recved: %v", req.GetMessage())
resp := &pb.EchoResponse{
Message: req.GetMessage(),
}
return resp, nil
}

客户端实现

func main() {
// 1. 建立连接 获取 client
conn, err := grpc.Dial(address, grpc.WithInsecure(), grpc.WithBlock())
if err != nil {
log.Fatalf("did not connect: %v", err)
}
defer conn.Close()
client := pb.NewEchoClient(conn)
// 2. 执行各个 Stream 的对应方法
unary(client)
}
func unary(client pb.EchoClient) {
resp, err := client.UnaryEcho(context.Background(), &pb.EchoRequest{
Message: "hello world",
})
if err != nil {
log.Printf("send error: %v\n", err)
}
fmt.Printf("Recved: %v \n", resp.GetMessage())
}

4. ServerStreaming

ServerStreaming 是一种服务端流 API,允许服务端向客户端推送多个响应数据。以下是 ServerStreaming 的实现示例:

服务器实现

func (e *Echo) ServerStreamingEcho(req *pb.EchoRequest, stream pb.Echo_ServerStreamingEchoServer) error {
log.Printf("Recved %v", req.GetMessage())
// 通过循环调用 stream.Send() 返回多个响应
for i := 0; i < 2; i++ {
err := stream.Send(&pb.EchoResponse{
Message: req.GetMessage(),
})
if err != nil {
log.Fatalf("Send error: %v", err)
return err
}
}
// 返回 nil 表示响应完成
return nil
}

客户端实现

func serverStream(client pb.EchoClient) {
// 2. 调用获取 stream
stream, err := client.ServerStreamingEcho(context.Background(), &pb.EchoRequest{
Message: "Hello World",
})
if err != nil {
log.Fatalf("could not echo: %v", err)
}
// 3. 循环获取服务端推送的消息
for {
resp, err := stream.Recv()
if err == io.EOF {
log.Println("server closed")
break
}
if err != nil {
log.Printf("Recv error: %v", err)
continue
}
log.Printf("Recv data: %v", resp.GetMessage())
}
}

5. ClientStreaming

ClientStreaming 是一种客户端流 API,允许客户端向服务端推送多个请求数据。以下是 ClientStreaming 的实现示例:

服务器实现

func (e *Echo) ClientStreamingEcho(stream pb.Echo_ClientStreamingEchoServer) error {
// 循环接收客户端发送的消息
for {
req, err := stream.Recv()
if err == io.EOF {
log.Println("client closed")
// 调用 SendAndClose 返回响应并关闭连接
return stream.SendAndClose(&pb.EchoResponse{
Message: "ok",
})
}
if err != nil {
return err
}
log.Printf("Recved: %v", req.GetMessage())
}
}

客户端实现

func clientStream(client pb.EchoClient) {
// 2. 获取 stream 并通过 Send 方法不断推送数据到服务端
stream, err := client.ClientStreamingEcho(context.Background())
if err != nil {
log.Fatalf("Sum() error: %v", err)
}
for i := int64(0); i < 2; i++ {
err := stream.Send(&pb.EchoRequest{
Message: "hello world",
})
if err != nil {
log.Printf("send error: %v", err)
continue
}
}
// 发送完成后通过 CloseAndRecv() 关闭 stream 并接收服务端返回结果
resp, err := stream.CloseAndRecv()
if err != nil {
log.Fatalf("CloseAndRecv() error: %v", err)
}
log.Printf("sum: %v", resp.GetMessage())
}

6. BidirectionalStreaming

BidirectionalStreaming 是一种双向流 API,允许客户端和服务端都向对方推送数据。以下是 BidirectionalStreaming 的实现示例:

服务器实现

func (e *Echo) BidirectionalStreamingEcho(stream pb.Echo_BidirectionalStreamingEchoServer) error {
var(
waitGroup sync.WaitGroup
msgCh chan string
)
waitGroup.Add(1)
go func() {
defer waitGroup.Done()
for v := range msgCh {
err := stream.Send(&pb.EchoResponse{
Message: v,
})
if err != nil {
fmt.Println("Send error:", err)
continue
}
}
}()
waitGroup.Add(1)
go func() {
defer waitGroup.Done()
for {
req, err := stream.Recv()
if err == io.EOF {
break
}
if err != nil {
log.Fatalf("recv error: %v", err)
}
fmt.Printf("Recved: %v \n", req.GetMessage())
msgCh <- req.GetMessage()
}
close(msgCh)
}()
waitGroup.Wait()
return nil
}

客户端实现

func bidirectionalStream(client pb.EchoClient) {
var(
wg sync.WaitGroup
)
// 2. 调用方法获取 stream
stream, err := client.BidirectionalStreamingEcho(context.Background())
if err != nil {
panic(err)
}
// 3. 开两个 goroutine 分别用于 Recv() 和 Send()
wg.Add(1)
go func() {
defer wg.Done()
for {
req, err := stream.Recv()
if err == io.EOF {
fmt.Println("Server Closed")
break
}
if err != nil {
continue
}
fmt.Printf("Recv Data: %v \n", req.GetMessage())
}
}()
wg.Add(1)
go func() {
defer wg.Done()
for i := 0; i < 2; i++ {
err := stream.Send(&pb.EchoRequest{
Message: "hello world",
})
if err != nil {
log.Printf("send error: %v\n", err)
}
time.Sleep(time.Second)
}
// 发送完成后关闭 stream
err := stream.CloseSend()
if err != nil {
log.Printf("Send error: %v\n", err)
return
}
}()
wg.Wait()
}

7. 小结

通过上述实现,我们可以看到 Stream API 的灵活性和强大之处。无论是服务端推送还是客户端推送,都可以通过循环调用 Recv() 或 Send() 来实现数据的高效传输。

需要注意的是:

  • 服务端 Stream 返回 nil 表示响应完成。
  • 客户端需要通过 err == io.EOF 判断是否已经获取了全部数据。
  • 客户端需要手动调用 CloseAndRecv() 或 CloseSend() 来关闭 Stream,以避免阻塞。

通过 goroutine 和 Stream 的结合,gRPC 提供了对大数据传输和长时间数据交互的高效解决方案。

8. 参考

上一篇:史上最细gRPC(Go)入门教程(四)---数据安全之--通过SSL/TLS建立安全连接
下一篇:史上最细gRPC(Go)入门教程(一)---Protobuf介绍及其编译原理

发表评论

最新留言

留言是一种美德,欢迎回访!
[***.207.175.100]2025年05月11日 08时45分39秒

关于作者

    喝酒易醉,品茶养心,人生如梦,品茶悟道,何以解忧?唯有杜康!
-- 愿君每日到此一游!

推荐文章

ado读取多条oracle数据,Oracle ADO数据存取 2025-03-29
anaconda新建python2环境安装不了jupyterlab_anaconda3安装及jupyter环境配置教程(全)... 2025-03-29
android asynctask handler 区别,AsyncTask与Thread+Handler简要分析 2025-03-29
android fastjson漏洞_初识Fastjson漏洞(环境搭建及漏洞复现) 2025-03-29
android pod 组件化_CocoaPods 组件化实践 - 私有Pod 2025-03-29
$CH0201$ 费解的开关 2025-03-29
android进程管理策略,Android进程保活 2025-03-29
arduino蓝牙通讯代码_arduino 联接蓝牙模块 2025-03-29
asp.mvc 4项目发布文件目录结构_如何用SpringBoot(2.3.3版本)快速搭建一个项目?文末有小彩蛋... 2025-03-29
aspen串联反应怎么输入_如何进步提升串联谐振试验装置的稳定性 2025-03-29
aspose html转pdf_Java实现Word/Pdf/TXT转html 2023-01-24
a推b等价于非a或b_AB胶/蜜月胶常见问题的原因分析及解决方法 2023-01-24
bat 命令返回结果_【批处理】带你入门命令行 2023-01-24
c++ string取子串_Integer与String的设计哲学 2023-01-24
c++ 数组批量赋值_数组之间不能赋值?穿个马甲吧! 2023-01-24
cad模糊查询符号_mysql 正则模式和like模糊查询 2023-01-24
continue可以用if判断里面吗_谁能说说if()else()里的continue是干嘛的? 2023-01-24
ctrl c 和 ctrl v 不能用了_神奇操作,原来CTRL键还能这么用 2023-01-24
cytoscape安装java_Cytoscape史上最全攻略 2023-01-24
c语言程序设计年历显示,C语言程序设计报告《万年历》.doc 2023-01-24