ماشین حساب ها

هر ماشین حساب یک گره از یک نمودار است. ما نحوه ایجاد یک ماشین حساب جدید، نحوه مقداردهی اولیه یک ماشین حساب، نحوه انجام محاسبات، جریان های ورودی و خروجی، مهرهای زمانی و گزینه ها را توضیح می دهیم. هر گره در نمودار به عنوان یک Calculator پیاده سازی می شود. بخش عمده ای از اجرای گراف در ماشین حساب آن اتفاق می افتد. یک ماشین حساب ممکن است صفر یا بیشتر جریان ورودی و/یا بسته های جانبی را دریافت کند و صفر یا بیشتر جریان خروجی و/یا بسته های جانبی تولید کند.

CalculatorBase

یک ماشین حساب با تعریف یک زیر کلاس جدید از کلاس CalculatorBase ، پیاده سازی تعدادی روش و ثبت زیر کلاس جدید با Mediapipe ایجاد می شود. حداقل، یک ماشین حساب جدید باید چهار روش زیر را اجرا کند

  • GetContract()
    • نویسندگان ماشین حساب می توانند انواع ورودی ها و خروجی های مورد انتظار یک ماشین حساب را در GetContract() مشخص کنند. هنگامی که یک گراف مقدار دهی اولیه می شود، چارچوب یک روش ثابت را فراخوانی می کند تا بررسی کند که آیا نوع بسته ورودی و خروجی متصل با اطلاعات این مشخصات مطابقت دارند یا خیر.
  • Open()
    • پس از شروع یک نمودار، فریم ورک Open() را فراخوانی می کند. بسته های جانبی ورودی در این مرحله در دسترس ماشین حساب هستند. Open() عملیات پیکربندی گره را تفسیر می کند (به نمودارها مراجعه کنید) و وضعیت ماشین حساب را در هر اجرا گراف آماده می کند. این تابع همچنین ممکن است بسته هایی را در خروجی های ماشین حساب بنویسد. یک خطا در هنگام Open() می تواند اجرای نمودار را خاتمه دهد.
  • Process()
    • برای یک ماشین حساب با ورودی، هر زمان که حداقل یک جریان ورودی بسته ای در دسترس داشته باشد، فریم ورک به طور مکرر Process() فراخوانی می کند. چارچوب به‌طور پیش‌فرض تضمین می‌کند که تمام ورودی‌ها دارای مهر زمانی یکسان هستند (برای اطلاعات بیشتر به همگام‌سازی مراجعه کنید). هنگامی که اجرای موازی فعال است، می‌توان چندین فراخوانی Process() را به طور همزمان فراخوانی کرد. اگر در حین Process() خطایی رخ دهد، فریم ورک Close() را فراخوانی می کند و اجرای نمودار پایان می یابد.
  • Close()
    • پس از پایان تمام فراخوان‌های Process() یا زمانی که تمام جریان‌های ورودی بسته می‌شوند، فریم ورک Close() فراخوانی می‌کند. اگر Open() فراخوانی شده باشد و موفقیت آمیز باشد و حتی اگر اجرای نمودار به دلیل یک خطا خاتمه یابد، همیشه این تابع فراخوانی می شود. هیچ ورودی از طریق هیچ جریان ورودی در طول Close() موجود نیست، اما همچنان به بسته های جانبی ورودی دسترسی دارد و بنابراین ممکن است خروجی ها را بنویسد. پس از بازگشت Close() ، ماشین حساب باید یک گره مرده در نظر گرفته شود. به محض پایان اجرای نمودار، شیء ماشین حساب از بین می رود.

کدهای زیر کدهای CalculatorBase.h هستند.

class CalculatorBase {
 public:
  ...

  // The subclasses of CalculatorBase must implement GetContract.
  // ...
  static absl::Status GetContract(CalculatorContract* cc);

  // Open is called before any Process() calls, on a freshly constructed
  // calculator.  Subclasses may override this method to perform necessary
  // setup, and possibly output Packets and/or set output streams' headers.
  // ...
  virtual absl::Status Open(CalculatorContext* cc) {
    return absl::OkStatus();
  }

  // Processes the incoming inputs. May call the methods on cc to access
  // inputs and produce outputs.
  // ...
  virtual absl::Status Process(CalculatorContext* cc) = 0;

  // Is called if Open() was called and succeeded.  Is called either
  // immediately after processing is complete or after a graph run has ended
  // (if an error occurred in the graph).  ...
  virtual absl::Status Close(CalculatorContext* cc) {
    return absl::OkStatus();
  }

  ...
};

زندگی یک ماشین حساب

