Kalkulator

Setiap kalkulator adalah simpul grafik. Kami menjelaskan cara membuat kalkulator baru, cara menginisialisasi kalkulator, cara melakukan penghitungan, aliran input dan output, stempel waktu, dan opsinya. Setiap node dalam grafik diimplementasikan sebagai Calculator. Sebagian besar eksekusi grafik terjadi di dalam kalkulatornya. Kalkulator dapat menerima nol atau beberapa aliran input dan/atau paket samping serta menghasilkan nol atau lebih aliran output dan/atau paket samping.

CalculatorBase

Kalkulator dibuat dengan menentukan subclass baru dari class CalculatorBase, mengimplementasikan sejumlah metode, dan mendaftarkan subclass baru tersebut dengan Mediapipe. Setidaknya, kalkulator baru harus mengimplementasikan empat metode di bawah ini

  • GetContract()
    • Penulis kalkulator dapat menentukan jenis input dan output kalkulator yang diharapkan di GetContract(). Saat grafik diinisialisasi, framework akan memanggil metode statis untuk memverifikasi apakah jenis paket dari input dan output yang terhubung cocok dengan informasi dalam spesifikasi ini.
  • Open()
    • Setelah grafik dimulai, framework akan memanggil Open(). Paket sisi input tersedia untuk kalkulator pada saat ini. Open() menafsirkan operasi konfigurasi node (lihat Grafik) dan menyiapkan status per grafik yang dijalankan pada kalkulator. Fungsi ini juga dapat menulis paket ke output kalkulator. Error selama Open() dapat menghentikan jalan grafik.
  • Process()
    • Untuk kalkulator dengan input, framework memanggil Process() berulang kali setiap kali setidaknya satu aliran input memiliki paket yang tersedia. Framework secara default menjamin bahwa semua input memiliki stempel waktu yang sama (lihat Sinkronisasi untuk informasi selengkapnya). Beberapa panggilan Process() dapat dipanggil secara bersamaan saat eksekusi paralel diaktifkan. Jika terjadi error selama Process(), framework akan memanggil Close() dan grafik yang dijalankan akan dihentikan.
  • Close()
    • Setelah semua panggilan ke Process() selesai atau saat semua aliran input ditutup, framework akan memanggil Close(). Fungsi ini selalu dipanggil jika Open() dipanggil dan berhasil, dan meskipun grafik berjalan dihentikan karena terjadi error. Tidak ada input yang tersedia melalui aliran input apa pun selama Close(), tetapi masih memiliki akses ke paket samping input, sehingga dapat menulis output. Setelah Close() ditampilkan, kalkulator harus dianggap sebagai node mati. Objek kalkulator dihancurkan segera setelah grafik selesai berjalan.

Berikut adalah cuplikan kode dari CalculatorBase.h.

class CalculatorBase {
 public:
  ...

  // The subclasses of CalculatorBase must implement GetContract.
  // ...
  static absl::Status GetContract(CalculatorContract* cc);

  // Open is called before any Process() calls, on a freshly constructed
  // calculator.  Subclasses may override this method to perform necessary
  // setup, and possibly output Packets and/or set output streams' headers.
  // ...
  virtual absl::Status Open(CalculatorContext* cc) {
    return absl::OkStatus();
  }

  // Processes the incoming inputs. May call the methods on cc to access
  // inputs and produce outputs.
  // ...
  virtual absl::Status Process(CalculatorContext* cc) = 0;

  // Is called if Open() was called and succeeded.  Is called either
  // immediately after processing is complete or after a graph run has ended
  // (if an error occurred in the graph).  ...
  virtual absl::Status Close(CalculatorContext* cc) {
    return absl::OkStatus();
  }

  ...
};

Cara kerja kalkulator

Selama inisialisasi grafik MediaPipe, framework memanggil metode statis GetContract() untuk menentukan jenis paket yang diharapkan.

