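1. grpc开源包的安装
# conda
$ conda create -n grpc_env python=3.9
$ conda activate grpc_env
# install grpc(注意:PyPI 上的包名是 grpcio 和 grpcio-tools,而不是 grpc / grpc-tools)
$ pip install grpcio -i https://pypi.doubanio.com/simple
$ pip install grpcio-tools -i https://pypi.doubanio.com/simple
# 有时proto生成的py文件不对,就需要更换 grpcio / grpcio-tools 这两个包的版本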
2. grpc的使用之传送消息
整体结构:client.py、server.py 和 proto 目录下的 example.proto
1)在example.proto定义传送体
// Declaration: protobuf syntax version and package name.
syntax = "proto3";
package proto;

// Service definition.
service helloservice {
  // Unary RPC: a single request message in, a single response message out.
  rpc hello(request) returns (response) {}
}

// Request message. The numbers 1, 2, ... are field numbers that identify
// each field in the encoded message; they must be unique within a message.
message request {
  string data = 1;
}

// Response message.
message response {
  int32 ret = 1;    // return code
  string data = 2;
}

// Generate the Python modules with (note the upper-case -I include flag):
// python -m grpc_tools.protoc -I ./ --python_out=./ --grpc_python_out=./ ./example.proto
2) 在虚拟环境里使用命令生成py文件
$ conda activate grpc_env
$ f:
$ cd f:\examples
$ python -m grpc_tools.protoc -I ./ --python_out=./ --grpc_python_out=./ ./example.proto
在proto目录下会生成两个py文件(example_pb2.py 和 example_pb2_grpc.py),如下图所示:
3) 编辑client.py 和 server.py
# server.py
import time
import grpc
from concurrent import futures
from proto import example_pb2_grpc, example_pb2


class serviceback(example_pb2_grpc.helloserviceServicer):
    """Concrete implementation of the ``helloservice`` RPC interface."""

    def hello(self, request, context):
        """Print the received text and echo it back prefixed with ``response:``."""
        data = request.data
        print(data)
        ret_data = "response:" + data
        return example_pb2.response(ret=0, data=ret_data)


def server(ip: str, port: int) -> None:
    """Start an insecure gRPC server on ``ip:port`` and block until stopped.

    Handlers run on a thread pool with 10 workers. Ctrl+C stops the server.
    """
    grpc_server = grpc.server(futures.ThreadPoolExecutor(max_workers=10))
    example_pb2_grpc.add_helloserviceServicer_to_server(serviceback(), grpc_server)
    grpc_server.add_insecure_port(f"{ip}:{port}")
    grpc_server.start()
    print(f"server is started! ip:{ip} port:{port}")
    try:
        # Blocks the main thread; replaces a manual sleep loop.
        grpc_server.wait_for_termination()
    except KeyboardInterrupt:
        # KeyboardInterrupt is not an Exception subclass, so it has to be
        # named explicitly for Ctrl+C to reach the shutdown below.
        pass
    finally:
        grpc_server.stop(0)


if __name__ == '__main__':
    server("127.0.0.1", 8000)


# client.py
import grpc
from proto import example_pb2_grpc, example_pb2


def client(ip: str, port: int) -> None:
    """Send one ``hello`` request to the server at ``ip:port`` and print the reply."""
    target = f"{ip}:{port}"
    # The context manager closes the channel when the call is done.
    with grpc.insecure_channel(target) as channel:  # connect to the RPC server
        cli = example_pb2_grpc.helloserviceStub(channel)  # create the stub
        request = example_pb2.request(data="hello 123")
        res = cli.hello(request)
        print(f"ret:{res.ret}, data:{res.data}")


if __name__ == '__main__':
    client("127.0.0.1", 8000)
3. grpc的使用之数据传输大小配置
默认情况下,grpc 将传入消息限制为 4 MB。传出消息没有限制。
1)example.proto定义不变
2)编辑client.py 和 server.py
# server.py
import time
import grpc
from concurrent import futures
from proto import example_pb2_grpc, example_pb2


class serviceback(example_pb2_grpc.helloserviceServicer):
    """Concrete implementation of the ``helloservice`` RPC interface."""

    def hello(self, request, context):
        """Print the received text and echo it back prefixed with ``response:``."""
        data = request.data
        print(data)
        ret_data = "response:" + data
        return example_pb2.response(ret=0, data=ret_data)


def server(ip: str, port: int) -> None:
    """Start an insecure gRPC server on ``ip:port`` with raised message-size limits.

    Blocks until the server terminates; Ctrl+C stops it.
    """
    # Message-size configuration: raise both send and receive limits.
    max_message_length = 1024 * 1024 * 1024  # 1 GiB
    options = [
        ('grpc.max_send_message_length', max_message_length),
        ('grpc.max_receive_message_length', max_message_length),
        ('grpc.enable_retries', 1),
    ]
    # Thread pool of size 10 runs the RPC handlers.
    grpc_server = grpc.server(futures.ThreadPoolExecutor(max_workers=10), options=options)
    example_pb2_grpc.add_helloserviceServicer_to_server(serviceback(), grpc_server)
    grpc_server.add_insecure_port(f"{ip}:{port}")
    grpc_server.start()
    print(f"server is started! ip:{ip} port:{port}")
    try:
        grpc_server.wait_for_termination()
    except KeyboardInterrupt:
        # Not an Exception subclass; must be caught by name to shut down cleanly.
        pass
    finally:
        grpc_server.stop(0)


if __name__ == '__main__':
    server("127.0.0.1", 8000)
# client.py
import grpc
from proto import example_pb2_grpc, example_pb2


def client(ip: str, port: int) -> None:
    """Send a ~9 MB ``hello`` request to ``ip:port`` and print the reply.

    The payload exceeds the 4 MB default receive limit, so the raised
    limits in ``options`` are what allow the exchange to succeed.
    """
    # Message-size configuration: raise both send and receive limits.
    max_message_length = 1024 * 1024 * 1024  # 1 GiB
    options = [
        ('grpc.max_send_message_length', max_message_length),
        ('grpc.max_receive_message_length', max_message_length),
        ('grpc.enable_retries', 1),
    ]
    target = f"{ip}:{port}"
    # The context manager closes the channel when the call is done.
    with grpc.insecure_channel(target, options=options) as channel:  # connect
        cli = example_pb2_grpc.helloserviceStub(channel)  # create the stub
        data = "hello 123" * 1024 * 1024
        request = example_pb2.request(data=data)
        res = cli.hello(request)
        print(f"ret:{res.ret}, data:{res.data}")


if __name__ == '__main__':
    client("127.0.0.1", 8000)
4. grpc的使用之超时配置
1)example.proto定义不变
2)编辑client.py 和 server.py
# server.py
import time
import grpc
from concurrent import futures
from proto import example_pb2_grpc, example_pb2


class serviceback(example_pb2_grpc.helloserviceServicer):
    """Concrete implementation of the ``helloservice`` RPC interface."""

    def hello(self, request, context):
        """Echo the request text after a 2-second delay.

        The delay exists so that a client calling with a shorter timeout
        observes a deadline error.
        """
        data = request.data
        print(data)
        time.sleep(2)
        ret_data = "response:" + data
        return example_pb2.response(ret=0, data=ret_data)


def server(ip: str, port: int) -> None:
    """Start an insecure gRPC server on ``ip:port`` and block until stopped."""
    # Message-size configuration: raise both send and receive limits.
    max_message_length = 1024 * 1024 * 1024  # 1 GiB
    options = [
        ('grpc.max_send_message_length', max_message_length),
        ('grpc.max_receive_message_length', max_message_length),
        ('grpc.enable_retries', 1),
    ]
    # Thread pool of size 10 runs the RPC handlers.
    grpc_server = grpc.server(futures.ThreadPoolExecutor(max_workers=10), options=options)
    example_pb2_grpc.add_helloserviceServicer_to_server(serviceback(), grpc_server)
    grpc_server.add_insecure_port(f"{ip}:{port}")
    grpc_server.start()
    print(f"server is started! ip:{ip} port:{port}")
    try:
        grpc_server.wait_for_termination()
    except KeyboardInterrupt:
        # Not an Exception subclass; must be caught by name to shut down cleanly.
        pass
    finally:
        grpc_server.stop(0)


if __name__ == '__main__':
    server("127.0.0.1", 8000)
# client.py
import sys
import grpc
from proto import example_pb2_grpc, example_pb2


def client(ip: str, port: int) -> None:
    """Call ``hello`` on ``ip:port`` with a 1-second deadline and print the reply.

    On an RPC failure (including an exceeded deadline) the error details are
    printed and the process exits with status -1. On success the function
    returns normally instead of always exiting with a failure status.
    """
    # Message-size configuration: raise both send and receive limits.
    max_message_length = 1024 * 1024 * 1024  # 1 GiB
    options = [
        ('grpc.max_send_message_length', max_message_length),
        ('grpc.max_receive_message_length', max_message_length),
        ('grpc.enable_retries', 1),
    ]
    target = f"{ip}:{port}"
    # The context manager closes the channel even when sys.exit() is raised.
    with grpc.insecure_channel(target, options=options) as channel:  # connect
        cli = example_pb2_grpc.helloserviceStub(channel)  # create the stub
        request = example_pb2.request(data="hello 123")
        try:
            res = cli.hello(request, timeout=1)  # timeout is in seconds
        except grpc.RpcError as rpc_error:
            print("grpc.rpcerror", rpc_error.details())
            sys.exit(-1)
        print(f"ret:{res.ret}, data:{res.data}")


if __name__ == '__main__':
    client("127.0.0.1", 8000)
运行结果:
grpc.rpcerror Deadline Exceeded
5. grpc之大文件之流stream传输
1)在example.proto重新定义传送体
// Declaration: protobuf syntax version and package name.
syntax = "proto3";
package proto;

// Service definition.
service helloservice {
  // Unary RPC: a single request message in, a single response message out.
  rpc hello(request) returns (response) {}
  // Client-streaming RPC: upload a file as a stream of chunks.
  rpc clienttoserver(stream upfilerequest) returns (response) {}
  // Server-streaming RPC: download a file as a stream of chunks.
  rpc servertoclient(request) returns (stream upfilerequest) {}
}

// Request message. The numbers 1, 2, ... are field numbers that identify
// each field in the encoded message; they must be unique within a message.
message request {
  string data = 1;
}

// Response message.
message response {
  int32 ret = 1;    // return code
  string data = 2;
}

// One chunk of a streamed file.
message upfilerequest {
  string filename = 1;
  int64 sendsize = 2;
  int64 totalsize = 3;
  bytes data = 4;
}

// Generate the Python modules with (note the upper-case -I include flag):
// python -m grpc_tools.protoc -I ./ --python_out=./ --grpc_python_out=./ ./example.proto
2)在虚拟环境里使用命令生成py文件,参考2. 2)
3)编辑client.py 和 server.py
# server.py
import os
import time
import grpc
from concurrent import futures
from proto import example_pb2_grpc, example_pb2


class serviceback(example_pb2_grpc.helloserviceServicer):
    """Concrete implementation of the ``helloservice`` RPC interface."""

    def hello(self, request, context):
        """Echo the request text after a 2-second delay."""
        data = request.data
        print(data)
        time.sleep(2)
        ret_data = "response:" + data
        return example_pb2.response(ret=0, data=ret_data)

    def clienttoserver(self, request_iterator, context):
        """Receive an uploaded file as a stream of chunks.

        All chunks are concatenated in memory. When the received length
        equals the ``totalsize`` announced by the client, the bytes are
        written to ``242_copy.mp3`` and ``ret=0`` is returned; otherwise
        ``ret=-1``. The response ``data`` carries the file name.
        """
        received = bytearray()
        # Pre-initialised so an empty request stream reports failure
        # instead of hitting unbound local variables.
        file_name = ""
        file_size = -1
        for chunk in request_iterator:
            file_name = chunk.filename
            file_size = chunk.totalsize
            print(f"文件名称:{file_name}, 文件总长度:{file_size}")
            received.extend(chunk.data)  # append this chunk's bytes
            print(f"已接收长度:{len(received)}")

        if len(received) == file_size:
            with open("242_copy.mp3", "wb") as fw:
                fw.write(received)
            print(f"{file_name=} 下载完成")
            ret = 0
        else:
            print(f"{file_name=} 下载失败")
            ret = -1
        return example_pb2.response(ret=ret, data=file_name)

    def servertoclient(self, request, context):
        """Stream the file named in ``request.data`` back to the client.

        The first message carries only the file name and total size; each
        following message carries up to 1 MiB of file content. If the file
        cannot be inspected the RPC is aborted with ``NOT_FOUND``.
        """
        fp = request.data
        print(f"下载文件:{fp=}")
        # NOTE(security): ``fp`` comes straight from the client and is opened
        # as-is, so any file readable by the server process can be requested.
        # Restrict it to an allowed directory before using this outside a demo.
        try:
            file_size = os.path.getsize(fp)
        except OSError as err:
            # abort() raises, which ends this handler with the given status.
            context.abort(grpc.StatusCode.NOT_FOUND, str(err))
        file_name = os.path.basename(fp)
        part_size = 1024 * 1024  # read 1 MiB per chunk

        with open(fp, "rb") as fr:
            # Header message: metadata only, no payload.
            yield example_pb2.upfilerequest(
                filename=file_name, totalsize=file_size, sendsize=0, data=b"")
            # A read error now propagates and fails the RPC; previously it was
            # printed and the loop retried forever.
            while chunk := fr.read(part_size):
                yield example_pb2.upfilerequest(
                    filename=file_name, totalsize=file_size,
                    sendsize=len(chunk), data=chunk)
        print("发送完毕")


def server(ip: str, port: int) -> None:
    """Start an insecure gRPC server on ``ip:port`` and block until stopped."""
    # Message-size configuration: raise both send and receive limits.
    max_message_length = 1024 * 1024 * 1024  # 1 GiB
    options = [
        ('grpc.max_send_message_length', max_message_length),
        ('grpc.max_receive_message_length', max_message_length),
        ('grpc.enable_retries', 1),
    ]
    # Thread pool of size 10 runs the RPC handlers.
    grpc_server = grpc.server(futures.ThreadPoolExecutor(max_workers=10), options=options)
    example_pb2_grpc.add_helloserviceServicer_to_server(serviceback(), grpc_server)
    grpc_server.add_insecure_port(f"{ip}:{port}")
    grpc_server.start()
    print(f"server is started! ip:{ip} port:{port}")
    try:
        grpc_server.wait_for_termination()
    except KeyboardInterrupt:
        # Not an Exception subclass; must be caught by name to shut down cleanly.
        pass
    finally:
        grpc_server.stop(0)


if __name__ == '__main__':
    server("127.0.0.1", 8000)
# client.py
import os
import sys
import grpc
from proto import example_pb2_grpc, example_pb2


def _channel_options():
    """Return the channel options shared by every client call in this file."""
    max_message_length = 1024 * 1024 * 1024  # 1 GiB
    return [
        ('grpc.max_send_message_length', max_message_length),
        ('grpc.max_receive_message_length', max_message_length),
        ('grpc.enable_retries', 1),
    ]


def send_stream_data(fp: str):
    """Yield the file at ``fp`` as a sequence of ``upfilerequest`` messages.

    The first message carries only the file name and total size; each
    following message carries up to 1 MiB of file content.
    """
    file_name = os.path.basename(fp)
    file_size = os.path.getsize(fp)
    part_size = 1024 * 1024  # read 1 MiB per chunk

    with open(fp, "rb") as fr:
        # Header message: metadata only, no payload.
        yield example_pb2.upfilerequest(
            filename=file_name, totalsize=file_size, sendsize=0, data=b"")
        # A read error now propagates to the caller; previously it was
        # printed and the loop retried forever.
        while chunk := fr.read(part_size):
            yield example_pb2.upfilerequest(
                filename=file_name, totalsize=file_size,
                sendsize=len(chunk), data=chunk)
    print("发送完毕")


def client(ip: str, port: int) -> None:
    """Call ``hello`` on ``ip:port`` with a 1-second deadline and print the reply.

    Exits the process with status -1 on an RPC error; returns normally on
    success.
    """
    target = f"{ip}:{port}"
    with grpc.insecure_channel(target, options=_channel_options()) as channel:
        cli = example_pb2_grpc.helloserviceStub(channel)  # create the stub
        request = example_pb2.request(data="hello 123")
        try:
            res = cli.hello(request, timeout=1)  # timeout is in seconds
        except grpc.RpcError as rpc_error:
            print("grpc.rpcerror", rpc_error.details())
            sys.exit(-1)
        print(f"ret:{res.ret}, data:{res.data}")


def client_to_server(ip: str, port: int, fp: str):
    """Upload the file at ``fp`` to the server as a stream and print the reply.

    Exits the process with status -1 on an RPC error; returns normally on
    success.
    """
    target = f"{ip}:{port}"
    with grpc.insecure_channel(target, options=_channel_options()) as channel:
        cli = example_pb2_grpc.helloserviceStub(channel)  # create the stub
        try:
            res = cli.clienttoserver(send_stream_data(fp=fp), timeout=600)  # seconds
        except grpc.RpcError as rpc_error:
            print("grpc.rpcerror", rpc_error.details())
            sys.exit(-1)
        print(f"ret:{res.ret}, data:{res.data}")


def server_to_client(ip: str, port: int, fp: str):
    """Download the server-side file ``fp`` as a stream and save it locally.

    All chunks are concatenated in memory. When the received length equals
    the announced total size the bytes are written to ``242_1.mp3``.
    Exits the process with status -1 on an RPC error or a size mismatch.
    """
    target = f"{ip}:{port}"
    with grpc.insecure_channel(target, options=_channel_options()) as channel:
        cli = example_pb2_grpc.helloserviceStub(channel)  # create the stub
        received = bytearray()
        filename = ""
        total_size = -1  # stays -1 if the server sends nothing, forcing a mismatch
        request = example_pb2.request(data=fp)
        try:
            for res in cli.servertoclient(request, timeout=300):
                filename = res.filename
                total_size = res.totalsize
                received += res.data
        except grpc.RpcError as rpc_error:
            print("grpc.rpcerror", rpc_error.details())
            sys.exit(-1)

        if total_size != len(received):
            print(f"{filename=} 下载失败!")
            sys.exit(-1)
        with open("242_1.mp3", "wb") as fw:
            fw.write(received)
        print(f"{filename=} : {total_size=} 下载完成!")


if __name__ == '__main__':
    # Alternative demos; enable one at a time:
    # client("127.0.0.1", 8000)
    # client_to_server("127.0.0.1", 8000, "242.mp3")
    server_to_client("127.0.0.1", 8000, "242.mp3")
6. grpc之大文件之流async异步传输# server.pyimport osimport timeimport grpcfrom concurrent import futuresfrom proto import example_pb2_grpc, example_pb2import asyncio class serviceback(example_pb2_grpc.helloserviceservicer): """接口的具体功能实现""" def hello(self, request, context): """hello""" data = request.data print(data) time.sleep(2) ret_data = "response:" + data return example_pb2.response(ret=0, data=ret_data) def clienttoserver(self, request_iterator, context): """上传文件""" data = bytearray() for upfilerequest in request_iterator: file_name = upfilerequest.filename file_size = upfilerequest.totalsize file_data = upfilerequest.data print(f"文件名称:{file_name}, 文件总长度:{file_size}") data.extend(file_data) # 拼接两个bytes print(f"已接收长度:{len(data)}") if len(data) == file_size: with open("242_copy.mp3", "wb") as fw: fw.write(data) print(f"{file_name=} 下载完成") (ret, res) = (0, file_name) else: print(f"{file_name=} 下载失败") (ret, res) = (-1, file_name) return example_pb2.response(ret=ret, data=res) def servertoclient(self, request, context): """下载文件""" fp = request.data print(f"下载文件:{fp=}") # 获取文件名和文件大小 file_name = os.path.basename(fp) file_size = os.path.getsize(fp) # 获取文件大小 # 发送文件内容 part_size = 1024 * 1024 # 每次读取1mb数据 count = 1 with open(fp, "rb") as fr: while true: try: if count == 1: count += 1 yield example_pb2.upfilerequest(filename=file_name, totalsize=file_size, sendsize=0, data=b"") else: context = fr.read(part_size) if context: yield example_pb2.upfilerequest(filename=file_name, totalsize=file_size, sendsize=len(context), data=context) else: print(f"发送完毕") return 0 except exception as es: print(es) async def server(ip: str, port: int) -> none: # 数据传输大小配置 max_message_length = 1024 * 1024 * 1024 # 1g options = [('grpc.max_send_message_length', max_message_length), ('grpc.max_receive_message_length', max_message_length), ('grpc.enable_retries', 1), ] server = grpc.aio.server(futures.threadpoolexecutor(max_workers=10), options=options) # ⼤⼩为10的线程池 ai_servicer = serviceback() 
example_pb2_grpc.add_helloserviceservicer_to_server(ai_servicer, server) server.add_insecure_port(f"{ip}:{port}") await server.start() try: print(f"server is started! ip:{ip} port:{str(port)}") await server.wait_for_termination() except exception as es: print(es) await server.stop(none) if __name__ == '__main__': loop = asyncio.get_event_loop() loop.run_until_complete(asyncio.wait([server("127.0.0.1", 8000)])) loop.close()
# client.py
import os
import sys
import grpc
from proto import example_pb2_grpc, example_pb2
import asyncio


def _channel_options():
    """Return the channel options shared by every client call in this file."""
    max_message_length = 1024 * 1024 * 1024  # 1 GiB
    return [
        ('grpc.max_send_message_length', max_message_length),
        ('grpc.max_receive_message_length', max_message_length),
        ('grpc.enable_retries', 1),
    ]


def send_stream_data(fp: str):
    """Yield the file at ``fp`` as a sequence of ``upfilerequest`` messages.

    The first message carries only the file name and total size; each
    following message carries up to 1 MiB of file content. This is a plain
    (synchronous) generator, so each file read blocks while it runs.
    """
    file_name = os.path.basename(fp)
    file_size = os.path.getsize(fp)
    part_size = 1024 * 1024  # read 1 MiB per chunk

    with open(fp, "rb") as fr:
        # Header message: metadata only, no payload.
        yield example_pb2.upfilerequest(
            filename=file_name, totalsize=file_size, sendsize=0, data=b"")
        # A read error now propagates to the caller; previously it was
        # printed and the loop retried forever.
        while chunk := fr.read(part_size):
            yield example_pb2.upfilerequest(
                filename=file_name, totalsize=file_size,
                sendsize=len(chunk), data=chunk)
    print("发送完毕")


async def client(ip: str, port: int) -> None:
    """Call ``hello`` on ``ip:port`` with a 3-second deadline and print the reply.

    Exits the process with status -1 on an RPC error; returns normally on
    success.
    """
    target = f"{ip}:{port}"
    async with grpc.aio.insecure_channel(target, options=_channel_options()) as channel:
        cli = example_pb2_grpc.helloserviceStub(channel)  # create the stub
        request = example_pb2.request(data="hello 123")
        try:
            res = await cli.hello(request, timeout=3)  # timeout is in seconds
        except grpc.RpcError as rpc_error:
            print("grpc.rpcerror", rpc_error.details())
            sys.exit(-1)
        print(f"ret:{res.ret}, data:{res.data}")


async def client_to_server(ip: str, port: int, fp: str):
    """Upload the file at ``fp`` to the server as a stream and print the reply.

    Exits the process with status -1 on an RPC error; returns normally on
    success.
    """
    target = f"{ip}:{port}"
    async with grpc.aio.insecure_channel(target, options=_channel_options()) as channel:
        cli = example_pb2_grpc.helloserviceStub(channel)  # create the stub
        try:
            res = await cli.clienttoserver(send_stream_data(fp=fp), timeout=600)  # seconds
        except grpc.RpcError as rpc_error:
            print("grpc.rpcerror", rpc_error.details())
            sys.exit(-1)
        print(f"ret:{res.ret}, data:{res.data}")


def server_to_client(ip: str, port: int, fp: str):
    """Download the server-side file ``fp`` as a stream and save it locally.

    This variant is synchronous and uses a blocking channel. All chunks are
    concatenated in memory. When the received length equals the announced
    total size the bytes are written to ``242_1.mp3``. Exits the process
    with status -1 on an RPC error or a size mismatch.
    """
    target = f"{ip}:{port}"
    with grpc.insecure_channel(target, options=_channel_options()) as channel:
        cli = example_pb2_grpc.helloserviceStub(channel)  # create the stub
        received = bytearray()
        filename = ""
        total_size = -1  # stays -1 if the server sends nothing, forcing a mismatch
        request = example_pb2.request(data=fp)
        try:
            for res in cli.servertoclient(request, timeout=300):
                filename = res.filename
                total_size = res.totalsize
                received += res.data
        except grpc.RpcError as rpc_error:
            print("grpc.rpcerror", rpc_error.details())
            sys.exit(-1)

        if total_size != len(received):
            print(f"{filename=} 下载失败!")
            sys.exit(-1)
        with open("242_1.mp3", "wb") as fw:
            fw.write(received)
        print(f"{filename=} : {total_size=} 下载完成!")


if __name__ == '__main__':
    # Alternative demos; enable one at a time:
    # asyncio.run(client("127.0.0.1", 8000))
    asyncio.run(client_to_server("127.0.0.1", 8000, "242.mp3"))
    # server_to_client("127.0.0.1", 8000, "242.mp3")
以上就是使用python语言实现消息传递的grpc教程的详细内容。