در طول مقداردهی اولیه یک گراف MediaPipe، چارچوب یک متد GetContract() استاتیک را فراخوانی می کند تا مشخص کند چه نوع بسته هایی مورد انتظار است.

چارچوب کل ماشین‌حساب را برای هر گراف اجرا می‌کند و از بین می‌برد (مثلاً یک بار در هر ویدیو یا یک بار در هر تصویر). اشیاء گران قیمت یا بزرگ که در طول اجرای نمودار ثابت می مانند باید به عنوان بسته های جانبی ورودی عرضه شوند تا محاسبات در اجراهای بعدی تکرار نشوند.

پس از مقداردهی اولیه، برای هر اجرای گراف، دنباله زیر رخ می دهد:

  • Open()
  • Process() (مکرر)
  • Close()

چارچوب Open() را برای مقداردهی اولیه ماشین حساب فراخوانی می کند. Open() باید هر گزینه ای را تفسیر کند و وضعیت ماشین حساب در هر گراف را تنظیم کند. Open() ممکن است بسته های جانبی ورودی را دریافت کند و بسته ها را در خروجی های ماشین حساب بنویسد. در صورت لزوم، باید SetOffset() را فراخوانی کند تا بافر بسته بالقوه جریان های ورودی را کاهش دهد.

اگر در هنگام Open() یا Process() خطایی رخ دهد (همانطور که یکی از آنها وضعیت غیر Ok را نشان می دهد)، اجرای نمودار بدون فراخوانی بیشتر به متدهای ماشین حساب خاتمه می یابد و ماشین حساب از بین می رود.

برای یک ماشین حساب با ورودی، هر زمان که حداقل یک ورودی بسته ای در دسترس داشته باشد، فریم ورک Process() فراخوانی می کند. این چارچوب تضمین می‌کند که همه ورودی‌ها دارای مهر زمانی یکسانی هستند، مهر زمانی با هر فراخوانی به Process() افزایش می‌یابد و همه بسته‌ها تحویل داده می‌شوند. در نتیجه، برخی از ورودی ها ممکن است هنگام فراخوانی Process() هیچ بسته ای نداشته باشند. به نظر می رسد ورودی که بسته آن وجود ندارد، یک بسته خالی (بدون مهر زمانی) تولید می کند.

فریم ورک Close() بعد از همه فراخوانی های Process() فرا می خواند. تمام ورودی ها تمام شده اند، اما Close() به بسته های جانبی ورودی دسترسی دارد و ممکن است خروجی ها را بنویسد. پس از بازگشت Close، ماشین حساب از بین می رود.

ماشین حساب های بدون ورودی به عنوان منبع نامیده می شوند. یک ماشین حساب منبع تا زمانی که وضعیت Ok را برمی گرداند به فراخوانی Process() ادامه می دهد. یک ماشین حساب منبع نشان می دهد که با برگرداندن وضعیت توقف (به عنوان مثال mediaPipe::tool::StatusStop() .) تمام شده است.

شناسایی ورودی ها و خروجی ها

رابط عمومی برای یک ماشین حساب شامل مجموعه ای از جریان های ورودی و جریان های خروجی است. در CalculatorGraphConfiguration، خروجی های برخی از ماشین حساب ها با استفاده از جریان های نامگذاری شده به ورودی های ماشین حساب های دیگر متصل می شوند. نام‌های جریان معمولاً کوچک هستند، در حالی که برچسب‌های ورودی و خروجی معمولاً بزرگ هستند. در مثال زیر، خروجی با نام برچسب VIDEO با استفاده از جریانی به نام video_stream به ورودی با نام برچسب VIDEO_IN متصل شده است.

# Graph describing calculator SomeAudioVideoCalculator
node {
  calculator: "SomeAudioVideoCalculator"
  input_stream: "INPUT:combined_input"
  output_stream: "VIDEO:video_stream"
}
node {
  calculator: "SomeVideoCalculator"
  input_stream: "VIDEO_IN:video_stream"
  output_stream: "VIDEO_OUT:processed_video"
}

جریان های ورودی و خروجی را می توان با شماره فهرست، با نام برچسب یا با ترکیبی از نام برچسب و شماره فهرست شناسایی کرد. نمونه هایی از شناسه های ورودی و خروجی را در مثال زیر مشاهده می کنید. SomeAudioVideoCalculator خروجی ویدیوی خود را با برچسب و خروجی های صوتی خود را با ترکیب برچسب و نمایه شناسایی می کند. ورودی با برچسب VIDEO به جریانی با نام video_stream متصل می شود. خروجی‌های دارای برچسب AUDIO و شاخص‌های 0 و 1 به جریان‌هایی با نام‌های audio_left و audio_right متصل می‌شوند. SomeAudioCalculator ورودی های صوتی خود را فقط با شاخص شناسایی می کند (بدون نیاز به برچسب).

