Source code for examples.basic.server

import asyncio
import logging

import grpc

from examples.basic import hello_pb2
from examples.basic import hello_pb2_grpc


[docs] class Greeter(hello_pb2_grpc.GreeterServicer):
[docs] async def SayHello( self, request: hello_pb2.HelloRequest, context: grpc.aio.ServicerContext, ) -> hello_pb2.HelloReply: return hello_pb2.HelloReply(message="Hello, %s!" % request.name)
[docs] def get_server() -> grpc.aio.Server: server = grpc.aio.server() hello_pb2_grpc.add_GreeterServicer_to_server(Greeter(), server) listen_addr = "[::]:50051" server.add_insecure_port(listen_addr) logging.info("Starting server on %s", listen_addr) return server
[docs] async def serve() -> None: # pragma no cover server = get_server() await server.start() await server.wait_for_termination()
if __name__ == "__main__": # pragma no cover logging.basicConfig(level=logging.INFO) asyncio.run(serve())