Máy tính

Mỗi máy tính là một nút của biểu đồ. Chúng tôi sẽ mô tả cách tạo một máy tính, cách khởi chạy máy tính, cách thực hiện các phép tính, các luồng đầu vào và đầu ra, dấu thời gian và tuỳ chọn. Mỗi nút trong biểu đồ là được triển khai dưới dạng Calculator. Phần lớn quá trình thực thi biểu đồ diễn ra bên trong máy tính cá nhân. Một máy tính có thể nhận được không hoặc nhiều luồng đầu vào và/hoặc bên gói và tạo ra không có hoặc nhiều luồng đầu ra và/hoặc gói phụ.

CalculatorBase

Một máy tính được tạo bằng cách xác định một lớp con mới của CalculatorBase lớp này, triển khai một số phương thức và đăng ký lớp con mới bằng Mediapipe. Một máy tính mới phải triển khai tối thiểu 4 phương thức dưới đây

  • GetContract()
    • Tác giả máy tính có thể chỉ định các kiểu dữ liệu đầu vào và đầu ra dự kiến của một máy tính trong GetContract(). Khi một biểu đồ được khởi tạo, khung sẽ gọi một phương thức tĩnh để xác minh xem các loại gói của đầu vào và đầu ra được kết nối khớp với thông tin trong đặc điểm kỹ thuật.
  • Open()
    • Sau khi một biểu đồ bắt đầu, khung này sẽ gọi Open(). Phía đầu vào gói dữ liệu được cung cấp cho máy tính tại thời điểm này. Open() diễn giải các thao tác cấu hình nút (xem Biểu đồ) và chuẩn bị trạng thái chạy trên mỗi đồ thị của máy tính. Chức năng này có thể ghi các gói vào đầu ra của máy tính. Lỗi có thể xảy ra trong Open() kết thúc lần chạy biểu đồ.
  • Process()
    • Đối với một máy tính có dữ liệu đầu vào, khung này sẽ gọi Process() nhiều lần bất cứ khi nào có ít nhất một luồng đầu vào có gói dữ liệu. Khung theo mặc định, đảm bảo rằng tất cả dữ liệu đầu vào đều có cùng dấu thời gian (xem Đồng bộ hoá để biết thêm thông tin). Nhiều Các lệnh gọi Process() có thể được gọi cùng lúc khi thực thi song song đã được bật. Nếu xảy ra lỗi trong Process(), khung này sẽ gọi Close() và chạy biểu đồ sẽ kết thúc.
  • Close()
    • Sau khi tất cả các lệnh gọi đến Process() kết thúc hoặc khi tất cả luồng đầu vào đóng, khung này sẽ gọi Close(). Hàm này luôn được gọi nếu Open() đã được gọi và thành công, kể cả khi kết thúc chạy biểu đồ do lỗi. Không có dữ liệu đầu vào thông qua bất kỳ luồng đầu vào nào trong Close(), nhưng vẫn có quyền truy cập vào các gói dữ liệu bên đầu vào và do đó có thể ghi kết quả. Sau khi trả về Close(), hàm này sẽ tính phải được coi là một nút chết. Đối tượng tính toán bị huỷ bỏ là ngay khi biểu đồ chạy xong.

Sau đây là đoạn mã của CalculatorBase.h.

class CalculatorBase {
 public:
  ...

  // The subclasses of CalculatorBase must implement GetContract.
  // ...
  static absl::Status GetContract(CalculatorContract* cc);

  // Open is called before any Process() calls, on a freshly constructed
  // calculator.  Subclasses may override this method to perform necessary
  // setup, and possibly output Packets and/or set output streams' headers.
  // ...
  virtual absl::Status Open(CalculatorContext* cc) {
    return absl::OkStatus();
  }

  // Processes the incoming inputs. May call the methods on cc to access
  // inputs and produce outputs.
  // ...
  virtual absl::Status Process(CalculatorContext* cc) = 0;

  // Is called if Open() was called and succeeded.  Is called either
  // immediately after processing is complete or after a graph run has ended
  // (if an error occurred in the graph).  ...
  virtual absl::Status Close(CalculatorContext* cc) {
    return absl::OkStatus();
  }

  ...
};

Tuổi thọ của máy tính

Trong quá trình khởi chạy biểu đồ MediaPipe, khung này sẽ gọi một Phương thức tĩnh GetContract() để xác định loại gói dự kiến.

