В первой части я показал сколько стоит использование воркер-ролей и очередей в Windows Azure и что с этим можно сделать.
Довольной хороший подход – адаптировать интервал опроса новых сообщений и отключать опрос в случае их отсутствия продолжительное время. Но после выключения надо как-то включать.
Для этого был создан extension-метод:
public static IObservable<CloudQueueMessage> ToObservable<T>( this CloudQueue queue, IObservable<T> haveMoreMessages)
Этот метод возвращает сообщения очереди в виде IObservable коллекции. Включение опроса осуществляется появлением элемента в последовательности haveMoreMessages.
Теперь о том как реализовать последовательность haveMoreMessages.
Самый дешевый вариант взаимодействия между экземплярами ролей это internal wcf communication. Для того чтобы работать с WCF необходимо определить контракты.
[ServiceContract] public interface IQueueNotifier { [OperationContract(IsOneWay = true)] void MessageAdded(string queueName); [OperationContract(IsOneWay = true)] void NoMoreMessages(string queueName); }
Контракт содержит всего два метода оповещения о новом сообщении в очереди и об окончании сообщений.
Реализация тоже тривиальна:
public class QueueNotifier : IQueueNotifier { private ISubject<string> moreMessages = new Subject<string>(); private ISubject<string> queueCompleted = new Subject<string>(); public IObservable<string> MoreMessages { get { return this.moreMessages; } } public IObservable<string> QueueCompleted { get { return this.queueCompleted; } } public void MessageAdded(string queueName) { moreMessages.OnNext(queueName); } public void NoMoreMessages(string queueName) { queueCompleted.OnNext(queueName); } }
Далее комбинируя два потока получаем IObservable<Unit> пригодный для метода, описанного в начале поста.
public static IObservable<Unit> GetQueueNotifications(this QueueNotifier service, string queueName) { return Observable.Create<Unit>(obs => { var sub1 = service.MoreMessages .Where(q => q == queueName) .Subscribe(q => obs.OnNext(Unit.Default)); var sub2 = service.QueueCompleted .Where(q => q == queueName) .Subscribe(q => obs.OnCompleted()); return new CompositeDisposable(sub1, sub2); }); }
Теперь захостив QueueNotifier в воркере можно передавать ему оповещения из других ролей.
Клиентская сторона
Чтобы отправлять оповещения нужно создать ChannelFactory<IQueueNotifier> и получить экземпляр прокси на клиенте.
Далее надо получить IObserver:
public static IObserver<Unit> CreateQueueNotifierObserver(this IQueueNotifier proxy, string queueName) { return Observer.Create<Unit>( _ => proxy.MessageAdded(queueName), _ => proxy.NoMoreMessages(queueName), () => proxy.NoMoreMessages(queueName) ); }
Надо помнить что экземпляров воркера может быть много и у вас получится по одному наблюдателю на каждый инстанс воркера. При этом не надо передавать оповещение каждому воркеру, достаточно передать оповещение одному (случайному). В случае высокой нагрузки оповещения будут распределяться равномерно межу воркерами и никто их них не будет “спать”. В случае низкой нагрузки просыпаться будет только один воркер, экономя деньги.
public static IObserver<Unit> CombineObservers(List<IObserver<Unit>> notifiers) { var rnd = new Random(); return Observer.Create<Unit>( u => notifiers[rnd.Next(notifiers.Count)].OnNext(u), e => notifiers.ForEach(obs => obs.OnError(e)), () => notifiers.ForEach(obs => obs.OnCompleted()) ); }
Обратите внимание что OnCompleted рассылается всем воркерам, чтобы можно было остановить обработку сообщений.
Остается только скомбинировать отправку сообщения в очередь с отправкой оповещения.
public static IObserver<CloudQueueMessage> ToObserver(this CloudQueue queue, IObserver<Unit> notifier) { var addMessage = Observable.FromAsyncPattern<CloudQueueMessage>(queue.BeginAddMessage, queue.EndAddMessage); return Observer.Create<CloudQueueMessage>( m => addMessage(m).Subscribe(notifier.OnNext, notifier.OnError), notifier.OnError, notifier.OnCompleted); }
Таким образом получается достигнуть того что воркеры не обращаются постоянно к Azure Storage, экономя деньги и ресурсы виртуальных машин. При этом мы получили на клиенте и сервере очень простые интерфейсы, позволяющие тем не менее выполнять сложные действия с ними.
В следующей части дальнейшая оптимизация, библиотека и пример приложения.