Kalkulator

Setiap kalkulator adalah simpul dari suatu grafik. Kami menjelaskan cara membuat kalkulator, cara menginisialisasi kalkulator, cara melakukan perhitungan, aliran data input dan output, stempel waktu, dan opsi. Setiap {i>node<i} dalam grafik tersebut diimplementasikan sebagai Calculator. Sebagian besar eksekusi grafik terjadi di dalam kalkulator. Kalkulator dapat menerima nol atau beberapa aliran input dan/atau sisi paket dan menghasilkan nol atau lebih aliran {i>output<i} dan/atau paket samping.

CalculatorBase

Kalkulator dibuat dengan mendefinisikan {i>sub-class <i}baru dari CalculatorBase mengimplementasikan sejumlah metode, dan mendaftarkan sub-class baru dengan dan Mediapipe. Minimal, kalkulator baru harus menerapkan empat metode di bawah

  • GetContract()
    • Penulis kalkulator dapat menentukan jenis input dan output yang diharapkan kalkulator di GetContract(). Ketika grafik diinisialisasi, kerangka kerja memanggil metode statis untuk memverifikasi apakah jenis paket untuk input dan output yang terhubung sesuai dengan informasi dalam spesifikasi pendukung.
  • Open()
    • Setelah grafik dimulai, framework akan memanggil Open(). Sisi masukan paket tersedia untuk kalkulator pada tahap ini. Open() menginterpretasikan operasi konfigurasi node (lihat Graphs) dan menyiapkan status kalkulator per-grafik-run. Fungsi ini mungkin juga menulis paket ke {i>output<i} kalkulator. Error selama Open() dapat menghentikan proses grafik.
  • Process()
    • Untuk kalkulator dengan input, framework memanggil Process() berulang kali setiap kali setidaknya satu aliran input memiliki paket yang tersedia. Kerangka kerja secara default menjamin bahwa semua input memiliki stempel waktu yang sama (lihat Sinkronisasi untuk informasi selengkapnya). Beberapa Panggilan Process() dapat dipanggil secara bersamaan saat eksekusi paralel diaktifkan. Jika terjadi error selama Process(), framework akan memanggil Close() dan operasi grafik akan berakhir.
  • Close()
    • Setelah semua panggilan ke Process() selesai atau saat semua streaming input ditutup, framework memanggil Close(). Fungsi ini selalu dipanggil jika Open() dipanggil dan berhasil, meskipun operasi grafik dihentikan karena adanya {i>error<i}. Tidak ada input yang tersedia melalui aliran input apa pun selama Close(), tetapi masih memiliki akses ke paket sisi input dan sehingga dapat menulis output. Setelah Close() ditampilkan, kalkulator harus dianggap sebagai simpul mati. Objek kalkulator dihancurkan sebagai segera setelah grafik selesai berjalan.

Berikut ini adalah cuplikan kode dari CalculatorBase.h.

class CalculatorBase {
 public:
  ...

  // The subclasses of CalculatorBase must implement GetContract.
  // ...
  static absl::Status GetContract(CalculatorContract* cc);

  // Open is called before any Process() calls, on a freshly constructed
  // calculator.  Subclasses may override this method to perform necessary
  // setup, and possibly output Packets and/or set output streams' headers.
  // ...
  virtual absl::Status Open(CalculatorContext* cc) {
    return absl::OkStatus();
  }

  // Processes the incoming inputs. May call the methods on cc to access
  // inputs and produce outputs.
  // ...
  virtual absl::Status Process(CalculatorContext* cc) = 0;

  // Is called if Open() was called and succeeded.  Is called either
  // immediately after processing is complete or after a graph run has ended
  // (if an error occurred in the graph).  ...
  virtual absl::Status Close(CalculatorContext* cc) {
    return absl::OkStatus();
  }

  ...
};

Cara kerja kalkulator

Selama inisialisasi grafik MediaPipe, framework memanggil Metode statis GetContract() untuk menentukan jenis paket yang diharapkan.

