From 9e90e40fdd85ec1de52c1577d388c73c3d177275 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Jonas=20R=C3=B6ger?= Date: Sat, 14 Mar 2026 02:56:51 +0100 Subject: [PATCH] feat: add avpipes --- CMakeLists.txt | 8 +- app/mosh-pipe.cpp | 32 ++++++ include/mosh-me/avpipe.hpp | 85 ++++++++++++++++ include/mosh-me/pipeline.hpp | 75 +++++++++------ src/avpipe.cpp | 182 +++++++++++++++++++++++++++++++++++ 5 files changed, 353 insertions(+), 29 deletions(-) create mode 100644 app/mosh-pipe.cpp create mode 100644 include/mosh-me/avpipe.hpp create mode 100644 src/avpipe.cpp diff --git a/CMakeLists.txt b/CMakeLists.txt index 5e95249..455f34b 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -13,6 +13,7 @@ find_package(avcpp REQUIRED) # ############################################################################## add_library(mosh-me-lib STATIC + ${PROJECT_SOURCE_DIR}/src/avpipe.cpp ${PROJECT_SOURCE_DIR}/src/pipeline.cpp ) target_link_libraries(mosh-me-lib PUBLIC avcpp::avcpp spdlog::spdlog) @@ -33,8 +34,13 @@ target_link_libraries(mk-moshable avcpp::avcpp spdlog::spdlog) target_compile_features(mk-moshable PUBLIC cxx_std_23) target_include_directories(mk-moshable PRIVATE ${PROJECT_SOURCE_DIR}/include) +add_executable(mosh-pipe + ${PROJECT_SOURCE_DIR}/app/mosh-pipe.cpp +) +target_link_libraries(mosh-pipe mosh-me-lib) -install(TARGETS mosh-me mk-moshable) + +install(TARGETS mosh-me mk-moshable mosh-pipe) ################################################################################ diff --git a/app/mosh-pipe.cpp b/app/mosh-pipe.cpp new file mode 100644 index 0000000..67f9d43 --- /dev/null +++ b/app/mosh-pipe.cpp @@ -0,0 +1,32 @@ + +#include "mosh-me/avpipe.hpp" +#include +#include +#include +#include +#include + +int main(int argc, char *argv[]) { + if (argc < 3) { + spdlog::error("Please invoke with {} ", argv[0]); + } + + av::init(); + + try { + auto pipe = mosh_me::VideoMuxer::link(std::filesystem::path(argv[2])) + << mosh_me::DropIFrames::link() + << mosh_me::MoshableVideoEncoder::link() + << mosh_me::VideoDecoder::link() + << mosh_me::VideoPacketSource::link(argv[1]); + + pipe.propagateMeta(); + + if (auto result = pipe.pull(); !result.has_value()) { + spdlog::error("Pipe failed"); + } + + } catch (const av::Exception &e) { + spdlog::error("{}", e.what()); + } +} diff --git a/include/mosh-me/avpipe.hpp b/include/mosh-me/avpipe.hpp new file mode 100644 index 0000000..3211159 --- /dev/null +++ b/include/mosh-me/avpipe.hpp @@ -0,0 +1,85 @@ +#pragma once +#include "mosh-me/pipeline.hpp" +#include +#include +#include +#include +#include + +namespace mosh_me { + +class VideoPacketSource + : public AsPipeSource> { + av::FormatContext ctx_; + +public: + VideoPacketSource(const std::filesystem::path &path); + + mosh_me::PipeData pull() noexcept override; + std::reference_wrapper propagateMeta() noexcept override; +}; + +struct FrameMeta { + av::PixelFormat pix_fmt; + av::Rational frame_rate; + int width = 0; + int height = 0; +}; + +class VideoDecoder + : public AsPipeLink, FrameMeta> { +private: + av::VideoDecoderContext ctx_; + av::Stream stream_; + +public: + mosh_me::PipeData pull() noexcept override; + FrameMeta propagateMeta() noexcept override; +}; + +class MoshableVideoEncoder + : public AsPipeLink< + MoshableVideoEncoder, av::VideoFrame, av::Packet, FrameMeta, + std::pair, + FrameMeta>> { +private: + av::VideoEncoderContext ctx_; + +public: + MoshableVideoEncoder(); + mosh_me::PipeData pull() noexcept override; + std::pair, FrameMeta> + propagateMeta() noexcept override; +}; + +class DropIFrames + : public AsPipeLink< + DropIFrames, av::Packet, av::Packet, + std::pair, FrameMeta>, + std::pair, + FrameMeta>> { +public: + mosh_me::PipeData pull() noexcept override; + std::pair, FrameMeta> + propagateMeta() noexcept override; +}; + +class VideoMuxer + : public AsPipeLink< + VideoMuxer, av::Packet, void, + std::pair, FrameMeta>, + void> { +private: + av::FormatContext ctx_; + +public: + VideoMuxer(const std::filesystem::path &path); + + mosh_me::PipeData pull() noexcept override; + + void propagateMeta() noexcept override; +}; + +} // namespace mosh_me diff --git a/include/mosh-me/pipeline.hpp b/include/mosh-me/pipeline.hpp index b43cae2..2943e7c 100644 --- a/include/mosh-me/pipeline.hpp +++ b/include/mosh-me/pipeline.hpp @@ -7,34 +7,46 @@ namespace mosh_me { enum class PipeStatus { NO_INPUT_ATTACHED, PIPE_EOF, + PIPE_ERROR, }; template using PipeData = std::expected; -template class PipeOutput { +template class PipeOutput { public: + virtual OMeta propagateMeta() noexcept = 0; virtual PipeData pull() noexcept = 0; }; -template class PipeInput { +template class PipeOutput { +public: + virtual void propagateMeta() noexcept {} + virtual PipeData pull() noexcept = 0; +}; + +template class PipeInput { private: - std::shared_ptr> input_; + std::shared_ptr> input_; protected: PipeData fetchInput() { return input_->pull(); } + IMeta fetchMeta() { return input_->propagateMeta(); } public: - void linkInput(std::shared_ptr> input) { input_ = input; }; + void linkInput(std::shared_ptr> input) { + input_ = input; + }; }; -template -class PipeWorker : public PipeOutput, public PipeInput {}; +template +class PipeWorker : public PipeOutput, + public PipeInput {}; -template class PipeSource { +template class PipeSource { public: - std::shared_ptr> source_; + std::shared_ptr> source_; - PipeSource(std::shared_ptr> p) : source_(p) {} + PipeSource(std::shared_ptr> p) : source_(p) {} PipeData pull() { if (source_) { @@ -43,45 +55,52 @@ public: return std::unexpected(PipeStatus::NO_INPUT_ATTACHED); } } + + void propagateMeta() { source_->propagateMeta(); } }; -template class PipeLink { +template +class PipeLink { public: - std::shared_ptr> input_; - std::shared_ptr> output_; + std::shared_ptr> input_; + std::shared_ptr> output_; - PipeLink(std::shared_ptr> i, - std::shared_ptr> o) + PipeLink(std::shared_ptr> i, + std::shared_ptr> o) : input_(i), output_(o) {} - PipeLink(std::shared_ptr> p) + PipeLink(std::shared_ptr> p) : input_(p), output_(p) {} - template - PipeLink operator<<(PipeLink &&other) { + template + PipeLink operator<<(PipeLink &&other) { input_->linkInput(std::move(other.output_)); - return PipeLink(std::move(other.input_), std::move(output_)); + return PipeLink(std::move(other.input_), + std::move(output_)); } - template PipeSource operator<<(PipeSource &&other) { + template + PipeSource operator<<(PipeSource &&other) { input_->linkInput(std::move(other.source_)); - return PipeSource(std::move(output_)); + return PipeSource(std::move(output_)); } }; -template -class AsPipeSource : public PipeOutput { +template +class AsPipeSource : public PipeOutput { public: - template static PipeSource link(Args &&...args) { - return PipeSource( + template static PipeSource link(Args &&...args) { + return PipeSource( std::make_shared(std::forward(args)...)); } }; -template -class AsPipeLink : public PipeWorker { +template +class AsPipeLink : public PipeWorker { public: - template static PipeLink link(Args &&...args) { - return PipeLink( + template + static PipeLink link(Args &&...args) { + return PipeLink( std::make_shared(std::forward(args)...)); } }; diff --git a/src/avpipe.cpp b/src/avpipe.cpp new file mode 100644 index 0000000..be5df93 --- /dev/null +++ b/src/avpipe.cpp @@ -0,0 +1,182 @@ +#include "mosh-me/avpipe.hpp" + +mosh_me::VideoPacketSource::VideoPacketSource( + const std::filesystem::path &path) { + ctx_.openInput(path); +} + +mosh_me::PipeData mosh_me::VideoPacketSource::pull() noexcept { + std::error_code ec; + + if (auto pkt = ctx_.readPacket(ec); !pkt.isNull()) { + if (ec) { + return std::unexpected(PipeStatus::PIPE_ERROR); + } + return pkt; + } + + return std::unexpected(PipeStatus::PIPE_EOF); +} + +std::reference_wrapper +mosh_me::VideoPacketSource::propagateMeta() noexcept { + return ctx_; +} + +mosh_me::PipeData mosh_me::VideoDecoder::pull() noexcept { + auto idx = ctx_.stream().index(); + for (;;) { + auto input = fetchInput(); + if (!input) { + return std::unexpected(input.error()); + } + + // Filter stream + auto pkt = input.value(); + if (pkt.streamIndex() != idx) { + continue; + } + + // Decode + std::error_code ec; + auto frame = ctx_.decode(pkt, ec); + if (ec) { + return std::unexpected(PipeStatus::PIPE_ERROR); + } + + // Frame completed + if (frame) { + return frame; + } + } +} + +mosh_me::FrameMeta mosh_me::VideoDecoder::propagateMeta() noexcept { + auto &ictx = fetchMeta().get(); + ictx.findStreamInfo(); + for (int i = 0; i < ictx.streamsCount(); i++) { + if (ictx.stream(i).isValid() && ictx.stream(i).isVideo()) { + ctx_ = av::VideoDecoderContext(ictx.stream(i)); + } + } + ctx_.open(); + return FrameMeta{ + .pix_fmt = ctx_.pixelFormat(), + .frame_rate = ctx_.stream().frameRate(), + .width = ctx_.width(), + .height = ctx_.height(), + }; +} + +mosh_me::MoshableVideoEncoder::MoshableVideoEncoder() + : ctx_(av::findEncodingCodec("libxvid")) { + ctx_.setGopSize(1000); + ctx_.setMaxBFrames(0); + ctx_.setGlobalQuality(1); + ctx_.raw()->qmin = 1; + ctx_.raw()->qmax = 1; + ctx_.setFlags(AV_CODEC_FLAG_QPEL | AV_CODEC_FLAG_4MV); + ctx_.setTimeBase({1, 1000}); +} + +mosh_me::PipeData mosh_me::MoshableVideoEncoder::pull() noexcept { + for (;;) { + auto input = fetchInput(); + if (!input) { + return std::unexpected(input.error()); + } + + auto frame = *input; + + frame.setTimeBase(ctx_.timeBase()); + frame.setStreamIndex(0); + frame.setPictureType(); + + std::error_code ec; + auto pkt = ctx_.encode(frame, ec); + + if (ec) { + return std::unexpected(PipeStatus::PIPE_ERROR); + } + + // packet completed + if (pkt) { + pkt.setStreamIndex(0); + return pkt; + } + } +} + +std::pair, mosh_me::FrameMeta> +mosh_me::MoshableVideoEncoder::propagateMeta() noexcept { + auto frame_meta = fetchMeta(); + ctx_.setPixelFormat(frame_meta.pix_fmt); + ctx_.setWidth(frame_meta.width); + ctx_.setHeight(frame_meta.height); + ctx_.open(); + return {ctx_, frame_meta}; +} + +mosh_me::PipeData mosh_me::DropIFrames::pull() noexcept { + for (;;) { + auto input = fetchInput(); + if (!input) { + return std::unexpected(input.error()); + } + + if (input->isKeyPacket()) { + continue; + } + + return input; + } +} + +std::pair, mosh_me::FrameMeta> +mosh_me::DropIFrames::propagateMeta() noexcept { + return fetchMeta(); +} + +mosh_me::VideoMuxer::VideoMuxer(const std::filesystem::path &path) { + av::OutputFormat ofmt; + ofmt.setFormat(path); + ctx_.setFormat(ofmt); + ctx_.openOutput(path); +} + +mosh_me::PipeData mosh_me::VideoMuxer::pull() noexcept { + std::error_code ec; + + ctx_.writeHeader(ec); + if (ec) { + return std::unexpected(PipeStatus::PIPE_ERROR); + } + + for (;;) { + auto input = fetchInput(); + if (!input && input.error() == PipeStatus::PIPE_EOF) { + break; + } else if (!input) { + return std::unexpected(input.error()); + } + + ctx_.writePacket(*input, ec); + if (ec) { + return std::unexpected(PipeStatus::PIPE_ERROR); + } + } + + ctx_.writeTrailer(ec); + if (ec) { + return std::unexpected(PipeStatus::PIPE_ERROR); + } + ctx_.flush(); + + return {}; +} + +void mosh_me::VideoMuxer::propagateMeta() noexcept { + auto [encoder, frame_meta] = fetchMeta(); + auto ostream = ctx_.addStream(encoder.get()); + ostream.setFrameRate(frame_meta.frame_rate); +}