From 5c08732f6c70161601c4968ccc7f66005bdff657 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Jonas=20R=C3=B6ger?= Date: Tue, 10 Mar 2026 19:22:18 +0100 Subject: [PATCH] feat(pipe): use pipe status propagation --- include/mosh-me/pipeline.hpp | 27 +++++++++++++++------------ test/pipeline.cpp | 30 +++++++++++++----------------- 2 files changed, 28 insertions(+), 29 deletions(-) diff --git a/include/mosh-me/pipeline.hpp b/include/mosh-me/pipeline.hpp index 02be97f..b43cae2 100644 --- a/include/mosh-me/pipeline.hpp +++ b/include/mosh-me/pipeline.hpp @@ -1,18 +1,19 @@ #pragma once +#include #include -#include namespace mosh_me { -template -concept AttachableTo = requires { - typename A::output_type; - typename B::input_type; -} && std::convertible_to; +enum class PipeStatus { + NO_INPUT_ATTACHED, + PIPE_EOF, +}; + +template using PipeData = std::expected; template class PipeOutput { public: - virtual std::optional pull() noexcept = 0; + virtual PipeData pull() noexcept = 0; }; template class PipeInput { @@ -20,7 +21,7 @@ private: std::shared_ptr> input_; protected: - std::optional fetchInput() { return input_->pull(); } + PipeData fetchInput() { return input_->pull(); } public: void linkInput(std::shared_ptr> input) { input_ = input; }; @@ -35,11 +36,11 @@ public: PipeSource(std::shared_ptr> p) : source_(p) {} - std::optional pull() { + PipeData pull() { if (source_) { return source_->pull(); } else { - return std::nullopt; + return std::unexpected(PipeStatus::NO_INPUT_ATTACHED); } } }; @@ -67,7 +68,8 @@ public: } }; -template class AsPipeSource { +template +class AsPipeSource : public PipeOutput { public: template static PipeSource link(Args &&...args) { return PipeSource( @@ -75,7 +77,8 @@ public: } }; -template class AsPipeLink { +template +class AsPipeLink : public PipeWorker { public: template static PipeLink link(Args &&...args) { return PipeLink( diff --git a/test/pipeline.cpp b/test/pipeline.cpp index 0d2d9cf..c82ca69 100644 --- a/test/pipeline.cpp +++ b/test/pipeline.cpp @@ -5,59 +5,55 @@ using namespace mosh_me; -class CountProducer : public PipeOutput, - public AsPipeSource { +class CountProducer : public AsPipeSource { private: int counter_ = 0; public: - std::optional pull() noexcept override { + PipeData pull() noexcept override { if (auto i = counter_++; i < 10) { return i; } else { - return std::nullopt; + return std::unexpected(PipeStatus::PIPE_EOF); } } }; -class Doubler : public PipeWorker, - public AsPipeLink { +class Doubler : public AsPipeLink { public: - std::optional pull() noexcept override { - return fetchInput().and_then( - [](int x) { return std::make_optional(x * 2); }); + PipeData pull() noexcept override { + return fetchInput().and_then([](int x) -> PipeData { return x * 2; }); } }; -class Multiplier : public PipeWorker, - public AsPipeLink { +class Multiplier : public AsPipeLink { int f_; public: Multiplier(int f) : f_(f) {} - std::optional pull() noexcept override { + PipeData pull() noexcept override { return fetchInput().and_then( - [this](int x) { return std::make_optional(f_ * x); }); + [this](int x) -> PipeData { return f_ * x; }); } }; -class Sum2 : public PipeWorker, public AsPipeLink { +class Sum2 : public AsPipeLink { private: std::optional last_; public: - std::optional pull() noexcept override { + PipeData pull() noexcept override { if (auto x = fetchInput(); x) { if (last_) { auto sum = *last_ + *x; last_ = std::nullopt; return sum; } else { - last_ = x; + last_ = *x; return pull(); } } - return std::nullopt; + return std::unexpected(PipeStatus::PIPE_EOF); } };