Framework ini membuat dan menghancurkan seluruh kalkulator untuk setiap grafik yang dijalankan (misalnya, sekali per video atau sekali per gambar). Objek yang mahal atau besar yang tetap konstan di seluruh operasi grafik harus disediakan sebagai paket sisi input sehingga perhitungan tidak diulang pada operasi berikutnya.

Setelah inisialisasi, untuk setiap pengoperasian grafik, urutan berikut akan terjadi:

  • Open()
  • Process() (berulang)
  • Close()

Framework ini memanggil Open() untuk menginisialisasi kalkulator. Open() harus menafsirkan opsi apa pun dan menyiapkan status per grafik yang dijalankan pada kalkulator. Open() dapat memperoleh paket samping input dan menulis paket ke output kalkulator. Jika sesuai, aplikasi harus memanggil SetOffset() untuk mengurangi potensi buffering paket pada aliran input.

Jika terjadi error selama Open() atau Process() (seperti yang ditunjukkan oleh salah satu dari mereka yang menampilkan status non-Ok), proses grafik akan dihentikan tanpa panggilan lebih lanjut ke metode kalkulator, dan kalkulator akan dihancurkan.

Untuk kalkulator dengan input, framework akan memanggil Process() setiap kali setidaknya satu input memiliki paket yang tersedia. Framework ini menjamin bahwa semua input memiliki stempel waktu yang sama, stempel waktu akan meningkat seiring dengan setiap panggilan ke Process() dan semua paket dikirimkan. Akibatnya, beberapa input mungkin tidak memiliki paket apa pun saat Process() dipanggil. Input yang paketnya hilang tampaknya menghasilkan paket kosong (tanpa stempel waktu).

Framework akan memanggil Close() setelah semua panggilan ke Process(). Semua input akan habis, tetapi Close() memiliki akses ke paket samping input dan dapat menulis output. Setelah Close ditampilkan, kalkulator akan dihancurkan.

Kalkulator tanpa {i>input<i} disebut sebagai sumber. Kalkulator sumber tetap memiliki Process() yang dipanggil selama menampilkan status Ok. Kalkulator sumber menunjukkan bahwa kalkulator habis dengan menampilkan status perhentian (yaitu mediaPipe::tool::StatusStop().).

Mengidentifikasi input dan output

Antarmuka publik pada kalkulator terdiri dari sekumpulan aliran input dan aliran output. Pada CalculatorGraphConfiguration, output dari beberapa kalkulator terhubung ke input kalkulator lain menggunakan aliran bernama. Nama aliran data biasanya huruf kecil, sedangkan tag input dan output biasanya HURUF BESAR. Pada contoh di bawah, output dengan nama tag VIDEO terhubung ke input dengan nama tag VIDEO_IN menggunakan aliran data bernama video_stream.

# Graph describing calculator SomeAudioVideoCalculator
node {
  calculator: "SomeAudioVideoCalculator"
  input_stream: "INPUT:combined_input"
  output_stream: "VIDEO:video_stream"
}
node {
  calculator: "SomeVideoCalculator"
  input_stream: "VIDEO_IN:video_stream"
  output_stream: "VIDEO_OUT:processed_video"
}

Aliran input dan output dapat diidentifikasi berdasarkan nomor indeks, nama tag, atau kombinasi nama tag dan nomor indeks. Anda dapat melihat beberapa contoh ID input dan output pada contoh di bawah ini. SomeAudioVideoCalculator mengidentifikasi output videonya berdasarkan tag dan output audionya berdasarkan kombinasi tag dan indeks. Input dengan tag VIDEO terhubung ke aliran data bernama video_stream. Output dengan tag AUDIO serta indeks 0 dan 1 terhubung ke aliran data bernama audio_left dan audio_right. SomeAudioCalculator mengidentifikasi input audionya hanya berdasarkan indeks (tidak diperlukan tag).

# Graph describing calculator SomeAudioVideoCalculator
node {
  calculator: "SomeAudioVideoCalculator"
  input_stream: "combined_input"
  output_stream: "VIDEO:video_stream"
  output_stream: "AUDIO:0:audio_left"
  output_stream: "AUDIO:1:audio_right"
}

