מחשבונים

כל מחשבון הוא צומת של תרשים. נתאר את האופן שבו ניתן ליצור איך להפעיל מחשבון, איך לבצע את החישובים שלו, שידורי קלט ופלט, חותמות זמן ואפשרויות. כל צומת בתרשים הוטמע בתור Calculator. רוב הביצוע של התרשים מתבצע בתוך מחשבונים. מחשבון עשוי לקבל אפס זרמי קלט או יותר ו/או בצד מנות ויוצרות אפס או יותר שידורי פלט ו/או חבילות צדדיות.

CalculatorBase

מחשבון נוצר על ידי הגדרת מחלקה חדשה של CalculatorBase יישום של מספר שיטות ורישום המחלקה החדשה באמצעות Mediapipe. לכל הפחות, יש להשתמש במחשבון חדש ליישם את ארבע השיטות הבאות

  • GetContract()
    • מחברי המחשבון יכולים לציין את סוגי הקלט והפלט הצפויים של מחשבון ב-GetContract(). כאשר תרשים מאותחל, ה-framework קורא לשיטה סטטית כדי לבדוק אם סוגי המנות של הקלט והפלט המחוברים תואמים למידע למפרט.
  • Open()
    • אחרי שהתרשים מתחיל, ה-framework קורא לפונקציה Open(). צד הקלט המנות זמינות למחשבון בשלב זה. Open() מפרש את הפעולות של תצורת הצומת (ראו תרשימים) ולהכין את המצב של הצגת המחשבון לפי גרף. הפונקציה הזו עשויה גם לכתוב חבילות לפלט מחשבון. שגיאה במהלך Open() עשויה לסיים את הרצת התרשים.
  • Process()
    • כאשר משתמשים במחשבון עם ערכי קלט, ה-framework שולח קריאה ל-Process() שוב ושוב בכל פעם שיש לפחות חבילה זמינה בשידור קלט אחד. המסגרת מבטיח כברירת מחדל שלכל הקלטים יש אותה חותמת זמן (ראו סנכרון לקבלת מידע נוסף). יותר מאחת אפשר להפעיל Process() קריאות בו-זמנית כשמתבצעת הפעלה מקבילה מופעלת. אם מתרחשת שגיאה במהלך Process(), ה-framework מפעיל Close() והרצת התרשים מסתיימת.
  • Close()
    • לאחר שכל הקריאות אל Process() יסתיימו או כשכל שידורי הקלט ייסגרו, ה-framework קורא ל-Close(). הפונקציה הזו תמיד נקראת אם בוצעה שיחה אל Open() ובוצעה בהצלחה, גם אם הרצת התרשים הסתיימה בגלל שגיאה. אין קלט זמין דרך אף אחד מערוצי הקלט במהלך Close(), אבל עדיין יש לו גישה לחבילות צדדיות קלט ולכן עשויים לכתוב פלטים. אחרי Close(), המחשבון צריך להיחשב כצומת מת. אובייקט המחשבון מושמד כ- ברגע שהגרף מסיים לפעול.

אלה קטעי קוד מ- CalculatorBase.h

class CalculatorBase {
 public:
  ...

  // The subclasses of CalculatorBase must implement GetContract.
  // ...
  static absl::Status GetContract(CalculatorContract* cc);

  // Open is called before any Process() calls, on a freshly constructed
  // calculator.  Subclasses may override this method to perform necessary
  // setup, and possibly output Packets and/or set output streams' headers.
  // ...
  virtual absl::Status Open(CalculatorContext* cc) {
    return absl::OkStatus();
  }

  // Processes the incoming inputs. May call the methods on cc to access
  // inputs and produce outputs.
  // ...
  virtual absl::Status Process(CalculatorContext* cc) = 0;

  // Is called if Open() was called and succeeded.  Is called either
  // immediately after processing is complete or after a graph run has ended
  // (if an error occurred in the graph).  ...
  virtual absl::Status Close(CalculatorContext* cc) {
    return absl::OkStatus();
  }

  ...
};

חיי המחשבון

במהלך אתחול תרשים MediaPipe, ה-framework קורא לפונקציה שיטה סטטית GetContract() לקביעת סוגי המנות הצפויים.