Framework ini membuat dan menghancurkan seluruh kalkulator untuk setiap proses grafik (misalnya, sekali per video atau sekali per gambar). Benda mahal atau besar yang tersisa di seluruh grafik yang berjalan harus disediakan sebagai paket sisi masukan sehingga penghitungan tidak diulang pada operasi berikutnya.

Setelah inisialisasi, untuk setiap pengoperasian grafik, urutan berikut akan terjadi:

  • Open()
  • Process() (berulang kali)
  • Close()

Framework ini memanggil Open() untuk melakukan inisialisasi kalkulator. Open() seharusnya menafsirkan opsi apa pun dan mengatur status per-grafik-run di kalkulator. Open() dapat memperoleh paket sisi masukan dan menulis paket ke {i>output<i} kalkulator. Jika sesuai, fungsi ini harus memanggil SetOffset() untuk mengurangi potensi buffering paket aliran input.

Jika terjadi error selama Open() atau Process() (seperti yang ditunjukkan oleh salah satunya menampilkan status non-Ok), operasi grafik dihentikan tanpa panggilan lebih lanjut dengan metode kalkulator, dan kalkulator itu dihancurkan.

Untuk kalkulator dengan input, framework akan memanggil Process() setiap kali setidaknya satu input memiliki satu paket yang tersedia. Framework ini menjamin bahwa semua input memiliki stempel waktu yang sama, stempel waktu tersebut akan meningkat seiring dengan setiap panggilan ke Process() dan bahwa semua paket dikirim. Akibatnya, beberapa {i>input<i} mungkin tidak memiliki paket saat Process() dipanggil. Masukan yang paketnya hilang muncul di menghasilkan paket kosong (tanpa stempel waktu).

Framework ini memanggil Close() setelah semua panggilan ke Process(). Semua input akan telah habis, tetapi Close() memiliki akses ke paket sisi input dan mungkin menulis outputnya. Setelah Close menampilkan hasil, kalkulator akan dihancurkan.

Kalkulator tanpa input disebut sebagai sumber. Kalkulator sumber Process() akan terus dipanggil selama yang menampilkan status Ok. J kalkulator sumber menunjukkan bahwa resource habis dengan menampilkan status perhentian (yaitu mediaPipe::tool::StatusStop().).

Mengidentifikasi {i>input<i} dan {i>output<i}

Antarmuka publik untuk kalkulator terdiri dari satu set aliran {i>input <i}dan aliran data output. Dalam CalculatorGraphConfiguration, hasil dari beberapa kalkulator terhubung ke input kalkulator lain menggunakan nama kalkulator feed. Nama aliran data biasanya menggunakan huruf kecil, sedangkan tag input dan output ditulis biasanya dengan HURUF BESAR. Pada contoh di bawah, output dengan nama tag VIDEO adalah terhubung ke input dengan nama tag VIDEO_IN menggunakan aliran data bernama video_stream.

# Graph describing calculator SomeAudioVideoCalculator
node {
  calculator: "SomeAudioVideoCalculator"
  input_stream: "INPUT:combined_input"
  output_stream: "VIDEO:video_stream"
}
node {
  calculator: "SomeVideoCalculator"
  input_stream: "VIDEO_IN:video_stream"
  output_stream: "VIDEO_OUT:processed_video"
}

Aliran data input dan output dapat diidentifikasi dengan nomor indeks, nama tag, atau kombinasi nama tag dan nomor indeks. Anda dapat melihat beberapa contoh input dan ID output dalam contoh di bawah ini. SomeAudioVideoCalculator mengidentifikasi output video menurut tag dan output audionya dengan kombinasi tag Google Cloud. Input dengan tag VIDEO terhubung ke aliran data yang diberi nama video_stream. Output dengan tag AUDIO serta indeks 0 dan 1 adalah terhubung ke aliran data bernama audio_left dan audio_right. SomeAudioCalculator mengidentifikasi input audionya hanya berdasarkan indeks (tidak diperlukan tag).

# Graph describing calculator SomeAudioVideoCalculator
node {
  calculator: "SomeAudioVideoCalculator"
  input_stream: "combined_input"
  output_stream: "VIDEO:video_stream"
  output_stream: "AUDIO:0:audio_left"
  output_stream: "AUDIO:1:audio_right"
}

