每个计算器都是一个图表的一个节点。我们将介绍如何创建新的
计算器、如何初始化计算器、如何进行计算
输入和输出流、时间戳和选项。图中的每个节点都是
以 Calculator
的形式实现。图执行的主要工作是在其
计算。计算器可能会接收零个或多个输入流和/或
发送数据包,并生成零个或多个输出流和/或侧边数据包。
CalculatorBase
您可以通过定义
CalculatorBase
类,实现多个方法,并使用
Mediapipe。新计算器必须至少实现以下四种方法
GetContract()
- 计算器作者可以指定预期的输入和输出类型 一个计算器 - GetContract() 中的一部分。初始化图后, 框架调用静态方法来验证 连接的输入和输出 规范
Open()
- 图表启动后,框架会调用
Open()
。输入端 此时计算器可以收到数据包。Open()
解释节点配置操作(请参阅图表) 并准备计算器每次图的运行状态。此函数 也会向计算器输出写入数据包。Open()
期间出错可能 终止图表运行。
- 图表启动后,框架会调用
Process()
- 对于包含输入的计算器,框架会反复调用
Process()
。框架 默认情况下可保证所有输入都具有相同的时间戳(请参阅 同步。多个 并行执行时可以同时调用Process()
调用 已启用。如果在Process()
期间发生错误,框架会调用Close()
,且图运行终止。
- 对于包含输入的计算器,框架会反复调用
Close()
- 对
Process()
的所有调用都结束或所有输入流都关闭后, 框架调用Close()
。如果出现以下情况,则始终会调用此函数: 已调用Open()
并成功,即使图运行已终止 错误。没有任何输入流可通过任何输入流提供 但它仍然可以访问输入端数据包和Close()
因此可能会写入输出。Close()
返回后,计算器 应被视为死节点。计算器对象会被销毁为 立即运行。
- 对
以下是 CalculatorBase.h。
class CalculatorBase {
public:
...
// The subclasses of CalculatorBase must implement GetContract.
// ...
static absl::Status GetContract(CalculatorContract* cc);
// Open is called before any Process() calls, on a freshly constructed
// calculator. Subclasses may override this method to perform necessary
// setup, and possibly output Packets and/or set output streams' headers.
// ...
virtual absl::Status Open(CalculatorContext* cc) {
return absl::OkStatus();
}
// Processes the incoming inputs. May call the methods on cc to access
// inputs and produce outputs.
// ...
virtual absl::Status Process(CalculatorContext* cc) = 0;
// Is called if Open() was called and succeeded. Is called either
// immediately after processing is complete or after a graph run has ended
// (if an error occurred in the graph). ...
virtual absl::Status Close(CalculatorContext* cc) {
return absl::OkStatus();
}
...
};
计算器的生命周期
在 MediaPipe 图初始化期间,框架会调用
GetContract()
静态方法,用于确定预期的数据包类型。
框架会针对每次图表运行构建并销毁整个计算器 (例如,每个视频展示一次或每张图片展示一次)。仍存在昂贵或大型对象 常量应作为输入端数据包提供,以便 计算在后续运行中不会重复。
初始化后,每次运行图时,都会发生以下顺序:
Open()
Process()
(重复)Close()
框架调用 Open()
来初始化计算器。Open()
应该
解释任何选项并设置计算器每次图的运行状态。Open()
可以获取输入端数据包并将数据包写入计算器输出。如果
它应调用 SetOffset()
以减少可能出现的数据包缓冲
输出流。
在 Open()
或 Process()
期间发生错误(如其中一个所示)
返回非 Ok
状态),图谱运行将终止,没有进一步的调用
计算器的方法,那么计算器就会被销毁。
对于具有输入的计算器,框架至少每时一次调用 Process()
有一个输入有可用的数据包。框架可保证所有输入都
相同的时间戳,但时间戳会随着每次调用 Process()
而增加,
确保所有数据包均已传送完毕因此,某些输入可能没有任何
在调用 Process()
时发送数据包。数据包丢失的输入似乎
生成一个空数据包(没有时间戳)。
框架会在对 Process()
的所有调用后调用 Close()
。所有输入均将
但 Close()
可以访问输入端数据包,并且可能
写入输出。Close 返回后,计算器将被销毁。
不含输入内容的计算器称为“源”。来源计算器
只要 Process()
返回 Ok
状态,就会继续调用它。答
来源计算器可通过返回停止状态来指示设备已用尽
(即 mediaPipe::tool::StatusStop()
)。
识别输入和输出
计算器的公共界面由一组输入流和
输出流。在 CalculatorGraphConfiguration 中,
计算器使用
。流名称通常采用小写形式,而输入和输出标记则采用小写形式
通常是大写。在下面的示例中,标记名称为 VIDEO
的输出为
使用名为“VIDEO_IN
”的输入流连接到
video_stream
。
# Graph describing calculator SomeAudioVideoCalculator
node {
calculator: "SomeAudioVideoCalculator"
input_stream: "INPUT:combined_input"
output_stream: "VIDEO:video_stream"
}
node {
calculator: "SomeVideoCalculator"
input_stream: "VIDEO_IN:video_stream"
output_stream: "VIDEO_OUT:processed_video"
}
输入流和输出流可通过索引编号、标记名称或
标记名称和索引编号的组合。您可以看到一些输入和
输出标识符。SomeAudioVideoCalculator
标识
视频输出(按标签显示),而音频输出按标签和
索引中。带有 VIDEO
标记的输入已连接到名为
video_stream
。标记为 AUDIO
且索引为 0
和 1
的输出为
连接到名为 audio_left
和 audio_right
的数据流。
SomeAudioCalculator
仅通过索引标识其音频输入(无需标记)。
# Graph describing calculator SomeAudioVideoCalculator
node {
calculator: "SomeAudioVideoCalculator"
input_stream: "combined_input"
output_stream: "VIDEO:video_stream"
output_stream: "AUDIO:0:audio_left"
output_stream: "AUDIO:1:audio_right"
}
node {
calculator: "SomeAudioCalculator"
input_stream: "audio_left"
input_stream: "audio_right"
output_stream: "audio_energy"
}
在计算器实现中,输入和输出也由代码标识 名称和索引编号。在以下函数中,输入和输出已标识:
- 按索引编号:组合的输入流仅通过索引标识
0
。 - 按标签名称:视频输出流由标签名称“VIDEO”标识。
- 按标记名称和索引编号:输出音频流由
标记名称
AUDIO
与索引编号0
和1
的组合。
// c++ Code snippet describing the SomeAudioVideoCalculator GetContract() method
class SomeAudioVideoCalculator : public CalculatorBase {
public:
static absl::Status GetContract(CalculatorContract* cc) {
cc->Inputs().Index(0).SetAny();
// SetAny() is used to specify that whatever the type of the
// stream is, it's acceptable. This does not mean that any
// packet is acceptable. Packets in the stream still have a
// particular type. SetAny() has the same effect as explicitly
// setting the type to be the stream's type.
cc->Outputs().Tag("VIDEO").Set<ImageFrame>();
cc->Outputs().Get("AUDIO", 0).Set<Matrix>();
cc->Outputs().Get("AUDIO", 1).Set<Matrix>();
return absl::OkStatus();
}
正在处理
在非源节点上调用的 Process()
必须返回 absl::OkStatus()
至
表示一切正常,或其他任何表示出错的状态代码
如果非源计算器返回 tool::StatusStop()
,则表示
图表提前取消。在这种情况下,所有来源计算器和图表
输入流将被关闭(剩余的数据包将通过
图表)。
只要存在图中的源节点,系统会继续对其调用 Process()
因为它会返回 absl::OkStatus(
)。表明已没有更多数据可供使用
生成的返回值 tool::StatusStop()
。任何其他状态都表示出现错误,
错误。
Close()
会返回 absl::OkStatus()
以指示成功。任何其他状态
表示失败。
以下是基本的 Process()
函数。它使用 Input()
方法(
仅在计算器只有一个输入时使用)请求其输入数据。它
然后使用 std::unique_ptr
分配输出数据包所需的内存。
并进行计算完成后,在将指针添加到
输出流。
absl::Status MyCalculator::Process() {
const Matrix& input = Input()->Get<Matrix>();
std::unique_ptr<Matrix> output(new Matrix(input.rows(), input.cols()));
// do your magic here....
// output->row(n) = ...
Output()->Add(output.release(), InputTimestamp());
return absl::OkStatus();
}
计算器选项
计算器通过 (1) 输入流数据包 (2) 接受处理参数
输入端数据包,以及 (3) 计算器选项。计算器选项
会以字面量值的形式出现在node_options
CalculatorGraphConfiguration.Node
条消息。
node {
calculator: "TfLiteInferenceCalculator"
input_stream: "TENSORS:main_model_input"
output_stream: "TENSORS:main_model_output"
node_options: {
[type.googleapis.com/mediapipe.TfLiteInferenceCalculatorOptions] {
model_path: "mediapipe/models/detection_model.tflite"
}
}
}
node_options
字段接受 proto3 语法。或者使用计算器
可以使用 proto2 语法在 options
字段中指定选项。
node {
calculator: "TfLiteInferenceCalculator"
input_stream: "TENSORS:main_model_input"
output_stream: "TENSORS:main_model_output"
node_options: {
[type.googleapis.com/mediapipe.TfLiteInferenceCalculatorOptions] {
model_path: "mediapipe/models/detection_model.tflite"
}
}
}
并非所有计算器都支持计算器选项。为了接受选项,
计算器通常会定义一个新的 protobuf 消息类型来表示其
选项,例如 PacketClonerCalculatorOptions
。计算器就会
在其 CalculatorBase::Open
方法中读取该 protobuf 消息,
也可以在其 CalculatorBase::GetContract
函数或其
CalculatorBase::Process
方法。通常,新的 protobuf 消息类型
使用“.proto”定义为 protobuf 架构文件和
mediapipe_proto_library()
构建规则。
mediapipe_proto_library(
name = "packet_cloner_calculator_proto",
srcs = ["packet_cloner_calculator.proto"],
visibility = ["//visibility:public"],
deps = [
"//mediapipe/framework:calculator_options_proto",
"//mediapipe/framework:calculator_proto",
],
)
计算器示例
本部分将讨论 PacketClonerCalculator
的实现,
完成的比较简单,在许多计算器图表中都使用了。
PacketClonerCalculator
仅生成其最近输入数据包的副本
。
当到达的数据包的时间戳时,PacketClonerCalculator
非常有用
没有完全对齐假设我们的房间里有麦克风,
传感器和正在收集感官数据的摄像机。每个传感器
独立运行,且间歇性收集数据。假设输出
为:
- 麦克风 = 房间内声音的音量(以分贝为单位)(整数)
- 光传感器 = 房间亮度(整数)
- 摄像头 = 房间的 RGB 图片帧 (ImageFrame)
我们简单的感知管道旨在处理以下 3 种类型的感官数据: 这样,当我们获得来自相机的图像帧数据时, 会与上次收集的麦克风音量数据和光线同步 传感器亮度数据。为了使用 MediaPipe 来做到这一点,我们的感知管道需要 输入流:
- Room_mic_signal - 此输入流中的每个数据包都是整数数据 用时间戳表示房间内音频的音量大小。
- Room_lightening_sensor - 此输入流中的每个数据包都是整数 表示房间照明强度的数据和时间戳。
- Room_video_tick_signal - 此输入流中的每个数据包 视频数据的 imageFrame,表示从 带有时间戳的聊天室。
下面是 PacketClonerCalculator
的实现。您可以看到
GetContract()
、Open()
和 Process()
方法以及实例
变量 current_
,用于存储最近的输入数据包。
// This takes packets from N+1 streams, A_1, A_2, ..., A_N, B.
// For every packet that appears in B, outputs the most recent packet from each
// of the A_i on a separate stream.
#include <vector>
#include "absl/strings/str_cat.h"
#include "mediapipe/framework/calculator_framework.h"
namespace mediapipe {
// For every packet received on the last stream, output the latest packet
// obtained on all other streams. Therefore, if the last stream outputs at a
// higher rate than the others, this effectively clones the packets from the
// other streams to match the last.
//
// Example config:
// node {
// calculator: "PacketClonerCalculator"
// input_stream: "first_base_signal"
// input_stream: "second_base_signal"
// input_stream: "tick_signal"
// output_stream: "cloned_first_base_signal"
// output_stream: "cloned_second_base_signal"
// }
//
class PacketClonerCalculator : public CalculatorBase {
public:
static absl::Status GetContract(CalculatorContract* cc) {
const int tick_signal_index = cc->Inputs().NumEntries() - 1;
// cc->Inputs().NumEntries() returns the number of input streams
// for the PacketClonerCalculator
for (int i = 0; i < tick_signal_index; ++i) {
cc->Inputs().Index(i).SetAny();
// cc->Inputs().Index(i) returns the input stream pointer by index
cc->Outputs().Index(i).SetSameAs(&cc->Inputs().Index(i));
}
cc->Inputs().Index(tick_signal_index).SetAny();
return absl::OkStatus();
}
absl::Status Open(CalculatorContext* cc) final {
tick_signal_index_ = cc->Inputs().NumEntries() - 1;
current_.resize(tick_signal_index_);
// Pass along the header for each stream if present.
for (int i = 0; i < tick_signal_index_; ++i) {
if (!cc->Inputs().Index(i).Header().IsEmpty()) {
cc->Outputs().Index(i).SetHeader(cc->Inputs().Index(i).Header());
// Sets the output stream of index i header to be the same as
// the header for the input stream of index i
}
}
return absl::OkStatus();
}
absl::Status Process(CalculatorContext* cc) final {
// Store input signals.
for (int i = 0; i < tick_signal_index_; ++i) {
if (!cc->Inputs().Index(i).Value().IsEmpty()) {
current_[i] = cc->Inputs().Index(i).Value();
}
}
// Output if the tick signal is non-empty.
if (!cc->Inputs().Index(tick_signal_index_).Value().IsEmpty()) {
for (int i = 0; i < tick_signal_index_; ++i) {
if (!current_[i].IsEmpty()) {
cc->Outputs().Index(i).AddPacket(
current_[i].At(cc->InputTimestamp()));
// Add a packet to output stream of index i a packet from inputstream i
// with timestamp common to all present inputs
} else {
cc->Outputs().Index(i).SetNextTimestampBound(
cc->InputTimestamp().NextAllowedInStream());
// if current_[i], 1 packet buffer for input stream i is empty, we will set
// next allowed timestamp for input stream i to be current timestamp + 1
}
}
}
return absl::OkStatus();
}
private:
std::vector<Packet> current_;
int tick_signal_index_;
};
REGISTER_CALCULATOR(PacketClonerCalculator);
} // namespace mediapipe
通常,计算器只有一个 .cc 文件。不需要 .h,因为 mediapipe 利用注册机制来让计算器为之知晓。完成 定义您的计算器类,并通过宏调用注册该类 REGISTER_CALCULATOR(calculator_class_name).
下面是一个简单的 MediaPipe 图,其中包含 3 个输入流和 1 个节点 (PacketClonerCalculator) 和 2 个输出流。
input_stream: "room_mic_signal"
input_stream: "room_lighting_sensor"
input_stream: "room_video_tick_signal"
node {
calculator: "PacketClonerCalculator"
input_stream: "room_mic_signal"
input_stream: "room_lighting_sensor"
input_stream: "room_video_tick_signal"
output_stream: "cloned_room_mic_signal"
output_stream: "cloned_lighting_sensor"
}
下图显示了 PacketClonerCalculator
如何定义其输出
根据其系列的输入数据包(顶部)显示数据包(底部)。
每次在其 TICK 输入流中收到数据包时,PacketClonerCalculator 都会输出每个输入流中的最新数据包。输出数据包的顺序(底部)取决于输入数据包序列(顶部)及其时间戳。时间戳显示在图表的右侧。 |