Calculatrices

Chaque calculatrice est un nœud d'un graphique. Nous décrivons comment créer calculatrice, comment initialiser une calculatrice, comment effectuer ses calculs, flux d'entrée et de sortie, codes temporels et options. Chaque nœud du graphique est implémentée en tant que Calculator. L'essentiel de l'exécution du graphe a lieu calculatrices. Une calculatrice peut recevoir zéro, un ou plusieurs flux d'entrée et/ou côtés et produit zéro, un ou plusieurs flux de sortie et/ou paquets secondaires.

CalculatorBase

Un calculateur est créé en définissant une nouvelle sous-classe de CalculatorBase la classe, l'implémentation d'un certain nombre de méthodes et l'enregistrement de la nouvelle sous-classe avec Mediapipe Une nouvelle calculatrice doit, au minimum, implémenter les quatre méthodes ci-dessous.

  • GetContract()
    • Les auteurs de calculatrices peuvent spécifier les types d'entrées et de sorties attendus d'un calculateur dans GetContract(). Lorsqu'un graphique est initialisé, appelle une méthode statique pour vérifier si les types de paquets connectées correspondent aux informations de ce spécifique.
  • Open()
    • Lorsqu'un graphique a démarré, le framework appelle Open(). Côté entrée paquets sont disponibles pour le calculateur à ce stade. Open() interprète les opérations de configuration des nœuds (voir Graphiques) et prépare l'état par exécution du graphique du calculateur. Cette fonction peut écrivent également des paquets sur les sorties du calculateur. Une erreur pendant Open() peut pour terminer l'exécution du graphe.
  • Process()
    • Pour un calculateur avec des entrées, le framework appelle Process() à plusieurs reprises chaque fois qu'un paquet est disponible pour au moins un flux d'entrée. Le cadre garantit par défaut que toutes les entrées ont le même code temporel (voir Synchronisation. Multiples Les appels Process() peuvent être appelés simultanément lors d'une exécution parallèle est activé. Si une erreur se produit pendant Process(), le framework appelle Close() et l'exécution du graphe se termine.
  • Close()
    • Une fois tous les appels à Process() terminés ou lorsque tous les flux d'entrée se ferment, le framework appelle Close(). Cette fonction est toujours appelée si Open() a été appelé et a réussi, et même si l'exécution du graphe s'est arrêtée en raison d'une erreur. Aucune entrée n'est disponible via les flux d'entrée pendant Close(), mais il a toujours accès aux paquets côté entrée et peut donc écrire des sorties. Après le retour de Close(), le simulateur doit être considéré comme un nœud mort. L'objet calculatrice est détruit en tant que dès que l'exécution du graphique est terminée.

Voici des extraits de code CalculatorBase.h :

class CalculatorBase {
 public:
  ...

  // The subclasses of CalculatorBase must implement GetContract.
  // ...
  static absl::Status GetContract(CalculatorContract* cc);

  // Open is called before any Process() calls, on a freshly constructed
  // calculator.  Subclasses may override this method to perform necessary
  // setup, and possibly output Packets and/or set output streams' headers.
  // ...
  virtual absl::Status Open(CalculatorContext* cc) {
    return absl::OkStatus();
  }

  // Processes the incoming inputs. May call the methods on cc to access
  // inputs and produce outputs.
  // ...
  virtual absl::Status Process(CalculatorContext* cc) = 0;

  // Is called if Open() was called and succeeded.  Is called either
  // immediately after processing is complete or after a graph run has ended
  // (if an error occurred in the graph).  ...
  virtual absl::Status Close(CalculatorContext* cc) {
    return absl::OkStatus();
  }

  ...
};

Durée de vie d'une calculatrice

Lors de l'initialisation d'un graphique MediaPipe, le framework appelle un GetContract() pour déterminer les types de paquets attendus.

Le framework construit et détruit l'intégralité du calculateur à chaque exécution du graphe (une fois par vidéo ou une fois par image, par exemple). Objets coûteux ou volumineux qui restent constante entre les exécutions du graphique doit être fournie sous forme de paquets côté entrée pour que de calcul ne sont pas répétés lors des exécutions suivantes.

Après l'initialisation, pour chaque exécution du graphique, la séquence suivante se produit:

  • Open()
  • Process() (répété)
  • Close()

Le framework appelle Open() pour initialiser le calculateur. Open() doit pour interpréter les options et configurer l'état d'exécution du graphique. Open() peut obtenir des paquets côté entrée et écrire des paquets dans les sorties du calculateur. Si nécessaire, elle doit appeler SetOffset() pour réduire la mise en mémoire tampon potentielle des paquets de flux d'entrée.

Si une erreur se produit pendant Open() ou Process() (comme indiqué par l'un d'entre eux) renvoyant un état autre que Ok), l'exécution du graphique s'arrête sans aucun autre appel. aux méthodes de la calculatrice, et celle-ci est détruite.

