Каждый калькулятор представляет собой узел графа. Мы описываем, как создать новый калькулятор, как инициализировать калькулятор, как выполнять его вычисления, потоки ввода и вывода, временные метки и параметры. Каждый узел графа реализован как Calculator
. Основная часть выполнения графика происходит внутри его калькуляторов. Вычислитель может принимать ноль или более входных потоков и/или побочных пакетов и создавать ноль или более выходных потоков и/или побочных пакетов.
КалькуляторБаза
Калькулятор создается путем определения нового подкласса класса CalculatorBase
, реализации ряда методов и регистрации нового подкласса в Mediapipe. Как минимум, новый калькулятор должен реализовывать следующие четыре метода.
-
GetContract()
- Авторы калькулятора могут указать ожидаемые типы входных и выходных данных калькулятора в GetContract(). При инициализации графа платформа вызывает статический метод, чтобы проверить, соответствуют ли типы пакетов подключенных входов и выходов информации в этой спецификации.
-
Open()
- После запуска графа платформа вызывает
Open()
. На этом этапе входные пакеты доступны калькулятору.Open()
интерпретирует операции настройки узла (см. Графики ) и подготавливает состояние калькулятора для каждого графа. Эта функция также может записывать пакеты на выходы калькулятора. Ошибка во времяOpen()
может привести к прекращению выполнения графика.
- После запуска графа платформа вызывает
-
Process()
- Для калькулятора с входными данными платформа вызывает
Process()
повторно всякий раз, когда хотя бы в одном входном потоке есть доступный пакет. Платформа по умолчанию гарантирует, что все входные данные имеют одинаковую метку времени (дополнительную информацию см. в разделе Синхронизация ). Несколько вызововProcess()
могут быть вызваны одновременно, если включено параллельное выполнение. Если во времяProcess()
возникает ошибка, платформа вызываетClose()
и выполнение графа завершается.
- Для калькулятора с входными данными платформа вызывает
-
Close()
- После завершения всех вызовов
Process()
или закрытия всех входных потоков платформа вызываетClose()
. Эта функция всегда вызывается, еслиOpen()
была вызвана и завершилась успешно, и даже если выполнение графа завершилось из-за ошибки. Во времяClose()
входные данные не доступны ни через какие входные потоки, но он по-прежнему имеет доступ к пакетам входной стороны и, следовательно, может записывать выходные данные. После возвратаClose()
калькулятор следует считать мертвым узлом. Объект калькулятора уничтожается, как только график завершает работу.
- После завершения всех вызовов
Ниже приведены фрагменты кода из CalculatorBase.h .
class CalculatorBase {
public:
...
// The subclasses of CalculatorBase must implement GetContract.
// ...
static absl::Status GetContract(CalculatorContract* cc);
// Open is called before any Process() calls, on a freshly constructed
// calculator. Subclasses may override this method to perform necessary
// setup, and possibly output Packets and/or set output streams' headers.
// ...
virtual absl::Status Open(CalculatorContext* cc) {
return absl::OkStatus();
}
// Processes the incoming inputs. May call the methods on cc to access
// inputs and produce outputs.
// ...
virtual absl::Status Process(CalculatorContext* cc) = 0;
// Is called if Open() was called and succeeded. Is called either
// immediately after processing is complete or after a graph run has ended
// (if an error occurred in the graph). ...
virtual absl::Status Close(CalculatorContext* cc) {
return absl::OkStatus();
}
...
};
Жизнь калькулятора
Во время инициализации графа MediaPipe платформа вызывает статический метод GetContract()
, чтобы определить, какие типы пакетов ожидаются.
Платформа создает и уничтожает весь калькулятор для каждого запуска графика (например, один раз для видео или один раз для изображения). Дорогие или большие объекты, которые остаются постоянными при прогоне графика, должны поставляться как входные пакеты, чтобы вычисления не повторялись при последующих запусках.
После инициализации для каждого прогона графика происходит следующая последовательность:
-
Open()
-
Process()
(неоднократно) -
Close()
Платформа вызывает Open()
для инициализации калькулятора. Open()
должен интерпретировать любые параметры и устанавливать состояние калькулятора для каждого графика. Open()
может получать пакеты на входной стороне и записывать пакеты на выходы калькулятора. При необходимости следует вызвать SetOffset()
, чтобы уменьшить потенциальную буферизацию пакетов входных потоков.
Если во время Open()
или Process()
возникает ошибка (на что указывает то, что один из них возвращает состояние, отличное Ok
), выполнение графика прекращается без дальнейших вызовов методов калькулятора, и калькулятор уничтожается.
Для калькулятора с входными данными платформа вызывает Process()
всякий раз, когда хотя бы на одном входе имеется доступный пакет. Платформа гарантирует, что все входные данные имеют одинаковую временную метку, что временные метки увеличиваются с каждым вызовом Process()
и что все пакеты доставлены. Как следствие, некоторые входные данные могут не иметь пакетов при вызове Process()
. Входной сигнал, пакет которого отсутствует, создает пустой пакет (без метки времени).
Платформа вызывает Close()
после всех вызовов Process()
. Все входные данные будут исчерпаны, но Close()
имеет доступ к пакетам на входной стороне и может записывать выходные данные. После возвращения Клоуз калькулятор уничтожается.
Калькуляторы без входов называются источниками. Исходный калькулятор продолжает вызывать Process()
до тех пор, пока он возвращает статус Ok
. Калькулятор источника указывает, что он исчерпан, возвращая статус остановки (т.е. mediaPipe::tool::StatusStop()
.).
Определение входов и выходов
Открытый интерфейс калькулятора состоит из набора входных и выходных потоков. В CalculatorGraphConfiguration выходные данные некоторых калькуляторов соединяются с входами других калькуляторов с помощью именованных потоков. Имена потоков обычно пишутся строчными буквами, а теги ввода и вывода — ПРОПИСНЫМИ РЕГИСТРАМИ. В приведенном ниже примере выход с именем тега VIDEO
подключен к входу с именем тега VIDEO_IN
с помощью потока с именем video_stream
.
# Graph describing calculator SomeAudioVideoCalculator
node {
calculator: "SomeAudioVideoCalculator"
input_stream: "INPUT:combined_input"
output_stream: "VIDEO:video_stream"
}
node {
calculator: "SomeVideoCalculator"
input_stream: "VIDEO_IN:video_stream"
output_stream: "VIDEO_OUT:processed_video"
}
Входные и выходные потоки можно идентифицировать по индексному номеру, по имени тега или по комбинации имени тега и индексного номера. Некоторые примеры входных и выходных идентификаторов вы можете увидеть в примере ниже. SomeAudioVideoCalculator
идентифицирует свой видеовыход по тегу, а аудиовыходы — по комбинации тега и индекса. Вход с тегом VIDEO
подключен к потоку с именем video_stream
. Выходы с тегом AUDIO
и индексами 0
и 1
подключены к потокам с именами audio_left
и audio_right
. SomeAudioCalculator
идентифицирует свои аудиовходы только по индексу (тег не требуется).
# Graph describing calculator SomeAudioVideoCalculator
node {
calculator: "SomeAudioVideoCalculator"
input_stream: "combined_input"
output_stream: "VIDEO:video_stream"
output_stream: "AUDIO:0:audio_left"
output_stream: "AUDIO:1:audio_right"
}
node {
calculator: "SomeAudioCalculator"
input_stream: "audio_left"
input_stream: "audio_right"
output_stream: "audio_energy"
}
В реализации калькулятора входы и выходы также идентифицируются по имени тега и индексному номеру. В приведенной ниже функции идентифицируются вход и выход:
- По номеру индекса: объединенный входной поток идентифицируется просто индексом
0
. - По имени тега: выходной видеопоток идентифицируется по имени тега «ВИДЕО».
- По имени тега и индексному номеру: выходные аудиопотоки идентифицируются комбинацией имени тега
AUDIO
и индексных номеров0
и1
.
// c++ Code snippet describing the SomeAudioVideoCalculator GetContract() method
class SomeAudioVideoCalculator : public CalculatorBase {
public:
static absl::Status GetContract(CalculatorContract* cc) {
cc->Inputs().Index(0).SetAny();
// SetAny() is used to specify that whatever the type of the
// stream is, it's acceptable. This does not mean that any
// packet is acceptable. Packets in the stream still have a
// particular type. SetAny() has the same effect as explicitly
// setting the type to be the stream's type.
cc->Outputs().Tag("VIDEO").Set<ImageFrame>();
cc->Outputs().Get("AUDIO", 0).Set<Matrix>();
cc->Outputs().Get("AUDIO", 1).Set<Matrix>();
return absl::OkStatus();
}
Обработка
Process()
, вызванный на узле, не являющемся исходным, должен вернуть absl::OkStatus()
чтобы указать, что все прошло хорошо, или любой другой код состояния, чтобы сигнализировать об ошибке.
Если калькулятор, не являющийся исходным кодом, возвращаетtool tool::StatusStop()
, то это сигнализирует о досрочной отмене графика. В этом случае все исходные калькуляторы и входные потоки графа будут закрыты (а оставшиеся пакеты будут распространяться через граф).
Исходный узел в графе будет по-прежнему вызывать Process()
до тех пор, пока он возвращает absl::OkStatus(
). Чтобы указать, что больше нет данных для генерации, верните tool::StatusStop()
. Любое другое состояние указывает на то, что произошла ошибка.
Close()
возвращает absl::OkStatus()
чтобы указать на успех. Любое другое состояние указывает на сбой.
Вот основная функция Process()
. Он использует метод Input()
(который можно использовать только в том случае, если калькулятор имеет один входной параметр) для запроса входных данных. Затем он использует std::unique_ptr
для выделения памяти, необходимой для выходного пакета, и выполняет вычисления. По завершении он освобождает указатель при добавлении его в выходной поток.
absl::Status MyCalculator::Process() {
const Matrix& input = Input()->Get<Matrix>();
std::unique_ptr<Matrix> output(new Matrix(input.rows(), input.cols()));
// do your magic here....
// output->row(n) = ...
Output()->Add(output.release(), InputTimestamp());
return absl::OkStatus();
}
Параметры калькулятора
Калькуляторы принимают параметры обработки через (1) пакеты входного потока (2) пакеты входной стороны и (3) опции калькулятора. Параметры калькулятора, если они указаны, отображаются как литеральные значения в поле node_options
сообщения CalculatorGraphConfiguration.Node
.
node {
calculator: "TfLiteInferenceCalculator"
input_stream: "TENSORS:main_model_input"
output_stream: "TENSORS:main_model_output"
node_options: {
[type.googleapis.com/mediapipe.TfLiteInferenceCalculatorOptions] {
model_path: "mediapipe/models/detection_model.tflite"
}
}
}
Поле node_options
принимает синтаксис proto3. Альтернативно параметры калькулятора можно указать в поле options
, используя синтаксис proto2.
node {
calculator: "TfLiteInferenceCalculator"
input_stream: "TENSORS:main_model_input"
output_stream: "TENSORS:main_model_output"
node_options: {
[type.googleapis.com/mediapipe.TfLiteInferenceCalculatorOptions] {
model_path: "mediapipe/models/detection_model.tflite"
}
}
}
Не все калькуляторы поддерживают параметры калькулятора. Чтобы принять параметры, калькулятор обычно определяет новый тип сообщения protobuf для представления своих параметров, например PacketClonerCalculatorOptions
. Затем калькулятор прочитает это сообщение protobuf в своем методе CalculatorBase::Open
и, возможно, также в своей функции CalculatorBase::GetContract
или методе CalculatorBase::Process
. Обычно новый тип сообщения protobuf определяется как схема protobuf с использованием файла «.proto» и правила сборки mediapipe_proto_library()
.
mediapipe_proto_library(
name = "packet_cloner_calculator_proto",
srcs = ["packet_cloner_calculator.proto"],
visibility = ["//visibility:public"],
deps = [
"//mediapipe/framework:calculator_options_proto",
"//mediapipe/framework:calculator_proto",
],
)
Пример калькулятора
В этом разделе обсуждается реализация PacketClonerCalculator
, который выполняет относительно простую работу и используется во многих графах калькулятора. PacketClonerCalculator
просто создает копии своих последних входных пакетов по требованию.
PacketClonerCalculator
полезен, когда временные метки прибывающих пакетов данных не совпадают идеально. Предположим, у нас есть комната с микрофоном, датчиком освещенности и видеокамерой, собирающей сенсорные данные. Каждый из датчиков работает независимо и периодически собирает данные. Предположим, что выходной сигнал каждого датчика:
- микрофон = громкость звука в комнате в децибелах (целое число)
- датчик освещенности = яркость помещения (целое число)
- видеокамера = кадр изображения комнаты RGB (ImageFrame)
Наш простой конвейер восприятия предназначен для обработки сенсорных данных от этих трех датчиков таким образом, чтобы в любой момент, когда мы получаем данные кадра изображения с камеры, они синхронизировались с последними собранными данными громкости микрофона и данными яркости датчика освещенности. Чтобы сделать это с помощью MediaPipe, наш конвейер восприятия имеет 3 входных потока:
- room_mic_signal — каждый пакет данных в этом входном потоке представляет собой целочисленные данные, показывающие громкость звука в комнате с отметкой времени.
- room_lightening_sensor — каждый пакет данных в этом входном потоке представляет собой целочисленные данные, показывающие, насколько ярко освещена комната, с отметкой времени.
- room_video_tick_signal — каждый пакет данных в этом входном потоке представляет собой кадр видеоданных, представляющий видео, собранное с камеры в комнате, с отметкой времени.
Ниже представлена реализация PacketClonerCalculator
. Вы можете увидеть методы GetContract()
, Open()
и Process()
, а также переменную экземпляра current_
, которая содержит самые последние входные пакеты.
// This takes packets from N+1 streams, A_1, A_2, ..., A_N, B.
// For every packet that appears in B, outputs the most recent packet from each
// of the A_i on a separate stream.
#include <vector>
#include "absl/strings/str_cat.h"
#include "mediapipe/framework/calculator_framework.h"
namespace mediapipe {
// For every packet received on the last stream, output the latest packet
// obtained on all other streams. Therefore, if the last stream outputs at a
// higher rate than the others, this effectively clones the packets from the
// other streams to match the last.
//
// Example config:
// node {
// calculator: "PacketClonerCalculator"
// input_stream: "first_base_signal"
// input_stream: "second_base_signal"
// input_stream: "tick_signal"
// output_stream: "cloned_first_base_signal"
// output_stream: "cloned_second_base_signal"
// }
//
class PacketClonerCalculator : public CalculatorBase {
public:
static absl::Status GetContract(CalculatorContract* cc) {
const int tick_signal_index = cc->Inputs().NumEntries() - 1;
// cc->Inputs().NumEntries() returns the number of input streams
// for the PacketClonerCalculator
for (int i = 0; i < tick_signal_index; ++i) {
cc->Inputs().Index(i).SetAny();
// cc->Inputs().Index(i) returns the input stream pointer by index
cc->Outputs().Index(i).SetSameAs(&cc->Inputs().Index(i));
}
cc->Inputs().Index(tick_signal_index).SetAny();
return absl::OkStatus();
}
absl::Status Open(CalculatorContext* cc) final {
tick_signal_index_ = cc->Inputs().NumEntries() - 1;
current_.resize(tick_signal_index_);
// Pass along the header for each stream if present.
for (int i = 0; i < tick_signal_index_; ++i) {
if (!cc->Inputs().Index(i).Header().IsEmpty()) {
cc->Outputs().Index(i).SetHeader(cc->Inputs().Index(i).Header());
// Sets the output stream of index i header to be the same as
// the header for the input stream of index i
}
}
return absl::OkStatus();
}
absl::Status Process(CalculatorContext* cc) final {
// Store input signals.
for (int i = 0; i < tick_signal_index_; ++i) {
if (!cc->Inputs().Index(i).Value().IsEmpty()) {
current_[i] = cc->Inputs().Index(i).Value();
}
}
// Output if the tick signal is non-empty.
if (!cc->Inputs().Index(tick_signal_index_).Value().IsEmpty()) {
for (int i = 0; i < tick_signal_index_; ++i) {
if (!current_[i].IsEmpty()) {
cc->Outputs().Index(i).AddPacket(
current_[i].At(cc->InputTimestamp()));
// Add a packet to output stream of index i a packet from inputstream i
// with timestamp common to all present inputs
} else {
cc->Outputs().Index(i).SetNextTimestampBound(
cc->InputTimestamp().NextAllowedInStream());
// if current_[i], 1 packet buffer for input stream i is empty, we will set
// next allowed timestamp for input stream i to be current timestamp + 1
}
}
}
return absl::OkStatus();
}
private:
std::vector<Packet> current_;
int tick_signal_index_;
};
REGISTER_CALCULATOR(PacketClonerCalculator);
} // namespace mediapipe
Обычно калькулятор имеет только файл .cc. Расширение .h не требуется, поскольку mediapipe использует регистрацию, чтобы сделать калькулятор известным ему. После того, как вы определили свой класс калькулятора, зарегистрируйте его с помощью вызова макроса REGISTER_CALCULATOR(calculator_class_name).
Ниже приведен тривиальный граф MediaPipe, который имеет 3 входных потока, 1 узел (PacketClonerCalculator) и 2 выходных потока.
input_stream: "room_mic_signal"
input_stream: "room_lighting_sensor"
input_stream: "room_video_tick_signal"
node {
calculator: "PacketClonerCalculator"
input_stream: "room_mic_signal"
input_stream: "room_lighting_sensor"
input_stream: "room_video_tick_signal"
output_stream: "cloned_room_mic_signal"
output_stream: "cloned_lighting_sensor"
}
На диаграмме ниже показано, как PacketClonerCalculator
определяет свои выходные пакеты (внизу) на основе серии входных пакетов (вверху).
Каждый раз, когда он получает пакет во входном потоке TICK, PacketClonerCalculator выводит самый последний пакет из каждого из своих входных потоков. Последовательность выходных пакетов (внизу) определяется последовательностью входных пакетов (вверху) и их временными метками. Временные метки показаны в правой части диаграммы. |