Ros2 Foxy Python custom SRV type error
Here's the traceback from the error,
[Intersection(i_d=0, ins=[1011, 1804], out=[1000], clear=True), Intersection(i_d=1, ins=[1103, 1001], out=[1002], clear=True), Intersection(i_d=2, ins=[1002, 1008], out=[1004, 1010], clear=True)]
[INFO] [1627053698.780029518] [intersection_action_server]: Checking intersection
[INFO] [1627053698.780460999] [intersection_action_server]: Incoming request
Type: Incoming RFID: 1002
[INFO] [1627053698.780763667] [intersection_action_server]: Sending back: Clear
[INFO] [1627053053.366115161] [intersection_action_server]: Checking intersection
[INFO] [1627053053.366806566] [intersection_action_server]: Incoming request
Type: Incoming RFID: 1002
[INFO] [1627053053.367215310] [intersection_action_server]: Sending back: Clear
Traceback (most recent call last):
File "/home/mrumel/fisher_agc/AGCROS/install/server/lib/server/intersection_manager", line 11, in <module>
load_entry_point('server==0.0.0', 'console_scripts', 'intersection_manager')()
File "/home/mrumel/fisher_agc/AGCROS/install/server/lib/python3.8/site-packages/server/intersection_manager.py", line 70, in main
rclpy.spin(intersection_manager)
File "/opt/ros/foxy/lib/python3.8/site-packages/rclpy/__init__.py", line 191, in spin
executor.spin_once()
File "/opt/ros/foxy/lib/python3.8/site-packages/rclpy/executors.py", line 711, in spin_once
raise handler.exception()
File "/opt/ros/foxy/lib/python3.8/site-packages/rclpy/task.py", line 239, in __call__
self._handler.send(None)
File "/opt/ros/foxy/lib/python3.8/site-packages/rclpy/executors.py", line 426, in handler
await call_coroutine(entity, arg)
File "/opt/ros/foxy/lib/python3.8/site-packages/rclpy/executors.py", line 381, in _execute_service
srv.send_response(response, header)
File "/opt/ros/foxy/lib/python3.8/site-packages/rclpy/service.py", line 72, in send_response
raise TypeError()
TypeError
Here's my .SRV file first, it's filename is IntersectionManager.srv
string ask
---
string is_clear
My Service node
import rclpy
from rclpy.node import Node
from agc_interfaces.srv import IntersectionManager
from dataclasses import dataclass
from typing import Optional, List
import json
import ast
@dataclass
class Intersection:
i_d: int
ins: List[int]
out: List[int]
clear: bool
intersectionList: List[Intersection] = []
class IntersectionActionServer(Node):
def __init__(self):
super().__init__('intersection_action_server')
self.srv = self.create_service(IntersectionManager,'intersection_manage',self.execute_callback)
global intersectionList
with open('/home/maxunm/fisher_agc/AGCROS/scs_rfids.json') as f:
data = json.load(f)
dicts = data['rfids'] #Extract all the RFIDs from data
i = 0
for rfid in dicts:
for s in rfid['args']:
if "intr" in s:
split_s = s.split(';',-1)
ins = ast.literal_eval(split_s[1])
out = ast.literal_eval(split_s[2])
intersectionList.append(Intersection(i,ins,out,True))
i = i+1
print(intersectionList)
def execute_callback(self, request, responce):
self.get_logger().info('Checking intersection')
ask_list = request.ask.split(",")
which = ask_list[0]
rfid_num = int(ask_list[1])
self.get_logger().info('Incoming request\nType: %s RFID: %d' % (which, rfid_num))
for intersect in intersectionList:
if which == "Incoming": #AGV coming into an intersection
if rfid_num in intersect.ins: #Which intersection the rfid it's at is in
"""Check for if that intersection is locked or not"""
if intersect.clear:
responce.is_clear = str('Clear')
intersect.clear = False
else:
responce.is_clear = str('Blocked')
else: #AGV leaving intersection
if rfid_num in intersect.out:
if intersect.clear:
responce.is_clear = str('ERROR')
intersect.clear = True
else:
responce.is_clear = str('OK')
intersect.clear = True
self.get_logger().info('Sending back: %s'%(responce ...