node {
  calculator: "SomeAudioCalculator"
  input_stream: "audio_left"
  input_stream: "audio_right"
  output_stream: "audio_energy"
}

Dalam implementasi kalkulator, input dan output juga diidentifikasi oleh nama tag dan nomor indeks. Pada fungsi di bawah ini, input dan output diidentifikasi:

  • Menurut nomor indeks: Aliran input gabungan diidentifikasi hanya berdasarkan indeks 0.
  • Menurut nama tag: Aliran output video diidentifikasi berdasarkan nama tag "VIDEO".
  • Berdasarkan nama tag dan nomor indeks: Streaming audio output diidentifikasi dengan kombinasi nama tag AUDIO serta nomor indeks 0 dan 1.
// c++ Code snippet describing the SomeAudioVideoCalculator GetContract() method
class SomeAudioVideoCalculator : public CalculatorBase {
 public:
  static absl::Status GetContract(CalculatorContract* cc) {
    cc->Inputs().Index(0).SetAny();
    // SetAny() is used to specify that whatever the type of the
    // stream is, it's acceptable.  This does not mean that any
    // packet is acceptable.  Packets in the stream still have a
    // particular type.  SetAny() has the same effect as explicitly
    // setting the type to be the stream's type.
    cc->Outputs().Tag("VIDEO").Set<ImageFrame>();
    cc->Outputs().Get("AUDIO", 0).Set<Matrix>();
    cc->Outputs().Get("AUDIO", 1).Set<Matrix>();
    return absl::OkStatus();
  }

Memproses

Process() yang dipanggil pada node non-sumber harus menampilkan absl::OkStatus() untuk menunjukkan bahwa semuanya berjalan dengan baik, atau kode status lainnya untuk menandakan error

Jika kalkulator non-sumber menampilkan tool::StatusStop(), ini menandakan grafik dibatalkan lebih awal. Dalam hal ini, semua kalkulator sumber dan aliran input grafik akan ditutup (dan Paket yang tersisa akan disebarkan melalui grafik).

Node sumber dalam grafik akan terus memanggil Process() selama menampilkan absl::OkStatus(). Untuk menunjukkan bahwa tidak ada lagi data yang akan dihasilkan, tampilkan tool::StatusStop(). Status lainnya menunjukkan bahwa telah terjadi error.

Close() menampilkan absl::OkStatus() untuk menunjukkan keberhasilan. Status lainnya menunjukkan kegagalan.

Berikut adalah fungsi Process() dasar. Contoh ini menggunakan metode Input() (yang hanya dapat digunakan jika kalkulator memiliki satu input) untuk meminta data inputnya. Selanjutnya, std::unique_ptr akan digunakan untuk mengalokasikan memori yang diperlukan oleh paket output, dan melakukan penghitungan. Setelah selesai, pointer akan dilepas saat menambahkannya ke streaming output.

absl::Status MyCalculator::Process() {
  const Matrix& input = Input()->Get<Matrix>();
  std::unique_ptr<Matrix> output(new Matrix(input.rows(), input.cols()));
  // do your magic here....
  //    output->row(n) =  ...
  Output()->Add(output.release(), InputTimestamp());
  return absl::OkStatus();
}

Opsi kalkulator

Kalkulator menerima parameter pemrosesan melalui (1) paket aliran input (2) paket sisi input, dan (3) opsi kalkulator. Opsi kalkulator, jika ditentukan, muncul sebagai nilai literal di kolom node_options dari pesan CalculatorGraphConfiguration.Node.

  node {
    calculator: "TfLiteInferenceCalculator"
    input_stream: "TENSORS:main_model_input"
    output_stream: "TENSORS:main_model_output"
    node_options: {
      [type.googleapis.com/mediapipe.TfLiteInferenceCalculatorOptions] {
        model_path: "mediapipe/models/detection_model.tflite"
      }
    }
  }

Kolom node_options menerima sintaksis proto3. Atau, opsi kalkulator dapat ditentukan dalam kolom options menggunakan sintaksis proto2.

