Setiap kalkulator adalah simpul dari suatu grafik. Kami menjelaskan cara membuat
kalkulator, cara menginisialisasi kalkulator, cara melakukan perhitungan,
aliran data input dan output, stempel waktu, dan opsi. Setiap {i>node<i} dalam grafik tersebut
diimplementasikan sebagai Calculator
. Sebagian besar eksekusi grafik terjadi di dalam
kalkulator. Kalkulator dapat menerima nol atau beberapa aliran input dan/atau sisi
paket dan menghasilkan nol atau
lebih aliran {i>output<i} dan/atau paket samping.
CalculatorBase
Kalkulator dibuat dengan mendefinisikan {i>sub-class <i}baru dari
CalculatorBase
mengimplementasikan sejumlah metode, dan mendaftarkan sub-class baru dengan
dan Mediapipe. Minimal, kalkulator baru harus menerapkan empat metode di bawah
GetContract()
- Penulis kalkulator dapat menentukan jenis input dan output yang diharapkan kalkulator di GetContract(). Ketika grafik diinisialisasi, kerangka kerja memanggil metode statis untuk memverifikasi apakah jenis paket untuk input dan output yang terhubung sesuai dengan informasi dalam spesifikasi pendukung.
Open()
- Setelah grafik dimulai, framework akan memanggil
Open()
. Sisi masukan paket tersedia untuk kalkulator pada tahap ini.Open()
menginterpretasikan operasi konfigurasi node (lihat Graphs) dan menyiapkan status kalkulator per-grafik-run. Fungsi ini mungkin juga menulis paket ke {i>output<i} kalkulator. Error selamaOpen()
dapat menghentikan proses grafik.
- Setelah grafik dimulai, framework akan memanggil
Process()
- Untuk kalkulator dengan input, framework memanggil
Process()
berulang kali setiap kali setidaknya satu aliran input memiliki paket yang tersedia. Kerangka kerja secara default menjamin bahwa semua input memiliki stempel waktu yang sama (lihat Sinkronisasi untuk informasi selengkapnya). Beberapa PanggilanProcess()
dapat dipanggil secara bersamaan saat eksekusi paralel diaktifkan. Jika terjadi error selamaProcess()
, framework akan memanggilClose()
dan operasi grafik akan berakhir.
- Untuk kalkulator dengan input, framework memanggil
Close()
- Setelah semua panggilan ke
Process()
selesai atau saat semua streaming input ditutup, framework memanggilClose()
. Fungsi ini selalu dipanggil jikaOpen()
dipanggil dan berhasil, meskipun operasi grafik dihentikan karena adanya {i>error<i}. Tidak ada input yang tersedia melalui aliran input apa pun selamaClose()
, tetapi masih memiliki akses ke paket sisi input dan sehingga dapat menulis output. SetelahClose()
ditampilkan, kalkulator harus dianggap sebagai simpul mati. Objek kalkulator dihancurkan sebagai segera setelah grafik selesai berjalan.
- Setelah semua panggilan ke
Berikut ini adalah cuplikan kode dari CalculatorBase.h.
class CalculatorBase {
public:
...
// The subclasses of CalculatorBase must implement GetContract.
// ...
static absl::Status GetContract(CalculatorContract* cc);
// Open is called before any Process() calls, on a freshly constructed
// calculator. Subclasses may override this method to perform necessary
// setup, and possibly output Packets and/or set output streams' headers.
// ...
virtual absl::Status Open(CalculatorContext* cc) {
return absl::OkStatus();
}
// Processes the incoming inputs. May call the methods on cc to access
// inputs and produce outputs.
// ...
virtual absl::Status Process(CalculatorContext* cc) = 0;
// Is called if Open() was called and succeeded. Is called either
// immediately after processing is complete or after a graph run has ended
// (if an error occurred in the graph). ...
virtual absl::Status Close(CalculatorContext* cc) {
return absl::OkStatus();
}
...
};
Cara kerja kalkulator
Selama inisialisasi grafik MediaPipe, framework memanggil
Metode statis GetContract()
untuk menentukan jenis paket yang diharapkan.
Framework ini membuat dan menghancurkan seluruh kalkulator untuk setiap proses grafik (misalnya, sekali per video atau sekali per gambar). Benda mahal atau besar yang tersisa di seluruh grafik yang berjalan harus disediakan sebagai paket sisi masukan sehingga penghitungan tidak diulang pada operasi berikutnya.
Setelah inisialisasi, untuk setiap pengoperasian grafik, urutan berikut akan terjadi:
Open()
Process()
(berulang kali)Close()
Framework ini memanggil Open()
untuk melakukan inisialisasi kalkulator. Open()
seharusnya
menafsirkan opsi apa pun dan mengatur
status per-grafik-run di kalkulator. Open()
dapat memperoleh paket sisi masukan dan
menulis paket ke {i>output<i} kalkulator. Jika
sesuai, fungsi ini harus memanggil SetOffset()
untuk mengurangi potensi buffering paket
aliran input.
Jika terjadi error selama Open()
atau Process()
(seperti yang ditunjukkan oleh salah satunya
menampilkan status non-Ok
), operasi grafik dihentikan tanpa panggilan lebih lanjut
dengan metode kalkulator, dan kalkulator itu dihancurkan.
Untuk kalkulator dengan input, framework akan memanggil Process()
setiap kali setidaknya
satu input memiliki satu paket yang tersedia. Framework ini menjamin bahwa semua input memiliki
stempel waktu yang sama, stempel waktu tersebut akan meningkat seiring dengan setiap panggilan ke Process()
dan
bahwa semua paket
dikirim. Akibatnya, beberapa {i>input<i}
mungkin tidak memiliki
paket saat Process()
dipanggil. Masukan yang paketnya
hilang muncul di
menghasilkan paket kosong (tanpa stempel waktu).
Framework ini memanggil Close()
setelah semua panggilan ke Process()
. Semua input akan
telah habis, tetapi Close()
memiliki akses ke paket sisi input dan mungkin
menulis outputnya. Setelah Close menampilkan hasil, kalkulator akan dihancurkan.
Kalkulator tanpa input disebut sebagai sumber. Kalkulator sumber
Process()
akan terus dipanggil selama yang menampilkan status Ok
. J
kalkulator sumber menunjukkan bahwa resource habis dengan menampilkan status perhentian
(yaitu mediaPipe::tool::StatusStop()
.).
Mengidentifikasi {i>input<i} dan {i>output<i}
Antarmuka publik untuk kalkulator
terdiri dari satu set aliran {i>input <i}dan
aliran data output. Dalam CalculatorGraphConfiguration, hasil dari beberapa
kalkulator terhubung ke input kalkulator lain
menggunakan nama kalkulator
feed. Nama aliran data biasanya menggunakan huruf kecil, sedangkan tag input dan output ditulis
biasanya dengan HURUF BESAR. Pada contoh di bawah, output dengan nama tag VIDEO
adalah
terhubung ke input dengan nama tag VIDEO_IN
menggunakan aliran data bernama
video_stream
.
# Graph describing calculator SomeAudioVideoCalculator
node {
calculator: "SomeAudioVideoCalculator"
input_stream: "INPUT:combined_input"
output_stream: "VIDEO:video_stream"
}
node {
calculator: "SomeVideoCalculator"
input_stream: "VIDEO_IN:video_stream"
output_stream: "VIDEO_OUT:processed_video"
}
Aliran data input dan output dapat diidentifikasi dengan nomor indeks, nama tag, atau
kombinasi nama tag dan nomor indeks. Anda dapat melihat beberapa contoh input dan
ID output dalam contoh di bawah ini. SomeAudioVideoCalculator
mengidentifikasi
output video menurut tag dan output audionya dengan kombinasi tag
Google Cloud. Input dengan tag VIDEO
terhubung ke aliran data yang diberi nama
video_stream
. Output dengan tag AUDIO
serta indeks 0
dan 1
adalah
terhubung ke aliran data bernama audio_left
dan audio_right
.
SomeAudioCalculator
mengidentifikasi input audionya hanya berdasarkan indeks (tidak diperlukan tag).
# Graph describing calculator SomeAudioVideoCalculator
node {
calculator: "SomeAudioVideoCalculator"
input_stream: "combined_input"
output_stream: "VIDEO:video_stream"
output_stream: "AUDIO:0:audio_left"
output_stream: "AUDIO:1:audio_right"
}
node {
calculator: "SomeAudioCalculator"
input_stream: "audio_left"
input_stream: "audio_right"
output_stream: "audio_energy"
}
Dalam penerapan kalkulator, input dan output juga diidentifikasi berdasarkan tag nama dan nomor indeks. Pada fungsi di bawah ini, input dan output diidentifikasi:
- Dengan nomor indeks: Aliran input gabungan diidentifikasi hanya dengan indeks
0
. - Menurut nama tag: Streaming output video diidentifikasi dengan nama tag "VIDEO".
- Menurut nama tag dan nomor indeks: Streaming audio output diidentifikasi oleh
kombinasi nama tag
AUDIO
serta nomor indeks0
dan1
.
// c++ Code snippet describing the SomeAudioVideoCalculator GetContract() method
class SomeAudioVideoCalculator : public CalculatorBase {
public:
static absl::Status GetContract(CalculatorContract* cc) {
cc->Inputs().Index(0).SetAny();
// SetAny() is used to specify that whatever the type of the
// stream is, it's acceptable. This does not mean that any
// packet is acceptable. Packets in the stream still have a
// particular type. SetAny() has the same effect as explicitly
// setting the type to be the stream's type.
cc->Outputs().Tag("VIDEO").Set<ImageFrame>();
cc->Outputs().Get("AUDIO", 0).Set<Matrix>();
cc->Outputs().Get("AUDIO", 1).Set<Matrix>();
return absl::OkStatus();
}
Memproses
Process()
yang dipanggil pada node non-sumber harus menampilkan absl::OkStatus()
ke
menunjukkan bahwa semua berjalan dengan baik, atau kode
status lainnya untuk menandakan adanya {i>error<i}
Jika kalkulator non-sumber menampilkan tool::StatusStop()
, ini menandakan bahwa
grafik ini dibatalkan lebih awal. Dalam hal ini, semua kalkulator
dan grafik sumber
aliran data input akan ditutup (dan Paket yang tersisa akan menyebar melalui
).
Node sumber dalam grafik akan terus memanggil Process()
selama
karena menampilkan absl::OkStatus(
). Untuk menunjukkan bahwa tidak ada lagi data untuk
menghasilkan tool::StatusStop()
hasil. Status lainnya menunjukkan
bahwa {i>error<i} telah
terjadi.
Close()
menampilkan absl::OkStatus()
untuk menunjukkan keberhasilan. Status lainnya
menunjukkan kegagalan.
Berikut adalah fungsi Process()
dasar. Class tersebut menggunakan metode Input()
(yang dapat
hanya digunakan jika kalkulator memiliki satu input) untuk meminta data inputnya. Ini
kemudian menggunakan std::unique_ptr
untuk mengalokasikan
memori yang dibutuhkan untuk paket output,
dan melakukan penghitungan. Setelah selesai, ia melepaskan pointer saat menambahkannya ke
{i>output stream<i}.
absl::Status MyCalculator::Process() {
const Matrix& input = Input()->Get<Matrix>();
std::unique_ptr<Matrix> output(new Matrix(input.rows(), input.cols()));
// do your magic here....
// output->row(n) = ...
Output()->Add(output.release(), InputTimestamp());
return absl::OkStatus();
}
Opsi kalkulator
Kalkulator menerima parameter pemrosesan melalui (1) memasukkan paket aliran data (2)
paket sisi input, dan (3) opsi kalkulator. Opsi kalkulator, jika
yang ditentukan, muncul sebagai nilai literal di kolom node_options
dari kolom
CalculatorGraphConfiguration.Node
pesan.
node {
calculator: "TfLiteInferenceCalculator"
input_stream: "TENSORS:main_model_input"
output_stream: "TENSORS:main_model_output"
node_options: {
[type.googleapis.com/mediapipe.TfLiteInferenceCalculatorOptions] {
model_path: "mediapipe/models/detection_model.tflite"
}
}
}
Kolom node_options
menerima sintaksis proto3. Atau, kalkulator
opsi dapat ditentukan di kolom options
menggunakan sintaksis proto2.
node {
calculator: "TfLiteInferenceCalculator"
input_stream: "TENSORS:main_model_input"
output_stream: "TENSORS:main_model_output"
node_options: {
[type.googleapis.com/mediapipe.TfLiteInferenceCalculatorOptions] {
model_path: "mediapipe/models/detection_model.tflite"
}
}
}
Tidak semua kalkulator menerima opsi kalkulator. Untuk menerima opsi,
kalkulator biasanya akan menentukan jenis pesan protobuf baru untuk mewakili
opsi, seperti PacketClonerCalculatorOptions
. Kemudian, kalkulator akan
membaca pesan protobuf tersebut dalam metode CalculatorBase::Open
-nya, dan mungkin
juga dalam fungsi CalculatorBase::GetContract
atau
Metode CalculatorBase::Process
. Biasanya, jenis pesan protobuf baru akan
didefinisikan sebagai skema protobuf yang menggunakan ".proto" dan
mediapipe_proto_library()
aturan build.
mediapipe_proto_library(
name = "packet_cloner_calculator_proto",
srcs = ["packet_cloner_calculator.proto"],
visibility = ["//visibility:public"],
deps = [
"//mediapipe/framework:calculator_options_proto",
"//mediapipe/framework:calculator_proto",
],
)
Contoh kalkulator
Bagian ini membahas implementasi PacketClonerCalculator
, yang
melakukan pekerjaan yang relatif sederhana, dan
digunakan dalam banyak grafik kalkulator.
PacketClonerCalculator
hanya menghasilkan salinan paket input terbarunya
sesuai permintaan.
PacketClonerCalculator
berguna saat stempel waktu paket data yang tiba
tidak disejajarkan
dengan sempurna. Misalkan kita memiliki ruangan
dengan mikrofon, lampu
sensor dan kamera video yang
mengumpulkan data sensorik. Setiap sensor
beroperasi secara independen dan mengumpulkan data secara sesekali. Misalkan output
dari setiap sensor adalah:
- mikrofon = kenyaringan dalam desibel suara di dalam ruangan (Bilangan Bulat)
- sensor cahaya = kecerahan ruangan (Bilangan bulat)
- kamera video = bingkai gambar RGB ruangan (ImageFrame)
Pipeline persepsi sederhana kami dirancang untuk memproses data sensoris dari ketiga sehingga ketika kita memiliki data bingkai gambar dari kamera yang disinkronkan dengan data kenyaringan mikrofon dan cahaya yang terakhir dikumpulkan data kecerahan sensor. Untuk melakukannya dengan MediaPipe, pipeline persepsi kami memiliki 3 input stream:
- room_mic_signal - Setiap paket data dalam aliran data input ini adalah data bilangan bulat merepresentasikan seberapa keras audio di ruangan dengan stempel waktu.
- room_lightening_sensor - Setiap paket data dalam aliran input ini adalah bilangan bulat data yang menunjukkan seberapa terang ruangan yang diterangi dengan stempel waktu.
- room_video_tick_signal - Setiap paket data dalam aliran data input ini {i>imageframe<i} data video yang mewakili video yang dikumpulkan dari kamera di dengan stempel waktu.
Berikut adalah implementasi PacketClonerCalculator
. Anda dapat melihat
Metode GetContract()
, Open()
, dan Process()
, serta instance
variabel current_
yang menyimpan paket input terbaru.
// This takes packets from N+1 streams, A_1, A_2, ..., A_N, B.
// For every packet that appears in B, outputs the most recent packet from each
// of the A_i on a separate stream.
#include <vector>
#include "absl/strings/str_cat.h"
#include "mediapipe/framework/calculator_framework.h"
namespace mediapipe {
// For every packet received on the last stream, output the latest packet
// obtained on all other streams. Therefore, if the last stream outputs at a
// higher rate than the others, this effectively clones the packets from the
// other streams to match the last.
//
// Example config:
// node {
// calculator: "PacketClonerCalculator"
// input_stream: "first_base_signal"
// input_stream: "second_base_signal"
// input_stream: "tick_signal"
// output_stream: "cloned_first_base_signal"
// output_stream: "cloned_second_base_signal"
// }
//
class PacketClonerCalculator : public CalculatorBase {
public:
static absl::Status GetContract(CalculatorContract* cc) {
const int tick_signal_index = cc->Inputs().NumEntries() - 1;
// cc->Inputs().NumEntries() returns the number of input streams
// for the PacketClonerCalculator
for (int i = 0; i < tick_signal_index; ++i) {
cc->Inputs().Index(i).SetAny();
// cc->Inputs().Index(i) returns the input stream pointer by index
cc->Outputs().Index(i).SetSameAs(&cc->Inputs().Index(i));
}
cc->Inputs().Index(tick_signal_index).SetAny();
return absl::OkStatus();
}
absl::Status Open(CalculatorContext* cc) final {
tick_signal_index_ = cc->Inputs().NumEntries() - 1;
current_.resize(tick_signal_index_);
// Pass along the header for each stream if present.
for (int i = 0; i < tick_signal_index_; ++i) {
if (!cc->Inputs().Index(i).Header().IsEmpty()) {
cc->Outputs().Index(i).SetHeader(cc->Inputs().Index(i).Header());
// Sets the output stream of index i header to be the same as
// the header for the input stream of index i
}
}
return absl::OkStatus();
}
absl::Status Process(CalculatorContext* cc) final {
// Store input signals.
for (int i = 0; i < tick_signal_index_; ++i) {
if (!cc->Inputs().Index(i).Value().IsEmpty()) {
current_[i] = cc->Inputs().Index(i).Value();
}
}
// Output if the tick signal is non-empty.
if (!cc->Inputs().Index(tick_signal_index_).Value().IsEmpty()) {
for (int i = 0; i < tick_signal_index_; ++i) {
if (!current_[i].IsEmpty()) {
cc->Outputs().Index(i).AddPacket(
current_[i].At(cc->InputTimestamp()));
// Add a packet to output stream of index i a packet from inputstream i
// with timestamp common to all present inputs
} else {
cc->Outputs().Index(i).SetNextTimestampBound(
cc->InputTimestamp().NextAllowedInStream());
// if current_[i], 1 packet buffer for input stream i is empty, we will set
// next allowed timestamp for input stream i to be current timestamp + 1
}
}
}
return absl::OkStatus();
}
private:
std::vector<Packet> current_;
int tick_signal_index_;
};
REGISTER_CALCULATOR(PacketClonerCalculator);
} // namespace mediapipe
Biasanya, kalkulator hanya memiliki file .cc. Tidak perlu .h, karena mediapipe menggunakan pendaftaran agar kalkulator mengetahuinya. Setelah Anda memiliki menentukan class kalkulator Anda, daftarkan dengan pemanggilan makro REGISTER_CALCULATOR(calculator_class_name).
Di bawah ini adalah grafik sederhana MediaPipe yang memiliki 3 aliran input, 1 node (PacketClonerCalculator) dan 2 streaming output.
input_stream: "room_mic_signal"
input_stream: "room_lighting_sensor"
input_stream: "room_video_tick_signal"
node {
calculator: "PacketClonerCalculator"
input_stream: "room_mic_signal"
input_stream: "room_lighting_sensor"
input_stream: "room_video_tick_signal"
output_stream: "cloned_room_mic_signal"
output_stream: "cloned_lighting_sensor"
}
Diagram di bawah menunjukkan cara PacketClonerCalculator
menentukan output-nya
paket (bawah) berdasarkan rangkaian paket inputnya (atas).
Setiap kali menerima paket di aliran input TICK-nya, PacketClonerCalculator menghasilkan output paket terbaru dari setiap aliran inputnya. Urutan paket output (bawah) ditentukan oleh rangkaian paket input (atas) dan stempel waktunya. Stempel waktu ditampilkan di sepanjang sisi kanan diagram. |