Robotics StackExchange | Archived questions

Avoiding boilerplate when calling services, using an abstract class

Dear,

To avoid boilerplate, I came up with an abstract interface that can be inherited from to create service callers with standard code. I have just written it, yet the result seems very esoteric to me. I would like to have your point of view before going further...

class ServiceCaller(ABC):
    """Call a ROS2 service and wait for the answer.

    This class has to be inherited from to create each service caller. A child
    class must implement the ``ServiceInterface`` property and the
    ``feed_request`` and ``process_response`` methods.

    @param node: the node that is manipulated (this class instance shall be inside so pass the 'self').
    @param name: the name of the ROS2 service to call.
    @param logger: optionally, another logger than the node's one.
    @param sleep_initial: in the while loop that waits for the answer, we start by a small wait time
    @param sleep_increment: then we raise the wait time by this amount at each iteration
    @param sleep_alert: if we waited more than this time in total, we start to print alerts for debug
    """

    def __init__(
        self,
        node: Node,
        name: str,
        logger = None,
        sleep_initial: float = 0.01,
        sleep_increment: float = 0.1,
        sleep_alert: float = 0.5,
    ):

        # Parameters of the waiting loop used in run()
        self.sleep_initial: float = sleep_initial
        self.sleep_increment: float = sleep_increment
        self.sleep_alert: float = sleep_alert

        # We get the "self" of the node.
        self.node: Node = node

        # Get the provided logger or default to the node logger
        self.logger = logger if logger is not None else self.node.get_logger()

        # Name of the service
        self.name: str = name

        # Initialize the status
        self.status: Status = Status.INACTIVE

        # We just create a client and connect to the service at startup.
        # This blocks until the service is available (see initialize_client).
        self.initialize_client()

    ############################
    ###   Concrete methods   ###
    ############################

    def initialize_client(self):
        """Create the client and block until the service is available."""
        self.logger.info(f"Initializing the {self.name} service client")
        self.client: Client = self.node.create_client(self.ServiceInterface, self.name)
        # Poll with a 1 s timeout so that a warning is logged each time the wait expires
        while not self.client.wait_for_service(timeout_sec=1.0):
            self.logger.warning(f'{self.name} service not available, waiting again...')
        # We can only get out of this loop if the service is available
        self.logger.info(f"{self.name} service found")
        self.status = Status.INACTIVE

    def run(self, messages=None):
        """Call the service and block until its future is done.

        @param messages: data forwarded to call_service, and from there to
            feed_request and process_response.

        @return: True if the status is Status.SUCCESS once the future is done, False otherwise.
        """
        self.logger.info(f"Running service {self.name}")
        future: Future = self.call_service(messages)

        # NOTE(review): this loop only sleeps; something else has to complete the
        # future meanwhile (presumably the node is spun in another thread) - confirm.
        sleep_time = self.sleep_initial
        waited = 0.0
        while not future.done():
            time.sleep(sleep_time)
            waited += sleep_time
            # Alert on the total time waited, not on the current sleep interval
            if waited > self.sleep_alert:
                self.logger.info(f"Waiting for the {self.name} service to complete...")
            sleep_time += self.sleep_increment

        # NOTE(review): the status is written by self.callback; this assumes the
        # done callback has already run when future.done() is True - confirm.
        if self.status == Status.SUCCESS:
            return True
        self.logger.error(f"The status of the service {self.name} is: {self.status}")
        return False

    def call_service(
        self,
        messages: object,
    ) -> Future:
        """Call the service asynchronously and return the future.

        @param messages: data handed to feed_request to fill the request, and
            later to the callback.

        @return: the future returned by the client's call_async
        """

        # Update the status
        self.status = Status.RUNNING
        self.logger.debug(f"Calling {self.name} service with messages: {messages}")

        # Create a request and feed it with the messages data
        request = self.feed_request(self.ServiceInterface.Request(), messages)

        # Call the service and get the response as a future
        future: Future = self.client.call_async(request)

        # Launch the callback once the future is completed.
        # Use a partial to bind the messages, since the done callback is only
        # given the future.
        future.add_done_callback(
            partial(
                self.callback,
                messages=messages,
            )
        )

        return future

    def callback(
        self,
        future: Future,
        messages = None
    ) -> None:
        """Function launched once the future is completed.

        It hands the response to process_response and updates self.status:
        SUCCESS when a response was processed, ERROR when the response is None
        or when an exception was raised.

        @param future: Future object
        @param messages: messages are transmitted to the callback to be able to use them for information

        @return: None
        """
        try:
            # Get the result from the server
            response = future.result()
            # Log the answer for debugging
            self.logger.info(f'Service {self.name} response: {response}')
            if response is not None:
                self.process_response(response, messages)
                self.status = Status.SUCCESS
            else:
                self.status = Status.ERROR
                self.logger.error(f"The service {self.name} ran but returned None as a response.")
        except Exception as e:
            # Any failure while getting or processing the response is reported
            # through the status rather than propagated to the caller.
            self.status = Status.ERROR
            self.logger.error(f"Calling the {self.name} service raised error: {e}")

    ###########################################
    ###   Abstract properties and methods   ###
    ###########################################

    @property
    @abstractmethod
    def ServiceInterface(self) -> ServiceInterfaceType:
        """Return the service interface class that is used in the service caller

        This property has to be implemented in the child class
        """
        raise NotImplementedError

    @abstractmethod
    def feed_request(self, request, messages):
        """Feed the data into the request

        Abstract method that need to be reimplemented in the child class.

        Something like:
        request.part_0 = messages[0]
        request.part_1 = messages[1]
        return request

        @param request: an empty request created from ServiceInterface.Request()
        @param messages: the data passed to call_service

        @return: the request to send to the service
        """
        return request

    @abstractmethod
    def process_response(self, response, messages):
        """Process the response

        Abstract method that need to be reimplemented in the child class.

        Something like:
        self.node.part_0 = response.part_0
        self.node.part_1 = response.part_1

        @param response: the result of the completed future (never None here)
        @param messages: the data that was passed to call_service
        """
        pass

Asked by thibd on 2022-08-17 10:50:49 UTC

Comments

Answers