# Graph describing calculator SomeAudioVideoCalculator
node {
  calculator: "SomeAudioVideoCalculator"
  input_stream: "combined_input"
  output_stream: "VIDEO:video_stream"
  output_stream: "AUDIO:0:audio_left"
  output_stream: "AUDIO:1:audio_right"
}

node {
  calculator: "SomeAudioCalculator"
  input_stream: "audio_left"
  input_stream: "audio_right"
  output_stream: "audio_energy"
}

در پیاده سازی ماشین حساب، ورودی ها و خروجی ها نیز با نام تگ و شماره شاخص شناسایی می شوند. در تابع زیر ورودی و خروجی مشخص می شود:

  • با شماره شاخص: جریان ورودی ترکیبی به سادگی با شاخص 0 شناسایی می شود.
  • بر اساس نام برچسب: جریان خروجی ویدئو با نام برچسب "VIDEO" مشخص می شود.
  • بر اساس نام برچسب و شماره فهرست: جریان های صوتی خروجی با ترکیب نام برچسب AUDIO و شماره های فهرست 0 و 1 شناسایی می شوند.
// c++ Code snippet describing the SomeAudioVideoCalculator GetContract() method
class SomeAudioVideoCalculator : public CalculatorBase {
 public:
  static absl::Status GetContract(CalculatorContract* cc) {
    cc->Inputs().Index(0).SetAny();
    // SetAny() is used to specify that whatever the type of the
    // stream is, it's acceptable.  This does not mean that any
    // packet is acceptable.  Packets in the stream still have a
    // particular type.  SetAny() has the same effect as explicitly
    // setting the type to be the stream's type.
    cc->Outputs().Tag("VIDEO").Set<ImageFrame>();
    cc->Outputs().Get("AUDIO", 0).Set<Matrix>();
    cc->Outputs().Get("AUDIO", 1).Set<Matrix>();
    return absl::OkStatus();
  }

پردازش

Process() فراخوانده شده بر روی یک گره غیر منبع باید absl::OkStatus() را برگرداند تا نشان دهد که همه چیز خوب پیش رفته است، یا هر کد وضعیت دیگری برای علامت دادن به یک خطا.

اگر یک ماشین حساب غیر منبع tool::StatusStop() را برگرداند، این نشان می دهد که نمودار زودتر لغو شده است. در این حالت، تمام ماشین‌حساب‌های منبع و جریان‌های ورودی گراف بسته می‌شوند (و بسته‌های باقی‌مانده در نمودار منتشر می‌شوند).

گره منبع در یک گراف تا زمانی که absl::OkStatus( ) را برمی گرداند، به Process() فراخوانی می شود. برای نشان دادن اینکه اطلاعات دیگری برای تولید وجود ندارد tool::StatusStop() . هر وضعیت دیگری نشان دهنده یک خطا است.

Close() absl::OkStatus() برای نشان دادن موفقیت برمی گرداند. هر وضعیت دیگری نشان دهنده شکست است.

در اینجا تابع اصلی Process() است. از متد Input() (که فقط در صورتی قابل استفاده است که ماشین حساب یک ورودی داشته باشد) برای درخواست داده های ورودی خود استفاده می کند. سپس از std::unique_ptr برای تخصیص حافظه مورد نیاز برای بسته خروجی استفاده می کند و محاسبات را انجام می دهد. پس از اتمام، هنگام اضافه کردن آن به جریان خروجی، نشانگر را آزاد می کند.

absl::Status MyCalculator::Process() {
  const Matrix& input = Input()->Get<Matrix>();
  std::unique_ptr<Matrix> output(new Matrix(input.rows(), input.cols()));
  // do your magic here....
  //    output->row(n) =  ...
  Output()->Add(output.release(), InputTimestamp());
  return absl::OkStatus();
}

گزینه های ماشین حساب

ماشین حساب ها پارامترهای پردازش را از طریق (1) بسته های جریان ورودی (2) بسته های جانبی ورودی و (3) گزینه های ماشین حساب می پذیرند. گزینه های ماشین حساب، در صورت مشخص شدن، به صورت مقادیر تحت اللفظی در قسمت node_options پیام CalculatorGraphConfiguration.Node ظاهر می شوند.

