feat: add avpipes

This commit is contained in:
2026-03-14 02:56:51 +01:00
parent 5c08732f6c
commit 9e90e40fdd
5 changed files with 353 additions and 29 deletions

View File

@@ -13,6 +13,7 @@ find_package(avcpp REQUIRED)
# ##############################################################################
add_library(mosh-me-lib STATIC
${PROJECT_SOURCE_DIR}/src/avpipe.cpp
${PROJECT_SOURCE_DIR}/src/pipeline.cpp
)
target_link_libraries(mosh-me-lib PUBLIC avcpp::avcpp spdlog::spdlog)
@@ -33,8 +34,13 @@ target_link_libraries(mk-moshable avcpp::avcpp spdlog::spdlog)
target_compile_features(mk-moshable PUBLIC cxx_std_23)
target_include_directories(mk-moshable PRIVATE ${PROJECT_SOURCE_DIR}/include)
add_executable(mosh-pipe
${PROJECT_SOURCE_DIR}/app/mosh-pipe.cpp
)
target_link_libraries(mosh-pipe mosh-me-lib)
install(TARGETS mosh-me mk-moshable)
install(TARGETS mosh-me mk-moshable mosh-pipe)
################################################################################

32
app/mosh-pipe.cpp Normal file
View File

@@ -0,0 +1,32 @@
#include "mosh-me/avpipe.hpp"
#include <avcpp/av.h>
#include <avcpp/codeccontext.h>
#include <avcpp/formatcontext.h>
#include <avcpp/stream.h>
#include <spdlog/spdlog.h>
int main(int argc, char *argv[]) {
if (argc < 3) {
spdlog::error("Please invoke with {} <ifile> <ofile>", argv[0]);
}
av::init();
try {
auto pipe = mosh_me::VideoMuxer::link(std::filesystem::path(argv[2]))
<< mosh_me::DropIFrames::link()
<< mosh_me::MoshableVideoEncoder::link()
<< mosh_me::VideoDecoder::link()
<< mosh_me::VideoPacketSource::link(argv[1]);
pipe.propagateMeta();
if (auto result = pipe.pull(); !result.has_value()) {
spdlog::error("Pipe failed");
}
} catch (const av::Exception &e) {
spdlog::error("{}", e.what());
}
}

View File

@@ -0,0 +1,85 @@
#pragma once
#include "mosh-me/pipeline.hpp"
#include <avcpp/codeccontext.h>
#include <avcpp/formatcontext.h>
#include <avcpp/frame.h>
#include <avcpp/packet.h>
#include <filesystem>
namespace mosh_me {
class VideoPacketSource
: public AsPipeSource<VideoPacketSource, av::Packet,
std::reference_wrapper<av::FormatContext>> {
av::FormatContext ctx_;
public:
VideoPacketSource(const std::filesystem::path &path);
mosh_me::PipeData<av::Packet> pull() noexcept override;
std::reference_wrapper<av::FormatContext> propagateMeta() noexcept override;
};
struct FrameMeta {
av::PixelFormat pix_fmt;
av::Rational frame_rate;
int width = 0;
int height = 0;
};
class VideoDecoder
: public AsPipeLink<VideoDecoder, av::Packet, av::VideoFrame,
std::reference_wrapper<av::FormatContext>, FrameMeta> {
private:
av::VideoDecoderContext ctx_;
av::Stream stream_;
public:
mosh_me::PipeData<av::VideoFrame> pull() noexcept override;
FrameMeta propagateMeta() noexcept override;
};
class MoshableVideoEncoder
: public AsPipeLink<
MoshableVideoEncoder, av::VideoFrame, av::Packet, FrameMeta,
std::pair<std::reference_wrapper<av::VideoEncoderContext>,
FrameMeta>> {
private:
av::VideoEncoderContext ctx_;
public:
MoshableVideoEncoder();
mosh_me::PipeData<av::Packet> pull() noexcept override;
std::pair<std::reference_wrapper<av::VideoEncoderContext>, FrameMeta>
propagateMeta() noexcept override;
};
class DropIFrames
: public AsPipeLink<
DropIFrames, av::Packet, av::Packet,
std::pair<std::reference_wrapper<av::VideoEncoderContext>, FrameMeta>,
std::pair<std::reference_wrapper<av::VideoEncoderContext>,
FrameMeta>> {
public:
mosh_me::PipeData<av::Packet> pull() noexcept override;
std::pair<std::reference_wrapper<av::VideoEncoderContext>, FrameMeta>
propagateMeta() noexcept override;
};
class VideoMuxer
: public AsPipeLink<
VideoMuxer, av::Packet, void,
std::pair<std::reference_wrapper<av::VideoEncoderContext>, FrameMeta>,
void> {
private:
av::FormatContext ctx_;
public:
VideoMuxer(const std::filesystem::path &path);
mosh_me::PipeData<void> pull() noexcept override;
void propagateMeta() noexcept override;
};
} // namespace mosh_me

View File

