Kalkulatory

Każdy kalkulator jest węzłem wykresu. Opisujemy, jak utworzyć nowy kalkulator, jak go uruchomić, wykonać obliczenia, jakie są strumienie danych wejściowych i wyjściowych, sygnatury czasowe oraz opcje. Każdy węzeł na wykresie jest zaimplementowany jako Calculator. Większość wykresów wykonuje się w kalkulatorach. Kalkulator może otrzymać zero lub więcej strumieni wejściowych lub pakietów dodatkowych i wygenerować 0 lub więcej strumieni wyjściowych lub pakietów bocznych.

CalculatorBase

Aby utworzyć kalkulator, trzeba zdefiniować nową podklasę klasy CalculatorBase, wdrożyć kilka metod i zarejestrować nową podklasę w Mediapipe. Nowy kalkulator musi implementować przynajmniej 4 metody opisane poniżej

  • GetContract()
    • Autorzy kalkulatora mogą określić w GetContract() oczekiwane typy danych wejściowych i wyjściowych kalkulatora. Po zainicjowaniu wykresu platforma wywołuje metodę statyczną, aby sprawdzić, czy typy pakietów połączonych danych wejściowych i wyjściowych są zgodne z informacjami zawartymi w tej specyfikacji.
  • Open()
    • Po uruchomieniu wykresu platforma wywołuje Open(). W tym momencie kalkulator ma dostęp do pakietów dodatkowych. Open() interpretuje operacje konfiguracji węzłów (patrz Wykresy) i przygotowuje stan kalkulatora dla poszczególnych wykresów. Ta funkcja może też zapisywać pakiety w wynikach kalkulatora. Błąd podczas Open() może zakończyć generowanie wykresu.
  • Process()
    • W przypadku kalkulatora z danymi wejściowych platforma wielokrotnie wywołuje funkcję Process() za każdym razem, gdy w co najmniej 1 strumieniu wejściowym jest dostępny pakiet. Platforma domyślnie gwarantuje, że wszystkie dane wejściowe mają tę samą sygnaturę czasową (więcej informacji znajdziesz w artykule Synchronizacja). Gdy włączone jest wykonywanie równoległe, można wywoływać wiele wywołań Process() jednocześnie. Jeśli w czasie Process() wystąpi błąd, platforma wywoła metodę Close() i bieg wykresu się zakończy.
  • Close()
    • Po zakończeniu wszystkich wywołań funkcji Process() lub zamknięciu wszystkich strumieni danych wejściowych platforma wywołuje metodę Close(). Ta funkcja jest zawsze wywoływana, jeśli funkcja Open() została wywołana i pomyślna, a nawet wtedy, gdy uruchomienie wykresu zostało zakończone z powodu błędu. Podczas Close() żadne dane wejściowe nie są dostępne przez żadne strumienie wejściowe, ale nadal ma ona dostęp do pakietów wejściowych, więc może zapisywać dane wyjściowe. Po zwróceniu funkcji Close() kalkulator powinien być uważany za martwy węzeł. Obiekt kalkulatora jest niszczony wraz z zakończeniem wykresu.

Poniżej znajdziesz fragmenty kodu z CalculatorBase.h.

class CalculatorBase {
 public:
  ...

  // The subclasses of CalculatorBase must implement GetContract.
  // ...
  static absl::Status GetContract(CalculatorContract* cc);

  // Open is called before any Process() calls, on a freshly constructed
  // calculator.  Subclasses may override this method to perform necessary
  // setup, and possibly output Packets and/or set output streams' headers.
  // ...
  virtual absl::Status Open(CalculatorContext* cc) {
    return absl::OkStatus();
  }

  // Processes the incoming inputs. May call the methods on cc to access
  // inputs and produce outputs.
  // ...
  virtual absl::Status Process(CalculatorContext* cc) = 0;

  // Is called if Open() was called and succeeded.  Is called either
  // immediately after processing is complete or after a graph run has ended
  // (if an error occurred in the graph).  ...
  virtual absl::Status Close(CalculatorContext* cc) {
    return absl::OkStatus();
  }

  ...
};

Jak działa kalkulator

Podczas inicjowania wykresu MediaPipe platforma wywołuje metodę statyczną GetContract(), aby określić oczekiwane rodzaje pakietów.

