計算機

每個計算機都是圖形的節點。我們會說明如何建立新的計算機、如何初始化計算機、如何執行計算、輸入和輸出串流、時間戳記及選項。圖表中的每個節點都會以 Calculator 的形式實作。大量圖形執行是在計算機中完成。計算機可能會接收零個或多個輸入串流和/或側邊封包,並產生零個或多個輸出串流和/或側封包。

CalculatorBase

如要建立計算機,請定義 CalculatorBase 類別的新子類別、實作多種方法,並使用 Mediapipe 註冊新的子類別。新的計算機必須至少執行下列 4 種方法

  • GetContract()
    • 計算機作者可在 GetContract() 中指定計算的計算機的預期輸入類型及輸出內容。當圖表初始化時,架構會呼叫靜態方法,驗證已連接的輸入和輸出的封包類型是否與這項規格中的資訊相符。
  • Open()
    • 圖表開始後,架構會呼叫 Open()。目前,計算機可以使用輸入的側邊封包。Open() 會解讀節點設定作業 (請參閱圖表),並準備計算機的個別圖表執行狀態。這個函式可能也會將封包寫入計算機輸出。Open() 期間發生錯誤,可能會終止圖形執行作業。
  • Process()
    • 如果是內含輸入內容的計算機,只要至少一個輸入串流擁有可用封包,架構就會重複呼叫 Process()。這個架構預設可保證所有輸入的時間戳記皆相同 (詳情請參閱同步處理)。啟用平行執行功能時,可以同時叫用多個 Process() 呼叫。如果 Process() 期間發生錯誤,架構會呼叫 Close(),且圖表執行會終止。
  • Close()
    • 在所有對 Process() 的呼叫完成時或所有輸入串流關閉後,架構會呼叫 Close()。如果呼叫 Open() 且成功,即使圖表因發生錯誤而終止,系統一律會呼叫這個函式。在 Close() 期間,無法透過任何輸入串流取得任何輸入,但仍可存取輸入側邊封包,因此可能會寫入輸出內容。Close() 傳回後,這個計算機應會視為無效節點。當圖形執行完畢時,計算機物件會立即刪除。

以下是 CalculatorBase.h 的程式碼片段。

class CalculatorBase {
 public:
  ...

  // The subclasses of CalculatorBase must implement GetContract.
  // ...
  static absl::Status GetContract(CalculatorContract* cc);

  // Open is called before any Process() calls, on a freshly constructed
  // calculator.  Subclasses may override this method to perform necessary
  // setup, and possibly output Packets and/or set output streams' headers.
  // ...
  virtual absl::Status Open(CalculatorContext* cc) {
    return absl::OkStatus();
  }

  // Processes the incoming inputs. May call the methods on cc to access
  // inputs and produce outputs.
  // ...
  virtual absl::Status Process(CalculatorContext* cc) = 0;

  // Is called if Open() was called and succeeded.  Is called either
  // immediately after processing is complete or after a graph run has ended
  // (if an error occurred in the graph).  ...
  virtual absl::Status Close(CalculatorContext* cc) {
    return absl::OkStatus();
  }

  ...
};

計算機的壽命

在 MediaPipe 圖表初始化期間,架構會呼叫 GetContract() 靜態方法,判斷預期的封包類型。

針對每次執行圖表,這個架構會建構並刪除整個計算機 (例如每部影片或每張圖片一次)。建議以輸入端封包的形式提供在圖形執行期間保持不變的大量或大型物件,以免後續的執行作業重複計算。

完成初始化後,每次執行圖形時,都會發生以下順序:

  • Open()
  • Process() (重複)
  • Close()

架構會呼叫 Open() 來初始化計算機。Open() 應解讀任何選項,並設定計算機的個別圖表執行狀態。Open() 可能會取得輸入端封包,並將封包寫入計算機輸出結果。應視情況呼叫 SetOffset(),以減少輸入串流的潛在封包緩衝處理。

如果在 Open()Process() 期間發生錯誤 (如其中一種傳回非 Ok 狀態),則系統會終止圖形執行作業,且不會再呼叫計算機方法,且計算機也會遭到刪除。

如果是包含輸入的計算機,只要至少一個輸入有可用的封包,架構就會呼叫 Process()。架構可保證輸入的所有時間戳記都具有相同的時間戳記,時間戳記會隨每次呼叫 Process() 而增加,且所有封包都會傳送。因此,呼叫 Process() 時,部分輸入內容可能不會有任何封包。缺少封包的輸入似乎會產生一個沒有時間戳記的封包。