node {
  calculator: "SomeAudioCalculator"
  input_stream: "audio_left"
  input_stream: "audio_right"
  output_stream: "audio_energy"
}

Dalam penerapan kalkulator, input dan output juga diidentifikasi berdasarkan tag nama dan nomor indeks. Pada fungsi di bawah ini, input dan output diidentifikasi:

  • Dengan nomor indeks: Aliran input gabungan diidentifikasi hanya dengan indeks 0.
  • Menurut nama tag: Streaming output video diidentifikasi dengan nama tag "VIDEO".
  • Menurut nama tag dan nomor indeks: Streaming audio output diidentifikasi oleh kombinasi nama tag AUDIO serta nomor indeks 0 dan 1.
// c++ Code snippet describing the SomeAudioVideoCalculator GetContract() method
class SomeAudioVideoCalculator : public CalculatorBase {
 public:
  static absl::Status GetContract(CalculatorContract* cc) {
    cc->Inputs().Index(0).SetAny();
    // SetAny() is used to specify that whatever the type of the
    // stream is, it's acceptable.  This does not mean that any
    // packet is acceptable.  Packets in the stream still have a
    // particular type.  SetAny() has the same effect as explicitly
    // setting the type to be the stream's type.
    cc->Outputs().Tag("VIDEO").Set<ImageFrame>();
    cc->Outputs().Get("AUDIO", 0).Set<Matrix>();
    cc->Outputs().Get("AUDIO", 1).Set<Matrix>();
    return absl::OkStatus();
  }

Memproses

Process() yang dipanggil pada node non-sumber harus menampilkan absl::OkStatus() ke menunjukkan bahwa semua berjalan dengan baik, atau kode status lainnya untuk menandakan adanya {i>error<i}

Jika kalkulator non-sumber menampilkan tool::StatusStop(), ini menandakan bahwa grafik ini dibatalkan lebih awal. Dalam hal ini, semua kalkulator dan grafik sumber aliran data input akan ditutup (dan Paket yang tersisa akan menyebar melalui ).

Node sumber dalam grafik akan terus memanggil Process() selama karena menampilkan absl::OkStatus(). Untuk menunjukkan bahwa tidak ada lagi data untuk menghasilkan tool::StatusStop() hasil. Status lainnya menunjukkan bahwa {i>error<i} telah terjadi.

Close() menampilkan absl::OkStatus() untuk menunjukkan keberhasilan. Status lainnya menunjukkan kegagalan.

Berikut adalah fungsi Process() dasar. Class tersebut menggunakan metode Input() (yang dapat hanya digunakan jika kalkulator memiliki satu input) untuk meminta data inputnya. Ini kemudian menggunakan std::unique_ptr untuk mengalokasikan memori yang dibutuhkan untuk paket output, dan melakukan penghitungan. Setelah selesai, ia melepaskan pointer saat menambahkannya ke {i>output stream<i}.

absl::Status MyCalculator::Process() {
  const Matrix& input = Input()->Get<Matrix>();
  std::unique_ptr<Matrix> output(new Matrix(input.rows(), input.cols()));
  // do your magic here....
  //    output->row(n) =  ...
  Output()->Add(output.release(), InputTimestamp());
  return absl::OkStatus();
}

Opsi kalkulator

Kalkulator menerima parameter pemrosesan melalui (1) memasukkan paket aliran data (2) paket sisi input, dan (3) opsi kalkulator. Opsi kalkulator, jika yang ditentukan, muncul sebagai nilai literal di kolom node_options dari kolom CalculatorGraphConfiguration.Node pesan.

  node {
    calculator: "TfLiteInferenceCalculator"
    input_stream: "TENSORS:main_model_input"
    output_stream: "TENSORS:main_model_output"
    node_options: {
      [type.googleapis.com/mediapipe.TfLiteInferenceCalculatorOptions] {
        model_path: "mediapipe/models/detection_model.tflite"
      }
    }
  }

Kolom node_options menerima sintaksis proto3. Atau, kalkulator opsi dapat ditentukan di kolom options menggunakan sintaksis proto2.

