Using multiprocessing shared variable in ROS2 nodes
I would like to use the multiprocessing shared_memory between two ros2 nodes. The idea is to pass the get_value function handle to many other ros independent classes to use it. If I uncomment the lines in the "__init__" of MinimalSubscriber, the get_value function works indicating that there is no issues with the shared vairable. However, starting the subscriber does not seem to work. I do not get any error. Is there any way to make this work?
import rclpy
import numpy as np
from rclpy.node import Node
from std_msgs.msg import Int64
from multiprocessing import shared_memory, Process, get_context
class MinimalPublisher(Node):
def __init__(self):
super().__init__('minimal_publisher')
self.publisher_ = self.create_publisher(Int64, '/minimal_topic', 1)
timer_period = 0.5
self.timer = self.create_timer(timer_period, self.timer_callback)
self.i = 0
def timer_callback(self):
msg = Int64()
msg.data = self.i
self.publisher_.publish(msg)
self.i += 1
class MinimalSubscriber(Node):
def __init__(self, shared_variable):
super().__init__('minimal_subscriber')
existing_shm = shared_memory.SharedMemory(name=shared_variable)
self.myarray = np.ndarray((6,), dtype=np.int64, buffer=existing_shm.buf)
# count = 0
# while True:
# count += 1
# self.myarray[-1] = count
self.subscription = self.create_subscription(
Int64,
'/minimal_topic',
self.listener_callback,
1)
self.subscription
def listener_callback(self, msg):
self.myarray[-1] = msg.data
def worker_publisher():
rclpy.init()
minimal_publisher = MinimalPublisher()
rclpy.spin(minimal_publisher)
minimal_publisher.destroy_node()
rclpy.shutdown()
def worker_subscriber(shared_variable):
rclpy.init()
minimal_subscriber = MinimalSubscriber(shared_variable)
rclpy.spin(minimal_subscriber)
minimal_subscriber.destroy_node()
rclpy.shutdown()
shm = shared_memory.SharedMemory(create=True, size=np.zeros(6).nbytes)
ctx = get_context('spawn')
pub_process = Process(target=worker_publisher, args=())
sub_process = ctx.Process(target=worker_subscriber, args=(shm,))
def get_value():
read_value = np.ndarray((6,), dtype=np.int64, buffer=shm.buf)
return read_value
if __name__ == '__main__':
pub_process.start()
sub_process.start()
while True:
print("Value read is ", get_value())
pub_process.join()
sub_process.join()