Avoiding boilerplate when calling services, using an abstract class
Dear,
To avoid boilerplate, I imagined an abstract base class that can be inherited from to create service callers with standard code. I have just written it, yet the result seems very esoteric to me. I wanted to have your point of view before going further...
class ServiceCaller(ABC):
"""Call a service and wait for the answer
This class has to be inherited from to create each service caller with specific methods.
@param node: the node that is manipulated (this class instance shall be inside so pass the 'self').
@param name: the name of the ROS2 service to call.
The ROS2 service interface class is not a constructor parameter; it is read from self.ServiceInterface, which is presumably defined by each subclass.
@param logger: optionally, another logger than the node's one.
@param sleep_initial: in the while loop that waits for the service, we start by a small wait time
@param sleep_increment: then we raise it by this amount after each wait
@param sleep_alert: if we waited more than this time, we start to print alerts for debug
"""
def __init__(
    self,
    node: Node,
    name: str,
    logger=None,
    sleep_initial: float = 0.01,
    sleep_increment: float = 0.1,
    sleep_alert: float = 0.5,
):
    """Store the configuration, then create the client and wait for the service.

    @param node: the node that owns this caller and creates the client.
    @param name: the name of the service to call.
    @param logger: optional logger; the node's logger is used when None.
    @param sleep_initial: first sleep duration used while waiting for an answer.
    @param sleep_increment: amount added to the sleep duration after each wait.
    @param sleep_alert: threshold above which waiting messages are logged.
    """
    # Owning node and service name
    self.node: Node = node
    self.name: str = name

    # Fall back to the node logger when no dedicated logger was supplied
    if logger is None:
        logger = self.node.get_logger()
    self.logger = logger

    # Timing parameters of the waiting loop
    self.sleep_initial: float = sleep_initial
    self.sleep_increment: float = sleep_increment
    self.sleep_alert: float = sleep_alert

    # Nothing has been called yet
    self.status: Status = Status.INACTIVE

    # Create the client and block until the service is reachable
    self.initialize_client()
############################
### Concrete methods ###
############################
def initialize_client(self):
    """Create the service client and block until the service is available."""
    service_name = self.name
    self.logger.info(f"Initializing the {service_name} service client")
    self.client: Client = self.node.create_client(self.ServiceInterface, service_name)

    # Poll with a one-second timeout; the loop only exits once the service answers
    while True:
        if self.client.wait_for_service(timeout_sec=1.0):
            break
        self.logger.warn(f"{service_name} service not available, waiting again...")

    self.logger.info(f"{service_name} service found")
    # The client is ready but no call is in progress
    self.status: Status = Status.INACTIVE
def run(self, messages: object = None):
    """Call the service and block until its future is done.

    @param messages: data forwarded to call_service() to build the request.
    @return: True if self.status equals Status.SUCCESS once the future is done,
        False otherwise (the status is then logged as an error).
    """
    self.logger.info(f"Running service {self.name}")
    # call_service() requires the messages argument, so it must be forwarded
    future: Future = self.call_service(messages)

    sleep_time = self.sleep_initial
    waited = 0.0
    # NOTE(review): this loop only sleeps; it assumes the future is completed
    # from elsewhere (e.g. an executor spinning in another thread) - confirm.
    while not future.done():
        time.sleep(sleep_time)
        waited += sleep_time
        # Alert on the accumulated waiting time, not on the current interval
        if waited > self.sleep_alert:
            self.logger.info(f"Waiting for the {self.name} service to complete...")
        # Back off: wait a little longer on each iteration
        sleep_time += self.sleep_increment

    # The status is presumably updated by the done-callback registered in
    # call_service(); verify it has run by the time the future reports done.
    if self.status == Status.SUCCESS:
        return True

    self.logger.error(f"The status of the service {self.name} is: {self.status}")
    return False
def call_service(
    self,
    messages: object,
) -> Future:
    """Send an asynchronous request to the service and return its future.

    @param messages: data handed to feed_request() to fill the request, and
        forwarded to the callback as the `messages` keyword argument.
    @return: the future returned by the client's call_async().
    """
    # Mark the call as in progress before anything is sent
    self.status: Status = Status.RUNNING
    self.logger.debug(f"Calling {self.name} service with messages: {messages}")

    # Create an empty request and let feed_request() populate it
    request = self.feed_request(self.ServiceInterface.Request(), messages)

    # Call the service and get the response as a future
    future: Future = self.client.call_async(request)

    # Launch the callback once the future is completed.
    # add_done_callback only accepts a one-argument callable, so the extra
    # messages argument is bound with a partial.
    future.add_done_callback(partial(self.callback, messages=messages))
    return future
def callback(
self ...