Pour un calculateur avec des entrées, le framework appelle Process() chaque fois au moins une entrée a un paquet disponible. Le framework garantit que toutes les entrées le même code temporel, que ces codes augmentent à chaque appel de Process() et que tous les paquets sont distribués. Il est donc possible que certaines entrées n'aient pas paquets lorsque Process() est appelé. Une entrée dont le paquet est manquant semble génèrent un paquet vide (sans code temporel).

Le framework appelle Close() après tous les appels à Process(). Toutes les entrées sont épuisés, mais Close() a accès aux paquets côté entrée et peut des sorties en écriture. Lorsque Close est renvoyée, la calculatrice est détruite.

Les calculatrices sans entrée sont appelées "sources". Un calculateur de source continue d'être appelé Process() tant qu'il renvoie un état Ok. A le calculateur de source indique qu'il est épuisé en renvoyant l'état d'un arrêt (par exemple, mediaPipe::tool::StatusStop()).

Identifier les entrées et les sorties

L'interface publique d'une calculatrice est composée d'un ensemble de flux d'entrée et flux de sortie. Dans une CalculatorGraphConfiguration, les sorties de certains les calculatrices sont connectées aux entrées d'autres calculatrices à l'aide d'un nom flux. Normalement, les noms des flux sont en minuscules, tandis que les tags d'entrée et de sortie sont normalement en MAJUSCULES. Dans l'exemple ci-dessous, la sortie avec le nom de tag VIDEO est connecté à l'entrée avec le tag VIDEO_IN à l'aide du flux nommé video_stream

# Graph describing calculator SomeAudioVideoCalculator
node {
  calculator: "SomeAudioVideoCalculator"
  input_stream: "INPUT:combined_input"
  output_stream: "VIDEO:video_stream"
}
node {
  calculator: "SomeVideoCalculator"
  input_stream: "VIDEO_IN:video_stream"
  output_stream: "VIDEO_OUT:processed_video"
}

Les flux d'entrée et de sortie peuvent être identifiés par un numéro d'index, un nom de balise ou un élément combinaison du nom de la balise et du numéro d'index. Vous pouvez voir des exemples d'entrées et de sortie dans l'exemple ci-dessous. SomeAudioVideoCalculator identifie sa sortie vidéo par tag et ses sorties audio en combinant les tags de l'index. L'entrée associée au tag VIDEO est connectée au flux intitulé video_stream Les sorties avec la balise AUDIO et les index 0 et 1 sont connecté aux flux nommés audio_left et audio_right. SomeAudioCalculator identifie ses entrées audio par index uniquement (aucune balise n'est nécessaire).

# Graph describing calculator SomeAudioVideoCalculator
node {
  calculator: "SomeAudioVideoCalculator"
  input_stream: "combined_input"
  output_stream: "VIDEO:video_stream"
  output_stream: "AUDIO:0:audio_left"
  output_stream: "AUDIO:1:audio_right"
}

