Setiap kalkulator adalah simpul grafik. Kami menjelaskan cara membuat
kalkulator baru, cara menginisialisasi kalkulator, cara melakukan penghitungan,
aliran input dan output, stempel waktu, dan opsinya. Setiap node dalam grafik
diimplementasikan sebagai Calculator
. Sebagian besar eksekusi grafik terjadi
di dalam kalkulatornya. Kalkulator dapat menerima nol atau beberapa aliran input dan/atau paket
samping serta menghasilkan nol atau lebih aliran output dan/atau paket samping.
CalculatorBase
Kalkulator dibuat dengan menentukan subclass baru dari
class
CalculatorBase
, mengimplementasikan sejumlah metode, dan mendaftarkan subclass baru tersebut dengan
Mediapipe. Setidaknya, kalkulator baru harus mengimplementasikan empat metode di bawah ini
GetContract()
- Penulis kalkulator dapat menentukan jenis input dan output kalkulator yang diharapkan di GetContract(). Saat grafik diinisialisasi, framework akan memanggil metode statis untuk memverifikasi apakah jenis paket dari input dan output yang terhubung cocok dengan informasi dalam spesifikasi ini.
Open()
- Setelah grafik dimulai, framework akan memanggil
Open()
. Paket sisi input tersedia untuk kalkulator pada saat ini.Open()
menafsirkan operasi konfigurasi node (lihat Grafik) dan menyiapkan status per grafik yang dijalankan pada kalkulator. Fungsi ini juga dapat menulis paket ke output kalkulator. Error selamaOpen()
dapat menghentikan jalan grafik.
- Setelah grafik dimulai, framework akan memanggil
Process()
- Untuk kalkulator dengan input, framework memanggil
Process()
berulang kali setiap kali setidaknya satu aliran input memiliki paket yang tersedia. Framework secara default menjamin bahwa semua input memiliki stempel waktu yang sama (lihat Sinkronisasi untuk informasi selengkapnya). Beberapa panggilanProcess()
dapat dipanggil secara bersamaan saat eksekusi paralel diaktifkan. Jika terjadi error selamaProcess()
, framework akan memanggilClose()
dan grafik yang dijalankan akan dihentikan.
- Untuk kalkulator dengan input, framework memanggil
Close()
- Setelah semua panggilan ke
Process()
selesai atau saat semua aliran input ditutup, framework akan memanggilClose()
. Fungsi ini selalu dipanggil jikaOpen()
dipanggil dan berhasil, dan meskipun grafik berjalan dihentikan karena terjadi error. Tidak ada input yang tersedia melalui aliran input apa pun selamaClose()
, tetapi masih memiliki akses ke paket samping input, sehingga dapat menulis output. SetelahClose()
ditampilkan, kalkulator harus dianggap sebagai node mati. Objek kalkulator dihancurkan segera setelah grafik selesai berjalan.
- Setelah semua panggilan ke
Berikut adalah cuplikan kode dari CalculatorBase.h.
class CalculatorBase {
public:
...
// The subclasses of CalculatorBase must implement GetContract.
// ...
static absl::Status GetContract(CalculatorContract* cc);
// Open is called before any Process() calls, on a freshly constructed
// calculator. Subclasses may override this method to perform necessary
// setup, and possibly output Packets and/or set output streams' headers.
// ...
virtual absl::Status Open(CalculatorContext* cc) {
return absl::OkStatus();
}
// Processes the incoming inputs. May call the methods on cc to access
// inputs and produce outputs.
// ...
virtual absl::Status Process(CalculatorContext* cc) = 0;
// Is called if Open() was called and succeeded. Is called either
// immediately after processing is complete or after a graph run has ended
// (if an error occurred in the graph). ...
virtual absl::Status Close(CalculatorContext* cc) {
return absl::OkStatus();
}
...
};
Cara kerja kalkulator
Selama inisialisasi grafik MediaPipe, framework memanggil
metode statis GetContract()
untuk menentukan jenis paket yang diharapkan.
Framework ini membuat dan menghancurkan seluruh kalkulator untuk setiap grafik yang dijalankan (misalnya, sekali per video atau sekali per gambar). Objek yang mahal atau besar yang tetap konstan di seluruh operasi grafik harus disediakan sebagai paket sisi input sehingga perhitungan tidak diulang pada operasi berikutnya.
Setelah inisialisasi, untuk setiap pengoperasian grafik, urutan berikut akan terjadi:
Open()
Process()
(berulang)Close()
Framework ini memanggil Open()
untuk menginisialisasi kalkulator. Open()
harus menafsirkan opsi apa pun dan menyiapkan status per grafik yang dijalankan pada kalkulator. Open()
dapat memperoleh paket samping input dan menulis paket ke output kalkulator. Jika
sesuai, aplikasi harus memanggil SetOffset()
untuk mengurangi potensi buffering paket
pada aliran input.
Jika terjadi error selama Open()
atau Process()
(seperti yang ditunjukkan oleh salah satu dari mereka
yang menampilkan status non-Ok
), proses grafik akan dihentikan tanpa panggilan lebih lanjut
ke metode kalkulator, dan kalkulator akan dihancurkan.
Untuk kalkulator dengan input, framework akan memanggil Process()
setiap kali setidaknya
satu input memiliki paket yang tersedia. Framework ini menjamin bahwa semua input memiliki
stempel waktu yang sama, stempel waktu akan meningkat seiring dengan setiap panggilan ke Process()
dan
semua paket dikirimkan. Akibatnya, beberapa input mungkin tidak memiliki
paket apa pun saat Process()
dipanggil. Input yang paketnya hilang tampaknya
menghasilkan paket kosong (tanpa stempel waktu).
Framework akan memanggil Close()
setelah semua panggilan ke Process()
. Semua input akan
habis, tetapi Close()
memiliki akses ke paket samping input dan dapat
menulis output. Setelah Close ditampilkan, kalkulator akan dihancurkan.
Kalkulator tanpa {i>input<i} disebut sebagai sumber. Kalkulator sumber
tetap memiliki Process()
yang dipanggil selama menampilkan status Ok
. Kalkulator sumber menunjukkan bahwa kalkulator habis dengan menampilkan status perhentian (yaitu mediaPipe::tool::StatusStop()
.).
Mengidentifikasi input dan output
Antarmuka publik pada kalkulator terdiri dari sekumpulan aliran input dan
aliran output. Pada CalculatorGraphConfiguration, output dari beberapa
kalkulator terhubung ke input kalkulator lain menggunakan aliran
bernama. Nama aliran data biasanya huruf kecil, sedangkan tag input dan output biasanya HURUF BESAR. Pada contoh di bawah, output dengan nama tag VIDEO
terhubung ke input dengan nama tag VIDEO_IN
menggunakan aliran data bernama
video_stream
.
# Graph describing calculator SomeAudioVideoCalculator
node {
calculator: "SomeAudioVideoCalculator"
input_stream: "INPUT:combined_input"
output_stream: "VIDEO:video_stream"
}
node {
calculator: "SomeVideoCalculator"
input_stream: "VIDEO_IN:video_stream"
output_stream: "VIDEO_OUT:processed_video"
}
Aliran input dan output dapat diidentifikasi berdasarkan nomor indeks, nama tag, atau kombinasi nama tag dan nomor indeks. Anda dapat melihat beberapa contoh ID input
dan output pada contoh di bawah ini. SomeAudioVideoCalculator
mengidentifikasi output videonya berdasarkan tag dan output audionya berdasarkan kombinasi tag dan indeks. Input dengan tag VIDEO
terhubung ke aliran data bernama
video_stream
. Output dengan tag AUDIO
serta indeks 0
dan 1
terhubung ke aliran data bernama audio_left
dan audio_right
.
SomeAudioCalculator
mengidentifikasi input audionya hanya berdasarkan indeks (tidak diperlukan tag).
# Graph describing calculator SomeAudioVideoCalculator
node {
calculator: "SomeAudioVideoCalculator"
input_stream: "combined_input"
output_stream: "VIDEO:video_stream"
output_stream: "AUDIO:0:audio_left"
output_stream: "AUDIO:1:audio_right"
}
node {
calculator: "SomeAudioCalculator"
input_stream: "audio_left"
input_stream: "audio_right"
output_stream: "audio_energy"
}
Dalam implementasi kalkulator, input dan output juga diidentifikasi oleh nama tag dan nomor indeks. Pada fungsi di bawah ini, input dan output diidentifikasi:
- Menurut nomor indeks: Aliran input gabungan diidentifikasi hanya berdasarkan indeks
0
. - Menurut nama tag: Aliran output video diidentifikasi berdasarkan nama tag "VIDEO".
- Berdasarkan nama tag dan nomor indeks: Streaming audio output diidentifikasi dengan kombinasi nama tag
AUDIO
serta nomor indeks0
dan1
.
// c++ Code snippet describing the SomeAudioVideoCalculator GetContract() method
class SomeAudioVideoCalculator : public CalculatorBase {
public:
static absl::Status GetContract(CalculatorContract* cc) {
cc->Inputs().Index(0).SetAny();
// SetAny() is used to specify that whatever the type of the
// stream is, it's acceptable. This does not mean that any
// packet is acceptable. Packets in the stream still have a
// particular type. SetAny() has the same effect as explicitly
// setting the type to be the stream's type.
cc->Outputs().Tag("VIDEO").Set<ImageFrame>();
cc->Outputs().Get("AUDIO", 0).Set<Matrix>();
cc->Outputs().Get("AUDIO", 1).Set<Matrix>();
return absl::OkStatus();
}
Memproses
Process()
yang dipanggil pada node non-sumber harus menampilkan absl::OkStatus()
untuk
menunjukkan bahwa semuanya berjalan dengan baik, atau kode status lainnya untuk menandakan error
Jika kalkulator non-sumber menampilkan tool::StatusStop()
, ini menandakan
grafik dibatalkan lebih awal. Dalam hal ini, semua kalkulator sumber dan aliran input grafik akan ditutup (dan Paket yang tersisa akan disebarkan melalui grafik).
Node sumber dalam grafik akan terus memanggil Process()
selama
menampilkan absl::OkStatus(
). Untuk menunjukkan bahwa tidak ada lagi data yang akan
dihasilkan, tampilkan tool::StatusStop()
. Status lainnya menunjukkan bahwa
telah terjadi error.
Close()
menampilkan absl::OkStatus()
untuk menunjukkan keberhasilan. Status lainnya
menunjukkan kegagalan.
Berikut adalah fungsi Process()
dasar. Contoh ini menggunakan metode Input()
(yang hanya dapat
digunakan jika kalkulator memiliki satu input) untuk meminta data inputnya. Selanjutnya,
std::unique_ptr
akan digunakan untuk mengalokasikan memori yang diperlukan oleh paket output,
dan melakukan penghitungan. Setelah selesai, pointer akan dilepas saat menambahkannya ke
streaming output.
absl::Status MyCalculator::Process() {
const Matrix& input = Input()->Get<Matrix>();
std::unique_ptr<Matrix> output(new Matrix(input.rows(), input.cols()));
// do your magic here....
// output->row(n) = ...
Output()->Add(output.release(), InputTimestamp());
return absl::OkStatus();
}
Opsi kalkulator
Kalkulator menerima parameter pemrosesan melalui (1) paket aliran input (2) paket sisi input, dan (3) opsi kalkulator. Opsi kalkulator, jika ditentukan, muncul sebagai nilai literal di kolom node_options
dari pesan CalculatorGraphConfiguration.Node
.
node {
calculator: "TfLiteInferenceCalculator"
input_stream: "TENSORS:main_model_input"
output_stream: "TENSORS:main_model_output"
node_options: {
[type.googleapis.com/mediapipe.TfLiteInferenceCalculatorOptions] {
model_path: "mediapipe/models/detection_model.tflite"
}
}
}
Kolom node_options
menerima sintaksis proto3. Atau, opsi
kalkulator dapat ditentukan dalam kolom options
menggunakan sintaksis proto2.
node {
calculator: "TfLiteInferenceCalculator"
input_stream: "TENSORS:main_model_input"
output_stream: "TENSORS:main_model_output"
node_options: {
[type.googleapis.com/mediapipe.TfLiteInferenceCalculatorOptions] {
model_path: "mediapipe/models/detection_model.tflite"
}
}
}
Tidak semua kalkulator menerima opsi kalkulator. Untuk menerima opsi, kalkulator biasanya akan menentukan jenis pesan protobuf baru untuk mewakili
opsinya, seperti PacketClonerCalculatorOptions
. Kalkulator kemudian akan
membaca pesan protobuf tersebut dalam metode CalculatorBase::Open
, dan mungkin
juga dalam fungsi CalculatorBase::GetContract
atau
metode CalculatorBase::Process
. Biasanya, jenis pesan protobuf baru akan ditentukan sebagai skema protobuf menggunakan file ".proto" dan aturan build mediapipe_proto_library()
.
mediapipe_proto_library(
name = "packet_cloner_calculator_proto",
srcs = ["packet_cloner_calculator.proto"],
visibility = ["//visibility:public"],
deps = [
"//mediapipe/framework:calculator_options_proto",
"//mediapipe/framework:calculator_proto",
],
)
Kalkulator contoh
Bagian ini membahas implementasi PacketClonerCalculator
, yang
melakukan tugas yang relatif sederhana, dan digunakan di banyak grafik kalkulator.
PacketClonerCalculator
hanya menghasilkan salinan paket input terbaru
sesuai permintaan.
PacketClonerCalculator
berguna saat stempel waktu paket data yang masuk
tidak diselaraskan dengan sempurna. Misalkan kita memiliki ruangan dengan mikrofon, sensor
cahaya, dan kamera video yang mengumpulkan data sensorik. Setiap sensor
beroperasi secara independen dan mengumpulkan data secara bergantian. Misalkan output
dari setiap sensor:
- mikrofon = kenyaringan dalam desibel suara di ruangan (Bilangan Bulat)
- sensor cahaya = kecerahan ruangan (Bilangan bulat)
- kamera video = bingkai gambar RGB dalam ruangan (ImageFrame)
Pipeline persepsi sederhana kita dirancang untuk memproses data sensorik dari 3 sensor ini, sehingga kapan saja saat kita memiliki data frame gambar dari kamera yang disinkronkan dengan data kenyaringan mikrofon yang terakhir dikumpulkan dan data kecerahan sensor cahaya. Untuk melakukannya dengan MediaPipe, pipeline persepsi kita memiliki 3 aliran input:
- room_mic_signal - Setiap paket data dalam aliran input ini adalah data bilangan bulat yang mewakili seberapa keras audio di ruang dengan stempel waktu.
- room_lightening_sensor - Setiap paket data dalam aliran input ini adalah data bilangan bulat yang mewakili seberapa terang ruangan yang diterangi dengan stempel waktu.
- room_video_tick_signal - Setiap paket data dalam aliran input ini adalah imageframe data video yang mewakili video yang dikumpulkan dari kamera di ruang dengan stempel waktu.
Berikut adalah implementasi PacketClonerCalculator
. Anda dapat melihat
metode GetContract()
, Open()
, dan Process()
serta variabel
instance current_
yang menyimpan paket input terbaru.
// This takes packets from N+1 streams, A_1, A_2, ..., A_N, B.
// For every packet that appears in B, outputs the most recent packet from each
// of the A_i on a separate stream.
#include <vector>
#include "absl/strings/str_cat.h"
#include "mediapipe/framework/calculator_framework.h"
namespace mediapipe {
// For every packet received on the last stream, output the latest packet
// obtained on all other streams. Therefore, if the last stream outputs at a
// higher rate than the others, this effectively clones the packets from the
// other streams to match the last.
//
// Example config:
// node {
// calculator: "PacketClonerCalculator"
// input_stream: "first_base_signal"
// input_stream: "second_base_signal"
// input_stream: "tick_signal"
// output_stream: "cloned_first_base_signal"
// output_stream: "cloned_second_base_signal"
// }
//
class PacketClonerCalculator : public CalculatorBase {
public:
static absl::Status GetContract(CalculatorContract* cc) {
const int tick_signal_index = cc->Inputs().NumEntries() - 1;
// cc->Inputs().NumEntries() returns the number of input streams
// for the PacketClonerCalculator
for (int i = 0; i < tick_signal_index; ++i) {
cc->Inputs().Index(i).SetAny();
// cc->Inputs().Index(i) returns the input stream pointer by index
cc->Outputs().Index(i).SetSameAs(&cc->Inputs().Index(i));
}
cc->Inputs().Index(tick_signal_index).SetAny();
return absl::OkStatus();
}
absl::Status Open(CalculatorContext* cc) final {
tick_signal_index_ = cc->Inputs().NumEntries() - 1;
current_.resize(tick_signal_index_);
// Pass along the header for each stream if present.
for (int i = 0; i < tick_signal_index_; ++i) {
if (!cc->Inputs().Index(i).Header().IsEmpty()) {
cc->Outputs().Index(i).SetHeader(cc->Inputs().Index(i).Header());
// Sets the output stream of index i header to be the same as
// the header for the input stream of index i
}
}
return absl::OkStatus();
}
absl::Status Process(CalculatorContext* cc) final {
// Store input signals.
for (int i = 0; i < tick_signal_index_; ++i) {
if (!cc->Inputs().Index(i).Value().IsEmpty()) {
current_[i] = cc->Inputs().Index(i).Value();
}
}
// Output if the tick signal is non-empty.
if (!cc->Inputs().Index(tick_signal_index_).Value().IsEmpty()) {
for (int i = 0; i < tick_signal_index_; ++i) {
if (!current_[i].IsEmpty()) {
cc->Outputs().Index(i).AddPacket(
current_[i].At(cc->InputTimestamp()));
// Add a packet to output stream of index i a packet from inputstream i
// with timestamp common to all present inputs
} else {
cc->Outputs().Index(i).SetNextTimestampBound(
cc->InputTimestamp().NextAllowedInStream());
// if current_[i], 1 packet buffer for input stream i is empty, we will set
// next allowed timestamp for input stream i to be current timestamp + 1
}
}
}
return absl::OkStatus();
}
private:
std::vector<Packet> current_;
int tick_signal_index_;
};
REGISTER_CALCULATOR(PacketClonerCalculator);
} // namespace mediapipe
Biasanya, kalkulator hanya memiliki file .cc. Tidak diperlukan {i>.h<i}, karena mediapipe menggunakan pendaftaran untuk membuat kalkulator mengetahuinya. Setelah menentukan class kalkulator, daftarkan class dengan panggilan makro kumpulan_CALCULATOR(calculator_class_name).
Di bawah ini adalah grafik MediaPipe sederhana yang memiliki 3 aliran input, 1 node (PacketClonerCalculator), dan 2 aliran output.
input_stream: "room_mic_signal"
input_stream: "room_lighting_sensor"
input_stream: "room_video_tick_signal"
node {
calculator: "PacketClonerCalculator"
input_stream: "room_mic_signal"
input_stream: "room_lighting_sensor"
input_stream: "room_video_tick_signal"
output_stream: "cloned_room_mic_signal"
output_stream: "cloned_lighting_sensor"
}
Diagram di bawah menunjukkan cara PacketClonerCalculator
menentukan paket
outputnya (bawah) berdasarkan rangkaian paket inputnya (atas).
Setiap kali menerima paket pada aliran input TICK-nya, PacketClonerCalculator menghasilkan paket terbaru dari setiap aliran inputnya. Urutan paket output (bawah) ditentukan oleh urutan paket input (atas) dan stempel waktunya. Stempel waktu ditampilkan di sepanjang sisi kanan diagram. |