use glib_sys;
use std::collections::VecDeque;
use std::fmt;
use std::mem;
use std::ptr;
use std::sync::mpsc;
use std::sync::{Arc, Condvar, Mutex};
use translate::{mut_override, FromGlibPtrFull, ToGlib};
use Continue;
use MainContext;
use Priority;
use Source;
use SourceId;
use ThreadGuard;
enum ChannelSourceState {
NotAttached,
Attached(*mut glib_sys::GSource),
Destroyed,
}
unsafe impl Send for ChannelSourceState {}
unsafe impl Sync for ChannelSourceState {}
struct ChannelInner<T> {
queue: VecDeque<T>,
source: ChannelSourceState,
num_senders: usize,
}
impl<T> ChannelInner<T> {
fn receiver_disconnected(&self) -> bool {
match self.source {
ChannelSourceState::Destroyed => true,
ChannelSourceState::Attached(source)
if unsafe { glib_sys::g_source_is_destroyed(source) } != glib_sys::GFALSE =>
{
true
}
ChannelSourceState::NotAttached => false,
ChannelSourceState::Attached(_) => false,
}
}
fn set_ready_time(&mut self, ready_time: i64) {
if let ChannelSourceState::Attached(source) = self.source {
unsafe {
glib_sys::g_source_set_ready_time(source, ready_time);
}
}
}
}
struct ChannelBound {
bound: usize,
cond: Condvar,
}
struct Channel<T>(Arc<(Mutex<ChannelInner<T>>, Option<ChannelBound>)>);
impl<T> Clone for Channel<T> {
fn clone(&self) -> Channel<T> {
Channel(self.0.clone())
}
}
impl<T> Channel<T> {
fn new(bound: Option<usize>) -> Channel<T> {
Channel(Arc::new((
Mutex::new(ChannelInner {
queue: VecDeque::new(),
source: ChannelSourceState::NotAttached,
num_senders: 0,
}),
bound.map(|bound| ChannelBound {
bound,
cond: Condvar::new(),
}),
)))
}
fn send(&self, t: T) -> Result<(), mpsc::SendError<T>> {
let mut inner = (self.0).0.lock().unwrap();
if let Some(ChannelBound { bound, ref cond }) = (self.0).1 {
while inner.queue.len() >= bound
&& !inner.queue.is_empty()
&& !inner.receiver_disconnected()
{
inner = cond.wait(inner).unwrap();
}
}
if inner.receiver_disconnected() {
return Err(mpsc::SendError(t));
}
inner.queue.push_back(t);
inner.set_ready_time(0);
if let Some(ChannelBound { bound: 0, ref cond }) = (self.0).1 {
while !inner.queue.is_empty() && !inner.receiver_disconnected() {
inner = cond.wait(inner).unwrap();
}
if inner.receiver_disconnected() {
if let Some(t) = inner.queue.pop_front() {
return Err(mpsc::SendError(t));
}
}
}
Ok(())
}
fn try_send(&self, t: T) -> Result<(), mpsc::TrySendError<T>> {
let mut inner = (self.0).0.lock().unwrap();
let ChannelBound { bound, ref cond } = (self.0)
.1
.as_ref()
.expect("called try_send() on an unbounded channel");
if inner.queue.len() >= *bound && !inner.queue.is_empty() {
return Err(mpsc::TrySendError::Full(t));
}
if inner.receiver_disconnected() {
return Err(mpsc::TrySendError::Disconnected(t));
}
inner.queue.push_back(t);
inner.set_ready_time(0);
if *bound == 0 {
while !inner.queue.is_empty() && !inner.receiver_disconnected() {
inner = cond.wait(inner).unwrap();
}
if inner.receiver_disconnected() {
if let Some(t) = inner.queue.pop_front() {
return Err(mpsc::TrySendError::Disconnected(t));
}
}
}
Ok(())
}
fn try_recv(&self) -> Result<T, mpsc::TryRecvError> {
let mut inner = (self.0).0.lock().unwrap();
if let Some(item) = inner.queue.pop_front() {
if let Some(ChannelBound { ref cond, .. }) = (self.0).1 {
cond.notify_one();
}
return Ok(item);
}
if inner.num_senders == 0 {
Err(mpsc::TryRecvError::Disconnected)
} else {
Err(mpsc::TryRecvError::Empty)
}
}
}
#[repr(C)]
struct ChannelSource<T, F: FnMut(T) -> Continue + 'static> {
source: glib_sys::GSource,
source_funcs: Option<Box<glib_sys::GSourceFuncs>>,
channel: Option<Channel<T>>,
callback: Option<ThreadGuard<F>>,
}
unsafe extern "C" fn dispatch<T, F: FnMut(T) -> Continue + 'static>(
source: *mut glib_sys::GSource,
callback: glib_sys::GSourceFunc,
_user_data: glib_sys::gpointer,
) -> glib_sys::gboolean {
let source = &mut *(source as *mut ChannelSource<T, F>);
assert!(callback.is_none());
glib_sys::g_source_set_ready_time(&mut source.source, -1);
let callback = source
.callback
.as_mut()
.expect("ChannelSource called before Receiver was attached")
.get_mut();
let channel = source
.channel
.as_ref()
.expect("ChannelSource without Channel");
loop {
match channel.try_recv() {
Err(mpsc::TryRecvError::Empty) => break,
Err(mpsc::TryRecvError::Disconnected) => return glib_sys::G_SOURCE_REMOVE,
Ok(item) => {
if callback(item) == Continue(false) {
return glib_sys::G_SOURCE_REMOVE;
}
}
}
}
glib_sys::G_SOURCE_CONTINUE
}
unsafe extern "C" fn finalize<T, F: FnMut(T) -> Continue + 'static>(
source: *mut glib_sys::GSource,
) {
let source = &mut *(source as *mut ChannelSource<T, F>);
let channel = source.channel.take().expect("Receiver without channel");
{
let mut inner = (channel.0).0.lock().unwrap();
inner.source = ChannelSourceState::Destroyed;
if let Some(ChannelBound { ref cond, .. }) = (channel.0).1 {
cond.notify_all();
}
}
let _ = source.source_funcs.take();
let _ = source.callback.take();
}
pub struct Sender<T>(Option<Channel<T>>);
impl<T> fmt::Debug for Sender<T> {
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
f.debug_struct("Sender").finish()
}
}
impl<T> Clone for Sender<T> {
fn clone(&self) -> Sender<T> {
Sender::new(self.0.as_ref())
}
}
impl<T> Sender<T> {
fn new(channel: Option<&Channel<T>>) -> Self {
if let Some(channel) = channel {
let mut inner = (channel.0).0.lock().unwrap();
inner.num_senders += 1;
Sender(Some(channel.clone()))
} else {
Sender(None)
}
}
pub fn send(&self, t: T) -> Result<(), mpsc::SendError<T>> {
self.0.as_ref().expect("Sender with no channel").send(t)
}
}
impl<T> Drop for Sender<T> {
fn drop(&mut self) {
let channel = self.0.take().expect("Sender with no channel");
let mut inner = (channel.0).0.lock().unwrap();
inner.num_senders -= 1;
if inner.num_senders == 0 {
inner.set_ready_time(0);
}
}
}
pub struct SyncSender<T>(Option<Channel<T>>);
impl<T> fmt::Debug for SyncSender<T> {
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
f.debug_struct("SyncSender").finish()
}
}
impl<T> Clone for SyncSender<T> {
fn clone(&self) -> SyncSender<T> {
SyncSender::new(self.0.as_ref())
}
}
impl<T> SyncSender<T> {
fn new(channel: Option<&Channel<T>>) -> Self {
if let Some(channel) = channel {
let mut inner = (channel.0).0.lock().unwrap();
inner.num_senders += 1;
SyncSender(Some(channel.clone()))
} else {
SyncSender(None)
}
}
pub fn send(&self, t: T) -> Result<(), mpsc::SendError<T>> {
self.0.as_ref().expect("Sender with no channel").send(t)
}
pub fn try_send(&self, t: T) -> Result<(), mpsc::TrySendError<T>> {
self.0.as_ref().expect("Sender with no channel").try_send(t)
}
}
impl<T> Drop for SyncSender<T> {
fn drop(&mut self) {
let channel = self.0.take().expect("Sender with no channel");
let mut inner = (channel.0).0.lock().unwrap();
inner.num_senders -= 1;
if inner.num_senders == 0 {
inner.set_ready_time(0);
}
}
}
pub struct Receiver<T>(Option<Channel<T>>, Priority);
impl<T> fmt::Debug for Receiver<T> {
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
f.debug_struct("Receiver").finish()
}
}
unsafe impl<T: Send> Send for Receiver<T> {}
impl<T> Drop for Receiver<T> {
fn drop(&mut self) {
if let Some(channel) = self.0.take() {
let mut inner = (channel.0).0.lock().unwrap();
inner.source = ChannelSourceState::Destroyed;
if let Some(ChannelBound { ref cond, .. }) = (channel.0).1 {
cond.notify_all();
}
}
}
}
impl<T> Receiver<T> {
pub fn attach<F: FnMut(T) -> Continue + 'static>(
mut self,
context: Option<&MainContext>,
func: F,
) -> SourceId {
unsafe {
let channel = self.0.take().expect("Receiver without channel");
let source_funcs = Box::new(glib_sys::GSourceFuncs {
check: None,
prepare: None,
dispatch: Some(dispatch::<T, F>),
finalize: Some(finalize::<T, F>),
closure_callback: None,
closure_marshal: None,
});
let source = glib_sys::g_source_new(
mut_override(&*source_funcs),
mem::size_of::<ChannelSource<T, F>>() as u32,
) as *mut ChannelSource<T, F>;
assert!(!source.is_null());
{
let source = &mut *source;
let mut inner = (channel.0).0.lock().unwrap();
glib_sys::g_source_set_priority(mut_override(&source.source), self.1.to_glib());
glib_sys::g_source_set_ready_time(
mut_override(&source.source),
if !inner.queue.is_empty() || inner.num_senders == 0 {
0
} else {
-1
},
);
inner.source = ChannelSourceState::Attached(&mut source.source);
}
{
let source = &mut *source;
ptr::write(&mut source.channel, Some(channel));
ptr::write(&mut source.callback, Some(ThreadGuard::new(func)));
ptr::write(&mut source.source_funcs, Some(source_funcs));
}
let source = Source::from_glib_full(mut_override(&(*source).source));
if let Some(context) = context {
assert!(context.is_owner());
source.attach(Some(context))
} else {
let context = MainContext::ref_thread_default();
assert!(context.is_owner());
source.attach(Some(&context))
}
}
}
}
impl MainContext {
pub fn channel<T>(priority: Priority) -> (Sender<T>, Receiver<T>) {
let channel = Channel::new(None);
let receiver = Receiver(Some(channel.clone()), priority);
let sender = Sender::new(Some(&channel));
(sender, receiver)
}
pub fn sync_channel<T>(priority: Priority, bound: usize) -> (SyncSender<T>, Receiver<T>) {
let channel = Channel::new(Some(bound));
let receiver = Receiver(Some(channel.clone()), priority);
let sender = SyncSender::new(Some(&channel));
(sender, receiver)
}
}
#[cfg(test)]
mod tests {
use super::*;
use std::cell::RefCell;
use std::rc::Rc;
use std::thread;
use std::time;
use MainLoop;
#[test]
fn test_channel() {
let c = MainContext::new();
let l = MainLoop::new(Some(&c), false);
c.acquire();
let (sender, receiver) = MainContext::channel(Priority::default());
let sum = Rc::new(RefCell::new(0));
let sum_clone = sum.clone();
let l_clone = l.clone();
receiver.attach(Some(&c), move |item| {
*sum_clone.borrow_mut() += item;
if *sum_clone.borrow() == 6 {
l_clone.quit();
Continue(false)
} else {
Continue(true)
}
});
sender.send(1).unwrap();
sender.send(2).unwrap();
sender.send(3).unwrap();
l.run();
assert_eq!(*sum.borrow(), 6);
}
#[test]
fn test_drop_sender() {
let c = MainContext::new();
let l = MainLoop::new(Some(&c), false);
c.acquire();
let (sender, receiver) = MainContext::channel::<i32>(Priority::default());
struct Helper(MainLoop);
impl Drop for Helper {
fn drop(&mut self) {
self.0.quit();
}
}
let helper = Helper(l.clone());
receiver.attach(Some(&c), move |_| {
let _ = helper;
Continue(true)
});
drop(sender);
l.run();
}
#[test]
fn test_drop_receiver() {
let (sender, receiver) = MainContext::channel::<i32>(Priority::default());
drop(receiver);
assert_eq!(sender.send(1), Err(mpsc::SendError(1)));
}
#[test]
fn test_remove_receiver() {
let c = MainContext::new();
c.acquire();
let (sender, receiver) = MainContext::channel::<i32>(Priority::default());
let source_id = receiver.attach(Some(&c), move |_| Continue(true));
let source = c.find_source_by_id(&source_id).unwrap();
source.destroy();
assert_eq!(sender.send(1), Err(mpsc::SendError(1)));
}
#[test]
fn test_remove_receiver_and_drop_source() {
let c = MainContext::new();
c.acquire();
let (sender, receiver) = MainContext::channel::<i32>(Priority::default());
struct Helper(Arc<Mutex<bool>>);
impl Drop for Helper {
fn drop(&mut self) {
*self.0.lock().unwrap() = true;
}
}
let dropped = Arc::new(Mutex::new(false));
let helper = Helper(dropped.clone());
let source_id = receiver.attach(Some(&c), move |_| {
let _helper = &helper;
Continue(true)
});
let source = c.find_source_by_id(&source_id).unwrap();
source.destroy();
drop(source);
assert_eq!(*dropped.lock().unwrap(), true);
assert_eq!(sender.send(1), Err(mpsc::SendError(1)));
}
#[test]
fn test_sync_channel() {
let c = MainContext::new();
let l = MainLoop::new(Some(&c), false);
c.acquire();
let (sender, receiver) = MainContext::sync_channel(Priority::default(), 2);
let sum = Rc::new(RefCell::new(0));
let sum_clone = sum.clone();
let l_clone = l.clone();
receiver.attach(Some(&c), move |item| {
*sum_clone.borrow_mut() += item;
if *sum_clone.borrow() == 6 {
l_clone.quit();
Continue(false)
} else {
Continue(true)
}
});
let (wait_sender, wait_receiver) = mpsc::channel();
let thread = thread::spawn(move || {
sender.try_send(1).unwrap();
sender.try_send(2).unwrap();
assert!(sender.try_send(3).is_err());
wait_sender.send(()).unwrap();
sender.send(3).unwrap();
});
let _ = wait_receiver.recv().unwrap();
thread::sleep(time::Duration::from_millis(50));
l.run();
thread.join().unwrap();
assert_eq!(*sum.borrow(), 6);
}
#[test]
fn test_sync_channel_drop_wakeup() {
let c = MainContext::new();
let l = MainLoop::new(Some(&c), false);
c.acquire();
let (sender, receiver) = MainContext::sync_channel(Priority::default(), 3);
let sum = Rc::new(RefCell::new(0));
let sum_clone = sum.clone();
let l_clone = l.clone();
receiver.attach(Some(&c), move |item| {
*sum_clone.borrow_mut() += item;
if *sum_clone.borrow() == 6 {
l_clone.quit();
Continue(false)
} else {
Continue(true)
}
});
let (wait_sender, wait_receiver) = mpsc::channel();
let thread = thread::spawn(move || {
sender.try_send(1).unwrap();
sender.try_send(2).unwrap();
sender.try_send(3).unwrap();
wait_sender.send(()).unwrap();
for i in 4.. {
if let Err(_) = sender.send(i) {
break;
}
}
});
let _ = wait_receiver.recv().unwrap();
thread::sleep(time::Duration::from_millis(50));
l.run();
thread.join().unwrap();
assert_eq!(*sum.borrow(), 6);
}
#[test]
fn test_sync_channel_drop_receiver_wakeup() {
let c = MainContext::new();
c.acquire();
let (sender, receiver) = MainContext::sync_channel(Priority::default(), 2);
let (wait_sender, wait_receiver) = mpsc::channel();
let thread = thread::spawn(move || {
sender.try_send(1).unwrap();
sender.try_send(2).unwrap();
wait_sender.send(()).unwrap();
assert!(sender.send(3).is_err());
});
let _ = wait_receiver.recv().unwrap();
thread::sleep(time::Duration::from_millis(50));
drop(receiver);
thread.join().unwrap();
}
#[test]
fn test_sync_channel_rendezvous() {
let c = MainContext::new();
let l = MainLoop::new(Some(&c), false);
c.acquire();
let (sender, receiver) = MainContext::sync_channel(Priority::default(), 0);
let (wait_sender, wait_receiver) = mpsc::channel();
let thread = thread::spawn(move || {
wait_sender.send(()).unwrap();
sender.send(1).unwrap();
wait_sender.send(()).unwrap();
sender.send(2).unwrap();
wait_sender.send(()).unwrap();
sender.send(3).unwrap();
wait_sender.send(()).unwrap();
});
let _ = wait_receiver.recv().unwrap();
assert_eq!(
wait_receiver.recv_timeout(time::Duration::from_millis(50)),
Err(mpsc::RecvTimeoutError::Timeout)
);
let sum = Rc::new(RefCell::new(0));
let sum_clone = sum.clone();
let l_clone = l.clone();
receiver.attach(Some(&c), move |item| {
let _ = wait_receiver.recv().unwrap();
*sum_clone.borrow_mut() += item;
if *sum_clone.borrow() == 6 {
assert_eq!(
wait_receiver.recv_timeout(time::Duration::from_millis(50)),
Err(mpsc::RecvTimeoutError::Disconnected)
);
l_clone.quit();
Continue(false)
} else {
assert_eq!(
wait_receiver.recv_timeout(time::Duration::from_millis(50)),
Err(mpsc::RecvTimeoutError::Timeout)
);
Continue(true)
}
});
l.run();
thread.join().unwrap();
assert_eq!(*sum.borrow(), 6);
}
}