ה-framework בונה ומשמיד את המחשבון כולו בכל הרצת גרף (למשל, פעם אחת לכל סרטון או פעם אחת לכל תמונה). חפצים יקרים או גדולים שנשארים רצים קבועים על פני רצפים של תרשים צריך לספק כמנות צדדיות קלט, החישובים אינם חוזרים על עצמם בהפעלות הבאות.

לאחר האתחול, בכל הרצה של התרשים, מתרחש הרצף הבא:

  • Open()
  • Process() (שוב)
  • Close()

ה-framework קורא לפונקציה Open() כדי לאתחל את המחשבון. Open() צריך לפרש כל אופציה ולהגדיר את מצב ההצגה של המחשבון לפי גרף. Open() יכול לקבל חבילות צדדיות קלט ולכתוב חבילות נתונים לפלט המחשבון. אם המיקום מתאים, הוא צריך לקרוא ל-SetOffset() כדי לצמצם את האפשרות לאגירת חבילות נתונים של זרמי קלט.

אם מתרחשת שגיאה במהלך Open() או Process() (כפי שמצוין על ידי אחד מהם) מחזיר סטטוס שאינו Ok), הרצת התרשים הסתיימה ללא קריאות נוספות לשיטות של המחשבון, והוא מושמד.

עבור מחשבון עם ערכי קלט, ה-framework קורא ל-Process() בכל פעם שהיא לפחות לקלט אחד יש חבילה זמינה. ה-framework מבטיח שלכל מקורות הקלט אותה חותמת זמן, שחותמות הזמן עולות עם כל קריאה אל Process() שכל החבילות מועברות. כתוצאה מכך, ייתכן שחלק מנתוני הקלט לא חבילות כשמתבצעת קריאה אל Process(). קלט שהחבילה שלו חסרה ייראה תיצור חבילה ריקה (בלי חותמת זמן).

תוכנת ה-framework קוראת ל-Close() אחרי כל הקריאות ל-Process(). כל ערכי הקלט כבר מוצה, אבל ל-Close() יש גישה לחבילות צדדיות קלט ועשוי לכתוב פלטים. לאחר ש-Close מחזירה את המחשבון, הוא מושמד.

מחשבונים ללא קלט מכונים מקורות. מחשבון מקור עדיין אפשר לקרוא לפונקציה Process() כל עוד היא מחזירה סטטוס Ok. א' מחשבון המקור מציין שהוא מוצה על ידי החזרת סטטוס עצירה (כלומר mediaPipe::tool::StatusStop()).

זיהוי קלט ופלט

הממשק הציבורי למחשבון מורכב מקבוצה של מקורות קלט בסטרימינג של פלט. ב-ForecastGraphConfiguration, הפלט של הפלט מחשבונים מחוברים לקלט של מחשבונים אחרים באמצעות בסטרימינג. השמות של מקורות הנתונים מופיעים בדרך כלל באותיות קטנות, ותגי הקלט והפלט בדרך כלל משתמשים באותיות גדולות. בדוגמה הבאה, הפלט עם שם התג VIDEO הוא מחובר לקלט עם שם התג VIDEO_IN באמצעות מקור הנתונים שנקרא video_stream.

# Graph describing calculator SomeAudioVideoCalculator
node {
  calculator: "SomeAudioVideoCalculator"
  input_stream: "INPUT:combined_input"
  output_stream: "VIDEO:video_stream"
}
node {
  calculator: "SomeVideoCalculator"
  input_stream: "VIDEO_IN:video_stream"
  output_stream: "VIDEO_OUT:processed_video"
}

ניתן לזהות שידורי קלט ופלט לפי מספר אינדקס, שם תג או שילוב של שם תג ומספר אינדקס. אפשר לראות כמה דוגמאות של קלט בדוגמה הבאה אפשר לראות מזהי פלט. SomeAudioVideoCalculator מזהה פלט הווידאו שלו על ידי תג ופלט האודיו שלו על ידי שילוב של תג להוסיף לאינדקס. הקלט עם התג VIDEO מחובר לשידור שנקרא video_stream. הפלט עם התג AUDIO והאינדקסים 0 ו-1 הם מחובר לשידורים בשם audio_left ו-audio_right. SomeAudioCalculator מזהה את קלט האודיו לפי אינדקס בלבד (אין צורך בתג).

