Calculadoras

Cada calculadora es el nodo de un gráfico. Describimos cómo crear una nueva calculadora, cómo inicializar una calculadora, cómo realizar sus cálculos, transmisiones, marcas de tiempo y opciones de entrada y salida. Cada nodo del gráfico implementado como Calculator. La mayor parte de la ejecución por grafos se produce dentro de con calculadoras. Una calculadora puede recibir cero o más transmisiones de entrada y produce cero o más transmisiones de salida o paquetes adicionales.

CalculatorBase

La calculadora se crea definiendo una nueva subclase de la CalculatorBase implementar una serie de métodos y registrar la nueva subclase con Mediapipe: Como mínimo, una nueva calculadora debe implementar los cuatro métodos que se indican a continuación.

  • GetContract()
    • Los autores de la calculadora pueden especificar los tipos de entradas y salidas esperados. de una calculadora en GetContract(). Cuando se inicializa un gráfico, el llama a un método estático para verificar si los tipos de paquetes del y las entradas y salidas conectadas coinciden con la información de esta especificación.
  • Open()
    • Después de que se inicia un gráfico, el framework llama a Open(). El lado de entrada paquetes están disponibles para la calculadora en este punto. Open() interpreta las operaciones de configuración de nodos (consulta Gráficos) y prepara el estado de ejecución por grafo de la calculadora. Esta función puede y también escribir paquetes en los resultados de la calculadora. Los errores que ocurren durante Open() pueden terminar la ejecución del grafo.
  • Process()
    • En el caso de una calculadora con entradas, el framework llama a Process() de forma reiterada. cuando al menos una transmisión de entrada tenga un paquete disponible. El marco de trabajo de forma predeterminada garantiza que todas las entradas tengan la misma marca de tiempo (consulta Sincronización para obtener más información). Múltiples Las llamadas a Process() se pueden invocar de forma simultánea cuando se ejecuta en paralelo. esté habilitado. Si se produce un error durante Process(), el framework llamará a Close() y la ejecución del grafo finaliza.
  • Close()
    • Cuando finalicen todas las llamadas a Process() o cuando se cierren todas las transmisiones de entrada, el framework llama a Close(). Esta función siempre se llama si Se llamó a Open() y se realizó correctamente, incluso si la ejecución del grafo finalizó. debido a un error. No hay entradas disponibles a través de ninguna transmisión de entrada durante Close(), pero todavía tiene acceso a los paquetes laterales de entrada y por lo que pueden escribir resultados. Después de que se devuelva Close(), la calculadora debe considerarse un nodo muerto. El objeto de la calculadora se destruye en cuanto el grafo termina de ejecutarse.

Los siguientes son fragmentos de código de CalculatorBase.h.

class CalculatorBase {
 public:
  ...

  // The subclasses of CalculatorBase must implement GetContract.
  // ...
  static absl::Status GetContract(CalculatorContract* cc);

  // Open is called before any Process() calls, on a freshly constructed
  // calculator.  Subclasses may override this method to perform necessary
  // setup, and possibly output Packets and/or set output streams' headers.
  // ...
  virtual absl::Status Open(CalculatorContext* cc) {
    return absl::OkStatus();
  }

  // Processes the incoming inputs. May call the methods on cc to access
  // inputs and produce outputs.
  // ...
  virtual absl::Status Process(CalculatorContext* cc) = 0;

  // Is called if Open() was called and succeeded.  Is called either
  // immediately after processing is complete or after a graph run has ended
  // (if an error occurred in the graph).  ...
  virtual absl::Status Close(CalculatorContext* cc) {
    return absl::OkStatus();
  }

  ...
};

La vida de una calculadora

Durante la inicialización de un grafo de MediaPipe, el framework llama a un Se trata del método estático GetContract() para determinar qué tipos de paquetes se esperan.

