В первой части я показал сколько стоит использование воркер-ролей и очередей в Windows Azure и что с этим можно сделать.
Довольной хороший подход – адаптировать интервал опроса новых сообщений и отключать опрос в случае их отсутствия продолжительное время. Но после выключения надо как-то включать.
Для этого был создан extension-метод:
public static IObservable<CloudQueueMessage> ToObservable<T>(
this CloudQueue queue,
IObservable<T> haveMoreMessages)
Этот метод возвращает сообщения очереди в виде IObservable коллекции. Включение опроса осуществляется появлением элемента в последовательности haveMoreMessages.
Теперь о том как реализовать последовательность haveMoreMessages.
Самый дешевый вариант взаимодействия между экземплярами ролей это internal wcf communication. Для того чтобы работать с WCF необходимо определить контракты.
[ServiceContract]
public interface IQueueNotifier
{
[OperationContract(IsOneWay = true)]
void MessageAdded(string queueName);
[OperationContract(IsOneWay = true)]
void NoMoreMessages(string queueName);
}
Контракт содержит всего два метода оповещения о новом сообщении в очереди и об окончании сообщений.
Реализация тоже тривиальна:
public class QueueNotifier : IQueueNotifier
{
private ISubject<string> moreMessages = new Subject<string>();
private ISubject<string> queueCompleted = new Subject<string>();
public IObservable<string> MoreMessages
{
get
{
return this.moreMessages;
}
}
public IObservable<string> QueueCompleted
{
get
{
return this.queueCompleted;
}
}
public void MessageAdded(string queueName)
{
moreMessages.OnNext(queueName);
}
public void NoMoreMessages(string queueName)
{
queueCompleted.OnNext(queueName);
}
}
Далее комбинируя два потока получаем IObservable<Unit> пригодный для метода, описанного в начале поста.
public static IObservable<Unit> GetQueueNotifications(this QueueNotifier service, string queueName)
{
return Observable.Create<Unit>(obs =>
{
var sub1 = service.MoreMessages
.Where(q => q == queueName)
.Subscribe(q => obs.OnNext(Unit.Default));
var sub2 = service.QueueCompleted
.Where(q => q == queueName)
.Subscribe(q => obs.OnCompleted());
return new CompositeDisposable(sub1, sub2);
});
}
Теперь захостив QueueNotifier в воркере можно передавать ему оповещения из других ролей.
Клиентская сторона
Чтобы отправлять оповещения нужно создать ChannelFactory<IQueueNotifier> и получить экземпляр прокси на клиенте.
Далее надо получить IObserver:
public static IObserver<Unit> CreateQueueNotifierObserver(this IQueueNotifier proxy, string queueName)
{
return Observer.Create<Unit>(
_ => proxy.MessageAdded(queueName),
_ => proxy.NoMoreMessages(queueName),
() => proxy.NoMoreMessages(queueName)
);
}
Надо помнить что экземпляров воркера может быть много и у вас получится по одному наблюдателю на каждый инстанс воркера. При этом не надо передавать оповещение каждому воркеру, достаточно передать оповещение одному (случайному). В случае высокой нагрузки оповещения будут распределяться равномерно межу воркерами и никто их них не будет “спать”. В случае низкой нагрузки просыпаться будет только один воркер, экономя деньги.
public static IObserver<Unit> CombineObservers(List<IObserver<Unit>> notifiers)
{
var rnd = new Random();
return Observer.Create<Unit>(
u => notifiers[rnd.Next(notifiers.Count)].OnNext(u),
e => notifiers.ForEach(obs => obs.OnError(e)),
() => notifiers.ForEach(obs => obs.OnCompleted())
);
}
Обратите внимание что OnCompleted рассылается всем воркерам, чтобы можно было остановить обработку сообщений.
Остается только скомбинировать отправку сообщения в очередь с отправкой оповещения.
public static IObserver<CloudQueueMessage> ToObserver(this CloudQueue queue, IObserver<Unit> notifier)
{
var addMessage = Observable.FromAsyncPattern<CloudQueueMessage>(queue.BeginAddMessage, queue.EndAddMessage);
return Observer.Create<CloudQueueMessage>(
m => addMessage(m).Subscribe(notifier.OnNext, notifier.OnError),
notifier.OnError,
notifier.OnCompleted);
}
Таким образом получается достигнуть того что воркеры не обращаются постоянно к Azure Storage, экономя деньги и ресурсы виртуальных машин. При этом мы получили на клиенте и сервере очень простые интерфейсы, позволяющие тем не менее выполнять сложные действия с ними.
В следующей части дальнейшая оптимизация, библиотека и пример приложения.
