В первой части я показал сколько стоит использование воркер-ролей и очередей в Windows Azure и что с этим можно сделать.

Довольной хороший подход – адаптировать интервал опроса новых сообщений и отключать опрос в случае их отсутствия продолжительное время. Но после выключения надо как-то включать.

Для этого был создан extension-метод:

public static IObservable<CloudQueueMessage> ToObservable<T>(
                                                        this CloudQueue queue, 
                                                        IObservable<T> haveMoreMessages)

Этот метод возвращает сообщения очереди в виде IObservable коллекции. Включение опроса осуществляется появлением элемента в последовательности haveMoreMessages.

Теперь о том как реализовать последовательность haveMoreMessages.

Самый дешевый вариант взаимодействия между экземплярами ролей это internal wcf communication. Для того чтобы работать с WCF необходимо определить контракты.

[ServiceContract]
public interface IQueueNotifier
{
    [OperationContract(IsOneWay = true)]
    void MessageAdded(string queueName);

    [OperationContract(IsOneWay = true)]
    void NoMoreMessages(string queueName);
}

Контракт содержит всего два метода оповещения о новом сообщении в очереди и об окончании сообщений.

Реализация тоже тривиальна:

public class QueueNotifier : IQueueNotifier
{
    private ISubject<string> moreMessages = new Subject<string>();
    private ISubject<string> queueCompleted = new Subject<string>();

    public IObservable<string> MoreMessages
    {
        get
        {
            return this.moreMessages;
        }
    }

    public IObservable<string> QueueCompleted
    {
        get
        {
            return this.queueCompleted;
        }
    }

    public void MessageAdded(string queueName)
    {
        moreMessages.OnNext(queueName);
    }

    public void NoMoreMessages(string queueName)
    {
        queueCompleted.OnNext(queueName);
    }
}

Далее комбинируя два потока получаем IObservable<Unit> пригодный для метода, описанного в начале поста.

public static IObservable<Unit> GetQueueNotifications(this QueueNotifier service, string queueName)
{
    return Observable.Create<Unit>(obs =>
    {
        var sub1 = service.MoreMessages
                          .Where(q => q == queueName)
                          .Subscribe(q => obs.OnNext(Unit.Default));

        var sub2 = service.QueueCompleted
                          .Where(q => q == queueName)
                          .Subscribe(q => obs.OnCompleted());

        return new CompositeDisposable(sub1, sub2);
    });
}

Теперь захостив QueueNotifier в воркере можно передавать ему оповещения из других ролей.

Клиентская сторона

Чтобы отправлять оповещения нужно создать ChannelFactory<IQueueNotifier> и получить экземпляр прокси на клиенте.

Далее надо получить IObserver:

public static IObserver<Unit> CreateQueueNotifierObserver(this IQueueNotifier proxy, string queueName)
{
    return Observer.Create<Unit>(
                _ => proxy.MessageAdded(queueName),
                _ => proxy.NoMoreMessages(queueName),
                () => proxy.NoMoreMessages(queueName)
            );
}

Надо помнить что экземпляров воркера может быть много и у вас получится по одному наблюдателю на каждый инстанс воркера. При этом не надо передавать оповещение каждому воркеру, достаточно передать оповещение одному (случайному). В случае высокой нагрузки оповещения будут распределяться равномерно межу воркерами и никто их них не будет “спать”. В случае низкой нагрузки просыпаться будет только один воркер, экономя деньги.

public static IObserver<Unit> CombineObservers(List<IObserver<Unit>> notifiers)
{
    var rnd = new Random();

    return Observer.Create<Unit>(
            u => notifiers[rnd.Next(notifiers.Count)].OnNext(u),
            e => notifiers.ForEach(obs => obs.OnError(e)),
            () => notifiers.ForEach(obs => obs.OnCompleted())
        );
}

Обратите внимание что OnCompleted рассылается всем воркерам, чтобы можно было остановить обработку сообщений.

Остается только скомбинировать отправку сообщения в очередь с отправкой оповещения.

public static IObserver<CloudQueueMessage> ToObserver(this CloudQueue queue, IObserver<Unit> notifier)
{
    var addMessage = Observable.FromAsyncPattern<CloudQueueMessage>(queue.BeginAddMessage, queue.EndAddMessage);

    return Observer.Create<CloudQueueMessage>(
            m => addMessage(m).Subscribe(notifier.OnNext, notifier.OnError),
            notifier.OnError,
            notifier.OnCompleted);
}

Таким образом получается достигнуть того что воркеры не обращаются постоянно к Azure Storage, экономя деньги и ресурсы виртуальных машин. При этом мы получили на клиенте и сервере очень простые интерфейсы, позволяющие тем не менее выполнять сложные действия  с ними.

В следующей части дальнейшая оптимизация, библиотека и пример приложения.

Теги : Reactive Extensions, Azure