  node {
    calculator: "TfLiteInferenceCalculator"
    input_stream: "TENSORS:main_model_input"
    output_stream: "TENSORS:main_model_output"
    node_options: {
      [type.googleapis.com/mediapipe.TfLiteInferenceCalculatorOptions] {
        model_path: "mediapipe/models/detection_model.tflite"
      }
    }
  }

Tidak semua kalkulator menerima opsi kalkulator. Untuk menerima opsi, kalkulator biasanya akan menentukan jenis pesan protobuf baru untuk mewakili opsinya, seperti PacketClonerCalculatorOptions. Kalkulator kemudian akan membaca pesan protobuf tersebut dalam metode CalculatorBase::Open, dan mungkin juga dalam fungsi CalculatorBase::GetContract atau metode CalculatorBase::Process. Biasanya, jenis pesan protobuf baru akan ditentukan sebagai skema protobuf menggunakan file ".proto" dan aturan build mediapipe_proto_library().

  mediapipe_proto_library(
      name = "packet_cloner_calculator_proto",
      srcs = ["packet_cloner_calculator.proto"],
      visibility = ["//visibility:public"],
      deps = [
          "//mediapipe/framework:calculator_options_proto",
          "//mediapipe/framework:calculator_proto",
      ],
  )

Kalkulator contoh

Bagian ini membahas implementasi PacketClonerCalculator, yang melakukan tugas yang relatif sederhana, dan digunakan di banyak grafik kalkulator. PacketClonerCalculator hanya menghasilkan salinan paket input terbaru sesuai permintaan.

PacketClonerCalculator berguna saat stempel waktu paket data yang masuk tidak diselaraskan dengan sempurna. Misalkan kita memiliki ruangan dengan mikrofon, sensor cahaya, dan kamera video yang mengumpulkan data sensorik. Setiap sensor beroperasi secara independen dan mengumpulkan data secara bergantian. Misalkan output dari setiap sensor:

  • mikrofon = kenyaringan dalam desibel suara di ruangan (Bilangan Bulat)
  • sensor cahaya = kecerahan ruangan (Bilangan bulat)
  • kamera video = bingkai gambar RGB dalam ruangan (ImageFrame)

Pipeline persepsi sederhana kita dirancang untuk memproses data sensorik dari 3 sensor ini, sehingga kapan saja saat kita memiliki data frame gambar dari kamera yang disinkronkan dengan data kenyaringan mikrofon yang terakhir dikumpulkan dan data kecerahan sensor cahaya. Untuk melakukannya dengan MediaPipe, pipeline persepsi kita memiliki 3 aliran input:

  • room_mic_signal - Setiap paket data dalam aliran input ini adalah data bilangan bulat yang mewakili seberapa keras audio di ruang dengan stempel waktu.
  • room_lightening_sensor - Setiap paket data dalam aliran input ini adalah data bilangan bulat yang mewakili seberapa terang ruangan yang diterangi dengan stempel waktu.
  • room_video_tick_signal - Setiap paket data dalam aliran input ini adalah imageframe data video yang mewakili video yang dikumpulkan dari kamera di ruang dengan stempel waktu.

Berikut adalah implementasi PacketClonerCalculator. Anda dapat melihat metode GetContract(), Open(), dan Process() serta variabel instance current_ yang menyimpan paket input terbaru.

// This takes packets from N+1 streams, A_1, A_2, ..., A_N, B.
// For every packet that appears in B, outputs the most recent packet from each
// of the A_i on a separate stream.

#include <vector>

#include "absl/strings/str_cat.h"
#include "mediapipe/framework/calculator_framework.h"

namespace mediapipe {

// For every packet received on the last stream, output the latest packet
// obtained on all other streams. Therefore, if the last stream outputs at a
// higher rate than the others, this effectively clones the packets from the
// other streams to match the last.
//
// Example config:
// node {
//   calculator: "PacketClonerCalculator"
//   input_stream: "first_base_signal"
//   input_stream: "second_base_signal"
//   input_stream: "tick_signal"
//   output_stream: "cloned_first_base_signal"
//   output_stream: "cloned_second_base_signal"
// }
//
class PacketClonerCalculator : public CalculatorBase {
 public:
  static absl::Status GetContract(CalculatorContract* cc) {
    const int tick_signal_index = cc->Inputs().NumEntries() - 1;
    // cc->Inputs().NumEntries() returns the number of input streams
    // for the PacketClonerCalculator
    for (int i = 0; i < tick_signal_index; ++i) {
      cc->Inputs().Index(i).SetAny();
      // cc->Inputs().Index(i) returns the input stream pointer by index
      cc->Outputs().Index(i).SetSameAs(&cc->Inputs().Index(i));
    }
    cc->Inputs().Index(tick_signal_index).SetAny();
    return absl::OkStatus();
  }

