各計算ツールはグラフのノードです。新しい計算ツールの作成方法、計算ツールの初期化方法、計算の実行方法、入出力ストリーム、タイムスタンプ、オプションについて説明します。グラフ内の各ノードは Calculator
として実装されます。グラフ実行の大部分は計算機内で行われます。計算機は、ゼロ個以上の入力ストリームおよび/またはサイドパケットを受け取り、ゼロ個以上の出力ストリームおよび/またはサイドパケットを生成することができます。
CalculatorBase
計算ツールを作成するには、CalculatorBase
クラスの新しいサブクラスを定義し、いくつかのメソッドを実装して、新しいサブクラスを Mediapipe に登録します。新しい計算ツールで、少なくとも以下の 4 つの方法を実装する必要があります
GetContract()
- 電卓の作成者は、GetContract() で電卓の想定される入力と出力のタイプを指定できます。グラフが初期化されると、フレームワークは静的メソッドを呼び出して、接続された入力と出力のパケットタイプがこの仕様の情報と一致するかどうかを確認します。
Open()
- グラフが開始されると、フレームワークは
Open()
を呼び出します。入力サイドパケットは、この時点で計算機で利用できます。Open()
は、ノード構成オペレーション(グラフを参照)を解釈し、計算ツールのグラフ実行ごとの状態を準備します。この関数は、計算機の出力にパケットを書き込むこともできます。Open()
でエラーが発生すると、グラフの実行が終了する可能性があります。
- グラフが開始されると、フレームワークは
Process()
- 入力のある計算ツールの場合、少なくとも 1 つの入力ストリームにパケットが使用可能になるたびに、フレームワークは
Process()
を繰り返し呼び出します。フレームワークはデフォルトで、すべての入力が同じタイムスタンプを持つことが保証されます(詳細については、同期をご覧ください)。並列実行が有効になっている場合、複数のProcess()
呼び出しを同時に呼び出すことができます。Process()
でエラーが発生すると、フレームワークはClose()
を呼び出し、グラフの実行は終了します。
- 入力のある計算ツールの場合、少なくとも 1 つの入力ストリームにパケットが使用可能になるたびに、フレームワークは
Close()
Process()
へのすべての呼び出しが終了した後、またはすべての入力ストリームが閉じられたときに、フレームワークはClose()
を呼び出します。この関数は、Open()
が呼び出されて成功した場合や、グラフの実行がエラーで終了した場合でも常に呼び出されます。Close()
の間、入力ストリームを介した入力はありませんが、入力側のパケットにはアクセスできるため、出力を書き込むことができます。Close()
が返されると、計算ツールはデッドノードとみなされます。計算ツールは、グラフの実行が完了するとすぐに破棄されます。
CalculatorBase.h のコード スニペットを以下に示します。
class CalculatorBase {
public:
...
// The subclasses of CalculatorBase must implement GetContract.
// ...
static absl::Status GetContract(CalculatorContract* cc);
// Open is called before any Process() calls, on a freshly constructed
// calculator. Subclasses may override this method to perform necessary
// setup, and possibly output Packets and/or set output streams' headers.
// ...
virtual absl::Status Open(CalculatorContext* cc) {
return absl::OkStatus();
}
// Processes the incoming inputs. May call the methods on cc to access
// inputs and produce outputs.
// ...
virtual absl::Status Process(CalculatorContext* cc) = 0;
// Is called if Open() was called and succeeded. Is called either
// immediately after processing is complete or after a graph run has ended
// (if an error occurred in the graph). ...
virtual absl::Status Close(CalculatorContext* cc) {
return absl::OkStatus();
}
...
};
電卓の機能
MediaPipe グラフの初期化中に、フレームワークは GetContract()
静的メソッドを呼び出して、想定されるパケットの種類を特定します。
フレームワークは、グラフを実行するたびに計算ツール全体を構築および破棄します(例: 動画ごとに 1 回、画像ごとに 1 回)。グラフの実行全体で一定に保たれる高価なオブジェクトや大きなオブジェクトは、入力側パケットとして提供する必要があります。これにより、後続の実行で計算が繰り返されないようにします。
初期化後、グラフを実行するたびに、次のシーケンスが発生します。
Open()
Process()
(繰り返し)Close()
フレームワークが Open()
を呼び出して計算ツールを初期化します。Open()
はオプションを解釈し、計算ツールのグラフ実行ごとの状態を設定します。Open()
は、入力側のパケットを取得し、パケットを計算機の出力に書き込むことができます。必要に応じて、SetOffset()
を呼び出して、入力ストリームのパケット バッファリングの可能性を減らす必要があります。
Open()
または Process()
でエラーが発生すると(いずれか 1 つが Ok
以外のステータスを返す)、グラフの実行は終了し、計算ツールのメソッドがそれ以上呼び出されなくなるため、計算ツールが破棄されます。
入力がある計算ツールの場合、少なくとも 1 つの入力が使用可能なパケットを持つたびに、フレームワークは Process()
を呼び出します。フレームワークは、すべての入力が同じタイムスタンプを持つこと、Process()
が呼び出されるたびにタイムスタンプが増加すること、すべてのパケットが配信されることを保証します。その結果、Process()
が呼び出されたとき、一部の入力にパケットが含まれていない場合があります。パケットが欠落している入力は、空のパケット(タイムスタンプなし)を生成したように見えます。
Process()
へのすべての呼び出しの後に、フレームワークが Close()
を呼び出します。すべての入力が枯渇しますが、Close()
は入力側のパケットにアクセスでき、出力を書き込む可能性があります。Close が戻ると、計算ツールは破棄されます。
入力のない計算機はソースと呼ばれます。ソース計算ツールでは、Ok
ステータスを返す限り、引き続き Process()
が呼び出されます。ソース計算ツールでは、停止ステータス(mediaPipe::tool::StatusStop()
など)を返すことにより、リソースが不足していることが示されます。
入力と出力の特定
電卓の公開インターフェースは、一連の入力ストリームと出力ストリームで構成されています。CalculatorGraphConfiguration では、一部の計算機からの出力が、名前付きストリームを使用して他の計算機の入力に接続されます。通常、ストリーム名は小文字ですが、入力タグと出力タグは通常大文字です。次の例では、タグ名 VIDEO
の出力が、video_stream
という名前のストリームを使用して、タグ名 VIDEO_IN
の入力に接続されます。
# Graph describing calculator SomeAudioVideoCalculator
node {
calculator: "SomeAudioVideoCalculator"
input_stream: "INPUT:combined_input"
output_stream: "VIDEO:video_stream"
}
node {
calculator: "SomeVideoCalculator"
input_stream: "VIDEO_IN:video_stream"
output_stream: "VIDEO_OUT:processed_video"
}
入力ストリームと出力ストリームは、インデックス番号、タグ名、またはタグ名とインデックス番号の組み合わせで識別できます。入力識別子と出力識別子の例を以下に示します。SomeAudioVideoCalculator
は動画出力をタグで識別し、音声出力をタグとインデックスの組み合わせで識別します。VIDEO
タグの付いた入力は、video_stream
という名前のストリームに接続されます。タグ AUDIO
、インデックス 0
、1
を持つ出力は、audio_left
と audio_right
という名前のストリームに接続されます。SomeAudioCalculator
は、インデックスのみで音声入力を識別します(タグは必要ありません)。
# Graph describing calculator SomeAudioVideoCalculator
node {
calculator: "SomeAudioVideoCalculator"
input_stream: "combined_input"
output_stream: "VIDEO:video_stream"
output_stream: "AUDIO:0:audio_left"
output_stream: "AUDIO:1:audio_right"
}
node {
calculator: "SomeAudioCalculator"
input_stream: "audio_left"
input_stream: "audio_right"
output_stream: "audio_energy"
}
計算ツールの実装では、入力と出力はタグ名とインデックス番号でも識別されます。以下の関数では、入力と出力が示されています。
- インデックス番号: 結合された入力ストリームは、単にインデックス
0
によって識別されます。 - タグ名: 動画出力ストリームはタグ名「VIDEO」で識別されます。
- タグ名とインデックス番号: 出力オーディオ ストリームは、タグ名
AUDIO
とインデックス番号0
および1
の組み合わせで識別されます。
// c++ Code snippet describing the SomeAudioVideoCalculator GetContract() method
class SomeAudioVideoCalculator : public CalculatorBase {
public:
static absl::Status GetContract(CalculatorContract* cc) {
cc->Inputs().Index(0).SetAny();
// SetAny() is used to specify that whatever the type of the
// stream is, it's acceptable. This does not mean that any
// packet is acceptable. Packets in the stream still have a
// particular type. SetAny() has the same effect as explicitly
// setting the type to be the stream's type.
cc->Outputs().Tag("VIDEO").Set<ImageFrame>();
cc->Outputs().Get("AUDIO", 0).Set<Matrix>();
cc->Outputs().Get("AUDIO", 1).Set<Matrix>();
return absl::OkStatus();
}
処理
ソース以外のノードで呼び出された Process()
は、すべてが正常に行われたことを示す absl::OkStatus()
か、エラーを示すその他のステータス コードを返す必要があります。
ソース以外の計算ツールから tool::StatusStop()
が返された場合は、グラフが早期にキャンセルされていることを示します。この場合、すべてのソース計算ストリームとグラフ入力ストリームが閉じられます(残りのパケットはグラフを通じて伝播されます)。
グラフ内のソースノードは、absl::OkStatus(
を返す限り、引き続き Process()
を呼び出します。これ以上生成するデータがないことを示すには、tool::StatusStop()
を返します。その他のステータスは、エラーが発生したことを示します。
Close()
は、成功を示す absl::OkStatus()
を返します。その他のステータスは失敗を示します。
基本的な Process()
関数は次のとおりです。Input()
メソッドを使用して(計算機の入力が 1 つの場合にのみ使用できます)、入力データをリクエストします。次に、std::unique_ptr
を使用して出力パケットに必要なメモリを割り当て、計算を行います。完了すると、出力ストリームに追加する際にポインタが解放されます。
absl::Status MyCalculator::Process() {
const Matrix& input = Input()->Get<Matrix>();
std::unique_ptr<Matrix> output(new Matrix(input.rows(), input.cols()));
// do your magic here....
// output->row(n) = ...
Output()->Add(output.release(), InputTimestamp());
return absl::OkStatus();
}
電卓オプション
計算機は、(1)入力ストリーム パケット(2)入力側パケット、(3)計算機オプションを介して処理パラメータを受け入れます。電卓オプションが指定されている場合は、CalculatorGraphConfiguration.Node
メッセージの node_options
フィールドにリテラル値として表示されます。
node {
calculator: "TfLiteInferenceCalculator"
input_stream: "TENSORS:main_model_input"
output_stream: "TENSORS:main_model_output"
node_options: {
[type.googleapis.com/mediapipe.TfLiteInferenceCalculatorOptions] {
model_path: "mediapipe/models/detection_model.tflite"
}
}
}
node_options
フィールドには、proto3 構文を指定できます。または、proto2 構文を使用して options
フィールドで計算オプションを指定することもできます。
node {
calculator: "TfLiteInferenceCalculator"
input_stream: "TENSORS:main_model_input"
output_stream: "TENSORS:main_model_output"
node_options: {
[type.googleapis.com/mediapipe.TfLiteInferenceCalculatorOptions] {
model_path: "mediapipe/models/detection_model.tflite"
}
}
}
計算ツールによっては、計算オプションに対応していないものもあります。オプションを受け入れるために、計算ツールは通常、オプションを表す新しい protobuf メッセージ タイプ(PacketClonerCalculatorOptions
など)を定義します。計算ツールは、CalculatorBase::Open
メソッドで、場合によっては CalculatorBase::GetContract
関数または CalculatorBase::Process
メソッドでも、その protobuf メッセージを読み取ります。通常、新しい protobuf メッセージ タイプは、「.proto」ファイルと mediapipe_proto_library()
ビルドルールを使用して protobuf スキーマとして定義されます。
mediapipe_proto_library(
name = "packet_cloner_calculator_proto",
srcs = ["packet_cloner_calculator.proto"],
visibility = ["//visibility:public"],
deps = [
"//mediapipe/framework:calculator_options_proto",
"//mediapipe/framework:calculator_proto",
],
)
計算ツールの例
このセクションでは、比較的簡単な処理を行う PacketClonerCalculator
の実装について説明します。この実装は多くの計算グラフで使用されます。PacketClonerCalculator
は、最新の入力パケットのコピーをオンデマンドで単に生成します。
PacketClonerCalculator
は、到着するデータパケットのタイムスタンプが完全には揃っていない場合に便利です。マイク、光センサー、ビデオカメラが設置された部屋に、センサーデータを収集しているとします。各センサーは独立して動作し、データを断続的に収集します。各センサーの出力が次のようになっているとします。
- マイク = 音量(部屋の音のデシベル単位)(整数)
- 光センサー = 部屋の明るさ(整数)
- ビデオカメラ = 部屋の RGB 画像フレーム(ImageFrame)
Google のシンプルな認識パイプラインは、これら 3 つのセンサーからの感覚データを処理し、カメラからの画像フレーム データを最後に収集されたマイクの音量データと光センサーの明るさデータと同期させるように設計されています。MediaPipe でこれを行うため、認識パイプラインには次の 3 つの入力ストリームがあります。
- Room_mic_signal - この入力ストリームのデータの各パケットは、部屋での音量の大きさを表す整数データで、タイムスタンプ付きです。
- Room_lightening_sensor - この入力ストリームの各データのパケットは、部屋の明るさをタイムスタンプで表す整数データです。
- Room_video_tick_signal - この入力ストリームのデータの各パケットは、部屋のカメラから収集された動画を表す動画データの画像フレームで、タイムスタンプ付きです。
PacketClonerCalculator
の実装を以下に示します。GetContract()
、Open()
、Process()
の各メソッドと、最新の入力パケットを保持するインスタンス変数 current_
を確認できます。
// This takes packets from N+1 streams, A_1, A_2, ..., A_N, B.
// For every packet that appears in B, outputs the most recent packet from each
// of the A_i on a separate stream.
#include <vector>
#include "absl/strings/str_cat.h"
#include "mediapipe/framework/calculator_framework.h"
namespace mediapipe {
// For every packet received on the last stream, output the latest packet
// obtained on all other streams. Therefore, if the last stream outputs at a
// higher rate than the others, this effectively clones the packets from the
// other streams to match the last.
//
// Example config:
// node {
// calculator: "PacketClonerCalculator"
// input_stream: "first_base_signal"
// input_stream: "second_base_signal"
// input_stream: "tick_signal"
// output_stream: "cloned_first_base_signal"
// output_stream: "cloned_second_base_signal"
// }
//
class PacketClonerCalculator : public CalculatorBase {
public:
static absl::Status GetContract(CalculatorContract* cc) {
const int tick_signal_index = cc->Inputs().NumEntries() - 1;
// cc->Inputs().NumEntries() returns the number of input streams
// for the PacketClonerCalculator
for (int i = 0; i < tick_signal_index; ++i) {
cc->Inputs().Index(i).SetAny();
// cc->Inputs().Index(i) returns the input stream pointer by index
cc->Outputs().Index(i).SetSameAs(&cc->Inputs().Index(i));
}
cc->Inputs().Index(tick_signal_index).SetAny();
return absl::OkStatus();
}
absl::Status Open(CalculatorContext* cc) final {
tick_signal_index_ = cc->Inputs().NumEntries() - 1;
current_.resize(tick_signal_index_);
// Pass along the header for each stream if present.
for (int i = 0; i < tick_signal_index_; ++i) {
if (!cc->Inputs().Index(i).Header().IsEmpty()) {
cc->Outputs().Index(i).SetHeader(cc->Inputs().Index(i).Header());
// Sets the output stream of index i header to be the same as
// the header for the input stream of index i
}
}
return absl::OkStatus();
}
absl::Status Process(CalculatorContext* cc) final {
// Store input signals.
for (int i = 0; i < tick_signal_index_; ++i) {
if (!cc->Inputs().Index(i).Value().IsEmpty()) {
current_[i] = cc->Inputs().Index(i).Value();
}
}
// Output if the tick signal is non-empty.
if (!cc->Inputs().Index(tick_signal_index_).Value().IsEmpty()) {
for (int i = 0; i < tick_signal_index_; ++i) {
if (!current_[i].IsEmpty()) {
cc->Outputs().Index(i).AddPacket(
current_[i].At(cc->InputTimestamp()));
// Add a packet to output stream of index i a packet from inputstream i
// with timestamp common to all present inputs
} else {
cc->Outputs().Index(i).SetNextTimestampBound(
cc->InputTimestamp().NextAllowedInStream());
// if current_[i], 1 packet buffer for input stream i is empty, we will set
// next allowed timestamp for input stream i to be current timestamp + 1
}
}
}
return absl::OkStatus();
}
private:
std::vector<Packet> current_;
int tick_signal_index_;
};
REGISTER_CALCULATOR(PacketClonerCalculator);
} // namespace mediapipe
通常、電卓には .cc ファイルのみが含まれます。Mediapipe は登録を使用して計算ツールを認識するため、.h は必要ありません。電卓クラスを定義したら、マクロ呼び出し REGISTER_CALCULATOR(calculator_class_name) を使用してクラスを登録します。
以下は、3 つの入力ストリーム、1 つのノード(PacketClonerCalculator)、2 つの出力ストリームがある簡単な MediaPipe グラフです。
input_stream: "room_mic_signal"
input_stream: "room_lighting_sensor"
input_stream: "room_video_tick_signal"
node {
calculator: "PacketClonerCalculator"
input_stream: "room_mic_signal"
input_stream: "room_lighting_sensor"
input_stream: "room_video_tick_signal"
output_stream: "cloned_room_mic_signal"
output_stream: "cloned_lighting_sensor"
}
次の図は、PacketClonerCalculator
が一連の入力パケット(上)に基づいて出力パケット(下)を定義する方法を示しています。
PacketClonerCalculator は、TICK 入力ストリームでパケットを受信するたびに、各入力ストリームから最新のパケットを出力します。出力パケットのシーケンス(下)は、入力パケットのシーケンス(上)とそのタイムスタンプによって決まります。タイムスタンプは図の右側に示されています。 |