@@ -7,34 +7,46 @@ namespace mosh_me {
enum class PipeStatus {
NO_INPUT_ATTACHED,
PIPE_EOF,
PIPE_ERROR,
};
template <typename Data> using PipeData = std::expected<Data, PipeStatus>;
template <typename Output> class PipeOutput {
template <typename Output, typename OMeta> class PipeOutput {
public:
virtual OMeta propagateMeta() noexcept = 0;
virtual PipeData<Output> pull() noexcept = 0;
};
template <typename Input> class PipeInput {
template <typename Output> class PipeOutput<Output, void> {
public:
virtual void propagateMeta() noexcept {}
virtual PipeData<Output> pull() noexcept = 0;
};
template <typename Input, typename IMeta> class PipeInput {
private:
std::shared_ptr<PipeOutput<Input>> input_;
std::shared_ptr<PipeOutput<Input, IMeta>> input_;
protected:
PipeData<Input> fetchInput() { return input_->pull(); }
IMeta fetchMeta() { return input_->propagateMeta(); }
public:
void linkInput(std::shared_ptr<PipeOutput<Input>> input) { input_ = input; };
void linkInput(std::shared_ptr<PipeOutput<Input, IMeta>> input) {
input_ = input;
};
};
template <typename Input, typename Output>
class PipeWorker : public PipeOutput<Output>, public PipeInput<Input> {};
template <typename Input, typename Output, typename IMeta, typename OMeta>
class PipeWorker : public PipeOutput<Output, OMeta>,
public PipeInput<Input, IMeta> {};
template <typename Output> class PipeSource {
template <typename Output, typename OMeta> class PipeSource {
public:
std::shared_ptr<PipeOutput<Output>> source_;
std::shared_ptr<PipeOutput<Output, OMeta>> source_;
PipeSource(std::shared_ptr<PipeOutput<Output>> p) : source_(p) {}
PipeSource(std::shared_ptr<PipeOutput<Output, OMeta>> p) : source_(p) {}
PipeData<Output> pull() {
if (source_) {
@@ -43,45 +55,52 @@ public:
return std::unexpected(PipeStatus::NO_INPUT_ATTACHED);
}
}
void propagateMeta() { source_->propagateMeta(); }
};
template <typename Input, typename Output> class PipeLink {
template <typename Input, typename Output, typename IMeta, typename OMeta>
class PipeLink {
public:
std::shared_ptr<PipeInput<Input>> input_;
std::shared_ptr<PipeOutput<Output>> output_;
std::shared_ptr<PipeInput<Input, IMeta>> input_;
std::shared_ptr<PipeOutput<Output, OMeta>> output_;
PipeLink(std::shared_ptr<PipeInput<Input>> i,
std::shared_ptr<PipeOutput<Output>> o)
PipeLink(std::shared_ptr<PipeInput<Input, IMeta>> i,
std::shared_ptr<PipeOutput<Output, OMeta>> o)
: input_(i), output_(o) {}
PipeLink(std::shared_ptr<PipeWorker<Input, Output>> p)
PipeLink(std::shared_ptr<PipeWorker<Input, Output, IMeta, OMeta>> p)
: input_(p), output_(p) {}
template <typename I, typename O>
PipeLink<I, Output> operator<<(PipeLink<I, O> &&other) {
template <typename I, typename O, typename IM, typename OM>
PipeLink<I, Output, IM, OMeta> operator<<(PipeLink<I, O, IM, OM> &&other) {
input_->linkInput(std::move(other.output_));
return PipeLink<I, Output>(std::move(other.input_), std::move(output_));
return PipeLink<I, Output, IM, OMeta>(std::move(other.input_),
std::move(output_));
}
template <typename O> PipeSource<Output> operator<<(PipeSource<O> &&other) {
template <typename O, typename OM>
PipeSource<Output, OMeta> operator<<(PipeSource<O, OM> &&other) {
input_->linkInput(std::move(other.source_));
return PipeSource<Output>(std::move(output_));
return PipeSource<Output, OMeta>(std::move(output_));
}
};
template <typename PipeType, typename O>
class AsPipeSource : public PipeOutput<O> {
template <typename PipeType, typename O, typename OMeta = void>
class AsPipeSource : public PipeOutput<O, OMeta> {
public:
template <typename... Args> static PipeSource<O> link(Args &&...args) {
return PipeSource<O>(
template <typename... Args> static PipeSource<O, OMeta> link(Args &&...args) {
return PipeSource<O, OMeta>(
std::make_shared<PipeType>(std::forward<Args>(args)...));
}
};
template <typename PipeType, typename I, typename O>
class AsPipeLink : public PipeWorker<I, O> {
template <typename PipeType, typename I, typename O, typename IMeta = void,
typename OMeta = void>
class AsPipeLink : public PipeWorker<I, O, IMeta, OMeta> {
public:
template <typename... Args> static PipeLink<I, O> link(Args &&...args) {
return PipeLink<I, O>(
template <typename... Args>
static PipeLink<I, O, IMeta, OMeta> link(Args &&...args) {
return PipeLink<I, O, IMeta, OMeta>(
std::make_shared<PipeType>(std::forward<Args>(args)...));
}
};

182
src/avpipe.cpp Normal file
View File

@@ -0,0 +1,182 @@
#include "mosh-me/avpipe.hpp"
mosh_me::VideoPacketSource::VideoPacketSource(
const std::filesystem::path &path) {
ctx_.openInput(path);
}
mosh_me::PipeData<av::Packet> mosh_me::VideoPacketSource::pull() noexcept {
std::error_code ec;
if (auto pkt = ctx_.readPacket(ec); !pkt.isNull()) {
if (ec) {
return std::unexpected(PipeStatus::PIPE_ERROR);
}
return pkt;
}
return std::unexpected(PipeStatus::PIPE_EOF);
}
std::reference_wrapper<av::FormatContext>
mosh_me::VideoPacketSource::propagateMeta() noexcept {
return ctx_;
}
mosh_me::PipeData<av::VideoFrame> mosh_me::VideoDecoder::pull() noexcept {
auto idx = ctx_.stream().index();
for (;;) {
auto input = fetchInput();
if (!input) {
return std::unexpected(input.error());
}
// Filter stream
auto pkt = input.value();
if (pkt.streamIndex() != idx) {
continue;
}
// Decode
std::error_code ec;
auto frame = ctx_.decode(pkt, ec);
if (ec) {
return std::unexpected(PipeStatus::PIPE_ERROR);
}
// Frame completed
if (frame) {
return frame;
}
}
}
mosh_me::FrameMeta mosh_me::VideoDecoder::propagateMeta() noexcept {
auto &ictx = fetchMeta().get();
ictx.findStreamInfo();
for (int i = 0; i < ictx.streamsCount(); i++) {
if (ictx.stream(i).isValid() && ictx.stream(i).isVideo()) {
ctx_ = av::VideoDecoderContext(ictx.stream(i));
}
}
ctx_.open();
return FrameMeta{
.pix_fmt = ctx_.pixelFormat(),
.frame_rate = ctx_.stream().frameRate(),
.width = ctx_.width(),
.height = ctx_.height(),
};
}
mosh_me::MoshableVideoEncoder::MoshableVideoEncoder()
: ctx_(av::findEncodingCodec("libxvid")) {
ctx_.setGopSize(1000);
ctx_.setMaxBFrames(0);
ctx_.setGlobalQuality(1);
ctx_.raw()->qmin = 1;
ctx_.raw()->qmax = 1;
ctx_.setFlags(AV_CODEC_FLAG_QPEL | AV_CODEC_FLAG_4MV);
ctx_.setTimeBase({1, 1000});
}
mosh_me::PipeData<av::Packet> mosh_me::MoshableVideoEncoder::pull() noexcept {
for (;;) {
auto input = fetchInput();
if (!input) {
return std::unexpected(input.error());
}
auto frame = *input;
frame.setTimeBase(ctx_.timeBase());
frame.setStreamIndex(0);
frame.setPictureType();
std::error_code ec;
auto pkt = ctx_.encode(frame, ec);
if (ec) {
return std::unexpected(PipeStatus::PIPE_ERROR);
}
// packet completed
if (pkt) {
pkt.setStreamIndex(0);
return pkt;
}
}
}
std::pair<std::reference_wrapper<av::VideoEncoderContext>, mosh_me::FrameMeta>
mosh_me::MoshableVideoEncoder::propagateMeta() noexcept {
auto frame_meta = fetchMeta();
ctx_.setPixelFormat(frame_meta.pix_fmt);
ctx_.setWidth(frame_meta.width);
ctx_.setHeight(frame_meta.height);
ctx_.open();
return {ctx_, frame_meta};
}
mosh_me::PipeData<av::Packet> mosh_me::DropIFrames::pull() noexcept {
for (;;) {
auto input = fetchInput();
if (!input) {
return std::unexpected(input.error());
}
if (input->isKeyPacket()) {
continue;
}
return input;
}
}
std::pair<std::reference_wrapper<av::VideoEncoderContext>, mosh_me::FrameMeta>
mosh_me::DropIFrames::propagateMeta() noexcept {
return fetchMeta();
}
mosh_me::VideoMuxer::VideoMuxer(const std::filesystem::path &path) {
av::OutputFormat ofmt;
ofmt.setFormat(path);
ctx_.setFormat(ofmt);
ctx_.openOutput(path);
}
mosh_me::PipeData<void> mosh_me::VideoMuxer::pull() noexcept {
std::error_code ec;
ctx_.writeHeader(ec);
if (ec) {
return std::unexpected(PipeStatus::PIPE_ERROR);
}
for (;;) {
auto input = fetchInput();
if (!input && input.error() == PipeStatus::PIPE_EOF) {
break;
} else if (!input) {
return std::unexpected(input.error());
}
ctx_.writePacket(*input, ec);
if (ec) {
return std::unexpected(PipeStatus::PIPE_ERROR);
}
}
ctx_.writeTrailer(ec);
if (ec) {
return std::unexpected(PipeStatus::PIPE_ERROR);
}
ctx_.flush();
return {};
}
void mosh_me::VideoMuxer::propagateMeta() noexcept {
auto [encoder, frame_meta] = fetchMeta();
auto ostream = ctx_.addStream(encoder.get());
ostream.setFrameRate(frame_meta.frame_rate);
}