计算器

每个计算器都是一个图表的一个节点。我们将介绍如何创建新的 计算器、如何初始化计算器、如何进行计算 输入和输出流、时间戳和选项。图中的每个节点都是 以 Calculator 的形式实现。图执行的主要工作是在其 计算。计算器可能会接收零个或多个输入流和/或 发送数据包,并生成零个或多个输出流和/或侧边数据包。

CalculatorBase

您可以通过定义 CalculatorBase 类,实现多个方法,并使用 Mediapipe。新计算器必须至少实现以下四种方法

  • GetContract()
    • 计算器作者可以指定预期的输入和输出类型 一个计算器 - GetContract() 中的一部分。初始化图后, 框架调用静态方法来验证 连接的输入和输出 规范
  • Open()
    • 图表启动后,框架会调用 Open()。输入端 此时计算器可以收到数据包。Open() 解释节点配置操作(请参阅图表) 并准备计算器每次图的运行状态。此函数 也会向计算器输出写入数据包。Open()期间出错可能 终止图表运行。
  • Process()
    • 对于包含输入的计算器,框架会反复调用 Process() 。框架 默认情况下可保证所有输入都具有相同的时间戳(请参阅 同步。多个 并行执行时可以同时调用 Process() 调用 已启用。如果在 Process() 期间发生错误,框架会调用 Close(),且图运行终止。
  • Close()
    • Process() 的所有调用都结束或所有输入流都关闭后, 框架调用 Close()。如果出现以下情况,则始终会调用此函数: 已调用 Open() 并成功,即使图运行已终止 错误。没有任何输入流可通过任何输入流提供 但它仍然可以访问输入端数据包和Close() 因此可能会写入输出。Close()返回后,计算器 应被视为死节点。计算器对象会被销毁为 立即运行。

以下是 CalculatorBase.h

class CalculatorBase {
 public:
  ...

  // The subclasses of CalculatorBase must implement GetContract.
  // ...
  static absl::Status GetContract(CalculatorContract* cc);

  // Open is called before any Process() calls, on a freshly constructed
  // calculator.  Subclasses may override this method to perform necessary
  // setup, and possibly output Packets and/or set output streams' headers.
  // ...
  virtual absl::Status Open(CalculatorContext* cc) {
    return absl::OkStatus();
  }

  // Processes the incoming inputs. May call the methods on cc to access
  // inputs and produce outputs.
  // ...
  virtual absl::Status Process(CalculatorContext* cc) = 0;

  // Is called if Open() was called and succeeded.  Is called either
  // immediately after processing is complete or after a graph run has ended
  // (if an error occurred in the graph).  ...
  virtual absl::Status Close(CalculatorContext* cc) {
    return absl::OkStatus();
  }

  ...
};

计算器的生命周期

在 MediaPipe 图初始化期间,框架会调用 GetContract() 静态方法,用于确定预期的数据包类型。

框架会针对每次图表运行构建并销毁整个计算器 (例如,每个视频展示一次或每张图片展示一次)。仍存在昂贵或大型对象 常量应作为输入端数据包提供,以便 计算在后续运行中不会重复。

初始化后,每次运行图时,都会发生以下顺序:

  • Open()
  • Process()(重复)
  • Close()

框架调用 Open() 来初始化计算器。Open()应该 解释任何选项并设置计算器每次图的运行状态。Open() 可以获取输入端数据包并将数据包写入计算器输出。如果 它应调用 SetOffset() 以减少可能出现的数据包缓冲 输出流。

Open()Process() 期间发生错误(如其中一个所示) 返回非 Ok 状态),图谱运行将终止,没有进一步的调用 计算器的方法,那么计算器就会被销毁。

对于具有输入的计算器,框架至少每时一次调用 Process() 有一个输入有可用的数据包。框架可保证所有输入都 相同的时间戳,但时间戳会随着每次调用 Process() 而增加, 确保所有数据包均已传送完毕因此,某些输入可能没有任何 在调用 Process() 时发送数据包。数据包丢失的输入似乎 生成一个空数据包(没有时间戳)。