Platforma tworzy i niszczy cały kalkulator przy każdym uruchomieniu wykresu (np. raz na film lub raz na obraz). Drogie lub duże obiekty, które pozostają stałe w ramach przebiegu wykresu, powinny być dostarczane jako wejściowe pakiety boczne, aby obliczenia nie były powtarzane przy kolejnych uruchomieniach.

Po zainicjowaniu przy każdym uruchomieniu wykresu zachodzi taka sekwencja:

  • Open()
  • Process() (wielokrotnie)
  • Close()

Platforma wywołuje kalkulator Open(). Funkcja Open() powinna zinterpretować dowolne opcje i skonfigurować stan kalkulatora dla poszczególnych wykresów. Funkcja Open() może uzyskiwać pakiety po stronie wejściowej i zapisywać je na danych wyjściowych kalkulatora. W razie potrzeby powinien wywołać SetOffset(), aby ograniczyć potencjalne buforowanie pakietów strumieni wejściowych.

Jeśli w funkcji Open() lub Process() wystąpi błąd (gdy jeden z nich zwraca stan inny niż Ok), wykres zostanie zakończony bez dalszych wywołań metod kalkulatora, a kalkulator zostanie zniszczony.

W przypadku kalkulatora z danymi wejściowych platforma wywołuje funkcję Process(), gdy dostępne jest co najmniej 1 pakiet danych wejściowych. Platforma gwarantuje, że wszystkie dane wejściowe będą miały tę samą sygnaturę czasową, że będą one zwiększać się z każdym wywołaniem funkcji Process() i że wszystkie pakiety zostaną dostarczone. W efekcie po wywołaniu funkcji Process() niektóre dane wejściowe mogą nie mieć żadnych pakietów. Dane wejściowe, których brakuje pakietu, prawdopodobnie powoduje utworzenie pustego pakietu (bez sygnatury czasowej).

Platforma wywołuje Close() po wszystkich wywołaniach funkcji Process(). Wszystkie dane wejściowe zostały wyczerpane, ale Close() ma dostęp do pakietów po stronie wejściowej i może zapisywać dane wyjściowe. Po zwróceniu Zamknij kalkulator jest niszczony.

Kalkulatory bez danych wejściowych są nazywane źródłami. Kalkulator źródeł nadal wywołuje funkcję Process(), dopóki zwraca stan Ok. Kalkulator źródeł pokazuje, że zasoby są wyczerpane przez zwrócenie stanu zatrzymania (np. mediaPipe::tool::StatusStop()).

Identyfikowanie danych wejściowych i wyjściowych

Publiczny interfejs kalkulatora składa się z zestawu strumieni wejściowych i wyjściowych. W konfiguracji CalculatorGraphConfiguration wyniki niektórych kalkulatorów są łączone z danymi innych kalkulatorów za pomocą nazwanych strumieni. Nazwy strumieni są zwykle zapisywane małymi literami, a tagi danych wejściowych i wyjściowych są zwykle pisane WIELKIMI LITERAMI. W poniższym przykładzie dane wyjściowe o nazwie tagu VIDEO są połączone z danymi wejściowymi tagu o nazwie VIDEO_IN za pomocą strumienia o nazwie video_stream.

# Graph describing calculator SomeAudioVideoCalculator
node {
  calculator: "SomeAudioVideoCalculator"
  input_stream: "INPUT:combined_input"
  output_stream: "VIDEO:video_stream"
}
node {
  calculator: "SomeVideoCalculator"
  input_stream: "VIDEO_IN:video_stream"
  output_stream: "VIDEO_OUT:processed_video"
}

Strumienie wejściowe i wyjściowe można identyfikować według numeru indeksu, nazwy tagu lub kombinacji nazwy tagu i numeru indeksu. W przykładzie poniżej znajdziesz kilka przykładów identyfikatorów danych wejściowych i wyjściowych. SomeAudioVideoCalculator identyfikuje wyjścia wideo według tagu, a wyjściowe wyjścia audio – na podstawie kombinacji tagu i indeksu. Dane wejściowe z tagiem VIDEO są połączone ze strumieniem o nazwie video_stream. Dane wyjściowe z tagiem AUDIO i indeksami 0 i 1 są połączone ze strumieniami o nazwach audio_left i audio_right. SomeAudioCalculator identyfikuje dane wejściowe audio tylko według indeksu (tag nie jest potrzebny).

