計算機

每個計算工具都是圖表節點。我們將說明如何建立新的 計算機、如何初始化計算機、如何進行計算 輸入和輸出串流、時間戳記和選項圖表中的每個節點都是 以 Calculator 的形式實作。大量圖形執行作業是在 計算機。計算機可能會收到零或多個輸入串流和/或側邊 並產生零個或多個輸出串流和/或側邊封包。

CalculatorBase

方法是透過定義新的子類別 CalculatorBase敬上 類別、實作多種方法,以及使用 Mediapipe。新的計算機至少必須實作以下四種方法

  • GetContract()
    • 計算機作者可以指定預期的輸入和輸出類型 在 GetContract() 中包含一個計算機。圖表初始化後, 架構會呼叫靜態方法,以驗證 連結的輸入和輸出內容 規格。
  • Open()
    • 圖表開始後,架構會呼叫 Open()。輸入端 此時計算機可以使用封包Open() 會解讀節點設定作業 (請參閱圖表) 並準備計算器的每次圖形執行狀態。這個函式可能會 並將封包寫入計算機的輸出Open() 可發生錯誤 終止執行圖
  • Process()
    • 針對含有輸入內容的計算機,架構會重複呼叫 Process() 每個輸入串流都有可用封包時。架構 預設會保證所有輸入內容的時間戳記都相同 (請參閱 同步處理)。多個 平行執行作業時可同時叫用 Process() 呼叫 如果 Process() 期間發生錯誤,架構呼叫 Close(),圖表執行作業也會終止。
  • Close()
    • 所有對 Process() 的呼叫完成或所有輸入串流關閉後, 架構呼叫 Close()在下列情況下,系統一律會呼叫此函式: 成功呼叫並成功呼叫 Open(),即使圖形執行作業已終止 。沒有任何來自任何輸入串流的輸入來源 期間超過 Close(),但仍可存取輸入端封包 因此可能會寫入輸出內容Close() 傳回後,計算機 應視為無效節點計算機物件時會刪除為 就會立即顯示圖表

以下是 Google Cloud 的 CalculatorBase.h

class CalculatorBase {
 public:
  ...

  // The subclasses of CalculatorBase must implement GetContract.
  // ...
  static absl::Status GetContract(CalculatorContract* cc);

  // Open is called before any Process() calls, on a freshly constructed
  // calculator.  Subclasses may override this method to perform necessary
  // setup, and possibly output Packets and/or set output streams' headers.
  // ...
  virtual absl::Status Open(CalculatorContext* cc) {
    return absl::OkStatus();
  }

  // Processes the incoming inputs. May call the methods on cc to access
  // inputs and produce outputs.
  // ...
  virtual absl::Status Process(CalculatorContext* cc) = 0;

  // Is called if Open() was called and succeeded.  Is called either
  // immediately after processing is complete or after a graph run has ended
  // (if an error occurred in the graph).  ...
  virtual absl::Status Close(CalculatorContext* cc) {
    return absl::OkStatus();
  }

  ...
};

計算機的生命週期

在初始化 MediaPipe 圖表期間,該架構會呼叫 GetContract() 靜態方法,判斷預期的封包種類。

架構會建構並銷毀每個執行圖的整個計算機 (例如每部影片一次或每張圖片一次)。剩餘的昂貴或大型物件 圖表執行時的常數應以輸入端封包的形式提供, 後續執行作業不會重複計算。

初始化後,圖表每次執行時會發生以下順序:

  • Open()
  • Process() (重複)
  • Close()

架構會呼叫 Open() 以初始化計算機。Open()應該 解讀任何選項,並設定計算工具的每次圖形執行狀態。Open() 可能會取得輸入端封包,並將封包寫入計算機的輸出。如果 應呼叫 SetOffset() 以減少潛在的封包緩衝處理情形 輸入串流的個別項目

Open()Process() 期間發生錯誤 (如其中一個程序所示) 傳回非 Ok 狀態),表示圖表執行作業會終止,不會再發出呼叫 並刪除計算機。

對於含輸入內容的計算機,架構會呼叫 Process(),至少 有一個輸入含有封包這個架構可保證所有輸入值 相同的時間戳記則會隨著每次呼叫 Process() 而增加 所有封包的交付時間因此,有些輸入內容 封包來自呼叫 Process()。缺少封包的輸入來源 會產生空白封包 (沒有時間戳記)。