El framework construye y destruye toda la calculadora para cada ejecución de grafo (p.ej., una vez por video o una vez por imagen). Objetos grandes o costosos que permanecen constante en las ejecuciones de grafos debe suministrarse como paquetes de entrada los cálculos no se repiten en ejecuciones posteriores.

Después de la inicialización, para cada ejecución del grafo, se produce la siguiente secuencia:

  • Open()
  • Process() (repetidamente)
  • Close()

El framework llama a Open() para inicializar la calculadora. Open() debería a interpretar las opciones y configurar el estado de ejecución por grafo de la calculadora. Open() puede obtener paquetes laterales de entrada y escribir paquetes en las salidas de la calculadora. Si apropiado, debe llamar a SetOffset() para reducir el almacenamiento en búfer de paquetes potencial de flujos de entrada.

Si se produce un error durante Open() o Process() (según lo indique uno de ellos) y se muestra un estado que no es Ok), la ejecución del gráfico finaliza sin más llamadas. a los métodos de la calculadora y esta se destruye.

En una calculadora con entradas, el framework llama a Process() siempre que, al menos, una entrada tiene un paquete disponible. El framework garantiza que todas las entradas tengan la misma marca de tiempo, que las marcas de tiempo aumentan con cada llamada a Process() y que se entreguen todos los paquetes. Como consecuencia, es posible que algunas entradas no tengan paquetes cuando se llama a Process(). Una entrada cuyo paquete falta se producir un paquete vacío (sin marca de tiempo).

El framework llama a Close() después de todas las llamadas a Process(). Todas las entradas se agotaron, pero Close() tiene acceso a los paquetes laterales de entrada y puede escribir resultados. Después de que se muestra Close, la calculadora se destruye.

Las calculadoras sin entradas se conocen como fuentes. Una calculadora de fuente Se seguirá llamando a Process(), siempre que se muestre un estado Ok. R la calculadora de fuente indica que se agotó y muestra un estado de parada (es decir, mediaPipe::tool::StatusStop()).

Identifica las entradas y salidas

La interfaz pública de una calculadora consta de un conjunto de flujos de entrada y transmisiones de salida. En una CalculatorGraphConfiguration, los resultados de algunas Las calculadoras se conectan a las entradas de otras calculadoras usando transmisiones continuas. Los nombres de las transmisiones suelen estar en minúscula, mientras que las etiquetas de entrada y salida normalmente en MAYÚSCULAS. En el siguiente ejemplo, el resultado con el nombre de etiqueta VIDEO es Se conectó a la entrada con el nombre de etiqueta VIDEO_IN usando la transmisión llamada video_stream

# Graph describing calculator SomeAudioVideoCalculator
node {
  calculator: "SomeAudioVideoCalculator"
  input_stream: "INPUT:combined_input"
  output_stream: "VIDEO:video_stream"
}
node {
  calculator: "SomeVideoCalculator"
  input_stream: "VIDEO_IN:video_stream"
  output_stream: "VIDEO_OUT:processed_video"
}

Las transmisiones de entrada y salida se pueden identificar por el número de índice, el nombre de la etiqueta o un una combinación del nombre de la etiqueta y el número de índice. Puedes ver algunos ejemplos de entradas identificadores de salida en el siguiente ejemplo. SomeAudioVideoCalculator identifica la salida de video por etiqueta y sus salidas de audio mediante la combinación de etiquetas y índice. La entrada con la etiqueta VIDEO está conectada a la transmisión llamada video_stream Los resultados con la etiqueta AUDIO y los índices 0 y 1 son los siguientes: conectadas a las transmisiones llamadas audio_left y audio_right. SomeAudioCalculator identifica sus entradas de audio solo por índice (no se necesita etiqueta).

# Graph describing calculator SomeAudioVideoCalculator
node {
  calculator: "SomeAudioVideoCalculator"
  input_stream: "combined_input"
  output_stream: "VIDEO:video_stream"
  output_stream: "AUDIO:0:audio_left"
  output_stream: "AUDIO:1:audio_right"
}

