Hesap Makineleri

Her hesap makinesi bir grafiğin düğümüdür. Yeni bir hesap makinesinin nasıl oluşturulacağını, bir hesap makinesinin nasıl başlatılacağını, hesaplamaların nasıl yapılacağını, giriş-çıkış akışlarını, zaman damgalarını ve seçenekleri açıklıyoruz. Grafikteki her düğüm bir Calculator olarak uygulanır. Grafik yürütme toplu işlem, hesap makinelerinde gerçekleşir. Bir hesap makinesi sıfır veya daha fazla giriş akışı ve/veya yan paket alabilir ve sıfır ya da daha fazla çıkış akışı ve/veya yan paket üretir.

CalculatorBase

CalculatorBase sınıfının yeni bir alt sınıfı tanımlanarak, çeşitli yöntemler uygulayarak ve yeni alt sınıfı Mediapipe'a kaydederek bir hesap makinesi oluşturulur. Yeni bir hesap makinesi en azından aşağıdaki dört yöntemi uygulamalıdır

  • GetContract()
    • Hesap makinesi yazarları, GetContract() işlevinde bir hesap makinesinin beklenen giriş ve çıkış türlerini belirtebilir. Bir grafik başlatıldığında çerçeve, bağlı giriş ve çıkışların paket türlerinin bu spesifikasyondaki bilgilerle eşleşip eşleşmediğini doğrulamak için statik bir yöntemi çağırır.
  • Open()
    • Grafik başladıktan sonra çerçeve Open() çağırır. Giriş yan paketleri bu noktada hesaplayıcıda kullanılabilir. Open(), düğüm yapılandırma işlemlerini yorumlar (Grafikler bölümüne bakın) ve hesap makinesinin grafik çalıştırma başına durumunu hazırlar. Bu işlev, hesap makinesi çıkışlarına da paket yazabilir. Open() sırasındaki bir hata, grafiğin sonlandırılmasına yol açabilir.
  • Process()
    • Girişleri olan bir hesap makinesinde, en az bir giriş akışında kullanılabilir paket olduğunda çerçeve Process() işlemini art arda çağırır. Çerçeve, varsayılan olarak tüm girişlerin aynı zaman damgasına sahip olmasını garanti eder (daha fazla bilgi için Senkronizasyon bölümüne bakın). Paralel yürütme etkinleştirildiğinde aynı anda birden fazla Process() çağrısı yapılabilir. Process() sırasında bir hata oluşursa çerçeve Close() çağrısı yapar ve grafik çalışması sonlandırılır.
  • Close()
    • Tüm Process() çağrıları tamamlandıktan veya tüm giriş akışları kapatıldığında çerçeve Close() yöntemini çağırır. Bu işlev, Open() çağrılıp başarılı olduysa ve grafik çalıştırma bir hata nedeniyle sonlandırılsa bile her zaman çağrılır. Close() sırasında herhangi bir giriş akışı üzerinden giriş yok ancak giriş tarafı paketlerine erişimi devam ettiği için çıkışlar yazabilir. Close() geri döndüğünde hesap makinesi, ölü düğüm olarak kabul edilmelidir. Grafik çalışması biter bitmez hesap makinesi nesnesi yok edilir.

Aşağıda CalculatorBase.h kod snippet'lerini görebilirsiniz.

class CalculatorBase {
 public:
  ...

  // The subclasses of CalculatorBase must implement GetContract.
  // ...
  static absl::Status GetContract(CalculatorContract* cc);

  // Open is called before any Process() calls, on a freshly constructed
  // calculator.  Subclasses may override this method to perform necessary
  // setup, and possibly output Packets and/or set output streams' headers.
  // ...
  virtual absl::Status Open(CalculatorContext* cc) {
    return absl::OkStatus();
  }

  // Processes the incoming inputs. May call the methods on cc to access
  // inputs and produce outputs.
  // ...
  virtual absl::Status Process(CalculatorContext* cc) = 0;

  // Is called if Open() was called and succeeded.  Is called either
  // immediately after processing is complete or after a graph run has ended
  // (if an error occurred in the graph).  ...
  virtual absl::Status Close(CalculatorContext* cc) {
    return absl::OkStatus();
  }

  ...
};

Hesap makinesinin ömrü

Bir MediaPipe grafiğinin başlatılması sırasında çerçeve, ne tür paketlerin beklendiğini belirlemek için bir GetContract() statik yöntemini çağırır.