  absl::Status Open(CalculatorContext* cc) final {
    tick_signal_index_ = cc->Inputs().NumEntries() - 1;
    current_.resize(tick_signal_index_);
    // Pass along the header for each stream if present.
    for (int i = 0; i < tick_signal_index_; ++i) {
      if (!cc->Inputs().Index(i).Header().IsEmpty()) {
        cc->Outputs().Index(i).SetHeader(cc->Inputs().Index(i).Header());
        // Sets the output stream of index i header to be the same as
        // the header for the input stream of index i
      }
    }
    return absl::OkStatus();
  }

  absl::Status Process(CalculatorContext* cc) final {
    // Store input signals.
    for (int i = 0; i < tick_signal_index_; ++i) {
      if (!cc->Inputs().Index(i).Value().IsEmpty()) {
        current_[i] = cc->Inputs().Index(i).Value();
      }
    }

    // Output if the tick signal is non-empty.
    if (!cc->Inputs().Index(tick_signal_index_).Value().IsEmpty()) {
      for (int i = 0; i < tick_signal_index_; ++i) {
        if (!current_[i].IsEmpty()) {
          cc->Outputs().Index(i).AddPacket(
              current_[i].At(cc->InputTimestamp()));
          // Add a packet to output stream of index i a packet from inputstream i
          // with timestamp common to all present inputs
        } else {
          cc->Outputs().Index(i).SetNextTimestampBound(
              cc->InputTimestamp().NextAllowedInStream());
          // if current_[i], 1 packet buffer for input stream i is empty, we will set
          // next allowed timestamp for input stream i to be current timestamp + 1
        }
      }
    }
    return absl::OkStatus();
  }

 private:
  std::vector<Packet> current_;
  int tick_signal_index_;
};

REGISTER_CALCULATOR(PacketClonerCalculator);
}  // namespace mediapipe

Biasanya, kalkulator hanya memiliki file .cc. Tidak diperlukan {i>.h<i}, karena mediapipe menggunakan pendaftaran untuk membuat kalkulator mengetahuinya. Setelah menentukan class kalkulator, daftarkan class dengan panggilan makro kumpulan_CALCULATOR(calculator_class_name).

Di bawah ini adalah grafik MediaPipe sederhana yang memiliki 3 aliran input, 1 node (PacketClonerCalculator), dan 2 aliran output.

input_stream: "room_mic_signal"
input_stream: "room_lighting_sensor"
input_stream: "room_video_tick_signal"

node {
   calculator: "PacketClonerCalculator"
   input_stream: "room_mic_signal"
   input_stream: "room_lighting_sensor"
   input_stream: "room_video_tick_signal"
   output_stream: "cloned_room_mic_signal"
   output_stream: "cloned_lighting_sensor"
 }

Diagram di bawah menunjukkan cara PacketClonerCalculator menentukan paket outputnya (bawah) berdasarkan rangkaian paket inputnya (atas).

Membuat grafik menggunakan PacketClonerCalculator
Setiap kali menerima paket pada aliran input TICK-nya, PacketClonerCalculator menghasilkan paket terbaru dari setiap aliran inputnya. Urutan paket output (bawah) ditentukan oleh urutan paket input (atas) dan stempel waktunya. Stempel waktu ditampilkan di sepanjang sisi kanan diagram.