Calcolatrici

Ogni calcolatrice è un nodo di un grafico. Descriviamo come creare una nuova calcolatrice, come inizializzare una calcolatrice, come eseguire i calcoli, i flussi di input e di output, i timestamp e le opzioni. Ogni nodo nel grafico viene implementato come Calculator. La maggior parte dell'esecuzione dei grafici avviene all'interno delle sue calcolatrici. Una calcolatrice può ricevere zero o più flussi di input e/o pacchetti laterali e produce zero o più flussi di output e/o pacchetti laterali.

CalculatorBase

Viene creata una calcolatrice definendo una nuova sottoclasse della classe CalculatorBase, implementando una serie di metodi e registrando la nuova sottoclasse con Mediapipe. Un nuovo calcolatore deve implementare almeno i quattro metodi seguenti

  • GetContract()
    • Gli autori delle calcolatrici possono specificare i tipi di input e output previsti di una calcolatrice in GetContract(). Quando un grafico viene inizializzato, il framework chiama un metodo statico per verificare se i tipi di pacchetto degli ingressi e degli output collegati corrispondono alle informazioni in questa specifica.
  • Open()
    • Dopo l'inizio di un grafico, il framework chiama Open(). I pacchetti laterali di input sono attualmente disponibili per la calcolatrice. Open() interpreta le operazioni di configurazione dei nodi (vedi Grafici) e prepara lo stato della calcolatrice per ogni grafico eseguito. Questa funzione può anche scrivere pacchetti negli output della calcolatrice. Un errore durante Open() può terminare l'esecuzione del grafico.
  • Process()
    • Per una calcolatrice con input, il framework chiama Process() ripetutamente ogni volta che almeno un flusso di input ha un pacchetto disponibile. Per impostazione predefinita, il framework garantisce che tutti gli input abbiano lo stesso timestamp (per ulteriori informazioni, consulta la sezione Sincronizzazione). È possibile richiamare più chiamate Process() contemporaneamente quando è abilitata l'esecuzione parallela. Se si verifica un errore durante Process(), il framework chiama Close() e l'esecuzione del grafico termina.
  • Close()
    • Al termine di tutte le chiamate a Process() o alla chiusura di tutti i flussi di input, il framework chiama Close(). Questa funzione viene sempre richiamata se Open() è stata chiamata e ha avuto esito positivo e anche se l'esecuzione del grafico è stata terminata a causa di un errore. Nessun input è disponibile tramite flussi di input durante Close(), ma ha comunque accesso ai pacchetti lato input e, pertanto, può scrivere output. Dopo la restituzione di Close(), la calcolatrice dovrebbe essere considerata un nodo non funzionante. L'oggetto Calcolatrice viene distrutto non appena termina l'esecuzione del grafico.

Di seguito sono riportati gli snippet di codice di CalculatorBase.h.

class CalculatorBase {
 public:
  ...

  // The subclasses of CalculatorBase must implement GetContract.
  // ...
  static absl::Status GetContract(CalculatorContract* cc);

  // Open is called before any Process() calls, on a freshly constructed
  // calculator.  Subclasses may override this method to perform necessary
  // setup, and possibly output Packets and/or set output streams' headers.
  // ...
  virtual absl::Status Open(CalculatorContext* cc) {
    return absl::OkStatus();
  }

  // Processes the incoming inputs. May call the methods on cc to access
  // inputs and produce outputs.
  // ...
  virtual absl::Status Process(CalculatorContext* cc) = 0;

  // Is called if Open() was called and succeeded.  Is called either
  // immediately after processing is complete or after a graph run has ended
  // (if an error occurred in the graph).  ...
  virtual absl::Status Close(CalculatorContext* cc) {
    return absl::OkStatus();
  }

  ...
};

La vita di una calcolatrice

Durante l'inizializzazione di un grafico MediaPipe, il framework chiama un metodo statico GetContract() per determinare quali tipi di pacchetti sono previsti.