node {
  calculator: "SomeAudioCalculator"
  input_stream: "audio_left"
  input_stream: "audio_right"
  output_stream: "audio_energy"
}

En la implementación de la calculadora, las entradas y las salidas también se identifican por etiqueta. y el número de índice. En la siguiente función, se identifican las entradas y salidas:

  • Por número de índice: la transmisión de entrada combinada se identifica simplemente por el índice. 0
  • Por nombre de etiqueta: La transmisión de salida de video se identifica con el nombre de etiqueta "VIDEO".
  • Por nombre de etiqueta y número de índice: las transmisiones de audio de salida se identifican con el combinación del nombre de la etiqueta AUDIO y los números de índice 0 y 1.
// c++ Code snippet describing the SomeAudioVideoCalculator GetContract() method
class SomeAudioVideoCalculator : public CalculatorBase {
 public:
  static absl::Status GetContract(CalculatorContract* cc) {
    cc->Inputs().Index(0).SetAny();
    // SetAny() is used to specify that whatever the type of the
    // stream is, it's acceptable.  This does not mean that any
    // packet is acceptable.  Packets in the stream still have a
    // particular type.  SetAny() has the same effect as explicitly
    // setting the type to be the stream's type.
    cc->Outputs().Tag("VIDEO").Set<ImageFrame>();
    cc->Outputs().Get("AUDIO", 0).Set<Matrix>();
    cc->Outputs().Get("AUDIO", 1).Set<Matrix>();
    return absl::OkStatus();
  }

Procesando

Se llama a Process() en un nodo que no es de origen debe mostrar absl::OkStatus() a Indicar que todo salió bien o cualquier otro código de estado que indique un error

Si una calculadora que no es una fuente muestra tool::StatusStop(), esto indicará la gráfico se cancela anticipadamente. En este caso, todas las calculadoras de origen y los gráficos las transmisiones de entrada se cerrarán (y los paquetes restantes se propagarán a través del gráfico).

A un nodo de origen de un gráfico se seguirá teniendo llamado Process() durante todo el tiempo ya que muestra absl::OkStatus(). Para indicar que no hay más datos que se generó tool::StatusStop(). Cualquier otro estado indica que un error para determinar si se produjo un error.

Close() muestra absl::OkStatus() para indicar que la operación se realizó correctamente. Cualquier otro estado indica un error.

Esta es la función Process() básica. Usa el método Input() (que puede usarse solo si la calculadora tiene una sola entrada) para solicitar sus datos de entrada. Integra Luego, usa std::unique_ptr para asignar la memoria necesaria para el paquete de salida. y hace los cálculos. Cuando está listo, se libera el puntero cuando se agrega al el flujo de salida.

absl::Status MyCalculator::Process() {
  const Matrix& input = Input()->Get<Matrix>();
  std::unique_ptr<Matrix> output(new Matrix(input.rows(), input.cols()));
  // do your magic here....
  //    output->row(n) =  ...
  Output()->Add(output.release(), InputTimestamp());
  return absl::OkStatus();
}

Opciones de la calculadora

Las calculadoras aceptan parámetros de procesamiento a través de (1) paquetes de flujo de entrada (2) paquetes laterales de entrada y (3) opciones de calculadora. de las opciones de la calculadora, si especificadas, aparecen como valores literales en el campo node_options de la CalculatorGraphConfiguration.Node mensaje.

  node {
    calculator: "TfLiteInferenceCalculator"
    input_stream: "TENSORS:main_model_input"
    output_stream: "TENSORS:main_model_output"
    node_options: {
      [type.googleapis.com/mediapipe.TfLiteInferenceCalculatorOptions] {
        model_path: "mediapipe/models/detection_model.tflite"
      }
    }
  }

El campo node_options acepta la sintaxis de proto3. Como alternativa, usa la calculadora se pueden especificar opciones en el campo options con la sintaxis proto2.