Khung này tạo và huỷ toàn bộ máy tính cho mỗi lần chạy biểu đồ (ví dụ: một lần cho mỗi video hoặc một lần cho mỗi hình ảnh). Vật to lớn hoặc tốn kém mà vẫn tồn tại không đổi trên các lần chạy đồ thị cần được cung cấp dưới dạng gói bên đầu vào để và các phép tính khác sẽ không được lặp lại trong các lần chạy tiếp theo.

Sau khi khởi tạo, đối với mỗi lần chạy biểu đồ, trình tự sau đây sẽ xảy ra:

  • Open()
  • Process() (nhiều lần)
  • Close()

Khung này gọi Open() để khởi chạy máy tính. Open() sẽ diễn giải bất kỳ tuỳ chọn nào và thiết lập trạng thái chạy trên mỗi đồ thị của máy tính. Open() có thể nhận gói phía đầu vào và ghi gói vào đầu ra của máy tính. Nếu phù hợp, trình này sẽ gọi SetOffset() để giảm tình trạng giật gói dữ liệu tiềm ẩn của luồng đầu vào.

Nếu xảy ra lỗi trong Open() hoặc Process() (theo chỉ định của một trong các lỗi đó) trả về trạng thái không phải Ok), thì quá trình chạy biểu đồ sẽ bị chấm dứt mà không có lệnh gọi nào khác đối với phương thức của máy tính và máy tính đó bị huỷ bỏ.

Đối với một máy tính có dữ liệu đầu vào, khung sẽ gọi Process() bất cứ khi nào ít nhất một đầu vào có sẵn một gói. Khung này đảm bảo rằng tất cả thông tin đầu vào đều có cùng dấu thời gian, nhưng dấu thời gian đó tăng lên theo mỗi lệnh gọi đến Process() và tất cả các gói đã được phân phối. Do đó, một số dữ liệu đầu vào có thể không có gói dữ liệu khi Process() được gọi. Đầu vào có gói bị thiếu có vẻ như tạo ra một gói trống (không có dấu thời gian).

Khung này gọi Close() sau tất cả các lệnh gọi đến Process(). Tất cả giá trị nhập vào sẽ đã hết, nhưng Close() có quyền truy cập vào các gói phụ đầu vào và có thể ghi đầu ra. Sau khi Close trả về, hàm tính sẽ bị huỷ.

Những máy tính không có dữ liệu đầu vào được gọi là nguồn. Một công cụ tính nguồn tiếp tục gọi Process() miễn là nó trả về trạng thái Ok. Đáp hàm tính nguồn cho biết nguồn đã hết bằng cách trả về trạng thái dừng (nghĩa là mediaPipe::tool::StatusStop()).

Xác định dữ liệu đầu vào và đầu ra

Giao diện công khai cho một máy tính bao gồm một tập hợp các luồng đầu vào và các luồng đầu ra. Trong CalculatorGraphConfiguration, kết quả đầu ra từ một số được kết nối với dữ liệu đầu vào của các máy tính khác bằng cách sử dụng phát trực tuyến. Tên luồng thường được viết bằng chữ thường, trong khi các thẻ đầu vào và thẻ đầu ra được viết thường thường được CHỮ HOA. Trong ví dụ bên dưới, kết quả với tên thẻ VIDEO là đã kết nối với đầu vào có tên thẻ VIDEO_IN bằng cách sử dụng luồng có tên video_stream.

# Graph describing calculator SomeAudioVideoCalculator
node {
  calculator: "SomeAudioVideoCalculator"
  input_stream: "INPUT:combined_input"
  output_stream: "VIDEO:video_stream"
}
node {
  calculator: "SomeVideoCalculator"
  input_stream: "VIDEO_IN:video_stream"
  output_stream: "VIDEO_OUT:processed_video"
}

Bạn có thể xác định luồng đầu vào và đầu ra theo số chỉ mục, theo tên thẻ hoặc bằng tổ hợp tên thẻ và số chỉ mục. Bạn có thể xem một số ví dụ về phương thức nhập giá trị nhận dạng đầu ra trong ví dụ bên dưới. SomeAudioVideoCalculator nhận dạng đầu ra video theo thẻ và đầu ra âm thanh của thẻ nhờ sự kết hợp giữa thẻ và chỉ mục. Đầu vào có thẻ VIDEO được kết nối với luồng có tên video_stream Dữ liệu đầu ra có thẻ AUDIO cùng các chỉ mục 01 là đã kết nối với các luồng có tên audio_leftaudio_right. SomeAudioCalculator chỉ xác định đầu vào âm thanh theo chỉ mục (không cần thẻ).

