ROS Resources: Documentation | Support | Discussion Forum | Index | Service Status | ros @ Robotics Stack Exchange
Ask Your Question
2

Using multiprocessing shared variable in ROS2 nodes

asked 2020-10-06 06:37:41 -0500

mjm522 gravatar image

updated 2020-10-07 02:15:30 -0500

I would like to use the multiprocessing shared_memory between two ros2 nodes. The idea is to pass the get_value function handle to many other ros independent classes to use it. If I uncomment the lines in the "__init__" of MinimalSubscriber, the get_value function works indicating that there is no issues with the shared vairable. However, starting the subscriber does not seem to work. I do not get any error. Is there any way to make this work?

import rclpy
import numpy as np
from rclpy.node import Node
from std_msgs.msg import Int64
from multiprocessing import shared_memory, Process, get_context

class MinimalPublisher(Node):

    def __init__(self):
        super().__init__('minimal_publisher')
        self.publisher_ = self.create_publisher(Int64, '/minimal_topic', 1)
        timer_period = 0.5
        self.timer = self.create_timer(timer_period, self.timer_callback)
        self.i = 0

    def timer_callback(self):
        msg = Int64()
        msg.data = self.i
        self.publisher_.publish(msg)
        self.i += 1

class MinimalSubscriber(Node):

    def __init__(self, shared_variable):
        super().__init__('minimal_subscriber')
        existing_shm = shared_memory.SharedMemory(name=shared_variable)
        self.myarray = np.ndarray((6,), dtype=np.int64, buffer=existing_shm.buf)
        # count = 0
        # while True:
        #     count += 1
        #     self.myarray[-1] =  count
        self.subscription = self.create_subscription(
            Int64,
            '/minimal_topic',
            self.listener_callback,
            1)
        self.subscription

    def listener_callback(self, msg):
        self.myarray[-1] = msg.data


def worker_publisher():
    rclpy.init()
    minimal_publisher = MinimalPublisher()
    rclpy.spin(minimal_publisher)
    minimal_publisher.destroy_node()
    rclpy.shutdown()

def worker_subscriber(shared_variable):
    rclpy.init()
    minimal_subscriber = MinimalSubscriber(shared_variable)
    rclpy.spin(minimal_subscriber)
    minimal_subscriber.destroy_node()
    rclpy.shutdown()

shm = shared_memory.SharedMemory(create=True, size=np.zeros(6).nbytes)
ctx = get_context('spawn')
pub_process = Process(target=worker_publisher, args=())
sub_process = ctx.Process(target=worker_subscriber, args=(shm,))

def get_value():
    read_value = np.ndarray((6,), dtype=np.int64, buffer=shm.buf)
    return read_value

if __name__ == '__main__':
    pub_process.start()
    sub_process.start()
    while True:
        print("Value read is ", get_value())
    pub_process.join()
    sub_process.join()
edit retag flag offensive close merge delete

1 Answer

Sort by ยป oldest newest most voted
0

answered 2022-01-30 11:09:24 -0500

Hi I also faced the same situation. It seems to me that node.spin() (and ros executor in general) is not compatible with multiprocessing and blocks it. So as soon as the first process starts the programs get stuck there.

edit flag offensive delete link more

Question Tools

2 followers

Stats

Asked: 2020-10-06 06:37:41 -0500

Seen: 1,038 times

Last updated: Oct 07 '20