Лабораторная работа № 4. Организация взаимодействия нескольких сервисов с использованием технологии DSS 


Мы поможем в написании ваших работ!



ЗНАЕТЕ ЛИ ВЫ?

Лабораторная работа № 4. Организация взаимодействия нескольких сервисов с использованием технологии DSS



Цель: разработкараспределённого приложения на основе технологии DSS.

Краткие теоретические сведения

Требуется разработать распределённую систему для решения вычислительной задачи. Такая система должна включать сервисы двух типов: координирующий (MainService) и вычислительный (CompService).

Требования к координирующему сервису:

1) существует только один экземпляр координирующего сервиса;

2) координирующий сервис запускает вычислительные сервисы на узлах, раздаёт им задания и собирает результаты вычислений с каждого узла, завершает вычислительные сервисы.

Требования к вычислительному сервису:

1) на одной ЭВМ может быть запущено несколько DSS узлов для запуска вычислительных сервисов;

2) вычислительный сервис принимает задание, выполняет вычисления и отправляет результат обработки на координирующий сервис;

Рассмотрим пример реализации распределённой системы для решения задачи перемножения матрицы на вектор.

Данные требования к сервисам, составляющим распределённую систему, при необходимости, могут усложняться.

 

Протокол обмена информацией

 

Протокол обмена сообщениями между координирующим и вычислительными сервисами:

1) MainService выполняет чтение информации об узлах (хост:порт), на которых нужно запустить CompService, формируются задания для каждого вычислительного узла;

2) MainService запускает на каждом доступном узле экземпляр сервиса CompService и подписывается на уведомления CompService, для хранения информации о каждом узле используются массивы;

3) CompService после запуска подключается к MainService и подписывается на уведомления MainService;

4) MainService после получения запроса на подписку отправляет своё состояние каждому подписавшемуся CompService, в данном состоянии хранятся данные, которые нужно обработать, а также задание для каждого вычислительного сервиса;

5) CompService после получения состояния MainService определяет имя ЭВМ на которой запущен сервис CompService и номер порта; на основе данной информации (с помощью поиска по массиву) каждый экземпляр CompService определяет свой идентификатор, а также задание, которое он должен выполнить, после чего запускается выполнение вычислительного задания;

6) после окончания вычислений CompService записывает результаты вычислений в своё состояние и отправляет его MainService, в MainService ведётся подсчёт количества полученных состояний и одновременное объединение результатов работы вычислительных сервисов;

7) после сбора данных со всех вычислительных сервисов MainService отправляет им сообщения о завершении работы (также, можно завершить DSS узел).

Описанный протокол специфичен для поставленной задачи, однако, его можно использовать в качестве основы при разработке других распределённых систем.

В случае невозможности передачи сообщений, вследствие их значительного объёма, такие сообщения можно разбивать на части, передавая, одновременно с данными, номер части сообщения.

 

Разработка сервисов

 

Рассмотрим процесс разработки сервисов, которые реализуют описанный протокол:

1) создайте два сервиса, укажите для первого сервиса имя MainService, для второго – CompService;

2) откомпилируйте оба сервиса, добавьте в раздел References сервиса MainService ссылку на прокси библиотеку CompService.Y2012.M07.Proxy.dll сервиса CompService, добавьте в раздел References сервиса CompService ссылку на прокси библиотеку MainService.Y2012.M07.Proxy.dll сервиса MainService, установите Copy Local и Specific Version обеих библиотек в False;

3) добавьте в класс MainServiceState (файл MainServiceTypes.cs) описание структур данных, приведённых в пунктах 3.1 и 3.2;

3.1) вычислительные сервисы могут запускаться на узлах, расположенных как на ЭВМ, на которой запущен координирующий сервис, так и на других узлах сети. Каждый DSS узел уникально идентифицируется именем компьютера, на котором он запущен, а также портом, который связан с сервисом.

Примечание. При запуске DSS узла указываются два порта:

а) c параметром /p – TCP порт для чтения HTTP запросов;

б) с параметром /t – TCP порт для чтения SOAP запросов (SOAP — протокол обмена структурированными сообщениями в распределённой вычислительной среде. Первоначально SOAP предназначался в основном для реализации удалённого вызова процедур (RPC). Сейчас протокол используется для обмена произвольными сообщениями в формате XML, а не только для вызова процедур).

