Calculadoras

Cada calculadora es el nodo de un gráfico. Se describe cómo crear una calculadora nueva, cómo inicializarla, cómo realizar sus cálculos, flujos de entrada y salida, marcas de tiempo y opciones. Cada nodo del gráfico se implementa como un Calculator. La mayor parte de la ejecución por grafos ocurre dentro de sus calculadoras. Una calculadora puede recibir cero o más transmisiones de entrada o paquetes laterales y producir cero o más transmisiones de salida o paquetes laterales.

CalculatorBase

Para crear una calculadora, se define una nueva subclase de la clase CalculatorBase, se implementan varios métodos y se registra la nueva subclase con Mediapipe. Como mínimo, una nueva calculadora debe implementar los cuatro métodos que se indican a continuación.

  • GetContract()
    • Los autores de calculadoras pueden especificar los tipos de entradas y salidas esperados de una calculadora en GetContract(). Cuando se inicializa un gráfico, el framework llama a un método estático para verificar si los tipos de paquetes de las entradas y salidas conectadas coinciden con la información de esta especificación.
  • Open()
    • Después de que se inicia un gráfico, el framework llama a Open(). Los paquetes laterales de entrada están disponibles para la calculadora en este momento. Open() interpreta las operaciones de configuración del nodo (consulta Grafos) y prepara el estado de la calculadora por grafo ejecutado. Esta función también puede escribir paquetes en los resultados de la calculadora. Un error durante Open() puede finalizar la ejecución del gráfico.
  • Process()
    • Para una calculadora con entradas, el framework llama a Process() de forma repetida cada vez que al menos una transmisión de entrada tiene un paquete disponible. De forma predeterminada, el framework garantiza que todas las entradas tengan la misma marca de tiempo (consulta Sincronización para obtener más información). Se pueden invocar varias llamadas a Process() de forma simultánea cuando se habilita la ejecución paralela. Si se produce un error durante Process(), el framework llama a Close() y finaliza la ejecución del gráfico.
  • Close()
    • Después de que todas las llamadas a Process() finalizan o cuando se cierran todas las transmisiones de entradas, el framework llama a Close(). Siempre se llama a esta función si se llamó a Open() y se realizó correctamente, incluso si la ejecución del gráfico finalizó debido a un error. No hay entradas disponibles a través de ninguna transmisión de entrada durante Close(), pero todavía tiene acceso a los paquetes de entrada y, por lo tanto, puede escribir salidas. Después de que se muestra Close(), la calculadora debe considerarse como nodo muerto. El objeto de la calculadora se destruye cuando el grafo termina de ejecutarse.

Los siguientes son fragmentos de código de CalculatorBase.h.

class CalculatorBase {
 public:
  ...

  // The subclasses of CalculatorBase must implement GetContract.
  // ...
  static absl::Status GetContract(CalculatorContract* cc);

  // Open is called before any Process() calls, on a freshly constructed
  // calculator.  Subclasses may override this method to perform necessary
  // setup, and possibly output Packets and/or set output streams' headers.
  // ...
  virtual absl::Status Open(CalculatorContext* cc) {
    return absl::OkStatus();
  }

  // Processes the incoming inputs. May call the methods on cc to access
  // inputs and produce outputs.
  // ...
  virtual absl::Status Process(CalculatorContext* cc) = 0;

  // Is called if Open() was called and succeeded.  Is called either
  // immediately after processing is complete or after a graph run has ended
  // (if an error occurred in the graph).  ...
  virtual absl::Status Close(CalculatorContext* cc) {
    return absl::OkStatus();
  }

  ...
};

La vida de una calculadora

Durante la inicialización de un gráfico de MediaPipe, el framework llama a un método estático GetContract() para determinar qué tipos de paquetes se esperan.

El framework construye y destruye toda la calculadora para cada ejecución de gráfico (p.ej., una vez por video o por imagen). Los objetos costosos o grandes que permanecen constantes en las ejecuciones de grafos deben suministrarse como paquetes de entrada para que los cálculos no se repitan en ejecuciones posteriores.

