מחשבונים

כל מחשבון הוא צומת בגרף. נסביר איך ליצור מחשבון חדש, איך להפעיל מחשבון, איך לבצע חישובים, איך מקורות קלט ופלט, חותמות זמן ואפשרויות. כל צומת בתרשים מוטמע בתור Calculator. רוב הביצוע של הגרף מתרחש בתוך המחשבונים. מחשבון יכול לקבל אפס או יותר זרמי קלט ו/או חבילות צד, ומפיק אפס זרמי פלט או יותר ו/או חבילות צד.

CalculatorBase

מחשבון נוצר על ידי הגדרה של מחלקה חדשה של המחלקה CalculatorBase, יישום מספר שיטות ורישום קבוצת המשנה החדשה ב-Mediapipe. לכל הפחות, מחשבון חדש חייב ליישם את ארבע השיטות הבאות לכל הפחות

  • GetContract()
    • מחברי מחשבון יכולים לציין ב-GetContract() את סוגי הקלט והפלט הצפויים של המחשבון. כשמאתחלים גרף, ה-framework קורא לשיטה סטטית כדי לוודא שסוגי החבילות של הקלט והפלט המחוברים תואמים למידע במפרט הזה.
  • Open()
    • לאחר תחילת התרשים, ה-framework קורא ל-Open(). מנות הצד לקלט זמינות למחשבון בשלב הזה. Open() מפרש את הפעולות של הגדרת הצמתים (ראו תרשימים) ומכינים את המצב של המחשבון לכל ריצה. הפונקציה הזו יכולה גם לכתוב חבילות כדי לחשב פלטים של המחשבון. שגיאה במהלך Open() עלולה לסיים את הפעלת התרשים.
  • Process()
    • למחשבון עם מקורות קלט, ה-framework קורא ל-Process() שוב ושוב בכל פעם שיש לפחות מנה אחת של מקור קלט אחד. כברירת מחדל, ה-framework מבטיח שלכל מקורות הקלט תהיה אותה חותמת זמן (למידע נוסף, ראו סנכרון). אפשר להפעיל מספר קריאות Process() בו-זמנית כשמפעילים ביצוע מקביל. אם תתרחש שגיאה במהלך Process(), ה-framework קורא ל-Close() והרצת התרשים תסתיים.
  • Close()
    • אחרי שכל הקריאות אל Process() מסתיימות או כשכל מקורות הקלט נסגרים, ה-framework קורא ל-Close(). הפונקציה הזו תמיד מופעלת במקרה של קריאה לפונקציה Open() והפעולה בוצעה בהצלחה, וגם אם התרשים פועל בעקבות שגיאה. במהלך Close() אין מקורות קלט זמינים, אבל עדיין יש לה גישה לחבילות צד של קלט, ולכן היא עשויה לכתוב פלטים. אחרי החזרת Close(), המחשבון אמור להיחשב כצומת לא פעיל. אובייקט המחשבון מושמד מיד לאחר סיום ההרצה של הגרף.

בהמשך הם קטעי קוד מ-CalculatorBase.h.

class CalculatorBase {
 public:
  ...

  // The subclasses of CalculatorBase must implement GetContract.
  // ...
  static absl::Status GetContract(CalculatorContract* cc);

  // Open is called before any Process() calls, on a freshly constructed
  // calculator.  Subclasses may override this method to perform necessary
  // setup, and possibly output Packets and/or set output streams' headers.
  // ...
  virtual absl::Status Open(CalculatorContext* cc) {
    return absl::OkStatus();
  }

  // Processes the incoming inputs. May call the methods on cc to access
  // inputs and produce outputs.
  // ...
  virtual absl::Status Process(CalculatorContext* cc) = 0;

  // Is called if Open() was called and succeeded.  Is called either
  // immediately after processing is complete or after a graph run has ended
  // (if an error occurred in the graph).  ...
  virtual absl::Status Close(CalculatorContext* cc) {
    return absl::OkStatus();
  }

  ...
};

חיי המחשבון

במהלך אתחול של תרשים MediaPipe, ה-framework קורא לשיטה סטטית GetContract() כדי לקבוע מהם סוגי המנות הצפויים.

