同期

スケジューリングの仕組み

MediaPipe グラフのデータ処理は、CalculatorBase サブクラスとして定義された処理ノード内で行われます。スケジュール システムは、各計算ツールを実行するタイミングを決定します。

各グラフには少なくとも 1 つのスケジューラ キューがあります。各スケジューラ キューには 1 つのエグゼキュータexecutorがあります。ノードはキュー(つまりエグゼキュータ)に静的に割り当てられます。デフォルトでは、キューは 1 つあり、そのエグゼキュータはシステムの機能に応じたスレッド数を持つスレッドプールです。

各ノードにはスケジュール状態があり、準備未完了準備完了実行中のいずれかになります。readiness 関数は、ノードを実行する準備ができているかどうかを判断します。この関数は、ノードの実行が完了するたびに、またノードの入力状態が変化するたびに、グラフの初期化時に呼び出されます。

使用される readiness 関数は、ノードのタイプによって異なります。ストリーム入力のないノードはソースノードと呼ばれます。ソースノードは、フレームワークに出力するデータがないと伝えるまで、常に実行可能な状態にあります。ソースノードはソースノードを終了します。

処理する入力があり、ノードの入力ポリシー(後述)によって設定された条件に従ってそれらの入力が有効な入力セットを形成している場合、ソース以外のノードは準備完了です。ほとんどのノードはデフォルトの入力ポリシーを使用しますが、一部のノードでは別の入力ポリシーを指定します。

ノードの準備が整うと、対応するスケジューラ キュー(優先度キュー)にタスクが追加されます。優先度関数は現在固定されており、ノードの静的プロパティとグラフ内のトポロジの並べ替えが考慮されます。たとえば、グラフの出力側に近いノードほど優先度が高く、ソースノードの優先度は最も低くなります。

各キューはエグゼキュータによって処理され、エグゼキュータは計算ツールのコードを呼び出して実際にタスクを実行します。さまざまなエグゼキュータを指定して構成できます。これは、優先度の低いスレッドで特定のノードを実行するなど、実行リソースの使用をカスタマイズするために使用できます。

タイムスタンプの同期

MediaPipe グラフの実行は分散化されています。グローバル クロックはなく、異なるノードが異なるタイムスタンプのデータを同時に処理できます。これにより、パイプライン化によりスループットが向上します。

しかし、多くの認識ワークフローにとって時間情報は非常に重要です。複数の入力ストリームを受信するノードは通常、なんらかの方法で調整する必要があります。たとえば、オブジェクト検出がフレームから境界長方形のリストを出力し、その情報をレンダリング ノードにフィードすると、元のフレームと一緒に処理されます。

したがって、MediaPipe フレームワークの主な役割の一つは、ノードの入力同期を提供することです。フレームワークの仕組みでは、タイムスタンプの主な役割は同期キーとして機能することです。

さらに、MediaPipe は、多くのシナリオ(テスト、シミュレーション、バッチ処理など)で重要な決定論的オペレーションをサポートするように設計されています。また、グラフ作成者は、リアルタイムの制約を満たすために必要に応じて決定論を緩和できます。

同期と決定論という 2 つの目的は、いくつかの設計上の選択の根拠となります。特に、特定のストリームに push されるパケットのタイムスタンプは単調に増加している必要があります。これは多くのノードにとって有用な前提条件であるだけでなく、同期ロジックにも依存します。各ストリームにはタイムスタンプの範囲があります。これは、ストリーム上の新しいパケットに使用できる最小のタイムスタンプです。タイムスタンプが T のパケットが到着すると、単調な要件を反映して、バインドが自動的に T+1 に進みます。これにより、フレームワークはタイムスタンプが T 未満のパケットがこれ以上到着しないことを保証できます。

入力ポリシー

同期は、ノードで指定された入力ポリシーを使用して、各ノードでローカルに処理されます。

DefaultInputStreamHandler によって定義されるデフォルトの入力ポリシーは、次の保証の下で、入力の決定的な同期を実現します。

  • 同じタイムスタンプを持つパケットが複数の入力ストリームで提供される場合、リアルタイムでの到着順序に関係なく、常に一緒に処理されます。

  • 入力セットは厳密に昇順のタイムスタンプ順に処理されます。

  • パケットのドロップは発生せず、処理は完全に確定的です。

  • 上記の保証により、ノードはできるだけ早くデータを処理できるようになります。

その仕組みを説明するために、確定されたタイムスタンプの定義を紹介します。ストリーム内のタイムスタンプがタイムスタンプの範囲より小さい場合、そのタイムスタンプは解決されたと言います。つまり、そのタイムスタンプでの入力状態が取り消せない(パケットが存在するか、そのタイムスタンプを持つパケットが到着しないという確実性がある)ことが判明すると、そのタイムスタンプはストリームに解決されます。

タイムスタンプは、複数のストリームのそれぞれに解決されていれば、それらのストリームに解決されます。さらに、タイムスタンプが解決されると、以前のタイムスタンプもすべて解決されます。したがって、確定したタイムスタンプは確定的に昇順で処理できます。

この定義では、すべての入力ストリームで解決され、少なくとも 1 つの入力ストリームにパケットを含むタイムスタンプがあれば、デフォルトの入力ポリシーを持つ計算ツールを使用できます。入力ポリシーは、清算されたタイムスタンプで使用可能なすべてのパケットを単一の入力セットとして計算ツールに提供します。

この決定論的な動作の結果の 1 つは、複数の入力ストリームを持つノードの場合、タイムスタンプが解決されるまで理論的に無期限の待機が発生し、その間に無制限のパケットがバッファリングされる可能性があることです。(入力ストリームが 2 つあるノードについて考えてみましょう。一方はパケットを送信し続け、もう一方は何も送信せず、境界を進めません)。

そのため、カスタム入力ポリシーも提供しています。たとえば、SyncSetInputStreamHandler で定義された異なる同期セットに入力を分割したり、同期を完全に回避して、ImmediateInputStreamHandler で定義された入力を受信したらすぐに処理したりします。

フロー制御

主なフロー制御メカニズムは 2 つあります。バックプレッシャー メカニズムは、ストリームでバッファリングされたパケットが CalculatorGraphConfig::max_queue_size で定義された(構成可能な)上限に達すると、アップストリーム ノードの実行をスロットリングします。このメカニズムは決定論的な動作を維持し、必要に応じて構成済みの制限を緩和するデッドロック回避システムを含みます。

2 つ目のシステムは、FlowLimiterCalculator で定義されたリアルタイムの制約(通常はカスタム入力ポリシーを使用)に従ってパケットをドロップできる特別なノードを挿入します。たとえば、一般的なパターンでは、サブグラフの入力にフロー制御ノードを配置し、最終出力からフロー制御ノードへのループバック接続を行います。したがって、フロー制御ノードは、ダウンストリーム グラフで処理中のタイムスタンプの数を追跡し、この数が(構成可能な)上限に達した場合にパケットをドロップできます。パケットがアップストリームでドロップされるため、タイムスタンプを部分的に処理して中間ステージ間でパケットをドロップすることによる無駄な作業を回避できます。

この計算ベースのアプローチでは、パケットをドロップできる場所をグラフ作成者が制御でき、リソースの制約に応じてグラフの動作を柔軟に調整、カスタマイズできます。