Después de la inicialización, para cada ejecución del grafo, se produce la siguiente secuencia:

  • Open()
  • Process() (reiteradamente)
  • Close()

El framework llama a Open() para inicializar la calculadora. Open() debe interpretar las opciones y configurar el estado de ejecución de la calculadora por gráfico. Open() puede obtener paquetes de entrada y escribir paquetes en los resultados de la calculadora. Si corresponde, debería llamar a SetOffset() para reducir el posible almacenamiento en búfer de paquetes de las transmisiones de entrada.

Si se produce un error durante Open() o Process() (como lo indica uno de ellos que muestra un estado distinto de Ok), la ejecución del gráfico finaliza sin más llamadas a los métodos de la calculadora y se destruye la calculadora.

Para una calculadora con entradas, el framework llama a Process() cada vez que al menos una entrada tiene un paquete disponible. El framework garantiza que todas las entradas tengan la misma marca de tiempo, que las marcas de tiempo aumentan con cada llamada a Process() y que se entregan todos los paquetes. Como consecuencia, es posible que algunas entradas no tengan ningún paquete cuando se llama a Process(). Una entrada cuyo paquete falta parece producir un paquete vacío (sin marca de tiempo).

El framework llama a Close() después de todas las llamadas a Process(). Se habrán agotado todas las entradas, pero Close() tiene acceso a los paquetes de entrada y puede escribir salidas. Cuando se muestra Close, la calculadora se destruye.

Las calculadoras sin entradas se denominan fuentes. Se seguirá llamando a Process() a una calculadora de origen, siempre y cuando muestre un estado Ok. Una calculadora de origen indica que se agotó y muestra un estado de parada (es decir, mediaPipe::tool::StatusStop()).

Identifica entradas y salidas

La interfaz pública de una calculadora consta de un conjunto de flujos de entrada y de salida. En una CalculatorGraphConfiguration, los resultados de algunas calculadoras se conectan a las entradas de otras calculadoras mediante transmisiones con nombre. Los nombres de las transmisiones suelen estar en minúscula, mientras que las etiquetas de entrada y salida suelen estar en MAYÚSCULA. En el siguiente ejemplo, el resultado con el nombre de etiqueta VIDEO se conecta a la entrada con el nombre de etiqueta VIDEO_IN mediante el flujo denominado video_stream.

# Graph describing calculator SomeAudioVideoCalculator
node {
  calculator: "SomeAudioVideoCalculator"
  input_stream: "INPUT:combined_input"
  output_stream: "VIDEO:video_stream"
}
node {
  calculator: "SomeVideoCalculator"
  input_stream: "VIDEO_IN:video_stream"
  output_stream: "VIDEO_OUT:processed_video"
}

Las transmisiones de entrada y salida se pueden identificar por número de índice, nombre de etiqueta o una combinación del nombre de etiqueta y el número de índice. Puedes ver algunos ejemplos de identificadores de entrada y salida en el siguiente ejemplo. SomeAudioVideoCalculator identifica las salidas de video por etiqueta y las salidas de audio mediante la combinación de la etiqueta y el índice. La entrada con la etiqueta VIDEO se conecta al flujo llamado video_stream. Los resultados con la etiqueta AUDIO y los índices 0 y 1 se conectan a las transmisiones llamadas audio_left y audio_right. SomeAudioCalculator identifica sus entradas de audio solo mediante índice (no se necesita una etiqueta).

# Graph describing calculator SomeAudioVideoCalculator
node {
  calculator: "SomeAudioVideoCalculator"
  input_stream: "combined_input"
  output_stream: "VIDEO:video_stream"
  output_stream: "AUDIO:0:audio_left"
  output_stream: "AUDIO:1:audio_right"
}

