Çdo kalkulator është një nyje e një grafiku. Ne përshkruajmë se si të krijohet një kalkulator i ri, si të inicializohet një kalkulator, si të kryhen llogaritjet e tij, rrjedhat hyrëse dhe dalëse, vulat kohore dhe opsionet. Çdo nyje në grafik zbatohet si një Calculator
. Pjesa më e madhe e ekzekutimit të grafikut ndodh brenda kalkulatorëve të tij. Një kalkulator mund të marrë zero ose më shumë rryma hyrëse dhe/ose paketa anësore dhe prodhon zero ose më shumë rrjedha dalëse dhe/ose paketa anësore.
Baza llogaritëse
Një kalkulator krijohet duke përcaktuar një nënklasë të re të klasës CalculatorBase
, duke zbatuar një sërë metodash dhe duke regjistruar nënklasën e re me Mediapipe. Së paku, një kalkulator i ri duhet të zbatojë katër metodat e mëposhtme
-
GetContract()
- Autorët e makinës llogaritëse mund të specifikojnë llojet e pritshme të hyrjeve dhe daljeve të një kalkulatori në GetContract(). Kur një grafik inicializohet, korniza thërret një metodë statike për të verifikuar nëse llojet e paketave të hyrjeve dhe daljeve të lidhura përputhen me informacionin në këtë specifikim.
-
Open()
- Pasi të fillojë një grafik, korniza thërret
Open()
. Paketat anësore të hyrjes janë të disponueshme për kalkulatorin në këtë pikë.Open()
interpreton operacionet e konfigurimit të nyjeve (shih Grafikët ) dhe përgatit gjendjen e kalkulatorit për ekzekutimin e grafikut. Ky funksion gjithashtu mund të shkruajë paketa në daljet e kalkulatorit. Një gabim gjatëOpen()
mund të përfundojë ekzekutimin e grafikut.
- Pasi të fillojë një grafik, korniza thërret
-
Process()
- Për një kalkulator me hyrje, korniza thërret
Process()
në mënyrë të përsëritur sa herë që të paktën një rrymë hyrëse ka një paketë të disponueshme. Korniza si parazgjedhje garanton që të gjitha hyrjet të kenë të njëjtën vulë kohore (shih Sinkronizimin për më shumë informacion). Thirrjet MultipleProcess()
mund të thirren njëkohësisht kur ekzekutimi paralel është i aktivizuar. Nëse ndodh një gabim gjatëProcess()
, korniza thërretClose()
dhe ekzekutimi i grafikut përfundon.
- Për një kalkulator me hyrje, korniza thërret
-
Close()
- Pas përfundimit të të gjitha thirrjeve në
Process()
ose kur mbyllen të gjitha transmetimet hyrëse, korniza thërretClose()
. Ky funksion thirret gjithmonë nëseOpen()
thirrej dhe kishte sukses dhe edhe nëse ekzekutimi i grafikut përfundon për shkak të një gabimi. Asnjë hyrje nuk disponohet nëpërmjet asnjë rryme hyrëse gjatëClose()
, por ai ende ka akses në paketat anësore hyrëse dhe për këtë arsye mund të shkruajë dalje. Pasi të kthehetClose()
, kalkulatori duhet të konsiderohet si një nyje e vdekur. Objekti i kalkulatorit shkatërrohet sapo grafiku përfundon ekzekutimin.
- Pas përfundimit të të gjitha thirrjeve në
Më poshtë janë pjesët e kodit nga CalculatorBase.h .
class CalculatorBase {
public:
...
// The subclasses of CalculatorBase must implement GetContract.
// ...
static absl::Status GetContract(CalculatorContract* cc);
// Open is called before any Process() calls, on a freshly constructed
// calculator. Subclasses may override this method to perform necessary
// setup, and possibly output Packets and/or set output streams' headers.
// ...
virtual absl::Status Open(CalculatorContext* cc) {
return absl::OkStatus();
}
// Processes the incoming inputs. May call the methods on cc to access
// inputs and produce outputs.
// ...
virtual absl::Status Process(CalculatorContext* cc) = 0;
// Is called if Open() was called and succeeded. Is called either
// immediately after processing is complete or after a graph run has ended
// (if an error occurred in the graph). ...
virtual absl::Status Close(CalculatorContext* cc) {
return absl::OkStatus();
}
...
};
Jeta e një kalkulatori
Gjatë inicializimit të një grafiku MediaPipe, korniza thërret një metodë statike GetContract()
për të përcaktuar se çfarë lloj paketash priten.
Korniza ndërton dhe shkatërron të gjithë kalkulatorin për çdo ekzekutim të grafikut (p.sh. një herë për video ose një herë për imazh). Objektet e shtrenjta ose të mëdha që mbeten konstante përgjatë ekzekutimeve të grafikut duhet të sigurohen si pako anësore hyrëse, në mënyrë që llogaritjet të mos përsëriten në ekzekutimet pasuese.
Pas inicializimit, për çdo ekzekutim të grafikut, ndodh sekuenca e mëposhtme:
-
Open()
-
Process()
(në mënyrë të përsëritur) -
Close()
Korniza thërret Open()
për të inicializuar kalkulatorin. Open()
duhet të interpretojë çdo opsion dhe të vendosë gjendjen e kalkulatorit për ekzekutimin e grafikut. Open()
mund të marrë paketa anësore hyrëse dhe të shkruajë paketa në daljet e kalkulatorit. Nëse është e përshtatshme, duhet të thërrasë SetOffset()
për të reduktuar buferimin e mundshëm të paketave të rrymave hyrëse.
Nëse ndodh një gabim gjatë Open()
ose Process()
(siç tregohet nga njëri prej tyre që kthen një status jo- Ok
), ekzekutimi i grafikut përfundon pa thirrje të mëtejshme në metodat e kalkulatorit dhe kalkulatori shkatërrohet.
Për një kalkulator me hyrje, korniza thërret Process()
sa herë që të paktën një hyrje ka një paketë të disponueshme. Korniza garanton që të gjitha hyrjet kanë të njëjtën stamp kohore, që stampat kohore rriten me çdo thirrje në Process()
dhe që të gjitha paketat dorëzohen. Si pasojë, disa hyrje mund të mos kenë asnjë pako kur thirret Process()
. Një hyrje, paketa e së cilës mungon, duket se prodhon një paketë të zbrazët (pa stampim kohor).
Korniza thërret Close()
pas të gjitha thirrjeve në Process()
. Të gjitha hyrjet do të jenë shteruar, por Close()
ka akses në paketat anësore hyrëse dhe mund të shkruajë dalje. Pasi të kthehet Mbyllja, kalkulatori shkatërrohet.
Llogaritësit pa hyrje referohen si burime. Një kalkulator burimi vazhdon të ketë Process()
të thirrur për sa kohë që kthen një status Ok
. Llogaritësi i burimit tregon se është shteruar duke kthyer statusin e ndalimit (p.sh. mediaPipe::tool::StatusStop()
.).
Identifikimi i hyrjeve dhe daljeve
Ndërfaqja publike me një kalkulator përbëhet nga një grup rrymash hyrëse dhe rrjedhash dalëse. Në një CalculatorGraphConfiguration, daljet nga disa kalkulatorë lidhen me hyrjet e kalkulatorëve të tjerë duke përdorur rryma të emërtuara. Emrat e transmetimit zakonisht janë me shkronja të vogla, ndërsa etiketat hyrëse dhe dalëse janë zakonisht ME SHQIPËRI. Në shembullin e mëposhtëm, dalja me emrin e etiketës VIDEO
lidhet me hyrjen me emrin e etiketës VIDEO_IN
duke përdorur transmetimin e quajtur video_stream
.
# Graph describing calculator SomeAudioVideoCalculator
node {
calculator: "SomeAudioVideoCalculator"
input_stream: "INPUT:combined_input"
output_stream: "VIDEO:video_stream"
}
node {
calculator: "SomeVideoCalculator"
input_stream: "VIDEO_IN:video_stream"
output_stream: "VIDEO_OUT:processed_video"
}
Rrjedhat hyrëse dhe dalëse mund të identifikohen me numrin e indeksit, me emrin e etiketës ose nga një kombinim i emrit të etiketës dhe numrit të indeksit. Ju mund të shihni disa shembuj të identifikuesve të hyrjes dhe daljes në shembullin e mëposhtëm. SomeAudioVideoCalculator
identifikon daljen e videos sipas etiketës dhe daljet audio nga kombinimi i etiketës dhe indeksit. Hyrja me etiketën VIDEO
është e lidhur me transmetimin e quajtur video_stream
. Daljet me etiketën AUDIO
dhe indekset 0
dhe 1
janë të lidhura me transmetimet e quajtura audio_left
dhe audio_right
. SomeAudioCalculator
identifikon hyrjet e tij audio vetëm sipas indeksit (nuk nevojitet etiketë).
# Graph describing calculator SomeAudioVideoCalculator
node {
calculator: "SomeAudioVideoCalculator"
input_stream: "combined_input"
output_stream: "VIDEO:video_stream"
output_stream: "AUDIO:0:audio_left"
output_stream: "AUDIO:1:audio_right"
}
node {
calculator: "SomeAudioCalculator"
input_stream: "audio_left"
input_stream: "audio_right"
output_stream: "audio_energy"
}
Në zbatimin e kalkulatorit, hyrjet dhe daljet identifikohen gjithashtu me emrin e etiketës dhe numrin e indeksit. Në funksionin e mëposhtëm identifikohen hyrjet dhe daljet:
- Sipas numrit të indeksit: Rrjedha e kombinuar e hyrjes identifikohet thjesht nga indeksi
0
. - Sipas emrit të etiketës: Transmetimi i daljes së videos identifikohet me emrin e etiketës "VIDEO".
- Sipas emrit të etiketës dhe numrit të indeksit: Transmetimet audio në dalje identifikohen nga kombinimi i emrit të etiketës
AUDIO
dhe numrave të indeksit0
dhe1
.
// c++ Code snippet describing the SomeAudioVideoCalculator GetContract() method
class SomeAudioVideoCalculator : public CalculatorBase {
public:
static absl::Status GetContract(CalculatorContract* cc) {
cc->Inputs().Index(0).SetAny();
// SetAny() is used to specify that whatever the type of the
// stream is, it's acceptable. This does not mean that any
// packet is acceptable. Packets in the stream still have a
// particular type. SetAny() has the same effect as explicitly
// setting the type to be the stream's type.
cc->Outputs().Tag("VIDEO").Set<ImageFrame>();
cc->Outputs().Get("AUDIO", 0).Set<Matrix>();
cc->Outputs().Get("AUDIO", 1).Set<Matrix>();
return absl::OkStatus();
}
Përpunimi
Process()
i thirrur në një nyje jo-burimore duhet të kthejë absl::OkStatus()
për të treguar që gjithçka shkoi mirë, ose ndonjë kod tjetër statusi për të sinjalizuar një gabim
Nëse një kalkulator pa burim kthen tool::StatusStop()
, atëherë kjo sinjalizon se grafiku po anulohet herët. Në këtë rast, të gjithë kalkulatorët e burimit dhe rrjedhat e hyrjes së grafikut do të mbyllen (dhe Paketat e mbetura do të përhapen nëpër grafik).
Një nyje burimi në një grafik do të vazhdojë të ketë Process()
të thirrur për sa kohë që kthen absl::OkStatus(
). Për të treguar se nuk ka më të dhëna për t'u gjeneruar, kthen tool::StatusStop()
. Çdo status tjetër tregon se ka ndodhur një gabim.
Close()
kthen absl::OkStatus()
për të treguar suksesin. Çdo status tjetër tregon një dështim.
Këtu është funksioni bazë Process()
. Përdor metodën Input()
(e cila mund të përdoret vetëm nëse kalkulatori ka një hyrje të vetme) për të kërkuar të dhënat e tij hyrëse. Më pas përdor std::unique_ptr
për të ndarë memorien e nevojshme për paketën dalëse dhe bën llogaritjet. Kur të përfundojë, ai lëshon treguesin kur e shton atë në rrjedhën e daljes.
absl::Status MyCalculator::Process() {
const Matrix& input = Input()->Get<Matrix>();
std::unique_ptr<Matrix> output(new Matrix(input.rows(), input.cols()));
// do your magic here....
// output->row(n) = ...
Output()->Add(output.release(), InputTimestamp());
return absl::OkStatus();
}
Opsionet e kalkulatorit
Llogaritësit pranojnë parametrat e përpunimit përmes (1) paketave të rrjedhës hyrëse (2) paketave anësore hyrëse dhe (3) opsioneve të kalkulatorit. Opsionet e kalkulatorit, nëse specifikohen, shfaqen si vlera literale në fushën node_options
të mesazhit CalculatorGraphConfiguration.Node
.
node {
calculator: "TfLiteInferenceCalculator"
input_stream: "TENSORS:main_model_input"
output_stream: "TENSORS:main_model_output"
node_options: {
[type.googleapis.com/mediapipe.TfLiteInferenceCalculatorOptions] {
model_path: "mediapipe/models/detection_model.tflite"
}
}
}
Fusha node_options
pranon sintaksën proto3. Përndryshe, opsionet e kalkulatorit mund të specifikohen në fushën e options
duke përdorur sintaksën proto2.
node {
calculator: "TfLiteInferenceCalculator"
input_stream: "TENSORS:main_model_input"
output_stream: "TENSORS:main_model_output"
node_options: {
[type.googleapis.com/mediapipe.TfLiteInferenceCalculatorOptions] {
model_path: "mediapipe/models/detection_model.tflite"
}
}
}
Jo të gjithë kalkulatorët pranojnë opsionet e kalkulatorit. Për të pranuar opsionet, një kalkulator normalisht do të përcaktojë një lloj të ri mesazhi protobuf për të përfaqësuar opsionet e tij, të tilla si PacketClonerCalculatorOptions
. Llogaritësi do të lexojë atë mesazh protobuf në metodën e tij CalculatorBase::Open
, dhe ndoshta edhe në funksionin e tij CalculatorBase::GetContract
ose metodën e tij CalculatorBase::Process
. Normalisht, lloji i ri i mesazhit protobuf do të përcaktohet si një skemë protobuf duke përdorur një skedar ".proto" dhe një rregull ndërtimi mediapipe_proto_library()
.
mediapipe_proto_library(
name = "packet_cloner_calculator_proto",
srcs = ["packet_cloner_calculator.proto"],
visibility = ["//visibility:public"],
deps = [
"//mediapipe/framework:calculator_options_proto",
"//mediapipe/framework:calculator_proto",
],
)
Shembull kalkulator
Ky seksion diskuton zbatimin e PacketClonerCalculator
, i cili bën një punë relativisht të thjeshtë dhe përdoret në shumë grafikë kalkulatorësh. PacketClonerCalculator
thjesht prodhon një kopje të paketave të tij më të fundit hyrëse sipas kërkesës.
PacketClonerCalculator
është i dobishëm kur vulat kohore të mbërritjes së paketave të të dhënave nuk janë të përafruara në mënyrë të përsosur. Supozoni se kemi një dhomë me një mikrofon, sensor drite dhe një videokamerë që mbledh të dhëna shqisore. Secili prej sensorëve funksionon në mënyrë të pavarur dhe mbledh të dhëna me ndërprerje. Supozoni se dalja e secilit sensor është:
- mikrofon = zëri në decibel në dhomë (Numër i plotë)
- sensori i dritës = ndriçimi i dhomës (Numër i plotë)
- videokamera = kornizë imazhi RGB e dhomës (ImageFrame)
Tubacioni ynë i thjeshtë i perceptimit është krijuar për të përpunuar të dhënat ndijore nga këta 3 sensorë, në mënyrë që në çdo kohë kur kemi të dhëna të kornizës së imazhit nga kamera që sinkronizohen me të dhënat e grumbulluara të fundit të zhurmës së mikrofonit dhe të dhënat e ndriçimit të sensorit të dritës. Për ta bërë këtë me MediaPipe, tubacioni ynë i perceptimit ka 3 prurje hyrëse:
- room_mic_signal - Çdo paketë e të dhënave në këtë transmetim hyrës është të dhëna me numër të plotë që përfaqëson se sa e lartë është audio në një dhomë me vulën kohore.
- room_lightening_sensor - Çdo paketë e të dhënave në këtë rrymë hyrëse është të dhëna të plota që përfaqësojnë sa e ndritshme është dhoma e ndriçuar me vulën kohore.
- room_video_tick_signal - Çdo paketë e të dhënave në këtë transmetim hyrës është kornizë imazhi e të dhënave video që përfaqësojnë videon e mbledhur nga kamera në dhomë me vulën kohore.
Më poshtë është implementimi i PacketClonerCalculator
. Ju mund të shihni metodat GetContract()
, Open()
dhe Process()
si dhe variablin e shembullit current_
që mban paketat hyrëse më të fundit.
// This takes packets from N+1 streams, A_1, A_2, ..., A_N, B.
// For every packet that appears in B, outputs the most recent packet from each
// of the A_i on a separate stream.
#include <vector>
#include "absl/strings/str_cat.h"
#include "mediapipe/framework/calculator_framework.h"
namespace mediapipe {
// For every packet received on the last stream, output the latest packet
// obtained on all other streams. Therefore, if the last stream outputs at a
// higher rate than the others, this effectively clones the packets from the
// other streams to match the last.
//
// Example config:
// node {
// calculator: "PacketClonerCalculator"
// input_stream: "first_base_signal"
// input_stream: "second_base_signal"
// input_stream: "tick_signal"
// output_stream: "cloned_first_base_signal"
// output_stream: "cloned_second_base_signal"
// }
//
class PacketClonerCalculator : public CalculatorBase {
public:
static absl::Status GetContract(CalculatorContract* cc) {
const int tick_signal_index = cc->Inputs().NumEntries() - 1;
// cc->Inputs().NumEntries() returns the number of input streams
// for the PacketClonerCalculator
for (int i = 0; i < tick_signal_index; ++i) {
cc->Inputs().Index(i).SetAny();
// cc->Inputs().Index(i) returns the input stream pointer by index
cc->Outputs().Index(i).SetSameAs(&cc->Inputs().Index(i));
}
cc->Inputs().Index(tick_signal_index).SetAny();
return absl::OkStatus();
}
absl::Status Open(CalculatorContext* cc) final {
tick_signal_index_ = cc->Inputs().NumEntries() - 1;
current_.resize(tick_signal_index_);
// Pass along the header for each stream if present.
for (int i = 0; i < tick_signal_index_; ++i) {
if (!cc->Inputs().Index(i).Header().IsEmpty()) {
cc->Outputs().Index(i).SetHeader(cc->Inputs().Index(i).Header());
// Sets the output stream of index i header to be the same as
// the header for the input stream of index i
}
}
return absl::OkStatus();
}
absl::Status Process(CalculatorContext* cc) final {
// Store input signals.
for (int i = 0; i < tick_signal_index_; ++i) {
if (!cc->Inputs().Index(i).Value().IsEmpty()) {
current_[i] = cc->Inputs().Index(i).Value();
}
}
// Output if the tick signal is non-empty.
if (!cc->Inputs().Index(tick_signal_index_).Value().IsEmpty()) {
for (int i = 0; i < tick_signal_index_; ++i) {
if (!current_[i].IsEmpty()) {
cc->Outputs().Index(i).AddPacket(
current_[i].At(cc->InputTimestamp()));
// Add a packet to output stream of index i a packet from inputstream i
// with timestamp common to all present inputs
} else {
cc->Outputs().Index(i).SetNextTimestampBound(
cc->InputTimestamp().NextAllowedInStream());
// if current_[i], 1 packet buffer for input stream i is empty, we will set
// next allowed timestamp for input stream i to be current timestamp + 1
}
}
}
return absl::OkStatus();
}
private:
std::vector<Packet> current_;
int tick_signal_index_;
};
REGISTER_CALCULATOR(PacketClonerCalculator);
} // namespace mediapipe
Në mënyrë tipike, një kalkulator ka vetëm një skedar .cc. Nuk kërkohet .h, sepse mediapipe përdor regjistrimin për t'i bërë të njohur llogaritësit. Pasi të keni përcaktuar klasën tuaj të kalkulatorit, regjistrojeni atë me një thirrje makro REGISTER_CALCULATOR(calculator_class_name).
Më poshtë është një grafik i parëndësishëm MediaPipe që ka 3 prurje hyrëse, 1 nyje (PacketClonerCalculator) dhe 2 rrjedha dalëse.
input_stream: "room_mic_signal"
input_stream: "room_lighting_sensor"
input_stream: "room_video_tick_signal"
node {
calculator: "PacketClonerCalculator"
input_stream: "room_mic_signal"
input_stream: "room_lighting_sensor"
input_stream: "room_video_tick_signal"
output_stream: "cloned_room_mic_signal"
output_stream: "cloned_lighting_sensor"
}
Diagrami më poshtë tregon se si PacketClonerCalculator
përcakton paketat e tij dalëse (poshtë) bazuar në serinë e paketave hyrëse (lart).
![]() |
---|
Sa herë që merr një paketë në rrymën e tij hyrëse TICK, PacketClonerCalculator nxjerr paketën më të fundit nga secili prej rrymave të tij hyrëse. Sekuenca e paketave dalëse (poshtë) përcaktohet nga sekuenca e paketave hyrëse (lart) dhe vulat e tyre kohore. Vula kohore tregohen përgjatë anës së djathtë të diagramit. |