# Graph describing calculator SomeAudioVideoCalculator
node {
  calculator: "SomeAudioVideoCalculator"
  input_stream: "combined_input"
  output_stream: "VIDEO:video_stream"
  output_stream: "AUDIO:0:audio_left"
  output_stream: "AUDIO:1:audio_right"
}

node {
  calculator: "SomeAudioCalculator"
  input_stream: "audio_left"
  input_stream: "audio_right"
  output_stream: "audio_energy"
}

בהטמעת המחשבון, המערכת מזהה את הקלט והפלט גם לפי תג השם ומספר האינדקס. בפונקציה שמתחת לקלט ולפלט מזוהים:

  • לפי מספר אינדקס: זרם הקלט המשולב מזוהה פשוט על ידי אינדקס 0
  • לפי שם התג: הסטרימינג של פלט הווידאו מזוהה לפי שם התג 'VIDEO'.
  • לפי שם התג ומספר האינדקס: שידורי האודיו של הפלט מזוהים על ידי שילוב של שם התג AUDIO ומספרי האינדקס 0 ו-1.
// c++ Code snippet describing the SomeAudioVideoCalculator GetContract() method
class SomeAudioVideoCalculator : public CalculatorBase {
 public:
  static absl::Status GetContract(CalculatorContract* cc) {
    cc->Inputs().Index(0).SetAny();
    // SetAny() is used to specify that whatever the type of the
    // stream is, it's acceptable.  This does not mean that any
    // packet is acceptable.  Packets in the stream still have a
    // particular type.  SetAny() has the same effect as explicitly
    // setting the type to be the stream's type.
    cc->Outputs().Tag("VIDEO").Set<ImageFrame>();
    cc->Outputs().Get("AUDIO", 0).Set<Matrix>();
    cc->Outputs().Get("AUDIO", 1).Set<Matrix>();
    return absl::OkStatus();
  }

בעיבוד

הפונקציה Process() שנקראה בצומת שאינו מקור חייבת להחזיר את הערך absl::OkStatus() אל לציין שהכול היה תקין, או כל קוד סטטוס אחר שמסמן שגיאה

אם מחשבון שאינו מקור מחזיר tool::StatusStop(), הערך הזה מסמן מתבצע ביטול מוקדם. במקרה כזה, כל מחשבון המקור והתרשימים שידורי הקלט ייסגרו (והמנות הנותרות יופצו דרך ).

צומת מקור בתרשים ימשיך לקבל קריאה ל-Process() כל עוד כשמחזירה את הערך absl::OkStatus(). כדי לציין שאין עוד נתונים ההחזרה tool::StatusStop() שנוצרה. כל סטטוס אחר מעיד על שגיאה אירעה שגיאה.

הפונקציה Close() מחזירה את הערך absl::OkStatus() כדי לציין הצלחה. סטטוס אחר מציין כישלון.

זוהי הפונקציה הבסיסית Process(). היא משתמשת ב-method Input() (שיכולה צריך להשתמש בו רק אם למחשבון יש קלט יחיד) כדי לבקש את נתוני הקלט שלו. הוא ואז משתמש ב-std::unique_ptr כדי להקצות את הזיכרון שדרוש לחבילת הפלט, ומבצע את החישובים. בסיום, הוא ישחרר את הסמן כשתוסיפו אותו אל את זרם הפלט.

absl::Status MyCalculator::Process() {
  const Matrix& input = Input()->Get<Matrix>();
  std::unique_ptr<Matrix> output(new Matrix(input.rows(), input.cols()));
  // do your magic here....
  //    output->row(n) =  ...
  Output()->Add(output.release(), InputTimestamp());
  return absl::OkStatus();
}

אפשרויות המחשבון

המחשבונים מקבלים פרמטרים של עיבוד באמצעות (1) חבילות קלט לזרם (2) חבילות צד קלט, ו-(3) אפשרויות מחשבון. אפשרויות המחשבון, אם יופיעו כערכים מילוליים בשדה node_options של הודעה אחת (CalculatorGraphConfiguration.Node).

  node {
    calculator: "TfLiteInferenceCalculator"
    input_stream: "TENSORS:main_model_input"
    output_stream: "TENSORS:main_model_output"
    node_options: {
      [type.googleapis.com/mediapipe.TfLiteInferenceCalculatorOptions] {
        model_path: "mediapipe/models/detection_model.tflite"
      }
    }
  }

השדה node_options מקבל את התחביר של proto3. לחלופין, אפשר להשתמש במחשבון ניתן לציין אפשרויות בשדה options באמצעות תחביר proto2.