架構會在所有呼叫 Process() 後呼叫 Close()。所有輸入都已用盡,但 Close() 可以存取輸入側封包,並有可能寫入輸出內容。Close 回來後,這個計算機就會銷毀。

不含輸入內容的計算機稱為來源。如果來源計算機傳回 Ok 狀態,系統就會繼續呼叫 Process()。來源計算機表示已藉由傳回停靠狀態 (例如 mediaPipe::tool::StatusStop()) 而已用盡。

識別輸入與輸出內容

計算機的公開介麵包含一組輸入串流和輸出串流。在 CalculatorGraphConfiguration 中,部分計算機的輸出結果會使用已命名串流的其他計算機的輸入資料。串流名稱通常為小寫,但輸入和輸出標記通常是大寫。在下例中,標記名稱為 VIDEO 的輸出內容會透過名為 video_stream 的串流,連接至具有標記名稱 VIDEO_IN 的輸入。

# Graph describing calculator SomeAudioVideoCalculator
node {
  calculator: "SomeAudioVideoCalculator"
  input_stream: "INPUT:combined_input"
  output_stream: "VIDEO:video_stream"
}
node {
  calculator: "SomeVideoCalculator"
  input_stream: "VIDEO_IN:video_stream"
  output_stream: "VIDEO_OUT:processed_video"
}

輸入和輸出串流可以按照索引編號、標記名稱,或標記名稱和索引編號的組合來識別。您可以參考以下範例中的輸入和輸出 ID 範例。SomeAudioVideoCalculator 會透過標記和索引的組合來識別其影片輸出,以及其音訊輸出。含有 VIDEO 標記的輸入會連線至名為 video_stream 的串流。含有 AUDIO 標記、索引 01 的輸出內容會連線至名為 audio_leftaudio_right 的串流。SomeAudioCalculator 只會透過索引識別音訊輸入 (不需要任何標記)。

# Graph describing calculator SomeAudioVideoCalculator
node {
  calculator: "SomeAudioVideoCalculator"
  input_stream: "combined_input"
  output_stream: "VIDEO:video_stream"
  output_stream: "AUDIO:0:audio_left"
  output_stream: "AUDIO:1:audio_right"
}

node {
  calculator: "SomeAudioCalculator"
  input_stream: "audio_left"
  input_stream: "audio_right"
  output_stream: "audio_energy"
}

在計算機實作中,也可以利用標記名稱和索引編號來識別輸入和輸出內容。在下方函式中識別了輸入和輸出內容:

  • 依索引編號:合併的輸入串流只會由索引 0 識別。
  • 依據標記名稱:影片輸出串流會以標記名稱「VIDEO」識別。
  • 依標記名稱和索引編號:系統會透過標記名稱 AUDIO 以及索引編號 01 的組合來識別輸出音訊串流。