node {
  calculator: "SomeAudioCalculator"
  input_stream: "audio_left"
  input_stream: "audio_right"
  output_stream: "audio_energy"
}

En la implementación de la calculadora, las entradas y salidas también se identifican por nombre de etiqueta y número de índice. En la función a continuación, se identifican las entradas y salidas:

  • Por número de índice: el flujo de entrada combinado se identifica simplemente mediante el índice 0.
  • Por nombre de etiqueta: la transmisión de salida de video se identifica con el nombre de etiqueta "VIDEO".
  • Por nombre de etiqueta y número de índice: las transmisiones de audio de salida se identifican mediante la combinación del nombre de etiqueta AUDIO y los números de índice 0 y 1.
// c++ Code snippet describing the SomeAudioVideoCalculator GetContract() method
class SomeAudioVideoCalculator : public CalculatorBase {
 public:
  static absl::Status GetContract(CalculatorContract* cc) {
    cc->Inputs().Index(0).SetAny();
    // SetAny() is used to specify that whatever the type of the
    // stream is, it's acceptable.  This does not mean that any
    // packet is acceptable.  Packets in the stream still have a
    // particular type.  SetAny() has the same effect as explicitly
    // setting the type to be the stream's type.
    cc->Outputs().Tag("VIDEO").Set<ImageFrame>();
    cc->Outputs().Get("AUDIO", 0).Set<Matrix>();
    cc->Outputs().Get("AUDIO", 1).Set<Matrix>();
    return absl::OkStatus();
  }

Procesamiento

La llamada a Process() en un nodo que no es el origen debe mostrar absl::OkStatus() para indicar que todo salió bien, o cualquier otro código de estado que indique un error

Si una calculadora que no es de origen muestra tool::StatusStop(), esto indica que el gráfico se canceló con anticipación. En este caso, todas las calculadoras de origen y los flujos de entradas de gráficos se cerrarán (y los paquetes restantes se propagarán por el gráfico).

Se seguirá llamando a Process() a un nodo de origen de un gráfico mientras muestre absl::OkStatus(. Para indicar que no hay más datos para generar, muestra tool::StatusStop(). Cualquier otro estado indica que se produjo un error.

Close() muestra absl::OkStatus() para indicar el éxito. Cualquier otro estado indica un error.

Esta es la función Process() básica. Utiliza el método Input() (que solo puede usarse si la calculadora tiene una sola entrada) para solicitar sus datos de entrada. Luego, usa std::unique_ptr para asignar la memoria necesaria para el paquete de salida y realiza los cálculos. Cuando esté listo, libera el puntero para agregarlo al flujo de salida.

absl::Status MyCalculator::Process() {
  const Matrix& input = Input()->Get<Matrix>();
  std::unique_ptr<Matrix> output(new Matrix(input.rows(), input.cols()));
  // do your magic here....
  //    output->row(n) =  ...
  Output()->Add(output.release(), InputTimestamp());
  return absl::OkStatus();
}

Opciones de la calculadora

Las calculadoras aceptan parámetros de procesamiento a través de (1) paquetes de flujo de entrada (2) paquetes laterales de entrada y (3) opciones de calculadora. Si se especifican, las opciones de la calculadora se muestran como valores literales en el campo node_options del mensaje CalculatorGraphConfiguration.Node.

  node {
    calculator: "TfLiteInferenceCalculator"
    input_stream: "TENSORS:main_model_input"
    output_stream: "TENSORS:main_model_output"
    node_options: {
      [type.googleapis.com/mediapipe.TfLiteInferenceCalculatorOptions] {
        model_path: "mediapipe/models/detection_model.tflite"
      }
    }
  }

El campo node_options acepta la sintaxis proto3. Como alternativa, las opciones de la calculadora se pueden especificar en el campo options con la sintaxis de proto2.

  node {
    calculator: "TfLiteInferenceCalculator"
    input_stream: "TENSORS:main_model_input"
    output_stream: "TENSORS:main_model_output"
    node_options: {
      [type.googleapis.com/mediapipe.TfLiteInferenceCalculatorOptions] {
        model_path: "mediapipe/models/detection_model.tflite"
      }
    }
  }

No todas las calculadoras aceptan opciones de calculadora. Para aceptar opciones, una calculadora suele definir un nuevo tipo de mensaje protobuf para representar sus opciones, como PacketClonerCalculatorOptions. Luego, la calculadora leerá ese mensaje protobuf en su método CalculatorBase::Open y, posiblemente, también en su función CalculatorBase::GetContract o su método CalculatorBase::Process. Por lo general, el nuevo tipo de mensaje protobuf se definirá como un esquema protobuf con un archivo “.proto” y una regla de compilación mediapipe_proto_library().

  mediapipe_proto_library(
      name = "packet_cloner_calculator_proto",
      srcs = ["packet_cloner_calculator.proto"],
      visibility = ["//visibility:public"],
      deps = [
          "//mediapipe/framework:calculator_options_proto",
          "//mediapipe/framework:calculator_proto",
      ],
  )

Calculadora de ejemplo

En esta sección, se analiza la implementación de PacketClonerCalculator, que es un trabajo relativamente simple y se usa en muchos gráficos de calculadoras. PacketClonerCalculator solo produce una copia de sus paquetes de entrada más recientes a pedido.

PacketClonerCalculator es útil cuando las marcas de tiempo de los paquetes de datos que llegan no están alineadas a la perfección. Supongamos que tenemos una habitación con un micrófono, un sensor de luz y una cámara de video que recopila datos sensoriales. Cada uno de los sensores opera de forma independiente y recopila datos de forma intermitente. Supongamos que el resultado de cada sensor es el siguiente:

  • micrófono = volumen en decibeles de sonido en la habitación (número entero)
  • sensor de luz = brillo de la habitación (número entero)
  • Cámara de video = marco de imagen RGB de la habitación (ImageFrame)

Nuestra canalización de percepción simple está diseñada para procesar datos sensoriales de estos 3 sensores de modo que, en cualquier momento, tengamos datos de marcos de imagen de la cámara que se sincronicen con los datos de volumen del micrófono y de brillo del sensor de luz más recientes. Para hacer esto con MediaPipe, nuestra canalización de percepción tiene 3 flujos de entrada:

  • Room_mic_signal: Cada paquete de datos en este flujo de entrada es un número entero que representa qué tan alto es el audio en una habitación con una marca de tiempo.
  • Room_lightening_sensor: Cada paquete de datos en este flujo de entrada es un número entero que representa el brillo de la habitación con una marca de tiempo.
  • Room_video_tick_signal: Cada paquete de datos en esta transmisión de entrada es un fotograma de imagen de datos de video que representa el video recopilado de la cámara en la habitación con una marca de tiempo.

A continuación, se muestra la implementación de PacketClonerCalculator. Puedes ver los métodos GetContract(), Open() y Process(), así como la variable de instancia current_, que contiene los paquetes de entrada más recientes.

// This takes packets from N+1 streams, A_1, A_2, ..., A_N, B.
// For every packet that appears in B, outputs the most recent packet from each
// of the A_i on a separate stream.

#include <vector>

#include "absl/strings/str_cat.h"
#include "mediapipe/framework/calculator_framework.h"

namespace mediapipe {

// For every packet received on the last stream, output the latest packet
// obtained on all other streams. Therefore, if the last stream outputs at a
// higher rate than the others, this effectively clones the packets from the
// other streams to match the last.
//
// Example config:
// node {
//   calculator: "PacketClonerCalculator"
//   input_stream: "first_base_signal"
//   input_stream: "second_base_signal"
//   input_stream: "tick_signal"
//   output_stream: "cloned_first_base_signal"
//   output_stream: "cloned_second_base_signal"
// }
//
class PacketClonerCalculator : public CalculatorBase {
 public:
  static absl::Status GetContract(CalculatorContract* cc) {
    const int tick_signal_index = cc->Inputs().NumEntries() - 1;
    // cc->Inputs().NumEntries() returns the number of input streams
    // for the PacketClonerCalculator
    for (int i = 0; i < tick_signal_index; ++i) {
      cc->Inputs().Index(i).SetAny();
      // cc->Inputs().Index(i) returns the input stream pointer by index
      cc->Outputs().Index(i).SetSameAs(&cc->Inputs().Index(i));
    }
    cc->Inputs().Index(tick_signal_index).SetAny();
    return absl::OkStatus();
  }

  absl::Status Open(CalculatorContext* cc) final {
    tick_signal_index_ = cc->Inputs().NumEntries() - 1;
    current_.resize(tick_signal_index_);
    // Pass along the header for each stream if present.
    for (int i = 0; i < tick_signal_index_; ++i) {
      if (!cc->Inputs().Index(i).Header().IsEmpty()) {
        cc->Outputs().Index(i).SetHeader(cc->Inputs().Index(i).Header());
        // Sets the output stream of index i header to be the same as
        // the header for the input stream of index i
      }
    }
    return absl::OkStatus();
  }

  absl::Status Process(CalculatorContext* cc) final {
    // Store input signals.
    for (int i = 0; i < tick_signal_index_; ++i) {
      if (!cc->Inputs().Index(i).Value().IsEmpty()) {
        current_[i] = cc->Inputs().Index(i).Value();
      }
    }

    // Output if the tick signal is non-empty.
    if (!cc->Inputs().Index(tick_signal_index_).Value().IsEmpty()) {
      for (int i = 0; i < tick_signal_index_; ++i) {
        if (!current_[i].IsEmpty()) {
          cc->Outputs().Index(i).AddPacket(
              current_[i].At(cc->InputTimestamp()));
          // Add a packet to output stream of index i a packet from inputstream i
          // with timestamp common to all present inputs
        } else {
          cc->Outputs().Index(i).SetNextTimestampBound(
              cc->InputTimestamp().NextAllowedInStream());
          // if current_[i], 1 packet buffer for input stream i is empty, we will set
          // next allowed timestamp for input stream i to be current timestamp + 1
        }
      }
    }
    return absl::OkStatus();
  }

 private:
  std::vector<Packet> current_;
  int tick_signal_index_;
};

REGISTER_CALCULATOR(PacketClonerCalculator);
}  // namespace mediapipe

Por lo general, una calculadora solo tiene un archivo .cc. No se requiere .h, porque Mediapipe usa el registro para que las calculadoras lo conozcan. Después de definir tu clase de calculadora, regístrala con una invocación de macro REGISTER_CALCULATOR(calculator_class_name).

A continuación, se muestra un gráfico de MediaPipe trivial que tiene 3 transmisiones de entrada, 1 nodo (PacketClonerCalculator) y 2 transmisiones de salida.

input_stream: "room_mic_signal"
input_stream: "room_lighting_sensor"
input_stream: "room_video_tick_signal"

node {
   calculator: "PacketClonerCalculator"
   input_stream: "room_mic_signal"
   input_stream: "room_lighting_sensor"
   input_stream: "room_video_tick_signal"
   output_stream: "cloned_room_mic_signal"
   output_stream: "cloned_lighting_sensor"
 }

En el siguiente diagrama, se muestra cómo PacketClonerCalculator define sus paquetes de salida (parte inferior) en función de su serie de paquetes de entrada (parte superior).

Grafo con PacketClonerCalculator
Cada vez que recibe un paquete en su flujo de entrada TICK, PacketClonerCalculator emite el paquete más reciente de cada una de sus transmisiones de entrada. La secuencia de los paquetes de salida (parte inferior) se determina mediante la secuencia de los paquetes de entrada (arriba) y sus marcas de tiempo. Las marcas de tiempo se muestran en el lado derecho del diagrama.