context_client.cpp

#include "xios_spl.hpp"
#include "context_client.hpp"
#include "context_server.hpp"
#include "event_client.hpp"
#include "buffer_out.hpp"
#include "buffer_client.hpp"
#include "type.hpp"
#include "context.hpp"
#include "mpi.hpp"
#include "timer.hpp"
#include "cxios.hpp"
#include "server.hpp"
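
// CContextClient implements the client side of a context: it keeps one
// CClientBuffer per connected server rank, packs outgoing events into those
// buffers and flushes them over the inter-communicator, falling back to a
// temporary local buffer when the remote buffers are full.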
namespace xios
{
  /*!
  \param [in] parent Pointer to the context on the client side
  \param [in] intraComm_ Communicator of the client group
  \param [in] interComm_ Communicator towards the server group
  \param [in] cxtSer Pointer to the context on the server side (only used in attached mode)
  */
  CContextClient::CContextClient(CContext* parent, MPI_Comm intraComm_, MPI_Comm interComm_, CContext* cxtSer)
    : mapBufferSize_(), parentServer(cxtSer), maxBufferedEvents(4)
  {
    context = parent;
    intraComm = intraComm_;
    interComm = interComm_;
    MPI_Comm_rank(intraComm, &clientRank);
    MPI_Comm_size(intraComm, &clientSize);

    int flag;
    MPI_Comm_test_inter(interComm, &flag);
    if (flag) MPI_Comm_remote_size(interComm, &serverSize);
    else MPI_Comm_size(interComm, &serverSize);

    computeLeader(clientRank, clientSize, serverSize, ranksServerLeader, ranksServerNotLeader);

    timeLine = 0;
  }
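
  /*!
  Distribute the connected servers among the clients and decide, for each server,
  which client acts as its leader. When there are fewer clients than servers, each
  client leads a contiguous block of servers (e.g. 3 clients and 8 servers: client 0
  leads servers 0-2, client 1 leads servers 3-5, client 2 leads servers 6-7). When
  there are at least as many clients as servers, each server gets a block of clients
  and only the first client of the block is its leader (e.g. 8 clients and 3 servers:
  clients 0 and 3 lead servers 0 and 1, client 6 leads server 2, the remaining
  clients are non-leaders).
  \param [in] clientRank rank of this client in the client group
  \param [in] clientSize number of clients
  \param [in] serverSize number of servers
  \param [out] rankRecvLeader ranks of the servers for which this client is the leader
  \param [out] rankRecvNotLeader ranks of the servers connected to this client without it being their leader
  */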
  void CContextClient::computeLeader(int clientRank, int clientSize, int serverSize,
                                     std::list<int>& rankRecvLeader,
                                     std::list<int>& rankRecvNotLeader)
  {
    if ((0 == clientSize) || (0 == serverSize)) return;

    if (clientSize < serverSize)
    {
      int serverByClient = serverSize / clientSize;
      int remain = serverSize % clientSize;
      int rankStart = serverByClient * clientRank;

      if (clientRank < remain)
      {
        serverByClient++;
        rankStart += clientRank;
      }
      else
        rankStart += remain;

      for (int i = 0; i < serverByClient; i++)
        rankRecvLeader.push_back(rankStart + i);

      rankRecvNotLeader.resize(0);
    }
    else
    {
      int clientByServer = clientSize / serverSize;
      int remain = clientSize % serverSize;

      if (clientRank < (clientByServer + 1) * remain)
      {
        if (clientRank % (clientByServer + 1) == 0)
          rankRecvLeader.push_back(clientRank / (clientByServer + 1));
        else
          rankRecvNotLeader.push_back(clientRank / (clientByServer + 1));
      }
      else
      {
        int rank = clientRank - (clientByServer + 1) * remain;
        if (rank % clientByServer == 0)
          rankRecvLeader.push_back(remain + rank / clientByServer);
        else
          rankRecvNotLeader.push_back(remain + rank / clientByServer);
      }
    }
  }
  /*!
  Send an event to the server. In attached mode, the current context is reset to the
  client context once the event has been processed.
  \param [in] event Event sent to the server
  */
  void CContextClient::sendEvent(CEventClient& event)
  {
    list<int> ranks = event.getRanks();

    if (CXios::checkEventSync)
    {
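      // Consistency check across clients: every client contributes its local type id,
      // class id and timeline to a sum over intraComm; if all clients are sending the
      // same event, each sum divided by the number of clients gives back the local value.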
      int typeId, classId, typeId_in, classId_in;
      long long timeLine_out;
      typeId_in = event.getTypeId();
      classId_in = event.getClassId();
//      MPI_Allreduce(&timeLine, &timeLine_out, 1, MPI_UINT64_T, MPI_SUM, intraComm); // MPI_UINT64_T standardized by MPI 3
      MPI_Allreduce(&timeLine, &timeLine_out, 1, MPI_LONG_LONG_INT, MPI_SUM, intraComm);
      MPI_Allreduce(&typeId_in, &typeId, 1, MPI_INT, MPI_SUM, intraComm);
      MPI_Allreduce(&classId_in, &classId, 1, MPI_INT, MPI_SUM, intraComm);
      if (typeId / clientSize != event.getTypeId() || classId / clientSize != event.getClassId() || timeLine_out / clientSize != timeLine)
      {
        ERROR("void CContextClient::sendEvent(CEventClient& event)",
              << "Events are not coherent between clients.");
      }
    }

    if (!event.isEmpty())
    {
      list<int> sizes = event.getSizes();

      // We force the getBuffers call to be non-blocking on classical servers
      list<CBufferOut*> buffList;
      bool couldBuffer = getBuffers(ranks, sizes, buffList, (!CXios::isClient && (CServer::serverLevel == 0)));
//      bool couldBuffer = getBuffers(ranks, sizes, buffList, CXios::isServer);

      if (couldBuffer)
      {
        event.send(timeLine, sizes, buffList);

        checkBuffers(ranks);

        if (isAttachedModeEnabled()) // couldBuffer is always true in attached mode
        {
          waitEvent(ranks);
          CContext::setCurrent(context->getId());
        }
      }
      else
      {
        tmpBufferedEvent.ranks = ranks;
        tmpBufferedEvent.sizes = sizes;

        for (list<int>::const_iterator it = sizes.begin(); it != sizes.end(); it++)
          tmpBufferedEvent.buffers.push_back(new CBufferOut(*it));
        info(100) << "DEBUG : temporary event created : timeline " << timeLine << endl;
        event.send(timeLine, tmpBufferedEvent.sizes, tmpBufferedEvent.buffers);
      }
    }

    timeLine++;
  }
  /*!
   * Send the temporarily buffered event (if any).
   *
   * \return true if the temporarily buffered event could be sent, false otherwise
   */
  bool CContextClient::sendTemporarilyBufferedEvent()
  {
    bool couldSendTmpBufferedEvent = false;

    if (hasTemporarilyBufferedEvent())
    {
      list<CBufferOut*> buffList;
      if (getBuffers(tmpBufferedEvent.ranks, tmpBufferedEvent.sizes, buffList, true)) // Non-blocking call
      {
        list<CBufferOut*>::iterator it, itBuffer;

        for (it = tmpBufferedEvent.buffers.begin(), itBuffer = buffList.begin(); it != tmpBufferedEvent.buffers.end(); it++, itBuffer++)
          (*itBuffer)->put((char*)(*it)->start(), (*it)->count());

        info(100) << "DEBUG : temporary event sent" << endl;

        checkBuffers(tmpBufferedEvent.ranks);

        tmpBufferedEvent.clear();

        couldSendTmpBufferedEvent = true;
      }
    }

    return couldSendTmpBufferedEvent;
  }
  /*!
  If the client is also a server (attached mode), it should process the incoming
  events right away after sending its own.
  \param [in] ranks list of the ranks of the servers connected to this client
  */
  void CContextClient::waitEvent(list<int>& ranks)
  {
    parentServer->server->setPendingEvent();
    while (checkBuffers(ranks))
    {
      parentServer->server->listen();
      parentServer->server->checkPendingRequest();
    }

    while (parentServer->server->hasPendingEvent())
    {
      parentServer->server->eventLoop();
    }
  }
  /*!
   * Get buffers for each connection to the servers. This function blocks until there is enough room in the buffers unless
   * it is explicitly requested to be non-blocking.
   *
   * \param [in] serverList list of the ranks of the connected servers
   * \param [in] sizeList size of the message corresponding to each connection
   * \param [out] retBuffers list of buffers that can be used to store an event
   * \param [in] nonBlocking whether this function should be non-blocking
   * \return whether the already allocated buffers could be used
   */
  bool CContextClient::getBuffers(const list<int>& serverList, const list<int>& sizeList, list<CBufferOut*>& retBuffers,
                                  bool nonBlocking /*= false*/)
  {
    list<int>::const_iterator itServer, itSize;
    list<CClientBuffer*> bufferList;
    map<int,CClientBuffer*>::const_iterator it;
    list<CClientBuffer*>::iterator itBuffer;
    bool areBuffersFree;

    for (itServer = serverList.begin(); itServer != serverList.end(); itServer++)
    {
      it = buffers.find(*itServer);
      if (it == buffers.end())
      {
        newBuffer(*itServer);
        it = buffers.find(*itServer);
      }
      bufferList.push_back(it->second);
    }
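
    // Wait until every requested buffer has enough free space. While waiting, keep
    // listening on the local server instance(s) so that attached and multi-level
    // server configurations can still make progress and do not deadlock.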
    CTimer::get("Blocking time").resume();
    do
    {
      areBuffersFree = true;
      for (itBuffer = bufferList.begin(), itSize = sizeList.begin(); itBuffer != bufferList.end(); itBuffer++, itSize++)
        areBuffersFree &= (*itBuffer)->isBufferFree(*itSize);

      if (!areBuffersFree)
      {
        checkBuffers();
        if (CServer::serverLevel == 0)
          context->server->listen();
        else if (CServer::serverLevel == 1)
        {
          context->server->listen();
          for (int i = 0; i < context->serverPrimServer.size(); ++i)
            context->serverPrimServer[i]->listen();
          CServer::contextEventLoop(false); // avoid dead-lock at finalize...
        }
        else if (CServer::serverLevel == 2)
          context->server->listen();
      }
    } while (!areBuffersFree && !nonBlocking);
    CTimer::get("Blocking time").suspend();

    if (areBuffersFree)
    {
      for (itBuffer = bufferList.begin(), itSize = sizeList.begin(); itBuffer != bufferList.end(); itBuffer++, itSize++)
        retBuffers.push_back((*itBuffer)->getBuffer(*itSize));
    }

    return areBuffersFree;
  }
  /*!
  Create a new buffer for the connection to the server with the given rank.
  \param [in] rank rank of the connected server
  */
  void CContextClient::newBuffer(int rank)
  {
    if (!mapBufferSize_.count(rank))
    {
      error(0) << "WARNING: Unexpected request for buffer to communicate with server " << rank << std::endl;
      mapBufferSize_[rank] = CXios::minBufferSize;
      maxEventSizes[rank] = CXios::minBufferSize;
    }
    CClientBuffer* buffer = buffers[rank] = new CClientBuffer(interComm, rank, mapBufferSize_[rank], maxEventSizes[rank], maxBufferedEvents);
    // Notify the server
    CBufferOut* bufOut = buffer->getBuffer(sizeof(StdSize));
    bufOut->put(mapBufferSize_[rank]); // Stupid C++
    buffer->checkBuffer();
  }
  /*!
  Check the state of the client buffers: a buffer is pending while it still has an
  unfinished send in progress.
  \return state of the buffers: pending (true), ready (false)
  */
  bool CContextClient::checkBuffers(void)
  {
    map<int,CClientBuffer*>::iterator itBuff;
    bool pending = false;
    for (itBuff = buffers.begin(); itBuff != buffers.end(); itBuff++)
      pending |= itBuff->second->checkBuffer();
    return pending;
  }
  //! Release all buffers
  void CContextClient::releaseBuffers()
  {
    map<int,CClientBuffer*>::iterator itBuff;
    for (itBuff = buffers.begin(); itBuff != buffers.end(); itBuff++)
    {
      delete itBuff->second;
    }
    buffers.clear();
  }
  /*!
  Check the state of the buffers corresponding to a set of connections.
  \param [in] ranks list of the ranks of the servers to which the client is connected
  \return state of the buffers: pending (true), ready (false)
  */
  bool CContextClient::checkBuffers(list<int>& ranks)
  {
    list<int>::iterator it;
    bool pending = false;
    for (it = ranks.begin(); it != ranks.end(); it++) pending |= buffers[*it]->checkBuffer();
    return pending;
  }
  /*!
   * Set the buffer size for each connection. Warning: This function is collective.
   *
   * \param [in] mapSize maps the rank of each connected server to the size of the corresponding buffer
   * \param [in] maxEventSize maps the rank of each connected server to the size of the biggest event
   */
  void CContextClient::setBufferSize(const std::map<int,StdSize>& mapSize, const std::map<int,StdSize>& maxEventSize)
  {
    mapBufferSize_ = mapSize;
    maxEventSizes = maxEventSize;

    // Compute the maximum number of events that can be safely buffered.
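    // The limiting factor is the connection with the smallest buffer-size-to-event-size
    // ratio; the minimum is taken over all clients so that everyone agrees. For example,
    // a 4 MB buffer with 1 MB events gives a ratio of 4 and therefore allows
    // 2 * 4 + 4 + 1 = 13 buffered events.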
    double minBufferSizeEventSizeRatio = std::numeric_limits<double>::max();
    for (std::map<int,StdSize>::const_iterator it = mapSize.begin(), ite = mapSize.end(); it != ite; ++it)
    {
      double ratio = double(it->second) / maxEventSizes[it->first];
      if (ratio < minBufferSizeEventSizeRatio) minBufferSizeEventSizeRatio = ratio;
    }
    MPI_Allreduce(MPI_IN_PLACE, &minBufferSizeEventSizeRatio, 1, MPI_DOUBLE, MPI_MIN, intraComm);

    if (minBufferSizeEventSizeRatio < 1.0)
    {
      ERROR("void CContextClient::setBufferSize(const std::map<int,StdSize>& mapSize, const std::map<int,StdSize>& maxEventSize)",
            << "The buffer sizes and the maximum events sizes are incoherent.");
    }
    else if (minBufferSizeEventSizeRatio == std::numeric_limits<double>::max())
      minBufferSizeEventSizeRatio = 1.0; // In this case, maxBufferedEvents will never be used but we want to avoid any floating point exception

    maxBufferedEvents = size_t(2 * minBufferSizeEventSizeRatio) // there is room for two local buffers on the server
                          + size_t(minBufferSizeEventSizeRatio) // one local buffer can always be fully used
                          + 1;                                  // the other local buffer might contain only one event
  }
  /*!
  Get the ranks of the servers to which this client is connected without being their leader.
  \return ranks of these servers
  */
  const std::list<int>& CContextClient::getRanksServerNotLeader(void) const
  {
    return ranksServerNotLeader;
  }
  /*!
  Check whether this client is connected to at least one server for which it is not the leader.
  \return true if so, false otherwise
  */
  bool CContextClient::isServerNotLeader(void) const
  {
    return !ranksServerNotLeader.empty();
  }
  /*!
  Get the ranks of the connected servers for which this client is the leader.
  \return ranks of the led servers
  */
  const std::list<int>& CContextClient::getRanksServerLeader(void) const
  {
    return ranksServerLeader;
  }
  /*!
  Check whether this client is the leader of at least one connected server.
  \return true if so, false otherwise
  */
  bool CContextClient::isServerLeader(void) const
  {
    return !ranksServerLeader.empty();
  }
  /*!
   * Check if the attached mode is used.
   *
   * \return true if and only if attached mode is used
   */
  bool CContextClient::isAttachedModeEnabled() const
  {
    return (parentServer != 0);
  }
  /*!
   * Finalize the context client and print some reports. This function is non-blocking.
   */
  void CContextClient::finalize(void)
  {
    map<int,CClientBuffer*>::iterator itBuff;
    bool stop = false;

    CTimer::get("Blocking time").resume();
    while (hasTemporarilyBufferedEvent())
    {
      checkBuffers();
      sendTemporarilyBufferedEvent();
    }
    CTimer::get("Blocking time").suspend();
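
    // All temporarily buffered events have been flushed; now send the finalize event.
    // Only the leader clients push a message to their servers, the other clients call
    // sendEvent() with an empty event (which simply advances the timeline).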
    CEventClient event(CContext::GetType(), CContext::EVENT_ID_CONTEXT_FINALIZE);
    if (isServerLeader())
    {
      CMessage msg;
      const std::list<int>& ranks = getRanksServerLeader();
      for (std::list<int>::const_iterator itRank = ranks.begin(), itRankEnd = ranks.end(); itRank != itRankEnd; ++itRank)
      {
        info(100) << "DEBUG : Send context finalize event to rank " << *itRank << endl;
        event.push(*itRank, 1, msg);
      }
      sendEvent(event);
    }
    else sendEvent(event);

    CTimer::get("Blocking time").resume();
//    while (!stop)
    {
      checkBuffers();
      if (hasTemporarilyBufferedEvent())
        sendTemporarilyBufferedEvent();

      stop = true;
//      for (itBuff = buffers.begin(); itBuff != buffers.end(); itBuff++) stop &= !itBuff->second->hasPendingRequest();
    }
    CTimer::get("Blocking time").suspend();

    std::map<int,StdSize>::const_iterator itbMap = mapBufferSize_.begin(),
                                          iteMap = mapBufferSize_.end(), itMap;

    StdSize totalBuf = 0;
    for (itMap = itbMap; itMap != iteMap; ++itMap)
    {
      report(10) << " Memory report : Context <" << context->getId() << "> : client side : memory used for buffer of each connection to server" << endl
                 << "  +) To server with rank " << itMap->first << " : " << itMap->second << " bytes " << endl;
      totalBuf += itMap->second;
    }
    report(0) << " Memory report : Context <" << context->getId() << "> : client side : total memory used for buffer " << totalBuf << " bytes" << endl;

    //releaseBuffers(); // moved to CContext::finalize()
  }
  /*!
  Check whether any of the client buffers still has a pending request.
  \return true if at least one request is pending, false otherwise
  */
  bool CContextClient::havePendingRequests(void)
  {
    bool pending = false;
    map<int,CClientBuffer*>::iterator itBuff;
    for (itBuff = buffers.begin(); itBuff != buffers.end(); itBuff++)
      pending |= itBuff->second->hasPendingRequest();
    return pending;
  }
}