07.08.2022

Пирамида

Иван Колесников в комментарии к прошлом посту подсказал мне использовать пирамиду для подсчета частот символов в реализации арифметического сжатия. Его идея оказалось очень удачной и позволила существенно ускорить процесс как кодирования, так и декодирования данных.

Дело в том, что в арифметическом сжатии при кодирования каждого символа нужно выполнить работу не только по его преобразованию в сжатый вид, но перед этим получить вероятность его появления в тексте, а также, в случае использования адаптивной статистической модели, еще и обновить саму модель.
В общем-то, обе операции не очень сложные, но вызываются очень часто, поэтому составляют изрядную долю времени работы алгоритма.
В принципе, можно использовать не адаптивную, а постоянную статистическую модель, но этот вариант в общем случае представляется мне менее удобным, так как в этом варианте данные сжимаются за два прохода. На первом проходе нужно собрать статистику для построения модели, а на втором – собственно, сжать сами данные.
Конечно, если у нас уже есть готовая статистическая модель данных, то мы можем сразу приступить к сжатию и оно, естественно, будет быстрее. Но это не избавляет нас от второго недостатка
– необходимости в этом случае вместе со сжатым файлом хранить также и модель исходных данных, что, конечно, несколько снижает коэффициент сжатия. 

Но вернемся к особенностям использования пирамиды в арифметическом кодировании. Многие реализации хранят вероятности просто в массиве, точнее, часто даже в двух, примерно как на рисунке ниже для алфавита из 16 символов.

Зеленым шрифтом показаны обычные (текущие) частоты встреч символов в тексте, но для работы алгоритма нужны частоты с накоплением, они приведены ниже синим цветом.
Поэтому в принципе верхняя таблица не нужна, тем не менее ее часто оставляют, так как при ограниченной её разрядности можно сделать модель более адаптивной к локальной статистике. То есть если в разных участках большого массива информации вероятность встреч символом немного отличается, то такая реализация позволяет учесть эту особенность, обеспечивая чуть лучшее сжатие.
Но как по мне, если уж стремиться к максимальному сжатию, то лучше разбить большой массив информации на блоки, используя для каждого следующего блока начальную модель с предыдущего, редуцированную до минимальной точности.

При арифметическом сжатии нам нужно, во-первых, получить две частоты с накоплением: кодируемого символа и предыдущего, а во вторых, обновить модель, то есть увеличить, как минимум элементы второго массива на единицу. Если поддерживается два массива, то также увеличить на единицу частоту кодируемого символа.
Сложность первого процесса крайне мала и равна O(1), а вот со вторым дело обстоит немного хуже
там сложность O(n), где n размер алфавита.
Обычно в такой реализации процессы разнесены по времени: первый выполняется до кодирования символа, а второй
после. В принципе, их не так уж сложно объединить, но особой необходимости в этом нет, а реализация станет сложней и чуть-чуть медленее.

Теперь попробуем ускорить второй процесс (первый процесс с его O(1) ускорять уже некуда) с помощью пирамиды. Вот так она будет выглядеть для показанного выше примера.

Собственно, пирамида – это идеально сбалансированное дерево постоянной структуры, обычно частично упорядоченное. В нашем случае значение в каждом узле, кроме листовых, является суммой двух его сыновей. А листовые узлы хранят исходную таблицу частот (без накопления).
Постоянство структуры дает одно преимущество
пирамиду можно хранить без дополнительных расходов на описание структуры, что в двоичном дереве обычно реализуется с помощью двух указателей на сыновей. В пирамиде же ее просто можно сохранить в массиве, не тратя дополнительной памяти на указатели.
В этом примере голубые числа вверху слева от вершин
индексы в массиве. Переход от родителя (определяемого, например, индексом i) к сыновьям очень прост и эффективен:

  Left := i*2+1;
  Right := i*2+2;

В случае пирамиды операция поиска частоты с накоплением имеет сложность O(log₂n), такую же, как и изменение модели. Если выполнять оба процесса последовательно, то получим O(2log₂n), что лучше, чем при использовании обычного массива O(n+1).
Но в данном случае имеет смысл объединить оба процесса в один, и тогда общая сложность будет в два раза меньше:
O(log₂n).