  node {
    calculator: "TfLiteInferenceCalculator"
    input_stream: "TENSORS:main_model_input"
    output_stream: "TENSORS:main_model_output"
    node_options: {
      [type.googleapis.com/mediapipe.TfLiteInferenceCalculatorOptions] {
        model_path: "mediapipe/models/detection_model.tflite"
      }
    }
  }

לא כל המחשבונים מקבלים אפשרויות של מחשבון. כדי לקבל אפשרויות, בדרך כלל יגדיר סוג הודעה של אב-טיפוס חדש כדי לייצג כמו PacketClonerCalculatorOptions. לאחר מכן, המחשבון לקרוא את הודעת ה-protobuf באמצעות method CalculatorBase::Open שלה, ואולי גם גם בפונקציה CalculatorBase::GetContract או אמצעי תשלום אחד (CalculatorBase::Process). בדרך כלל, סוג ההודעה החדש של אב-טיפוס להיות מוגדרת כסכימת Protobuf באמצעות סיומת " .proto" קובץ כלל build אחד (mediapipe_proto_library()).

  mediapipe_proto_library(
      name = "packet_cloner_calculator_proto",
      srcs = ["packet_cloner_calculator.proto"],
      visibility = ["//visibility:public"],
      deps = [
          "//mediapipe/framework:calculator_options_proto",
          "//mediapipe/framework:calculator_proto",
      ],
  )

מחשבון לדוגמה

בקטע הזה מוסבר על ההטמעה של PacketClonerCalculator, עושה עבודה פשוטה יחסית, ומשמשת בתרשימי מחשבון רבים. PacketClonerCalculator פשוט מפיק עותק של חבילות הקלט האחרונות שלו על פי דרישה.

PacketClonerCalculator מועילה כאשר חותמות הזמן של חבילות הנתונים מגיעות לא מיושרים בצורה מושלמת. נניח שיש לנו חדר עם מיקרופון, תאורה ומצלמת וידאו שאוספת נתונים חושיים. כל אחד מהחיישנים פועלת באופן עצמאי ואוספת נתונים לסירוגין. נניח שהפלט של כל חיישן הוא:

  • מיקרופון = עוצמת קול בדציבלים של הקול בחדר (מספר שלם)
  • חיישן אור = בהירות החדר (מספר שלם)
  • מצלמת וידאו = מסגרת של חדר בתמונת RGB (ImageFrame)

צינור התפיסה הפשוט שלנו מיועד לעבד נתוני חושים כך שבכל פעם שיש לנו נתוני פריים מהתמונה מהמצלמה מסונכרנת עם הנתונים האחרונים של עוצמת המיקרופון והתאורה את נתוני הבהירות של החיישן. כדי לעשות זאת באמצעות MediaPipe, צינור עיבוד התפיסה שלנו כולל 3 שידורי קלט:

  • Room_mic_signal – כל חבילת נתונים בזרם הקלט הזה היא נתונים במספר שלמים שמייצג את עוצמת הקול בחדר עם חותמת זמן.
  • Room_lightening_sensor – כל חבילת נתונים בזרם הקלט הזה היא מספר שלם נתונים שמייצגים את מידת הבהירות של החדר באמצעות חותמת זמן.
  • Room_video_tick_signal – כל חבילת נתונים במקור הקלט הזה imageframe של נתוני וידאו שמייצגים וידאו שנאסף מהמצלמה חדר עם חותמת זמן.

בהמשך מופיע יישום של PacketClonerCalculator. אפשר לראות GetContract(), Open() ו-Process() וגם המופע משתנה current_ שמכיל את חבילות הקלט האחרונות.

// This takes packets from N+1 streams, A_1, A_2, ..., A_N, B.
// For every packet that appears in B, outputs the most recent packet from each
// of the A_i on a separate stream.

#include <vector>

#include "absl/strings/str_cat.h"
#include "mediapipe/framework/calculator_framework.h"

namespace mediapipe {

// For every packet received on the last stream, output the latest packet
// obtained on all other streams. Therefore, if the last stream outputs at a
// higher rate than the others, this effectively clones the packets from the
// other streams to match the last.
//
// Example config:
// node {
//   calculator: "PacketClonerCalculator"
//   input_stream: "first_base_signal"
//   input_stream: "second_base_signal"
//   input_stream: "tick_signal"
//   output_stream: "cloned_first_base_signal"
//   output_stream: "cloned_second_base_signal"
// }
//
class PacketClonerCalculator : public CalculatorBase {
 public:
  static absl::Status GetContract(CalculatorContract* cc) {
    const int tick_signal_index = cc->Inputs().NumEntries() - 1;
    // cc->Inputs().NumEntries() returns the number of input streams
    // for the PacketClonerCalculator
    for (int i = 0; i < tick_signal_index; ++i) {
      cc->Inputs().Index(i).SetAny();
      // cc->Inputs().Index(i) returns the input stream pointer by index
      cc->Outputs().Index(i).SetSameAs(&cc->Inputs().Index(i));
    }
    cc->Inputs().Index(tick_signal_index).SetAny();
    return absl::OkStatus();
  }