  node {
    calculator: "TfLiteInferenceCalculator"
    input_stream: "TENSORS:main_model_input"
    output_stream: "TENSORS:main_model_output"
    node_options: {
      [type.googleapis.com/mediapipe.TfLiteInferenceCalculatorOptions] {
        model_path: "mediapipe/models/detection_model.tflite"
      }
    }
  }

فیلد node_options دستور proto3 را می پذیرد. از طرف دیگر، گزینه های ماشین حساب را می توان در قسمت options با استفاده از دستور proto2 مشخص کرد.

  node {
    calculator: "TfLiteInferenceCalculator"
    input_stream: "TENSORS:main_model_input"
    output_stream: "TENSORS:main_model_output"
    node_options: {
      [type.googleapis.com/mediapipe.TfLiteInferenceCalculatorOptions] {
        model_path: "mediapipe/models/detection_model.tflite"
      }
    }
  }

همه ماشین حساب ها گزینه های ماشین حساب را نمی پذیرند. برای پذیرش گزینه‌ها، یک ماشین‌حساب معمولاً یک نوع پیام پروتوباف جدید برای نمایش گزینه‌های خود، مانند PacketClonerCalculatorOptions تعریف می‌کند. سپس ماشین حساب آن پیام protobuf را در روش CalculatorBase::Open ، و احتمالاً در تابع CalculatorBase::GetContract یا روش CalculatorBase::Process خود می خواند. به طور معمول، نوع پیام protobuf جدید به عنوان یک طرحواره protobuf با استفاده از یک فایل ".proto" و یک قانون ساخت mediapipe_proto_library() تعریف می شود.

  mediapipe_proto_library(
      name = "packet_cloner_calculator_proto",
      srcs = ["packet_cloner_calculator.proto"],
      visibility = ["//visibility:public"],
      deps = [
          "//mediapipe/framework:calculator_options_proto",
          "//mediapipe/framework:calculator_proto",
      ],
  )

ماشین حساب نمونه

این بخش پیاده سازی PacketClonerCalculator را مورد بحث قرار می دهد که کار نسبتاً ساده ای را انجام می دهد و در بسیاری از نمودارهای ماشین حساب استفاده می شود. PacketClonerCalculator به سادگی یک کپی از آخرین بسته های ورودی خود را در صورت تقاضا تولید می کند.

PacketClonerCalculator زمانی مفید است که مُهرهای زمانی ارسال بسته‌های داده کاملاً هم‌تراز نباشند. فرض کنید اتاقی داریم با یک میکروفون، حسگر نور و یک دوربین فیلمبرداری که در حال جمع آوری داده های حسی است. هر یک از حسگرها به طور مستقل عمل می کنند و داده ها را به طور متناوب جمع آوری می کنند. فرض کنید خروجی هر سنسور به صورت زیر باشد:

  • میکروفون = بلندی صدا در دسی بل در اتاق (عدد صحیح)
  • حسگر نور = روشنایی اتاق (عدد صحیح)
  • دوربین فیلمبرداری = قاب تصویر RGB اتاق (ImageFrame)

خط لوله ادراک ساده ما برای پردازش داده‌های حسی از این 3 حسگر طراحی شده است، به طوری که در هر زمانی که داده‌های قاب تصویر از دوربین داریم که با آخرین داده‌های بلندی صدای میکروفون و داده‌های روشنایی حسگر نور همگام‌سازی می‌شوند. برای انجام این کار با MediaPipe، خط لوله درک ما دارای 3 جریان ورودی است:

