Calculatrices

Chaque calculateur est un nœud d'un graphique. Nous décrivons comment créer une calculatrice, comment initialiser un calculateur, comment effectuer ses calculs, les flux d'entrée et de sortie, les codes temporels et les options. Chaque nœud du graphique est implémenté en tant que Calculator. La majeure partie de l'exécution du graphe se fait dans ses calculatrices. Un calculateur peut recevoir zéro, un ou plusieurs flux d'entrée et/ou paquets secondaires, et ne produit aucun flux de sortie ou plusieurs paquets secondaires.

CalculatorBase

Pour créer un calculateur, vous devez définir une nouvelle sous-classe de la classe CalculatorBase, implémenter un certain nombre de méthodes et enregistrer la nouvelle sous-classe auprès de Mediapipe. Au minimum, un nouveau calculateur doit implémenter les quatre méthodes ci-dessous

  • GetContract()
    • Les auteurs du calculateur peuvent spécifier les types d'entrées et de sorties attendus d'un calculateur dans GetContract(). Lorsqu'un graphe est initialisé, le framework appelle une méthode statique pour vérifier si les types de paquets des entrées et sorties connectées correspondent aux informations de cette spécification.
  • Open()
    • Après le démarrage d'un graphique, le framework appelle Open(). Les paquets côté entrée sont disponibles pour le calculateur à ce stade. Open() interprète les opérations de configuration des nœuds (voir Graphiques) et prépare l'état d'exécution de chaque graphique du simulateur. Cette fonction peut également écrire des paquets dans les résultats du calculateur. Une erreur lors de l'exécution de Open() peut mettre fin à l'exécution du graphe.
  • Process()
    • Pour un calculateur avec des entrées, le framework appelle Process() de manière répétée chaque fois qu'au moins un flux d'entrée dispose d'un paquet disponible. Par défaut, le framework garantit que toutes les entrées ont le même horodatage (consultez la section Synchronisation pour plus d'informations). Plusieurs appels Process() peuvent être appelés simultanément lorsque l'exécution en parallèle est activée. Si une erreur se produit pendant Process(), le framework appelle Close() et l'exécution du graphique s'arrête.
  • Close()
    • Une fois tous les appels à Process() terminés ou lorsque tous les flux d'entrée sont fermés, le framework appelle Close(). Cette fonction est toujours appelée si Open() a été appelé et réussi, et même si l'exécution du graphe s'est arrêtée en raison d'une erreur. Aucune entrée n'est disponible via les flux d'entrée pendant Close(), mais l'application a toujours accès aux paquets secondaires d'entrée et peut donc écrire des sorties. Une fois Close() renvoyée, le simulateur doit être considéré comme un nœud mort. L'objet de calculateur est détruit dès que le graphique est exécuté.

Voici des extraits de code de CalculatorBase.h.

class CalculatorBase {
 public:
  ...

  // The subclasses of CalculatorBase must implement GetContract.
  // ...
  static absl::Status GetContract(CalculatorContract* cc);

  // Open is called before any Process() calls, on a freshly constructed
  // calculator.  Subclasses may override this method to perform necessary
  // setup, and possibly output Packets and/or set output streams' headers.
  // ...
  virtual absl::Status Open(CalculatorContext* cc) {
    return absl::OkStatus();
  }

  // Processes the incoming inputs. May call the methods on cc to access
  // inputs and produce outputs.
  // ...
  virtual absl::Status Process(CalculatorContext* cc) = 0;

  // Is called if Open() was called and succeeded.  Is called either
  // immediately after processing is complete or after a graph run has ended
  // (if an error occurred in the graph).  ...
  virtual absl::Status Close(CalculatorContext* cc) {
    return absl::OkStatus();
  }

  ...
};

La vie d'une calculatrice

Lors de l'initialisation d'un graphique MediaPipe, le framework appelle une méthode statique GetContract() pour déterminer les types de paquets attendus.