框架会在对 Process() 的所有调用后调用 Close()。所有输入均将 但 Close() 可以访问输入端数据包,并且可能 写入输出。Close 返回后,计算器将被销毁。

不含输入内容的计算器称为“源”。来源计算器 只要 Process() 返回 Ok 状态,就会继续调用它。答 来源计算器可通过返回停止状态来指示设备已用尽 (即 mediaPipe::tool::StatusStop())。

识别输入和输出

计算器的公共界面由一组输入流和 输出流。在 CalculatorGraphConfiguration 中, 计算器使用 。流名称通常采用小写形式,而输入和输出标记则采用小写形式 通常是大写。在下面的示例中,标记名称为 VIDEO 的输出为 使用名为“VIDEO_IN”的输入流连接到 video_stream

# Graph describing calculator SomeAudioVideoCalculator
node {
  calculator: "SomeAudioVideoCalculator"
  input_stream: "INPUT:combined_input"
  output_stream: "VIDEO:video_stream"
}
node {
  calculator: "SomeVideoCalculator"
  input_stream: "VIDEO_IN:video_stream"
  output_stream: "VIDEO_OUT:processed_video"
}

输入流和输出流可通过索引编号、标记名称或 标记名称和索引编号的组合。您可以看到一些输入和 输出标识符。SomeAudioVideoCalculator标识 视频输出(按标签显示),而音频输出按标签和 索引中。带有 VIDEO 标记的输入已连接到名为 video_stream。标记为 AUDIO 且索引为 01 的输出为 连接到名为 audio_leftaudio_right 的数据流。 SomeAudioCalculator 仅通过索引标识其音频输入(无需标记)。

# Graph describing calculator SomeAudioVideoCalculator
node {
  calculator: "SomeAudioVideoCalculator"
  input_stream: "combined_input"
  output_stream: "VIDEO:video_stream"
  output_stream: "AUDIO:0:audio_left"
  output_stream: "AUDIO:1:audio_right"
}

node {
  calculator: "SomeAudioCalculator"
  input_stream: "audio_left"
  input_stream: "audio_right"
  output_stream: "audio_energy"
}

在计算器实现中,输入和输出也由代码标识 名称和索引编号。在以下函数中,输入和输出已标识:

  • 按索引编号:组合的输入流仅通过索引标识 0
  • 按标签名称:视频输出流由标签名称“VIDEO”标识。
  • 按标记名称和索引编号:输出音频流由 标记名称 AUDIO 与索引编号 01 的组合。
// c++ Code snippet describing the SomeAudioVideoCalculator GetContract() method
class SomeAudioVideoCalculator : public CalculatorBase {
 public:
  static absl::Status GetContract(CalculatorContract* cc) {
    cc->Inputs().Index(0).SetAny();
    // SetAny() is used to specify that whatever the type of the
    // stream is, it's acceptable.  This does not mean that any
    // packet is acceptable.  Packets in the stream still have a
    // particular type.  SetAny() has the same effect as explicitly
    // setting the type to be the stream's type.
    cc->Outputs().Tag("VIDEO").Set<ImageFrame>();
    cc->Outputs().Get("AUDIO", 0).Set<Matrix>();
    cc->Outputs().Get("AUDIO", 1).Set<Matrix>();
    return absl::OkStatus();
  }

正在处理

在非源节点上调用的 Process() 必须返回 absl::OkStatus() 至 表示一切正常,或其他任何表示出错的状态代码

如果非源计算器返回 tool::StatusStop(),则表示 图表提前取消。在这种情况下,所有来源计算器和图表 输入流将被关闭(剩余的数据包将通过 图表)。

只要存在图中的源节点,系统会继续对其调用 Process() 因为它会返回 absl::OkStatus()。表明已没有更多数据可供使用 生成的返回值 tool::StatusStop()。任何其他状态都表示出现错误, 错误。

Close() 会返回 absl::OkStatus() 以指示成功。任何其他状态 表示失败。

以下是基本的 Process() 函数。它使用 Input() 方法( 仅在计算器只有一个输入时使用)请求其输入数据。它 然后使用 std::unique_ptr 分配输出数据包所需的内存。 并进行计算完成后,在将指针添加到 输出流。

absl::Status MyCalculator::Process() {
  const Matrix& input = Input()->Get<Matrix>();
  std::unique_ptr<Matrix> output(new Matrix(input.rows(), input.cols()));
  // do your magic here....
  //    output->row(n) =  ...
  Output()->Add(output.release(), InputTimestamp());
  return absl::OkStatus();
}

计算器选项

计算器通过 (1) 输入流数据包 (2) 接受处理参数 输入端数据包,以及 (3) 计算器选项。计算器选项 会以字面量值的形式出现在node_options CalculatorGraphConfiguration.Node 条消息。

