Для тех кто не в курсе: Windows Azure – “облачная” платформа Microsoft. Создавая приложения, работающие “в облаке”, у вас есть возможность разделять систему на “роли”. Бывают веб-роли, которые представляют из себя обычные веб-приложения,  бывают также worker-роли (далее воркеры), предназначенные для вычислений.

Для увеличения масштабируемости приложения используется очереди. Сообщения в очередях обрабатываются воркерами, а ставят сообщения чаще всего веб-роли или другие воркеры. Таким образом можно разбить какие-либо длительные операции на небольшие и обрабатывать их асинхронно на любом количестве узлов, так как очереди в Windows Azure специально проектировали для сценария множественных потребителей.

Типовой код для воркера Windows Azure на C# такой:

while (true)
{
    var msg = queue.GetMessage();
    if (msg != null)
    {
        //do some work
        queue.DeleteMessage(msg);
    }
    else
    {
        Thread.Sleep(10000);
    }

    Trace.WriteLine("Working", "Information");
}

Как вы думаете сколько стоит этот воркер. В смысле реальных денег потребляемых таким приложением, развернутым на Windows Azure.

Для этого надо посмотреть цены: https://www.windowsazure.com/en-us/pricing/details/.

Если задеплоить такую роль в одном small экземпляре, то получится $2,88 в день/$86,4 в месяц/~2600 рублей в месяц. Так? А вот и нет…

Есть еще “скрытая” стоимость такой архитектуры, заключается она в том что транзакции к хранилищу тоже оплачиваются https://www.windowsazure.com/en-us/pricing/details/#storage. Всего  $0.01 за 10,000 транзакций. Каждая транзакция – это один запрос к azure storage.

Код выше выполняет один запрос каждые 10 секунд даже если нету никаких сообщений в очереди.
Стоимость такого кода получается 60*60*24*30/(10 * 1000) = $25,92 в месяц. вместе со стоимостью compute hours это выходит $112,32 в месяц. И это даже если код не выполняет никакой работы!

Кроме того SLA гарантирует работоспособность роли 99,95% только при наличии минимум двух инстансов, так что для устойчивости надо еще умножить цену на 2. Итого $250 в месяц.

Вывод

Архитектура, которую предлагает Microsoft для масштабирования довольно дорого стоит. Используйте код из примеров очень осторожно, он может увести ваш проект в большой минус.

Что делать?

Вариант первый – использовать service bus, в нем тоже есть очереди, но API позволяет в одной транзакции ожидать сообщения, а не сразу null возвращать при его отсутствии.

Вариант второй – использовать адаптивную подстройку интервала опроса очереди и выключать опрос в случае отсутствия сообщений.

Второй вариант кажется хорошей идеей так как позволяет масштабировать подход как “вниз”, так и “вверх”. Но тут возникает вопрос, а если мы прекратим прием сообщений, то как его потом возобновить? Видимо надо передать сообщение… Приходим снова к той же проблеме.

Но сигнал к “пробуждению” читателя сообщений можно передавать по более дешевому каналу, например через wcf internal endpoint.

Реализация

Чтобы абстрагироваться от всех деталей с сообщениями, таймаутами и каналами удобно использовать библиотеку Rx. Я использую Experimental версию так как в ней собрано много нужных комбинаторов.

Для начала надо вписать код в концепцию Rx. Длительные операции, вроде вызовов методов Cloud Storage и тайматуов сделать в виде IObservable.

public static IObservable<CloudQueueMessage> ObserveMessages(this CloudQueue queue)
{
    return Observable.Create<CloudQueueMessage>(obs => Iterator(obs, queue));
}

private static IEnumerable<IObservable<object>> Iterator(
                                                    IObserver<CloudQueueMessage> result, 
                                                    CloudQueue queue)
{
    //Observable queue.GetMessage
    var getMessage = Observable.FromAsyncPattern<CloudQueueMessage>(
                                        queue.BeginGetMessage,
                                        queue.EndGetMessage);
    //Observable queue.DeleteMessage
    var deleteMessage = Observable.FromAsyncPattern<CloudQueueMessage>(
                                        queue.BeginDeleteMessage,
                                        queue.EndDeleteMessage);

    while (true)
    {
        //var msg = queue.GetMessage();               
        var msgObs = getMessage().ToListObservable();
        yield return msgObs;
        var msg = msgObs[0];

        if (msg != null)
        {
            //do some work
            result.OnNext(msg);

            //queue.DeleteMessage(msg);                     
            yield return deleteMessage(msg).ToListObservable();
        }
        else
        {
            //Thread.Sleep(10000);
            //Same pattern as above
            yield return Observable.Timer(TimeSpan.FromSeconds(10))
                                   .ToListObservable();
        }

        Trace.WriteLine("Working", "Information");
    }
}

Теперь надо немного изменить код, сделав таймаут адаптивным.

Функция вычисления таймаута:

private static TimeSpan CalulateDelay(int idleCount, int minimumIdleIntervalMs,  int maximumIdleIntervalMs, int deltaBackoffMs)
{
    // Calculate a new sleep interval value that will follow a random exponential back-off curve.
    int delta = (int)((Math.Pow(2.0, (double)idleCount) - 1.0) * (new Random()).Next((int)(deltaBackoffMs * 0.8), (int)(deltaBackoffMs * 1.2)));
    int interval = Math.Min(minimumIdleIntervalMs + delta, maximumIdleIntervalMs);

    // Pass the calculated interval to the dequeue task to enable it to enter into a sleep state for the specified duration.
    return TimeSpan.FromMilliseconds((double)interval);            
}

Честно украдена отсюда.

Сам код воркера:

var idleCount = 0;
while (true)
{
    var msgObs = getMessage().ToListObservable();
    yield return msgObs;
    var msg = msgObs[0];

    if (msg != null)
    {
        idleCount = 0;

        //do some work
        result.OnNext(msg);

        yield return deleteMessage(msg).ToListObservable();
    }
    else
    {
        var delay = 
                CalulateDelay(idleCount++, 
                              MinimumIdleIntervalMs, 
                              MaximumIdleIntervalMs, 
                              100);
        if (delay.TotalMilliseconds >= MaximumIdleIntervalMs)
        {
            yield break;
        }

        yield return Observable.Timer(delay).ToListObservable();
    }
}

Выключать цикл опроса сообщений мы научились, теперь попробуем научиться его включать. Будем считать что “внешний раздражитель”, который будет будить цикл выборки сообщений, выглядит как IObservable<T>.

public static IObservable<CloudQueueMessage> ObserveMessages<T>(
                                                this CloudQueue queue, 
                                                IObservable<T> haveMoreMessages)
{
    var iterator = Observable.Create<CloudQueueMessage>(
                                  obs => Iterator(obs, queue));
    IDisposable subscription = null;

    return Observable.Create<CloudQueueMessage>(
        obs => haveMoreMessages.Subscribe(
            _ =>
            {
                if (subscription == null)
                {
                    subscription = iterator.Subscribe(
                                                obs.OnNext, 
                                                obs.OnError, 
                                                () => subscription = null);
                }
            }, 
            () => subscription.Dispose() ));
}

Код получился запутанный, но при некоторой сноровке читается очень хорошо.

На сегодня все. В следующей части я расскажу как сделать  пробуждение воркеров по сигналу и какими еще способами можно оптимизировать стоимость решения для Windows Azure.

Теги : Reactive Extensions, Azure