При определении номеров портов предлагается придерживаться следующего правила: номер порта, указываемого с параметром /p на единицу меньше номера порта, указываемого с параметром /t.

Информация об узлах, на которых нужно запустить вычислительные сервисы также должна храниться в состоянии координирующего сервиса. Для этого предлагается использовать следующую структуру:

[DataContract]

public struct NodeElement

{

[DataMember]

public string pc;

[DataMember]

public int port;

[DataMember]

public int id;

public NodeElement(string p1, int p2, int p3)

{

pc = p1;

port = p2;

id = p3;

}

}

private List<NodeElement> _NodeList = new List<NodeElement>();

[DataMember(IsRequired = true)]

public List<NodeElement> NodeList

{

get { return _NodeList; }

set { _NodeList = value; }

}

Таким образом, информация о вычислительных узлах хранится в списке NodeList. Данный список заполняется в методе Start координирующего сервиса. Каждый узел описывается тремя параметрами:

а) pc – имя ЭВМ;

б) port – TCP порт узла для передачи HTTP запросов;

в) id – идентификационный номер узла.

3.2) данные, которые отправляются на вычислительные сервисы, размещаются в состоянии координирующего сервиса. Для их хранения используются следующие массивы:

[DataMember(IsRequired = true)]

public int[,] A;

[DataMember(IsRequired = true)]

public int[] b;

[DataMember(IsRequired = true)]

public int[] c;

Параметры задачи, отправляемые на каждый вычислительный сервис, хранятся в виде массива объектов:

[DataMember(IsRequired = true)]

public HelpClass[] ClArr;

Класс, используемый для хранения параметров приведён ниже (данный класс нужно добавлять в пространство имён MainService выше описания состояния сервиса):

[DataContract]

public class HelpClass

{

[DataMember]

public int a; // начало диапазона

[DataMember]

public int b; // конец диапазона

}

4) добавьте в файл CompServiceTypes.cs в класс CompServiceState описание структур данных:

4.1) вычислительный сервис должен хранить результаты вычислений, для этого в состояние сервиса нужно включить следующее поле:

// результат вычислений

[DataMember]

public int[] _c;

4.2) вычислительный сервис должен хранить идентификатор, который используется для объединения результатов на координирующем сервисе:

// идентификатор вычислительного сервиса

[DataMember]

public int _id;

5) рассмотрим операторы, которые необходимы для управления сервисами, подписавшимися на уведомления MainService:

5.1) в начало файла MainService.cs поместите следующие операторы:

using compSrv = CompService.Proxy;

using ds = Microsoft.Dss.Services.Directory;

using cs = Microsoft.Dss.Services.Constructor;

using submgr = Microsoft.Dss.Services.SubscriptionManager;

using System.Xml;

5.2) сервис управления подпиской (Subscription Manager Service, SMS) позволяет упростить работу с подпиской на уведомления. При использовании SMS не нужно отслеживать сервисы, которые подписались на уведомления сервиса, и организовывать рассылку всем подписавшимся сервисам уведомляющих сообщений. Опишите в классе MainService порт для менеджера управления подписками:

[SubscriptionManagerPartner]

private submgr.SubscriptionManagerPort _submgrPort = new submgr.SubscriptionManagerPort();

6.1) в начало файла CompService.cs поместите следующие операторы:

using submgr = Microsoft.Dss.Services.SubscriptionManager;

using mainSrv = MainService.Proxy;

using ds = Microsoft.Dss.Services.Directory;

using cs = Microsoft.Dss.Services.Constructor;

using System.Net;

using Microsoft.Dss.Core;

6.2) для использования SMS опишите в классе CompService порт для менеджера управления подписками:

[SubscriptionManagerPartner]

private submgr.SubscriptionManagerPort _submgrPort = new submgr.SubscriptionManagerPort();

7) в классе MainService (файл MainService.cs) определите следующие переменные:

// порт _compSrvPort используется для отправки запросов вычислительному сервису

compSrv.CompServiceOperations[] _compSrvPort;

// порт _compSrvNotify используется для получения сообщений от вычислительного сервиса

compSrv.CompServiceOperations[] _compSrvNotify;

// сообщение, содержащее запрос на подписку

compSrv.Subscribe[] subscribe;

// порт complPort используется для получения сообщений о результатах выполнения

// вычислений в CompService

Port<int> complPort = new Port<int>();

// remoteDir и remoteConstructor используются для обращения к удалённому узлу

