Source code for examples.basic.server
import asyncio
import logging
import grpc
from examples.basic import hello_pb2
from examples.basic import hello_pb2_grpc
[docs]
class Greeter(hello_pb2_grpc.GreeterServicer):
    """Servicer that implements the asynchronous ``SayHello`` RPC."""

    async def SayHello(
        self,
        request: hello_pb2.HelloRequest,
        context: grpc.aio.ServicerContext,
    ) -> hello_pb2.HelloReply:
        """Reply with a greeting built from ``request.name``.

        Args:
            request: Incoming message; only its ``name`` field is read.
            context: RPC context; not used by this handler.

        Returns:
            A ``HelloReply`` whose ``message`` is ``"Hello, <name>!"``.
        """
        greeting = "Hello, %s!" % request.name
        return hello_pb2.HelloReply(message=greeting)
[docs]
def get_server() -> grpc.aio.Server:
    """Build the asyncio gRPC server for the greeter example.

    A ``Greeter`` servicer is registered on a fresh ``grpc.aio`` server and
    the server is bound to an insecure port on ``[::]:50051``.

    Returns:
        The configured server. It is *not* started here; the caller is
        responsible for awaiting ``start()``.
    """
    grpc_server = grpc.aio.server()
    servicer = Greeter()
    hello_pb2_grpc.add_GreeterServicer_to_server(servicer, grpc_server)

    bind_address = "[::]:50051"
    grpc_server.add_insecure_port(bind_address)

    # Logged at configuration time, before the caller actually starts the server.
    logging.info("Starting server on %s", bind_address)
    return grpc_server
[docs]
async def serve() -> None:  # pragma no cover
    """Start the server returned by ``get_server()`` and wait for it to terminate."""
    grpc_server = get_server()
    await grpc_server.start()
    # Suspends this coroutine until the server has terminated.
    await grpc_server.wait_for_termination()
if __name__ == "__main__":  # pragma no cover
    # Set the root logger to INFO first so the startup message that
    # get_server() logs at INFO level is emitted.
    logging.basicConfig(level=logging.INFO)
    asyncio.run(serve())