架構在所有呼叫 Process() 之後,都會呼叫 Close()。所有輸入內容 已用盡,但「Close()」可以存取輸入端封包,且可能 這類 Pod 會用於寫入輸出內容Close 傳回後,系統就會銷毀計算機。

沒有任何輸入的計算機稱為來源。來源計算工具 系統會繼續呼叫 Process(),前提是它會傳回 Ok 狀態。A 罩杯 來源計算工具指出,如果傳回停靠站狀態,電池用盡 (例如 mediaPipe::tool::StatusStop())。

識別輸入和輸出

計算機的公用介麵包含一組輸入串流和 輸出串流。在 CalculatorGraphConfiguration 中, 計算機是透過 串流。串流名稱通常為小寫,輸入和輸出標記則 通常為大寫在以下範例中,標記名稱 VIDEO 的輸出內容為 已連線至含有標籤名稱的輸入VIDEO_IN的串流 (使用名為 video_stream

# Graph describing calculator SomeAudioVideoCalculator
node {
  calculator: "SomeAudioVideoCalculator"
  input_stream: "INPUT:combined_input"
  output_stream: "VIDEO:video_stream"
}
node {
  calculator: "SomeVideoCalculator"
  input_stream: "VIDEO_IN:video_stream"
  output_stream: "VIDEO_OUT:processed_video"
}

輸入和輸出串流可透過索引編號、標記名稱或 標記名稱和索引號碼的組合這裡有幾個輸入和標籤範例 輸出 IDSomeAudioVideoCalculator 識別 標記和音訊輸出的影像 索引。含有 VIDEO 標記的輸入內容已連結至名為 video_stream。具有 AUDIO 標記、索引 01 的輸出內容如下所示 已連線至名為 audio_leftaudio_right 的串流。 SomeAudioCalculator 只會依索引識別音訊輸入 (不需要標記)。

# Graph describing calculator SomeAudioVideoCalculator
node {
  calculator: "SomeAudioVideoCalculator"
  input_stream: "combined_input"
  output_stream: "VIDEO:video_stream"
  output_stream: "AUDIO:0:audio_left"
  output_stream: "AUDIO:1:audio_right"
}

node {
  calculator: "SomeAudioCalculator"
  input_stream: "audio_left"
  input_stream: "audio_right"
  output_stream: "audio_energy"
}

在計算機實作中,輸入和輸出內容也會透過標記識別 名稱和索引號碼系統會在以下函式中識別輸入和輸出:

  • 依索引號碼:由索引識別合併的輸入串流 0
  • 依標記名稱:影片輸出串流可透過標記名稱「VIDEO」識別。
  • 根據標記名稱和索引號碼:輸出音訊串流會由 標記名稱 AUDIO 與索引號碼 01 的組合。
// c++ Code snippet describing the SomeAudioVideoCalculator GetContract() method
class SomeAudioVideoCalculator : public CalculatorBase {
 public:
  static absl::Status GetContract(CalculatorContract* cc) {
    cc->Inputs().Index(0).SetAny();
    // SetAny() is used to specify that whatever the type of the
    // stream is, it's acceptable.  This does not mean that any
    // packet is acceptable.  Packets in the stream still have a
    // particular type.  SetAny() has the same effect as explicitly
    // setting the type to be the stream's type.
    cc->Outputs().Tag("VIDEO").Set<ImageFrame>();
    cc->Outputs().Get("AUDIO", 0).Set<Matrix>();
    cc->Outputs().Get("AUDIO", 1).Set<Matrix>();
    return absl::OkStatus();
  }

處理中

在非來源節點上呼叫的 Process() 必須傳回 absl::OkStatus() 至 表示一切運作正常,或任何其他狀態碼表示發生錯誤

如果非來源計算機傳回 tool::StatusStop(),則此信號代表 正在提早取消圖表。在本例中,所有來源計算機和圖表 系統會關閉輸入串流 (並透過 圖表)。

只要來源節點,圖表中的來源節點會持續呼叫 Process() 因為它會傳回 absl::OkStatus()。表示不再需要 已產生 tool::StatusStop()。其他狀態都表示發生錯誤 發生。

Close() 會傳回 absl::OkStatus() 表示成功。任何其他狀態 表示失敗。

以下是基本的 Process() 函式。其會使用 Input() 方法 (可 只有在計算器只有單一輸入時,才會用於要求其輸入資料。這項服務 然後使用 std::unique_ptr 分配輸出封包所需的記憶體 並實際計算新增完成後,指標會放開 然後輸出串流

absl::Status MyCalculator::Process() {
  const Matrix& input = Input()->Get<Matrix>();
  std::unique_ptr<Matrix> output(new Matrix(input.rows(), input.cols()));
  // do your magic here....
  //    output->row(n) =  ...
  Output()->Add(output.release(), InputTimestamp());
  return absl::OkStatus();
}

計算機選項

計算機接受透過 (1) 輸入串流封包 (2) 接收處理參數 輸入端封包和 (3) 計算機選項計算機選項 (如果有的話) 會以常值的形式顯示在node_options CalculatorGraphConfiguration.Node 訊息。

  node {
    calculator: "TfLiteInferenceCalculator"
    input_stream: "TENSORS:main_model_input"
    output_stream: "TENSORS:main_model_output"
    node_options: {
      [type.googleapis.com/mediapipe.TfLiteInferenceCalculatorOptions] {
        model_path: "mediapipe/models/detection_model.tflite"
      }
    }
  }

node_options 欄位接受 proto3 語法。您也可以使用計算機 您可以透過 proto2 語法,在 options 欄位中指定選項。

  node {
    calculator: "TfLiteInferenceCalculator"
    input_stream: "TENSORS:main_model_input"
    output_stream: "TENSORS:main_model_output"
    node_options: {
      [type.googleapis.com/mediapipe.TfLiteInferenceCalculatorOptions] {
        model_path: "mediapipe/models/detection_model.tflite"
      }
    }
  }

並非所有計算機都接受計算機選項。為了接受選項,您必須輸入 計算機通常會定義新的 protobuf 訊息類型 選項,例如 PacketClonerCalculatorOptions。接著計算機 透過 CalculatorBase::Open 方法讀取該 protobuf 訊息, 其 CalculatorBase::GetContract 函式 CalculatorBase::Process 方法。通常,新的 protobuf 訊息類型 是使用「.proto」定義為 protobuf 結構定義檔案和 mediapipe_proto_library() 建構規則。

  mediapipe_proto_library(
      name = "packet_cloner_calculator_proto",
      srcs = ["packet_cloner_calculator.proto"],
      visibility = ["//visibility:public"],
      deps = [
          "//mediapipe/framework:calculator_options_proto",
          "//mediapipe/framework:calculator_proto",
      ],
  )

計算機範例

本節會討論 PacketClonerCalculator 的實作, 就會執行一個相對簡單的工作,而且許多計算機圖形都使用。 PacketClonerCalculator 只會產生其最新輸入封包的副本 隨選。

PacketClonerCalculator 適用於抵達資料封包的時間戳記 並非完全對齊假設我們的房間裡有麥克風、燈 感應器和攝影機正在收集感官資料。每個感應器 獨立運作,且會間歇性收集資料。假設輸出內容 包括:

  • 麥克風 = 以分貝為單位在房間內播放聲音的音量 (整數)
  • 光度感應器 = 房間亮度 (整數)
  • 攝影機 = 房間的 RGB 圖片影格 (ImageFrame)

我們簡單易用的感知管道,用來處理這 3 種類型的感官資料 感應器,當系統有來自相機、 會與最近一次收集的麥克風音量資料和光線同步設定 感應器亮度資料為了利用 MediaPipe 進行這類測試,我們的認知管道有 3 個 輸入串流:

  • Room_mic_signal - 這個輸入串流中的每個資料封包都是整數資料 以時間戳記表示房間中音訊的音量大小。
  • Room_lightening_sensor - 這個輸入串流中的每個資料封包都是整數 資料代表房間照明的亮度,並以時間戳記表示。
  • Room_video_tick_signal - 這個輸入串流中的每個資料封包都是 影片資料的影像畫面,代表從相機收集的影片 並記錄時間戳記

以下是 PacketClonerCalculator 的實作方式。您會看到 GetContract()Open()Process() 方法,以及執行個體 變數 current_,保存最新輸入封包。

// This takes packets from N+1 streams, A_1, A_2, ..., A_N, B.
// For every packet that appears in B, outputs the most recent packet from each
// of the A_i on a separate stream.

#include <vector>

#include "absl/strings/str_cat.h"
#include "mediapipe/framework/calculator_framework.h"

namespace mediapipe {

// For every packet received on the last stream, output the latest packet
// obtained on all other streams. Therefore, if the last stream outputs at a
// higher rate than the others, this effectively clones the packets from the
// other streams to match the last.
//
// Example config:
// node {
//   calculator: "PacketClonerCalculator"
//   input_stream: "first_base_signal"
//   input_stream: "second_base_signal"
//   input_stream: "tick_signal"
//   output_stream: "cloned_first_base_signal"
//   output_stream: "cloned_second_base_signal"
// }
//
class PacketClonerCalculator : public CalculatorBase {
 public:
  static absl::Status GetContract(CalculatorContract* cc) {
    const int tick_signal_index = cc->Inputs().NumEntries() - 1;
    // cc->Inputs().NumEntries() returns the number of input streams
    // for the PacketClonerCalculator
    for (int i = 0; i < tick_signal_index; ++i) {
      cc->Inputs().Index(i).SetAny();
      // cc->Inputs().Index(i) returns the input stream pointer by index
      cc->Outputs().Index(i).SetSameAs(&cc->Inputs().Index(i));
    }
    cc->Inputs().Index(tick_signal_index).SetAny();
    return absl::OkStatus();
  }

  absl::Status Open(CalculatorContext* cc) final {
    tick_signal_index_ = cc->Inputs().NumEntries() - 1;
    current_.resize(tick_signal_index_);
    // Pass along the header for each stream if present.
    for (int i = 0; i < tick_signal_index_; ++i) {
      if (!cc->Inputs().Index(i).Header().IsEmpty()) {
        cc->Outputs().Index(i).SetHeader(cc->Inputs().Index(i).Header());
        // Sets the output stream of index i header to be the same as
        // the header for the input stream of index i
      }
    }
    return absl::OkStatus();
  }

  absl::Status Process(CalculatorContext* cc) final {
    // Store input signals.
    for (int i = 0; i < tick_signal_index_; ++i) {
      if (!cc->Inputs().Index(i).Value().IsEmpty()) {
        current_[i] = cc->Inputs().Index(i).Value();
      }
    }

    // Output if the tick signal is non-empty.
    if (!cc->Inputs().Index(tick_signal_index_).Value().IsEmpty()) {
      for (int i = 0; i < tick_signal_index_; ++i) {
        if (!current_[i].IsEmpty()) {
          cc->Outputs().Index(i).AddPacket(
              current_[i].At(cc->InputTimestamp()));
          // Add a packet to output stream of index i a packet from inputstream i
          // with timestamp common to all present inputs
        } else {
          cc->Outputs().Index(i).SetNextTimestampBound(
              cc->InputTimestamp().NextAllowedInStream());
          // if current_[i], 1 packet buffer for input stream i is empty, we will set
          // next allowed timestamp for input stream i to be current timestamp + 1
        }
      }
    }
    return absl::OkStatus();
  }

 private:
  std::vector<Packet> current_;
  int tick_signal_index_;
};

REGISTER_CALCULATOR(PacketClonerCalculator);
}  // namespace mediapipe

一般來說,計算機只有一個 .cc 檔案。不需要 .h,因為 mediapipe 使用註冊機制讓計算機知道。安裝完成後 定義計算機類別,並透過巨集叫用來註冊該類別 REGISTER_CALCULATOR(calculator_class_name).

下方是含有 3 個輸入串流、1 個節點的小型 MediaPipe 圖表 (PacketClonerCalculator) 和 2 個輸出串流。

input_stream: "room_mic_signal"
input_stream: "room_lighting_sensor"
input_stream: "room_video_tick_signal"

node {
   calculator: "PacketClonerCalculator"
   input_stream: "room_mic_signal"
   input_stream: "room_lighting_sensor"
   input_stream: "room_video_tick_signal"
   output_stream: "cloned_room_mic_signal"
   output_stream: "cloned_lighting_sensor"
 }

下圖顯示 PacketClonerCalculator 如何定義輸出內容 封包數 (底部)。

使用 PacketClonerCalculator 繪製圖表
每次在 TICK 輸入串流中收到封包時,PacketClonerCalculator 都會從各輸入串流輸出最新的封包。輸出封包的順序 (底部) 取決於輸入封包的順序 (頂端) 及其時間戳記。時間戳記會顯示在圖表右側。