ds.DirectoryPort[] remoteDir;

cs.ConstructorPort[] remoteConstructor;

// переменные query, queryRsp, create, createRsp, compService используются для создания

// вычислительного сервиса на удалённом узле

ds.Query[] query;

ds.QueryResponseType[] queryRsp;

cs.Create[] create;

CreateResponse[] createRsp;

string[] compService;

// массивы namex и remote используются для хранения информации об узлах для запуска

// вычислительных сервисов

XmlQualifiedName[] namex;

PartnerType[] remote;

Описанные массивы используются для управления или подключения к вычислительному сервису, количество вычислительных сервисов задаётся в переменной c:

int c = 2;

8) в классе CompService (файл CompService.cs) определите следующие переменные:

mainSrv.MainServiceOperations _mainSrvPort = new mainSrv.MainServiceOperations();

mainSrv.MainServiceOperations _mainSrvNotify = new mainSrv.MainServiceOperations();

mainSrv.Subscribe subscribe = new mainSrv.Subscribe();

9) рассмотрим код, который организует подписку вычислительного сервиса на уведомления координирующего сервиса;

9.1) добавьте в список операций сервиса MainService (файл MainServiceTypes.cs) операцию Subscribe (возможно, что данная операция уже находится в списке). Данная операция используется для обработки запросов на подписку об уведомления, приходящих от других сервисов:

[ServicePort]

public class MainServiceOperations: PortSet<DsspDefaultLookup, DsspDefaultDrop, Get,

Subscribe>

{

}

9.2) опишите в файле MainServiceTypes.cs класс Subscribe (данный класс определяет сообщение Subscribe):

/// <summary>

/// MainService subscribe operation

/// </summary>

///

public class Subscribe: Subscribe<SubscribeRequestType, PortSet<SubscribeResponseType,

Fault>>

{

}

9.3) реализуйте обработчик операции Subscribe в файле MainService.cs:

[ServiceHandler(ServiceHandlerBehavior.Concurrent)]

public IEnumerator<ITask> SubscribeHandler(Subscribe subscribe)

{

SubscribeRequestType request = subscribe.Body;

LogInfo("Коорд. серв.: запрос подписки от: " + request.Subscriber);

// использование Subscription Manager для управления подписками

yield return Arbiter.Choice(

SubscribeHelper(_submgrPort, request, subscribe.ResponsePort),

delegate(SuccessResult success)

{

},

delegate(Exception e)

{

LogError(null, "Коорд. серв.: ошибка при подписке к выч. серв.", e);

}

);

yield break;

}

Метод SubscribeHelper управляет процессом подписки и обрабатывает запросы на подписку, отправляемые координирующему сервису от вычислительного сервиса. При получении запроса на подписку вычислительный сервис подписывается на уведомления координирующего сервиса, после чего координирующий сервис отправляет вычислительному сервису своё состояние, в котором находится вся информация о вычислительной задаче.

10) рассмотрим код, который организует подписку координирующего сервиса на уведомления вычислительного сервиса;

10.1) добавьте в список операций сервиса CompService (файл CompServiceTypes.cs) операцию Subscribe (возможно, что данная операция уже находится в списке):

[ServicePort]

public class CompServiceOperations: PortSet<DsspDefaultLookup, DsspDefaultDrop, Get,

Subscribe>

{

}

10.2) опишите в файле CompServiceTypes.cs класс Subscribe (данный класс определяет сообщение Subscribe):

/// <summary>

/// операция подписки CompService

/// </summary>

///

public class Subscribe: Subscribe<SubscribeRequestType, PortSet<SubscribeResponseType,

Fault>>

{

}

10.3) реализуйте обработчик операции Subscribe в файле CompService.cs:

[ServiceHandler(ServiceHandlerBehavior.Concurrent)]

public IEnumerator<ITask> SubscribeHandler(Subscribe subscribe)

{

SubscribeRequestType request = subscribe.Body;

LogInfo("Выч. серв.: запрос подписки от: " + request.Subscriber);

// использование Subscription Manager для управления подписчиками

yield return Arbiter.Choice(

SubscribeHelper(_submgrPort, request, subscribe.ResponsePort),

delegate(SuccessResult success)

{

LogInfo("Выч. серв.: подписка выполнена");

},

delegate(Exception e)

{

LogError(null, "Выч. серв.: подписка не выполнена", e);

}

);

yield break;

}