ה-framework בונה ומשמיד את כל המחשבון בכל הרצת גרף (למשל, פעם אחת לכל סרטון או פעם אחת לכל תמונה). יש לספק אובייקטים יקרים או גדולים שנשארים קבועים בהפעלות הגרף כחבילות צד להזנת קלט, כדי שהחישובים לא יחזרו על עצמם בהפעלות הבאות.

לאחר האתחול, בכל הרצה של התרשים, מתרחש הרצף הבא:

  • Open()
  • Process() (חוזרת)
  • Close()

ה-framework קורא ל-Open() כדי להפעיל את המחשבון. על Open() לפרש את כל האפשרויות ולהגדיר את המצב של המחשבון לכל גרף. Open() יכול לקבל חבילות צד להזנת קלט ולכתוב חבילות כדי לחשב פלטים. אם הדבר מתאים, היא צריכה להפעיל את SetOffset() כדי להפחית את פוטנציאל אגירת החבילות של מקורות הקלט.

אם מתרחשת שגיאה במהלך Open() או Process() (כפי שמצוין באחת מהשגיאות שמחזירות סטטוס שאינו Ok), הרצת הגרף תסתיים ללא קריאות נוספות לשיטות של המחשבון, והמחשבון יושמד.

למחשבון עם מקורות קלט, ה-framework קורא ל-Process() בכל פעם שיש לפחות קלט אחד של מנה זמינה. ה-framework מבטיח שלכל מקורות הקלט תהיה חותמת זמן זהה, שחותמות הזמן גדלות בכל קריאה ל-Process() ושכל החבילות נשלחות. כתוצאה מכך, ייתכן שחלק מהקלטים לא יכללו חבילות כאשר מתבצעת קריאה ל-Process(). נראה שהקלט שהחבילה שלו חסרה מייצר חבילה ריקה (ללא חותמת זמן).

ה-framework קורא ל-Close() אחרי כל הקריאות אל Process(). כל אפשרויות הקלט מוצו, אבל ל-Close() יש גישה לחבילות צדדיות להזנת קלט, והוא יכול לכתוב פלטים. כש-Close מוחזרת, המחשבון מושמד.

מחשבונים ללא קלט מכונים כמקורות. למחשבון מקור ימשיך להיות Process() קריאה, כל עוד הוא מחזיר את הסטטוס Ok. מחשבון מקור מציין שהוא מוצה על ידי החזרת סטטוס עצירה (כלומר mediaPipe::tool::StatusStop()).

זיהוי ערכי קלט ופלט

הממשק הציבורי למחשבון מורכב מקבוצה של מקורות קלט ותזרמי פלט. ב-InvoiceGraphConfiguration, הפלטים מכמה מחשבים מחוברים לקלט של מחשבונים אחרים באמצעות מקורות נתונים בעלי שם. בדרך כלל שמות של מקורות נתונים מופיעים באותיות קטנות, ואילו תגי קלט ופלט הם בדרך כלל אותיות גדולות. בדוגמה הבאה, הפלט עם שם התג VIDEO מקושר לקלט עם שם התג VIDEO_IN באמצעות מקור הנתונים video_stream.

# Graph describing calculator SomeAudioVideoCalculator
node {
  calculator: "SomeAudioVideoCalculator"
  input_stream: "INPUT:combined_input"
  output_stream: "VIDEO:video_stream"
}
node {
  calculator: "SomeVideoCalculator"
  input_stream: "VIDEO_IN:video_stream"
  output_stream: "VIDEO_OUT:processed_video"
}

ניתן לזהות את מקורות הקלט והפלט לפי מספר האינדקס, שם התג או שילוב של שם התג ומספר האינדקס. בדוגמה הבאה אפשר לראות כמה דוגמאות למזהי קלט ופלט. SomeAudioVideoCalculator מזהה את פלט הווידאו לפי תג ואת פלט האודיו שלו לפי שילוב של תג ואינדקס. הקלט עם התג VIDEO מחובר לשידור שנקרא video_stream. הפלטים עם התג AUDIO והאינדקסים 0 ו-1 מקושרים למקורות הנתונים שנקראים audio_left ו-audio_right. SomeAudioCalculator מזהה את קלט האודיו לפי אינדקס בלבד (אין צורך בתג).

# Graph describing calculator SomeAudioVideoCalculator
node {
  calculator: "SomeAudioVideoCalculator"
  input_stream: "combined_input"
  output_stream: "VIDEO:video_stream"
  output_stream: "AUDIO:0:audio_left"
  output_stream: "AUDIO:1:audio_right"
}

