Calculadoras

Cada calculadora é um nó de um grafo. Descrevemos como criar uma nova calculadora, como inicializar uma calculadora, como executar os cálculos, streams de entrada e saída, carimbos de data/hora e opções. Cada nó no gráfico é implementado como um Calculator. A maior parte da execução de gráficos acontece dentro de calculadoras. Uma calculadora pode receber zero ou mais streams de entrada e/ou pacotes secundários e produz zero ou mais streams de saída e/ou pacotes secundários.

CalculatorBase

Uma calculadora é criada ao definir uma nova subclasse da classe CalculatorBase, implementando vários métodos e registrando a nova subclasse no Mediapipe. No mínimo, uma nova calculadora precisa implementar os quatro métodos abaixo.

  • GetContract()
    • Os autores da calculadora podem especificar os tipos esperados de entradas e saídas de uma calculadora em GetContract(). Quando um gráfico é inicializado, o framework chama um método estático para verificar se os tipos de pacote das entradas e saídas conectadas correspondem às informações nesta especificação.
  • Open()
    • Depois que um gráfico é iniciado, o framework chama Open(). Os pacotes secundários de entrada estão disponíveis para a calculadora neste momento. Open() interpreta as operações de configuração do nó (consulte Gráficos) e prepara o estado por execução do gráfico da calculadora. Essa função também pode gravar pacotes nas saídas da calculadora. Um erro durante Open() pode encerrar a execução do gráfico.
  • Process()
    • Para uma calculadora com entradas, o framework chama Process() repetidamente sempre que pelo menos um stream de entrada tem um pacote disponível. Por padrão, o framework garante que todas as entradas tenham o mesmo carimbo de data/hora. Consulte Sincronização para mais informações. Várias chamadas Process() podem ser invocadas simultaneamente quando a execução paralela está ativada. Se ocorrer um erro durante Process(), o framework chamará Close() e a execução do gráfico será encerrada.
  • Close()
    • Depois que todas as chamadas para Process() forem concluídas ou quando todos os streams de entrada forem fechados, o framework chamará Close(). Essa função será sempre chamada se Open() for chamado e bem-sucedido e mesmo se a execução do gráfico for encerrada devido a um erro. Nenhuma entrada está disponível por meio de fluxos de entrada durante Close(), mas ainda tem acesso a pacotes de entrada e, portanto, pode gravar saídas. Depois que Close() retornar, a calculadora vai ser considerada um nó inativo. O objeto de calculadora é destruído assim que o gráfico termina de ser executado.

Veja a seguir snippets de código de CalculatorBase.h.

class CalculatorBase {
 public:
  ...

  // The subclasses of CalculatorBase must implement GetContract.
  // ...
  static absl::Status GetContract(CalculatorContract* cc);

  // Open is called before any Process() calls, on a freshly constructed
  // calculator.  Subclasses may override this method to perform necessary
  // setup, and possibly output Packets and/or set output streams' headers.
  // ...
  virtual absl::Status Open(CalculatorContext* cc) {
    return absl::OkStatus();
  }

  // Processes the incoming inputs. May call the methods on cc to access
  // inputs and produce outputs.
  // ...
  virtual absl::Status Process(CalculatorContext* cc) = 0;

  // Is called if Open() was called and succeeded.  Is called either
  // immediately after processing is complete or after a graph run has ended
  // (if an error occurred in the graph).  ...
  virtual absl::Status Close(CalculatorContext* cc) {
    return absl::OkStatus();
  }

  ...
};

O ciclo de uma calculadora

Durante a inicialização de um gráfico MediaPipe, o framework chama um método estático GetContract() para determinar quais tipos de pacotes são esperados.

O framework constrói e destrói toda a calculadora para cada execução do gráfico (por exemplo, uma vez por vídeo ou por imagem). Objetos caros ou grandes que permanecem constantes em execuções de gráfico precisam ser fornecidos como pacotes de entrada para que os cálculos não sejam repetidos nas execuções subsequentes.

Após a inicialização, para cada execução do gráfico, ocorre a seguinte sequência:

  • Open()
  • Process() (repetidamente)
  • Close()

O framework chama Open() para inicializar a calculadora. Open() precisa interpretar todas as opções e configurar o estado por execução do gráfico da calculadora. Open() pode receber pacotes de entrada e gravar pacotes nas saídas da calculadora. Se adequado, ele precisa chamar SetOffset() para reduzir o possível armazenamento em buffer de pacote dos streams de entrada.

Se ocorrer um erro durante Open() ou Process() (conforme indicado por um deles que retorna um status diferente de Ok), a execução do gráfico será encerrada sem mais chamadas para os métodos da calculadora e a calculadora será destruída.