Çerçeve, çalıştırılan her grafik için hesap makinesinin tamamını oluşturur ve yok eder (ör. video başına veya resim başına bir kez). Grafik çalıştırmalarında sabit kalan pahalı veya büyük nesneler, sonraki çalıştırmalarda hesaplamaların tekrarlanmaması için giriş tarafı paketleri olarak sağlanmalıdır.

Başlatma işleminden sonra, grafiğin her çalıştırması için aşağıdaki sıra oluşur:

  • Open()
  • Process() (tekrar tekrar)
  • Close()

Çerçeve, hesap makinesini başlatmak için Open() yöntemini çağırır. Open() tüm seçenekleri yorumlamalı ve hesap makinesinin her grafik çalışma durumunu ayarlamalıdır. Open(), giriş tarafı paketlerini elde edebilir ve hesap makinesi çıkışlarına paket yazabilir. Uygunsa giriş akışlarının olası paket arabelleğe alınmasını azaltmak için SetOffset() yöntemini çağırmalıdır.

Open() veya Process() sırasında bir hata oluşursa (bunlardan birinin Ok olmayan bir durum döndürmesiyle belirtildiği gibi) grafik çalıştırması, hesap makinesinin yöntemlerine başka çağrı yapılmadan sonlandırılır ve hesap makinesi kaldırılır.

Girişleri olan bir hesap makinesinde, en az bir girişin kullanılabilir paketi olduğunda çerçeve Process() yöntemini çağırır. Çerçeve, tüm girişlerin aynı zaman damgasına sahip olmasını, zaman damgalarının Process() öğesine yapılan her çağrıda artacağını ve tüm paketlerin teslim edilmesini garanti eder. Sonuç olarak, Process() çağrıldığında bazı girişlerin paketi olmayabilir. Paketi eksik olan bir giriş, zaman damgası olmadan boş bir paket oluşturuyormuş gibi görünür.

Çerçeve, tüm Process() çağrılarından sonra Close() yöntemini çağırır. Tüm girişler tükendi ancak Close(), giriş tarafı paketlerine erişebilir ve çıkış yazabilir. Close'un geri döndükten sonra hesap makinesi kaldırılır.

Giriş içermeyen hesap makineleri kaynak olarak adlandırılır. Kaynak hesaplayıcı, Ok durumu döndürdüğü sürece Process() çağrılmaya devam eder. Kaynak hesaplayıcı, bir durdurma durumu (ör. mediaPipe::tool::StatusStop()) döndürülerek tüketildiğini gösterir.

Giriş ve çıkışları tanımlama

Hesap makinesinin herkese açık arayüzü, bir dizi giriş ve çıkış akışından oluşur. CalculatorGraphConfiguration sisteminde bazı hesap makinelerinden elde edilen çıkışlar, adlandırılmış akışlar kullanılarak diğer hesap makinelerinin girişlerine bağlanır. Akış adları normalde küçük harfle yazılır. Giriş ve çıkış etiketleri ise normalde BÜYÜK HARF olur. Aşağıdaki örnekte VIDEO etiket adına sahip çıkış, video_stream adlı akış kullanılarak VIDEO_IN etiketli girişe bağlanmıştır.

# Graph describing calculator SomeAudioVideoCalculator
node {
  calculator: "SomeAudioVideoCalculator"
  input_stream: "INPUT:combined_input"
  output_stream: "VIDEO:video_stream"
}
node {
  calculator: "SomeVideoCalculator"
  input_stream: "VIDEO_IN:video_stream"
  output_stream: "VIDEO_OUT:processed_video"
}

Giriş ve çıkış akışları; dizin numarası, etiket adı veya etiket adı ve dizin numarasının bir kombinasyonuyla tanımlanabilir. Aşağıdaki örnekte giriş ve çıkış tanımlayıcılarıyla ilgili bazı örnekler görebilirsiniz. SomeAudioVideoCalculator, video çıkışını etikete göre, ses çıkışlarını ise etiket ve dizin kombinasyonuyla tanımlar. VIDEO etiketli giriş, video_stream adlı akışa bağlı. AUDIO etiketine ve 0 ile 1 dizinlerine sahip çıkışlar, audio_left ve audio_right adlı akışlara bağlanmış. SomeAudioCalculator, ses girişlerini yalnızca dizine göre tanımlar (etiket gerekmez).

