Skip to content

Make constructor methods for pub/sub/etc. take rcl_node_t mutex #290

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 2 commits into from
Nov 10, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
15 changes: 8 additions & 7 deletions rclrs/src/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,6 @@ use rosidl_runtime_rs::Message;

use crate::error::{RclReturnCode, ToResult};
use crate::MessageCow;
use crate::Node;
use crate::{rcl_bindings::*, RclrsError};

// SAFETY: The functions accessing this type, including drop(), shouldn't care about the thread
Expand Down Expand Up @@ -56,8 +55,11 @@ type RequestId = i64;

/// Main class responsible for sending requests to a ROS service.
///
/// The only available way to instantiate clients is via [`Node::create_client`], this is to
/// ensure that [`Node`]s can track all the clients that have been created.
/// The only available way to instantiate clients is via [`Node::create_client`][1], this is to
/// ensure that [`Node`][2]s can track all the clients that have been created.
///
/// [1]: crate::Node::create_client
/// [2]: crate::Node
pub struct Client<T>
where
T: rosidl_runtime_rs::Service,
Expand All @@ -72,7 +74,7 @@ where
T: rosidl_runtime_rs::Service,
{
/// Creates a new client.
pub(crate) fn new(node: &Node, topic: &str) -> Result<Self, RclrsError>
pub(crate) fn new(rcl_node_mtx: Arc<Mutex<rcl_node_t>>, topic: &str) -> Result<Self, RclrsError>
// This uses pub(crate) visibility to avoid instantiating this struct outside
// [`Node::create_client`], see the struct's documentation for the rationale
where
Expand All @@ -86,7 +88,6 @@ where
err,
s: topic.into(),
})?;
let rcl_node = { &mut *node.rcl_node_mtx.lock().unwrap() };

