
史上最细gRPC(Go)入门教程(三)---gRPC流式传输--Streaming
发布日期:2021-05-07 17:42:58
浏览次数:22
分类:精选文章
本文共 6854 字,大约阅读时间需要 22 分钟。
gRPC Stream API 开发指南
1. 概述
gRPC 是一种高效的网络通信协议,支持多种类型的 API。其中,Stream API 是用于处理大量数据传输和长时间数据交互的核心功能。以下将详细介绍 gRPC 中的四种 Stream API:UnaryAPI、ServerStreaming、ClientStreaming 和 BidirectionalStreaming。
2. proto 文件定义
在开始使用 Stream API 之前,首先需要定义一个 proto 文件,用于描述服务接口。以下是 echo.proto
文件的定义:
syntax = "proto3";option go_package = "github.com/lixd/grpc-go-example/features/proto/echo";package echo;service Echo { // UnaryAPI rpc UnaryEcho(EchoRequest) returns (EchoResponse) {} // ServerStreaming rpc ServerStreamingEcho(EchoRequest) returns (stream EchoResponse) {} // ClientStreaming rpc ClientStreamingEcho(stream EchoRequest) returns (EchoResponse) {} // BidirectionalStreaming rpc BidirectionalStreamingEcho(stream EchoRequest) returns (stream EchoResponse) {};}message EchoRequest { string message = 1;}message EchoResponse { string message = 1;}
3. UnaryAPI
UnaryAPI 是最简单的 RPC 调用类型,类似于传统的 RESTful API。它不涉及流数据传输,只是简单的请求-响应模式。以下是 UnaryAPI 的实现示例:
服务器实现
type Echo struct { pb.UnimplementedEchoServer}func (e *Echo) UnaryEcho(ctx context.Context, req *pb.EchoRequest) (*pb.EchoResponse, error) { log.Printf("Recved: %v", req.GetMessage()) resp := &pb.EchoResponse{ Message: req.GetMessage(), } return resp, nil}
客户端实现
func main() { // 1. 建立连接 获取 client conn, err := grpc.Dial(address, grpc.WithInsecure(), grpc.WithBlock()) if err != nil { log.Fatalf("did not connect: %v", err) } defer conn.Close() client := pb.NewEchoClient(conn) // 2. 执行各个 Stream 的对应方法 unary(client)}func unary(client pb.EchoClient) { resp, err := client.UnaryEcho(context.Background(), &pb.EchoRequest{ Message: "hello world", }) if err != nil { log.Printf("send error: %v\n", err) } fmt.Printf("Recved: %v \n", resp.GetMessage())}
4. ServerStreaming
ServerStreaming 是一种服务端流 API,允许服务端向客户端推送多个响应数据。以下是 ServerStreaming 的实现示例:
服务器实现
func (e *Echo) ServerStreamingEcho(req *pb.EchoRequest, stream pb.Echo_ServerStreamingEchoServer) error { log.Printf("Recved %v", req.GetMessage()) // 通过循环调用 stream.Send() 返回多个响应 for i := 0; i < 2; i++ { err := stream.Send(&pb.EchoResponse{ Message: req.GetMessage(), }) if err != nil { log.Fatalf("Send error: %v", err) return err } } // 返回 nil 表示响应完成 return nil}
客户端实现
func serverStream(client pb.EchoClient) { // 2. 调用获取 stream stream, err := client.ServerStreamingEcho(context.Background(), &pb.EchoRequest{ Message: "Hello World", }) if err != nil { log.Fatalf("could not echo: %v", err) } // 3. 循环获取服务端推送的消息 for { resp, err := stream.Recv() if err == io.EOF { log.Println("server closed") break } if err != nil { log.Printf("Recv error: %v", err) continue } log.Printf("Recv data: %v", resp.GetMessage()) }}
5. ClientStreaming
ClientStreaming 是一种客户端流 API,允许客户端向服务端推送多个请求数据。以下是 ClientStreaming 的实现示例:
服务器实现
func (e *Echo) ClientStreamingEcho(stream pb.Echo_ClientStreamingEchoServer) error { // 循环接收客户端发送的消息 for { req, err := stream.Recv() if err == io.EOF { log.Println("client closed") // 调用 SendAndClose 返回响应并关闭连接 return stream.SendAndClose(&pb.EchoResponse{ Message: "ok", }) } if err != nil { return err } log.Printf("Recved: %v", req.GetMessage()) }}
客户端实现
func clientStream(client pb.EchoClient) { // 2. 获取 stream 并通过 Send 方法不断推送数据到服务端 stream, err := client.ClientStreamingEcho(context.Background()) if err != nil { log.Fatalf("Sum() error: %v", err) } for i := int64(0); i < 2; i++ { err := stream.Send(&pb.EchoRequest{ Message: "hello world", }) if err != nil { log.Printf("send error: %v", err) continue } } // 发送完成后通过 CloseAndRecv() 关闭 stream 并接收服务端返回结果 resp, err := stream.CloseAndRecv() if err != nil { log.Fatalf("CloseAndRecv() error: %v", err) } log.Printf("sum: %v", resp.GetMessage())}
6. BidirectionalStreaming
BidirectionalStreaming 是一种双向流 API,允许客户端和服务端都向对方推送数据。以下是 BidirectionalStreaming 的实现示例:
服务器实现
func (e *Echo) BidirectionalStreamingEcho(stream pb.Echo_BidirectionalStreamingEchoServer) error { var( waitGroup sync.WaitGroup msgCh chan string ) waitGroup.Add(1) go func() { defer waitGroup.Done() for v := range msgCh { err := stream.Send(&pb.EchoResponse{ Message: v, }) if err != nil { fmt.Println("Send error:", err) continue } } }() waitGroup.Add(1) go func() { defer waitGroup.Done() for { req, err := stream.Recv() if err == io.EOF { break } if err != nil { log.Fatalf("recv error: %v", err) } fmt.Printf("Recved: %v \n", req.GetMessage()) msgCh <- req.GetMessage() } close(msgCh) }() waitGroup.Wait() return nil}
客户端实现
func bidirectionalStream(client pb.EchoClient) { var( wg sync.WaitGroup ) // 2. 调用方法获取 stream stream, err := client.BidirectionalStreamingEcho(context.Background()) if err != nil { panic(err) } // 3. 开两个 goroutine 分别用于 Recv() 和 Send() wg.Add(1) go func() { defer wg.Done() for { req, err := stream.Recv() if err == io.EOF { fmt.Println("Server Closed") break } if err != nil { continue } fmt.Printf("Recv Data: %v \n", req.GetMessage()) } }() wg.Add(1) go func() { defer wg.Done() for i := 0; i < 2; i++ { err := stream.Send(&pb.EchoRequest{ Message: "hello world", }) if err != nil { log.Printf("send error: %v\n", err) } time.Sleep(time.Second) } // 发送完成后关闭 stream err := stream.CloseSend() if err != nil { log.Printf("Send error: %v\n", err) return } }() wg.Wait()}
7. 小结
通过上述实现,我们可以看到 Stream API 的灵活性和强大之处。无论是服务端推送还是客户端推送,都可以通过循环调用 Recv() 或 Send() 来实现数据的高效传输。
需要注意的是:
- 服务端 Stream 返回 nil 表示响应完成。
- 客户端需要通过 err == io.EOF 判断是否已经获取了全部数据。
- 客户端需要手动调用 CloseAndRecv() 或 CloseSend() 来关闭 Stream,以避免阻塞。
通过 goroutine 和 Stream 的结合,gRPC 提供了对大数据传输和长时间数据交互的高效解决方案。
8. 参考
发表评论
最新留言
留言是一种美德,欢迎回访!
[***.207.175.100]2025年05月11日 08时45分39秒
关于作者

喝酒易醉,品茶养心,人生如梦,品茶悟道,何以解忧?唯有杜康!
-- 愿君每日到此一游!
推荐文章
ado读取多条oracle数据,Oracle ADO数据存取
2025-03-29
android fastjson漏洞_初识Fastjson漏洞(环境搭建及漏洞复现)
2025-03-29
android pod 组件化_CocoaPods 组件化实践 - 私有Pod
2025-03-29
$CH0201$ 费解的开关
2025-03-29
android进程管理策略,Android进程保活
2025-03-29
arduino蓝牙通讯代码_arduino 联接蓝牙模块
2025-03-29
aspen串联反应怎么输入_如何进步提升串联谐振试验装置的稳定性
2025-03-29
aspose html转pdf_Java实现Word/Pdf/TXT转html
2023-01-24
a推b等价于非a或b_AB胶/蜜月胶常见问题的原因分析及解决方法
2023-01-24
bat 命令返回结果_【批处理】带你入门命令行
2023-01-24
c++ string取子串_Integer与String的设计哲学
2023-01-24
c++ 数组批量赋值_数组之间不能赋值?穿个马甲吧!
2023-01-24
cad模糊查询符号_mysql 正则模式和like模糊查询
2023-01-24
ctrl c 和 ctrl v 不能用了_神奇操作,原来CTRL键还能这么用
2023-01-24
cytoscape安装java_Cytoscape史上最全攻略
2023-01-24
c语言程序设计年历显示,C语言程序设计报告《万年历》.doc
2023-01-24