Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
156 changes: 87 additions & 69 deletions include/exec/sequence.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,10 @@
#include "../stdexec/__detail/__variant.hpp"
#include "../stdexec/execution.hpp"

#include "completion_signatures.hpp"

#include <type_traits>

STDEXEC_PRAGMA_PUSH()
STDEXEC_PRAGMA_IGNORE_GNU("-Wmissing-braces")

Expand Down Expand Up @@ -54,18 +58,11 @@ namespace experimental::execution
STDEXEC_ATTRIBUTE(host, device)
constexpr void _start_next() noexcept
{
STDEXEC_TRY
{
(*_start_next_)(this);
}
STDEXEC_CATCH_ALL
{
STDEXEC::set_error(static_cast<Rcvr&&>(_rcvr), std::current_exception());
}
(*_start_next_)(this);
}

Rcvr _rcvr;
void (*_start_next_)(_opstate_base*) = nullptr;
void (*_start_next_)(_opstate_base*) noexcept = nullptr;
};

template <class Rcvr>
Expand Down Expand Up @@ -120,15 +117,18 @@ namespace experimental::execution
{
template <class... _Ts>
STDEXEC_ATTRIBUTE(host, device, always_inline)
constexpr auto
operator()(_Ts&&... __ts) const STDEXEC_AUTO_RETURN(_Tuple{static_cast<_Ts&&>(__ts)...});
constexpr _Tuple operator()(_Ts&&... __ts) const
noexcept(STDEXEC::__nothrow_constructible_from<_Tuple, _Ts...>)
{
return _Tuple{static_cast<_Ts&&>(__ts)...};
}
};

template <class Rcvr, class... Senders>
struct _opstate;

template <class Rcvr, class Sender0, class... Senders>
struct _opstate<Rcvr, Sender0, Senders...> : _opstate_base<Rcvr>
template <class Rcvr, class CvSender0, class... Senders>
struct _opstate<Rcvr, CvSender0, Senders...> : _opstate_base<Rcvr>
{
using operation_state_concept = STDEXEC::operation_state_t;

Expand All @@ -146,20 +146,25 @@ namespace experimental::execution
using _mk_child_ops_variant_fn =
STDEXEC::__mzip_with2<STDEXEC::__q2<_child_opstate_t>, STDEXEC::__qq<STDEXEC::__variant>>;

using _ops_variant_t = STDEXEC::__minvoke<
_mk_child_ops_variant_fn,
STDEXEC::__tuple<Sender0, Senders...>,
using __is_last_mask_t =
STDEXEC::__mfill_c<sizeof...(Senders),
STDEXEC::__mfalse,
STDEXEC::__mbind_back_q<STDEXEC::__mlist, STDEXEC::__mtrue>>>;
STDEXEC::__mbind_back_q<STDEXEC::__mlist, STDEXEC::__mtrue>>;

using _ops_variant_t = STDEXEC::__minvoke<_mk_child_ops_variant_fn,
STDEXEC::__tuple<CvSender0, Senders...>,
__is_last_mask_t>;

template <class CvSndrs>
STDEXEC_ATTRIBUTE(host, device)
constexpr explicit _opstate(Rcvr&& rcvr, CvSndrs&& sndrs)
noexcept(::STDEXEC::__nothrow_applicable<__convert_tuple_fn<_senders_tuple_t>, CvSndrs>
&& ::STDEXEC::__nothrow_connectable<::STDEXEC::__tuple_element_t<0, CvSndrs>,
_rcvr_t<sizeof...(Senders) == 0>>)
: _opstate_base<Rcvr>{static_cast<Rcvr&&>(rcvr)}
// move all but the first sender into the opstate:
, _sndrs{
STDEXEC::__apply(__convert_tuple_fn<_senders_tuple_t>{}, static_cast<CvSndrs&&>(sndrs))}
// move all but the first sender into the opstate.
{
// Below, it looks like we are using `sndrs` after it has been moved from. This is not the
// case. `sndrs` is moved into a tuple type that has `__ignore` for the first element. The
Expand All @@ -170,19 +175,37 @@ namespace experimental::execution
}

template <std::size_t Remaining>
static constexpr void _start_next(_opstate_base<Rcvr>* _self)
static constexpr void _start_next(_opstate_base<Rcvr>* _self) noexcept
{
constexpr auto __nth = sizeof...(Senders) - Remaining;
auto* self = static_cast<_opstate*>(_self);
auto& sndr = STDEXEC::__get<__nth + 1>(self->_sndrs);
auto& op = self->_ops.template __emplace_from<__nth + 1>(STDEXEC::connect,
std::move(sndr),
_rcvr_t<Remaining == 1>{self});
if constexpr (Remaining > 1)
constexpr bool nothrow =
STDEXEC::__nothrow_connectable<STDEXEC::__m_at_c<__nth, Senders...>,
_rcvr_t<Remaining == 1>>;
STDEXEC_TRY
{
auto& op = self->_ops.template __emplace_from<__nth + 1>(STDEXEC::connect,
std::move(sndr),
_rcvr_t<Remaining == 1>{self});
if constexpr (Remaining > 1)
{
self->_start_next_ = &_start_next<Remaining - 1>;
}
STDEXEC::start(op);
}
STDEXEC_CATCH_ALL
{
self->_start_next_ = &_start_next<Remaining - 1>;
if constexpr (nothrow)
{
STDEXEC::__std::unreachable();
}
else
{
STDEXEC::set_error(static_cast<Rcvr&&>(static_cast<_opstate*>(_self)->_rcvr),
std::current_exception());
}
}
STDEXEC::start(op);
}

STDEXEC_ATTRIBUTE(host, device)
Expand All @@ -199,70 +222,65 @@ namespace experimental::execution
_ops_variant_t _ops{STDEXEC::__no_init};
};