# Graph describing calculator SomeAudioVideoCalculator
node {
  calculator: "SomeAudioVideoCalculator"
  input_stream: "combined_input"
  output_stream: "VIDEO:video_stream"
  output_stream: "AUDIO:0:audio_left"
  output_stream: "AUDIO:1:audio_right"
}

node {
  calculator: "SomeAudioCalculator"
  input_stream: "audio_left"
  input_stream: "audio_right"
  output_stream: "audio_energy"
}

Trong quá trình triển khai công cụ tính toán, dữ liệu đầu vào và đầu ra cũng được xác định bằng thẻ tên và số chỉ mục. Trong hàm dưới đây đầu vào và đầu ra được xác định:

  • Theo số chỉ mục: Luồng đầu vào kết hợp được xác định đơn giản bằng chỉ mục 0.
  • Theo tên thẻ: Luồng đầu ra video được xác định theo tên thẻ "VIDEO".
  • Theo tên thẻ và số chỉ mục: Luồng âm thanh đầu ra được xác định theo tổ hợp tên thẻ AUDIO với số chỉ mục 01.
// c++ Code snippet describing the SomeAudioVideoCalculator GetContract() method
class SomeAudioVideoCalculator : public CalculatorBase {
 public:
  static absl::Status GetContract(CalculatorContract* cc) {
    cc->Inputs().Index(0).SetAny();
    // SetAny() is used to specify that whatever the type of the
    // stream is, it's acceptable.  This does not mean that any
    // packet is acceptable.  Packets in the stream still have a
    // particular type.  SetAny() has the same effect as explicitly
    // setting the type to be the stream's type.
    cc->Outputs().Tag("VIDEO").Set<ImageFrame>();
    cc->Outputs().Get("AUDIO", 0).Set<Matrix>();
    cc->Outputs().Get("AUDIO", 1).Set<Matrix>();
    return absl::OkStatus();
  }

Đang xử lý

Process() được gọi trên nút không phải nguồn phải trả về absl::OkStatus() cho cho biết rằng mọi thứ đều diễn ra tốt đẹp hoặc bất kỳ mã trạng thái nào khác để báo lỗi

Nếu một hàm tính không phải nguồn trả về tool::StatusStop(), thì hàm này sẽ báo hiệu cho biểu đồ sẽ sớm bị huỷ. Trong trường hợp này, tất cả máy tính nguồn và đồ thị luồng đầu vào sẽ bị đóng (và các Gói tin còn lại sẽ được truyền qua biểu đồ).

Một nút nguồn trong biểu đồ sẽ tiếp tục có Process() được gọi trên nó trong khoảng thời gian vì hàm này trả về absl::OkStatus(). Để cho biết rằng không có thêm dữ liệu nào đã tạo trả về tool::StatusStop(). Bất kỳ trạng thái nào khác cho biết có lỗi đã xảy ra.

Close() trả về absl::OkStatus() để cho biết thành công. Trạng thái khác chỉ báo lỗi.

Dưới đây là hàm Process() cơ bản. Hàm này sử dụng phương thức Input() (có thể chỉ được sử dụng nếu máy tính có một đầu vào duy nhất) để yêu cầu dữ liệu đầu vào. Nó sau đó sử dụng std::unique_ptr để phân bổ bộ nhớ cần thiết cho gói đầu ra, và thực hiện các phép tính. Khi hoàn tất, ứng dụng sẽ giải phóng con trỏ khi thêm vào luồng đầu ra.

absl::Status MyCalculator::Process() {
  const Matrix& input = Input()->Get<Matrix>();
  std::unique_ptr<Matrix> output(new Matrix(input.rows(), input.cols()));
  // do your magic here....
  //    output->row(n) =  ...
  Output()->Add(output.release(), InputTimestamp());
  return absl::OkStatus();
}

Tùy chọn máy tính

Máy tính chấp nhận các tham số xử lý thông qua (1) gói luồng đầu vào (2) gói bên đầu vào và (3) tuỳ chọn máy tính. Lựa chọn máy tính, nếu được chỉ định, xuất hiện dưới dạng giá trị cố định trong trường node_options của Tin nhắn CalculatorGraphConfiguration.Node.

