#ifndef coro_h #define coro_h 1 #include #include #include #include #include #include #include #include namespace as { class coro { private: struct state_type; public: class id { public: id(const void *p = nullptr): ptr(p) {} operator const void *() const { return ptr; } private: const void *ptr = nullptr; }; coro() noexcept = default; template requires std::invocable, std::decay_t...> explicit coro(Fn &&fn, Args &&...args); coro(const coro &) = delete; coro &operator=(const coro &) = delete; coro(coro &&o) noexcept : handle(std::exchange(o.handle, {})), state(std::exchange(o.state, nullptr)) { } coro &operator=(coro &&o) noexcept { if (this != &o) { if (joinable()) std::terminate(); handle = std::exchange(o.handle, {}); state = std::exchange(o.state, nullptr); } return *this; } ~coro() noexcept { if (joinable()) std::terminate(); } void detach() { if (!joinable()) throw std::logic_error("detach on non-joinable as::coro!"); handle = {}; state.reset(); } void join(); bool joinable() const noexcept { return static_cast(state); } id get_id() const noexcept { return id(state.get()); } private: friend class coro_scheduler; struct state_type { std::atomic finished = false; std::exception_ptr exception = nullptr; coro::id id = {}; }; struct task_type { struct promise_type { std::shared_ptr state = std::make_shared(); task_type get_return_object() { auto handle = std::coroutine_handle::from_promise( *this); state->id = id(handle.address()); return task_type(handle, state); } std::suspend_always initial_suspend() noexcept { return {}; } std::suspend_always final_suspend() noexcept { state->finished.store(true, std::memory_order_release); return {}; } void unhandled_exception() {} void return_void() noexcept {} }; using handle_type = std::coroutine_handle; task_type() noexcept = default; explicit task_type(handle_type h, std::shared_ptr s) : handle(h), state(std::move(s)) { } task_type(const task_type &) = delete; task_type &operator=(const task_type &) = delete; task_type(task_type &&o) noexcept : handle(std::exchange(o.handle, {})), state(std::exchange(o.state, nullptr)) { } task_type &operator=(task_type &&o) noexcept { if (this != &o) { if (handle) handle.destroy(); handle = std::exchange(o.handle, {}); state = std::exchange(o.state, nullptr); } return *this; } ~task_type() { if (handle) handle.destroy(); handle = {}; state.reset(); } void release() { handle = {}; state.reset(); } handle_type handle; std::shared_ptr state; }; #pragma GCC diagnostic push #pragma GCC diagnostic ignored "-Wsubobject-linkage" template static task_type make_task(Fn fn, Args... args) { std::invoke(std::move(fn), std::move(args)...); co_return; } #pragma GCC diagnostic pop task_type::handle_type handle; std::shared_ptr state; }; class coro_scheduler { public: using state_type = coro::state_type; // ~coro_scheduler() noexcept { while (run()); } static coro_scheduler &active() { static thread_local coro_scheduler instance; return instance; } void emplace(std::coroutine_handle<> handle) { ready_queue.push(handle); } bool run() { if (ready_queue.empty()) return false; auto handle = ready_queue.front(); ready_queue.pop(); auto prev = running_state; state_type *current_state = nullptr; if (handle) { using handle_type = std::coroutine_handle; auto typed_handle = handle_type::from_address(handle.address()); if (typed_handle && typed_handle.promise().state) { current_state = typed_handle.promise().state.get(); } } running_state = current_state; handle.resume(); if (!handle.done()) ready_queue.push(handle); else handle.destroy(); running_state = prev; return true; } state_type *running() const { return running_state; } auto suspend() noexcept { return std::suspend_always{}; } private: std::queue> ready_queue; static thread_local state_type *running_state; }; thread_local coro_scheduler::state_type *coro_scheduler::running_state = nullptr; //----------------------------------------------------- // coro methods implementations //----------------------------------------------------- template requires std::invocable, std::decay_t...> coro::coro(Fn &&fn, Args &&...args) { auto task = make_task(std::decay_t(std::forward(fn)), std::decay_t(std::forward(args))...); handle = task.handle; state = task.state; task.release(); if (handle) coro_scheduler::active().emplace(handle); } void coro::join() { if (!joinable()) throw std::logic_error("join called on non-joinable coro!"); if (coro_scheduler::active().running() == state.get()) throw std::logic_error("join on current coro would deadlock!"); while (!state->finished.load(std::memory_order_acquire)) coro_scheduler::active().run(); if (state->exception) std::rethrow_exception(state->exception); handle = {}; state.reset(); } //----------------------------------------------------- namespace this_coro { using id = coro::id; inline id get_id() noexcept { if (auto *s = coro_scheduler::active().running(); s) return s->id; else return id(reinterpret_cast(pthread_self())); } inline auto yield() noexcept { return coro_scheduler::active().suspend(); } } // namespace this_coro } // namespace as #endif // coro_h