node {
  calculator: "SomeAudioCalculator"
  input_stream: "audio_left"
  input_stream: "audio_right"
  output_stream: "audio_energy"
}

Dans l'implémentation du calculateur, les entrées et les sorties sont également identifiées par une balise. son nom et son numéro d'index. Dans la fonction ci-dessous, les entrées et les sorties sont identifiées:

  • Par numéro d'index: le flux d'entrée combiné est simplement identifié par un indice 0
  • Par nom de balise: le flux de sortie vidéo est identifié par le nom de tag "VIDEO".
  • Par nom de balise et numéro d'index: les flux audio de sortie sont identifiés par le combinaison du nom de balise AUDIO et des numéros d'index 0 et 1.
// c++ Code snippet describing the SomeAudioVideoCalculator GetContract() method
class SomeAudioVideoCalculator : public CalculatorBase {
 public:
  static absl::Status GetContract(CalculatorContract* cc) {
    cc->Inputs().Index(0).SetAny();
    // SetAny() is used to specify that whatever the type of the
    // stream is, it's acceptable.  This does not mean that any
    // packet is acceptable.  Packets in the stream still have a
    // particular type.  SetAny() has the same effect as explicitly
    // setting the type to be the stream's type.
    cc->Outputs().Tag("VIDEO").Set<ImageFrame>();
    cc->Outputs().Get("AUDIO", 0).Set<Matrix>();
    cc->Outputs().Get("AUDIO", 1).Set<Matrix>();
    return absl::OkStatus();
  }

Traitement

La méthode Process() appelée sur un nœud non source doit renvoyer absl::OkStatus() à indiquer que tout s'est bien déroulé ou tout autre code d'état pour signaler une erreur ;

Si un calculateur non source renvoie tool::StatusStop(), cela indique le graphique est annulé de façon anticipée. Dans ce cas, tous les calculateurs sources et les graphiques flux d'entrée sont fermés (et les paquets restants se propagent via le graphique).

Un nœud source dans un graphe continuera d'être appelé Process() aussi longtemps lorsqu'elle renvoie absl::OkStatus(). Pour indiquer qu'il n'y a plus de données a généré un retour tool::StatusStop(). Tout autre état indique qu'une erreur s'est produit.

Close() renvoie absl::OkStatus() pour indiquer que l'opération a réussi. Tout autre état indique un échec.

Voici la fonction Process() de base. Il utilise la méthode Input() (qui peut à utiliser uniquement si le calculateur ne dispose que d'une seule entrée) pour demander ses données d'entrée. Il utilise ensuite std::unique_ptr pour allouer la mémoire nécessaire au paquet de sortie, et fait les calculs. Une fois l'opération terminée, elle libère le pointeur lors de son ajout le flux de sortie.

absl::Status MyCalculator::Process() {
  const Matrix& input = Input()->Get<Matrix>();
  std::unique_ptr<Matrix> output(new Matrix(input.rows(), input.cols()));
  // do your magic here....
  //    output->row(n) =  ...
  Output()->Add(output.release(), InputTimestamp());
  return absl::OkStatus();
}

Options de la calculatrice

Les calculateurs acceptent les paramètres de traitement par le biais de (1) paquets de flux d'entrée (2) paquets côté entrée et (3) options du calculateur. Options du calculateur, si spécifié, apparaissent sous forme de valeurs littérales dans le champ node_options de CalculatorGraphConfiguration.Node message.

  node {
    calculator: "TfLiteInferenceCalculator"
    input_stream: "TENSORS:main_model_input"
    output_stream: "TENSORS:main_model_output"
    node_options: {
      [type.googleapis.com/mediapipe.TfLiteInferenceCalculatorOptions] {
        model_path: "mediapipe/models/detection_model.tflite"
      }
    }
  }

Le champ node_options accepte la syntaxe proto3. La calculatrice Les options peuvent être spécifiées dans le champ options à l'aide de la syntaxe proto2.

  node {
    calculator: "TfLiteInferenceCalculator"
    input_stream: "TENSORS:main_model_input"
    output_stream: "TENSORS:main_model_output"
    node_options: {
      [type.googleapis.com/mediapipe.TfLiteInferenceCalculatorOptions] {
        model_path: "mediapipe/models/detection_model.tflite"
      }
    }
  }

Toutes les calculatrices n'acceptent pas toutes les options de calculatrice. Pour accepter les options, le calculateur définira normalement un nouveau type de message protobuf pour représenter son options, telles que PacketClonerCalculatorOptions. Le calculateur lire ce message protobuf dans sa méthode CalculatorBase::Open, et éventuellement également dans sa fonction CalculatorBase::GetContract ou dans CalculatorBase::Process. Normalement, le nouveau type de message protobuf être défini comme un schéma protobuf à l'aide d'un fichier ".proto" et un Règle de compilation mediapipe_proto_library().

  mediapipe_proto_library(
      name = "packet_cloner_calculator_proto",
      srcs = ["packet_cloner_calculator.proto"],
      visibility = ["//visibility:public"],
      deps = [
          "//mediapipe/framework:calculator_options_proto",
          "//mediapipe/framework:calculator_proto",
      ],
  )

Exemple de calculatrice

Cette section traite de l'implémentation de PacketClonerCalculator, qui effectue un travail relativement simple et est utilisé dans de nombreux graphiques de calculatrice. PacketClonerCalculator génère simplement une copie de ses paquets d'entrée les plus récents. à la demande.

PacketClonerCalculator est utile lorsque les codes temporels des paquets de données entrants ne sont pas parfaitement alignés. Imaginons une pièce équipée d'un micro, d'un système d'éclairage et une caméra qui collecte des données sensorielles. Chacun des capteurs fonctionne de manière indépendante et collecte des données par intermittence. Supposons que la sortie de chaque capteur est:

  • microphone = volume en décibels dans la pièce (nombre entier)
  • capteur de lumière = luminosité de la pièce (entier)
  • Caméra vidéo = cadre d'image RVB de la pièce (ImageFrame)

Notre pipeline de perception simple est conçu pour traiter les données sensorielles issues de ces 3 de sorte qu'à chaque fois que l'appareil photo reçoit des données de trame d'image, est synchronisé avec les dernières données collectées sur le volume et la lumière du micro sur la luminosité du capteur. Pour cela, avec MediaPipe, notre pipeline de perception flux d'entrée:

  • room_mic_signal : chaque paquet de données de ce flux d'entrée correspond à un nombre entier qui représente le volume de l'audio dans une pièce avec un code temporel.
  • room_lightening_sensor : chaque paquet de données de ce flux d'entrée est un nombre entier. données représentant la luminosité de la pièce avec code temporel.
  • room_video_tick_signal - Chaque paquet de données de ce flux d'entrée est image de données vidéo représentant la vidéo collectée à partir de la caméra dans salle avec code temporel.

Vous trouverez ci-dessous l'implémentation de PacketClonerCalculator. Vous pouvez voir GetContract(), Open() et Process(), ainsi que l'instance la variable current_ qui contient les paquets d'entrée les plus récents.

// This takes packets from N+1 streams, A_1, A_2, ..., A_N, B.
// For every packet that appears in B, outputs the most recent packet from each
// of the A_i on a separate stream.

#include <vector>

#include "absl/strings/str_cat.h"
#include "mediapipe/framework/calculator_framework.h"

namespace mediapipe {

// For every packet received on the last stream, output the latest packet
// obtained on all other streams. Therefore, if the last stream outputs at a
// higher rate than the others, this effectively clones the packets from the
// other streams to match the last.
//
// Example config:
// node {
//   calculator: "PacketClonerCalculator"
//   input_stream: "first_base_signal"
//   input_stream: "second_base_signal"
//   input_stream: "tick_signal"
//   output_stream: "cloned_first_base_signal"
//   output_stream: "cloned_second_base_signal"
// }
//
class PacketClonerCalculator : public CalculatorBase {
 public:
  static absl::Status GetContract(CalculatorContract* cc) {
    const int tick_signal_index = cc->Inputs().NumEntries() - 1;
    // cc->Inputs().NumEntries() returns the number of input streams
    // for the PacketClonerCalculator
    for (int i = 0; i < tick_signal_index; ++i) {
      cc->Inputs().Index(i).SetAny();
      // cc->Inputs().Index(i) returns the input stream pointer by index
      cc->Outputs().Index(i).SetSameAs(&cc->Inputs().Index(i));
    }
    cc->Inputs().Index(tick_signal_index).SetAny();
    return absl::OkStatus();
  }

  absl::Status Open(CalculatorContext* cc) final {
    tick_signal_index_ = cc->Inputs().NumEntries() - 1;
    current_.resize(tick_signal_index_);
    // Pass along the header for each stream if present.
    for (int i = 0; i < tick_signal_index_; ++i) {
      if (!cc->Inputs().Index(i).Header().IsEmpty()) {
        cc->Outputs().Index(i).SetHeader(cc->Inputs().Index(i).Header());
        // Sets the output stream of index i header to be the same as
        // the header for the input stream of index i
      }
    }
    return absl::OkStatus();
  }

  absl::Status Process(CalculatorContext* cc) final {
    // Store input signals.
    for (int i = 0; i < tick_signal_index_; ++i) {
      if (!cc->Inputs().Index(i).Value().IsEmpty()) {
        current_[i] = cc->Inputs().Index(i).Value();
      }
    }

    // Output if the tick signal is non-empty.
    if (!cc->Inputs().Index(tick_signal_index_).Value().IsEmpty()) {
      for (int i = 0; i < tick_signal_index_; ++i) {
        if (!current_[i].IsEmpty()) {
          cc->Outputs().Index(i).AddPacket(
              current_[i].At(cc->InputTimestamp()));
          // Add a packet to output stream of index i a packet from inputstream i
          // with timestamp common to all present inputs
        } else {
          cc->Outputs().Index(i).SetNextTimestampBound(
              cc->InputTimestamp().NextAllowedInStream());
          // if current_[i], 1 packet buffer for input stream i is empty, we will set
          // next allowed timestamp for input stream i to be current timestamp + 1
        }
      }
    }
    return absl::OkStatus();
  }

 private:
  std::vector<Packet> current_;
  int tick_signal_index_;
};

REGISTER_CALCULATOR(PacketClonerCalculator);
}  // namespace mediapipe

Généralement, une calculatrice n'a qu'un fichier .cc. Aucun .h n'est requis, car mediapipe utilise l'enregistrement pour faire connaître les calculatrices. Après avoir défini votre classe de calculatrice, enregistrez-la avec un appel de macro REGISTER_CALCULATOR(calculator_class_name).

Voici un graphique MediaPipe trivial comportant trois flux d'entrée et un nœud (PacketClonerCalculator) et deux flux de sortie.

input_stream: "room_mic_signal"
input_stream: "room_lighting_sensor"
input_stream: "room_video_tick_signal"

node {
   calculator: "PacketClonerCalculator"
   input_stream: "room_mic_signal"
   input_stream: "room_lighting_sensor"
   input_stream: "room_video_tick_signal"
   output_stream: "cloned_room_mic_signal"
   output_stream: "cloned_lighting_sensor"
 }

Le schéma ci-dessous montre comment PacketClonerCalculator définit sa sortie paquets (en bas) en fonction de sa série de paquets d'entrée (en haut).

Graphique à l&#39;aide de PacketClonerCalculator
Chaque fois qu'il reçoit un paquet sur son flux d'entrée TICK, PacketClonerCalculator génère le paquet le plus récent de chacun de ses flux d'entrée. L'ordre des paquets de sortie (en bas) est déterminé par la séquence de paquets d'entrée (en haut) et par leur horodatage. Les codes temporels sont indiqués à droite du diagramme.