Il framework crea e distrugge l'intera calcolatrice per ogni esecuzione del grafico (ad es. una volta per video o una volta per immagine). Gli oggetti costosi o di grandi dimensioni che rimangono costanti nelle esecuzioni dei grafici devono essere forniti come pacchetti lato input, in modo che i calcoli non vengano ripetuti nelle esecuzioni successive.

Dopo l'inizializzazione, per ogni esecuzione del grafico, si verifica la seguente sequenza:

  • Open()
  • Process() (ripetutamente)
  • Close()

Il framework chiama Open() per inizializzare la calcolatrice. Open() dovrebbe interpretare le opzioni e impostare lo stato per esecuzione della calcolatrice. Open() potrebbe ricevere pacchetti lato di input e pacchetti di scrittura negli output della calcolatrice. Se appropriato, dovrebbe chiamare SetOffset() per ridurre il potenziale buffering dei pacchetti dei flussi di input.

Se si verifica un errore durante Open() o Process() (come indicato da uno di questi che restituisce uno stato non Ok), l'esecuzione del grafico viene terminata senza ulteriori chiamate ai metodi della calcolatrice, che viene eliminata.

Per una calcolatrice con input, il framework chiama Process() ogni volta che è disponibile un pacchetto per almeno un input. Il framework garantisce che gli input abbiano tutti lo stesso timestamp, che i timestamp aumentino a ogni chiamata a Process() e che tutti i pacchetti siano stati consegnati. Di conseguenza, alcuni input potrebbero non avere pacchetti quando viene chiamato Process(). Un input in cui manca il pacchetto sembra generare un pacchetto vuoto (senza timestamp).

Il framework chiama Close() dopo tutte le chiamate a Process(). Tutti gli input saranno esauriti, ma Close() ha accesso a pacchetti sul lato di input e potrebbe scrivere output. Quando viene restituito il caso Close, la calcolatrice viene distrutta.

I calcolatori senza input vengono indicati come sorgenti. Una calcolatrice del codice sorgente continua ad avere Process() chiamato purché restituisca lo stato Ok. Una calcolatrice di origine indica che è esaurita restituendo uno stato di interruzione (ad esempio mediaPipe::tool::StatusStop()).

Identificazione di input e output

L'interfaccia pubblica di una calcolatrice è composta da un insieme di flussi di input e di flussi di output. In CalculatorGraphConfiguration, gli output di alcune calcolatrici sono collegati agli input di altre calcolatrici utilizzando flussi denominati. I nomi dei flussi sono in genere minuscoli, mentre i tag di input e di output sono in genere in MAIUSCOLO. Nell'esempio seguente, l'output con nome tag VIDEO è collegato all'input con nome tag VIDEO_IN utilizzando lo stream denominato video_stream.

# Graph describing calculator SomeAudioVideoCalculator
node {
  calculator: "SomeAudioVideoCalculator"
  input_stream: "INPUT:combined_input"
  output_stream: "VIDEO:video_stream"
}
node {
  calculator: "SomeVideoCalculator"
  input_stream: "VIDEO_IN:video_stream"
  output_stream: "VIDEO_OUT:processed_video"
}

I flussi di input e di output possono essere identificati dal numero di indice, per il nome del tag o da una combinazione di nome tag e numero di indice. Puoi vedere alcuni esempi di identificatori di input e output nell'esempio seguente. SomeAudioVideoCalculator identifica l'output video per tag e i relativi output audio in base alla combinazione di tag e indice. L'input con il tag VIDEO è collegato allo stream denominato video_stream. Gli output con il tag AUDIO e gli indici 0 e 1 sono collegati agli stream denominati audio_left e audio_right. SomeAudioCalculator identifica i propri input audio solo in base all'indice (non è necessario alcun tag).

# Graph describing calculator SomeAudioVideoCalculator
node {
  calculator: "SomeAudioVideoCalculator"
  input_stream: "combined_input"
  output_stream: "VIDEO:video_stream"
  output_stream: "AUDIO:0:audio_left"
  output_stream: "AUDIO:1:audio_right"
}

