Skip to content

Deallocate processors outside render thread #353

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Closed
wants to merge 2 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 10 additions & 0 deletions src/events.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
use crate::context::AudioNodeId;
use crate::render::AudioProcessor;
use crate::AudioRenderCapacityEvent;

use std::any::Any;
Expand All @@ -21,6 +22,7 @@ pub(crate) enum EventType {
SinkChange,
RenderCapacity,
ProcessorError(AudioNodeId),
DropProcessor,
}

/// The Error Event interface
Expand All @@ -39,6 +41,7 @@ pub(crate) enum EventPayload {
None,
RenderCapacity(AudioRenderCapacityEvent),
ProcessorError(ErrorEvent),
DropProcessor(Box<dyn AudioProcessor>),
}

pub(crate) struct EventDispatch {
Expand Down Expand Up @@ -74,6 +77,13 @@ impl EventDispatch {
payload: EventPayload::ProcessorError(value),
}
}

pub fn drop_processor(processor: Box<dyn AudioProcessor>) -> Self {
EventDispatch {
type_: EventType::DropProcessor,
payload: EventPayload::DropProcessor(processor),
}
}
}

pub(crate) enum EventHandler {
Expand Down
9 changes: 9 additions & 0 deletions src/node/delay.rs
Original file line number Diff line number Diff line change
Expand Up @@ -579,6 +579,15 @@ impl AudioProcessor for DelayReader {

true
}

// Clear the ring buffer so that the processor can be safely sent back
// to the control thread.
//
// @note - only implement on the reader side as it may outlive the writer
fn release_resources(&mut self) {
let mut ring_buffer = self.ring_buffer_mut();
ring_buffer.clear();
}
}

impl DelayReader {
Expand Down
6 changes: 6 additions & 0 deletions src/node/dynamics_compressor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -415,6 +415,12 @@ impl AudioProcessor for DynamicsCompressorRenderer {

true
}

// Clear the ring buffer so that the processor can be safely sent back
// to the control thread.
fn release_resources(&mut self) {
self.ring_buffer.clear();
}
}

#[cfg(test)]
Expand Down
71 changes: 61 additions & 10 deletions src/render/graph.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ use rustc_hash::FxHashMap;
use smallvec::{smallvec, SmallVec};

use super::{Alloc, AudioParamValues, AudioProcessor, AudioRenderQuantum};
use crate::events::EventDispatch;
use crate::node::ChannelConfig;
use crate::render::RenderScope;

Expand All @@ -24,7 +25,7 @@ struct OutgoingEdge {
/// Renderer Node in the Audio Graph
pub struct Node {
/// Renderer: converts inputs to outputs
processor: Box<dyn AudioProcessor>,
processor: Option<Box<dyn AudioProcessor>>,
/// Reusable input buffers
inputs: Vec<AudioRenderQuantum>,
/// Reusable output buffers, consumed by subsequent Nodes in this graph
Expand All @@ -45,6 +46,8 @@ impl Node {
/// Render an audio quantum
fn process(&mut self, params: AudioParamValues<'_>, scope: &RenderScope) -> bool {
self.processor
.as_mut()
.expect("A Node should always have a processor")
.process(&self.inputs[..], &mut self.outputs[..], params, scope)
}

Expand Down Expand Up @@ -129,7 +132,9 @@ impl Graph {
self.nodes.insert(
index,
RefCell::new(Node {
processor,
// we need to take back the processor when the node is freed
// to send it back to the control thread for garbage collection
processor: Some(processor),
inputs,
outputs,
channel_config,
Expand Down Expand Up @@ -203,6 +208,8 @@ impl Graph {
.unwrap()
.get_mut()
.processor
.as_mut()
.expect("A Node should always have a processor")
.onmessage(msg);
}

Expand Down Expand Up @@ -427,6 +434,7 @@ impl Graph {
output_node.inputs[edge.other_index].add(signal, channel_config);
});

// Check if we can decommission this node (end of life)
let can_free = !success || node.can_free(tail_time);

// Node is not dropped.
Expand All @@ -440,25 +448,68 @@ impl Graph {
node.has_inputs_connected = false;
}

drop(node); // release borrow of self.nodes
// Node is dropped.
if can_free {
// ask the processor to release its owned resources that are not thread safe
// i.e. AudioRenderQuantum for buffering in delay and compressor nodes
let mut processor = node
.processor
.take()
.expect("A Node should always have a processor");

processor.release_resources();

// Send processor back to control thread so that it can freed
// or even recycled in a pool later
// Note that the event sender only exists for online contexts
//
// @note - we are abusing the event dispatch system here for
// prototyping, this should probably be cleaned out
if let Some(sender) = &scope.event_sender {
// we don't want to block the thread, just do the best we can
let _ = sender.try_send(EventDispatch::drop_processor(processor));
} else {
drop(processor);
}
}

// release borrow of self.nodes
drop(node);

// Check if we can decommission this node (end of life)
if can_free {
// Node is dropped, remove it from the node list
// remove node from the node list
nodes.remove(index);
Copy link
Owner

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

You could say here,
let node = nodes.remove(index).unwrap();
and then move the added section above to here


// And remove it from the ordering after we have processed all nodes
// flag that we need to clean the ordering after we have processed all nodes
nodes_dropped = true;

// Nodes are only dropped when they do not have incoming connections.
// But they may have AudioParams feeding into them, these can de dropped too.
nodes.retain(|id, n| {
id.0 < 2 // never drop Listener and Destination node
|| !n
nodes.retain(|id, node| {
let retain_node = id.0 < 2 // never drop Listener and Destination node
|| !node
.borrow()
.outgoing_edges
.iter()
.any(|e| e.other_id == *index)
.any(|e| e.other_id == *index);

if !retain_node {
let mut processor = node
.borrow_mut()
.processor
.take()
.expect("A Node should always have a processor");

processor.release_resources();

if let Some(sender) = &scope.event_sender {
let _ = sender.try_send(EventDispatch::drop_processor(processor));
} else {
drop(processor);
}
}

retain_node
});
}
});
Expand Down
6 changes: 6 additions & 0 deletions src/render/processor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -117,6 +117,12 @@ pub trait AudioProcessor: Send {
fn onmessage(&mut self, msg: &mut dyn Any) {
log::warn!("Ignoring incoming message");
}

/// Method called before the AudioProcessor is recycled, i.e. when sent back
/// to the control thread when its rendering has finished.
fn release_resources(&mut self) {
Copy link
Owner

@orottier orottier Aug 14, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yeah, this is really tricky..
If a user implements this interface incorrectly and its processor will be reused, very subtle bugs will arise...
Or we need a mechanism to only recycle our own nodes

// nothing to do
}
}

struct DerefAudioRenderQuantumChannel<'a>(std::cell::Ref<'a, Node>);
Expand Down