Rechner

Jeder Rechner ist ein Knoten einer Grafik. Sie erfahren, wie Sie einen neuen Rechner erstellen, wie Sie einen Rechner initialisieren, wie Sie seine Berechnungen durchführen, Eingabe- und Ausgabestreams, Zeitstempel und Optionen. Jeder Knoten im Diagramm wird als Calculator implementiert. Der Großteil der Diagrammausführung erfolgt innerhalb der Rechner. Ein Rechner kann null oder mehr Eingabestreams und/oder Nebenpakete empfangen und null oder mehr Ausgabestreams und/oder Nebenpakete erzeugen.

CalculatorBase

Ein Rechner wird erstellt, indem eine neue Unterklasse der CalculatorBase-Klasse definiert, eine Reihe von Methoden implementiert und die neue Unterklasse bei Mediapipe registriert wird. In einem neuen Rechner müssen mindestens die folgenden vier Methoden implementiert werden.

  • GetContract()
    • Autoren können die erwarteten Arten von Ein- und Ausgaben eines Taschenrechners in GetContract() angeben. Wenn eine Grafik initialisiert wird, ruft das Framework eine statische Methode auf, um zu prüfen, ob die Pakettypen der verbundenen Ein- und Ausgaben mit den Informationen in dieser Spezifikation übereinstimmen.
  • Open()
    • Nach dem Start eines Diagramms ruft das Framework Open() auf. Die Seitenpakete der Eingabe stehen dem Rechner jetzt zur Verfügung. Open() interpretiert die Knotenkonfigurationsvorgänge (siehe Grafiken) und bereitet den Rechner, um den Status pro Diagrammausführung vorzubereiten. Diese Funktion kann auch Pakete in Rechnerausgaben schreiben. Ein Fehler während des Open()-Vorgangs kann die Diagrammausführung beenden.
  • Process()
    • Bei einem Rechner mit Eingaben ruft das Framework wiederholt Process() auf, wenn mindestens ein Eingabestream ein Paket verfügbar ist. Das Framework garantiert standardmäßig, dass alle Eingaben denselben Zeitstempel haben (weitere Informationen finden Sie unter Synchronisierung). Wenn die parallele Ausführung aktiviert ist, können mehrere Process()-Aufrufe gleichzeitig aufgerufen werden. Wenn während Process() ein Fehler auftritt, ruft das Framework Close() auf und die Grafik wird beendet.
  • Close()
    • Nachdem alle Process()-Aufrufe abgeschlossen sind oder alle Eingabestreams geschlossen wurden, ruft das Framework Close() auf. Diese Funktion wird immer aufgerufen, wenn Open() aufgerufen wurde und erfolgreich ausgeführt wurde, und auch wenn die Diagrammausführung aufgrund eines Fehlers beendet wurde. Während Close() sind über Eingabestreams keine Eingaben verfügbar, hat aber weiterhin Zugriff auf Pakete auf der Eingabeseite und kann daher Ausgaben schreiben. Nachdem Close() zurückgegeben wurde, sollte der Rechner als inaktiver Knoten betrachtet werden. Das Rechnerobjekt wird gelöscht, sobald die Grafik fertig ist.

Im Folgenden finden Sie Code-Snippets aus CalculatorBase.h.

class CalculatorBase {
 public:
  ...

  // The subclasses of CalculatorBase must implement GetContract.
  // ...
  static absl::Status GetContract(CalculatorContract* cc);

  // Open is called before any Process() calls, on a freshly constructed
  // calculator.  Subclasses may override this method to perform necessary
  // setup, and possibly output Packets and/or set output streams' headers.
  // ...
  virtual absl::Status Open(CalculatorContext* cc) {
    return absl::OkStatus();
  }

  // Processes the incoming inputs. May call the methods on cc to access
  // inputs and produce outputs.
  // ...
  virtual absl::Status Process(CalculatorContext* cc) = 0;

  // Is called if Open() was called and succeeded.  Is called either
  // immediately after processing is complete or after a graph run has ended
  // (if an error occurred in the graph).  ...
  virtual absl::Status Close(CalculatorContext* cc) {
    return absl::OkStatus();
  }

  ...
};

So funktioniert der Taschenrechner

Während der Initialisierung einer MediaPipe-Grafik ruft das Framework eine statische Methode GetContract() auf, um festzustellen, welche Arten von Paketen erwartet werden.