В результате реализации этих процессов при сжатии данных получаем компактный и красивый код:

function TArithmeticCoder.GetLimit(Symbol: byte; var Prev: cardinal): cardinal;
var
  i : cardinal;
begin
  i := Symbol + 255;
  Prev := 0;
  Result := Limits[i];
  Limit255 := Limits[0];
  while i <> 0 do
  begin
    if (i and 1) = 0 then // правый узел
      Prev := Prev + Limits[i-1]; // добавляем левый узел
    inc(Limits[i]);
    i := (i-1) shr 1; // переходим на уровень вверх
  end;
  inc(Limits[i]);
  Result := Prev+Result;
end;

Функция возвращает частоту с накоплением переданного ей в параметре символа, а частоту предыдущего возвращает в переменной Prev.
Так как при кодировании код символа известен, то проще идти в обратном направлении, от листьев к корню.
Limit255
общее количество закодированных символов, кроме последнего. Необходимость этой переменной неоднозначна, в принципе в корне дерева (Limits[0]), лежит нужное нам значение, правда, в связи с тем, что мы при обходе дерева сразу же пересчитали модель, увеличенное на единицу. Так как к корню мы обращается на каждой итерации сжатия, то он гарантированно лежит в кэше, правда, из него надо вычесть единицу. В общем, не понятно.

Обратная функция, используемая в процессе декодирования:

function TArithmeticCoder.GetLimitIndex(Limit: cardinal; var Prev, Current : cardinal): byte;
var
  i : cardinal;
  Left, Right : cardinal;
begin
  i := 0;
  Limit255 := Limits[i];
  Prev := 0;
  while (i < 255) do
  begin
    Left := (i shl 1)+1;
    Right := (i shl 1)+2;
    inc(Limits[i]);
    if Limit < Prev+Limits[Left] then
      i := Left
    else
    begin
      Prev := Prev + Limits[Left];
      i := Right;
    end;
  end;
  Current := Prev + Limits[i];
  inc(Limits[i]);
  Result := i-255;
end;

Принимает в качестве параметра вероятность символа, возвращает код символа, а также вероятность этого и предыдущего символа в соответствии с моделью, ну и по ходу дела меняя саму модуль.

Тем не менее, этот код был написан последним, чисто для сравнения и не является рабочим. Дело в том, что я очень "жадный". И если посмотреть на объем памяти, занимаемый пирамидой, то видно что он почти в два раза больше, чем первоначальный массив.
Возможный механизм того, как можно сэкономить память, описал Иван, когда описывал свою идею в комментарии. Так как родитель и его два непосредственных потомка связаны простым соотношением

Root := Left + Right;

то очевидно, что необязательно хранить все три значения. Достаточно хранить либо правую, либо левую ветвь, особой разницы нет. Я выбрал левую, потому что для расчета частот с накоплением используется именно она. Но для обхода пирамиды все равно нужно рассчитывать значение правого потомка, так что подходы абсолютно одинаковы. Пример такого дерева приведен на рисунке ниже.

Здесь оранжевым цветом отмечены вершины, которые не будут хранится в результирующей пирамиде. Нумерация в массиве построена по тому же принципу, как и в полной пирамиде – по уровням.
Я несколько дней "ходил" вокруг этой схемы, но, к своему стыду, так и не смог придумать эффективного механизма её обхода.

А потом я поменял нумерацию вершин и вроде в голове сложился хоть какой-то приемлемый вариант, но, конечно, совсем не такой элегантный, как на полной пирамиде.

Теперь пирамида помещается ровно в ту же память, что и массив в исходном алгоритме с обычной таблицей и у меня есть вариант ее обхода:

function TArithmeticCoder.GetLimit(Symbol: byte; var Prev: cardinal): cardinal;
var
  i, Base, Size : integer;
  Root, Left, Right : cardinal;