# Graph describing calculator SomeAudioVideoCalculator
node {
  calculator: "SomeAudioVideoCalculator"
  input_stream: "combined_input"
  output_stream: "VIDEO:video_stream"
  output_stream: "AUDIO:0:audio_left"
  output_stream: "AUDIO:1:audio_right"
}

node {
  calculator: "SomeAudioCalculator"
  input_stream: "audio_left"
  input_stream: "audio_right"
  output_stream: "audio_energy"
}

Hesap makinesi uygulamasında girişler ve çıkışlar, etiket adı ve dizin numarasıyla da tanımlanır. Aşağıdaki fonksiyonda girdi ve çıktı tanımlanmaktadır:

  • Dizin numarasına göre: Birleştirilmiş giriş akışı, basitçe 0 dizini ile tanımlanır.
  • Etiket adına göre: Video çıkış akışı, "VIDEO" etiket adıyla tanımlanır.
  • Etiket adına ve dizin numarasına göre: Çıkış ses akışları, AUDIO etiket adı ve 0 ile 1 dizin numaralarının kombinasyonuyla tanımlanır.
// c++ Code snippet describing the SomeAudioVideoCalculator GetContract() method
class SomeAudioVideoCalculator : public CalculatorBase {
 public:
  static absl::Status GetContract(CalculatorContract* cc) {
    cc->Inputs().Index(0).SetAny();
    // SetAny() is used to specify that whatever the type of the
    // stream is, it's acceptable.  This does not mean that any
    // packet is acceptable.  Packets in the stream still have a
    // particular type.  SetAny() has the same effect as explicitly
    // setting the type to be the stream's type.
    cc->Outputs().Tag("VIDEO").Set<ImageFrame>();
    cc->Outputs().Get("AUDIO", 0).Set<Matrix>();
    cc->Outputs().Get("AUDIO", 1).Set<Matrix>();
    return absl::OkStatus();
  }

İşleniyor

Kaynak olmayan bir düğümde çağrılan Process(), her şeyin yolunda gittiğini belirtmek için absl::OkStatus() ya da hata sinyali verecek başka bir durum kodu döndürmelidir

Kaynak olmayan bir hesap makinesi tool::StatusStop() değerini döndürürse bu, grafiğin erken iptal edildiğini gösterir. Bu durumda, tüm kaynak hesaplayıcılar ve grafik giriş akışları kapatılır (ve kalan Paketler grafik aracılığıyla yayılır).

Grafikteki kaynak düğüm, absl::OkStatus( değerini döndürdüğü sürece Process() tarafından çağrılmaya devam edecektir. Bu, tool::StatusStop() döndürülecek başka veri olmadığını belirtmek için kullanılır. Diğer tüm durumlar bir hata oluştuğunu gösterir.

Close(), başarılı olduğunu belirtmek için absl::OkStatus() değerini döndürür. Diğer herhangi bir durum ise bir hata olduğunu gösterir.

Temel Process() işlevi şu şekildedir. Giriş verilerini istemek için Input() yöntemini (yalnızca hesap makinesinde tek bir giriş olduğunda kullanılabilir) kullanır. Ardından çıkış paketi için gereken belleği ayırmak üzere std::unique_ptr kullanır ve hesaplamaları yapar. İşlem tamamlandığında işaretçiyi çıkış akışına eklerken serbest bırakır.

absl::Status MyCalculator::Process() {
  const Matrix& input = Input()->Get<Matrix>();
  std::unique_ptr<Matrix> output(new Matrix(input.rows(), input.cols()));
  // do your magic here....
  //    output->row(n) =  ...
  Output()->Add(output.release(), InputTimestamp());
  return absl::OkStatus();
}

Hesap makinesi seçenekleri

Hesaplayıcılar, işleme parametrelerini (1) giriş akışı paketleri (2) giriş tarafı paketleri ve (3) hesap makinesi seçenekleri üzerinden kabul eder. Hesap makinesi seçenekleri (belirtilmişse) CalculatorGraphConfiguration.Node mesajının node_options alanında değişmez değerler olarak görünür.

  node {
    calculator: "TfLiteInferenceCalculator"
    input_stream: "TENSORS:main_model_input"
    output_stream: "TENSORS:main_model_output"
    node_options: {
      [type.googleapis.com/mediapipe.TfLiteInferenceCalculatorOptions] {
        model_path: "mediapipe/models/detection_model.tflite"
      }
    }
  }

node_options alanı proto3 söz dizimini kabul eder. Alternatif olarak, hesap makinesi seçenekleri proto2 söz dizimi kullanılarak options alanında belirtilebilir.

  node {
    calculator: "TfLiteInferenceCalculator"
    input_stream: "TENSORS:main_model_input"
    output_stream: "TENSORS:main_model_output"
    node_options: {
      [type.googleapis.com/mediapipe.TfLiteInferenceCalculatorOptions] {
        model_path: "mediapipe/models/detection_model.tflite"
      }
    }
  }

Bazı hesap makineleri hesap makinesi seçeneklerini kabul etmez. Hesap makinesi, seçenekleri kabul etmek için normalde seçenekleri temsil eden yeni bir protokol mesajı türü tanımlar (ör. PacketClonerCalculatorOptions). Hesap makinesi daha sonra bu protobuf mesajını CalculatorBase::Open yönteminde ve muhtemelen CalculatorBase::GetContract işlevinde veya CalculatorBase::Process yönteminde okur. Normalde yeni protobuf mesaj türü, ".proto" dosyası ve mediapipe_proto_library() derleme kuralı kullanılarak protobuf şeması olarak tanımlanır.

  mediapipe_proto_library(
      name = "packet_cloner_calculator_proto",
      srcs = ["packet_cloner_calculator.proto"],
      visibility = ["//visibility:public"],
      deps = [
          "//mediapipe/framework:calculator_options_proto",
          "//mediapipe/framework:calculator_proto",
      ],
  )

Örnek hesap makinesi

Bu bölümde, nispeten basit bir iş yapan ve birçok hesap makinesi grafiğinde kullanılan PacketClonerCalculator uygulaması ele alınmaktadır. PacketClonerCalculator, isteğe bağlı olarak en son giriş paketlerinin bir kopyasını oluşturur.

PacketClonerCalculator, gelen veri paketlerinin zaman damgaları mükemmel bir şekilde hizalanmadığında kullanışlıdır. Duyusal veriler toplayan bir mikrofon, ışık sensörü ve video kamera bulunan bir odamız olduğunu varsayalım. Sensörlerin her biri bağımsız olarak çalışır ve aralıklı olarak veri toplar. Her sensörün çıkışının şu şekilde olduğunu varsayalım:

  • mikrofon = odadaki sesin desibel cinsinden yüksekliği (Tam sayı)
  • ışık sensörü = odanın parlaklığı (Tam sayı)
  • video kamera = odanın RGB resim çerçevesi (ImageFrame)

Basit algılama ardışık düzenimiz, bu 3 sensörden gelen duyusal verileri işleyecek şekilde tasarlanmıştır. Kameradan alınan görüntü çerçevesi verileri, en son toplanan mikrofon ses yüksekliği verileri ve ışık sensörü parlaklık verileriyle senkronize edilir. MediaPipe ile bunu yapmak için algı ardışık düzenimizin 3 giriş akışı vardır:

  • Room_mic_signal - Bu giriş akışındaki her veri paketi, zaman damgasıyla bir odada sesin ne kadar yüksek olduğunu temsil eden tam sayı verileridir.
  • oda_aydınlatma_sensörü - Bu giriş akışındaki her bir veri paketi, zaman damgasıyla odanın ne kadar parlak olduğunu gösteren tam sayı verileridir.
  • Room_video_tick_signal - Bu giriş akışındaki her bir veri paketi, odadaki kameradan toplanan videoyu temsil eden video verilerinin görüntü çerçevesidir ve zaman damgası bulunur.

Aşağıda, PacketClonerCalculator uygulanması gösterilmektedir. GetContract(), Open() ve Process() yöntemlerinin yanı sıra en son giriş paketlerini barındıran current_ örnek değişkenini de görebilirsiniz.

// This takes packets from N+1 streams, A_1, A_2, ..., A_N, B.
// For every packet that appears in B, outputs the most recent packet from each
// of the A_i on a separate stream.

#include <vector>

#include "absl/strings/str_cat.h"
#include "mediapipe/framework/calculator_framework.h"

namespace mediapipe {

// For every packet received on the last stream, output the latest packet
// obtained on all other streams. Therefore, if the last stream outputs at a
// higher rate than the others, this effectively clones the packets from the
// other streams to match the last.
//
// Example config:
// node {
//   calculator: "PacketClonerCalculator"
//   input_stream: "first_base_signal"
//   input_stream: "second_base_signal"
//   input_stream: "tick_signal"
//   output_stream: "cloned_first_base_signal"
//   output_stream: "cloned_second_base_signal"
// }
//
class PacketClonerCalculator : public CalculatorBase {
 public:
  static absl::Status GetContract(CalculatorContract* cc) {
    const int tick_signal_index = cc->Inputs().NumEntries() - 1;
    // cc->Inputs().NumEntries() returns the number of input streams
    // for the PacketClonerCalculator
    for (int i = 0; i < tick_signal_index; ++i) {
      cc->Inputs().Index(i).SetAny();
      // cc->Inputs().Index(i) returns the input stream pointer by index
      cc->Outputs().Index(i).SetSameAs(&cc->Inputs().Index(i));
    }
    cc->Inputs().Index(tick_signal_index).SetAny();
    return absl::OkStatus();
  }

  absl::Status Open(CalculatorContext* cc) final {
    tick_signal_index_ = cc->Inputs().NumEntries() - 1;
    current_.resize(tick_signal_index_);
    // Pass along the header for each stream if present.
    for (int i = 0; i < tick_signal_index_; ++i) {
      if (!cc->Inputs().Index(i).Header().IsEmpty()) {
        cc->Outputs().Index(i).SetHeader(cc->Inputs().Index(i).Header());
        // Sets the output stream of index i header to be the same as
        // the header for the input stream of index i
      }
    }
    return absl::OkStatus();
  }

  absl::Status Process(CalculatorContext* cc) final {
    // Store input signals.
    for (int i = 0; i < tick_signal_index_; ++i) {
      if (!cc->Inputs().Index(i).Value().IsEmpty()) {
        current_[i] = cc->Inputs().Index(i).Value();
      }
    }

    // Output if the tick signal is non-empty.
    if (!cc->Inputs().Index(tick_signal_index_).Value().IsEmpty()) {
      for (int i = 0; i < tick_signal_index_; ++i) {
        if (!current_[i].IsEmpty()) {
          cc->Outputs().Index(i).AddPacket(
              current_[i].At(cc->InputTimestamp()));
          // Add a packet to output stream of index i a packet from inputstream i
          // with timestamp common to all present inputs
        } else {
          cc->Outputs().Index(i).SetNextTimestampBound(
              cc->InputTimestamp().NextAllowedInStream());
          // if current_[i], 1 packet buffer for input stream i is empty, we will set
          // next allowed timestamp for input stream i to be current timestamp + 1
        }
      }
    }
    return absl::OkStatus();
  }

 private:
  std::vector<Packet> current_;
  int tick_signal_index_;
};

REGISTER_CALCULATOR(PacketClonerCalculator);
}  // namespace mediapipe

Genellikle, bir hesap makinesinde yalnızca bir .cc dosyası bulunur. Mediapipe, hesap makinelerini bilgilendirmek için kayıt yöntemini kullandığından .h gerekli değildir. Hesap makinesi sınıfınızı tanımladıktan sonra REGISTER_CALCULATOR(calculator_class_name) makro çağrısıyla kaydedin.

Aşağıda, 3 giriş akışı, 1 düğüm (PacketClonerCalculator) ve 2 çıkış akışı içeren önemsiz bir MediaPipe grafiği bulunmaktadır.

input_stream: "room_mic_signal"
input_stream: "room_lighting_sensor"
input_stream: "room_video_tick_signal"

node {
   calculator: "PacketClonerCalculator"
   input_stream: "room_mic_signal"
   input_stream: "room_lighting_sensor"
   input_stream: "room_video_tick_signal"
   output_stream: "cloned_room_mic_signal"
   output_stream: "cloned_lighting_sensor"
 }

Aşağıdaki şemada, PacketClonerCalculator öğesinin giriş paketi serisine (üstteki) göre çıkış paketlerini (altta) nasıl tanımladığını görebilirsiniz.

PacketClonerCalculator kullanarak grafik oluşturma
PacketCloner biçimde, TICK giriş akışında her paket aldığında giriş akışlarının her birinden en yeni paketi çıkarır. Çıkış paketlerinin sırası (altta), giriş paketlerinin sırasına (üstte) ve bunların zaman damgalarına göre belirlenir. Zaman damgaları, şemanın sağ tarafında gösteriliyor.