Das Framework erstellt und zerstört den gesamten Rechner für jeden Diagrammdurchlauf (z.B. einmal pro Video oder einmal pro Bild). Teure oder große Objekte, die über Diagrammdurchläufe hinweg konstant bleiben, sollten als Pakete auf der Eingabeseite bereitgestellt werden, damit die Berechnungen bei nachfolgenden Durchläufen nicht wiederholt werden.

Nach der Initialisierung erfolgt bei jeder Ausführung des Graphen die folgende Sequenz:

  • Open()
  • Process() (wiederholt)
  • Close()

Das Framework ruft Open() auf, um den Rechner zu initialisieren. Open() sollte alle Optionen interpretieren und den Status pro Diagrammausführung festlegen. Open() kann Pakete auf der Eingabeseite abrufen und in die Rechnerausgaben schreiben. Gegebenenfalls sollte SetOffset() aufgerufen werden, um die potenzielle Paketpufferung von Eingabestreams zu reduzieren.

Wenn während Open() oder Process() ein Fehler auftritt (was daran zu erkennen ist, dass einer von ihnen einen Nicht-Ok-Status zurückgibt), wird die Ausführung der Grafik ohne weitere Aufrufe der Rechnermethoden beendet und der Rechner gelöscht.

Bei einem Rechner mit Eingaben ruft das Framework Process() auf, wenn für mindestens eine Eingabe ein Paket verfügbar ist. Das Framework garantiert, dass alle Eingaben den gleichen Zeitstempel haben, dass die Zeitstempel mit jedem Aufruf von Process() zunehmen und dass alle Pakete zugestellt werden. Daher enthalten einige Eingaben möglicherweise keine Pakete, wenn Process() aufgerufen wird. Eine Eingabe, deren Paket fehlt, scheint ein leeres Paket (ohne Zeitstempel) zu erzeugen.

Das Framework ruft nach allen Aufrufen von Process() Close() auf. Alle Eingaben sind erschöpft, aber Close() hat Zugriff auf Pakete auf der Eingabeseite und kann Ausgaben schreiben. Nachdem Close zurückgegeben wurde, wird der Rechner gelöscht.

Rechner ohne Eingaben werden als Quellen bezeichnet. Bei einem Quellenrechner wird Process() weiterhin aufgerufen, solange der Status Ok zurückgegeben wird. Ein Quellrechner gibt an, dass der Rechner erschöpft ist, indem ein Haltestellenstatus (z. B. mediaPipe::tool::StatusStop()) zurückgegeben wird.

Eingaben und Ausgaben identifizieren

Die öffentliche Schnittstelle zu einem Taschenrechner besteht aus einer Reihe von Eingabe- und Ausgabestreams. In einer CalculatorGraphConfiguration werden die Ausgaben einiger Rechner mithilfe benannter Streams mit den Eingaben anderer Taschenrechner verbunden. Streamnamen werden normalerweise in Kleinbuchstaben geschrieben, während Eingabe- und Ausgabe-Tags normalerweise in Großbuchstaben geschrieben werden. Im folgenden Beispiel wird die Ausgabe mit dem Tag-Namen VIDEO über den Stream video_stream mit der Eingabe mit dem Tag-Namen VIDEO_IN verbunden.

# Graph describing calculator SomeAudioVideoCalculator
node {
  calculator: "SomeAudioVideoCalculator"
  input_stream: "INPUT:combined_input"
  output_stream: "VIDEO:video_stream"
}
node {
  calculator: "SomeVideoCalculator"
  input_stream: "VIDEO_IN:video_stream"
  output_stream: "VIDEO_OUT:processed_video"
}

Eingabe- und Ausgabestreams können anhand der Indexnummer, des Tagnamens oder durch eine Kombination aus Tagname und Indexnummer identifiziert werden. Im folgenden Beispiel sehen Sie einige Beispiele für Eingabe- und Ausgabekennungen. SomeAudioVideoCalculator identifiziert die Videoausgabe anhand des Tags und die Audioausgaben durch die Kombination aus Tag und Index. Die Eingabe mit dem Tag VIDEO ist mit dem Stream video_stream verbunden. Die Ausgaben mit dem Tag AUDIO und den Indexen 0 und 1 werden mit den Streams mit den Namen audio_left und audio_right verbunden. SomeAudioCalculator identifiziert die Audioeingaben nur anhand des Index (kein Tag erforderlich).