node {
  calculator: "SomeAudioCalculator"
  input_stream: "audio_left"
  input_stream: "audio_right"
  output_stream: "audio_energy"
}

בהטמעת המחשבון, ערכי הקלט והפלט מזוהים גם לפי שם התג ומספר האינדקס. בפונקציה הבאה, הקלט והפלט מזוהים:

  • לפי מספר אינדקס: זרם הקלט המשולב מזוהה פשוט באמצעות האינדקס 0.
  • לפי שם התג: הסטרימינג של פלט הווידאו מזוהה באמצעות שם התג "VIDEO".
  • לפי שם התג ומספר האינדקס: שידורי האודיו שבפלט מזוהים באמצעות השילוב של שם התג AUDIO ומספרי האינדקס 0 ו-1.
// c++ Code snippet describing the SomeAudioVideoCalculator GetContract() method
class SomeAudioVideoCalculator : public CalculatorBase {
 public:
  static absl::Status GetContract(CalculatorContract* cc) {
    cc->Inputs().Index(0).SetAny();
    // SetAny() is used to specify that whatever the type of the
    // stream is, it's acceptable.  This does not mean that any
    // packet is acceptable.  Packets in the stream still have a
    // particular type.  SetAny() has the same effect as explicitly
    // setting the type to be the stream's type.
    cc->Outputs().Tag("VIDEO").Set<ImageFrame>();
    cc->Outputs().Get("AUDIO", 0).Set<Matrix>();
    cc->Outputs().Get("AUDIO", 1).Set<Matrix>();
    return absl::OkStatus();
  }

בתהליך עיבוד

קוד Process() שנקרא בצומת שאינו מקור צריך להחזיר absl::OkStatus() כדי לציין שהכל תקין, או כל קוד סטטוס אחר כדי לאותת על שגיאה

אם מחשבון שאינו מקור מחזיר את הערך tool::StatusStop(), סימן שהתרשים מבוטל בשלב מוקדם. במקרה כזה, כל מחשבון המקור ותזרמי הקלט של הגרף ייסגרו (והחבילות הנותרות יתעדכנו בתרשים).

הפונקציה Process() תמשיך לקרוא בצומת מקור בתרשים כל עוד הוא מחזיר absl::OkStatus(). על מנת לציין שאין עוד נתונים ליצירה, עליך להחזיר tool::StatusStop(). כל סטטוס אחר מציין שאירעה שגיאה.

הפונקציה Close() מחזירה את הערך absl::OkStatus() כדי לציין שהצלחתם. כל סטטוס אחר מצביע על כישלון.

זוהי הפונקציה הבסיסית Process(). היא משתמשת בשיטה Input() (שאפשר להשתמש בה רק אם יש למחשבון קלט אחד) כדי לבקש את נתוני הקלט. לאחר מכן, היא משתמשת ב-std::unique_ptr כדי להקצות את הזיכרון הדרוש לחבילת הפלט, ומבצעת את החישובים. בסיום, הוא משחרר את הסמן כשמוסיפים אותו לזרם הפלט.

absl::Status MyCalculator::Process() {
  const Matrix& input = Input()->Get<Matrix>();
  std::unique_ptr<Matrix> output(new Matrix(input.rows(), input.cols()));
  // do your magic here....
  //    output->row(n) =  ...
  Output()->Add(output.release(), InputTimestamp());
  return absl::OkStatus();
}

אפשרויות המחשבון

מחשבונים מקבלים פרמטרים של עיבוד באמצעות (1) מנות מקור קלט (2) חבילות צד להזנת קלט ו-(3) אפשרויות מחשבון. אם ציינת אפשרויות מחשבון, הן יופיעו כערכים מילוליים בשדה node_options של ההודעה CalculatorGraphConfiguration.Node.

  node {
    calculator: "TfLiteInferenceCalculator"
    input_stream: "TENSORS:main_model_input"
    output_stream: "TENSORS:main_model_output"
    node_options: {
      [type.googleapis.com/mediapipe.TfLiteInferenceCalculatorOptions] {
        model_path: "mediapipe/models/detection_model.tflite"
      }
    }
  }

השדה node_options מקבל את התחביר proto3. לחלופין, ניתן לציין אפשרויות מחשבון בשדה options באמצעות תחביר proto2.

