Python AttributeError occurs when running on ROS Noetic
Hello firends, I have a python code that I have successfully had running on a ROS melodic installation for some time. Now that we've decided to switch over to the new ROS Noetic (since joy got added recently) any time I try to kick off this python node I get:
Traceback (most recent call last):
File "/home/casey/mqs_control/devel/lib/xbee/mqs_xbee.py", line 15, in <module>
exec(compile(fh.read(), python_script, 'exec'), context)
File "/home/casey/mqs_control/src/xbee/scripts/mqs_xbee.py", line 39, in <module>
xbee = XBee.XBee("/dev/ttyUSB0")
AttributeError: module 'XBee' has no attribute 'XBee'
I was certain that this code works for both python 2.7 and 3, and have tried all manner of file renaming, permissions, different import calls to no avail. Any help would be greatly appreciated!
Thanks in advance, M
mqs_xbee.py:
'''#!/usr/bin/env python''' import rospy import XBee from time import sleep from xbee.msg import cmd_ctrl from rospy.numpy_msg import numpy_msg ''' This node subscribes for data being changed on cmd_ctrl and publishes it to the XBEE. Data is coming in as a properly ordered bytearray in cmd_ctrl.msg ''' def callback(cmds): rospy.loginfo("Rx: %s", cmds) rospy.loginfo(type(cmds)) ''' sends data over the xbee''' def xbee_send(cmds): # sends the data on the 16 channels channels_byte = bytearray() channels_byte.extend(cmds.cmds) # sent=xbee.Send(bytearray(channels_byte)) xbee.Send(bytearray(channels_byte)) def mqs_xbee(): rospy.init_node('mqs_xbee', anonymous=True) # subscribe to cmd_ctrl rospy.Subscriber("cmds", numpy_msg(cmd_ctrl), xbee_send) # for debugging rospy.Subscriber("cmds", numpy_msg(cmd_ctrl), callback) rospy.spin() if __name__ == "__main__": xbee = XBee.XBee("/dev/ttyUSB0") mqs_xbee()'
XBee.py
import serial from collections import deque class XBee: RxBuff = bytearray() RxMessages = deque() def __init__(self, serialport, baudrate=115200): self.serial = serial.Serial(port=serialport, baudrate=baudrate) def Receive(self): """ Recieves data from serial and checks buffer for potential messages/ Returns the next message in the queue if available """ remaining = self.serial.inWaiting() while remaining: chunk = self.serial.read(remaining) remaining -= len(chunk) self.RxBuff.extend(chunk) msgs = self.RxBuff.split(bytes(b'\x7E')) for msg in msgs[:-1]: self.validate(msg) self.RxBuff = (bytearray() if self.Validate(msgs[-1]) else msgs[-1]) if self.RxMessages: return self.RxMessages.popleft() else: return None def Validate(self, msg): """ Parses a byte or bytearray object to verify the contents are a properly formatted XBee message. Inputs: An incoming XBee message Outputs: True or False, indicating message validity """ # 9 bytes is Minimum length to be a valid Rx frame # LSB, MSB,Type, Source Address(2), RSSI, Options, 1 byte data, checksum if (len(msg) - msg.count(bytes(b'0x7D'))) < 9: return False # All bytes in message must be unescaped before validating content frame = self.Unescape(msg) LSB = frame[1] # Frame (minus checksum) must contain at least length equal to LSB if LSB > (len(frame[2:]) - 1): return False # Validate checksum if (sum(frame[2:3 + LSB]) & 0xFF) != 0xFF: return False print("Rx: " + self.format(bytearray(b'\x7E') + msg)) self.RxMessages.append ...