  node {
    calculator: "TfLiteInferenceCalculator"
    input_stream: "TENSORS:main_model_input"
    output_stream: "TENSORS:main_model_output"
    node_options: {
      [type.googleapis.com/mediapipe.TfLiteInferenceCalculatorOptions] {
        model_path: "mediapipe/models/detection_model.tflite"
      }
    }
  }

No todas las calculadoras aceptan opciones. Para aceptar opciones, una definirá un nuevo tipo de mensaje protobuf para representar su opciones, como PacketClonerCalculatorOptions. A continuación, la calculadora leer ese mensaje protobuf en su método CalculatorBase::Open y, posiblemente, también en su función CalculatorBase::GetContract o en su CalculatorBase::Process. Por lo general, el nuevo tipo de mensaje protobuf definirse como un esquema protobuf con “.proto” y una Regla de compilación mediapipe_proto_library().

  mediapipe_proto_library(
      name = "packet_cloner_calculator_proto",
      srcs = ["packet_cloner_calculator.proto"],
      visibility = ["//visibility:public"],
      deps = [
          "//mediapipe/framework:calculator_options_proto",
          "//mediapipe/framework:calculator_proto",
      ],
  )

Calculadora de ejemplo

En esta sección, se analiza la implementación de PacketClonerCalculator, que hace un trabajo relativamente simple y se usa en muchos gráficos de calculadora. PacketClonerCalculator solo produce una copia de sus paquetes de entrada más recientes. según demanda.

PacketClonerCalculator es útil cuando las marcas de tiempo de los paquetes de datos entrantes no están alineados perfectamente. Supongamos que tenemos una habitación con micrófono, y una cámara de video que recopila datos sensoriales. Cada uno de los sensores opera de forma independiente y recopila datos de forma intermitente. Supongamos que la salida de cada sensor:

  • micrófono = volumen en decibeles de sonido en la habitación (número entero)
  • sensor de luz = brillo de la habitación (número entero)
  • cámara de video = marco de imagen RGB de la habitación (ImageFrame)

Nuestra canalización de percepción simple está diseñada para procesar datos sensoriales de estas 3 sensores de manera que en cualquier momento cuando tengamos datos de marco de imagen de la cámara que se sincroniza con los últimos datos recopilados sobre el volumen y la luz del micrófono datos de brillo del sensor. Para hacerlo con MediaPipe, nuestra canalización de percepción tiene 3 flujos de entrada:

  • Room_mic_signal: Cada paquete de datos de esta transmisión de entrada corresponde a un número entero. que representa el volumen del audio en una habitación con una marca de tiempo.
  • Room_lightening_sensor: Cada paquete de datos de este flujo de entrada es un número entero. datos que representan qué tan brillante está la habitación iluminada con una marca de tiempo.
  • Room_video_tick_signal: 1 paquete de datos en este flujo de entrada fotograma de datos de video que representan los videos recopilados de la cámara en el sala con marca de tiempo.

A continuación, se muestra la implementación de PacketClonerCalculator. Puedes consultar la GetContract(), Open() y Process(), así como la instancia variable current_ que contiene los paquetes de entrada más recientes.

// This takes packets from N+1 streams, A_1, A_2, ..., A_N, B.
// For every packet that appears in B, outputs the most recent packet from each
// of the A_i on a separate stream.

#include <vector>

#include "absl/strings/str_cat.h"
#include "mediapipe/framework/calculator_framework.h"

namespace mediapipe {

// For every packet received on the last stream, output the latest packet
// obtained on all other streams. Therefore, if the last stream outputs at a
// higher rate than the others, this effectively clones the packets from the
// other streams to match the last.
//
// Example config:
// node {
//   calculator: "PacketClonerCalculator"
//   input_stream: "first_base_signal"
//   input_stream: "second_base_signal"
//   input_stream: "tick_signal"
//   output_stream: "cloned_first_base_signal"
//   output_stream: "cloned_second_base_signal"
// }
//
class PacketClonerCalculator : public CalculatorBase {
 public:
  static absl::Status GetContract(CalculatorContract* cc) {
    const int tick_signal_index = cc->Inputs().NumEntries() - 1;
    // cc->Inputs().NumEntries() returns the number of input streams
    // for the PacketClonerCalculator
    for (int i = 0; i < tick_signal_index; ++i) {
      cc->Inputs().Index(i).SetAny();
      // cc->Inputs().Index(i) returns the input stream pointer by index
      cc->Outputs().Index(i).SetSameAs(&cc->Inputs().Index(i));
    }
    cc->Inputs().Index(tick_signal_index).SetAny();
    return absl::OkStatus();
  }

