Синхронизация

Механика планирования

Обработка данных в графе MediaPipe происходит внутри узлов обработки, определенных как подклассы CalculatorBase . Система планирования решает, когда должен запускаться каждый калькулятор.

Каждый граф имеет по крайней мере одну очередь планировщика . Каждая очередь планировщика имеет ровно одного исполнителя . Узлы статически назначаются очереди (и, следовательно, исполнителю). По умолчанию существует одна очередь, исполнителем которой является пул потоков с количеством потоков в зависимости от возможностей системы.

Каждый узел имеет состояние планирования, которое может быть «не готов» , «готов » или «работает» . Функция готовности определяет, готов ли узел к работе. Эта функция вызывается при инициализации графа, когда узел завершает работу и когда изменяется состояние входных данных узла.

Используемая функция готовности зависит от типа узла. Узел без потокового входа известен как исходный узел ; исходные узлы всегда готовы к работе, пока они не сообщат платформе, что у них больше нет данных для вывода, после чего они закрываются.

Неисходные узлы готовы, если у них есть входные данные для обработки, и если эти входные данные образуют действительный входной набор в соответствии с условиями, установленными политикой ввода узла (обсуждается ниже). Большинство узлов используют политику ввода по умолчанию, но некоторые узлы указывают другую.

Когда узел становится готовым, задача добавляется в соответствующую очередь планировщика, которая является приоритетной очередью. Функция приоритета в настоящее время фиксирована и учитывает статические свойства узлов и их топологическую сортировку внутри графа. Например, узлы, расположенные ближе к выходной стороне графа, имеют более высокий приоритет, а исходные узлы имеют самый низкий приоритет.

Каждая очередь обслуживается исполнителем, который отвечает за фактическое выполнение задачи путем вызова кода калькулятора. Могут быть предоставлены и настроены различные исполнители; это можно использовать для настройки использования ресурсов выполнения, например, запуская определенные узлы в потоках с более низким приоритетом.

Синхронизация временных меток

Выполнение графа MediaPipe децентрализовано: глобальные часы отсутствуют, и разные узлы могут одновременно обрабатывать данные из разных временных меток. Это обеспечивает более высокую пропускную способность посредством конвейерной обработки.

Однако информация о времени очень важна для многих рабочих процессов восприятия. Узлам, которые получают несколько входных потоков, обычно необходимо каким-то образом координировать их. Например, детектор объектов может вывести список граничных прямоугольников из кадра, и эта информация может быть передана в узел рендеринга, который должен обработать ее вместе с исходным кадром.

Таким образом, одной из ключевых задач платформы MediaPipe является обеспечение синхронизации входных данных для узлов. С точки зрения механики фреймворка основная роль метки времени — служить ключом синхронизации .

Более того, MediaPipe предназначен для поддержки детерминированных операций, что важно во многих сценариях (тестирование, моделирование, пакетная обработка и т. д.), одновременно позволяя авторам графиков ослаблять детерминизм там, где это необходимо для удовлетворения ограничений реального времени.

Две цели синхронизации и детерминизма лежат в основе нескольких вариантов проектирования. Примечательно, что пакеты, попадающие в данный поток, должны иметь монотонно увеличивающиеся временные метки: это не просто полезное предположение для многих узлов, но на него также опирается логика синхронизации. Каждый поток имеет метку времени , которая является минимально возможной меткой времени, разрешенной для нового пакета в потоке. Когда приходит пакет с отметкой времени T , граница автоматически переходит на T+1 , что отражает требование монотонности. Это позволяет платформе точно знать, что больше не будет поступать пакетов с отметкой времени меньше T

Политика ввода

Синхронизация выполняется локально на каждом узле с использованием политики ввода, указанной узлом.

Политика ввода по умолчанию, определенная DefaultInputStreamHandler , обеспечивает детерминированную синхронизацию входных данных со следующими гарантиями:

  • Если пакеты с одинаковой меткой времени передаются в несколько входных потоков, они всегда будут обрабатываться вместе, независимо от порядка их прибытия в реальном времени.

  • Входные наборы обрабатываются строго в порядке возрастания меток времени.

  • Никакие пакеты не отбрасываются, и обработка полностью детерминирована.

  • Узел становится готовым к обработке данных как можно скорее с учетом вышеуказанных гарантий.

Чтобы объяснить, как это работает, нам нужно ввести определение фиксированной временной метки. Мы говорим, что временная метка в потоке устанавливается , если она ниже границы временной метки. Другими словами, временная метка устанавливается для потока, как только состояние ввода в эту временную метку безвозвратно известно: либо пакет существует, либо есть уверенность, что пакет с этой временной меткой не прибудет.

Временная метка устанавливается для нескольких потоков, если она устанавливается для каждого из этих потоков. Более того, если временная метка установлена, это означает, что все предыдущие временные метки также установлены. Таким образом, установленные временные метки могут обрабатываться детерминированно в порядке возрастания.

Учитывая это определение, калькулятор с политикой ввода по умолчанию готов, если существует временная метка, которая устанавливается для всех входных потоков и содержит пакет хотя бы в одном входном потоке. Политика ввода предоставляет все доступные пакеты для установленной временной метки как один входной набор для калькулятора.

Одним из последствий такого детерминированного поведения является то, что для узлов с несколькими входными потоками может существовать теоретически неограниченное время ожидания установления временной метки, и тем временем неограниченное количество пакетов может быть помещено в буфер. (Рассмотрим узел с двумя входными потоками, один из которых продолжает отправлять пакеты, а другой ничего не отправляет и не продвигает границу.)

Поэтому мы также предоставляем пользовательские политики ввода: например, разделение входных данных на разные наборы синхронизации, определенные SyncSetInputStreamHandler , или полный отказ от синхронизации и обработку входных данных немедленно по мере их поступления, определенную ImmediateInputStreamHandler .

Управление потоком

Существует два основных механизма управления потоком. Механизм противодавления регулирует выполнение вышестоящих узлов, когда пакеты, буферизованные в потоке, достигают (настраиваемого) предела, определенного CalculatorGraphConfig::max_queue_size . Этот механизм поддерживает детерминированное поведение и включает систему предотвращения взаимоблокировок, которая при необходимости ослабляет настроенные ограничения.

Вторая система состоит из вставки специальных узлов, которые могут отбрасывать пакеты в соответствии с ограничениями в реальном времени (обычно с использованием пользовательских политик ввода), определенными FlowLimiterCalculator . Например, общий шаблон размещает узел управления потоком на входе подграфа с петлевым соединением от конечного выхода к узлу управления потоком. Таким образом, узел управления потоком может отслеживать, сколько временных меток обрабатывается в нисходящем графе, и отбрасывать пакеты, если это количество достигает (настраиваемого) предела; а поскольку пакеты отбрасываются в восходящем направлении, мы избегаем напрасной работы, которая могла бы возникнуть в результате частичной обработки временной метки и последующего отбрасывания пакетов между промежуточными этапами.

Этот подход на основе калькулятора дает автору графа контроль над тем, где могут быть отброшены пакеты, и обеспечивает гибкость в адаптации и настройке поведения графа в зависимости от ограничений ресурсов.