// c++ Code snippet describing the SomeAudioVideoCalculator GetContract() method
class SomeAudioVideoCalculator : public CalculatorBase {
 public:
  static absl::Status GetContract(CalculatorContract* cc) {
    cc->Inputs().Index(0).SetAny();
    // SetAny() is used to specify that whatever the type of the
    // stream is, it's acceptable.  This does not mean that any
    // packet is acceptable.  Packets in the stream still have a
    // particular type.  SetAny() has the same effect as explicitly
    // setting the type to be the stream's type.
    cc->Outputs().Tag("VIDEO").Set<ImageFrame>();
    cc->Outputs().Get("AUDIO", 0).Set<Matrix>();
    cc->Outputs().Get("AUDIO", 1).Set<Matrix>();
    return absl::OkStatus();
  }

處理中

在非來源節點上呼叫的 Process() 必須傳回 absl::OkStatus() 以表示一切運作順利,或任何其他狀態碼表示發生錯誤

如果非來源計算機傳回 tool::StatusStop(),表示圖形要提早取消。在這種情況下,所有來源計算器和圖形輸入串流都會關閉 (剩餘的封包會透過圖表傳播)。

只要圖表中的來源節點傳回 absl::OkStatus(,來源節點就會持續呼叫 Process()。表示沒有其他可產生的資料會傳回 tool::StatusStop()。如果傳回任何其他狀態,則代表發生錯誤。

Close() 會傳回 absl::OkStatus() 表示成功。其他任何狀態則代表失敗。

以下是基本的 Process() 函式。接著使用 Input() 方法 (只有在計算機需要輸入一次時才能使用) 方法來要求輸入資料。接著,系統會使用 std::unique_ptr 分配輸出封包所需的記憶體,並進行計算。完成後,系統會將指標新增至輸出串流時放開。

absl::Status MyCalculator::Process() {
  const Matrix& input = Input()->Get<Matrix>();
  std::unique_ptr<Matrix> output(new Matrix(input.rows(), input.cols()));
  // do your magic here....
  //    output->row(n) =  ...
  Output()->Add(output.release(), InputTimestamp());
  return absl::OkStatus();
}

計算機選項

計算機接受透過 (1) 輸入串流封包 (2) 輸入端封包與 (3) 計算機選項來接收處理參數。計算機選項 (如有指定) 後,會在 CalculatorGraphConfiguration.Node 訊息的 node_options 欄位中顯示為常值。

  node {
    calculator: "TfLiteInferenceCalculator"
    input_stream: "TENSORS:main_model_input"
    output_stream: "TENSORS:main_model_output"
    node_options: {
      [type.googleapis.com/mediapipe.TfLiteInferenceCalculatorOptions] {
        model_path: "mediapipe/models/detection_model.tflite"
      }
    }
  }

node_options 欄位接受 proto3 語法。或者,您也可以使用 proto2 語法,在 options 欄位中指定計算機選項。

  node {
    calculator: "TfLiteInferenceCalculator"
    input_stream: "TENSORS:main_model_input"
    output_stream: "TENSORS:main_model_output"
    node_options: {
      [type.googleapis.com/mediapipe.TfLiteInferenceCalculatorOptions] {
        model_path: "mediapipe/models/detection_model.tflite"
      }
    }
  }

並非所有計算機都接受計算機選項。為了接受選項,計算機通常會定義新的 protobuf 訊息類型來代表選項 (例如 PacketClonerCalculatorOptions)。接著,計算機會在 CalculatorBase::Open 方法中讀取該 protobuf 訊息,也可能在其 CalculatorBase::GetContract 函式或 CalculatorBase::Process 方法中讀取該訊息。通常,新的 protobuf 訊息類型會使用「.proto」檔案和 mediapipe_proto_library() 建構規則,定義為 protobuf 結構定義。

  mediapipe_proto_library(
      name = "packet_cloner_calculator_proto",
      srcs = ["packet_cloner_calculator.proto"],
      visibility = ["//visibility:public"],
      deps = [
          "//mediapipe/framework:calculator_options_proto",
          "//mediapipe/framework:calculator_proto",
      ],
  )

計算機範例

本節說明 PacketClonerCalculator 的實作方式,這是一種相對簡單的工作,並用於許多計算機圖。PacketClonerCalculator 只會視需求產生最近輸入封包的副本。

如果抵達資料封包的時間戳記未完全對齊,PacketClonerCalculator 就非常實用。假設我們有一個房間,內有麥克風、光感測器,以及一個正在收集感官資料的攝影機。每個感應器都會獨立運作,並會間歇地收集資料。假設每個感應器的輸出內容如下:

  • 麥克風 = 房間聲音的音量 (整數)
  • 光度感應器 = 房間亮度 (整數)
  • 攝影機 = RGB 空間的影格 (ImageFrame)

我們的簡易感知管道旨在處理 3 個感應器的感官資料,如此一來,當我們擁有相機的影像影格資料時,就會與最近收集到的麥克風音量資料和光度感應器亮度資料同步。如要使用 MediaPipe 執行此操作,我們的感知管道會有 3 個輸入串流:

  • room_mic_signal - 此輸入串流中的每個資料封包是整數資料,代表具有時間戳記的空間中音訊的音量。
  • room_lightening_sensor - 這個輸入串流中的每個資料封包都是整數資料,代表透過時間戳記標示房間的亮度。
  • room_video_tick_signal - 此輸入串流中的每個資料封包都是影片資料影像影格,代表從攝影機在具有時間戳記的房間中收集的影片。

以下是 PacketClonerCalculator 的實作。您可以看到 GetContract()Open()Process() 方法,以及保存最新輸入封包的執行個體變數 current_

// This takes packets from N+1 streams, A_1, A_2, ..., A_N, B.
// For every packet that appears in B, outputs the most recent packet from each
// of the A_i on a separate stream.

#include <vector>

#include "absl/strings/str_cat.h"
#include "mediapipe/framework/calculator_framework.h"

namespace mediapipe {

// For every packet received on the last stream, output the latest packet
// obtained on all other streams. Therefore, if the last stream outputs at a
// higher rate than the others, this effectively clones the packets from the
// other streams to match the last.
//
// Example config:
// node {
//   calculator: "PacketClonerCalculator"
//   input_stream: "first_base_signal"
//   input_stream: "second_base_signal"
//   input_stream: "tick_signal"
//   output_stream: "cloned_first_base_signal"
//   output_stream: "cloned_second_base_signal"
// }
//
class PacketClonerCalculator : public CalculatorBase {
 public:
  static absl::Status GetContract(CalculatorContract* cc) {
    const int tick_signal_index = cc->Inputs().NumEntries() - 1;
    // cc->Inputs().NumEntries() returns the number of input streams
    // for the PacketClonerCalculator
    for (int i = 0; i < tick_signal_index; ++i) {
      cc->Inputs().Index(i).SetAny();
      // cc->Inputs().Index(i) returns the input stream pointer by index
      cc->Outputs().Index(i).SetSameAs(&cc->Inputs().Index(i));
    }
    cc->Inputs().Index(tick_signal_index).SetAny();
    return absl::OkStatus();
  }

  absl::Status Open(CalculatorContext* cc) final {
    tick_signal_index_ = cc->Inputs().NumEntries() - 1;
    current_.resize(tick_signal_index_);
    // Pass along the header for each stream if present.
    for (int i = 0; i < tick_signal_index_; ++i) {
      if (!cc->Inputs().Index(i).Header().IsEmpty()) {
        cc->Outputs().Index(i).SetHeader(cc->Inputs().Index(i).Header());
        // Sets the output stream of index i header to be the same as
        // the header for the input stream of index i
      }
    }
    return absl::OkStatus();
  }

  absl::Status Process(CalculatorContext* cc) final {
    // Store input signals.
    for (int i = 0; i < tick_signal_index_; ++i) {
      if (!cc->Inputs().Index(i).Value().IsEmpty()) {
        current_[i] = cc->Inputs().Index(i).Value();
      }
    }

    // Output if the tick signal is non-empty.
    if (!cc->Inputs().Index(tick_signal_index_).Value().IsEmpty()) {
      for (int i = 0; i < tick_signal_index_; ++i) {
        if (!current_[i].IsEmpty()) {
          cc->Outputs().Index(i).AddPacket(
              current_[i].At(cc->InputTimestamp()));
          // Add a packet to output stream of index i a packet from inputstream i
          // with timestamp common to all present inputs
        } else {
          cc->Outputs().Index(i).SetNextTimestampBound(
              cc->InputTimestamp().NextAllowedInStream());
          // if current_[i], 1 packet buffer for input stream i is empty, we will set
          // next allowed timestamp for input stream i to be current timestamp + 1
        }
      }
    }
    return absl::OkStatus();
  }

 private:
  std::vector<Packet> current_;
  int tick_signal_index_;
};

REGISTER_CALCULATOR(PacketClonerCalculator);
}  // namespace mediapipe

一般來說,計算機中只有 .cc 檔案。不需要 .h,因為媒體管道會使用註冊功能來識別計算機。定義計算機類別後,請以巨集叫用 REGISTER_CALCULATOR(calculator_class_name) 註冊該類別。

下方是具有 3 個輸入串流、1 個節點 (PacketClonerCalculator) 和 2 個輸出串流的精簡 MediaPipe 圖表。

input_stream: "room_mic_signal"
input_stream: "room_lighting_sensor"
input_stream: "room_video_tick_signal"

node {
   calculator: "PacketClonerCalculator"
   input_stream: "room_mic_signal"
   input_stream: "room_lighting_sensor"
   input_stream: "room_video_tick_signal"
   output_stream: "cloned_room_mic_signal"
   output_stream: "cloned_lighting_sensor"
 }

下圖說明 PacketClonerCalculator 如何根據一系列輸入封包 (頂端) 定義輸出封包 (下方)。

使用 PacketClonerCalculator 繪製圖形
每次在其 TICK 輸入串流收到封包時,PetClonerCalculator 就會從每個輸入串流輸出最新的封包。輸出封包 (下方) 的順序取決於輸入封包 (頂端) 及其時間戳記的順序。這些時間戳記會顯示在圖表的右側。