Сортировка чёт-нечет

Предположим, что нам захотелось бы ускорить сортировку пузырьком, запустив в сортируемый массив данных одновременно несколько вычислителей, каждый из которых сравнивал бы свою пару чисел, переставлял при необходжимости и сдвигался к следующей паре. Длительность элементарной операции зависит от того, нужно переставлять данные или нет. Получается, что вычислители будут передвигаться по массиву с разной и непостоянной скоростью. В моменты, пересечений на одних данных, будут возникать коллизии. Чтобы избежать этого, каждый вычислитель должен постоянно контролировать положение всех других вычислителей. С ростом числа вычислителей, временные затраты на сортировку снижаются линейно, а затраты на синхронизацию между вычислителями растут квадратично.
Где-то тут вся эта история перестаёт выглядеть разумной... Решение возникающих проблем становится слишком затратным.

Вместо этого, на первом проходе сортировки, будем сравнивать только элементы, стоящие на чётных позициях, с тем, что стоит справа. Нулевой с первым, второй с третьим и т.д. Весь массив разбивается на непересекающиеся пары, каждую из которых может обработать свой вычислитель, независимо от других. 

На втором этапе аналогично происходит сравнение только элементов в нечётных позициях. 

Всё остальное неизменно, шаги повторяются, пока перестановки не прекратятся.

В реальной жизни редко бывает под рукой так много свободных процессорных ядер(или так мало данных в сортируемом массиве). Предоставим каждому вычислителю диапазон из нескольких пар, идущих подряд. Наших построений это не нарушит, диапазоны вычислителей по прежнему никогда не будут пересекаться.

Для начала, создадим класс одиночного вычислителя для работы в отдельном потоке:

unit uOddEvenSort;

interface

uses
     uTypes
    ,System.Classes
    ,System.SyncObjs
    ;

