Cada calculadora é um nó de um grafo. Descrevemos como criar uma nova calculadora, como inicializar uma calculadora, como executar os cálculos, streams de entrada e saída, carimbos de data/hora e opções. Cada nó no gráfico é
implementado como um Calculator
. A maior parte da execução de gráficos acontece dentro de calculadoras. Uma calculadora pode receber zero ou mais streams de entrada e/ou pacotes
secundários e produz zero ou mais streams de saída e/ou pacotes secundários.
CalculatorBase
Uma calculadora é criada ao definir uma nova subclasse da classe
CalculatorBase
,
implementando vários métodos e registrando a nova subclasse no
Mediapipe. No mínimo, uma nova calculadora precisa implementar os quatro métodos abaixo.
GetContract()
- Os autores da calculadora podem especificar os tipos esperados de entradas e saídas de uma calculadora em GetContract(). Quando um gráfico é inicializado, o framework chama um método estático para verificar se os tipos de pacote das entradas e saídas conectadas correspondem às informações nesta especificação.
Open()
- Depois que um gráfico é iniciado, o framework chama
Open()
. Os pacotes secundários de entrada estão disponíveis para a calculadora neste momento.Open()
interpreta as operações de configuração do nó (consulte Gráficos) e prepara o estado por execução do gráfico da calculadora. Essa função também pode gravar pacotes nas saídas da calculadora. Um erro duranteOpen()
pode encerrar a execução do gráfico.
- Depois que um gráfico é iniciado, o framework chama
Process()
- Para uma calculadora com entradas, o framework chama
Process()
repetidamente sempre que pelo menos um stream de entrada tem um pacote disponível. Por padrão, o framework garante que todas as entradas tenham o mesmo carimbo de data/hora. Consulte Sincronização para mais informações. Várias chamadasProcess()
podem ser invocadas simultaneamente quando a execução paralela está ativada. Se ocorrer um erro duranteProcess()
, o framework chamaráClose()
e a execução do gráfico será encerrada.
- Para uma calculadora com entradas, o framework chama
Close()
- Depois que todas as chamadas para
Process()
forem concluídas ou quando todos os streams de entrada forem fechados, o framework chamaráClose()
. Essa função será sempre chamada seOpen()
for chamado e bem-sucedido e mesmo se a execução do gráfico for encerrada devido a um erro. Nenhuma entrada está disponível por meio de fluxos de entrada duranteClose()
, mas ainda tem acesso a pacotes de entrada e, portanto, pode gravar saídas. Depois queClose()
retornar, a calculadora vai ser considerada um nó inativo. O objeto de calculadora é destruído assim que o gráfico termina de ser executado.
- Depois que todas as chamadas para
Veja a seguir snippets de código de CalculatorBase.h.
class CalculatorBase {
public:
...
// The subclasses of CalculatorBase must implement GetContract.
// ...
static absl::Status GetContract(CalculatorContract* cc);
// Open is called before any Process() calls, on a freshly constructed
// calculator. Subclasses may override this method to perform necessary
// setup, and possibly output Packets and/or set output streams' headers.
// ...
virtual absl::Status Open(CalculatorContext* cc) {
return absl::OkStatus();
}
// Processes the incoming inputs. May call the methods on cc to access
// inputs and produce outputs.
// ...
virtual absl::Status Process(CalculatorContext* cc) = 0;
// Is called if Open() was called and succeeded. Is called either
// immediately after processing is complete or after a graph run has ended
// (if an error occurred in the graph). ...
virtual absl::Status Close(CalculatorContext* cc) {
return absl::OkStatus();
}
...
};
O ciclo de uma calculadora
Durante a inicialização de um gráfico MediaPipe, o framework chama um
método estático GetContract()
para determinar quais tipos de pacotes são esperados.
O framework constrói e destrói toda a calculadora para cada execução do gráfico (por exemplo, uma vez por vídeo ou por imagem). Objetos caros ou grandes que permanecem constantes em execuções de gráfico precisam ser fornecidos como pacotes de entrada para que os cálculos não sejam repetidos nas execuções subsequentes.
Após a inicialização, para cada execução do gráfico, ocorre a seguinte sequência:
Open()
Process()
(repetidamente)Close()
O framework chama Open()
para inicializar a calculadora. Open()
precisa
interpretar todas as opções e configurar o estado por execução do gráfico da calculadora. Open()
pode receber pacotes de entrada e gravar pacotes nas saídas da calculadora. Se
adequado, ele precisa chamar SetOffset()
para reduzir o possível armazenamento em buffer de pacote
dos streams de entrada.
Se ocorrer um erro durante Open()
ou Process()
(conforme indicado por um deles
que retorna um status diferente de Ok
), a execução do gráfico será encerrada sem mais chamadas
para os métodos da calculadora e a calculadora será destruída.
Para uma calculadora com entradas, o framework chama Process()
sempre que pelo menos
uma entrada tem um pacote disponível. O framework garante que todas as entradas tenham
o mesmo carimbo de data/hora, que esses carimbos aumentem com cada chamada para Process()
e
que todos os pacotes sejam entregues. Como consequência, algumas entradas podem não ter nenhum
pacote quando Process()
é chamado. Uma entrada com pacote ausente parece
produzir um pacote vazio (sem carimbo de data/hora).
O framework chama Close()
depois de todas as chamadas para Process()
. Todas as entradas terão sido esgotadas, mas Close()
tem acesso aos pacotes de entrada e pode gravar saídas. Depois que Close retornar, a calculadora será destruída.
Calculadoras sem entradas são chamadas de fontes. Uma calculadora de origem
continua sendo chamada Process()
, desde que retorne um status Ok
. Uma calculadora de fonte indica que o fim foi esgotado ao retornar um status de parada (ou seja, mediaPipe::tool::StatusStop()
).
Como identificar entradas e saídas
A interface pública para uma calculadora consiste em um conjunto de streams de entrada e
de saída. Em uma CalculatorGraphConfiguration, as saídas de algumas
calculadoras são conectadas às entradas de outras calculadoras usando fluxos
nomeados. Os nomes dos streams costumam ser em letra minúscula, enquanto as tags de entrada e saída costumam estar em LETRA MAIÚSCULA. No exemplo abaixo, a saída com o nome de tag VIDEO
está
conectada à entrada com o nome de tag VIDEO_IN
usando o stream chamado
video_stream
.
# Graph describing calculator SomeAudioVideoCalculator
node {
calculator: "SomeAudioVideoCalculator"
input_stream: "INPUT:combined_input"
output_stream: "VIDEO:video_stream"
}
node {
calculator: "SomeVideoCalculator"
input_stream: "VIDEO_IN:video_stream"
output_stream: "VIDEO_OUT:processed_video"
}
Os streams de entrada e saída podem ser identificados pelo número de índice, pelo nome da tag ou por uma
combinação de nome de tag e número de índice. Confira abaixo alguns exemplos de identificadores de entrada e saída. O SomeAudioVideoCalculator
identifica a saída de vídeo por tag e as saídas de áudio pela combinação de tag e índice. A entrada com a tag VIDEO
está conectada ao stream chamado
video_stream
. As saídas com a tag AUDIO
e os índices 0
e 1
estão
conectadas aos streams chamados audio_left
e audio_right
.
SomeAudioCalculator
identifica as entradas de áudio apenas por índice (nenhuma tag é necessária).
# Graph describing calculator SomeAudioVideoCalculator
node {
calculator: "SomeAudioVideoCalculator"
input_stream: "combined_input"
output_stream: "VIDEO:video_stream"
output_stream: "AUDIO:0:audio_left"
output_stream: "AUDIO:1:audio_right"
}
node {
calculator: "SomeAudioCalculator"
input_stream: "audio_left"
input_stream: "audio_right"
output_stream: "audio_energy"
}
Na implementação da calculadora, as entradas e saídas também são identificadas pelo nome da tag e pelo número do índice. Na função abaixo, a entrada e a saída são identificadas:
- Por número de índice: o stream de entrada combinado é identificado simplesmente pelo índice
0
. - Por nome da tag: o stream de saída de vídeo é identificado pelo nome de tag "VIDEO".
- Por nome da tag e número de índice: os streams de áudio de saída são identificados pela
combinação do nome de tag
AUDIO
e dos números de índice0
e1
.
// c++ Code snippet describing the SomeAudioVideoCalculator GetContract() method
class SomeAudioVideoCalculator : public CalculatorBase {
public:
static absl::Status GetContract(CalculatorContract* cc) {
cc->Inputs().Index(0).SetAny();
// SetAny() is used to specify that whatever the type of the
// stream is, it's acceptable. This does not mean that any
// packet is acceptable. Packets in the stream still have a
// particular type. SetAny() has the same effect as explicitly
// setting the type to be the stream's type.
cc->Outputs().Tag("VIDEO").Set<ImageFrame>();
cc->Outputs().Get("AUDIO", 0).Set<Matrix>();
cc->Outputs().Get("AUDIO", 1).Set<Matrix>();
return absl::OkStatus();
}
Processamento
Process()
chamado em um nó que não é de origem precisa retornar absl::OkStatus()
para indicar que tudo correu bem ou qualquer outro código de status para sinalizar um erro.
Se uma calculadora que não é da fonte retornar tool::StatusStop()
, isso indica que o
gráfico está sendo cancelado antecipadamente. Nesse caso, todas as calculadoras de origem e os fluxos de entrada
de gráfico serão fechados, e os pacotes restantes serão propagados pelo
gráfico.
Um nó de origem em um gráfico continuará a ter Process()
chamado, desde que
retorne absl::OkStatus(
. Para indicar que não há mais dados a serem
gerados, retorne tool::StatusStop()
. Qualquer outro status indica a ocorrência de um erro.
Close()
retorna absl::OkStatus()
para indicar o sucesso. Qualquer outro status
indica uma falha.
Esta é a função Process()
básica. Ela usa o método Input()
(que só pode
ser utilizado se a calculadora tiver uma única entrada) para solicitar os dados de entrada. Em seguida, ele usa std::unique_ptr
para alocar a memória necessária para o pacote de saída e faz os cálculos. Quando concluído, ele libera o ponteiro ao adicioná-lo ao
fluxo de saída.
absl::Status MyCalculator::Process() {
const Matrix& input = Input()->Get<Matrix>();
std::unique_ptr<Matrix> output(new Matrix(input.rows(), input.cols()));
// do your magic here....
// output->row(n) = ...
Output()->Add(output.release(), InputTimestamp());
return absl::OkStatus();
}
Opções da calculadora
As calculadoras aceitam parâmetros de processamento por meio de (1) pacotes de stream de entrada, (2)
pacotes de entrada e (3) opções de calculadora. As opções da calculadora, se especificadas, aparecem como valores literais no campo node_options
da mensagem CalculatorGraphConfiguration.Node
.
node {
calculator: "TfLiteInferenceCalculator"
input_stream: "TENSORS:main_model_input"
output_stream: "TENSORS:main_model_output"
node_options: {
[type.googleapis.com/mediapipe.TfLiteInferenceCalculatorOptions] {
model_path: "mediapipe/models/detection_model.tflite"
}
}
}
O campo node_options
aceita a sintaxe do proto3. Como alternativa, as opções de calculadora podem ser especificadas no campo options
usando a sintaxe proto2.
node {
calculator: "TfLiteInferenceCalculator"
input_stream: "TENSORS:main_model_input"
output_stream: "TENSORS:main_model_output"
node_options: {
[type.googleapis.com/mediapipe.TfLiteInferenceCalculatorOptions] {
model_path: "mediapipe/models/detection_model.tflite"
}
}
}
Nem todas as calculadoras aceitam as opções de calculadora. Para aceitar as opções, uma
calculadora normalmente define um novo tipo de mensagem protobuf para representar as
opções, como PacketClonerCalculatorOptions
. A calculadora vai ler
essa mensagem protobuf no método CalculatorBase::Open
e, possivelmente,
também na função CalculatorBase::GetContract
ou no
método CalculatorBase::Process
. Normalmente, o novo tipo de mensagem protobuf será
definido como um esquema protobuf usando um arquivo ".proto" e uma
regra de build mediapipe_proto_library()
.
mediapipe_proto_library(
name = "packet_cloner_calculator_proto",
srcs = ["packet_cloner_calculator.proto"],
visibility = ["//visibility:public"],
deps = [
"//mediapipe/framework:calculator_options_proto",
"//mediapipe/framework:calculator_proto",
],
)
Exemplo de calculadora
Esta seção discute a implementação de PacketClonerCalculator
, que
faz um job relativamente simples e é usada em muitos gráficos de calculadora.
PacketClonerCalculator
simplesmente produz uma cópia dos pacotes de entrada mais recentes
sob demanda.
PacketClonerCalculator
é útil quando os carimbos de data/hora dos pacotes de dados que chegam não estão perfeitamente alinhados. Imagine que temos uma sala com microfone, sensor de luz e câmera de vídeo que esteja coletando dados sensoriais. Cada um dos sensores opera de maneira independente e coleta dados de maneira intermitente. Suponha que a saída
de cada sensor seja:
- microfone = volume em decibéis do som do ambiente (número inteiro)
- sensor de luz = brilho do ambiente (inteiro)
- câmera de vídeo = frame da imagem RGB do ambiente (ImageFrame)
Nosso pipeline de percepção simples foi projetado para processar dados sensoriais desses três sensores para que, a qualquer momento, quando tivermos dados de frame de imagem da câmera sincronizados com os últimos dados de volume do microfone e de brilho do sensor de luz coletados. Para fazer isso com o MediaPipe, nosso pipeline de percepção tem três fluxos de entrada:
- Room_mic_signal: cada pacote de dados do stream de entrada é composto por números inteiros que representam o volume do áudio em um ambiente com carimbo de data/hora.
- Room_lightening_sensor: cada pacote de dados do stream de entrada contém dados inteiros que representam a iluminação do ambiente com um carimbo de data/hora.
- room_video_tick_signal: cada pacote de dados do stream de entrada é um frame de imagem de dados de vídeo que representa o vídeo coletado da câmera na sala com carimbo de data/hora.
Confira abaixo a implementação do PacketClonerCalculator
. Você pode ver os métodos GetContract()
, Open()
e Process()
, bem como a variável de instância current_
que contém os pacotes de entrada mais recentes.
// This takes packets from N+1 streams, A_1, A_2, ..., A_N, B.
// For every packet that appears in B, outputs the most recent packet from each
// of the A_i on a separate stream.
#include <vector>
#include "absl/strings/str_cat.h"
#include "mediapipe/framework/calculator_framework.h"
namespace mediapipe {
// For every packet received on the last stream, output the latest packet
// obtained on all other streams. Therefore, if the last stream outputs at a
// higher rate than the others, this effectively clones the packets from the
// other streams to match the last.
//
// Example config:
// node {
// calculator: "PacketClonerCalculator"
// input_stream: "first_base_signal"
// input_stream: "second_base_signal"
// input_stream: "tick_signal"
// output_stream: "cloned_first_base_signal"
// output_stream: "cloned_second_base_signal"
// }
//
class PacketClonerCalculator : public CalculatorBase {
public:
static absl::Status GetContract(CalculatorContract* cc) {
const int tick_signal_index = cc->Inputs().NumEntries() - 1;
// cc->Inputs().NumEntries() returns the number of input streams
// for the PacketClonerCalculator
for (int i = 0; i < tick_signal_index; ++i) {
cc->Inputs().Index(i).SetAny();
// cc->Inputs().Index(i) returns the input stream pointer by index
cc->Outputs().Index(i).SetSameAs(&cc->Inputs().Index(i));
}
cc->Inputs().Index(tick_signal_index).SetAny();
return absl::OkStatus();
}
absl::Status Open(CalculatorContext* cc) final {
tick_signal_index_ = cc->Inputs().NumEntries() - 1;
current_.resize(tick_signal_index_);
// Pass along the header for each stream if present.
for (int i = 0; i < tick_signal_index_; ++i) {
if (!cc->Inputs().Index(i).Header().IsEmpty()) {
cc->Outputs().Index(i).SetHeader(cc->Inputs().Index(i).Header());
// Sets the output stream of index i header to be the same as
// the header for the input stream of index i
}
}
return absl::OkStatus();
}
absl::Status Process(CalculatorContext* cc) final {
// Store input signals.
for (int i = 0; i < tick_signal_index_; ++i) {
if (!cc->Inputs().Index(i).Value().IsEmpty()) {
current_[i] = cc->Inputs().Index(i).Value();
}
}
// Output if the tick signal is non-empty.
if (!cc->Inputs().Index(tick_signal_index_).Value().IsEmpty()) {
for (int i = 0; i < tick_signal_index_; ++i) {
if (!current_[i].IsEmpty()) {
cc->Outputs().Index(i).AddPacket(
current_[i].At(cc->InputTimestamp()));
// Add a packet to output stream of index i a packet from inputstream i
// with timestamp common to all present inputs
} else {
cc->Outputs().Index(i).SetNextTimestampBound(
cc->InputTimestamp().NextAllowedInStream());
// if current_[i], 1 packet buffer for input stream i is empty, we will set
// next allowed timestamp for input stream i to be current timestamp + 1
}
}
}
return absl::OkStatus();
}
private:
std::vector<Packet> current_;
int tick_signal_index_;
};
REGISTER_CALCULATOR(PacketClonerCalculator);
} // namespace mediapipe
Normalmente, uma calculadora tem apenas um arquivo .cc. Nenhum .h é necessário, porque o mediapipe usa registro para tornar as calculadoras conhecidas. Depois de definir sua classe de calculadora, registre-a com uma invocação de macro REGISTER_CALCULATOR(calculator_class_name).
Confira abaixo um gráfico trivial do MediaPipe com três fluxos de entrada, um nó (PacketClonerCalculator) e dois fluxos de saída.
input_stream: "room_mic_signal"
input_stream: "room_lighting_sensor"
input_stream: "room_video_tick_signal"
node {
calculator: "PacketClonerCalculator"
input_stream: "room_mic_signal"
input_stream: "room_lighting_sensor"
input_stream: "room_video_tick_signal"
output_stream: "cloned_room_mic_signal"
output_stream: "cloned_lighting_sensor"
}
O diagrama abaixo mostra como o PacketClonerCalculator
define os pacotes
de saída (na parte de baixo) com base na série de pacotes de entrada (na parte de cima).
Cada vez que recebe um pacote no fluxo de entrada TICK, o PacketClonerCalculator gera o pacote mais recente de cada um dos fluxos de entrada. A sequência dos pacotes de saída (abaixo) é determinada pela sequência dos pacotes de entrada (topo) e os respectivos carimbos de data/hora. Os carimbos de data/hora são mostrados no lado direito do diagrama. |