  node {
    calculator: "TfLiteInferenceCalculator"
    input_stream: "TENSORS:main_model_input"
    output_stream: "TENSORS:main_model_output"
    node_options: {
      [type.googleapis.com/mediapipe.TfLiteInferenceCalculatorOptions] {
        model_path: "mediapipe/models/detection_model.tflite"
      }
    }
  }

Tidak semua kalkulator menerima opsi kalkulator. Untuk menerima opsi, kalkulator biasanya akan menentukan jenis pesan protobuf baru untuk mewakili opsi, seperti PacketClonerCalculatorOptions. Kemudian, kalkulator akan membaca pesan protobuf tersebut dalam metode CalculatorBase::Open-nya, dan mungkin juga dalam fungsi CalculatorBase::GetContract atau Metode CalculatorBase::Process. Biasanya, jenis pesan protobuf baru akan didefinisikan sebagai skema protobuf yang menggunakan ".proto" dan mediapipe_proto_library() aturan build.

  mediapipe_proto_library(
      name = "packet_cloner_calculator_proto",
      srcs = ["packet_cloner_calculator.proto"],
      visibility = ["//visibility:public"],
      deps = [
          "//mediapipe/framework:calculator_options_proto",
          "//mediapipe/framework:calculator_proto",
      ],
  )

Contoh kalkulator

Bagian ini membahas implementasi PacketClonerCalculator, yang melakukan pekerjaan yang relatif sederhana, dan digunakan dalam banyak grafik kalkulator. PacketClonerCalculator hanya menghasilkan salinan paket input terbarunya sesuai permintaan.

PacketClonerCalculator berguna saat stempel waktu paket data yang tiba tidak disejajarkan dengan sempurna. Misalkan kita memiliki ruangan dengan mikrofon, lampu sensor dan kamera video yang mengumpulkan data sensorik. Setiap sensor beroperasi secara independen dan mengumpulkan data secara sesekali. Misalkan output dari setiap sensor adalah:

  • mikrofon = kenyaringan dalam desibel suara di dalam ruangan (Bilangan Bulat)
  • sensor cahaya = kecerahan ruangan (Bilangan bulat)
  • kamera video = bingkai gambar RGB ruangan (ImageFrame)

Pipeline persepsi sederhana kami dirancang untuk memproses data sensoris dari ketiga sehingga ketika kita memiliki data bingkai gambar dari kamera yang disinkronkan dengan data kenyaringan mikrofon dan cahaya yang terakhir dikumpulkan data kecerahan sensor. Untuk melakukannya dengan MediaPipe, pipeline persepsi kami memiliki 3 input stream:

  • room_mic_signal - Setiap paket data dalam aliran data input ini adalah data bilangan bulat merepresentasikan seberapa keras audio di ruangan dengan stempel waktu.
  • room_lightening_sensor - Setiap paket data dalam aliran input ini adalah bilangan bulat data yang menunjukkan seberapa terang ruangan yang diterangi dengan stempel waktu.
  • room_video_tick_signal - Setiap paket data dalam aliran data input ini {i>imageframe<i} data video yang mewakili video yang dikumpulkan dari kamera di dengan stempel waktu.

Berikut adalah implementasi PacketClonerCalculator. Anda dapat melihat Metode GetContract(), Open(), dan Process(), serta instance variabel current_ yang menyimpan paket input terbaru.

// This takes packets from N+1 streams, A_1, A_2, ..., A_N, B.
// For every packet that appears in B, outputs the most recent packet from each
// of the A_i on a separate stream.

#include <vector>

#include "absl/strings/str_cat.h"
#include "mediapipe/framework/calculator_framework.h"

namespace mediapipe {

// For every packet received on the last stream, output the latest packet
// obtained on all other streams. Therefore, if the last stream outputs at a
// higher rate than the others, this effectively clones the packets from the
// other streams to match the last.
//
// Example config:
// node {
//   calculator: "PacketClonerCalculator"
//   input_stream: "first_base_signal"
//   input_stream: "second_base_signal"
//   input_stream: "tick_signal"
//   output_stream: "cloned_first_base_signal"
//   output_stream: "cloned_second_base_signal"
// }
//
class PacketClonerCalculator : public CalculatorBase {
 public:
  static absl::Status GetContract(CalculatorContract* cc) {
    const int tick_signal_index = cc->Inputs().NumEntries() - 1;
    // cc->Inputs().NumEntries() returns the number of input streams
    // for the PacketClonerCalculator
    for (int i = 0; i < tick_signal_index; ++i) {
      cc->Inputs().Index(i).SetAny();
      // cc->Inputs().Index(i) returns the input stream pointer by index
      cc->Outputs().Index(i).SetSameAs(&cc->Inputs().Index(i));
    }
    cc->Inputs().Index(tick_signal_index).SetAny();
    return absl::OkStatus();
  }

