Для тех кто не в курсе: Windows Azure – “облачная” платформа Microsoft. Создавая приложения, работающие “в облаке”, у вас есть возможность разделять систему на “роли”. Бывают веб-роли, которые представляют из себя обычные веб-приложения, бывают также worker-роли (далее воркеры), предназначенные для вычислений.
Для увеличения масштабируемости приложения используется очереди. Сообщения в очередях обрабатываются воркерами, а ставят сообщения чаще всего веб-роли или другие воркеры. Таким образом можно разбить какие-либо длительные операции на небольшие и обрабатывать их асинхронно на любом количестве узлов, так как очереди в Windows Azure специально проектировали для сценария множественных потребителей.
Типовой код для воркера Windows Azure на C# такой:
while (true)
{
var msg = queue.GetMessage();
if (msg != null)
{
//do some work
queue.DeleteMessage(msg);
}
else
{
Thread.Sleep(10000);
}
Trace.WriteLine("Working", "Information");
}
Как вы думаете сколько стоит этот воркер. В смысле реальных денег потребляемых таким приложением, развернутым на Windows Azure.
Для этого надо посмотреть цены: https://www.windowsazure.com/en-us/pricing/details/.
Если задеплоить такую роль в одном small экземпляре, то получится $2,88 в день/$86,4 в месяц/~2600 рублей в месяц. Так? А вот и нет…
Есть еще “скрытая” стоимость такой архитектуры, заключается она в том что транзакции к хранилищу тоже оплачиваются https://www.windowsazure.com/en-us/pricing/details/#storage. Всего $0.01 за 10,000 транзакций. Каждая транзакция – это один запрос к azure storage.
Код выше выполняет один запрос каждые 10 секунд даже если нету никаких сообщений в очереди.
Стоимость такого кода получается 60*60*24*30/(10 * 1000) = $25,92 в месяц. вместе со стоимостью compute hours это выходит $112,32 в месяц. И это даже если код не выполняет никакой работы!
Кроме того SLA гарантирует работоспособность роли 99,95% только при наличии минимум двух инстансов, так что для устойчивости надо еще умножить цену на 2. Итого $250 в месяц.
Вывод
Архитектура, которую предлагает Microsoft для масштабирования довольно дорого стоит. Используйте код из примеров очень осторожно, он может увести ваш проект в большой минус.
Что делать?
Вариант первый – использовать service bus, в нем тоже есть очереди, но API позволяет в одной транзакции ожидать сообщения, а не сразу null возвращать при его отсутствии.
Вариант второй – использовать адаптивную подстройку интервала опроса очереди и выключать опрос в случае отсутствия сообщений.
Второй вариант кажется хорошей идеей так как позволяет масштабировать подход как “вниз”, так и “вверх”. Но тут возникает вопрос, а если мы прекратим прием сообщений, то как его потом возобновить? Видимо надо передать сообщение… Приходим снова к той же проблеме.
Но сигнал к “пробуждению” читателя сообщений можно передавать по более дешевому каналу, например через wcf internal endpoint.
Реализация
Чтобы абстрагироваться от всех деталей с сообщениями, таймаутами и каналами удобно использовать библиотеку Rx. Я использую Experimental версию так как в ней собрано много нужных комбинаторов.
Для начала надо вписать код в концепцию Rx. Длительные операции, вроде вызовов методов Cloud Storage и тайматуов сделать в виде IObservable.
public static IObservable<CloudQueueMessage> ObserveMessages(this CloudQueue queue)
{
return Observable.Create<CloudQueueMessage>(obs => Iterator(obs, queue));
}
private static IEnumerable<IObservable<object>> Iterator(
IObserver<CloudQueueMessage> result,
CloudQueue queue)
{
//Observable queue.GetMessage
var getMessage = Observable.FromAsyncPattern<CloudQueueMessage>(
queue.BeginGetMessage,
queue.EndGetMessage);
//Observable queue.DeleteMessage
var deleteMessage = Observable.FromAsyncPattern<CloudQueueMessage>(
queue.BeginDeleteMessage,
queue.EndDeleteMessage);
while (true)
{
//var msg = queue.GetMessage();
var msgObs = getMessage().ToListObservable();
yield return msgObs;
var msg = msgObs[0];
if (msg != null)
{
//do some work
result.OnNext(msg);
//queue.DeleteMessage(msg);
yield return deleteMessage(msg).ToListObservable();
}
else
{
//Thread.Sleep(10000);
//Same pattern as above
yield return Observable.Timer(TimeSpan.FromSeconds(10))
.ToListObservable();
}
Trace.WriteLine("Working", "Information");
}
}
Теперь надо немного изменить код, сделав таймаут адаптивным.
Функция вычисления таймаута:
private static TimeSpan CalulateDelay(int idleCount, int minimumIdleIntervalMs, int maximumIdleIntervalMs, int deltaBackoffMs)
{
// Calculate a new sleep interval value that will follow a random exponential back-off curve.
int delta = (int)((Math.Pow(2.0, (double)idleCount) - 1.0) * (new Random()).Next((int)(deltaBackoffMs * 0.8), (int)(deltaBackoffMs * 1.2)));
int interval = Math.Min(minimumIdleIntervalMs + delta, maximumIdleIntervalMs);
// Pass the calculated interval to the dequeue task to enable it to enter into a sleep state for the specified duration.
return TimeSpan.FromMilliseconds((double)interval);
}
Честно украдена отсюда.
Сам код воркера:
var idleCount = 0;
while (true)
{
var msgObs = getMessage().ToListObservable();
yield return msgObs;
var msg = msgObs[0];
if (msg != null)
{
idleCount = 0;
//do some work
result.OnNext(msg);
yield return deleteMessage(msg).ToListObservable();
}
else
{
var delay =
CalulateDelay(idleCount++,
MinimumIdleIntervalMs,
MaximumIdleIntervalMs,
100);
if (delay.TotalMilliseconds >= MaximumIdleIntervalMs)
{
yield break;
}
yield return Observable.Timer(delay).ToListObservable();
}
}
Выключать цикл опроса сообщений мы научились, теперь попробуем научиться его включать. Будем считать что “внешний раздражитель”, который будет будить цикл выборки сообщений, выглядит как IObservable<T>.
public static IObservable<CloudQueueMessage> ObserveMessages<T>(
this CloudQueue queue,
IObservable<T> haveMoreMessages)
{
var iterator = Observable.Create<CloudQueueMessage>(
obs => Iterator(obs, queue));
IDisposable subscription = null;
return Observable.Create<CloudQueueMessage>(
obs => haveMoreMessages.Subscribe(
_ =>
{
if (subscription == null)
{
subscription = iterator.Subscribe(
obs.OnNext,
obs.OnError,
() => subscription = null);
}
},
() => subscription.Dispose() ));
}
Код получился запутанный, но при некоторой сноровке читается очень хорошо.
На сегодня все. В следующей части я расскажу как сделать пробуждение воркеров по сигналу и какими еще способами можно оптимизировать стоимость решения для Windows Azure.