  absl::Status Open(CalculatorContext* cc) final {
    tick_signal_index_ = cc->Inputs().NumEntries() - 1;
    current_.resize(tick_signal_index_);
    // Pass along the header for each stream if present.
    for (int i = 0; i < tick_signal_index_; ++i) {
      if (!cc->Inputs().Index(i).Header().IsEmpty()) {
        cc->Outputs().Index(i).SetHeader(cc->Inputs().Index(i).Header());
        // Sets the output stream of index i header to be the same as
        // the header for the input stream of index i
      }
    }
    return absl::OkStatus();
  }

  absl::Status Process(CalculatorContext* cc) final {
    // Store input signals.
    for (int i = 0; i < tick_signal_index_; ++i) {
      if (!cc->Inputs().Index(i).Value().IsEmpty()) {
        current_[i] = cc->Inputs().Index(i).Value();
      }
    }

    // Output if the tick signal is non-empty.
    if (!cc->Inputs().Index(tick_signal_index_).Value().IsEmpty()) {
      for (int i = 0; i < tick_signal_index_; ++i) {
        if (!current_[i].IsEmpty()) {
          cc->Outputs().Index(i).AddPacket(
              current_[i].At(cc->InputTimestamp()));
          // Add a packet to output stream of index i a packet from inputstream i
          // with timestamp common to all present inputs
        } else {
          cc->Outputs().Index(i).SetNextTimestampBound(
              cc->InputTimestamp().NextAllowedInStream());
          // if current_[i], 1 packet buffer for input stream i is empty, we will set
          // next allowed timestamp for input stream i to be current timestamp + 1
        }
      }
    }
    return absl::OkStatus();
  }

 private:
  std::vector<Packet> current_;
  int tick_signal_index_;
};

REGISTER_CALCULATOR(PacketClonerCalculator);
}  // namespace mediapipe

Por lo general, las calculadoras solo tienen archivos .cc. No se requiere ningún dominio .h, ya que mediapipe usa el registro para dar a conocer las calculadoras. Después de que tengas definiste tu clase de calculadora, regístrala con una invocación de macro REGISTER_CALCULATOR(calculator_class_name).

A continuación, se muestra un gráfico trivial de MediaPipe que tiene 3 flujos de entrada, 1 nodo (PacketClonerCalculator) y 2 transmisiones de salida.

input_stream: "room_mic_signal"
input_stream: "room_lighting_sensor"
input_stream: "room_video_tick_signal"

node {
   calculator: "PacketClonerCalculator"
   input_stream: "room_mic_signal"
   input_stream: "room_lighting_sensor"
   input_stream: "room_video_tick_signal"
   output_stream: "cloned_room_mic_signal"
   output_stream: "cloned_lighting_sensor"
 }

En el siguiente diagrama, se muestra cómo PacketClonerCalculator define su resultado paquetes (inferior) en función de su serie de paquetes de entrada (parte superior).

Grafo con BundleClonerCalculator
Cada vez que recibe un paquete en la transmisión de entrada de TICK, batchClonerCalculator genera el paquete más reciente de cada una de sus transmisiones de entrada. La secuencia de paquetes de salida (inferior) se determina por la secuencia de paquetes de entrada (superior) y sus marcas de tiempo. Las marcas de tiempo se muestran en el lado derecho del diagrama.