  • room_mic_signal - هر بسته داده در این جریان ورودی، داده‌های صحیحی است که نشان‌دهنده میزان صدای بلند در یک اتاق با مهر زمانی است.
  • room_lightening_sensor - هر بسته داده در این جریان ورودی داده‌های عدد صحیح است که نشان‌دهنده میزان روشنایی اتاق با مهر زمانی است.
  • room_video_tick_signal - هر بسته داده در این جریان ورودی، فریم تصویری از داده‌های ویدیویی است که نشان‌دهنده ویدیوی جمع‌آوری‌شده از دوربین در اتاق با مهر زمانی است.

در زیر پیاده سازی PacketClonerCalculator آورده شده است. می‌توانید متدهای GetContract() ، Open() و Process() و همچنین متغیر نمونه current_ که جدیدترین بسته‌های ورودی را نگه می‌دارد، مشاهده کنید.

// This takes packets from N+1 streams, A_1, A_2, ..., A_N, B.
// For every packet that appears in B, outputs the most recent packet from each
// of the A_i on a separate stream.

#include <vector>

#include "absl/strings/str_cat.h"
#include "mediapipe/framework/calculator_framework.h"

namespace mediapipe {

// For every packet received on the last stream, output the latest packet
// obtained on all other streams. Therefore, if the last stream outputs at a
// higher rate than the others, this effectively clones the packets from the
// other streams to match the last.
//
// Example config:
// node {
//   calculator: "PacketClonerCalculator"
//   input_stream: "first_base_signal"
//   input_stream: "second_base_signal"
//   input_stream: "tick_signal"
//   output_stream: "cloned_first_base_signal"
//   output_stream: "cloned_second_base_signal"
// }
//
class PacketClonerCalculator : public CalculatorBase {
 public:
  static absl::Status GetContract(CalculatorContract* cc) {
    const int tick_signal_index = cc->Inputs().NumEntries() - 1;
    // cc->Inputs().NumEntries() returns the number of input streams
    // for the PacketClonerCalculator
    for (int i = 0; i < tick_signal_index; ++i) {
      cc->Inputs().Index(i).SetAny();
      // cc->Inputs().Index(i) returns the input stream pointer by index
      cc->Outputs().Index(i).SetSameAs(&cc->Inputs().Index(i));
    }
    cc->Inputs().Index(tick_signal_index).SetAny();
    return absl::OkStatus();
  }

  absl::Status Open(CalculatorContext* cc) final {
    tick_signal_index_ = cc->Inputs().NumEntries() - 1;
    current_.resize(tick_signal_index_);
    // Pass along the header for each stream if present.
    for (int i = 0; i < tick_signal_index_; ++i) {
      if (!cc->Inputs().Index(i).Header().IsEmpty()) {
        cc->Outputs().Index(i).SetHeader(cc->Inputs().Index(i).Header());
        // Sets the output stream of index i header to be the same as
        // the header for the input stream of index i
      }
    }
    return absl::OkStatus();
  }

  absl::Status Process(CalculatorContext* cc) final {
    // Store input signals.
    for (int i = 0; i < tick_signal_index_; ++i) {
      if (!cc->Inputs().Index(i).Value().IsEmpty()) {
        current_[i] = cc->Inputs().Index(i).Value();
      }
    }

    // Output if the tick signal is non-empty.
    if (!cc->Inputs().Index(tick_signal_index_).Value().IsEmpty()) {
      for (int i = 0; i < tick_signal_index_; ++i) {
        if (!current_[i].IsEmpty()) {
          cc->Outputs().Index(i).AddPacket(
              current_[i].At(cc->InputTimestamp()));
          // Add a packet to output stream of index i a packet from inputstream i
          // with timestamp common to all present inputs
        } else {
          cc->Outputs().Index(i).SetNextTimestampBound(
              cc->InputTimestamp().NextAllowedInStream());
          // if current_[i], 1 packet buffer for input stream i is empty, we will set
          // next allowed timestamp for input stream i to be current timestamp + 1
        }
      }
    }
    return absl::OkStatus();
  }

 private:
  std::vector<Packet> current_;
  int tick_signal_index_;
};

REGISTER_CALCULATOR(PacketClonerCalculator);
}  // namespace mediapipe

به طور معمول، یک ماشین حساب فقط یک فایل cc. دارد. هیچ بعد از اینکه کلاس ماشین حساب خود را تعریف کردید، آن را با یک فراخوانی کلان REGISTER_CALCULATOR (calculator_class_name) ثبت کنید.

در زیر یک نمودار ساده MediaPipe وجود دارد که دارای 3 جریان ورودی، 1 گره (PacketClonerCalculator) و 2 جریان خروجی است.

input_stream: "room_mic_signal"
input_stream: "room_lighting_sensor"
input_stream: "room_video_tick_signal"

node {
   calculator: "PacketClonerCalculator"
   input_stream: "room_mic_signal"
   input_stream: "room_lighting_sensor"
   input_stream: "room_video_tick_signal"
   output_stream: "cloned_room_mic_signal"
   output_stream: "cloned_lighting_sensor"
 }

نمودار زیر نشان می دهد که چگونه PacketClonerCalculator بسته های خروجی خود (پایین) را بر اساس سری بسته های ورودی خود (بالا) تعریف می کند.

نمودار با استفاده از PacketClonerCalculator
هر بار که بسته ای را در جریان ورودی TICK خود دریافت می کند، PacketClonerCalculator آخرین بسته را از هر یک از جریان های ورودی خود خروجی می دهد. توالی بسته های خروجی (پایین) با توالی بسته های ورودی (بالا) و مهرهای زمانی آنها تعیین می شود. مهرهای زمانی در سمت راست نمودار نشان داده شده است.