服务端流式 RPC

PPG007 ... 2022-5-6 About 1 min

# 服务端流式 RPC

生成的接口:

type LaptopServiceServer interface {
	CreateLaptop(context.Context, *CreateLaptopRequest) (*CreateLaptopResponse, error)
	SearchLaptop(*SearchLaptopRequest, LaptopService_SearchLaptopServer) error
	mustEmbedUnimplementedLaptopServiceServer()
}
1
2
3
4
5

实现这个接口:

func (server *LaptopServer) SearchLaptop(request *pb.SearchLaptopRequest, stream pb.LaptopService_SearchLaptopServer) error {
  // 获取发送的 filter 对象。
	filter := request.GetFilter()
	log.Printf("receive a search-laptop request with filter: %v", filter)
  // 向自定义方法传入参数,context 用来进行取消和超时处理,filter 用来筛选,传入函数用来以流形式回复客户端,这个自定义方法循环遍历 map 中的每个元素,每遇到一个符合 filter 要求的就调用这个传入的函数发回客户端
	err := server.LaptopStore.Search(stream.Context(), filter, func(laptop *pb.Laptop) error {
		res := &pb.SearchLaptopResponse{Laptop: laptop}
    // 流调用 Send 方法发送。
		err := stream.Send(res)
		if err != nil {
			return err
		}
		log.Printf("sent laptop with id: %s", laptop.GetId())
		return nil
	})
	if err != nil {
		return status.Errorf(codes.Internal, "unexpected error: %v", err)
	}
	return nil
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20

客户端调用:

func searchLaptop(laptopClient pb.LaptopServiceClient, filter *pb.Filter) {
	log.Println("search filter", filter)
	ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
	defer cancel()
	req := &pb.SearchLaptopRequest{
		Filter: filter,
	}
	stream, err := laptopClient.SearchLaptop(ctx, req)
	if err != nil {
		log.Fatalf("cannot search laptop: %v\n", err)
	}
	for {
		res, err := stream.Recv()
		if err == io.EOF {
			return
		}
		if err != nil {
			log.Fatalf("cannot receive response: %v\n", err)
		}
		laptop := res.GetLaptop()
		log.Printf("- found laptop with id: %s", laptop.Id)
		log.Printf(" + found laptop with price_usd: %f", laptop.PriceUsd)
		log.Printf(" + found laptop with cpu_cores: %d", laptop.Cpu.NumberCores)
		log.Printf(" + found laptop with ram: %d %v", laptop.Ram.Value, laptop.Ram.Unit)
		log.Printf(" + found laptop with min cpu_ghz: %f", laptop.Cpu.MinGhz)
		log.Printf(" + found laptop with max cpu_ghz: %f", laptop.Cpu.MaxGhz)
	}
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28

服务端、客户端启动函数不需要变化。