////////////////////////////////////////////////////////////////////////////////
/// \file partitionedmapserializer.h
/// \brief Classes for serializing elements spread out over many processes
///
/// These classes were developed to save state information in LPJ-GUESS. For
/// parallel runs with grid cells spread out over different processes the state
/// information can be saved to files in a way so that they can later be read
/// back when running with a different number of processes.
///
/// PartitionedMapSerializer and PartitionedMapDeserializer handle the more
/// general case of saving an associative array, or map, partitioned over
/// different processes. In the LPJ-GUESS case, the elements of the map
/// are the grid cells, and the keys uniquely identifying elements are
/// coordinates.
///
/// Each process will write its elements to a single file, followed by an index
/// (the keys, sorted). When the elements should be read back each process will
/// read in the index of each state file and read in the elements it wants in
/// the order that minimizes disk seeks.
///
/// \author Joe Siltberg
/// $Date: 2018-02-02 18:01:35 +0100 (ven, 02 fév 2018) $
///
////////////////////////////////////////////////////////////////////////////////
  25. #ifndef JL_PARTITIONED_MAP_SERIALIZER_H
  26. #define JL_PARTITIONED_MAP_SERIALIZER_H
  27. #include <vector>
  28. #include <utility>
  29. #include <string>
  30. #include <algorithm>
  31. #include <map>
  32. #include <stdexcept>
  33. #include <fstream>
  34. #include <memory>
  35. /// Creates a path to a state file given directory and process rank
  36. /** For instance, create_path("output", 17) might return something like
  37. * "output/17.state".
  38. */
  39. std::string create_path(const char* directory, int rank);
  40. /// Thrown if an error is encountered when serializing/deserializing
  41. class PartitionedMapSerializerError : public std::runtime_error {
  42. public:
  43. PartitionedMapSerializerError(const std::string& what) : std::runtime_error(what) {}
  44. };
  45. /// Class for serializing a partitioned map
  46. /** This class writes out a single state file with the elements of the map
  47. * which belong to the current process. Together with the elements an
  48. * index is also written so that maps can quickly be deserialized even
  49. * if the elements -> process mapping is different when deserializing.
  50. *
  51. * \param Element The type of the elements in the map
  52. * \param Key The type of the keys identifying elements
  53. * \param ElementSerializer Functor serializing an element
  54. * \param KeySerializer Functor serializing a key
  55. */
  56. template<typename Element,
  57. typename Key,
  58. typename ElementSerializer,
  59. typename KeySerializer>
  60. class PartitionedMapSerializer {
  61. public:
  62. /// Construct a serializer
  63. /** \param directory Where to place the files
  64. * \param my_rank An integer uniquely identifying this process
  65. * (for instance an MPI rank)
  66. * \param es The functor for serializing elements
  67. * \param ks The functor for serializing keys
  68. */
  69. PartitionedMapSerializer(const char* directory,
  70. int my_rank,
  71. ElementSerializer es,
  72. KeySerializer ks)
  73. : element_serializer(es),
  74. key_serializer(ks),
  75. file(create_path(directory, my_rank).c_str(),
  76. std::ios::binary | std::ios::trunc) {
  77. if (file.fail()) {
  78. throw PartitionedMapSerializerError(std::string("failed to open a file in ") + directory);
  79. }
  80. }
  81. ~PartitionedMapSerializer() {
  82. write_index();
  83. }
  84. /// Serializes a single element
  85. /** \param key Key for the element to serialize
  86. * \param element The element to serialize
  87. */
  88. void serialize_element(const Key& key,
  89. const Element& element) {
  90. add_to_index(key);
  91. element_serializer(file, element);
  92. }
  93. private:
  94. /// Simply adds a key and the current file position to the index, in memory
  95. /** The index is written out to file in the destructor */
  96. void add_to_index(const Key& key) {
  97. index.push_back(std::make_pair(key, file.tellp()));
  98. }
  99. /// Writes out the index to the file
  100. void write_index() {
  101. // sort it so we can search quickly
  102. std::sort(index.begin(), index.end());
  103. // write each key and the position of its element in the file
  104. for (size_t i = 0; i < index.size(); i++) {
  105. key_serializer(file, index[i].first);
  106. // convert to a std::streamsize since that is guaranteed to be defined as a
  107. // basic integral type which we can serialize by simply taking its binary
  108. // representation
  109. std::streamsize file_position = index[i].second;
  110. file.write(reinterpret_cast<const char*>(&file_position),
  111. sizeof(file_position));
  112. }
  113. // write out the number of elements after the index so it possible to
  114. // find the start of the index
  115. size_t number_of_elements = index.size();
  116. file.write(reinterpret_cast<const char*>(&number_of_elements),
  117. sizeof(number_of_elements));
  118. if (file.fail()) {
  119. throw PartitionedMapSerializerError("failed to write out index");
  120. }
  121. }
  122. typedef std::pair<Key, std::streampos> IndexElement;
  123. typedef std::vector<IndexElement> Index;
  124. /// The in-memory index, written to file when we're done
  125. Index index;
  126. ElementSerializer element_serializer;
  127. KeySerializer key_serializer;
  128. /// The file we're serializing to
  129. std::ofstream file;
  130. };
  131. /// Class for deserializing a partitioned map
  132. /** This class will read in the indices of each state file and then read in
  133. * just the elements that belong to this process.
  134. *
  135. * \param Element The type of the elements in the map
  136. * \param Key The type of the keys identifying elements
  137. * \param ElementDeserializer Functor deserializing an element
  138. * \param KeyDeserializer Functor deserializing a key
  139. * \param KeySize The size of a serialized key (in bytes)
  140. */
  141. template<typename Element,
  142. typename Key,
  143. typename ElementDeserializer,
  144. typename KeyDeserializer,
  145. int KeySize>
  146. class PartitionedMapDeserializer {
  147. public:
  148. /// Construct a deserializer
  149. /** \param directory Where to find the state files
  150. * \param max_rank The rank of the process with highest rank
  151. * \param ed Functor for deserializing an element
  152. * \param kd Functor for deserializing a key
  153. */
  154. PartitionedMapDeserializer(const char* directory,
  155. int max_rank,
  156. ElementDeserializer ed,
  157. KeyDeserializer kd)
  158. : element_deserializer(ed), key_deserializer(kd) {
  159. // we'll simply try to open all files between 0 and max_rank (inclusive)
  160. int rank = 0;
  161. while (rank <= max_rank) {
  162. std::string path = create_path(directory, rank);
  163. std::auto_ptr<std::ifstream> stream(
  164. new std::ifstream(path.c_str(), std::ios::binary | std::ios::in));
  165. if (!stream->fail()) {
  166. File* file = new File;
  167. // read index
  168. // seek to the number of elements
  169. stream->seekg(-std::streampos(sizeof(size_t)), std::ios::end);
  170. size_t number_of_elements;
  171. stream->read(reinterpret_cast<char*>(&number_of_elements), sizeof(size_t));
  172. // seek to start of index
  173. std::streamsize index_size =
  174. number_of_elements*(KeySize+sizeof(std::streamsize));
  175. stream->seekg(-std::streampos(sizeof(size_t))-index_size, std::ios::cur);
  176. // read the index
  177. for (size_t i = 0; i < number_of_elements; i++) {
  178. Key key;
  179. std::streamsize position;
  180. key_deserializer(*stream, key);
  181. stream->read(reinterpret_cast<char*>(&position), sizeof(std::streamsize));
  182. file->index.push_back(std::make_pair(key, position));
  183. }
  184. if (stream->fail()) {
  185. throw PartitionedMapSerializerError(std::string("failed to read index for state file: ") + path);
  186. }
  187. file->stream = stream;
  188. files.push_back(file);
  189. }
  190. rank++;
  191. }
  192. if (files.empty()) {
  193. throw PartitionedMapSerializerError("couldn't open any state files");
  194. }
  195. }
  196. ~PartitionedMapDeserializer() {
  197. for (size_t i = 0; i < files.size(); i++) {
  198. delete files[i];
  199. }
  200. }
  201. /// Reads in a single element from disk
  202. /** Use deserialize_elements instead if several elements should be read
  203. * at once as that version will minimize the number of disk seeks.
  204. *
  205. * \param key The key identifying the element to read in
  206. * \param element The element to deserialize to
  207. */
  208. void deserialize_element(const Key& key, Element& element) {
  209. for (size_t i = 0; i < files.size(); i++) {
  210. Index& index = files[i]->index;
  211. std::auto_ptr<std::ifstream>& stream = files[i]->stream;
  212. // find position of element in this file if it has it
  213. typename Index::iterator itr =
  214. std::lower_bound(index.begin(), index.end(), std::make_pair(key,0),
  215. IndexElementComparator());
  216. // if it had it, seek to that position and deserialize
  217. if (itr != index.end() && !(key < (*itr).first)) {
  218. stream->seekg((*itr).second, std::ios::beg);
  219. element_deserializer(*stream, element);
  220. return;
  221. }
  222. }
  223. throw PartitionedMapSerializerError("failed to find element to deserialize");
  224. }
  225. /// Reads in several elements from disk
  226. /** \param keys The keys identifying the elements to read in
  227. * \param elements Vector of pointers to elements that should be deserialized.
  228. */
  229. void deserialize_elements(const std::vector<Key>& keys,
  230. const std::vector<Element*>& elements) {
  231. // This function is a bit tricky since it should read in the elements
  232. // from disk in the order which minimizes the number of seeks, but
  233. // at the same time return the elements in the order of the keys.
  234. // A vector to keep track of which elements have been deserialized,
  235. // all set to false initially
  236. std::vector<bool> found_elements(elements.size());
  237. // Create a mapping from keys to their position in the keys vector.
  238. // This way, we can quickly find out where in the elements vector
  239. // we can find the element to deserialize for that key.
  240. std::map<Key, size_t> keys_map;
  241. for (size_t k = 0; k < keys.size(); k++) {
  242. keys_map[keys[k]] = k;
  243. }
  244. // go through all indexes, finding all file positions to read from
  245. for (size_t i = 0; i < files.size(); i++) {
  246. // for each file, we have a vector of (file pos, index) pairs, where
  247. // file pos is where to read in the file, and index is where the element
  248. // should be placed in the elements vector
  249. std::vector<std::pair<std::streampos, size_t> > positions;
  250. // Simply loop through the index linearly
  251. Index& index = files[i]->index;
  252. for (size_t j = 0; j < index.size(); j++) {
  253. Key& key = index[j].first;
  254. // Is this one of the keys we're interested in?
  255. typename std::map<Key, size_t>::iterator itr = keys_map.find(key);
  256. if (itr != keys_map.end()) {
  257. // Yes, so save the position of that element in the file and
  258. // the position in the elements vector where to store the element
  259. positions.push_back(std::make_pair(index[j].second, (*itr).second));
  260. }
  261. }
  262. // did any of the keys exist in this file?
  263. if (positions.empty()) {
  264. // nope, so continue with next file
  265. continue;
  266. }
  267. // sort the positions for this file, so that we will read the
  268. // elements from this file in the order they are stored in the file
  269. std::sort(positions.begin(), positions.end());
  270. // read all the elements from this file
  271. std::auto_ptr<std::ifstream>& stream = files[i]->stream;
  272. stream->seekg(positions[0].first, std::ios::beg);
  273. for (size_t e = 0; e < positions.size(); e++) {
  274. // only seek if necessary
  275. if (stream->tellg() != positions[e].first) {
  276. stream->seekg(positions[e].first, std::ios::beg);
  277. }
  278. // deserialize this element
  279. element_deserializer(*stream, *elements[positions[e].second]);
  280. if (!stream->fail()) {
  281. found_elements[positions[e].second] = true;
  282. }
  283. else {
  284. throw PartitionedMapSerializerError("failed to deserialize element from state file");
  285. }
  286. }
  287. if (stream->fail()) {
  288. throw PartitionedMapSerializerError("failed to deserialize elements from state file");
  289. }
  290. }
  291. // check if there are any elements not read in
  292. if (std::find(found_elements.begin(), found_elements.end(), false) !=
  293. found_elements.end()) {
  294. throw PartitionedMapSerializerError("failed to find some elements");
  295. }
  296. }
  297. private:
  298. typedef std::pair<Key, std::streampos> IndexElement;
  299. typedef std::vector<IndexElement> Index;
  300. // A functor which compares IndexElements by comparing their keys
  301. struct IndexElementComparator {
  302. bool operator()(const IndexElement& left, const IndexElement& right) {
  303. return left.first < right.first;
  304. }
  305. };
  306. /// Internal representation for each state file
  307. struct File {
  308. /// An opened stream for this state file
  309. std::auto_ptr<std::ifstream> stream;
  310. /// The index, read in from the file
  311. Index index;
  312. };
  313. /// All the state files
  314. std::vector<File* > files;
  315. ElementDeserializer element_deserializer;
  316. KeyDeserializer key_deserializer;
  317. };
  318. #endif // JL_PARTITIONED_MAP_SERIALIZER_H