11) рассмотрим реализацию операцию Replace в координирующем сервисе:

11.1) добавьте в список операций сервиса MainService (файл MainServiceTypes.cs) операцию Replace. Данная операция используется для обработки Replace запроса, полученного от вычислительного сервиса:

[ServicePort]

public class MainServiceOperations: PortSet<DsspDefaultLookup, DsspDefaultDrop, Get,

Subscribe, Replace>

{

}

11.2) опишите в файле MainServiceTypes.cs класс Replace (данный класс определяет Replace сообщение):

public class Replace: Replace<ServiceTutorial7State, PortSet<DefaultReplaceResponseType,

Fault>>

{

}

11.3) реализуйте обработчик операции Replace в файле MainService.cs:

/// <summary>

/// обработчик Replace сообщения

/// </summary>

[ServiceHandler(ServiceHandlerBehavior.Exclusive)]

public IEnumerator<ITask> ReplaceHandler(Replace replace)

{

_state = replace.Body;

// отправка сообщения Replace всем подписчикам

base.SendNotification(_submgrPort, replace);

replace.ResponsePort.Post(DefaultReplaceResponseType.Instance);

yield break;

}

11) рассмотрим реализацию операцию Replace в вычислительном сервисе:

12.1) добавьте в список операций сервиса CompService (файл CompServiceTypes.cs) операцию Replace:

/// <summary>

/// CompService main operations port

/// </summary>

[ServicePort]

public class CompServiceOperations: PortSet<DsspDefaultLookup, DsspDefaultDrop, Get,

Subscribe, Replace>

{

}

12.2) опишите в файле CompServiceTypes.cs класс Replace (данный класс определяет Replace сообщение)

/// <summary>

/// CompService replace operation

/// </summary>

public class Replace: Replace<CompServiceState, PortSet<DefaultReplaceResponseType, Fault>>

{

}

12.3) реализуйте обработчик операции Replace в файле CompService.cs:

/// <summary>

/// обработчик Replace сообщения

/// </summary>

[ServiceHandler(ServiceHandlerBehavior.Exclusive)]

public IEnumerator<ITask> ReplaceHandler(Replace replace)

{

_state = replace.Body;

// отправка сообщения Replace всем подписчикам

base.SendNotification(_submgrPort, replace);

replace.ResponsePort.Post(DefaultReplaceResponseType.Instance);

yield break;

}

13) добавьте в метод SubscribeHandler координирующего сервиса, в делегат, отвечающий за обработку сообщения об успешной подписке, оператор, который отправляет состояние сервиса при получении запроса на подписку:

yield return Arbiter.Choice(

SubscribeHelper(_submgrPort, request, subscribe.ResponsePort),

delegate(SuccessResult success)

{

// отправка Replace сообщения всем подписчикам

base.SendNotificationToTarget<Replace>(request.Subscriber, _submgrPort, _state);

},

14) рассмотрим действия, выполняемые в методе Start координирующего сервиса:

14.1) инициализируется состояние координирующего сервиса и вызывается метод Start базового класса:

if (_state == null)

{

_state = new MainServiceState();

}

base.Start();

14.2) инициализируются массивы, описанные выше:

_compSrvPort = new compSrv.CompServiceOperations[c];

_compSrvNotify = new compSrv.CompServiceOperations[c];

subscribe = new compSrv.Subscribe[c];

remoteDir = new ds.DirectoryPort[c];

remoteConstructor = new cs.ConstructorPort[c];

query = new ds.Query[c];

queryRsp = new ds.QueryResponseType[c];

create = new cs.Create[c];

createRsp = new CreateResponse[c];

compService = new string[c];

namex = new XmlQualifiedName[c];

remote = new PartnerType[c];

for (int i = 0; i < c; i++)

{

_compSrvPort[i] = new compSrv.ServiceTutorial4Operations();

_compSrvNotify[i] = new compSrv.ServiceTutorial4Operations();

}

14.3) активируются обработчики сообщений от вычислительных сервисов:

for (int i = 0; i < c; i++)

{

// обработчик Replace сообщения, полученного от вычислительного сервиса

Activate<ITask>(Arbiter.Receive<compSrv.Replace>(true, _compSrvNotify[i],

RemoteNotifyReplaceHandler)

);

}

14.4) активируется обработчик, выполняющийся, когда все вычислительные сервисы отправят результаты обработки данных:

Arbiter.Activate(Environment.TaskQueue, Arbiter.MultipleItemReceive(

true, complPort, c, delegate(int[] array)

{

LogInfo("Коорд. серв.: данные от выч. серв. получены");

for (int i = 0; i < 10; i++)

LogInfo(_state.c[i].ToString());

// отправка сообщений на завершение вычислительным сервисам

for (int i = 0; i < c; i++)

{

_compSrvPort[i].DsspDefaultDrop();

_compSrvPort[i] = null;

}

}));

14.5) инициализируется состояние координирующего сервиса:

а) формируется список адресов вычислительных узлов, указывается имя компьютера, TCP порт узла для передачи HTTP запросов и идентификатор узла:

_state.NodeList.Add(new MainServiceState.NodeElement("xc", 40000, 0));

_state.NodeList.Add(new MainServiceState.NodeElement("xc", 45000, 1));

б) формируется вычислительное задание для каждого вычислительного узла:

_state.ClArr = new HelpClass[c];

int a1 = 10; // строки

int b1 = 10; // столбцы

for (int i = 0; i < c; i++)

_state.ClArr[i] = new HelpClass();

_state.ClArr[0].a = 0;

_state.ClArr[0].b = a1 / 2 - 1;

_state.ClArr[1].a = a1 / 2;

_state.ClArr[1].b = a1 - 1;

_state.A = new int[a1, b1];

_state.b = new int[b1];

_state.c = new int[a1];

for (int i = 0; i < a1; i++)

{

for (int j = 0; j < b1; j++)

_state.A[i, j] = i + j;

}

for (int j = 0; j < b1; j++)

_state.b[j] = j;

14.6) запускается процесс создания вычислительных сервисов:

SpawnIterator(OnStartup);

15) вставьте в класс координирующего сервиса определение метода RemoteNotifyReplaceHandler, данный обработчик запускается при получении сообщения Replace от вычислительного сервиса:

// обработчик, запускаемый при получении Replace сообщения от вычислительного сервиса

private void RemoteNotifyReplaceHandler(compSrv.Replace replace)

{

// объединяем результаты, полученные от вычислительного сервиса

// и записываем их в результирующий массив

int id = replace.Body._id;

int _aa1 = _state.ClArr[id].a;

int _bb1 = _state.ClArr[id].b;

LogInfo("Коорд. серв.: получены данные от " + replace.Body._id + " __ " + _aa1 + " to "

+ _bb1);

for (int i = _aa1; i <= _bb1; i++)

{

_state.c[i] = replace.Body._c[i];

LogInfo(replace.Body._c[i].ToString());

}

// отправка в порт завершения сообщения о том, что пришли данные с одного из

// вычислительных сервисов

complPort.Post(1);

}

16) вставьте в класс координирующего сервиса определение метода OnStartup, данный итератор предназначен для запуска вычислительных сервисов на удалённых узлах, также в данном методе выполняется подписка на уведомления вычислительного сервиса:

16.1) описываются массивы для хранения информации об удалённых узлах:

private IEnumerator<ITask> OnStartup()

{

string[] nms = new String[c];

string[] rmt = new String[c];

nms[0] = "http://schemas.tempuri.org/2012/07/mainservice.html:Remote";

nms[1] = "http://schemas.tempuri.org/2012/07/mainservice.html:Remote";

rmt[0] = "http://xc:40000/directory";

rmt[1] = "http://xc:45000/directory";

for (int i = 0; i < c; i++)

{

namex[i] = new XmlQualifiedName(nms[i]);

remote[i] = new PartnerType(namex[i], null, rmt[i]);

}

16.2) запускается цикл, на каждой итерации которого выполняется создание сервиса на удалённом узле, после чего выполняется подписка на уведомления созданного сервиса:

а) создаётся транслятор запросов у удалённому узлу:

for (int i = 0; i < c; i++)

{

remoteDir[i] = DirectoryPort;

if (remote[i]!= null &&!string.IsNullOrEmpty(remote[i].Service))

{

remoteDir[i] = ServiceForwarder<ds.DirectoryPort>(remote[i].Service);

}

б) отправляется Query сообщение сервису Directory Service, запрашивается конструктор сервисов удалённого узла (конструктор сервисов идентифицируется по его соглашению):

remoteConstructor[i] = ConstructorPort;