Para uma calculadora com entradas, o framework chama Process() sempre que pelo menos uma entrada tem um pacote disponível. O framework garante que todas as entradas tenham o mesmo carimbo de data/hora, que esses carimbos aumentem com cada chamada para Process() e que todos os pacotes sejam entregues. Como consequência, algumas entradas podem não ter nenhum pacote quando Process() é chamado. Uma entrada com pacote ausente parece produzir um pacote vazio (sem carimbo de data/hora).

O framework chama Close() depois de todas as chamadas para Process(). Todas as entradas terão sido esgotadas, mas Close() tem acesso aos pacotes de entrada e pode gravar saídas. Depois que Close retornar, a calculadora será destruída.

Calculadoras sem entradas são chamadas de fontes. Uma calculadora de origem continua sendo chamada Process(), desde que retorne um status Ok. Uma calculadora de fonte indica que o fim foi esgotado ao retornar um status de parada (ou seja, mediaPipe::tool::StatusStop()).

Como identificar entradas e saídas

A interface pública para uma calculadora consiste em um conjunto de streams de entrada e de saída. Em uma CalculatorGraphConfiguration, as saídas de algumas calculadoras são conectadas às entradas de outras calculadoras usando fluxos nomeados. Os nomes dos streams costumam ser em letra minúscula, enquanto as tags de entrada e saída costumam estar em LETRA MAIÚSCULA. No exemplo abaixo, a saída com o nome de tag VIDEO está conectada à entrada com o nome de tag VIDEO_IN usando o stream chamado video_stream.

# Graph describing calculator SomeAudioVideoCalculator
node {
  calculator: "SomeAudioVideoCalculator"
  input_stream: "INPUT:combined_input"
  output_stream: "VIDEO:video_stream"
}
node {
  calculator: "SomeVideoCalculator"
  input_stream: "VIDEO_IN:video_stream"
  output_stream: "VIDEO_OUT:processed_video"
}

Os streams de entrada e saída podem ser identificados pelo número de índice, pelo nome da tag ou por uma combinação de nome de tag e número de índice. Confira abaixo alguns exemplos de identificadores de entrada e saída. O SomeAudioVideoCalculator identifica a saída de vídeo por tag e as saídas de áudio pela combinação de tag e índice. A entrada com a tag VIDEO está conectada ao stream chamado video_stream. As saídas com a tag AUDIO e os índices 0 e 1 estão conectadas aos streams chamados audio_left e audio_right. SomeAudioCalculator identifica as entradas de áudio apenas por índice (nenhuma tag é necessária).

# Graph describing calculator SomeAudioVideoCalculator
node {
  calculator: "SomeAudioVideoCalculator"
  input_stream: "combined_input"
  output_stream: "VIDEO:video_stream"
  output_stream: "AUDIO:0:audio_left"
  output_stream: "AUDIO:1:audio_right"
}

node {
  calculator: "SomeAudioCalculator"
  input_stream: "audio_left"
  input_stream: "audio_right"
  output_stream: "audio_energy"
}

Na implementação da calculadora, as entradas e saídas também são identificadas pelo nome da tag e pelo número do índice. Na função abaixo, a entrada e a saída são identificadas:

  • Por número de índice: o stream de entrada combinado é identificado simplesmente pelo índice 0.
  • Por nome da tag: o stream de saída de vídeo é identificado pelo nome de tag "VIDEO".
  • Por nome da tag e número de índice: os streams de áudio de saída são identificados pela combinação do nome de tag AUDIO e dos números de índice 0 e 1.
