use futures_channel::{mpsc, oneshot};
use futures_core::future::Future;
use futures_core::stream::Stream;
use futures_core::task;
use futures_core::task::Poll;
use futures_util::future::FutureExt;
use futures_util::stream::StreamExt;
use std::marker::Unpin;
use std::pin;
use std::pin::Pin;
use Continue;
use MainContext;
use Priority;
use Source;
pub struct SourceFuture<F, T> {
create_source: Option<F>,
source: Option<(Source, oneshot::Receiver<T>)>,
}
impl<F, T: 'static> SourceFuture<F, T>
where
F: FnOnce(oneshot::Sender<T>) -> Source + 'static,
{
pub fn new(create_source: F) -> SourceFuture<F, T> {
SourceFuture {
create_source: Some(create_source),
source: None,
}
}
}
impl<F, T> Unpin for SourceFuture<F, T> {}
impl<F, T> Future for SourceFuture<F, T>
where
F: FnOnce(oneshot::Sender<T>) -> Source + 'static,
{
type Output = T;
fn poll(mut self: pin::Pin<&mut Self>, ctx: &mut task::Context) -> Poll<T> {
let SourceFuture {
ref mut create_source,
ref mut source,
..
} = *self;
if let Some(create_source) = create_source.take() {
let main_context = MainContext::ref_thread_default();
assert!(
main_context.is_owner(),
"Spawning futures only allowed if the thread is owning the MainContext"
);
let (send, recv) = oneshot::channel();
let s = create_source(send);
s.attach(Some(&main_context));
*source = Some((s, recv));
}
let res = {
let &mut (_, ref mut receiver) = source.as_mut().unwrap();
receiver.poll_unpin(ctx)
};
#[allow(clippy::match_wild_err_arm)]
match res {
Poll::Ready(Err(_)) => panic!("Source sender was unexpectedly closed"),
Poll::Ready(Ok(v)) => {
let _ = source.take();
Poll::Ready(v)
}
Poll::Pending => Poll::Pending,
}
}
}
impl<T, F> Drop for SourceFuture<T, F> {
fn drop(&mut self) {
if let Some((source, _)) = self.source.take() {
source.destroy();
}
}
}
pub fn timeout_future(value: u32) -> Pin<Box<dyn Future<Output = ()> + Send + 'static>> {
timeout_future_with_priority(::PRIORITY_DEFAULT, value)
}
pub fn timeout_future_with_priority(
priority: Priority,
value: u32,
) -> Pin<Box<dyn Future<Output = ()> + Send + 'static>> {
Box::pin(SourceFuture::new(move |send| {
let mut send = Some(send);
::timeout_source_new(value, None, priority, move || {
let _ = send.take().unwrap().send(());
Continue(false)
})
}))
}
pub fn timeout_future_seconds(value: u32) -> Pin<Box<dyn Future<Output = ()> + Send + 'static>> {
timeout_future_seconds_with_priority(::PRIORITY_DEFAULT, value)
}
pub fn timeout_future_seconds_with_priority(
priority: Priority,
value: u32,
) -> Pin<Box<dyn Future<Output = ()> + Send + 'static>> {
Box::pin(SourceFuture::new(move |send| {
let mut send = Some(send);
::timeout_source_new_seconds(value, None, priority, move || {
let _ = send.take().unwrap().send(());
Continue(false)
})
}))
}
pub fn child_watch_future(
pid: ::Pid,
) -> Pin<Box<dyn Future<Output = (::Pid, i32)> + Send + 'static>> {
child_watch_future_with_priority(::PRIORITY_DEFAULT, pid)
}
pub fn child_watch_future_with_priority(
priority: Priority,
pid: ::Pid,
) -> Pin<Box<dyn Future<Output = (::Pid, i32)> + Send + 'static>> {
Box::pin(SourceFuture::new(move |send| {
let mut send = Some(send);
::child_watch_source_new(pid, None, priority, move |pid, code| {
let _ = send.take().unwrap().send((pid, code));
})
}))
}
#[cfg(any(unix, feature = "dox"))]
pub fn unix_signal_future(signum: i32) -> Pin<Box<dyn Future<Output = ()> + Send + 'static>> {
unix_signal_future_with_priority(::PRIORITY_DEFAULT, signum)
}
#[cfg(any(unix, feature = "dox"))]
pub fn unix_signal_future_with_priority(
priority: Priority,
signum: i32,
) -> Pin<Box<dyn Future<Output = ()> + Send + 'static>> {
Box::pin(SourceFuture::new(move |send| {
let mut send = Some(send);
::unix_signal_source_new(signum, None, priority, move || {
let _ = send.take().unwrap().send(());
Continue(false)
})
}))
}
pub struct SourceStream<F, T> {
create_source: Option<F>,
source: Option<(Source, mpsc::UnboundedReceiver<T>)>,
}
impl<F, T> Unpin for SourceStream<F, T> {}
impl<F, T: 'static> SourceStream<F, T>
where
F: FnOnce(mpsc::UnboundedSender<T>) -> Source + 'static,
{
pub fn new(create_source: F) -> SourceStream<F, T> {
SourceStream {
create_source: Some(create_source),
source: None,
}
}
}
impl<F, T> Stream for SourceStream<F, T>
where
F: FnOnce(mpsc::UnboundedSender<T>) -> Source + 'static,
{
type Item = T;
fn poll_next(mut self: pin::Pin<&mut Self>, ctx: &mut task::Context) -> Poll<Option<T>> {
let SourceStream {
ref mut create_source,
ref mut source,
..
} = *self;
if let Some(create_source) = create_source.take() {
let main_context = MainContext::ref_thread_default();
assert!(
main_context.is_owner(),
"Spawning futures only allowed if the thread is owning the MainContext"
);
let (send, recv) = mpsc::unbounded();
let s = create_source(send);
s.attach(Some(&main_context));
*source = Some((s, recv));
}
let res = {
let &mut (_, ref mut receiver) = source.as_mut().unwrap();
receiver.poll_next_unpin(ctx)
};
#[allow(clippy::match_wild_err_arm)]
match res {
Poll::Ready(v) => {
if v.is_none() {
let _ = source.take();
}
Poll::Ready(v)
}
Poll::Pending => Poll::Pending,
}
}
}
impl<T, F> Drop for SourceStream<T, F> {
fn drop(&mut self) {
if let Some((source, _)) = self.source.take() {
source.destroy();
}
}
}
pub fn interval_stream(value: u32) -> Pin<Box<dyn Stream<Item = ()> + Send + 'static>> {
interval_stream_with_priority(::PRIORITY_DEFAULT, value)
}
pub fn interval_stream_with_priority(
priority: Priority,
value: u32,
) -> Pin<Box<dyn Stream<Item = ()> + Send + 'static>> {
Box::pin(SourceStream::new(move |send| {
::timeout_source_new(value, None, priority, move || {
if send.unbounded_send(()).is_err() {
Continue(false)
} else {
Continue(true)
}
})
}))
}
pub fn interval_stream_seconds(value: u32) -> Pin<Box<dyn Stream<Item = ()> + Send + 'static>> {
interval_stream_seconds_with_priority(::PRIORITY_DEFAULT, value)
}
pub fn interval_stream_seconds_with_priority(
priority: Priority,
value: u32,
) -> Pin<Box<dyn Stream<Item = ()> + Send + 'static>> {
Box::pin(SourceStream::new(move |send| {
::timeout_source_new_seconds(value, None, priority, move || {
if send.unbounded_send(()).is_err() {
Continue(false)
} else {
Continue(true)
}
})
}))
}
#[cfg(any(unix, feature = "dox"))]
pub fn unix_signal_stream(signum: i32) -> Pin<Box<dyn Stream<Item = ()> + Send + 'static>> {
unix_signal_stream_with_priority(::PRIORITY_DEFAULT, signum)
}
#[cfg(any(unix, feature = "dox"))]
pub fn unix_signal_stream_with_priority(
priority: Priority,
signum: i32,
) -> Pin<Box<dyn Stream<Item = ()> + Send + 'static>> {
Box::pin(SourceStream::new(move |send| {
::unix_signal_source_new(signum, None, priority, move || {
if send.unbounded_send(()).is_err() {
Continue(false)
} else {
Continue(true)
}
})
}))
}
#[cfg(test)]
mod tests {
use super::*;
use std::thread;
#[test]
fn test_timeout() {
let c = MainContext::new();
let res = c.block_on(timeout_future(20));
assert_eq!(res, ());
}
#[test]
fn test_timeout_send() {
let c = MainContext::new();
let l = ::MainLoop::new(Some(&c), false);
let l_clone = l.clone();
c.spawn(timeout_future(20).then(move |()| {
l_clone.quit();
futures_util::future::ready(())
}));
l.run();
}
#[test]
fn test_interval() {
let c = MainContext::new();
let mut count = 0;
{
let count = &mut count;
let res = c.block_on(
interval_stream(20)
.take(2)
.for_each(|()| {
*count = *count + 1;
futures_util::future::ready(())
})
.map(|_| ()),
);
assert_eq!(res, ());
}
assert_eq!(count, 2);
}
#[test]
fn test_timeout_and_channel() {
let c = MainContext::default();
let res = c.block_on(timeout_future(20).then(|()| {
let (sender, receiver) = oneshot::channel();
thread::spawn(move || {
sender.send(1).unwrap();
});
receiver.then(|i| futures_util::future::ready(i.unwrap()))
}));
assert_eq!(res, 1);
}
}