query[i] = new ds.Query(

new ds.QueryRequestType(

new ServiceInfoType(cs.Contract.Identifier)

)

);

remoteDir[i].Post(query[i]);

yield return (Choice)query[i].ResponsePort;

queryRsp[i] = query[i].ResponsePort;

if (queryRsp[i]!= null)

{

remoteConstructor[i] =

ServiceForwarder<cs.ConstructorPort>(queryRsp[i].RecordList[0].Service);

}

в) конструктору сервисов отправляется сообщение Create, если создание сервиса на удалённом узле прошло успешно, создаётся транслятор (forwarder) запросов к сервису:

compService[i] = null;

create[i] = new cs.Create(new ServiceInfoType(compSrv.Contract.Identifier));

remoteConstructor[i].Post(create[i]);

yield return (Choice)create[i].ResponsePort;

createRsp[i] = create[i].ResponsePort;

if (createRsp[i]!= null)

{

compService[i] = createRsp[i].Service;

}

else

{

LogError((Fault)create[i].ResponsePort);

yield break;

}

_compSrvPort[i] = ServiceForwarder<compSrv.CompServiceOperations>(compService[i]);

yield return _compSrvPort[i].Subscribe(_compSrvNotify[i], out subscribe[i]);

if ((Fault)subscribe[i].ResponsePort!= null)

{

LogError("Коорд. серв.: ошибка при подписке на уведомления выч. серв.",

(Fault)subscribe[i].ResponsePort);

}

else

{

LogInfo("Коорд. серв.: подписка на уведомления выч. серв. выполнена");

}

}

}

17) рассмотрим действия, выполняемые в методе Start вычислительного сервиса:

17.1) инициализируется состояние вычислительного сервиса, вызывается метод Start базового класса:

protected override void Start()

{

if (_state == null)

{

_state = new ServiceTutorial4State();

}

base.Start();

17.2) запускается итератор, в котором выполняется подключение к координирующему сервису:

// в запускаемом итераторе организуется подписка на

// уведомления координирующего сервиса

SpawnIterator(OnStartup);

17.3) активируется обработчик Replace сообщений от координирующего сервиса:

Activate<ITask>(Arbiter.Receive<mainSrv.Replace>(true, _mainSrvNotify,

RemoteNotifyReplaceHandler));

}

18) вставьте в класс вычислительного сервиса определение метода OnStartup, данный итератор предназначен для запуска вычислительных сервисов на удалённых узлах, также в данном методе выполняется подписка на уведомления вычислительного сервиса (в данном методе прописан порт и ip-адрес узла, на котором запущен координирующий сервис):

ServiceInfoType[] list = null;

private IEnumerator<ITask> OnStartup()

{

// подключение к узлу

// ip-адрес

string machine = "192.168.56.1";

// TCP порт для передачи HTTP запросов

ushort nport = 50003;

// создание URI сервиса

UriBuilder builder = new UriBuilder(Schemes.DsspTcp, machine, nport,

ServicePaths.InstanceDirectory);

ds.DirectoryPort fport = ServiceForwarder<ds.DirectoryPort>(builder.Uri);

// создание и отправка Get запроса

ds.Get get = new ds.Get();

fport.Post(get);

// обработка результата запроса

yield return Arbiter.Choice(get.ResponsePort,

delegate(ds.GetResponseType response)

{

list = response.RecordList;

},

delegate(Fault fault)

{

list = new ServiceInfoType[0];

LogError(fault);

}

);

// количество найденных сервисов

int cnt = 0;

// массив для хранения информации о найденных сервисах

ServiceInfoType[] servList = new ServiceInfoType[cnt];

foreach (ServiceInfoType info in list)

{

if (info.Contract == mainSrv.Contract.Identifier)

{

Array.Resize(ref servList, servList.Length + 1);

servList[cnt] = info;

cnt = cnt + 1;

}

}

_mainSrvPort = ServiceForwarder<mainSrv.MainServiceOperations>(servList[0].Service);

subscribe.NotificationPort = _mainSrvNotify;

// отправка запроса на подписку

_mainSrvPort.Post(subscribe);

// обработка результата запроса

yield return Arbiter.Choice(

subscribe.ResponsePort,

delegate(SubscribeResponseType response)

{

LogInfo("Выч. серв.: подписан на " + servList[0].Service);

},

delegate(Fault fault)

{

LogError("Выч. серв. " + fault);

}

);

yield break;

}