# Graph describing calculator SomeAudioVideoCalculator
node {
  calculator: "SomeAudioVideoCalculator"
  input_stream: "combined_input"
  output_stream: "VIDEO:video_stream"
  output_stream: "AUDIO:0:audio_left"
  output_stream: "AUDIO:1:audio_right"
}

node {
  calculator: "SomeAudioCalculator"
  input_stream: "audio_left"
  input_stream: "audio_right"
  output_stream: "audio_energy"
}

In der Taschenrechnerimplementierung werden Ein- und Ausgaben auch durch den Tag-Namen und die Indexnummer identifiziert. In der folgenden Funktion werden Eingabe und Ausgabe identifiziert:

  • Nach Indexnummer: Der kombinierte Eingabestream wird einfach durch den Index 0 identifiziert.
  • Nach Tag-Name: Der Videoausgabestream wird durch den Tag-Namen "VIDEO" identifiziert.
  • Nach Tag-Name und Indexnummer: Die Ausgabe-Audiostreams werden durch eine Kombination aus dem Tag-Namen AUDIO und den Indexnummern 0 und 1 identifiziert.
// c++ Code snippet describing the SomeAudioVideoCalculator GetContract() method
class SomeAudioVideoCalculator : public CalculatorBase {
 public:
  static absl::Status GetContract(CalculatorContract* cc) {
    cc->Inputs().Index(0).SetAny();
    // SetAny() is used to specify that whatever the type of the
    // stream is, it's acceptable.  This does not mean that any
    // packet is acceptable.  Packets in the stream still have a
    // particular type.  SetAny() has the same effect as explicitly
    // setting the type to be the stream's type.
    cc->Outputs().Tag("VIDEO").Set<ImageFrame>();
    cc->Outputs().Get("AUDIO", 0).Set<Matrix>();
    cc->Outputs().Get("AUDIO", 1).Set<Matrix>();
    return absl::OkStatus();
  }

Wird verarbeitet

Process(), der auf einem Nicht-Quellknoten aufgerufen wird, muss absl::OkStatus() zurückgeben, um anzugeben, dass alles gut gelaufen ist, oder einen anderen Statuscode, der einen Fehler signalisiert.

Wenn ein Nicht-Quellrechner tool::StatusStop() zurückgibt, bedeutet dies, dass die Grafik vorzeitig abgebrochen wird. In diesem Fall werden alle Quellrechner und Grafik-Eingabestreams geschlossen (und die verbleibenden Pakete werden über die Grafik weitergegeben).

Auf einen Quellknoten in einer Grafik wird weiterhin Process() aufgerufen, solange absl::OkStatus( zurückgegeben wird. Um anzugeben, dass keine weiteren Daten generiert werden müssen, wird tool::StatusStop() zurückgegeben. Jeder andere Status weist auf einen Fehler hin.

Close() gibt absl::OkStatus() zurück, um den Erfolg anzugeben. Jeder andere Status weist auf einen Fehler hin.

Hier ist die grundlegende Process()-Funktion. Die Eingabedaten werden mithilfe der Methode Input() angefordert, die nur verwendet werden kann, wenn der Rechner nur eine Eingabe hat. Anschließend verwendet es std::unique_ptr, um den für das Ausgabepaket erforderlichen Arbeitsspeicher zuzuweisen, und führt die Berechnungen durch. Wenn der Vorgang abgeschlossen ist, wird der Zeiger freigegeben, wenn er dem Ausgabestream hinzugefügt wird.

absl::Status MyCalculator::Process() {
  const Matrix& input = Input()->Get<Matrix>();
  std::unique_ptr<Matrix> output(new Matrix(input.rows(), input.cols()));
  // do your magic here....
  //    output->row(n) =  ...
  Output()->Add(output.release(), InputTimestamp());
  return absl::OkStatus();
}

Rechneroptionen

Taschenrechner akzeptieren Verarbeitungsparameter durch (1) Eingabestreampakete (2) Pakete auf der Eingabeseite und (3) Taschenrechneroptionen. Falls angegeben, werden Taschenrechneroptionen als Literalwerte im Feld node_options der CalculatorGraphConfiguration.Node-Nachricht angezeigt.

  node {
    calculator: "TfLiteInferenceCalculator"
    input_stream: "TENSORS:main_model_input"
    output_stream: "TENSORS:main_model_output"
    node_options: {
      [type.googleapis.com/mediapipe.TfLiteInferenceCalculatorOptions] {
        model_path: "mediapipe/models/detection_model.tflite"
      }
    }
  }

Für das Feld node_options kann die Proto3-Syntax verwendet werden. Alternativ können Taschenrechneroptionen mithilfe der proto2-Syntax im Feld options angegeben werden.

  node {
    calculator: "TfLiteInferenceCalculator"
    input_stream: "TENSORS:main_model_input"
    output_stream: "TENSORS:main_model_output"
    node_options: {
      [type.googleapis.com/mediapipe.TfLiteInferenceCalculatorOptions] {
        model_path: "mediapipe/models/detection_model.tflite"
      }
    }
  }

Nicht alle Taschenrechner akzeptieren Taschenrechneroptionen. Um Optionen zu akzeptieren, definiert ein Taschenrechner normalerweise einen neuen protobuf-Nachrichtentyp, der die Optionen darstellt, z. B. PacketClonerCalculatorOptions. Der Rechner liest diese protobuf-Nachricht dann in seiner CalculatorBase::Open-Methode und möglicherweise auch in der CalculatorBase::GetContract-Funktion oder der CalculatorBase::Process-Methode. Normalerweise wird der neue protobuf-Nachrichtentyp als protobuf-Schema mithilfe einer „.proto“-Datei und einer mediapipe_proto_library()-Build-Regel definiert.

  mediapipe_proto_library(
      name = "packet_cloner_calculator_proto",
      srcs = ["packet_cloner_calculator.proto"],
      visibility = ["//visibility:public"],
      deps = [
          "//mediapipe/framework:calculator_options_proto",
          "//mediapipe/framework:calculator_proto",
      ],
  )

Beispielrechner

In diesem Abschnitt wird die Implementierung des PacketClonerCalculator-Objekts erläutert, das eine relativ einfache Aufgabe erfüllt und in vielen Taschenrechnerdiagrammen verwendet wird. PacketClonerCalculator erstellt einfach auf Anfrage eine Kopie der letzten Eingabepakete.

PacketClonerCalculator ist nützlich, wenn die Zeitstempel eingehender Datenpakete nicht perfekt ausgerichtet sind. Angenommen, wir haben einen Raum mit einem Mikrofon, einem Lichtsensor und einer Videokamera, die sensorische Daten sammelt. Jeder der Sensoren arbeitet unabhängig und erfasst in regelmäßigen Abständen Daten. Angenommen, die Ausgabe jedes Sensors lautet:

  • Mikrofon = Lautstärke im Raum in Dezibel (Ganzzahl)
  • Lichtsensor = Helligkeit des Raums (Ganzzahl)
  • Videokamera = RGB-Bildframe des Raums (ImageFrame)

Unsere einfache Wahrnehmungspipeline ist darauf ausgelegt, die sensorischen Daten dieser drei Sensoren zu verarbeiten. So werden Bilderrahmendaten von der Kamera jederzeit mit den letzten erfassten Daten zur Lautstärke des Mikrofons und zur Helligkeit des Lichtsensors synchronisiert. Um dies mit MediaPipe zu erreichen, hat unsere Wahrnehmungspipeline drei Eingabestreams:

  • Room_mic_signal: Jedes Datenpaket in diesem Eingabestream ist eine Ganzzahl, die angibt, wie laut Audio in einem Raum mit Zeitstempel ist.
  • Room_lightening_sensor: Jedes Datenpaket in diesem Eingabestream ist Ganzzahldaten, die angeben, wie hell der Raum mit dem Zeitstempel beleuchtet ist.
  • Room_video_tick_signal: Jedes Datenpaket in diesem Eingabestream ist ein Bildausschnitt von Videodaten, die von der Kamera im Raum erfasstes Video mit Zeitstempel darstellen.

Unten ist die Implementierung von PacketClonerCalculator dargestellt. Sie sehen die Methoden GetContract(), Open() und Process() sowie die Instanzvariable current_, die die neuesten Eingabepakete enthält.

// This takes packets from N+1 streams, A_1, A_2, ..., A_N, B.
// For every packet that appears in B, outputs the most recent packet from each
// of the A_i on a separate stream.

#include <vector>

#include "absl/strings/str_cat.h"
#include "mediapipe/framework/calculator_framework.h"

namespace mediapipe {

// For every packet received on the last stream, output the latest packet
// obtained on all other streams. Therefore, if the last stream outputs at a
// higher rate than the others, this effectively clones the packets from the
// other streams to match the last.
//
// Example config:
// node {
//   calculator: "PacketClonerCalculator"
//   input_stream: "first_base_signal"
//   input_stream: "second_base_signal"
//   input_stream: "tick_signal"
//   output_stream: "cloned_first_base_signal"
//   output_stream: "cloned_second_base_signal"
// }
//
class PacketClonerCalculator : public CalculatorBase {
 public:
  static absl::Status GetContract(CalculatorContract* cc) {
    const int tick_signal_index = cc->Inputs().NumEntries() - 1;
    // cc->Inputs().NumEntries() returns the number of input streams
    // for the PacketClonerCalculator
    for (int i = 0; i < tick_signal_index; ++i) {
      cc->Inputs().Index(i).SetAny();
      // cc->Inputs().Index(i) returns the input stream pointer by index
      cc->Outputs().Index(i).SetSameAs(&cc->Inputs().Index(i));
    }
    cc->Inputs().Index(tick_signal_index).SetAny();
    return absl::OkStatus();
  }

  absl::Status Open(CalculatorContext* cc) final {
    tick_signal_index_ = cc->Inputs().NumEntries() - 1;
    current_.resize(tick_signal_index_);
    // Pass along the header for each stream if present.
    for (int i = 0; i < tick_signal_index_; ++i) {
      if (!cc->Inputs().Index(i).Header().IsEmpty()) {
        cc->Outputs().Index(i).SetHeader(cc->Inputs().Index(i).Header());
        // Sets the output stream of index i header to be the same as
        // the header for the input stream of index i
      }
    }
    return absl::OkStatus();
  }

  absl::Status Process(CalculatorContext* cc) final {
    // Store input signals.
    for (int i = 0; i < tick_signal_index_; ++i) {
      if (!cc->Inputs().Index(i).Value().IsEmpty()) {
        current_[i] = cc->Inputs().Index(i).Value();
      }
    }

    // Output if the tick signal is non-empty.
    if (!cc->Inputs().Index(tick_signal_index_).Value().IsEmpty()) {
      for (int i = 0; i < tick_signal_index_; ++i) {
        if (!current_[i].IsEmpty()) {
          cc->Outputs().Index(i).AddPacket(
              current_[i].At(cc->InputTimestamp()));
          // Add a packet to output stream of index i a packet from inputstream i
          // with timestamp common to all present inputs
        } else {
          cc->Outputs().Index(i).SetNextTimestampBound(
              cc->InputTimestamp().NextAllowedInStream());
          // if current_[i], 1 packet buffer for input stream i is empty, we will set
          // next allowed timestamp for input stream i to be current timestamp + 1
        }
      }
    }
    return absl::OkStatus();
  }

 private:
  std::vector<Packet> current_;
  int tick_signal_index_;
};

REGISTER_CALCULATOR(PacketClonerCalculator);
}  // namespace mediapipe

Normalerweise verfügt ein Taschenrechner nur über eine CC-Datei. Es ist keine .h-Datei erforderlich, da Mediapipe mithilfe der Registrierung Rechner meldet. Nachdem Sie Ihre Rechnerklasse definiert haben, registrieren Sie sie mit dem Makroaufruf REGISTER_CALCULATOR(calculator_class_name).

Unten sehen Sie eine einfache MediaPipe-Grafik mit 3 Eingabestreams, 1 Knoten (PacketClonerCalculator) und 2 Ausgabestreams.

input_stream: "room_mic_signal"
input_stream: "room_lighting_sensor"
input_stream: "room_video_tick_signal"

node {
   calculator: "PacketClonerCalculator"
   input_stream: "room_mic_signal"
   input_stream: "room_lighting_sensor"
   input_stream: "room_video_tick_signal"
   output_stream: "cloned_room_mic_signal"
   output_stream: "cloned_lighting_sensor"
 }

Das folgende Diagramm zeigt, wie PacketClonerCalculator seine Ausgabepakete (unten) basierend auf der Reihe von Eingabepaketen (oben) definiert.

Mit PacketClonerCalculator grafisch darstellen
Jedes Mal, wenn er ein Paket aus seinem TICK-Eingabestream empfängt, gibt der PacketClonerCalculator das neueste Paket aus jedem seiner Eingabestreams aus. Die Reihenfolge der Ausgabepakete (unten) wird durch die Reihenfolge der Eingabepakete (oben) und deren Zeitstempel bestimmt. Die Zeitstempel werden auf der rechten Seite des Diagramms angezeigt.