type
     /// <summary>
     ///   Класс-сортировщик чёт-нечет для работы в отдельном потоке.
     /// </summary>
     TSorter = class(TThread)
               protected
                 /// <summary>
                 ///   Ссылка на сортируемый масив.
                 /// </summary>
                 FA : TAI;
                 /// <summary>
                 ///   Событие-разрешение начать проход сортировки.
                 /// </summary>
                 FGoEvent      : TEvent;
                 /// <summary>
                 ///   Событие окончания прохода сортировки.
                 /// </summary>
                 FFinishedEvent : TEvent;
                 /// <summary>
                 ///   Флаг окончания работы.
                 /// </summary>
                 FQuit     : boolean;
                 /// <summary>
                 ///   Начальная позиция диапазона сортировки для этого
                 ///   сортировщика.
                 /// </summary>
                 FStart    : integer;
                 /// <summary>
                 ///   Размер диапазона сортировки в парах чисел.
                 /// </summary>
                 /// <remarks>
                 ///   Поле хранит индекс последней пары. Нумерация пар
                 ///   начинается с 0. Т.е., если FRange равен нулю, значит
                 ///   диапазон размером в одну пару чисел.
                 /// </remarks>
                 FRange    : integer;
                 /// <summary>
                 ///   Максимальный индекс в сортируемом диапазоне.
                 /// </summary>
                 FMaxIndex : integer;
                 /// <summary>
                 ///   Признак произошедших перестановок во время последнего
                 ///   прохода сортировки.
                 /// </summary>
                 FChange  : Boolean;

                 /// <summary>
                 ///   Обмен значений между двумя ячейками в сортируемом
                 ///   массиве.
                 /// </summary>
                 procedure swap(i,j : integer);

                 procedure Execute;override;
               public
                 /// <summary>
                 ///   Указать рабочий диапазон для этого сортировщика.
                 /// </summary>
                 /// <param name="aStart">
                 ///   Индек4с начальной позиции диапазона.
                 /// </param>
                 /// <param name="aRange">
                 ///   Номер последней пары в рабочем диапазоне
                 /// </param>
                 /// <remarks>
                 ///   Если aRange = 0, значит диапазон сортировки равен двум
                 ///   значениям, aStart и aStart+1. Если aRange = 1, значит
                 ///   диапазон сортировки: aStart .. aStart+3, и т.д.
                 /// </remarks>
                 procedure SetRange(aStart, aRange: integer);
                 /// <summary>
                 ///   Команда выполнить один проход сортировки. Вызывается
                 ///   внешним потоком для управления сортировкой.
                 /// </summary>
                 procedure DoIt;
                 /// <summary>
                 ///   Команда завершить работу. Вызывается внешним процессом,
                 ///   когда сортировка закончена.
                 /// </summary>
                 procedure Quit;
                 /// <summary>
                 ///   Выдаёт наружу ссылку на событие окончания прохода
                 ///   сортировки, чтобы вызывающая сторона могла дожидаться.
                 /// </summary>
                 function Ready:TEvent;
                 /// <summary>
                 ///   Возвращает истину, если на последнем проходе сортировки
                 ///   были перестановки данных.
                 /// </summary>
                 function WasChanged: boolean;
                 /// <summary>
                 ///   Сдвиг рабочего диапазона вправо. Служит для перехода на
                 ///   нечётную сортировку. Используется в паре со StepLeft.
                 /// </summary>
                 /// <remarks>
                 ///   Класс не контролирует корректность последовательностей
                 ///   вызовов StepRight и StepLeft. Ответственность на
                 ///   вызывающей стороне.
                 /// </remarks>
                 procedure StepRight;
                 /// <summary>
                 ///   Сдвиг рабочего диапазона влево. Служит для возврата на
                 ///   чётную сортировку. Используется после StepRight.
                 /// </summary>
                 /// <remarks>
                 ///   Класс не контролирует корректность последовательностей
                 ///   вызовов StepRight и StepLeft. Ответственность на
                 ///   вызывающей стороне.
                 /// </remarks>
                 procedure StepLeft;


                 /// <summary>
                 ///   Конструктор сортировщика.
                 /// </summary>
                 /// <param name="A">
                 ///   Ссылка на входной массив сортировки.
                 /// </param>
                 constructor Create(A: TAI); overload;
                 destructor Destroy; override;
               end;

implementation

constructor TSorter.Create(A: TAI);
begin
  FA := A; // Запоминаем сортируемый массив
  FMaxIndex := High(FA); // Запоминаем максимальный индекс элемента
  // Создаём события синхронизации.
  FGoEvent       := TEvent.Create(nil, true, false, '');
  FFinishedEvent := TEvent.Create(nil, true, false, '');
  Create; // Предыдущий колнструктор нити.
  FreeOnTerminate := true;
end;

destructor TSorter.Destroy;
begin
  // Не забудем освободить память из под событий.
  FGoEvent.Free;
  FFinishedEvent.Free;
  inherited;
end;

procedure TSorter.DoIt;
begin
  FFinishedEvent.ResetEvent;
  FGoEvent.SetEvent;
end;

procedure TSorter.Execute;
var
    i: integer;
begin
  // Каждый проход сортировки начинается только
  //  когда активировано событие FGoEvent.
  while FGoEvent.WaitFor(infinite)=wrSignaled do
  begin
    if FQuit then Break;
    FChange := false;
    // Собственно, сама сортировка.
    for i := 0 to FRange do
      if (FStart+i*2<FMaxIndex) and (FA[FStart+i*2] > FA[FStart+i*2 +1])
        then swap(FStart+i*2, FStart+i*2+1);

    FGoEvent.ResetEvent;     // Сбрасываем использованное разрешение
    FFinishedEvent.SetEvent;
  end;
end;

procedure TSorter.Quit;
begin
  FQuit := true;
  FGoEvent.SetEvent;
end;

