Każdy kalkulator jest węzłem wykresu. Opisujemy, jak utworzyć nowy kalkulator, jak go uruchomić, wykonać obliczenia, jakie są strumienie danych wejściowych i wyjściowych, sygnatury czasowe oraz opcje. Każdy węzeł na wykresie jest zaimplementowany jako Calculator
. Większość wykresów wykonuje się
w kalkulatorach. Kalkulator może otrzymać zero lub więcej strumieni wejściowych lub pakietów dodatkowych i wygenerować 0 lub więcej strumieni wyjściowych lub pakietów bocznych.
CalculatorBase
Aby utworzyć kalkulator, trzeba zdefiniować nową podklasę klasy CalculatorBase
, wdrożyć kilka metod i zarejestrować nową podklasę w Mediapipe. Nowy kalkulator musi implementować przynajmniej 4 metody opisane poniżej
GetContract()
- Autorzy kalkulatora mogą określić w GetContract() oczekiwane typy danych wejściowych i wyjściowych kalkulatora. Po zainicjowaniu wykresu platforma wywołuje metodę statyczną, aby sprawdzić, czy typy pakietów połączonych danych wejściowych i wyjściowych są zgodne z informacjami zawartymi w tej specyfikacji.
Open()
- Po uruchomieniu wykresu platforma wywołuje
Open()
. W tym momencie kalkulator ma dostęp do pakietów dodatkowych.Open()
interpretuje operacje konfiguracji węzłów (patrz Wykresy) i przygotowuje stan kalkulatora dla poszczególnych wykresów. Ta funkcja może też zapisywać pakiety w wynikach kalkulatora. Błąd podczasOpen()
może zakończyć generowanie wykresu.
- Po uruchomieniu wykresu platforma wywołuje
Process()
- W przypadku kalkulatora z danymi wejściowych platforma wielokrotnie wywołuje funkcję
Process()
za każdym razem, gdy w co najmniej 1 strumieniu wejściowym jest dostępny pakiet. Platforma domyślnie gwarantuje, że wszystkie dane wejściowe mają tę samą sygnaturę czasową (więcej informacji znajdziesz w artykule Synchronizacja). Gdy włączone jest wykonywanie równoległe, można wywoływać wiele wywołańProcess()
jednocześnie. Jeśli w czasieProcess()
wystąpi błąd, platforma wywoła metodęClose()
i bieg wykresu się zakończy.
- W przypadku kalkulatora z danymi wejściowych platforma wielokrotnie wywołuje funkcję
Close()
- Po zakończeniu wszystkich wywołań funkcji
Process()
lub zamknięciu wszystkich strumieni danych wejściowych platforma wywołuje metodęClose()
. Ta funkcja jest zawsze wywoływana, jeśli funkcjaOpen()
została wywołana i pomyślna, a nawet wtedy, gdy uruchomienie wykresu zostało zakończone z powodu błędu. PodczasClose()
żadne dane wejściowe nie są dostępne przez żadne strumienie wejściowe, ale nadal ma ona dostęp do pakietów wejściowych, więc może zapisywać dane wyjściowe. Po zwróceniu funkcjiClose()
kalkulator powinien być uważany za martwy węzeł. Obiekt kalkulatora jest niszczony wraz z zakończeniem wykresu.
- Po zakończeniu wszystkich wywołań funkcji
Poniżej znajdziesz fragmenty kodu z CalculatorBase.h.
class CalculatorBase {
public:
...
// The subclasses of CalculatorBase must implement GetContract.
// ...
static absl::Status GetContract(CalculatorContract* cc);
// Open is called before any Process() calls, on a freshly constructed
// calculator. Subclasses may override this method to perform necessary
// setup, and possibly output Packets and/or set output streams' headers.
// ...
virtual absl::Status Open(CalculatorContext* cc) {
return absl::OkStatus();
}
// Processes the incoming inputs. May call the methods on cc to access
// inputs and produce outputs.
// ...
virtual absl::Status Process(CalculatorContext* cc) = 0;
// Is called if Open() was called and succeeded. Is called either
// immediately after processing is complete or after a graph run has ended
// (if an error occurred in the graph). ...
virtual absl::Status Close(CalculatorContext* cc) {
return absl::OkStatus();
}
...
};
Jak działa kalkulator
Podczas inicjowania wykresu MediaPipe platforma wywołuje metodę statyczną GetContract()
, aby określić oczekiwane rodzaje pakietów.
Platforma tworzy i niszczy cały kalkulator przy każdym uruchomieniu wykresu (np. raz na film lub raz na obraz). Drogie lub duże obiekty, które pozostają stałe w ramach przebiegu wykresu, powinny być dostarczane jako wejściowe pakiety boczne, aby obliczenia nie były powtarzane przy kolejnych uruchomieniach.
Po zainicjowaniu przy każdym uruchomieniu wykresu zachodzi taka sekwencja:
Open()
Process()
(wielokrotnie)Close()
Platforma wywołuje kalkulator Open()
. Funkcja Open()
powinna zinterpretować dowolne opcje i skonfigurować stan kalkulatora dla poszczególnych wykresów. Funkcja Open()
może uzyskiwać pakiety po stronie wejściowej i zapisywać je na danych wyjściowych kalkulatora. W razie potrzeby powinien wywołać SetOffset()
, aby ograniczyć potencjalne buforowanie pakietów strumieni wejściowych.
Jeśli w funkcji Open()
lub Process()
wystąpi błąd (gdy jeden z nich zwraca stan inny niż Ok
), wykres zostanie zakończony bez dalszych wywołań metod kalkulatora, a kalkulator zostanie zniszczony.
W przypadku kalkulatora z danymi wejściowych platforma wywołuje funkcję Process()
, gdy dostępne jest co najmniej 1 pakiet danych wejściowych. Platforma gwarantuje, że wszystkie dane wejściowe będą miały tę samą sygnaturę czasową, że będą one zwiększać się z każdym wywołaniem funkcji Process()
i że wszystkie pakiety zostaną dostarczone. W efekcie po wywołaniu funkcji Process()
niektóre dane wejściowe mogą nie mieć żadnych pakietów. Dane wejściowe, których brakuje pakietu, prawdopodobnie powoduje utworzenie pustego pakietu (bez sygnatury czasowej).
Platforma wywołuje Close()
po wszystkich wywołaniach funkcji Process()
. Wszystkie dane wejściowe zostały wyczerpane, ale Close()
ma dostęp do pakietów po stronie wejściowej i może zapisywać dane wyjściowe. Po zwróceniu Zamknij kalkulator jest niszczony.
Kalkulatory bez danych wejściowych są nazywane źródłami. Kalkulator źródeł nadal wywołuje funkcję Process()
, dopóki zwraca stan Ok
. Kalkulator źródeł pokazuje, że zasoby są wyczerpane przez zwrócenie stanu zatrzymania (np. mediaPipe::tool::StatusStop()
).
Identyfikowanie danych wejściowych i wyjściowych
Publiczny interfejs kalkulatora składa się z zestawu strumieni wejściowych i wyjściowych. W konfiguracji CalculatorGraphConfiguration wyniki niektórych kalkulatorów są łączone z danymi innych kalkulatorów za pomocą nazwanych strumieni. Nazwy strumieni są zwykle zapisywane małymi literami, a tagi danych wejściowych i wyjściowych są zwykle pisane WIELKIMI LITERAMI. W poniższym przykładzie dane wyjściowe o nazwie tagu VIDEO
są połączone z danymi wejściowymi tagu o nazwie VIDEO_IN
za pomocą strumienia o nazwie video_stream
.
# Graph describing calculator SomeAudioVideoCalculator
node {
calculator: "SomeAudioVideoCalculator"
input_stream: "INPUT:combined_input"
output_stream: "VIDEO:video_stream"
}
node {
calculator: "SomeVideoCalculator"
input_stream: "VIDEO_IN:video_stream"
output_stream: "VIDEO_OUT:processed_video"
}
Strumienie wejściowe i wyjściowe można identyfikować według numeru indeksu, nazwy tagu lub kombinacji nazwy tagu i numeru indeksu. W przykładzie poniżej znajdziesz kilka przykładów identyfikatorów danych wejściowych i wyjściowych. SomeAudioVideoCalculator
identyfikuje wyjścia wideo według tagu, a wyjściowe wyjścia audio – na podstawie kombinacji tagu i indeksu. Dane wejściowe z tagiem VIDEO
są połączone ze strumieniem o nazwie video_stream
. Dane wyjściowe z tagiem AUDIO
i indeksami 0
i 1
są połączone ze strumieniami o nazwach audio_left
i audio_right
.
SomeAudioCalculator
identyfikuje dane wejściowe audio tylko według indeksu (tag nie jest potrzebny).
# Graph describing calculator SomeAudioVideoCalculator
node {
calculator: "SomeAudioVideoCalculator"
input_stream: "combined_input"
output_stream: "VIDEO:video_stream"
output_stream: "AUDIO:0:audio_left"
output_stream: "AUDIO:1:audio_right"
}
node {
calculator: "SomeAudioCalculator"
input_stream: "audio_left"
input_stream: "audio_right"
output_stream: "audio_energy"
}
W implementacji kalkulatora dane wejściowe i wyniki są również identyfikowane za pomocą nazwy tagu i numeru indeksu. W funkcji poniżej dane wejściowe i wyjściowe są identyfikowane:
- Według numeru indeksu: połączony strumień danych wejściowych jest identyfikowany po prostu przez indeks
0
. - Według nazwy tagu: strumień wyjścia wideo jest identyfikowany przez tag „VIDEO”.
- Według nazwy tagu i numeru indeksu: wyjściowe strumienie audio są identyfikowane przez połączenie nazwy tagu
AUDIO
oraz numerów indeksu0
i1
.
// c++ Code snippet describing the SomeAudioVideoCalculator GetContract() method
class SomeAudioVideoCalculator : public CalculatorBase {
public:
static absl::Status GetContract(CalculatorContract* cc) {
cc->Inputs().Index(0).SetAny();
// SetAny() is used to specify that whatever the type of the
// stream is, it's acceptable. This does not mean that any
// packet is acceptable. Packets in the stream still have a
// particular type. SetAny() has the same effect as explicitly
// setting the type to be the stream's type.
cc->Outputs().Tag("VIDEO").Set<ImageFrame>();
cc->Outputs().Get("AUDIO", 0).Set<Matrix>();
cc->Outputs().Get("AUDIO", 1).Set<Matrix>();
return absl::OkStatus();
}
W trakcie przetwarzania
Funkcja Process()
wywołana z węzła innego niż źródłowy musi zwrócić wartość absl::OkStatus()
, aby zasygnalizować, że wszystko poszło dobrze, lub dowolny inny kod stanu sygnalizujący błąd
Jeśli kalkulator nieźródłowy zwraca wartość tool::StatusStop()
, oznacza to, że wykres jest wcześniej anulowany. W tym przypadku wszystkie kalkulatory źródeł i strumienie danych wejściowych wykresu zostaną zamknięte (a pozostałe pakiety zostaną rozpowszechnione na wykresie).
Węzeł źródłowy na wykresie będzie nadal wywoływany przez funkcję Process()
, dopóki będzie zwracać wartość absl::OkStatus(
. Aby wskazać, że nie ma więcej danych do wygenerowania, zwróć wartość tool::StatusStop()
. Każdy inny stan oznacza,
że wystąpił błąd.
Close()
zwraca wartość absl::OkStatus()
, co oznacza, że się udało. Każdy inny stan wskazuje na błąd.
Oto podstawowa funkcja Process()
. Korzysta z metody Input()
(której można używać tylko wtedy, gdy kalkulator ma tylko jedno dane wejściowe) do żądania danych wejściowych. Następnie wykorzystuje std::unique_ptr
do przydzielenia pamięci potrzebnej na pakiet wyjściowy i wykonuje obliczenia. Gdy skończysz, zwalnia wskaźnik podczas dodawania go do strumienia danych wyjściowych.
absl::Status MyCalculator::Process() {
const Matrix& input = Input()->Get<Matrix>();
std::unique_ptr<Matrix> output(new Matrix(input.rows(), input.cols()));
// do your magic here....
// output->row(n) = ...
Output()->Add(output.release(), InputTimestamp());
return absl::OkStatus();
}
Opcje kalkulatora
Kalkulatory akceptują parametry przetwarzania przez (1) pakiety strumienia wejściowego (2) pakiety po stronie wejściowej i (3) opcje kalkulatora. Opcje kalkulatora (jeśli są określone) pojawiają się jako wartości literałowe w polu node_options
komunikatu CalculatorGraphConfiguration.Node
.
node {
calculator: "TfLiteInferenceCalculator"
input_stream: "TENSORS:main_model_input"
output_stream: "TENSORS:main_model_output"
node_options: {
[type.googleapis.com/mediapipe.TfLiteInferenceCalculatorOptions] {
model_path: "mediapipe/models/detection_model.tflite"
}
}
}
Pole node_options
akceptuje składnię proto3. Opcje kalkulatora można też określić w polu options
przy użyciu składni proto2.
node {
calculator: "TfLiteInferenceCalculator"
input_stream: "TENSORS:main_model_input"
output_stream: "TENSORS:main_model_output"
node_options: {
[type.googleapis.com/mediapipe.TfLiteInferenceCalculatorOptions] {
model_path: "mediapipe/models/detection_model.tflite"
}
}
}
Nie wszystkie kalkulatory obsługują opcje. Aby akceptować opcje, kalkulator zwykle definiuje nowy typ wiadomości protobuf reprezentującej opcje, np. PacketClonerCalculatorOptions
. Kalkulator odczyta następnie tę wiadomość protobuf za pomocą metody CalculatorBase::Open
, a także również w funkcji CalculatorBase::GetContract
lub metod CalculatorBase::Process
. Normalnie nowy typ wiadomości protobuf będzie zdefiniowany jako schemat protokołu za pomocą pliku „.proto” i reguły kompilacji mediapipe_proto_library()
.
mediapipe_proto_library(
name = "packet_cloner_calculator_proto",
srcs = ["packet_cloner_calculator.proto"],
visibility = ["//visibility:public"],
deps = [
"//mediapipe/framework:calculator_options_proto",
"//mediapipe/framework:calculator_proto",
],
)
Przykładowy kalkulator
W tej sekcji opisujemy implementację funkcji PacketClonerCalculator
, która jest stosunkowo prosta i używana w wielu wykresach kalkulatorów.
PacketClonerCalculator
po prostu tworzy na żądanie kopię najnowszych pakietów wejściowych.
Funkcja PacketClonerCalculator
jest przydatna, gdy sygnatury czasowe przychodzących pakietów danych nie są idealnie dopasowane. Załóżmy, że mamy pomieszczenie z mikrofonem,
czujnikiem światła i kamerą, która zbiera dane sensoryczne. Każdy z czujników działa niezależnie i zbiera dane nieregularnie. Załóżmy, że dane wyjściowe każdego czujnika to:
- mikrofon = głośność w decybelach dźwięku w pomieszczeniu (liczba całkowita)
- czujnik światła = jasność w pomieszczeniu (liczba całkowita)
- kamera wideo = ramka obrazu RGB pokoju (ImageFrame)
Nasz prosty potok percepcyjny został zaprojektowany do przetwarzania danych sensorycznych z tych 3 czujników w dowolnym momencie, gdy mamy dane klatki obrazu z kamery synchronizowane z ostatnio zebranymi danymi na temat głośności mikrofonu i jasnością z czujnika światła. W tym celu w MediaPipe nasz potok percepcyjny zawiera 3 strumienie danych wejściowych:
- Room_mic_signal – każda pakiet danych w tym strumieniu wejściowym jest liczbą całkowitą reprezentującą głośność dźwięku w pomieszczeniu (z sygnaturą czasową).
- Room_lightening_sensor – każda pakiet danych w tym strumieniu wejściowym jest liczbą całkowitą określającą jasność pomieszczenia pomieszczeń za pomocą sygnatury czasowej.
- Room_video_tick_signal – każda pakiet danych w tym strumieniu wejściowym to ramka obrazu danych wideo reprezentująca film zebrany przez kamerę w sali z sygnaturą czasową.
Poniżej znajduje się implementacja PacketClonerCalculator
. Widać metody GetContract()
, Open()
i Process()
, a także zmienną instancji current_
, która zawiera najnowsze pakiety wejściowe.
// This takes packets from N+1 streams, A_1, A_2, ..., A_N, B.
// For every packet that appears in B, outputs the most recent packet from each
// of the A_i on a separate stream.
#include <vector>
#include "absl/strings/str_cat.h"
#include "mediapipe/framework/calculator_framework.h"
namespace mediapipe {
// For every packet received on the last stream, output the latest packet
// obtained on all other streams. Therefore, if the last stream outputs at a
// higher rate than the others, this effectively clones the packets from the
// other streams to match the last.
//
// Example config:
// node {
// calculator: "PacketClonerCalculator"
// input_stream: "first_base_signal"
// input_stream: "second_base_signal"
// input_stream: "tick_signal"
// output_stream: "cloned_first_base_signal"
// output_stream: "cloned_second_base_signal"
// }
//
class PacketClonerCalculator : public CalculatorBase {
public:
static absl::Status GetContract(CalculatorContract* cc) {
const int tick_signal_index = cc->Inputs().NumEntries() - 1;
// cc->Inputs().NumEntries() returns the number of input streams
// for the PacketClonerCalculator
for (int i = 0; i < tick_signal_index; ++i) {
cc->Inputs().Index(i).SetAny();
// cc->Inputs().Index(i) returns the input stream pointer by index
cc->Outputs().Index(i).SetSameAs(&cc->Inputs().Index(i));
}
cc->Inputs().Index(tick_signal_index).SetAny();
return absl::OkStatus();
}
absl::Status Open(CalculatorContext* cc) final {
tick_signal_index_ = cc->Inputs().NumEntries() - 1;
current_.resize(tick_signal_index_);
// Pass along the header for each stream if present.
for (int i = 0; i < tick_signal_index_; ++i) {
if (!cc->Inputs().Index(i).Header().IsEmpty()) {
cc->Outputs().Index(i).SetHeader(cc->Inputs().Index(i).Header());
// Sets the output stream of index i header to be the same as
// the header for the input stream of index i
}
}
return absl::OkStatus();
}
absl::Status Process(CalculatorContext* cc) final {
// Store input signals.
for (int i = 0; i < tick_signal_index_; ++i) {
if (!cc->Inputs().Index(i).Value().IsEmpty()) {
current_[i] = cc->Inputs().Index(i).Value();
}
}
// Output if the tick signal is non-empty.
if (!cc->Inputs().Index(tick_signal_index_).Value().IsEmpty()) {
for (int i = 0; i < tick_signal_index_; ++i) {
if (!current_[i].IsEmpty()) {
cc->Outputs().Index(i).AddPacket(
current_[i].At(cc->InputTimestamp()));
// Add a packet to output stream of index i a packet from inputstream i
// with timestamp common to all present inputs
} else {
cc->Outputs().Index(i).SetNextTimestampBound(
cc->InputTimestamp().NextAllowedInStream());
// if current_[i], 1 packet buffer for input stream i is empty, we will set
// next allowed timestamp for input stream i to be current timestamp + 1
}
}
}
return absl::OkStatus();
}
private:
std::vector<Packet> current_;
int tick_signal_index_;
};
REGISTER_CALCULATOR(PacketClonerCalculator);
} // namespace mediapipe
Kalkulator zawiera zazwyczaj tylko plik .cc. Rozszerzenie .h nie jest wymagane, ponieważ Mediapipe korzysta z rejestracji do uzyskiwania informacji o kalkulatorach. Po zdefiniowaniu klasy kalkulatora zarejestruj ją za pomocą wywołania REGISTER_CALCULATOR(nazwa_klasy_kalkulatora).
Poniżej znajduje się prosty wykres MediaPipe zawierający 3 strumienie wejściowe, 1 węzeł (PacketClonerCalculator) i 2 strumienie wyjściowe.
input_stream: "room_mic_signal"
input_stream: "room_lighting_sensor"
input_stream: "room_video_tick_signal"
node {
calculator: "PacketClonerCalculator"
input_stream: "room_mic_signal"
input_stream: "room_lighting_sensor"
input_stream: "room_video_tick_signal"
output_stream: "cloned_room_mic_signal"
output_stream: "cloned_lighting_sensor"
}
Poniższy diagram przedstawia, jak PacketClonerCalculator
definiuje swoje pakiety wyjściowe (na dole) na podstawie serii pakietów wejściowych (góra).
Za każdym razem, gdy otrzymuje pakiet w strumieniu wejściowym TICK, PacketClonerCalculator wysyła ostatni pakiet z każdego ze swoich strumieni wejściowych. Kolejność pakietów wyjściowych (u dołu) jest określana na podstawie sekwencji pakietów wejściowych (u góry) i ich sygnatur czasowych. Sygnatury czasowe są widoczne po prawej stronie wykresu. |