begin
  i := 0; // индекс корня
  Root := Limits[i];
  Limit255 := Root;
  inc(Limits[0]);
  Base := 128;
  Size := 128;
  Prev := 0;
  while (Size <> 0) do
  begin
    Left := Limits[i+1];
    Right := Root - Left;
    if Symbol < Base then // идем влево
    begin
      inc(i);
      Size := Size shr 1;
      Root := Left;
      inc(Limits[i]);
      Base := Base - Size;
    end
    else // идем вправо
    begin
      i := i + Size;
      Size := Size shr 1;
      Prev := Prev + Left;
      Root := Right;
      Base := Base + Size;
    end;
  end;
  if Symbol < Base then
    Result := Prev + Left
  else
    Result := Prev + Right;
end;

И вариант для усеченной пирамиды, используемый в процессе декодирования:

function TArithmeticCoder.GetLimitIndex(Limit: cardinal; var Prev, Current : cardinal): byte;
var
  i, Base, Size : integer;
  Root, Left, Right : cardinal;
  LM : cardinal;
begin
  i := 0; // индекс корня
  Root := Limits[i];
  Limit255 := Root;
  inc(Limits[0]);
  Base := 128;
  Size := 128;
  Prev := 0;
  while {(Level <> 256)} (Size <> 0) do
  begin
    Left := Limits[i+1];
    Right := Root - Left;
    if Limit < Prev + Left then // идем влево
    begin
      inc(i);
      Size := Size shr 1;
      Root := Left;
      inc(Limits[i]);
      Base := Base - Size;
      LM := 1;
    end
    else // идем вправо
    begin
      i := i + Size;
      Size := Size shr 1;
      Prev := Prev + Left;
      Root := Right;
      Base := Base + Size;
      LM := 0;
    end;
  end;
  Result := Base - LM;
  if LM = 1 then
    Current := Prev + Left
  else
    Current := Prev + Right;
end;

Как видно из кода, функции сильно разбухли и стали менее понятны. Тем не менее, на x64 они работают немного быстрее, чем вариант с полной пирамидой.
Этому помогает, во-первых, меньший размер массива, а значит, и меньшее количество промахов в кэше L1. Ну а во-вторых, большое количество регистров в этой архитектуре, что позволяет все локальные переменный держать в регистрах. Не уверен, что такой код будет более быстр на x86 с её куцыми 8 регистрами общего назначения.

Стоит ли овчинка выделки? В смысле, надо ли использовать более сложный код, если выигрыш не так велик? Я думаю что да, ведь основная цель реализация варианта с 16-битным алфавитом, в котором размер пирамиды увеличится в 256 раз, что существенно  увеличит вероятность промахов в кэше. Но вариант с 16-битном алфавите на полной пирамиде я не тестировал, так что привести численное сравнение не могу.