// c++ Code snippet describing the SomeAudioVideoCalculator GetContract() method
class SomeAudioVideoCalculator : public CalculatorBase {
 public:
  static absl::Status GetContract(CalculatorContract* cc) {
    cc->Inputs().Index(0).SetAny();
    // SetAny() is used to specify that whatever the type of the
    // stream is, it's acceptable.  This does not mean that any
    // packet is acceptable.  Packets in the stream still have a
    // particular type.  SetAny() has the same effect as explicitly
    // setting the type to be the stream's type.
    cc->Outputs().Tag("VIDEO").Set<ImageFrame>();
    cc->Outputs().Get("AUDIO", 0).Set<Matrix>();
    cc->Outputs().Get("AUDIO", 1).Set<Matrix>();
    return absl::OkStatus();
  }

Processamento

Process() chamado em um nó que não é de origem precisa retornar absl::OkStatus() para indicar que tudo correu bem ou qualquer outro código de status para sinalizar um erro.

Se uma calculadora que não é da fonte retornar tool::StatusStop(), isso indica que o gráfico está sendo cancelado antecipadamente. Nesse caso, todas as calculadoras de origem e os fluxos de entrada de gráfico serão fechados, e os pacotes restantes serão propagados pelo gráfico.

Um nó de origem em um gráfico continuará a ter Process() chamado, desde que retorne absl::OkStatus(. Para indicar que não há mais dados a serem gerados, retorne tool::StatusStop(). Qualquer outro status indica a ocorrência de um erro.

Close() retorna absl::OkStatus() para indicar o sucesso. Qualquer outro status indica uma falha.

Esta é a função Process() básica. Ela usa o método Input() (que só pode ser utilizado se a calculadora tiver uma única entrada) para solicitar os dados de entrada. Em seguida, ele usa std::unique_ptr para alocar a memória necessária para o pacote de saída e faz os cálculos. Quando concluído, ele libera o ponteiro ao adicioná-lo ao fluxo de saída.

absl::Status MyCalculator::Process() {
  const Matrix& input = Input()->Get<Matrix>();
  std::unique_ptr<Matrix> output(new Matrix(input.rows(), input.cols()));
  // do your magic here....
  //    output->row(n) =  ...
  Output()->Add(output.release(), InputTimestamp());
  return absl::OkStatus();
}

Opções da calculadora

As calculadoras aceitam parâmetros de processamento por meio de (1) pacotes de stream de entrada, (2) pacotes de entrada e (3) opções de calculadora. As opções da calculadora, se especificadas, aparecem como valores literais no campo node_options da mensagem CalculatorGraphConfiguration.Node.

  node {
    calculator: "TfLiteInferenceCalculator"
    input_stream: "TENSORS:main_model_input"
    output_stream: "TENSORS:main_model_output"
    node_options: {
      [type.googleapis.com/mediapipe.TfLiteInferenceCalculatorOptions] {
        model_path: "mediapipe/models/detection_model.tflite"
      }
    }
  }

O campo node_options aceita a sintaxe do proto3. Como alternativa, as opções de calculadora podem ser especificadas no campo options usando a sintaxe proto2.

  node {
    calculator: "TfLiteInferenceCalculator"
    input_stream: "TENSORS:main_model_input"
    output_stream: "TENSORS:main_model_output"
    node_options: {
      [type.googleapis.com/mediapipe.TfLiteInferenceCalculatorOptions] {
        model_path: "mediapipe/models/detection_model.tflite"
      }
    }
  }

Nem todas as calculadoras aceitam as opções de calculadora. Para aceitar as opções, uma calculadora normalmente define um novo tipo de mensagem protobuf para representar as opções, como PacketClonerCalculatorOptions. A calculadora vai ler essa mensagem protobuf no método CalculatorBase::Open e, possivelmente, também na função CalculatorBase::GetContract ou no método CalculatorBase::Process. Normalmente, o novo tipo de mensagem protobuf será definido como um esquema protobuf usando um arquivo ".proto" e uma regra de build mediapipe_proto_library().

  mediapipe_proto_library(
      name = "packet_cloner_calculator_proto",
      srcs = ["packet_cloner_calculator.proto"],
      visibility = ["//visibility:public"],
      deps = [
          "//mediapipe/framework:calculator_options_proto",
          "//mediapipe/framework:calculator_proto",
      ],
  )

Exemplo de calculadora

Esta seção discute a implementação de PacketClonerCalculator, que faz um job relativamente simples e é usada em muitos gráficos de calculadora. PacketClonerCalculator simplesmente produz uma cópia dos pacotes de entrada mais recentes sob demanda.

PacketClonerCalculator é útil quando os carimbos de data/hora dos pacotes de dados que chegam não estão perfeitamente alinhados. Imagine que temos uma sala com microfone, sensor de luz e câmera de vídeo que esteja coletando dados sensoriais. Cada um dos sensores opera de maneira independente e coleta dados de maneira intermitente. Suponha que a saída de cada sensor seja:

  • microfone = volume em decibéis do som do ambiente (número inteiro)
  • sensor de luz = brilho do ambiente (inteiro)
  • câmera de vídeo = frame da imagem RGB do ambiente (ImageFrame)

Nosso pipeline de percepção simples foi projetado para processar dados sensoriais desses três sensores para que, a qualquer momento, quando tivermos dados de frame de imagem da câmera sincronizados com os últimos dados de volume do microfone e de brilho do sensor de luz coletados. Para fazer isso com o MediaPipe, nosso pipeline de percepção tem três fluxos de entrada:

  • Room_mic_signal: cada pacote de dados do stream de entrada é composto por números inteiros que representam o volume do áudio em um ambiente com carimbo de data/hora.
  • Room_lightening_sensor: cada pacote de dados do stream de entrada contém dados inteiros que representam a iluminação do ambiente com um carimbo de data/hora.
  • room_video_tick_signal: cada pacote de dados do stream de entrada é um frame de imagem de dados de vídeo que representa o vídeo coletado da câmera na sala com carimbo de data/hora.

Confira abaixo a implementação do PacketClonerCalculator. Você pode ver os métodos GetContract(), Open() e Process(), bem como a variável de instância current_ que contém os pacotes de entrada mais recentes.

// This takes packets from N+1 streams, A_1, A_2, ..., A_N, B.
// For every packet that appears in B, outputs the most recent packet from each
// of the A_i on a separate stream.

#include <vector>

#include "absl/strings/str_cat.h"
#include "mediapipe/framework/calculator_framework.h"

namespace mediapipe {

// For every packet received on the last stream, output the latest packet
// obtained on all other streams. Therefore, if the last stream outputs at a
// higher rate than the others, this effectively clones the packets from the
// other streams to match the last.
//
// Example config:
// node {
//   calculator: "PacketClonerCalculator"
//   input_stream: "first_base_signal"
//   input_stream: "second_base_signal"
//   input_stream: "tick_signal"
//   output_stream: "cloned_first_base_signal"
//   output_stream: "cloned_second_base_signal"
// }
//
class PacketClonerCalculator : public CalculatorBase {
 public:
  static absl::Status GetContract(CalculatorContract* cc) {
    const int tick_signal_index = cc->Inputs().NumEntries() - 1;
    // cc->Inputs().NumEntries() returns the number of input streams
    // for the PacketClonerCalculator
    for (int i = 0; i < tick_signal_index; ++i) {
      cc->Inputs().Index(i).SetAny();
      // cc->Inputs().Index(i) returns the input stream pointer by index
      cc->Outputs().Index(i).SetSameAs(&cc->Inputs().Index(i));
    }
    cc->Inputs().Index(tick_signal_index).SetAny();
    return absl::OkStatus();
  }

  absl::Status Open(CalculatorContext* cc) final {
    tick_signal_index_ = cc->Inputs().NumEntries() - 1;
    current_.resize(tick_signal_index_);
    // Pass along the header for each stream if present.
    for (int i = 0; i < tick_signal_index_; ++i) {
      if (!cc->Inputs().Index(i).Header().IsEmpty()) {
        cc->Outputs().Index(i).SetHeader(cc->Inputs().Index(i).Header());
        // Sets the output stream of index i header to be the same as
        // the header for the input stream of index i
      }
    }
    return absl::OkStatus();
  }

  absl::Status Process(CalculatorContext* cc) final {
    // Store input signals.
    for (int i = 0; i < tick_signal_index_; ++i) {
      if (!cc->Inputs().Index(i).Value().IsEmpty()) {
        current_[i] = cc->Inputs().Index(i).Value();
      }
    }

    // Output if the tick signal is non-empty.
    if (!cc->Inputs().Index(tick_signal_index_).Value().IsEmpty()) {
      for (int i = 0; i < tick_signal_index_; ++i) {
        if (!current_[i].IsEmpty()) {
          cc->Outputs().Index(i).AddPacket(
              current_[i].At(cc->InputTimestamp()));
          // Add a packet to output stream of index i a packet from inputstream i
          // with timestamp common to all present inputs
        } else {
          cc->Outputs().Index(i).SetNextTimestampBound(
              cc->InputTimestamp().NextAllowedInStream());
          // if current_[i], 1 packet buffer for input stream i is empty, we will set
          // next allowed timestamp for input stream i to be current timestamp + 1
        }
      }
    }
    return absl::OkStatus();
  }

 private:
  std::vector<Packet> current_;
  int tick_signal_index_;
};

REGISTER_CALCULATOR(PacketClonerCalculator);
}  // namespace mediapipe

Normalmente, uma calculadora tem apenas um arquivo .cc. Nenhum .h é necessário, porque o mediapipe usa registro para tornar as calculadoras conhecidas. Depois de definir sua classe de calculadora, registre-a com uma invocação de macro REGISTER_CALCULATOR(calculator_class_name).

Confira abaixo um gráfico trivial do MediaPipe com três fluxos de entrada, um nó (PacketClonerCalculator) e dois fluxos de saída.

input_stream: "room_mic_signal"
input_stream: "room_lighting_sensor"
input_stream: "room_video_tick_signal"

node {
   calculator: "PacketClonerCalculator"
   input_stream: "room_mic_signal"
   input_stream: "room_lighting_sensor"
   input_stream: "room_video_tick_signal"
   output_stream: "cloned_room_mic_signal"
   output_stream: "cloned_lighting_sensor"
 }

O diagrama abaixo mostra como o PacketClonerCalculator define os pacotes de saída (na parte de baixo) com base na série de pacotes de entrada (na parte de cima).

Gráfico usando a PackageClonerCalculator
Cada vez que recebe um pacote no fluxo de entrada TICK, o PacketClonerCalculator gera o pacote mais recente de cada um dos fluxos de entrada. A sequência dos pacotes de saída (abaixo) é determinada pela sequência dos pacotes de entrada (topo) e os respectivos carimbos de data/hora. Os carimbos de data/hora são mostrados no lado direito do diagrama.