feat(pipe): use pipe status propagation

This commit is contained in:
2026-03-10 19:22:18 +01:00
parent cc86f0932e
commit 5c08732f6c
2 changed files with 28 additions and 29 deletions

View File

@@ -1,18 +1,19 @@
#pragma once #pragma once
#include <expected>
#include <memory> #include <memory>
#include <optional>
namespace mosh_me { namespace mosh_me {
template <typename A, typename B> enum class PipeStatus {
concept AttachableTo = requires { NO_INPUT_ATTACHED,
typename A::output_type; PIPE_EOF,
typename B::input_type; };
} && std::convertible_to<typename A::output_type, typename B::input_type>;
template <typename Data> using PipeData = std::expected<Data, PipeStatus>;
template <typename Output> class PipeOutput { template <typename Output> class PipeOutput {
public: public:
virtual std::optional<Output> pull() noexcept = 0; virtual PipeData<Output> pull() noexcept = 0;
}; };
template <typename Input> class PipeInput { template <typename Input> class PipeInput {
@@ -20,7 +21,7 @@ private:
std::shared_ptr<PipeOutput<Input>> input_; std::shared_ptr<PipeOutput<Input>> input_;
protected: protected:
std::optional<Input> fetchInput() { return input_->pull(); } PipeData<Input> fetchInput() { return input_->pull(); }
public: public:
void linkInput(std::shared_ptr<PipeOutput<Input>> input) { input_ = input; }; void linkInput(std::shared_ptr<PipeOutput<Input>> input) { input_ = input; };
@@ -35,11 +36,11 @@ public:
PipeSource(std::shared_ptr<PipeOutput<Output>> p) : source_(p) {} PipeSource(std::shared_ptr<PipeOutput<Output>> p) : source_(p) {}
std::optional<Output> pull() { PipeData<Output> pull() {
if (source_) { if (source_) {
return source_->pull(); return source_->pull();
} else { } else {
return std::nullopt; return std::unexpected(PipeStatus::NO_INPUT_ATTACHED);
} }
} }
}; };
@@ -67,7 +68,8 @@ public:
} }
}; };
template <typename PipeType, typename O> class AsPipeSource { template <typename PipeType, typename O>
class AsPipeSource : public PipeOutput<O> {
public: public:
template <typename... Args> static PipeSource<O> link(Args &&...args) { template <typename... Args> static PipeSource<O> link(Args &&...args) {
return PipeSource<O>( return PipeSource<O>(
@@ -75,7 +77,8 @@ public:
} }
}; };
template <typename PipeType, typename I, typename O> class AsPipeLink { template <typename PipeType, typename I, typename O>
class AsPipeLink : public PipeWorker<I, O> {
public: public:
template <typename... Args> static PipeLink<I, O> link(Args &&...args) { template <typename... Args> static PipeLink<I, O> link(Args &&...args) {
return PipeLink<I, O>( return PipeLink<I, O>(

View File

@@ -5,59 +5,55 @@
using namespace mosh_me; using namespace mosh_me;
class CountProducer : public PipeOutput<int>, class CountProducer : public AsPipeSource<CountProducer, int> {
public AsPipeSource<CountProducer, int> {
private: private:
int counter_ = 0; int counter_ = 0;
public: public:
std::optional<int> pull() noexcept override { PipeData<int> pull() noexcept override {
if (auto i = counter_++; i < 10) { if (auto i = counter_++; i < 10) {
return i; return i;
} else { } else {
return std::nullopt; return std::unexpected(PipeStatus::PIPE_EOF);
} }
} }
}; };
class Doubler : public PipeWorker<int, int>, class Doubler : public AsPipeLink<Doubler, int, int> {
public AsPipeLink<Doubler, int, int> {
public: public:
std::optional<int> pull() noexcept override { PipeData<int> pull() noexcept override {
return fetchInput().and_then( return fetchInput().and_then([](int x) -> PipeData<int> { return x * 2; });
[](int x) { return std::make_optional(x * 2); });
} }
}; };
class Multiplier : public PipeWorker<int, int>, class Multiplier : public AsPipeLink<Multiplier, int, int> {
public AsPipeLink<Multiplier, int, int> {
int f_; int f_;
public: public:
Multiplier(int f) : f_(f) {} Multiplier(int f) : f_(f) {}
std::optional<int> pull() noexcept override { PipeData<int> pull() noexcept override {
return fetchInput().and_then( return fetchInput().and_then(
[this](int x) { return std::make_optional(f_ * x); }); [this](int x) -> PipeData<int> { return f_ * x; });
} }
}; };
class Sum2 : public PipeWorker<int, int>, public AsPipeLink<Sum2, int, int> { class Sum2 : public AsPipeLink<Sum2, int, int> {
private: private:
std::optional<int> last_; std::optional<int> last_;
public: public:
std::optional<int> pull() noexcept override { PipeData<int> pull() noexcept override {
if (auto x = fetchInput(); x) { if (auto x = fetchInput(); x) {
if (last_) { if (last_) {
auto sum = *last_ + *x; auto sum = *last_ + *x;
last_ = std::nullopt; last_ = std::nullopt;
return sum; return sum;
} else { } else {
last_ = x; last_ = *x;
return pull(); return pull();
} }
} }
return std::nullopt; return std::unexpected(PipeStatus::PIPE_EOF);
} }
}; };