feat(pipeline): add pipeline header

This commit is contained in:
2026-03-10 19:02:04 +01:00
parent 07a6f1b3d8
commit 8b23d09ac5
6 changed files with 184 additions and 1 deletions

75
test/pipeline.cpp Normal file
View File

@@ -0,0 +1,75 @@
#include "mosh-me/pipeline.hpp"
#include <catch2/catch_all.hpp>
#include <spdlog/fmt/fmt.h>
#include <spdlog/fmt/ranges.h>
using namespace mosh_me;
class CountProducer : public PipeOutput<int>,
public AsPipeSource<CountProducer, int> {
private:
int counter_ = 0;
public:
std::optional<int> pull() noexcept override {
if (auto i = counter_++; i < 10) {
return i;
} else {
return std::nullopt;
}
}
};
class Doubler : public PipeWorker<int, int>,
public AsPipeLink<Doubler, int, int> {
public:
std::optional<int> pull() noexcept override {
return fetchInput().and_then(
[](int x) { return std::make_optional(x * 2); });
}
};
class Multiplier : public PipeWorker<int, int>,
public AsPipeLink<Multiplier, int, int> {
int f_;
public:
Multiplier(int f) : f_(f) {}
std::optional<int> pull() noexcept override {
return fetchInput().and_then(
[this](int x) { return std::make_optional(f_ * x); });
}
};
class Sum2 : public PipeWorker<int, int>, public AsPipeLink<Sum2, int, int> {
private:
std::optional<int> last_;
public:
std::optional<int> pull() noexcept override {
if (auto x = fetchInput(); x) {
if (last_) {
auto sum = *last_ + *x;
last_ = std::nullopt;
return sum;
} else {
last_ = x;
return pull();
}
}
return std::nullopt;
}
};
TEST_CASE("Pipeline") {
auto pipe = Sum2::link() << Doubler::link() << Multiplier::link(10)
<< CountProducer::link();
auto count = 0;
while (auto s = pipe.pull()) {
REQUIRE(*s == ((count * 2) * 20) + ((count * 2 + 1) * 20));
count++;
}
REQUIRE(count == 5);
}