Le framework construit et détruit l'intégralité du calculateur pour chaque exécution de graphique (par exemple, une fois par vidéo ou une fois par image). Des objets coûteux ou volumineux qui restent constants lors des exécutions de graphe doivent être fournis en tant que paquets secondaires d'entrée afin que les calculs ne soient pas répétés lors des exécutions suivantes.

Après l'initialisation, à chaque exécution du graphe, la séquence suivante se produit:

  • Open()
  • Process() (répété)
  • Close()

Le framework appelle Open() pour initialiser le simulateur. Open() doit interpréter toutes les options et configurer l'état par exécution du graphique du calculateur. Open() peut obtenir des paquets côté entrée et écrire des paquets dans les sorties du calculateur. Le cas échéant, il doit appeler SetOffset() pour réduire la mise en mémoire tampon potentielle des paquets des flux d'entrée.

Si une erreur se produit pendant Open() ou Process() (comme indiqué par l'une d'entre elles renvoyant un état autre que Ok), l'exécution du graphique est arrêtée sans aucun autre appel aux méthodes du simulateur, et le simulateur est détruit.

Pour un calculateur avec des entrées, le framework appelle Process() chaque fois qu'au moins une entrée dispose d'un paquet. Le framework garantit que toutes les entrées ont le même horodatage, que les horodatages augmentent à chaque appel à Process() et que tous les paquets sont distribués. Par conséquent, certaines entrées peuvent ne contenir aucun paquet lorsque Process() est appelé. Une entrée dont le paquet est manquant semble produire un paquet vide (sans code temporel).

Le framework appelle Close() après tous les appels à Process(). Toutes les entrées sont épuisées, mais Close() a accès aux paquets secondaires d'entrée et peut écrire des sorties. Une fois le retour de Close, la calculatrice est détruite.

Les calculatrices sans entrée sont appelées sources. Un simulateur de source continue d'appeler Process() tant qu'il renvoie un état Ok. Un simulateur de source indique qu'il est épuisé en renvoyant l'état d'un arrêt (par exemple, mediaPipe::tool::StatusStop()).

Identifier les entrées et les sorties

L'interface publique d'un calculateur se compose d'un ensemble de flux d'entrée et de flux de sortie. Dans une CalculatorGraphConfiguration, les sorties de certains calculateurs sont connectées aux entrées d'autres calculateurs à l'aide de flux nommés. Les noms de flux sont normalement en minuscules, tandis que les tags d'entrée et de sortie sont normalement en MAJUSCULES. Dans l'exemple ci-dessous, la sortie portant le nom de tag VIDEO est connectée à l'entrée portant le nom de tag VIDEO_IN à l'aide du flux nommé video_stream.

# Graph describing calculator SomeAudioVideoCalculator
node {
  calculator: "SomeAudioVideoCalculator"
  input_stream: "INPUT:combined_input"
  output_stream: "VIDEO:video_stream"
}
node {
  calculator: "SomeVideoCalculator"
  input_stream: "VIDEO_IN:video_stream"
  output_stream: "VIDEO_OUT:processed_video"
}

Les flux d'entrée et de sortie peuvent être identifiés par numéro d'index, par nom de balise, ou par une combinaison de nom de balise et de numéro d'index. Vous trouverez des exemples d'identifiants d'entrée et de sortie dans l'exemple ci-dessous. SomeAudioVideoCalculator identifie sa sortie vidéo par tag et ses sorties audio grâce à la combinaison d'un tag et d'un index. L'entrée avec la balise VIDEO est connectée au flux nommé video_stream. Les sorties associées à la balise AUDIO et aux index 0 et 1 sont connectées aux flux nommés audio_left et audio_right. SomeAudioCalculator identifie ses entrées audio uniquement à l'aide de l'index (aucune balise n'est nécessaire).

# Graph describing calculator SomeAudioVideoCalculator
node {
  calculator: "SomeAudioVideoCalculator"
  input_stream: "combined_input"
  output_stream: "VIDEO:video_stream"
  output_stream: "AUDIO:0:audio_left"
  output_stream: "AUDIO:1:audio_right"
}

