Потоки в реальном времени

Временные метки в реальном времени

Графики калькулятора MediaPipe часто используются для обработки потоков видео или аудио кадров для интерактивных приложений. Платформа MediaPipe требует только, чтобы последовательным пакетам присваивались монотонно увеличивающиеся временные метки. По соглашению, калькуляторы и графики реального времени используют время записи или время представления каждого кадра в качестве его временной метки, причем каждая временная метка указывает микросекунды с Jan/1/1970:00:00:00 . Это позволяет обрабатывать пакеты из различных источников в глобально согласованной последовательности.

Планирование в реальном времени

Обычно каждый калькулятор запускается, как только становятся доступными все его входные пакеты для данной временной метки. Обычно это происходит, когда калькулятор завершил обработку предыдущего кадра, и каждый из калькуляторов, выдающих входные данные, завершил обработку текущего кадра. Планировщик MediaPipe вызывает каждый калькулятор, как только эти условия выполняются. Дополнительную информацию см. в разделе Синхронизация .

Границы временной метки

Когда калькулятор не создает никаких выходных пакетов для данной временной метки, он вместо этого может выводить «привязку к временной метке», указывающую, что для этой временной метки не будет создано ни одного пакета. Эта индикация необходима для того, чтобы позволить нижестоящим калькуляторам работать с этой меткой времени, даже если для определенных потоков с этой меткой времени не поступило ни одного пакета. Это особенно важно для графиков реального времени в интерактивных приложениях, где крайне важно, чтобы каждый калькулятор начал обработку как можно скорее.

Рассмотрим график, подобный следующему:

node {
   calculator: "A"
   input_stream: "alpha_in"
   output_stream: "alpha"
}
node {
   calculator: "B"
   input_stream: "alpha"
   input_stream: "foo"
   output_stream: "beta"
}

Предположим: в момент времени T узел A не отправляет пакет в своем выходном потоке alpha . Узел B получает пакет в foo с меткой времени T и ожидает пакета в alpha с меткой времени T Если A не отправляет B обновление с привязкой к временной метке для alpha , B будет продолжать ждать прибытия пакета в alpha . Между тем, очередь пакетов foo будет накапливать пакеты в T , T+1 и так далее.

Для вывода пакета в поток калькулятор использует функции API CalculatorContext::Outputs и OutputStream::Add . Чтобы вместо этого вывести метку времени, привязанную к потоку, калькулятор может использовать функции API CalculatorContext::Outputs и CalculatorContext::SetNextTimestampBound . Указанная граница — это наименьшая допустимая отметка времени для следующего пакета в указанном выходном потоке. Если пакет не выводится, калькулятор обычно делает что-то вроде:

cc->Outputs().Tag("output_frame").SetNextTimestampBound(
  cc->InputTimestamp().NextAllowedInStream());

Функция Timestamp::NextAllowedInStream возвращает последующую временную метку. Например, Timestamp(1).NextAllowedInStream() == Timestamp(2) .

Распространение границ временных меток

Калькуляторы, которые будут использоваться в графиках в реальном времени, должны определять границы выходных меток времени на основе границ входных меток времени, чтобы обеспечить быстрое планирование последующих калькуляторов. Обычно калькуляторы выдают пакеты с теми же метками времени, что и входные пакеты. В этом случае простого вывода пакета при каждом вызове Calculator::Process достаточно, чтобы определить границы временной метки вывода.

Однако от калькуляторов не требуется следовать этому общему шаблону для выходных временных меток, от них требуется только выбирать монотонно увеличивающиеся выходные временные метки. В результате некоторые калькуляторы должны явно рассчитывать границы временных меток. MediaPipe предоставляет несколько инструментов для вычисления соответствующей временной метки, привязанной к каждому калькулятору.

1. SetNextTimestampBound() можно использовать для указания границы временной метки t + 1 для выходного потока.

cc->Outputs.Tag("OUT").SetNextTimestampBound(t.NextAllowedInStream());

В качестве альтернативы можно создать пустой пакет с отметкой времени t , чтобы указать привязку к метке времени t + 1 .

cc->Outputs.Tag("OUT").Add(Packet(), t);

Временная метка входного потока обозначается пакетом или пустым пакетом во входном потоке.

Timestamp bound = cc->Inputs().Tag("IN").Value().Timestamp();

2. TimestampOffset() можно указать для автоматического копирования временной метки, привязанной к входным потокам, к выходным потокам.

cc->SetTimestampOffset(0);

Преимущество этого параметра заключается в автоматическом распространении границ меток времени, даже если поступают только границы меток времени и Calculator::Process не вызывается.

3. Можно указать ProcessTimestampBounds() для вызова Calculator::Process для каждой новой «установленной отметки времени», где «установленная отметка времени» — это новая наивысшая отметка времени ниже текущих границ отметки времени. Без ProcessTimestampBounds() Calculator::Process вызывается только при наличии одного или нескольких прибывающих пакетов.

cc->SetProcessTimestampBounds(true);

Этот параметр позволяет калькулятору выполнять собственный расчет и распространение границ временных меток, даже если обновляются только входные временные метки. Его можно использовать для повторения эффекта TimestampOffset() , но его также можно использовать для расчета привязки метки времени, учитывающей дополнительные факторы.

Например, чтобы реплицировать SetTimestampOffset(0) калькулятор может сделать следующее:

absl::Status Open(CalculatorContext* cc) {
  cc->SetProcessTimestampBounds(true);
}

absl::Status Process(CalculatorContext* cc) {
  cc->Outputs.Tag("OUT").SetNextTimestampBound(
      cc->InputTimestamp().NextAllowedInStream());
}

Планирование Calculator::Open и Calculator::Close

Calculator::Open вызывается, когда созданы все необходимые входные побочные пакеты. Входные побочные пакеты могут предоставляться прилагаемым приложением или «калькуляторами побочных пакетов» внутри графа. Побочные пакеты можно указать вне графа с помощью API CalculatorGraph::Initialize и CalculatorGraph::StartRun . Дополнительные пакеты могут быть указаны калькуляторами внутри графа с помощью CalculatorGraphConfig::OutputSidePackets и OutputSidePacket::Set .

Calculator::Close вызывается, когда все входные потоки стали Done , закрываясь или достигнув привязки к временной метке Timestamp::Done .

Примечание. Если граф завершает все ожидающие выполнения калькулятора и становится Done , прежде чем некоторые потоки станут Done , MediaPipe вызовет оставшиеся вызовы Calculator::Close , чтобы каждый калькулятор мог выдать окончательные выходные данные.

Использование TimestampOffset имеет некоторые последствия для Calculator::Close . Калькулятор, определяющий SetTimestampOffset(0) по своей конструкции будет сигнализировать, что все его выходные потоки достигли Timestamp::Done когда все его входные потоки достигли Timestamp::Done , и, следовательно, дальнейшие выходные данные невозможны. Это не позволяет такому калькулятору отправлять какие-либо пакеты во время Calculator::Close . Если калькулятору необходимо создать сводный пакет во время Calculator::Close , Calculator::Process должен указать границы метки времени так, чтобы хотя бы одна метка времени (например, Timestamp::Max ) оставалась доступной во время Calculator::Close . Это означает, что такой калькулятор обычно не может полагаться на SetTimestampOffset(0) и вместо этого должен явно указывать границы метки времени с помощью SetNextTimestampBounds() .