  absl::Status Open(CalculatorContext* cc) final {
    tick_signal_index_ = cc->Inputs().NumEntries() - 1;
    current_.resize(tick_signal_index_);
    // Pass along the header for each stream if present.
    for (int i = 0; i < tick_signal_index_; ++i) {
      if (!cc->Inputs().Index(i).Header().IsEmpty()) {
        cc->Outputs().Index(i).SetHeader(cc->Inputs().Index(i).Header());
        // Sets the output stream of index i header to be the same as
        // the header for the input stream of index i
      }
    }
    return absl::OkStatus();
  }

  absl::Status Process(CalculatorContext* cc) final {
    // Store input signals.
    for (int i = 0; i < tick_signal_index_; ++i) {
      if (!cc->Inputs().Index(i).Value().IsEmpty()) {
        current_[i] = cc->Inputs().Index(i).Value();
      }
    }

    // Output if the tick signal is non-empty.
    if (!cc->Inputs().Index(tick_signal_index_).Value().IsEmpty()) {
      for (int i = 0; i < tick_signal_index_; ++i) {
        if (!current_[i].IsEmpty()) {
          cc->Outputs().Index(i).AddPacket(
              current_[i].At(cc->InputTimestamp()));
          // Add a packet to output stream of index i a packet from inputstream i
          // with timestamp common to all present inputs
        } else {
          cc->Outputs().Index(i).SetNextTimestampBound(
              cc->InputTimestamp().NextAllowedInStream());
          // if current_[i], 1 packet buffer for input stream i is empty, we will set
          // next allowed timestamp for input stream i to be current timestamp + 1
        }
      }
    }
    return absl::OkStatus();
  }

 private:
  std::vector<Packet> current_;
  int tick_signal_index_;
};

REGISTER_CALCULATOR(PacketClonerCalculator);
}  // namespace mediapipe

Biasanya, kalkulator hanya memiliki file .cc. Tidak perlu .h, karena mediapipe menggunakan pendaftaran agar kalkulator mengetahuinya. Setelah Anda memiliki menentukan class kalkulator Anda, daftarkan dengan pemanggilan makro REGISTER_CALCULATOR(calculator_class_name).

Di bawah ini adalah grafik sederhana MediaPipe yang memiliki 3 aliran input, 1 node (PacketClonerCalculator) dan 2 streaming output.

input_stream: "room_mic_signal"
input_stream: "room_lighting_sensor"
input_stream: "room_video_tick_signal"

node {
   calculator: "PacketClonerCalculator"
   input_stream: "room_mic_signal"
   input_stream: "room_lighting_sensor"
   input_stream: "room_video_tick_signal"
   output_stream: "cloned_room_mic_signal"
   output_stream: "cloned_lighting_sensor"
 }

Diagram di bawah menunjukkan cara PacketClonerCalculator menentukan output-nya paket (bawah) berdasarkan rangkaian paket inputnya (atas).

Membuat grafik menggunakan PacketClonerCalculator
Setiap kali menerima paket di aliran input TICK-nya, PacketClonerCalculator menghasilkan output paket terbaru dari setiap aliran inputnya. Urutan paket output (bawah) ditentukan oleh rangkaian paket input (atas) dan stempel waktunya. Stempel waktu ditampilkan di sepanjang sisi kanan diagram.