19) вставьте в класс вычислительного сервиса определение метода RemoteNotifyReplaceHandler, данный обработчик запускается при получении сообщения Replace от координирующего сервиса:

19.1) на основе имени узла и порта, с которым связан узел, определяется идентификатор вычислительного сервиса:

private void RemoteNotifyReplaceHandler(mainSrv.Replace replace)

{

LogInfo("Выч. серв.: получено Replace сообщение от коорд. сервиса");

Dictionary<string, string> env = Environment.ServiceUriTable;

string addr = " ";

foreach (KeyValuePair<string, string> pair in env)

{

addr = pair.Value;

break;

}

string[] splitAddr = addr.Split('/');

string pcinfo = splitAddr[2];

string[] info = pcinfo.Split(':');

string pc = info[0];

string port = info[1];

int id = -1;

foreach (mainSrv.MainServiceState.NodeElement xx in replace.Body.NodeList)

{

if (((xx.port + 1).ToString() == port) && (xx.pc.ToString() == pc))

{

id = xx.id;

break;

}

}

LogInfo("Выч. серв.: идентификатор сервиса: " + id.ToString());

int aa1 = replace.Body.ClArr[id].a;

int bb1 = replace.Body.ClArr[id].b;

c = new int[replace.Body.c.Length];

19.2) определяется порт завершения:

Port<int> p = new Port<int>();

19.3) вставка и активация временного получателя, обрабатывающего сообщения, приходящие на порт завершения, данный получатель отправляет результат вычисления координирующему сервису:

Receiver r = Arbiter.Receive(false, p, delegate(int n)

{

Replace resReplace = new Replace();

_state._c = c;

_state._id = id;

resReplace.Body = _state;

base.SendNotification(_submgrPort, resReplace);

}

);

Activate(r);

19.4) формируется задание для вычислительного сервиса:

HelpStruct pr = new HelpStruct(replace.Body.ClArr[id].a, replace.Body.ClArr[id].b,

replace.Body.b, replace.Body.A);

19.5) запускается вычислительная задача:

Arbiter.ExecuteToCompletion(Environment.TaskQueue, new IterativeTask<Port<int>,

HelpStruct>(p, pr, CalcHandler));

}

20) определите в классе вычислительного сервиса структуру, используемую для передачи параметров в итератор (в итератор передаются перемножаемые матрица и вектор, а также диапазоны индексов перемножаемых элементов):

public struct HelpStruct

{

public int _a, _b;

public int[] b;

public int[,] A;

public HelpStruct(int p1, int p2, int[] p3, int[,] p5)

{

_a = p1;

_b = p2;

b = p3;

A = p5;

}

}

21) определите в классе вычислительного сервиса метод CalcHandler, который выполняет вычисления на основе задания, полученного от координирующего сервиса:

// переменная, хранящая результат вычислений

int[] c;

IEnumerator<ITask> CalcHandler(Port<int> port, HelpStruct p)

{

for (int i = p._a; i <= p._b; i++)

{

c[i] = 0;

for (int j = 0; j < p.b.Length; j++)

{

c[i] += p.A[i, j] * p.b[j];

}

LogInfo("C ===" + c[i].ToString());

}

port.Post(1);

yield break;

}

22) вставьте в класс вычислительного сервиса определение метода Drop:

[ServiceHandler(ServiceHandlerBehavior.Teardown)]

public virtual IEnumerator<ITask> DropHandler(DsspDefaultDrop drop)

{

Console.WriteLine("Выч. серв. завершил свою работу");

base.DefaultDropHandler(drop);

//ControlPanelPort.Shutdown();

yield break;

}

23) вставьте в класс координирующего сервиса определение метода Drop:

[ServiceHandler(ServiceHandlerBehavior.Teardown)]

public IEnumerator<ITask> DropHandler(DsspDefaultDrop drop)

{

base.DefaultDropHandler(drop);

yield break;

}

24) скомпилируйте оба сервиса.

 



Поделиться:


Последнее изменение этой страницы: 2016-04-26; просмотров: 275; Нарушение авторского права страницы; Мы поможем в написании вашей работы!

infopedia.su Все материалы представленные на сайте исключительно с целью ознакомления читателями и не преследуют коммерческих целей или нарушение авторских прав. Обратная связь - 3.146.255.127 (0.487 с.)