  node {
    calculator: "TfLiteInferenceCalculator"
    input_stream: "TENSORS:main_model_input"
    output_stream: "TENSORS:main_model_output"
    node_options: {
      [type.googleapis.com/mediapipe.TfLiteInferenceCalculatorOptions] {
        model_path: "mediapipe/models/detection_model.tflite"
      }
    }
  }

Trường node_options chấp nhận cú pháp proto3. Một cách khác là công cụ tính bạn có thể chỉ định các tuỳ chọn trong trường options bằng cú pháp proto2.

  node {
    calculator: "TfLiteInferenceCalculator"
    input_stream: "TENSORS:main_model_input"
    output_stream: "TENSORS:main_model_output"
    node_options: {
      [type.googleapis.com/mediapipe.TfLiteInferenceCalculatorOptions] {
        model_path: "mediapipe/models/detection_model.tflite"
      }
    }
  }

Không phải máy tính nào cũng chấp nhận lựa chọn tính toán. Để chấp nhận các tuỳ chọn, thường sẽ xác định một loại thông báo protobuf mới để thể hiện chẳng hạn như PacketClonerCalculatorOptions. Sau đó, công cụ tính đọc thông báo protobuf đó trong phương thức CalculatorBase::Open và có thể trong chức năng CalculatorBase::GetContract hoặc CalculatorBase::Process. Thường thì loại thông báo protobuf mới sẽ được định nghĩa là giản đồ protobuf bằng cách sử dụng ".proto" và một tệp Quy tắc xây dựng mediapipe_proto_library().

  mediapipe_proto_library(
      name = "packet_cloner_calculator_proto",
      srcs = ["packet_cloner_calculator.proto"],
      visibility = ["//visibility:public"],
      deps = [
          "//mediapipe/framework:calculator_options_proto",
          "//mediapipe/framework:calculator_proto",
      ],
  )

Công cụ tính mẫu

Phần này thảo luận cách triển khai PacketClonerCalculator, thực hiện một công việc tương đối đơn giản và được sử dụng trong nhiều biểu đồ tính toán. PacketClonerCalculator chỉ tạo một bản sao của các gói đầu vào gần đây nhất theo yêu cầu.

PacketClonerCalculator rất hữu ích khi dấu thời gian của gói dữ liệu đến chưa được căn chỉnh một cách hoàn hảo. Giả sử chúng ta có một căn phòng có micrô, đèn cảm biến và máy quay video đang thu thập dữ liệu cảm giác. Từng cảm biến hoạt động độc lập và thu thập dữ liệu không liên tục. Giả sử dữ liệu đầu ra của mỗi cảm biến là:

  • micrô = độ to theo đexiben của âm thanh trong phòng (Số nguyên)
  • cảm biến ánh sáng = độ sáng trong phòng (Số nguyên)
  • máy quay video = khung hình ảnh RGB của phòng (ImageFrame)

Quy trình nhận thức đơn giản của chúng tôi được thiết kế để xử lý dữ liệu cảm giác từ 3 nhóm cảm giác này để bất cứ lúc nào khi chúng tôi có dữ liệu khung hình ảnh từ máy ảnh được đồng bộ hoá với dữ liệu âm lượng và ánh sáng micrô được thu thập gần đây nhất dữ liệu về độ sáng của cảm biến. Để làm được điều này bằng MediaPipe, hệ thống nhận thức của chúng tôi gồm 3 luồng đầu vào:

  • Room_mic_signal – Mỗi gói dữ liệu trong luồng đầu vào này là dữ liệu số nguyên cho biết mức âm thanh lớn trong một căn phòng kèm theo dấu thời gian.
  • Room_lightening_sensor – Mỗi gói dữ liệu trong luồng đầu vào này là số nguyên dữ liệu thể hiện độ sáng của căn phòng bằng dấu thời gian.
  • Room_video_tick_signal - Mỗi gói dữ liệu trong luồng đầu vào này là khung hình ảnh của dữ liệu video thể hiện video được thu thập từ máy ảnh trong phòng có dấu thời gian.

Dưới đây là cách triển khai PacketClonerCalculator. Bạn có thể thấy Các phương thức GetContract(), Open()Process() cũng như thực thể biến current_ chứa các gói đầu vào gần đây nhất.

// This takes packets from N+1 streams, A_1, A_2, ..., A_N, B.
// For every packet that appears in B, outputs the most recent packet from each
// of the A_i on a separate stream.

#include <vector>

#include "absl/strings/str_cat.h"
#include "mediapipe/framework/calculator_framework.h"

namespace mediapipe {

// For every packet received on the last stream, output the latest packet
// obtained on all other streams. Therefore, if the last stream outputs at a
// higher rate than the others, this effectively clones the packets from the
// other streams to match the last.
//
// Example config:
// node {
//   calculator: "PacketClonerCalculator"
//   input_stream: "first_base_signal"
//   input_stream: "second_base_signal"
//   input_stream: "tick_signal"
//   output_stream: "cloned_first_base_signal"
//   output_stream: "cloned_second_base_signal"
// }
//
class PacketClonerCalculator : public CalculatorBase {
 public:
  static absl::Status GetContract(CalculatorContract* cc) {
    const int tick_signal_index = cc->Inputs().NumEntries() - 1;
    // cc->Inputs().NumEntries() returns the number of input streams
    // for the PacketClonerCalculator
    for (int i = 0; i < tick_signal_index; ++i) {
      cc->Inputs().Index(i).SetAny();
      // cc->Inputs().Index(i) returns the input stream pointer by index
      cc->Outputs().Index(i).SetSameAs(&cc->Inputs().Index(i));
    }
    cc->Inputs().Index(tick_signal_index).SetAny();
    return absl::OkStatus();
  }