12 комментариев:

  1. Спасибо за код и наглядные картинки :)))

    > Я несколько дней "ходил" вокруг этой схемы...
    Мне кажется поможет отделить 0-й элемент от остальной пирамиды, 0-й элемент отдельно, а начиная с 1-го - сама пирамида левых элементов, попробую описать как трансформировать код шаг за шагом:

    1. Для начала преобразуем: inc(i); ... inc(Limits[i]); в inc(Limits[i + 1]); inc(i);
    2. Теперь можно заметить что мы всегда читаем i + 1 элемент (или по другому левый). Переименуем i в leftIdx, инициируем его 1 вместо 0, ну и уберем +1 при чтении
    3. Теперь должно быть очень хорошо видно, что 0-й элемент отдельно, а дальше пирамида левых элементов.
    4. Осталось изменить навигацию для послойного расположения: лево: leftIdx = leftIdx * 2, право: leftIdx = leftIdx * 2 + 1

    Интересно было бы сравнить что быстрее :) по идеи послойная должна быть чуть более оптимальна к кэшу, но будет ли это заметно на практике?

    Еще одно возможное микро улучшение: условие после цикла, вроде можно заменить на result = prev + root, тогда right нужно будет расчитывать только при правом переходе.

    ОтветитьУдалить
    Ответы
    1. > по идеи послойная должна быть чуть более оптимальна к кэшу

      Хотя... похоже что не прав: Ваш порядок должен быть даже более дружественным к кэшу, так как поддеревья располагаются более компактно, без вкраплений элементов от других не посещенных веток. Непоследовательное чтение будет только при правом переходе, тогда как при послойном: и при правом и левом.

      Удалить
    2. > 4. Осталось изменить навигацию для послойного расположения: лево: leftIdx = leftIdx * 2, право: leftIdx = leftIdx * 2 + 1
      Я тоже почти к этому подходил, затыкался на обходе нескольких правых вершин подряд. Видимо, надо подумать, может, можно с таким подходом более эффективный код написать.

      > Еще одно возможное микро улучшение: условие после цикла, вроде можно заменить на result = prev + root,
      > тогда right нужно будет расчитывать только при правом переходе.
      О, спасибо за подсказку. Мне кажется, не такое уж оно и микро. Уж точно код функции стал компактнее, что уже большой плюс. Но вот компилятор Delphi разницы в производительности не дал. Думаю, что это особенность его оптимизатора. На ассемблере получилось заметно быстрее.

      Удалить
    3. У меня вроде получилось для прямого прохода более-менее компактно снизу-вверх реализовать  для послойного расположения, правда все равно достаточно объемно, на сколько корректно не проверял :)))

      prev = 0; result = 0; i = symbol / 2 + 128;
      while (i > 0) {
      if (symbol % 2 = 0) {
      prev += limits[i];
      } else {
      if (result == 0) { result = limits[i] - prev; }
      limits[i]++;
      }
      i /= 2; symbol /= 2;
      }
      if (result == 0) {
      result = limit[0];
      } else {
      result += prev;
      }
      limits[0]++;

      Удалить
    4. А для обратного по моему мало что можно улучшить, как вариант: при послойном расположении можно избавиться от Size, а вместо Base сразу считать Result:

      result = 0; root = limits[0]; prev = 0; i = 1;
      limits[0]++;
      while (i < 128) {
      left = limits[i];
      if (limit < prev + left) {
      limits[i]++;
      root = left;
      result *= 2;
      i *= 2;
      } else {
      prev += left;
      root -= left;
      result = result * 2 + 1;
      i = i * 2 + 1;
      }
      }
      current = prev + root;

      Но будет ли это быстрее я не знаю :)

      Удалить
    5. Круто, Иван! Ты прям код накидал, спасибо. Обязательно его потестю, как руки дойдут.

      Удалить
  2. Ох... не удержался и тоже реализовал на Java :)

    По простому без особых ухищрений, ориентируясь на чистоту кода, а не на скорость :) Пришлось немного помучиться над определением окончания потока, так чтобы работало для любых символов и когда размер файла не кратен размеру алфавита, ну и деление пирамиды на 2 реализовать, чтобы гигабайтные файлы поддерживались.

    Тестировал на текстовом 100МБ файле, распакованный enwik8 взятый здесь: http://mattmahoney.net/dc/textdata.html

    Получившаяся степень сжатия в зависимости от размера алфавита:
    - 8-bit: 1.57
    - 16-bit: 1.78
    - 24-bit: 1.79

    Скорость считал как [оригинальный размер файла]/[время], сжатие/распаковка в MB/s:
    - Simple Array (массив частот с накоплением, простые циклы без двоичного поиска), 8-bit: 9.1 / 7.7
    - Opt Array (двоичный поиск, ленивый подсчет), 8-bit: 9.6 / 7.3
    - Tree (пирамида), 8-bit: 8.3 / 8.5
    - Tree, 16-bit: 9.8 / 10.2
    - Tree, 24-bit: 8.6 / 8.6

    Также протестировал на гигабайтном enwik9 взятом там же, скорости и степени сжатия чуть чуть получше чем для enwik8, но без сюрпризов.

    Ну и на 100МБ случайных байтов:

    Степень "сжатия" :)))
    - 8-bit: 0.9964
    - 16-bit: 0.9996
    - 24-bit: 0.9939

    Скорости:
    - Simple Array, 8-bit: 6.6 / 5.3
    - Opt Array, 8-bit: 6.9 / 6.0
    - Tree, 8-bit: 5.8 / 5.2
    - Tree, 16-bit: 6.2 / 6.0
    - Tree, 24-bit: 4.8 / 4.3

    Интересно что на 8-битах: самый простой код ненамного медленнее, а порой даже быстрее, думаю компилятор использует SIMD инструкции, плюс код более дружественный для конвейера. На 16-ти битах конечно уже без вариантов, даже приводить цифры нет смысла :)

    Еще нашел пару багов в коде что приводил в предыдущих комментариях:
    - при прямом проходе условие должно быть: symbol % 2 == 1
    - при обратном: условие цикла должно быть: i < 256

    ОтветитьУдалить
    Ответы
    1. > Пришлось немного помучиться над определением окончания потока
      Хм... Ну классически они там просто в алфавит лишний символ добавляют вроде. Но я еще очень давно посмотрел-посмотрел на это дело и решил вопрос просто, по деревенски: я никак не храню признак конца файла вообще. Просто в заголовок сжатого пакета добавляю несжатую длину исходных данных. Так как символ окончания потока встречается всегда ровно один раз в данных, то он кодируется наибольшим количеством бит и занимает много памяти. Если вместо этого хранить длину исходных данных, то ухудшишь размер файла от 0 до максимум трех байтов (на данных до 4 ГиБ).
      Хотя для коротких последовательностей, в несколько сотен байт или несколько десятков байт это может быть критично.

      >...когда размер файла не кратен размеру алфавита...
      Вот здесь вообще не понял. У меня вроде никаких вовсе проблем нет при произвольном размере как файла, так и алфавита.

      Скорость работы Java поражает. Очень быстрый код получился! У меня чисто на Delphi, с пирамидой получается в 1.5 раза медленнее на 8-битном алфавите. На 16-битном чуть лучше. И даже на реализация на ассемблере до самого быстрого варианта на Java не дотягивает.
      Впрочем, у нас, конечно, разные процессоры.

      > при прямом проходе условие должно быть: symbol % 2 == 1
      Это я увидел, но пока сильно не углублялся, с ходу что-то твой код не пошел. Может, где-то при переводе на Pascal накосячил.

      Удалить
    2. > в алфавит лишний символ добавляют
      Ага, но лишний символ ломает пирамиду, так как число символов перестает быть степенью двойки, зная что частота его всегда единица, можно наверное его как-то отдельно обрабатывать не храня в пирамиде, но как-то это муторно при обратном проходе, там и так код запутанный. Я пошел по другому пути: конец потока определяется по последнему существующему символу, а если этот символ встретился в потоке, то экранирую его им самим (фактически дублирую). Это конечно раздувает выходной поток, но зато более-менее просто реализовать.

      > добавляю несжатую длину исходных данных
      Да это просто :), но в Java есть стандартный API для работы с потоком байт (InputStream и OutputStream), который хорошо стыкуется с другими классами. Как думаю понятно из названия - это поток байтов с явным закрытием потока в конце, и наперед его размер неизвестен .

      > Вот здесь вообще не понял.
      У Вас такой проблемы нет, потому что Вы сохраняете оригинальное число байт, а я определяю окончание по терминальному символу, и если последний символ неполный (скажем из 1 байта для 16-bit), то при распаковке мне нужно его превратить в оригинальное число байт (1 байт для 16-bit). На самом деле тут все просто: так как символ неполный, один из его байтов я использую для хранения числа реальных байтов в нем.

      > Очень быстрый код получился
      Спасибо, я еще чуток улучшил: изначально я делал по статье что приводил где-то в предыдущих комментариях, но потом нашел другую статью, где ее маленько раскритиковали за лишние деления :) В итоге теперь у меня при сжатии 1 деление, а при распаковке 2, а было 2 и 3. Это, плюс запуск на более современной JVM, еще немного ускорило код, новые цифры для пирамиды (сжатие / распаковка в MB/s:
      - 8-bit: 10.7 / 9.7
      - 16-bit 12.4 / 11.8
      - 24-bit 11.7 / 10.8

      А не смотрели где узкое место, у меня если верить профайлеру:

      сжатие:
      - модель: 37%
      - расчет диапазона, выделение общего префикса битов: 26%
      - агрегация бит: 20%
      - запись байт: 9%
      - остальное: 8%

      распаковка:
      - модель: 36%
      - расчет диапазона, выделение общего префикса битов: 29%
      - распаковка бит: 12%
      - чтение байт: 10%
      - остальное: 13%

      Удалить
    3. > конец потока определяется по последнему существующему символу
      Как-то непонятно написал, а комментарий нельзя редактировать :)

      В общем я использую последний символ алфавита в качестве терминального (назовем его EOF), соответственно:
      - EOF в середине потока дублируется
      - После кодирования последнего полного символа, кодируем EOF, а далее кодируем 0 - если неполных символ нет или неполный символ (с числом занятых байтов внутри).

      Удалить
    4. > В итоге теперь у меня при сжатии 1 деление, а при распаковке 2, а было 2 и 3.
      Ох ты, как интересно. Скинь ссылочку, если не трудно, любопытно посмотреть.

      > А не смотрели где узкое место, у меня если верить профайлеру:
      Нет, не смотрел. Надо, кстати, попробовать. Когда-то давно я попытался профайлером Delphi воспользоваться, он такую лажу выдал, что я больше никогда и не пытался. )

      Кстати, сейчас поразмыслил... Очень интересные результаты получились с простым массивом на 8-битном алфавите за счет SIMD. Возможно, для короткого алфавита он самый лучший будет. По пирамиде обход всегда занимает 8 итераций, каждая итерация достаточно сложна. А при использовании SIMD, оценивая грубо верхнюю границу, потребуется в среднем 128 сложений, что на 256-битных регистрах потребует 16 простых операций. Видимо, стоит попробовать такой вариант для 8-битного алфавита.

      Удалить
    5. > Скинь ссылочку, если не трудно
      https://citeseerx.ist.psu.edu/viewdoc/download?doi=10.1.1.57.8885&rep=rep1&type=pdf (ищите "Alternative calculation"), но если в 2-х словах, то можно считать как:

      range = high - low + 1
      low = low + (p.low*range ) / p.total
      high = low + (p.high*range ) / p.total -1

      или:
      step = (high - low + 1) / p.total
      high = low + step * p.high - 1
      low = low + step * p.low

      Во 2-м варианте:
      - меньше делений
      - нет произведения больших чисел, и модель и диапазон можно оставить в 32 битах (но мне это не критично)
      - но недостаток что часть диапазона не используется, так как step * p.total будет меньше high, что в теории может привести к меньшему сжатию, но на практике вроде не особо влияет.

      Я тут очередной раз открыл для себя, на сколько процессор не любит условия с непонятным переходом. Для анализа общего префикса битов у меня был примерно следующий код
      while(true) {
      if (старшие биты 0) ...
      else if (старшие биты 1) ...
      else if (underflow) ...
      else break
      }

      задействовал битовую магию и переписал на без условий внутри циклов:
      while (старшие биты одинаковы)...
      while (underflow)...

      Плюс еще при распаковке одно условие выкинул.

      и скорость стала:
      - simple array, 8-bit: 12.9 / 9.9
      - opt array, 8-bit: 11.0 / 10.3
      - tree, 8-bit: 12.1 / 11.8
      - tree, 16-bit: 15.2 / 16.0
      - tree, 24-bit: 13.3 / 14.1

      Но вообще, как по мне, это все еще какие-то детские цифры, надо бы изучить что-нибудь типа https://faculty.cc.gatech.edu/~jarek/courses/7491/Arithmetic2.pdf там все как-то не очевидно :)))

      > он такую лажу выдал
      В мире Java, современный профайлер более-менее адекватно работает если он семплирующий, а не инструментирующий:
      - инструментирующий: условно вставляет в код измерилки времени, которые понятное дело вносят свою задержку, как результат часто получается бред особенно для простого кода выполняющегося много раз.
      - семплирующий: очень часто (скажем каждые 10 мк. сек) смотрит что именно программа выполняет (Java позволяет это с минимальным влиянием), далее статистика агрегируется. Такой подход не позволяет получить сильно много деталей, но зато более-менее точно находит горячие места.

      > Возможно, для короткого алфавита он самый лучший будет
      Ага, согласен.

      Удалить