В предыдущем посте я описал подход, позволяющий существенно сократить количество вызовов к azure storage, который может сэкономить много денег. Но тем не менее ваши воркеры продолжают поедать ваши деньги.

А нужны ли вообще воркеры?

Оказывается не нужны. Если у вас небольшое приложение и вы используете очереди для надежной (reliable) асинхронной обработки, причем сама обработка не требует больших вычислительных затрат, то вам и не нужны воркеры. Можете использовать пару методов ToObserver\ToObservable из предыдущего поста, а для оповещений обычный Subject<Unit>.

Отказываться  от очередей в данном случае не надо, так как при масштабировании работу подхватят воркеры.

Scale Down

Как вы уже могли догадаться возможность масштабировать “вниз” в облаке не менее важна, чем масштабирование “вверх”. С учетом всех ранее перечисленных подходов можно любое приложение развернуть на одном Extra Small Instance в Windows Azure за $30 и тысячей транзакций хранилища (меньше $0.01) в месяц, если к нему будет мало обращений. Это уже сопоставимо с ценой shared-хостинга.

На этом история scale down заканчивается и начинается история…

Scale Up\Out

Сразу же рекомендую посмотреть на Autoscale Application Block (кодовое имя WASABi) из комплекта Enterprise Library. Ссылка на Enterprise Library 5.0 Windows Azure Integration Pack. Этот модуль позволяет задавать правила в соответствии с которыми будет изменяться количество экземпляров ролей в вашем приложении.

Но количество ролей позволяет выдерживать вычислительную нагрузку, хотя в большинстве веб-приложений хранилище становится узким местом.

К сожалению Windows Azure тут не исключение. В блоге Windows Azure Storage описаны scalability targets. Вы можете обнаружить очень интересные сведения о том что максимальное количество сообщений очереди, обрабатываемых в секунду – 500 (по другим сведениям это количество транзакций в секунду). Это очень-очень  мало. И надо не забывать что это предельное значение, на практике его достигнуть будет непросто. Кроме того латентность очереди может достигать 100ms.

Первое что необходимо чтобы избежать высокой латентности на маленьких сообщениях в очереди - установить ServicePointManager.UseNagleAlgorithm значение false.

Следующая проблема – максимальный размер сообщения в очереди – 8KB, так как для передачи используется Base64 кодировка, то реально данных можно передать около 6KB, кстати строки по-умолчанию не кодируются. Добрые люди уже придумали как решать такую проблему: http://msdn.microsoft.com/en-us/library/windowsazure/hh690942(v=VS.103).aspx

Масштабирование воркеров

Как вы думаете что будет если взять “наивную” реализацию воркера, как в первом посте и запустить на Extra Large Instance, насколько быстрее будет работать?

На самом деле вообще не будет быстрее. С этой точки зрения большое количество маленьких воркеров лучше чем один большой. Хотя тоже не лучший вариант по словам представителей Microsoft. С другой стороны куча маленьких воркеров будут пинать Azure Storage гораздо чаще, что несомненно отразится на ценнике. Того же можно добиться если запустить вручную несколько потоков с наивным циклом в воркере, развернутом на Medium instance или более крутой машине.

Чтобы этого избежать надо использовать метод CloudQueue.GetMessages. Пример ниже показывает кусок кода для итератора, который потом обрабатывается Rx.

while (true)
{
    var msgsObs = getMessages(32).ToListObservable();
    yield return msgsObs;
    var msgs = msgsObs[0];

    var hasMessages = false;
    foreach (var msg in msgs)
    {
        hasMessages = true;
        idleCount = 0;

        result.OnNext(msg);
    }

    if (!hasMessages)
    {
        var delay = CalulateDelay(idleCount++, MinimumIdleIntervalMs, MaximumIdleIntervalMs, 100);
        if (delay.TotalMilliseconds >= MaximumIdleIntervalMs)
        {
            yield break;
        }

        yield return Observable.Timer(delay).ToListObservable();
    }
}

Обратите внимание что вызов OnNext должен быть упорядочен, чтобы не возникало Race Condition. Это требование указано в Rx Design Guidelines, и если вы его не читали, то крайне рекомендую это сделать.

Кроме того удаление сообщения из очереди в таком коде возлагается на внешний код.

Пример:

from m in queue.ToObservable(notifications)
from _1 in Observable.Start(() => /*work*/, Scheduler.TaskPool)
from _2 in queue.DeleteMessageAsync(m)
select Unit.Default;

Само это выражение не приводит ни к какому эффекту. Для него надо выполнить Subscribe чтобы запустить вычисления. Тогда будет использоваться TaskPool, который довольно эффективно распределяет вычисления по процессорам. Если вычисления длительные (более 10ms - 100ms), то лучше использовать Scheduler.NewThread. Если же у вас IO-bound код, то лучше будет использовать Scheduler.ThreadPool.

Подходы, описанные выше помогут выжать максимум из очереди Windows Azure, оптимально расходуя ресурсы виртуальных машин при этом. Но что делать когда код упрется в ограничение количества сообщений в секунду. Ни добавление воркеров, ни увеличение толщины инстансов не поможет.  В таком случае может помочь секционирование.

Вместо одной очереди вы создаете N очередей. При добавлении сообщения в очередь выбираете случайную. Считываете сразу из всех. Надо как-то разбираться из какой очереди пришло сообщение. Реализация такого нетривиальна и уже есть первый подобный проект на codeplex: http://partitioncloudqueue.codeplex.com/.

Но Rx как всегда рулит и с его помощью очень просто сделать такой partitioning.

На клиенте:

List<CloudQueue> queues = /*...*/;
var observers = queues.Select(q => q.ToObserver(/*notifier*/))
                      .ToList();

var rnd = new Random();
var partitionedObserver = Observer.Create<CloudQueueMessage>(
        m => observers[rnd.Next(observers.Count)].OnNext(m),
        e => observers.ForEach(obs => obs.OnError(e)),
        () => observers.ForEach(obs => obs.OnCompleted())
    );
partitionedObserver.OnNext(new CloudQueueMessage(/*message*/));

На сервере:

IObservable<Unit> ProcessMessages(CloudQueue queue, /*notifier*/, /*scheduler*/)
{
    return from m in queue.ToObservable(/*notifier*/)
           from _1 in Observable.Start(/*action*/, /*scheduler*/)
           from _2 in queue.DeleteMessageAsync(m)
           select Unit.Default;
}

/*.....*/

List<CloudQueue> queues = /*...*/;
queues.Select(q => ProcessMessages(q,/*notifier*/, /*scheduler*/))
      .Merge()
      .Subscribe();

Другой подход, позволяющий решить проблему ограничения на количество сообщений – пакетная передача. Вместо создания множества очередей, вы записываете множество сообщений в один пакет и предаете его. Для этих целей можно использовать CloudBlockBlob. Можно отдельными блоками загружать отдельные сообщения, а потом получить список блоков из блоба. В сообщении при этом передавать только url блоба.

Заключение

Все описанные выше способы помогут вам более эффективно реализовывать процессинг в Windows Azure. Для тех кто дочитал до сюда – сюрприз. Весь код с примерами использования есть на codeplex, а также библиотека для работы с очередями доступна в NuGet.

Теги : Reactive Extensions, Azure