Для тех кто не в курсе: Windows Azure – “облачная” платформа Microsoft. Создавая приложения, работающие “в облаке”, у вас есть возможность разделять систему на “роли”. Бывают веб-роли, которые представляют из себя обычные веб-приложения, бывают также worker-роли (далее воркеры), предназначенные для вычислений.
Для увеличения масштабируемости приложения используется очереди. Сообщения в очередях обрабатываются воркерами, а ставят сообщения чаще всего веб-роли или другие воркеры. Таким образом можно разбить какие-либо длительные операции на небольшие и обрабатывать их асинхронно на любом количестве узлов, так как очереди в Windows Azure специально проектировали для сценария множественных потребителей.
Типовой код для воркера Windows Azure на C# такой:
while (true) { var msg = queue.GetMessage(); if (msg != null) { //do some work queue.DeleteMessage(msg); } else { Thread.Sleep(10000); } Trace.WriteLine("Working", "Information"); }
Как вы думаете сколько стоит этот воркер. В смысле реальных денег потребляемых таким приложением, развернутым на Windows Azure.
Для этого надо посмотреть цены: https://www.windowsazure.com/en-us/pricing/details/.
Если задеплоить такую роль в одном small экземпляре, то получится $2,88 в день/$86,4 в месяц/~2600 рублей в месяц. Так? А вот и нет…
Есть еще “скрытая” стоимость такой архитектуры, заключается она в том что транзакции к хранилищу тоже оплачиваются https://www.windowsazure.com/en-us/pricing/details/#storage. Всего $0.01 за 10,000 транзакций. Каждая транзакция – это один запрос к azure storage.
Код выше выполняет один запрос каждые 10 секунд даже если нету никаких сообщений в очереди.
Стоимость такого кода получается 60*60*24*30/(10 * 1000) = $25,92 в месяц. вместе со стоимостью compute hours это выходит $112,32 в месяц. И это даже если код не выполняет никакой работы!
Кроме того SLA гарантирует работоспособность роли 99,95% только при наличии минимум двух инстансов, так что для устойчивости надо еще умножить цену на 2. Итого $250 в месяц.
Вывод
Архитектура, которую предлагает Microsoft для масштабирования довольно дорого стоит. Используйте код из примеров очень осторожно, он может увести ваш проект в большой минус.
Что делать?
Вариант первый – использовать service bus, в нем тоже есть очереди, но API позволяет в одной транзакции ожидать сообщения, а не сразу null возвращать при его отсутствии.
Вариант второй – использовать адаптивную подстройку интервала опроса очереди и выключать опрос в случае отсутствия сообщений.
Второй вариант кажется хорошей идеей так как позволяет масштабировать подход как “вниз”, так и “вверх”. Но тут возникает вопрос, а если мы прекратим прием сообщений, то как его потом возобновить? Видимо надо передать сообщение… Приходим снова к той же проблеме.
Но сигнал к “пробуждению” читателя сообщений можно передавать по более дешевому каналу, например через wcf internal endpoint.
Реализация
Чтобы абстрагироваться от всех деталей с сообщениями, таймаутами и каналами удобно использовать библиотеку Rx. Я использую Experimental версию так как в ней собрано много нужных комбинаторов.
Для начала надо вписать код в концепцию Rx. Длительные операции, вроде вызовов методов Cloud Storage и тайматуов сделать в виде IObservable.
public static IObservable<CloudQueueMessage> ObserveMessages(this CloudQueue queue) { return Observable.Create<CloudQueueMessage>(obs => Iterator(obs, queue)); } private static IEnumerable<IObservable<object>> Iterator( IObserver<CloudQueueMessage> result, CloudQueue queue) { //Observable queue.GetMessage var getMessage = Observable.FromAsyncPattern<CloudQueueMessage>( queue.BeginGetMessage, queue.EndGetMessage); //Observable queue.DeleteMessage var deleteMessage = Observable.FromAsyncPattern<CloudQueueMessage>( queue.BeginDeleteMessage, queue.EndDeleteMessage); while (true) { //var msg = queue.GetMessage(); var msgObs = getMessage().ToListObservable(); yield return msgObs; var msg = msgObs[0]; if (msg != null) { //do some work result.OnNext(msg); //queue.DeleteMessage(msg); yield return deleteMessage(msg).ToListObservable(); } else { //Thread.Sleep(10000); //Same pattern as above yield return Observable.Timer(TimeSpan.FromSeconds(10)) .ToListObservable(); } Trace.WriteLine("Working", "Information"); } }
Теперь надо немного изменить код, сделав таймаут адаптивным.
Функция вычисления таймаута:
private static TimeSpan CalulateDelay(int idleCount, int minimumIdleIntervalMs, int maximumIdleIntervalMs, int deltaBackoffMs) { // Calculate a new sleep interval value that will follow a random exponential back-off curve. int delta = (int)((Math.Pow(2.0, (double)idleCount) - 1.0) * (new Random()).Next((int)(deltaBackoffMs * 0.8), (int)(deltaBackoffMs * 1.2))); int interval = Math.Min(minimumIdleIntervalMs + delta, maximumIdleIntervalMs); // Pass the calculated interval to the dequeue task to enable it to enter into a sleep state for the specified duration. return TimeSpan.FromMilliseconds((double)interval); }
Честно украдена отсюда.
Сам код воркера:
var idleCount = 0; while (true) { var msgObs = getMessage().ToListObservable(); yield return msgObs; var msg = msgObs[0]; if (msg != null) { idleCount = 0; //do some work result.OnNext(msg); yield return deleteMessage(msg).ToListObservable(); } else { var delay = CalulateDelay(idleCount++, MinimumIdleIntervalMs, MaximumIdleIntervalMs, 100); if (delay.TotalMilliseconds >= MaximumIdleIntervalMs) { yield break; } yield return Observable.Timer(delay).ToListObservable(); } }
Выключать цикл опроса сообщений мы научились, теперь попробуем научиться его включать. Будем считать что “внешний раздражитель”, который будет будить цикл выборки сообщений, выглядит как IObservable<T>.
public static IObservable<CloudQueueMessage> ObserveMessages<T>( this CloudQueue queue, IObservable<T> haveMoreMessages) { var iterator = Observable.Create<CloudQueueMessage>( obs => Iterator(obs, queue)); IDisposable subscription = null; return Observable.Create<CloudQueueMessage>( obs => haveMoreMessages.Subscribe( _ => { if (subscription == null) { subscription = iterator.Subscribe( obs.OnNext, obs.OnError, () => subscription = null); } }, () => subscription.Dispose() )); }
Код получился запутанный, но при некоторой сноровке читается очень хорошо.
На сегодня все. В следующей части я расскажу как сделать пробуждение воркеров по сигналу и какими еще способами можно оптимизировать стоимость решения для Windows Azure.