  absl::Status Open(CalculatorContext* cc) final {
    tick_signal_index_ = cc->Inputs().NumEntries() - 1;
    current_.resize(tick_signal_index_);
    // Pass along the header for each stream if present.
    for (int i = 0; i < tick_signal_index_; ++i) {
      if (!cc->Inputs().Index(i).Header().IsEmpty()) {
        cc->Outputs().Index(i).SetHeader(cc->Inputs().Index(i).Header());
        // Sets the output stream of index i header to be the same as
        // the header for the input stream of index i
      }
    }
    return absl::OkStatus();
  }

  absl::Status Process(CalculatorContext* cc) final {
    // Store input signals.
    for (int i = 0; i < tick_signal_index_; ++i) {
      if (!cc->Inputs().Index(i).Value().IsEmpty()) {
        current_[i] = cc->Inputs().Index(i).Value();
      }
    }

    // Output if the tick signal is non-empty.
    if (!cc->Inputs().Index(tick_signal_index_).Value().IsEmpty()) {
      for (int i = 0; i < tick_signal_index_; ++i) {
        if (!current_[i].IsEmpty()) {
          cc->Outputs().Index(i).AddPacket(
              current_[i].At(cc->InputTimestamp()));
          // Add a packet to output stream of index i a packet from inputstream i
          // with timestamp common to all present inputs
        } else {
          cc->Outputs().Index(i).SetNextTimestampBound(
              cc->InputTimestamp().NextAllowedInStream());
          // if current_[i], 1 packet buffer for input stream i is empty, we will set
          // next allowed timestamp for input stream i to be current timestamp + 1
        }
      }
    }
    return absl::OkStatus();
  }

 private:
  std::vector<Packet> current_;
  int tick_signal_index_;
};

REGISTER_CALCULATOR(PacketClonerCalculator);
}  // namespace mediapipe

Thông thường, một máy tính chỉ có tệp .cc. Bạn không cần phải dùng miền .h vì mediapipe sử dụng đăng ký để làm cho các máy tính biết đến nó. Sau khi bạn có xác định lớp máy tính của bạn, hãy đăng ký lớp đó bằng lệnh gọi macro REGISTER_CALCULATOR(calculator_class_name).

Dưới đây là một biểu đồ MediaPipe bình thường có 3 luồng đầu vào, 1 nút (PacketClonerCalculator) và 2 luồng đầu ra.

input_stream: "room_mic_signal"
input_stream: "room_lighting_sensor"
input_stream: "room_video_tick_signal"

node {
   calculator: "PacketClonerCalculator"
   input_stream: "room_mic_signal"
   input_stream: "room_lighting_sensor"
   input_stream: "room_video_tick_signal"
   output_stream: "cloned_room_mic_signal"
   output_stream: "cloned_lighting_sensor"
 }

Sơ đồ dưới đây cho thấy cách PacketClonerCalculator xác định đầu ra gói (dưới cùng) dựa trên chuỗi gói đầu vào (trên cùng).

Vẽ đồ thị sử dụng PacketClonerCalculator
Mỗi lần nhận được một gói trên luồng đầu vào TICK, PacketClonerCalculator sẽ xuất ra gói gần đây nhất từ mỗi luồng đầu vào của nó. Trình tự của các gói đầu ra (dưới cùng) được xác định bằng trình tự của gói đầu vào (trên cùng) và dấu thời gian của chúng. Các dấu thời gian nằm ở bên phải biểu đồ.