function TSorter.Ready: TEvent;
begin
  result := FFinishedEvent;
end;

procedure TSorter.SetRange(aStart, aRange: integer);
begin
  FStart := aStart;
  FRange := aRange;
end;

procedure TSorter.StepLeft;
begin
  dec(FStart);
end;

procedure TSorter.StepRight;
begin
  inc(FStart);
end;

procedure TSorter.swap(i, j: integer);
var
    m: integer;
begin
  m := FA[i];
  FA[i] := FA[j];
  FA[j] := m;

  FChange := true;
end;

function TSorter.WasChanged: boolean;
begin
  result := FChange;
end;

end.

Тогда функция сортировки может выглядеть так:

type
  TAI = array of integer;
...

procedure OddEvenSort(var A: TAI; N: integer = -1);

var
    range  : integer;
    Sorter : array of TSorter;
    i      : Integer;
    sorted : boolean;
begin
  // N - Число сортировщиков.
  // Если оно не пришло в параметрах, оставим одно ядро системе,
  //  остальные используем для сортировки.
  if N<0 then N := TThread.ProcessorCount - 1;
  if N>Length(A) div 2 then N := Length(A) div 2;
  if N<1 then Exit;


  // Определяем размер блока для сортировщика в парах чисел
  range := Length(A) div 2 div N;
  // Создаём сортировщиков
  SetLength(Sorter, N);
  for i := Low(Sorter) to High(Sorter)-1 do
  begin
    Sorter[i] := TSorter.Create(A);
    Sorter[i].SetRange(i*range*2, range -1);
  end;
  // Последнего создаём отдельно, потому что его диапазон
  // отличается, если число пар не кратно числу вычислителей.
  Sorter[High(Sorter)] := TSorter.Create(A);
  Sorter[High(Sorter)].SetRange(High(Sorter)*range*2, (Length(A)-High(Sorter)*range*2) div 2);

  repeat // Цикл сортировки
    sorted := true;
    // Запускаем всех сортировщиков
    for i := Low(Sorter) to High(Sorter) do
      Sorter[i].DoIt;
    // Дожидаемся пока они закончат.
    for i := Low(Sorter) to High(Sorter) do
      Sorter[i].Ready.WaitFor(infinite);
    // Опрашиваем, делал ли кто-то перестановки.  
    for i := Low(Sorter) to High(Sorter) do
      sorted := sorted and (not(Sorter[i].WasChanged));
    // Переводим всех в режим сравнения нечётных элементов
    for i := Low(Sorter) to High(Sorter) do
      Sorter[i].StepRight;
    // Повторяем запуск - ожидание - опрос. 
    for i := Low(Sorter) to High(Sorter) do
      Sorter[i].DoIt;
    for i := Low(Sorter) to High(Sorter) do
      Sorter[i].Ready.WaitFor(infinite);
    for i := Low(Sorter) to High(Sorter) do
      sorted := sorted and (not(Sorter[i].WasChanged));
    // Возвращаем сортировщики к чётным элементам.
    for i := Low(Sorter) to High(Sorter) do
      Sorter[i].StepLeft;
  until sorted; // Крутимся, пока массив не отсортируется.
  // Разошлём сортировщикам команду завершиться.
  for i := Low(Sorter) to High(Sorter) do
    Sorter[i].Quit;
end;

Для проверки работоспособности, посмотрим как зависит время сортировки от количества сортировщиков.
Входные данные - 100 000 случайных целых чисел. Время в секундах:

Как видим, в тестовой системе было три свободных ядра... 
Для 4-х ядерной системы с Hyper-threading результат вполне предсказуем.
Первые три сортировщика разошлись по физическим ядрам и время обработки стремительно уменьшалось. 
4-5-6-й сортировщики выбрали остаток вычислительной мощности Hyper-threading-ом.
Дальнейший рост числа исполнителей эффекта не даёт, потому что они просто стоят в очереди.