Avoiding boilerplate when calling services, using an abstract class

asked 2022-08-17 10:50:49 -0500

thibd gravatar image

Dear,

To avoid boilerplate, I imagined an abstract interface that can be inherited from to create service callers with standard code. I just wrote it, yet the result seems very esoteric to me. I wanted to have your point of view before going further...

class ServiceCaller(ABC):
    """Call a service and wait for the answer.

    This class has to be inherited from to create each service caller with specific methods.

    The concrete methods use ``self.ServiceInterface`` (passed to ``create_client`` and
    used to build the request), ``self.feed_request`` and ``self.callback``.
    NOTE(review): presumably ``ServiceInterface`` and ``feed_request`` are supplied by
    each subclass -- confirm against the rest of the class.

    @param node: the node that is manipulated (this class instance shall be inside so pass the 'self').
    @param name: the name of the ROS2 service, used to create the client and in log messages.
    @param logger: optionally, another logger than the node's one.
    @param sleep_initial: in the while loop that waits for the answer, we start by a small wait time
    @param sleep_increment: then we raise it by this amount at each iteration
    @param sleep_alert: when the wait time grows above this value, we start to print alerts for debug
    """

    def __init__(
        self,
        node: Node,
        name: str,
        logger=None,
        sleep_initial: float = 0.01,
        sleep_increment: float = 0.1,
        sleep_alert: float = 0.5,
    ):
        """Store the configuration, then create the client and wait for the service.

        @param node: the node owning this caller; it creates the client and provides the default logger.
        @param name: the service name, passed to create_client and used in log messages.
        @param logger: logger used instead of the node's one when it is not None.
        @param sleep_initial: stored as-is; run() uses it as its first wait time.
        @param sleep_increment: stored as-is; run() adds it to the wait time at each iteration.
        @param sleep_alert: stored as-is; run() compares the wait time against it before logging.
        """
        # Keep a handle on the owning node and on the service name
        self.node = node
        self.name = name

        # Fall back to the node's own logger when the caller did not supply one
        if logger is None:
            logger = self.node.get_logger()
        self.logger = logger

        # Timing parameters of the waiting loop
        self.sleep_initial = sleep_initial
        self.sleep_increment = sleep_increment
        self.sleep_alert = sleep_alert

        # Nothing has been called yet
        self.status = Status.INACTIVE

        # Create the client right away; this does not return before the service is found
        self.initialize_client()

    ############################
    ###   Concrete methods   ###
    ############################

    def initialize_client(self):
        """Create the service client and block until the service is available.

        The client is stored in self.client and the status is set to
        Status.INACTIVE once the service has been found.
        """
        log = self.logger
        log.info(f"Initializing the {self.name} service client")
        self.client = self.node.create_client(self.ServiceInterface, self.name)

        # Wait in slices of one second, warning after each slice that timed out;
        # there is no other way out of this loop than the service showing up.
        while True:
            if self.client.wait_for_service(timeout_sec=1.0):
                break
            log.warn(f'{self.name} service not available, waiting again...')

        log.info(f"{self.name} service found")
        self.status = Status.INACTIVE

    def run(self, messages: object = None):
        """Call the service and wait for the answer.

        @param messages: the data forwarded to call_service() to feed the request.
            Defaults to None so that existing calls without argument keep working.
        @return: True if self.status equals Status.SUCCESS once the future is done,
            False otherwise (the status is then logged as an error).
        """
        self.logger.info(f"Running service {self.name}")

        # call_service() requires the messages argument: calling it without any
        # argument raised a TypeError, so the data is forwarded explicitly.
        future: Future = self.call_service(messages)

        # NOTE(review): this loop blocks the calling thread with time.sleep();
        # presumably the future is completed by an executor spinning in another
        # thread -- confirm, otherwise the loop never ends.
        sleep_time = self.sleep_initial
        while not future.done():
            time.sleep(sleep_time)
            # NOTE(review): the comparison uses the current wait time of the
            # iteration, not the total time waited so far.
            if sleep_time > self.sleep_alert:
                self.logger.info(f"Waiting for the {self.name} service to complete...")
            sleep_time += self.sleep_increment

        # call_service() sets the status to RUNNING; presumably the done-callback
        # it registers updates it afterwards (callback body not visible -- confirm).
        if self.status == Status.SUCCESS:
            return True

        self.logger.error(f"The status of the service {self.name} is: {self.status}")
        return False

    def call_service(
        self,
        messages: any,
    ) -> Future:
        """Send the request asynchronously and return the resulting future.

        The status is set to Status.RUNNING, a request is built from
        ``messages`` through feed_request(), and self.callback is registered
        on the future with ``messages`` bound as a keyword argument.

        @param messages: the data handed to feed_request() and to the callback.
            NOTE: the annotation is the builtin ``any``, kept as-is.
        @return: the future returned by the client's call_async().
        """
        # The call is now in progress
        self.status = Status.RUNNING
        self.logger.debug(f"Calling {self.name} service with messages: {messages}")

        # Start from an empty request and let feed_request() fill it
        empty_request = self.ServiceInterface.Request()
        request = self.feed_request(empty_request, messages)

        # Asynchronous call: the answer arrives through the future
        pending = self.client.call_async(request)

        # add_done_callback only passes the future itself, so the messages
        # are bound beforehand with a partial
        on_done = partial(self.callback, messages=messages)
        pending.add_done_callback(on_done)

        return pending

    def callback(
        self ...
(more)
edit retag flag offensive close merge delete