node {
  calculator: "SomeAudioCalculator"
  input_stream: "audio_left"
  input_stream: "audio_right"
  output_stream: "audio_energy"
}

Nell'implementazione della calcolatrice, gli input e gli output sono identificati anche dal nome del tag e dal numero di indice. Nella funzione seguente, input e output sono identificati:

  • In base al numero di indice: il flusso di input combinato viene identificato semplicemente dall'indice 0.
  • In base al nome del tag: lo stream di output video è identificato dal nome del tag "VIDEO".
  • Per nome tag e numero di indice: gli stream audio di output sono identificati dalla combinazione del nome del tag AUDIO e dei numeri di indice 0 e 1.
// c++ Code snippet describing the SomeAudioVideoCalculator GetContract() method
class SomeAudioVideoCalculator : public CalculatorBase {
 public:
  static absl::Status GetContract(CalculatorContract* cc) {
    cc->Inputs().Index(0).SetAny();
    // SetAny() is used to specify that whatever the type of the
    // stream is, it's acceptable.  This does not mean that any
    // packet is acceptable.  Packets in the stream still have a
    // particular type.  SetAny() has the same effect as explicitly
    // setting the type to be the stream's type.
    cc->Outputs().Tag("VIDEO").Set<ImageFrame>();
    cc->Outputs().Get("AUDIO", 0).Set<Matrix>();
    cc->Outputs().Get("AUDIO", 1).Set<Matrix>();
    return absl::OkStatus();
  }

Elaborazione

Process() chiamato su un nodo non di origine deve restituire absl::OkStatus() per indicare che tutto è andato bene o qualsiasi altro codice di stato per segnalare un errore

Se un calcolatore non di origine restituisce tool::StatusStop(), questo indica che il grafico è stato annullato in anticipo. In questo caso, tutte le calcolatrici di origine e i flussi di input dei grafici verranno chiusi (e i pacchetti rimanenti si propagano attraverso il grafico).

Un nodo di origine in un grafico continuerà a essere chiamato Process() finché restituisce absl::OkStatus(. Per indicare che non ci sono altri dati da generare, restituisce tool::StatusStop(). Qualsiasi altro stato indica che si è verificato un errore.

Close() restituisce absl::OkStatus() per indicare che l'operazione è andata a buon fine. Qualsiasi altro stato indica un errore.

Ecco la funzione Process() di base. Utilizza il metodo Input() (che può essere utilizzato solo se la calcolatrice ha un solo input) per richiedere i dati di input. Quindi utilizza std::unique_ptr per allocare la memoria necessaria per il pacchetto di output ed esegue i calcoli. Al termine, rilascia il puntatore quando lo aggiungi allo stream di output.

absl::Status MyCalculator::Process() {
  const Matrix& input = Input()->Get<Matrix>();
  std::unique_ptr<Matrix> output(new Matrix(input.rows(), input.cols()));
  // do your magic here....
  //    output->row(n) =  ...
  Output()->Add(output.release(), InputTimestamp());
  return absl::OkStatus();
}

Opzioni della calcolatrice

I calcolatori accettano i parametri di elaborazione tramite (1) pacchetti di flussi di input (2) pacchetti sul lato di input e (3) opzioni di calcolatrice. Le opzioni della Calcolatrice, se specificate, vengono visualizzate come valori letterali nel campo node_options del messaggio CalculatorGraphConfiguration.Node.

  node {
    calculator: "TfLiteInferenceCalculator"
    input_stream: "TENSORS:main_model_input"
    output_stream: "TENSORS:main_model_output"
    node_options: {
      [type.googleapis.com/mediapipe.TfLiteInferenceCalculatorOptions] {
        model_path: "mediapipe/models/detection_model.tflite"
      }
    }
  }

Il campo node_options accetta la sintassi proto3. In alternativa, è possibile specificare le opzioni della calcolatrice nel campo options utilizzando la sintassi proto2.

  node {
    calculator: "TfLiteInferenceCalculator"
    input_stream: "TENSORS:main_model_input"
    output_stream: "TENSORS:main_model_output"
    node_options: {
      [type.googleapis.com/mediapipe.TfLiteInferenceCalculatorOptions] {
        model_path: "mediapipe/models/detection_model.tflite"
      }
    }
  }

Non tutte le calcolatrici accettano le opzioni disponibili. Per accettare le opzioni, una calcolatrice di solito definisce un nuovo tipo di messaggio protobuf per rappresentare le sue opzioni, ad esempio PacketClonerCalculatorOptions. La calcolatrice leggerà quindi il messaggio protobuf nel metodo CalculatorBase::Open e possibilmente anche nella funzione CalculatorBase::GetContract o nel metodo CalculatorBase::Process. Normalmente, il nuovo tipo di messaggio protobuf viene definito come schema protobuf utilizzando un file ".proto" e una regola di build mediapipe_proto_library().

  mediapipe_proto_library(
      name = "packet_cloner_calculator_proto",
      srcs = ["packet_cloner_calculator.proto"],
      visibility = ["//visibility:public"],
      deps = [
          "//mediapipe/framework:calculator_options_proto",
          "//mediapipe/framework:calculator_proto",
      ],
  )

Calcolatrice di esempio

Questa sezione illustra l'implementazione di PacketClonerCalculator, che svolge un lavoro relativamente semplice ed è utilizzato in molti grafici di calcolatrice. PacketClonerCalculator produce semplicemente una copia dei suoi pacchetti di input più recenti on demand.

PacketClonerCalculator è utile quando i timestamp dei pacchetti di dati in arrivo non sono perfettamente allineati. Supponiamo di avere una stanza con un microfono, un sensore di luce e una videocamera che raccoglie dati sensoriali. Ciascuno dei sensori funziona in modo indipendente e raccoglie dati a intermittenza. Supponiamo che l'output di ciascun sensore sia:

  • microfono = volume in decibel di suono nella stanza (numero intero)
  • sensore di luce = luminosità della stanza (numero intero)
  • videocamera = cornice immagine RGB della stanza (ImageFrame)

La nostra semplice pipeline di rilevamento è progettata per elaborare i dati sensoriali di questi tre sensori in modo che, in qualsiasi momento, i dati del fotogramma delle immagini della videocamera siano sincronizzati con gli ultimi dati sul volume del microfono e sulla luminosità del sensore di luce raccolti. Per fare ciò con MediaPipe, la nostra pipeline di percezione ha 3 flussi di input:

  • room_mic_signal: ogni pacchetto di dati in questo stream di input è costituito da dati interi che rappresentano il volume dell'audio in una stanza con timestamp.
  • room_lightening_sensor: ogni pacchetto di dati in questo flusso di input è costituito da dati interi che rappresentano il livello di illuminazione della stanza mediante timestamp.
  • room_video_tick_signal: ogni pacchetto di dati in questo stream di input è un frame immagine di dati video che rappresentano i video raccolti dalla videocamera in una stanza con timestamp.

Di seguito è riportata l'implementazione di PacketClonerCalculator. Puoi vedere i metodi GetContract(), Open() e Process(), nonché la variabile di istanza current_ che contiene i pacchetti di input più recenti.

// This takes packets from N+1 streams, A_1, A_2, ..., A_N, B.
// For every packet that appears in B, outputs the most recent packet from each
// of the A_i on a separate stream.

#include <vector>

#include "absl/strings/str_cat.h"
#include "mediapipe/framework/calculator_framework.h"

namespace mediapipe {

// For every packet received on the last stream, output the latest packet
// obtained on all other streams. Therefore, if the last stream outputs at a
// higher rate than the others, this effectively clones the packets from the
// other streams to match the last.
//
// Example config:
// node {
//   calculator: "PacketClonerCalculator"
//   input_stream: "first_base_signal"
//   input_stream: "second_base_signal"
//   input_stream: "tick_signal"
//   output_stream: "cloned_first_base_signal"
//   output_stream: "cloned_second_base_signal"
// }
//
class PacketClonerCalculator : public CalculatorBase {
 public:
  static absl::Status GetContract(CalculatorContract* cc) {
    const int tick_signal_index = cc->Inputs().NumEntries() - 1;
    // cc->Inputs().NumEntries() returns the number of input streams
    // for the PacketClonerCalculator
    for (int i = 0; i < tick_signal_index; ++i) {
      cc->Inputs().Index(i).SetAny();
      // cc->Inputs().Index(i) returns the input stream pointer by index
      cc->Outputs().Index(i).SetSameAs(&cc->Inputs().Index(i));
    }
    cc->Inputs().Index(tick_signal_index).SetAny();
    return absl::OkStatus();
  }

  absl::Status Open(CalculatorContext* cc) final {
    tick_signal_index_ = cc->Inputs().NumEntries() - 1;
    current_.resize(tick_signal_index_);
    // Pass along the header for each stream if present.
    for (int i = 0; i < tick_signal_index_; ++i) {
      if (!cc->Inputs().Index(i).Header().IsEmpty()) {
        cc->Outputs().Index(i).SetHeader(cc->Inputs().Index(i).Header());
        // Sets the output stream of index i header to be the same as
        // the header for the input stream of index i
      }
    }
    return absl::OkStatus();
  }

  absl::Status Process(CalculatorContext* cc) final {
    // Store input signals.
    for (int i = 0; i < tick_signal_index_; ++i) {
      if (!cc->Inputs().Index(i).Value().IsEmpty()) {
        current_[i] = cc->Inputs().Index(i).Value();
      }
    }

    // Output if the tick signal is non-empty.
    if (!cc->Inputs().Index(tick_signal_index_).Value().IsEmpty()) {
      for (int i = 0; i < tick_signal_index_; ++i) {
        if (!current_[i].IsEmpty()) {
          cc->Outputs().Index(i).AddPacket(
              current_[i].At(cc->InputTimestamp()));
          // Add a packet to output stream of index i a packet from inputstream i
          // with timestamp common to all present inputs
        } else {
          cc->Outputs().Index(i).SetNextTimestampBound(
              cc->InputTimestamp().NextAllowedInStream());
          // if current_[i], 1 packet buffer for input stream i is empty, we will set
          // next allowed timestamp for input stream i to be current timestamp + 1
        }
      }
    }
    return absl::OkStatus();
  }

 private:
  std::vector<Packet> current_;
  int tick_signal_index_;
};

REGISTER_CALCULATOR(PacketClonerCalculator);
}  // namespace mediapipe

In genere, una calcolatrice contiene solo un file .cc. Non è necessario il prefisso .h, perché mediapipe utilizza la registrazione per rendere noti i calcolatori. Dopo aver definito la classe della calcolatrice, registrala con una chiamata della macro REGISTER_CALCULATOR(nome_classe_calcolatrice).

Di seguito è riportato un grafico MediaPipe banale con 3 flussi di input, 1 nodo (PacketClonerCalculator) e 2 flussi di output.

input_stream: "room_mic_signal"
input_stream: "room_lighting_sensor"
input_stream: "room_video_tick_signal"

node {
   calculator: "PacketClonerCalculator"
   input_stream: "room_mic_signal"
   input_stream: "room_lighting_sensor"
   input_stream: "room_video_tick_signal"
   output_stream: "cloned_room_mic_signal"
   output_stream: "cloned_lighting_sensor"
 }

Il diagramma seguente mostra in che modo PacketClonerCalculator definisce i suoi pacchetti di output (in basso) in base alla serie di pacchetti di input (in alto).

Grafico con PacketClonerCalculator
Ogni volta che riceve un pacchetto sul suo flusso di input TICK, PacketClonerCalculator restituisce il pacchetto più recente da ciascuno dei suoi flussi di input. La sequenza dei pacchetti di output (in basso) è determinata dalla sequenza di pacchetti di input (in alto) e dai relativi timestamp. I timestamp sono mostrati lungo il lato destro del diagramma.