  node {
    calculator: "TfLiteInferenceCalculator"
    input_stream: "TENSORS:main_model_input"
    output_stream: "TENSORS:main_model_output"
    node_options: {
      [type.googleapis.com/mediapipe.TfLiteInferenceCalculatorOptions] {
        model_path: "mediapipe/models/detection_model.tflite"
      }
    }
  }

לא כל המחשבונים מקבלים אפשרויות של מחשבון. כדי לקבל אפשרויות, מחשבון יגדיר בדרך כלל סוג חדש של הודעה מסוג protobuf שייצג את האפשרויות שלו, כמו PacketClonerCalculatorOptions. לאחר מכן, המחשבון יקרא את ההודעה של protobuf בשיטת CalculatorBase::Open, ויכול להיות שגם בפונקציה CalculatorBase::GetContract או בשיטה CalculatorBase::Process. בדרך כלל, הסוג החדש של הודעה מסוג protobuf יוגדר כסכימת protobuf באמצעות קובץ ".proto" וכלל build של mediapipe_proto_library().

  mediapipe_proto_library(
      name = "packet_cloner_calculator_proto",
      srcs = ["packet_cloner_calculator.proto"],
      visibility = ["//visibility:public"],
      deps = [
          "//mediapipe/framework:calculator_options_proto",
          "//mediapipe/framework:calculator_proto",
      ],
  )

מחשבון לדוגמה

בקטע הזה תוכלו לקרוא על ההטמעה של PacketClonerCalculator, שהפעולה פשוטה יחסית, שמשתמשים בה בגרפים רבים של מחשבון. PacketClonerCalculator פשוט יוצר עותק של חבילות הקלט האחרונות שלו לפי דרישה.

PacketClonerCalculator שימושית כשחותמות הזמן של חבילות הנתונים שמגיעות לא תואמות בצורה מושלמת. נניח שיש לנו חדר עם מיקרופון, חיישן אור ומצלמת וידאו שאוספת נתוני חיישנים. כל אחד מהחיישנים פועל באופן עצמאי ואוסף נתונים לסירוגין. נניח שהפלט של כל חיישן הוא:

  • מיקרופון = עוצמת הקול בדציבלים של הצליל בחדר (מספר שלם)
  • חיישן אור = בהירות החדר (מספר שלם)
  • מצלמת וידאו = מסגרת תמונה RGB של חדר (ImageFrame)

צינור התפיסה הפשוט שלנו נועד לעבד נתונים חושיים מ-3 החיישנים האלה, כך שבכל פעם שיש לנו נתוני פריים מהמצלמה שמסתנכרנים עם נתוני עוצמת הקול של המיקרופון ונתוני הבהירות של חיישן האור האחרונים. כדי לעשות זאת באמצעות MediaPipe, לצינור עיבוד הנתונים שלנו יש 3 קלטים:

  • room_mic_signal – כל חבילת נתונים בשידור הקלט הזה היא מספרים שלמים שמייצגת את עוצמת האודיו בחדר עם חותמת זמן.
  • room_lightening_sensor – כל חבילת נתונים במקור הקלט הזה היא מספר שלם שמייצג את מידת הבהירות של החדר בחותמת זמן.
  • Room_video_tick_signal – כל חבילת נתונים במקור הקלט הזה היא תמונת תמונה של נתוני וידאו שמייצגים וידאו שנאסף מהמצלמה בחדר, עם חותמת זמן.

בהמשך מוצג ההטמעה של PacketClonerCalculator. אפשר לראות את השיטות GetContract(), Open() ו-Process(), וגם את משתנה המכונה current_ שמכיל את מנות הקלט האחרונות.

// This takes packets from N+1 streams, A_1, A_2, ..., A_N, B.
// For every packet that appears in B, outputs the most recent packet from each
// of the A_i on a separate stream.

#include <vector>

#include "absl/strings/str_cat.h"
#include "mediapipe/framework/calculator_framework.h"

namespace mediapipe {

// For every packet received on the last stream, output the latest packet
// obtained on all other streams. Therefore, if the last stream outputs at a
// higher rate than the others, this effectively clones the packets from the
// other streams to match the last.
//
// Example config:
// node {
//   calculator: "PacketClonerCalculator"
//   input_stream: "first_base_signal"
//   input_stream: "second_base_signal"
//   input_stream: "tick_signal"
//   output_stream: "cloned_first_base_signal"
//   output_stream: "cloned_second_base_signal"
// }
//
class PacketClonerCalculator : public CalculatorBase {
 public:
  static absl::Status GetContract(CalculatorContract* cc) {
    const int tick_signal_index = cc->Inputs().NumEntries() - 1;
    // cc->Inputs().NumEntries() returns the number of input streams
    // for the PacketClonerCalculator
    for (int i = 0; i < tick_signal_index; ++i) {
      cc->Inputs().Index(i).SetAny();
      // cc->Inputs().Index(i) returns the input stream pointer by index
      cc->Outputs().Index(i).SetSameAs(&cc->Inputs().Index(i));
    }
    cc->Inputs().Index(tick_signal_index).SetAny();
    return absl::OkStatus();
  }