// The completions of the sequence sender are the error and stopped completions of all the
// child senders plus the value completions of the last child sender.
template <class... Env>
struct _completions_fn
{
// When folding left, the first sender folded will be the last sender in the list. That is
// also when the "state" of the fold is void. For this case we want to include the value
// completions; otherwise, we want to exclude them.
template <class State, class... Args>
struct _fold_left;

template <class State, class Head, class... Tail>
struct _fold_left<State, Head, Tail...>
{
using __t = STDEXEC::__gather_completion_signatures_t<
STDEXEC::__completion_signatures_of_t<Head, Env...>,
STDEXEC::set_value_t,
STDEXEC::__mconst<STDEXEC::completion_signatures<>>::__f,
STDEXEC::__cmplsigs::__default_completion,
STDEXEC::__mtry_q<STDEXEC::__concat_completion_signatures_t>::__f,
STDEXEC::__t<_fold_left<State, Tail...>>>;
};

template <class Head>
struct _fold_left<void, Head>
{
using __t = STDEXEC::__mtry_q<STDEXEC::__concat_completion_signatures_t>::__f<
STDEXEC::completion_signatures<STDEXEC::set_error_t(std::exception_ptr)>,
STDEXEC::__completion_signatures_of_t<Head, Env...>>;
};

template <class... Sender>
using __f = STDEXEC::__t<_fold_left<void, Sender...>>;
};
template <class Sender>
concept __has_eptr_completion =
STDEXEC::sender_in<Sender>
&& exec::transform_completion_signatures(STDEXEC::get_completion_signatures<Sender>(),
exec::ignore_completion(),
exec::decay_arguments<STDEXEC::set_error_t>(),
exec::ignore_completion())
.__contains(STDEXEC::__fn_ptr_t<STDEXEC::set_error_t, std::exception_ptr>());

template <class Sender0, class... Senders>
struct _sndr<Sender0, Senders...>
{
using sender_concept = STDEXEC::sender_t;

// Even without an Env, we can sometimes still determine the completion signatures
// of the sequence sender. If any of the child senders has a
// set_error(exception_ptr) completion, then the sequence sender has a
// set_error(exception_ptr) completion. We don't have to ask if any connect call
// throws.
template <class Self, class... Env>
using _completions_t = STDEXEC::__minvoke<_completions_fn<Env...>,
STDEXEC::__copy_cvref_t<Self, Sender0>,
Senders...>;

template <class Self, class... Env>
requires(sizeof...(Env) > 0)
|| __has_eptr_completion<STDEXEC::__copy_cvref_t<Self, Sender0>>
|| (__has_eptr_completion<Senders> || ...)
STDEXEC_ATTRIBUTE(host, device)
static consteval auto get_completion_signatures()
{
if constexpr (STDEXEC::__decay_copyable<Self>)
{
return _completions_t<Self, Env...>{};
}
else
if constexpr (!STDEXEC::__decay_copyable<Self>)
{
return STDEXEC::__throw_compile_time_error<
STDEXEC::_SENDER_TYPE_IS_NOT_DECAY_COPYABLE_,
STDEXEC::_WITH_PRETTY_SENDER_<_sndr<Sender0, Senders...>>>();
}
else
{
using __env_t = STDEXEC::__mfront<Env..., STDEXEC::env<>>;
using __rcvr_t = STDEXEC::__receiver_archetype<__env_t>;
constexpr bool nothrow = (STDEXEC::__nothrow_connectable<Senders, __rcvr_t> && ...);

// The completions of the sequence sender are the error and stopped completions of all the
// child senders plus the value completions of the last child sender.
return exec::concat_completion_signatures(
exec::transform_completion_signatures(
STDEXEC::get_completion_signatures<STDEXEC::__copy_cvref_t<Self, Sender0>, Env...>(),
exec::ignore_completion()),
exec::transform_completion_signatures(
STDEXEC::get_completion_signatures<Senders, Env...>(),
exec::ignore_completion())...,
STDEXEC::get_completion_signatures<STDEXEC::__mback<Senders...>, Env...>(),
STDEXEC::__eptr_completion_unless<nothrow>());
}
}

template <STDEXEC::__decay_copyable Self, class Rcvr>
STDEXEC_ATTRIBUTE(host, device)
constexpr STDEXEC_EXPLICIT_THIS_BEGIN(auto connect)(this Self&& self, Rcvr rcvr)
noexcept(STDEXEC::__nothrow_constructible_from<
_opstate<Rcvr, STDEXEC::__copy_cvref_t<Self, Sender0>, Senders...>,
Rcvr,
decltype((static_cast<Self&&>(self)._sndrs))>)
{
return _opstate<Rcvr, STDEXEC::__copy_cvref_t<Self, Sender0>, Senders...>{
static_cast<Rcvr&&>(rcvr),
Expand Down
Loading
Loading