# Graph describing calculator SomeAudioVideoCalculator
node {
  calculator: "SomeAudioVideoCalculator"
  input_stream: "combined_input"
  output_stream: "VIDEO:video_stream"
  output_stream: "AUDIO:0:audio_left"
  output_stream: "AUDIO:1:audio_right"
}

node {
  calculator: "SomeAudioCalculator"
  input_stream: "audio_left"
  input_stream: "audio_right"
  output_stream: "audio_energy"
}

W implementacji kalkulatora dane wejściowe i wyniki są również identyfikowane za pomocą nazwy tagu i numeru indeksu. W funkcji poniżej dane wejściowe i wyjściowe są identyfikowane:

  • Według numeru indeksu: połączony strumień danych wejściowych jest identyfikowany po prostu przez indeks 0.
  • Według nazwy tagu: strumień wyjścia wideo jest identyfikowany przez tag „VIDEO”.
  • Według nazwy tagu i numeru indeksu: wyjściowe strumienie audio są identyfikowane przez połączenie nazwy tagu AUDIO oraz numerów indeksu 0 i 1.
// c++ Code snippet describing the SomeAudioVideoCalculator GetContract() method
class SomeAudioVideoCalculator : public CalculatorBase {
 public:
  static absl::Status GetContract(CalculatorContract* cc) {
    cc->Inputs().Index(0).SetAny();
    // SetAny() is used to specify that whatever the type of the
    // stream is, it's acceptable.  This does not mean that any
    // packet is acceptable.  Packets in the stream still have a
    // particular type.  SetAny() has the same effect as explicitly
    // setting the type to be the stream's type.
    cc->Outputs().Tag("VIDEO").Set<ImageFrame>();
    cc->Outputs().Get("AUDIO", 0).Set<Matrix>();
    cc->Outputs().Get("AUDIO", 1).Set<Matrix>();
    return absl::OkStatus();
  }

W trakcie przetwarzania

Funkcja Process() wywołana z węzła innego niż źródłowy musi zwrócić wartość absl::OkStatus(), aby zasygnalizować, że wszystko poszło dobrze, lub dowolny inny kod stanu sygnalizujący błąd

Jeśli kalkulator nieźródłowy zwraca wartość tool::StatusStop(), oznacza to, że wykres jest wcześniej anulowany. W tym przypadku wszystkie kalkulatory źródeł i strumienie danych wejściowych wykresu zostaną zamknięte (a pozostałe pakiety zostaną rozpowszechnione na wykresie).

Węzeł źródłowy na wykresie będzie nadal wywoływany przez funkcję Process(), dopóki będzie zwracać wartość absl::OkStatus(. Aby wskazać, że nie ma więcej danych do wygenerowania, zwróć wartość tool::StatusStop(). Każdy inny stan oznacza, że wystąpił błąd.

Close() zwraca wartość absl::OkStatus(), co oznacza, że się udało. Każdy inny stan wskazuje na błąd.

Oto podstawowa funkcja Process(). Korzysta z metody Input() (której można używać tylko wtedy, gdy kalkulator ma tylko jedno dane wejściowe) do żądania danych wejściowych. Następnie wykorzystuje std::unique_ptr do przydzielenia pamięci potrzebnej na pakiet wyjściowy i wykonuje obliczenia. Gdy skończysz, zwalnia wskaźnik podczas dodawania go do strumienia danych wyjściowych.

absl::Status MyCalculator::Process() {
  const Matrix& input = Input()->Get<Matrix>();
  std::unique_ptr<Matrix> output(new Matrix(input.rows(), input.cols()));
  // do your magic here....
  //    output->row(n) =  ...
  Output()->Add(output.release(), InputTimestamp());
  return absl::OkStatus();
}

Opcje kalkulatora

Kalkulatory akceptują parametry przetwarzania przez (1) pakiety strumienia wejściowego (2) pakiety po stronie wejściowej i (3) opcje kalkulatora. Opcje kalkulatora (jeśli są określone) pojawiają się jako wartości literałowe w polu node_options komunikatu CalculatorGraphConfiguration.Node.

  node {
    calculator: "TfLiteInferenceCalculator"
    input_stream: "TENSORS:main_model_input"
    output_stream: "TENSORS:main_model_output"
    node_options: {
      [type.googleapis.com/mediapipe.TfLiteInferenceCalculatorOptions] {
        model_path: "mediapipe/models/detection_model.tflite"
      }
    }
  }

Pole node_options akceptuje składnię proto3. Opcje kalkulatora można też określić w polu options przy użyciu składni proto2.

  node {
    calculator: "TfLiteInferenceCalculator"
    input_stream: "TENSORS:main_model_input"
    output_stream: "TENSORS:main_model_output"
    node_options: {
      [type.googleapis.com/mediapipe.TfLiteInferenceCalculatorOptions] {
        model_path: "mediapipe/models/detection_model.tflite"
      }
    }
  }

Nie wszystkie kalkulatory obsługują opcje. Aby akceptować opcje, kalkulator zwykle definiuje nowy typ wiadomości protobuf reprezentującej opcje, np. PacketClonerCalculatorOptions. Kalkulator odczyta następnie tę wiadomość protobuf za pomocą metody CalculatorBase::Open, a także również w funkcji CalculatorBase::GetContract lub metod CalculatorBase::Process. Normalnie nowy typ wiadomości protobuf będzie zdefiniowany jako schemat protokołu za pomocą pliku „.proto” i reguły kompilacji mediapipe_proto_library().

  mediapipe_proto_library(
      name = "packet_cloner_calculator_proto",
      srcs = ["packet_cloner_calculator.proto"],
      visibility = ["//visibility:public"],
      deps = [
          "//mediapipe/framework:calculator_options_proto",
          "//mediapipe/framework:calculator_proto",
      ],
  )

Przykładowy kalkulator

W tej sekcji opisujemy implementację funkcji PacketClonerCalculator, która jest stosunkowo prosta i używana w wielu wykresach kalkulatorów. PacketClonerCalculator po prostu tworzy na żądanie kopię najnowszych pakietów wejściowych.

Funkcja PacketClonerCalculator jest przydatna, gdy sygnatury czasowe przychodzących pakietów danych nie są idealnie dopasowane. Załóżmy, że mamy pomieszczenie z mikrofonem, czujnikiem światła i kamerą, która zbiera dane sensoryczne. Każdy z czujników działa niezależnie i zbiera dane nieregularnie. Załóżmy, że dane wyjściowe każdego czujnika to:

  • mikrofon = głośność w decybelach dźwięku w pomieszczeniu (liczba całkowita)
  • czujnik światła = jasność w pomieszczeniu (liczba całkowita)
  • kamera wideo = ramka obrazu RGB pokoju (ImageFrame)

Nasz prosty potok percepcyjny został zaprojektowany do przetwarzania danych sensorycznych z tych 3 czujników w dowolnym momencie, gdy mamy dane klatki obrazu z kamery synchronizowane z ostatnio zebranymi danymi na temat głośności mikrofonu i jasnością z czujnika światła. W tym celu w MediaPipe nasz potok percepcyjny zawiera 3 strumienie danych wejściowych:

  • Room_mic_signal – każda pakiet danych w tym strumieniu wejściowym jest liczbą całkowitą reprezentującą głośność dźwięku w pomieszczeniu (z sygnaturą czasową).
  • Room_lightening_sensor – każda pakiet danych w tym strumieniu wejściowym jest liczbą całkowitą określającą jasność pomieszczenia pomieszczeń za pomocą sygnatury czasowej.
  • Room_video_tick_signal – każda pakiet danych w tym strumieniu wejściowym to ramka obrazu danych wideo reprezentująca film zebrany przez kamerę w sali z sygnaturą czasową.

Poniżej znajduje się implementacja PacketClonerCalculator. Widać metody GetContract(), Open() i Process(), a także zmienną instancji current_, która zawiera najnowsze pakiety wejściowe.

// This takes packets from N+1 streams, A_1, A_2, ..., A_N, B.
// For every packet that appears in B, outputs the most recent packet from each
// of the A_i on a separate stream.

#include <vector>

#include "absl/strings/str_cat.h"
#include "mediapipe/framework/calculator_framework.h"

namespace mediapipe {

// For every packet received on the last stream, output the latest packet
// obtained on all other streams. Therefore, if the last stream outputs at a
// higher rate than the others, this effectively clones the packets from the
// other streams to match the last.
//
// Example config:
// node {
//   calculator: "PacketClonerCalculator"
//   input_stream: "first_base_signal"
//   input_stream: "second_base_signal"
//   input_stream: "tick_signal"
//   output_stream: "cloned_first_base_signal"
//   output_stream: "cloned_second_base_signal"
// }
//
class PacketClonerCalculator : public CalculatorBase {
 public:
  static absl::Status GetContract(CalculatorContract* cc) {
    const int tick_signal_index = cc->Inputs().NumEntries() - 1;
    // cc->Inputs().NumEntries() returns the number of input streams
    // for the PacketClonerCalculator
    for (int i = 0; i < tick_signal_index; ++i) {
      cc->Inputs().Index(i).SetAny();
      // cc->Inputs().Index(i) returns the input stream pointer by index
      cc->Outputs().Index(i).SetSameAs(&cc->Inputs().Index(i));
    }
    cc->Inputs().Index(tick_signal_index).SetAny();
    return absl::OkStatus();
  }

  absl::Status Open(CalculatorContext* cc) final {
    tick_signal_index_ = cc->Inputs().NumEntries() - 1;
    current_.resize(tick_signal_index_);
    // Pass along the header for each stream if present.
    for (int i = 0; i < tick_signal_index_; ++i) {
      if (!cc->Inputs().Index(i).Header().IsEmpty()) {
        cc->Outputs().Index(i).SetHeader(cc->Inputs().Index(i).Header());
        // Sets the output stream of index i header to be the same as
        // the header for the input stream of index i
      }
    }
    return absl::OkStatus();
  }

  absl::Status Process(CalculatorContext* cc) final {
    // Store input signals.
    for (int i = 0; i < tick_signal_index_; ++i) {
      if (!cc->Inputs().Index(i).Value().IsEmpty()) {
        current_[i] = cc->Inputs().Index(i).Value();
      }
    }

    // Output if the tick signal is non-empty.
    if (!cc->Inputs().Index(tick_signal_index_).Value().IsEmpty()) {
      for (int i = 0; i < tick_signal_index_; ++i) {
        if (!current_[i].IsEmpty()) {
          cc->Outputs().Index(i).AddPacket(
              current_[i].At(cc->InputTimestamp()));
          // Add a packet to output stream of index i a packet from inputstream i
          // with timestamp common to all present inputs
        } else {
          cc->Outputs().Index(i).SetNextTimestampBound(
              cc->InputTimestamp().NextAllowedInStream());
          // if current_[i], 1 packet buffer for input stream i is empty, we will set
          // next allowed timestamp for input stream i to be current timestamp + 1
        }
      }
    }
    return absl::OkStatus();
  }

 private:
  std::vector<Packet> current_;
  int tick_signal_index_;
};

REGISTER_CALCULATOR(PacketClonerCalculator);
}  // namespace mediapipe

Kalkulator zawiera zazwyczaj tylko plik .cc. Rozszerzenie .h nie jest wymagane, ponieważ Mediapipe korzysta z rejestracji do uzyskiwania informacji o kalkulatorach. Po zdefiniowaniu klasy kalkulatora zarejestruj ją za pomocą wywołania REGISTER_CALCULATOR(nazwa_klasy_kalkulatora).

Poniżej znajduje się prosty wykres MediaPipe zawierający 3 strumienie wejściowe, 1 węzeł (PacketClonerCalculator) i 2 strumienie wyjściowe.

input_stream: "room_mic_signal"
input_stream: "room_lighting_sensor"
input_stream: "room_video_tick_signal"

node {
   calculator: "PacketClonerCalculator"
   input_stream: "room_mic_signal"
   input_stream: "room_lighting_sensor"
   input_stream: "room_video_tick_signal"
   output_stream: "cloned_room_mic_signal"
   output_stream: "cloned_lighting_sensor"
 }

Poniższy diagram przedstawia, jak PacketClonerCalculator definiuje swoje pakiety wyjściowe (na dole) na podstawie serii pakietów wejściowych (góra).

Wykres przy użyciu PacketClonerCalculator
Za każdym razem, gdy otrzymuje pakiet w strumieniu wejściowym TICK, PacketClonerCalculator wysyła ostatni pakiet z każdego ze swoich strumieni wejściowych. Kolejność pakietów wyjściowych (u dołu) jest określana na podstawie sekwencji pakietów wejściowych (u góry) i ich sygnatur czasowych. Sygnatury czasowe są widoczne po prawej stronie wykresu.