node {
  calculator: "SomeAudioCalculator"
  input_stream: "audio_left"
  input_stream: "audio_right"
  output_stream: "audio_energy"
}

Dans la mise en œuvre du simulateur, les entrées et les sorties sont également identifiées par le nom de la balise et le numéro d'index. Dans la fonction ci-dessous, les entrées et les sorties sont identifiées:

  • Par numéro d'index: le flux d'entrée combiné est identifié simplement par l'index 0.
  • Par nom de tag: le flux de sortie vidéo est identifié par le nom de tag "VIDEO".
  • Par nom de balise et numéro d'index: les flux audio de sortie sont identifiés par la combinaison du nom de balise AUDIO et des numéros d'index 0 et 1.
// c++ Code snippet describing the SomeAudioVideoCalculator GetContract() method
class SomeAudioVideoCalculator : public CalculatorBase {
 public:
  static absl::Status GetContract(CalculatorContract* cc) {
    cc->Inputs().Index(0).SetAny();
    // SetAny() is used to specify that whatever the type of the
    // stream is, it's acceptable.  This does not mean that any
    // packet is acceptable.  Packets in the stream still have a
    // particular type.  SetAny() has the same effect as explicitly
    // setting the type to be the stream's type.
    cc->Outputs().Tag("VIDEO").Set<ImageFrame>();
    cc->Outputs().Get("AUDIO", 0).Set<Matrix>();
    cc->Outputs().Get("AUDIO", 1).Set<Matrix>();
    return absl::OkStatus();
  }

Traitement

Process() appelé sur un nœud non source doit renvoyer absl::OkStatus() pour indiquer que tout s'est déroulé correctement, ou tout autre code d'état pour signaler une erreur

Si un simulateur non source renvoie tool::StatusStop(), cela signifie que le graphique est annulé de manière anticipée. Dans ce cas, tous les calculateurs de sources et tous les flux d'entrée du graphe seront fermés (et les paquets restants seront propagés dans le graphe).

Un nœud source dans un graphique continuera d'être appelé Process() tant qu'il renvoie absl::OkStatus(. Pour indiquer qu'il n'y a plus de données à générer, renvoyez tool::StatusStop(). Tout autre état indique qu'une erreur s'est produite.

Close() renvoie absl::OkStatus() pour indiquer la réussite de l'opération. Tout autre état indique un échec.

Voici la fonction Process() de base. Il utilise la méthode Input() (qui ne peut être utilisée que si le simulateur a une seule entrée) pour demander ses données d'entrée. Il utilise ensuite std::unique_ptr pour allouer la mémoire nécessaire au paquet de sortie et effectue les calculs. Une fois terminé, il libère le pointeur lors de son ajout au flux de sortie.

absl::Status MyCalculator::Process() {
  const Matrix& input = Input()->Get<Matrix>();
  std::unique_ptr<Matrix> output(new Matrix(input.rows(), input.cols()));
  // do your magic here....
  //    output->row(n) =  ...
  Output()->Add(output.release(), InputTimestamp());
  return absl::OkStatus();
}

Options de la calculatrice

Les calculatrices acceptent les paramètres de traitement via (1) les paquets de flux d'entrée, (2) les paquets secondaires d'entrée et (3) les options du calculateur. Si elles sont spécifiées, les options de calculatrice s'affichent sous forme de valeurs littérales dans le champ node_options du message CalculatorGraphConfiguration.Node.

  node {
    calculator: "TfLiteInferenceCalculator"
    input_stream: "TENSORS:main_model_input"
    output_stream: "TENSORS:main_model_output"
    node_options: {
      [type.googleapis.com/mediapipe.TfLiteInferenceCalculatorOptions] {
        model_path: "mediapipe/models/detection_model.tflite"
      }
    }
  }

Le champ node_options accepte la syntaxe proto3. Les options du calculateur peuvent également être spécifiées dans le champ options à l'aide de la syntaxe proto2.

  node {
    calculator: "TfLiteInferenceCalculator"
    input_stream: "TENSORS:main_model_input"
    output_stream: "TENSORS:main_model_output"
    node_options: {
      [type.googleapis.com/mediapipe.TfLiteInferenceCalculatorOptions] {
        model_path: "mediapipe/models/detection_model.tflite"
      }
    }
  }

Toutes les calculatrices n'acceptent pas toutes les options. Pour accepter des options, une calculatrice définit normalement un nouveau type de message protobuf pour représenter ses options, par exemple PacketClonerCalculatorOptions. Le simulateur lira ensuite ce message protobuf dans sa méthode CalculatorBase::Open, et éventuellement également dans sa fonction CalculatorBase::GetContract ou sa méthode CalculatorBase::Process. Normalement, le nouveau type de message protobuf sera défini en tant que schéma protobuf à l'aide d'un fichier ".proto" et d'une règle de compilation mediapipe_proto_library().

  mediapipe_proto_library(
      name = "packet_cloner_calculator_proto",
      srcs = ["packet_cloner_calculator.proto"],
      visibility = ["//visibility:public"],
      deps = [
          "//mediapipe/framework:calculator_options_proto",
          "//mediapipe/framework:calculator_proto",
      ],
  )

Exemple de calculatrice

Cette section traite de l'implémentation de PacketClonerCalculator, qui effectue une tâche relativement simple et qui est utilisée dans de nombreux graphiques de calculateur. PacketClonerCalculator génère simplement une copie de ses paquets d'entrée les plus récents à la demande.

PacketClonerCalculator est utile lorsque les horodatages des paquets de données entrants ne sont pas parfaitement alignés. Supposons que nous ayons une pièce équipée d'un micro, d'un capteur de lumière et d'une caméra vidéo qui collecte des données sensorielles. Chacun des capteurs fonctionne de manière indépendante et collecte des données par intermittence. Supposons que la sortie de chaque capteur soit la suivante:

  • micro = volume en décibels de son dans la pièce (entier)
  • capteur de lumière = luminosité de la pièce (entier)
  • caméra = image RVB de la pièce (ImageFrame)

Notre pipeline de perception simple est conçu pour traiter les données sensorielles de ces trois capteurs de sorte qu'à tout moment, lorsque des données de trame d'image de l'appareil photo sont synchronisées avec les dernières données de volume du micro et de luminosité du capteur de lumière collectées. Pour ce faire, avec MediaPipe, notre pipeline de perception dispose de trois flux d'entrée:

  • room_mic_signal : chaque paquet de données de ce flux d'entrée est un nombre entier représentant le volume du son dans une pièce avec code temporel.
  • room_lightening_sensor : chaque paquet de données de ce flux d'entrée est un nombre entier représentant la luminosité de la pièce grâce à l'horodatage.
  • room_video_tick_signal : chaque paquet de données de ce flux d'entrée est une image frame de données vidéo représentant la vidéo collectée depuis la caméra de la salle avec code temporel.

Vous trouverez ci-dessous l'implémentation de PacketClonerCalculator. Vous pouvez afficher les méthodes GetContract(), Open() et Process(), ainsi que la variable d'instance current_ qui contient les paquets d'entrée les plus récents.

// This takes packets from N+1 streams, A_1, A_2, ..., A_N, B.
// For every packet that appears in B, outputs the most recent packet from each
// of the A_i on a separate stream.

#include <vector>

#include "absl/strings/str_cat.h"
#include "mediapipe/framework/calculator_framework.h"

namespace mediapipe {

// For every packet received on the last stream, output the latest packet
// obtained on all other streams. Therefore, if the last stream outputs at a
// higher rate than the others, this effectively clones the packets from the
// other streams to match the last.
//
// Example config:
// node {
//   calculator: "PacketClonerCalculator"
//   input_stream: "first_base_signal"
//   input_stream: "second_base_signal"
//   input_stream: "tick_signal"
//   output_stream: "cloned_first_base_signal"
//   output_stream: "cloned_second_base_signal"
// }
//
class PacketClonerCalculator : public CalculatorBase {
 public:
  static absl::Status GetContract(CalculatorContract* cc) {
    const int tick_signal_index = cc->Inputs().NumEntries() - 1;
    // cc->Inputs().NumEntries() returns the number of input streams
    // for the PacketClonerCalculator
    for (int i = 0; i < tick_signal_index; ++i) {
      cc->Inputs().Index(i).SetAny();
      // cc->Inputs().Index(i) returns the input stream pointer by index
      cc->Outputs().Index(i).SetSameAs(&cc->Inputs().Index(i));
    }
    cc->Inputs().Index(tick_signal_index).SetAny();
    return absl::OkStatus();
  }

  absl::Status Open(CalculatorContext* cc) final {
    tick_signal_index_ = cc->Inputs().NumEntries() - 1;
    current_.resize(tick_signal_index_);
    // Pass along the header for each stream if present.
    for (int i = 0; i < tick_signal_index_; ++i) {
      if (!cc->Inputs().Index(i).Header().IsEmpty()) {
        cc->Outputs().Index(i).SetHeader(cc->Inputs().Index(i).Header());
        // Sets the output stream of index i header to be the same as
        // the header for the input stream of index i
      }
    }
    return absl::OkStatus();
  }

  absl::Status Process(CalculatorContext* cc) final {
    // Store input signals.
    for (int i = 0; i < tick_signal_index_; ++i) {
      if (!cc->Inputs().Index(i).Value().IsEmpty()) {
        current_[i] = cc->Inputs().Index(i).Value();
      }
    }

    // Output if the tick signal is non-empty.
    if (!cc->Inputs().Index(tick_signal_index_).Value().IsEmpty()) {
      for (int i = 0; i < tick_signal_index_; ++i) {
        if (!current_[i].IsEmpty()) {
          cc->Outputs().Index(i).AddPacket(
              current_[i].At(cc->InputTimestamp()));
          // Add a packet to output stream of index i a packet from inputstream i
          // with timestamp common to all present inputs
        } else {
          cc->Outputs().Index(i).SetNextTimestampBound(
              cc->InputTimestamp().NextAllowedInStream());
          // if current_[i], 1 packet buffer for input stream i is empty, we will set
          // next allowed timestamp for input stream i to be current timestamp + 1
        }
      }
    }
    return absl::OkStatus();
  }

 private:
  std::vector<Packet> current_;
  int tick_signal_index_;
};

REGISTER_CALCULATOR(PacketClonerCalculator);
}  // namespace mediapipe

En règle générale, une calculatrice n'a qu'un fichier .cc. Aucun .h n'est requis, car Mediapipe utilise l'enregistrement pour que les calculateurs soient connus. Après avoir défini votre classe de calculatrice, enregistrez-la avec un appel de macro REGISTER_CALCULATOR(calculator_class_name).

Vous trouverez ci-dessous un graphe MediaPipe basique, comportant trois flux d'entrée, un nœud (PacketClonerCalculator) et deux flux de sortie.

input_stream: "room_mic_signal"
input_stream: "room_lighting_sensor"
input_stream: "room_video_tick_signal"

node {
   calculator: "PacketClonerCalculator"
   input_stream: "room_mic_signal"
   input_stream: "room_lighting_sensor"
   input_stream: "room_video_tick_signal"
   output_stream: "cloned_room_mic_signal"
   output_stream: "cloned_lighting_sensor"
 }

Le schéma ci-dessous montre comment PacketClonerCalculator définit ses paquets de sortie (bas) en fonction de sa série de paquets d'entrée (top).

Graphique utilisant PacketClonerCalculator
Chaque fois qu'il reçoit un paquet sur son flux d'entrée TICK, PacketClonerCalculator génère le paquet le plus récent de chacun de ses flux d'entrée. La séquence des paquets de sortie (bottom) est déterminée par la séquence des paquets d'entrée (top) et leur code temporel. Les codes temporels sont indiqués à droite du diagramme.