  absl::Status Open(CalculatorContext* cc) final {
    tick_signal_index_ = cc->Inputs().NumEntries() - 1;
    current_.resize(tick_signal_index_);
    // Pass along the header for each stream if present.
    for (int i = 0; i < tick_signal_index_; ++i) {
      if (!cc->Inputs().Index(i).Header().IsEmpty()) {
        cc->Outputs().Index(i).SetHeader(cc->Inputs().Index(i).Header());
        // Sets the output stream of index i header to be the same as
        // the header for the input stream of index i
      }
    }
    return absl::OkStatus();
  }

  absl::Status Process(CalculatorContext* cc) final {
    // Store input signals.
    for (int i = 0; i < tick_signal_index_; ++i) {
      if (!cc->Inputs().Index(i).Value().IsEmpty()) {
        current_[i] = cc->Inputs().Index(i).Value();
      }
    }

    // Output if the tick signal is non-empty.
    if (!cc->Inputs().Index(tick_signal_index_).Value().IsEmpty()) {
      for (int i = 0; i < tick_signal_index_; ++i) {
        if (!current_[i].IsEmpty()) {
          cc->Outputs().Index(i).AddPacket(
              current_[i].At(cc->InputTimestamp()));
          // Add a packet to output stream of index i a packet from inputstream i
          // with timestamp common to all present inputs
        } else {
          cc->Outputs().Index(i).SetNextTimestampBound(
              cc->InputTimestamp().NextAllowedInStream());
          // if current_[i], 1 packet buffer for input stream i is empty, we will set
          // next allowed timestamp for input stream i to be current timestamp + 1
        }
      }
    }
    return absl::OkStatus();
  }

 private:
  std::vector<Packet> current_;
  int tick_signal_index_;
};

REGISTER_CALCULATOR(PacketClonerCalculator);
}  // namespace mediapipe

בדרך כלל, למחשבון יש רק קובץ .cc. לא נדרש הסיומת .h כי mediapipe משתמש ברישום כדי להציג מחשבונים. אחרי הגדרתם את מחלקת המחשבון, רושמים אותו באמצעות הרצת מאקרו REGISTER_CALCULATOR(calculator_class_name).

בהמשך מופיע תרשים MediaPipe טריוויאלי עם 3 שידורי קלט, צומת אחד (PacketClonerCalculator) ו-2 שידורי פלט.

input_stream: "room_mic_signal"
input_stream: "room_lighting_sensor"
input_stream: "room_video_tick_signal"

node {
   calculator: "PacketClonerCalculator"
   input_stream: "room_mic_signal"
   input_stream: "room_lighting_sensor"
   input_stream: "room_video_tick_signal"
   output_stream: "cloned_room_mic_signal"
   output_stream: "cloned_lighting_sensor"
 }

בתרשים הבא אפשר לראות איך PacketClonerCalculator מגדיר את הפלט שלו מנות (למטה) על סמך סדרת חבילות הקלט שלהן (למעלה).

הציגו בגרף באמצעות PacketClonerCalculator
בכל פעם שהיא מקבלת חבילה בזרם הקלט TICK שלה, ה-packetCloner המחשבון מפיק את הפלט של החבילה העדכנית ביותר מכל זרם הקלט שלה. הרצף של חבילות הפלט (בתחתית) נקבע לפי הרצף של חבילות הקלט (למעלה) וחותמות הזמן שלהן. חותמות הזמן מוצגות בצד ימין של התרשים.