// SAFETY: No preconditions for this function.
let client_options = unsafe { rcl_client_get_default_options() };
Expand All @@ -98,7 +99,7 @@ where
// afterwards.
rcl_client_init(
&mut rcl_client,
rcl_node,
&*rcl_node_mtx.lock().unwrap(),
type_support,
topic_c_string.as_ptr(),
&client_options,
Expand All @@ -108,7 +109,7 @@ where

let handle = Arc::new(ClientHandle {
rcl_client_mtx: Mutex::new(rcl_client),
rcl_node_mtx: node.rcl_node_mtx.clone(),
rcl_node_mtx,
in_use_by_wait_set: Arc::new(AtomicBool::new(false)),
});

Expand Down
17 changes: 13 additions & 4 deletions rclrs/src/node.rs
Original file line number Diff line number Diff line change
Expand Up @@ -184,7 +184,7 @@ impl Node {
where
T: rosidl_runtime_rs::Service,
{
let client = Arc::new(Client::<T>::new(self, topic)?);
let client = Arc::new(Client::<T>::new(Arc::clone(&self.rcl_node_mtx), topic)?);
self.clients
.push(Arc::downgrade(&client) as Weak<dyn ClientBase>);
Ok(client)
Expand Down Expand Up @@ -243,7 +243,7 @@ impl Node {
where
T: Message,
{
Publisher::<T>::new(self, topic, qos)
Publisher::<T>::new(Arc::clone(&self.rcl_node_mtx), topic, qos)
}

/// Creates a [`Service`][1].
Expand All @@ -259,7 +259,11 @@ impl Node {
T: rosidl_runtime_rs::Service,
F: Fn(&rmw_request_id_t, T::Request) -> T::Response + 'static + Send,
{
let service = Arc::new(Service::<T>::new(self, topic, callback)?);
let service = Arc::new(Service::<T>::new(
Arc::clone(&self.rcl_node_mtx),
topic,
callback,
)?);
self.services
.push(Arc::downgrade(&service) as Weak<dyn ServiceBase>);
Ok(service)
Expand All @@ -278,7 +282,12 @@ impl Node {
where
T: Message,
{
let subscription = Arc::new(Subscription::<T>::new(self, topic, qos, callback)?);
let subscription = Arc::new(Subscription::<T>::new(
Arc::clone(&self.rcl_node_mtx),
topic,
qos,
callback,
)?);
self.subscriptions
.push(Arc::downgrade(&subscription) as Weak<dyn SubscriptionBase>);
Ok(subscription)
Expand Down
12 changes: 7 additions & 5 deletions rclrs/src/publisher.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,6 @@ use rosidl_runtime_rs::{Message, RmwMessage};
use crate::error::{RclrsError, ToResult};
use crate::qos::QoSProfile;
use crate::rcl_bindings::*;
use crate::Node;

mod loaned_message;
pub use loaned_message::*;
Expand Down Expand Up @@ -69,7 +68,11 @@ where
/// Creates a new `Publisher`.
///
/// Node and namespace changes are always applied _before_ topic remapping.
pub fn new(node: &Node, topic: &str, qos: QoSProfile) -> Result<Self, RclrsError>
pub fn new(
rcl_node_mtx: Arc<Mutex<rcl_node_t>>,
topic: &str,
qos: QoSProfile,
) -> Result<Self, RclrsError>
where
T: Message,
{
Expand All @@ -81,7 +84,6 @@ where
err,
s: topic.into(),
})?;
let rcl_node = &mut *node.rcl_node_mtx.lock().unwrap();

// SAFETY: No preconditions for this function.
let mut publisher_options = unsafe { rcl_publisher_get_default_options() };
Expand All @@ -94,7 +96,7 @@ where
// TODO: type support?
rcl_publisher_init(
&mut rcl_publisher,
rcl_node,
&*rcl_node_mtx.lock().unwrap(),
type_support_ptr,
topic_c_string.as_ptr(),
&publisher_options,
Expand All @@ -104,7 +106,7 @@ where

Ok(Self {
rcl_publisher_mtx: Mutex::new(rcl_publisher),
rcl_node_mtx: Arc::clone(&node.rcl_node_mtx),
rcl_node_mtx,
type_support_ptr,
message: PhantomData,
})
Expand Down
22 changes: 14 additions & 8 deletions rclrs/src/service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ use std::sync::{Arc, Mutex, MutexGuard};
use rosidl_runtime_rs::Message;

use crate::error::{RclReturnCode, ToResult};
use crate::{rcl_bindings::*, MessageCow, Node, RclrsError};
use crate::{rcl_bindings::*, MessageCow, RclrsError};

// SAFETY: The functions accessing this type, including drop(), shouldn't care about the thread
// they are running in. Therefore, this type can be safely sent to another thread.
Expand Down Expand Up @@ -51,8 +51,11 @@ type ServiceCallback<Request, Response> =

/// Main class responsible for responding to requests sent by ROS clients.
///
/// The only available way to instantiate services is via [`Node::create_service`], this is to
/// ensure that [`Node`]s can track all the services that have been created.
/// The only available way to instantiate services is via [`Node::create_service()`][1], this is to
/// ensure that [`Node`][2]s can track all the services that have been created.
///
/// [1]: crate::Node::create_service
/// [2]: crate::Node
pub struct Service<T>
where
T: rosidl_runtime_rs::Service,
Expand All @@ -67,7 +70,11 @@ where
T: rosidl_runtime_rs::Service,
{
/// Creates a new service.
pub(crate) fn new<F>(node: &Node, topic: &str, callback: F) -> Result<Self, RclrsError>
pub(crate) fn new<F>(
rcl_node_mtx: Arc<Mutex<rcl_node_t>>,
topic: &str,
callback: F,
) -> Result<Self, RclrsError>
// This uses pub(crate) visibility to avoid instantiating this struct outside
// [`Node::create_service`], see the struct's documentation for the rationale
where
Expand All @@ -82,7 +89,6 @@ where
err,
s: topic.into(),
})?;
let rcl_node = &mut *node.rcl_node_mtx.lock().unwrap();

// SAFETY: No preconditions for this function.
let service_options = unsafe { rcl_service_get_default_options() };
Expand All @@ -93,8 +99,8 @@ where
// The topic name and the options are copied by this function, so they can be dropped
// afterwards.
rcl_service_init(
&mut rcl_service as *mut _,
rcl_node as *mut _,
&mut rcl_service,
&*rcl_node_mtx.lock().unwrap(),
type_support,
topic_c_string.as_ptr(),
&service_options as *const _,
Expand All @@ -104,7 +110,7 @@ where

let handle = Arc::new(ServiceHandle {
rcl_service_mtx: Mutex::new(rcl_service),
rcl_node_mtx: node.rcl_node_mtx.clone(),
rcl_node_mtx,
in_use_by_wait_set: Arc::new(AtomicBool::new(false)),
});

Expand Down
14 changes: 7 additions & 7 deletions rclrs/src/subscription.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,6 @@ use rosidl_runtime_rs::{Message, RmwMessage};

use crate::error::{RclReturnCode, ToResult};
use crate::qos::QoSProfile;
use crate::Node;
use crate::{rcl_bindings::*, RclrsError};

mod callback;
Expand Down Expand Up @@ -63,11 +62,13 @@ pub trait SubscriptionBase: Send + Sync {
/// When a subscription is created, it may take some time to get "matched" with a corresponding
/// publisher.
///
/// The only available way to instantiate subscriptions is via [`Node::create_subscription`], this
/// is to ensure that [`Node`]s can track all the subscriptions that have been created.
/// The only available way to instantiate subscriptions is via [`Node::create_subscription()`][3], this
/// is to ensure that [`Node`][4]s can track all the subscriptions that have been created.
///
/// [1]: crate::spin_once
/// [2]: crate::spin
/// [3]: crate::Node::create_subscription
/// [4]: crate::Node
pub struct Subscription<T>
where
T: Message,
Expand All @@ -84,7 +85,7 @@ where
{
/// Creates a new subscription.
pub(crate) fn new<Args>(
node: &Node,
rcl_node_mtx: Arc<Mutex<rcl_node_t>>,
topic: &str,
qos: QoSProfile,
callback: impl SubscriptionCallback<T, Args>,
Expand All @@ -102,7 +103,6 @@ where
err,
s: topic.into(),
})?;
let rcl_node = &mut *node.rcl_node_mtx.lock().unwrap();

// SAFETY: No preconditions for this function.
let mut subscription_options = unsafe { rcl_subscription_get_default_options() };
Expand All @@ -115,7 +115,7 @@ where
// TODO: type support?
rcl_subscription_init(
&mut rcl_subscription,
rcl_node,
&*rcl_node_mtx.lock().unwrap(),
type_support,
topic_c_string.as_ptr(),
&subscription_options,
Expand All @@ -125,7 +125,7 @@ where

let handle = Arc::new(SubscriptionHandle {
rcl_subscription_mtx: Mutex::new(rcl_subscription),
rcl_node_mtx: node.rcl_node_mtx.clone(),
rcl_node_mtx,
in_use_by_wait_set: Arc::new(AtomicBool::new(false)),
});

Expand Down