  node {
    calculator: "TfLiteInferenceCalculator"
    input_stream: "TENSORS:main_model_input"
    output_stream: "TENSORS:main_model_output"
    node_options: {
      [type.googleapis.com/mediapipe.TfLiteInferenceCalculatorOptions] {
        model_path: "mediapipe/models/detection_model.tflite"
      }
    }
  }

node_options 字段接受 proto3 语法。或者使用计算器 可以使用 proto2 语法在 options 字段中指定选项。

  node {
    calculator: "TfLiteInferenceCalculator"
    input_stream: "TENSORS:main_model_input"
    output_stream: "TENSORS:main_model_output"
    node_options: {
      [type.googleapis.com/mediapipe.TfLiteInferenceCalculatorOptions] {
        model_path: "mediapipe/models/detection_model.tflite"
      }
    }
  }

并非所有计算器都支持计算器选项。为了接受选项, 计算器通常会定义一个新的 protobuf 消息类型来表示其 选项,例如 PacketClonerCalculatorOptions。计算器就会 在其 CalculatorBase::Open 方法中读取该 protobuf 消息, 也可以在其 CalculatorBase::GetContract 函数或其 CalculatorBase::Process 方法。通常,新的 protobuf 消息类型 使用“.proto”定义为 protobuf 架构文件和 mediapipe_proto_library() 构建规则。

  mediapipe_proto_library(
      name = "packet_cloner_calculator_proto",
      srcs = ["packet_cloner_calculator.proto"],
      visibility = ["//visibility:public"],
      deps = [
          "//mediapipe/framework:calculator_options_proto",
          "//mediapipe/framework:calculator_proto",
      ],
  )

计算器示例

本部分将讨论 PacketClonerCalculator 的实现, 完成的比较简单,在许多计算器图表中都使用了。 PacketClonerCalculator 仅生成其最近输入数据包的副本 。

当到达的数据包的时间戳时,PacketClonerCalculator 非常有用 没有完全对齐假设我们的房间里有麦克风, 传感器和正在收集感官数据的摄像机。每个传感器 独立运行,且间歇性收集数据。假设输出 为:

  • 麦克风 = 房间内声音的音量(以分贝为单位)(整数)
  • 光传感器 = 房间亮度(整数)
  • 摄像头 = 房间的 RGB 图片帧 (ImageFrame)

我们简单的感知管道旨在处理以下 3 种类型的感官数据: 这样,当我们获得来自相机的图像帧数据时, 会与上次收集的麦克风音量数据和光线同步 传感器亮度数据。为了使用 MediaPipe 来做到这一点,我们的感知管道需要 输入流:

  • Room_mic_signal - 此输入流中的每个数据包都是整数数据 用时间戳表示房间内音频的音量大小。
  • Room_lightening_sensor - 此输入流中的每个数据包都是整数 表示房间照明强度的数据和时间戳。
  • Room_video_tick_signal - 此输入流中的每个数据包 视频数据的 imageFrame,表示从 带有时间戳的聊天室。

下面是 PacketClonerCalculator 的实现。您可以看到 GetContract()Open()Process() 方法以及实例 变量 current_,用于存储最近的输入数据包。

// This takes packets from N+1 streams, A_1, A_2, ..., A_N, B.
// For every packet that appears in B, outputs the most recent packet from each
// of the A_i on a separate stream.

#include <vector>

#include "absl/strings/str_cat.h"
#include "mediapipe/framework/calculator_framework.h"

namespace mediapipe {

// For every packet received on the last stream, output the latest packet
// obtained on all other streams. Therefore, if the last stream outputs at a
// higher rate than the others, this effectively clones the packets from the
// other streams to match the last.
//
// Example config:
// node {
//   calculator: "PacketClonerCalculator"
//   input_stream: "first_base_signal"
//   input_stream: "second_base_signal"
//   input_stream: "tick_signal"
//   output_stream: "cloned_first_base_signal"
//   output_stream: "cloned_second_base_signal"
// }
//
class PacketClonerCalculator : public CalculatorBase {
 public:
  static absl::Status GetContract(CalculatorContract* cc) {
    const int tick_signal_index = cc->Inputs().NumEntries() - 1;
    // cc->Inputs().NumEntries() returns the number of input streams
    // for the PacketClonerCalculator
    for (int i = 0; i < tick_signal_index; ++i) {
      cc->Inputs().Index(i).SetAny();
      // cc->Inputs().Index(i) returns the input stream pointer by index
      cc->Outputs().Index(i).SetSameAs(&cc->Inputs().Index(i));
    }
    cc->Inputs().Index(tick_signal_index).SetAny();
    return absl::OkStatus();
  }

  absl::Status Open(CalculatorContext* cc) final {
    tick_signal_index_ = cc->Inputs().NumEntries() - 1;
    current_.resize(tick_signal_index_);
    // Pass along the header for each stream if present.
    for (int i = 0; i < tick_signal_index_; ++i) {
      if (!cc->Inputs().Index(i).Header().IsEmpty()) {
        cc->Outputs().Index(i).SetHeader(cc->Inputs().Index(i).Header());
        // Sets the output stream of index i header to be the same as
        // the header for the input stream of index i
      }
    }
    return absl::OkStatus();
  }

  absl::Status Process(CalculatorContext* cc) final {
    // Store input signals.
    for (int i = 0; i < tick_signal_index_; ++i) {
      if (!cc->Inputs().Index(i).Value().IsEmpty()) {
        current_[i] = cc->Inputs().Index(i).Value();
      }
    }

    // Output if the tick signal is non-empty.
    if (!cc->Inputs().Index(tick_signal_index_).Value().IsEmpty()) {
      for (int i = 0; i < tick_signal_index_; ++i) {
        if (!current_[i].IsEmpty()) {
          cc->Outputs().Index(i).AddPacket(
              current_[i].At(cc->InputTimestamp()));
          // Add a packet to output stream of index i a packet from inputstream i
          // with timestamp common to all present inputs
        } else {
          cc->Outputs().Index(i).SetNextTimestampBound(
              cc->InputTimestamp().NextAllowedInStream());
          // if current_[i], 1 packet buffer for input stream i is empty, we will set
          // next allowed timestamp for input stream i to be current timestamp + 1
        }
      }
    }
    return absl::OkStatus();
  }

 private:
  std::vector<Packet> current_;
  int tick_signal_index_;
};

REGISTER_CALCULATOR(PacketClonerCalculator);
}  // namespace mediapipe

通常,计算器只有一个 .cc 文件。不需要 .h,因为 mediapipe 利用注册机制来让计算器为之知晓。完成 定义您的计算器类,并通过宏调用注册该类 REGISTER_CALCULATOR(calculator_class_name).

下面是一个简单的 MediaPipe 图,其中包含 3 个输入流和 1 个节点 (PacketClonerCalculator) 和 2 个输出流。

input_stream: "room_mic_signal"
input_stream: "room_lighting_sensor"
input_stream: "room_video_tick_signal"

node {
   calculator: "PacketClonerCalculator"
   input_stream: "room_mic_signal"
   input_stream: "room_lighting_sensor"
   input_stream: "room_video_tick_signal"
   output_stream: "cloned_room_mic_signal"
   output_stream: "cloned_lighting_sensor"
 }

下图显示了 PacketClonerCalculator 如何定义其输出 根据其系列的输入数据包(顶部)显示数据包(底部)。

使用 PacketClonerCalculator 绘制图表
每次在其 TICK 输入流中收到数据包时,PacketClonerCalculator 都会输出每个输入流中的最新数据包。输出数据包的顺序(底部)取决于输入数据包序列(顶部)及其时间戳。时间戳显示在图表的右侧。