  absl::Status Open(CalculatorContext* cc) final {
    tick_signal_index_ = cc->Inputs().NumEntries() - 1;
    current_.resize(tick_signal_index_);
    // Pass along the header for each stream if present.
    for (int i = 0; i < tick_signal_index_; ++i) {
      if (!cc->Inputs().Index(i).Header().IsEmpty()) {
        cc->Outputs().Index(i).SetHeader(cc->Inputs().Index(i).Header());
        // Sets the output stream of index i header to be the same as
        // the header for the input stream of index i
      }
    }
    return absl::OkStatus();
  }

  absl::Status Process(CalculatorContext* cc) final {
    // Store input signals.
    for (int i = 0; i < tick_signal_index_; ++i) {
      if (!cc->Inputs().Index(i).Value().IsEmpty()) {
        current_[i] = cc->Inputs().Index(i).Value();
      }
    }

    // Output if the tick signal is non-empty.
    if (!cc->Inputs().Index(tick_signal_index_).Value().IsEmpty()) {
      for (int i = 0; i < tick_signal_index_; ++i) {
        if (!current_[i].IsEmpty()) {
          cc->Outputs().Index(i).AddPacket(
              current_[i].At(cc->InputTimestamp()));
          // Add a packet to output stream of index i a packet from inputstream i
          // with timestamp common to all present inputs
        } else {
          cc->Outputs().Index(i).SetNextTimestampBound(
              cc->InputTimestamp().NextAllowedInStream());
          // if current_[i], 1 packet buffer for input stream i is empty, we will set
          // next allowed timestamp for input stream i to be current timestamp + 1
        }
      }
    }
    return absl::OkStatus();
  }

 private:
  std::vector<Packet> current_;
  int tick_signal_index_;
};

REGISTER_CALCULATOR(PacketClonerCalculator);
}  // namespace mediapipe

בדרך כלל, מחשבון כולל קובץ .cc בלבד לא נדרש .h כי Mediapipe משתמש ברישום כדי לחשב מחשבונים. אחרי שתגדירו את סיווג המחשבון, רשמו אותו להפעלת המאקרו REGISTER_CALCULATOR(Calculator_class_name).

בהמשך מוצג תרשים MediaPipe טריוויאלי שכולל 3 זרמי קלט, צומת 1 (PacketClonerCalculator) ו-2 זרמי פלט.

input_stream: "room_mic_signal"
input_stream: "room_lighting_sensor"
input_stream: "room_video_tick_signal"

node {
   calculator: "PacketClonerCalculator"
   input_stream: "room_mic_signal"
   input_stream: "room_lighting_sensor"
   input_stream: "room_video_tick_signal"
   output_stream: "cloned_room_mic_signal"
   output_stream: "cloned_lighting_sensor"
 }

בתרשים הבא אפשר לראות איך מנות הפלט שלו מוגדרות ב-PacketClonerCalculator (התחתון) על סמך סדרת מנות הקלט (למעלה).

הציגו בגרף באמצעות PacketClonerCalculator
בכל פעם שהוא מקבל חבילה בזרם הקלט שלה מסוג TICK, מחשבון ה-PacketCloner רואים את הפלט של החבילה האחרונה מכל אחד מזרמי הקלט שלו. הרצף של מנות הפלט (התחתית) נקבע לפי הרצף של מנות הקלט (למעלה) וחותמות הזמן שלהן. חותמות הזמן מוצגות בצד שמאל של התרשים.