STXXL  1.4-dev
 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Friends Macros Groups Pages
multiway_mergesort.h
Go to the documentation of this file.
1 /***************************************************************************
2  * include/stxxl/bits/parallel/multiway_mergesort.h
3  *
4  * Parallel multiway mergesort.
5  * Extracted from MCSTL - http://algo2.iti.uni-karlsruhe.de/singler/mcstl/
6  *
7  * Part of the STXXL. See http://stxxl.sourceforge.net
8  *
9  * Copyright (C) 2007 Johannes Singler <[email protected]>
10  * Copyright (C) 2014 Timo Bingmann <[email protected]>
11  *
12  * Distributed under the Boost Software License, Version 1.0.
13  * (See accompanying file LICENSE_1_0.txt or copy at
14  * http://www.boost.org/LICENSE_1_0.txt)
15  **************************************************************************/
16 
17 #ifndef STXXL_PARALLEL_MULTIWAY_MERGESORT_HEADER
18 #define STXXL_PARALLEL_MULTIWAY_MERGESORT_HEADER
19 
20 #include <vector>
21 #include <iterator>
22 #include <algorithm>
23 
24 #include <stxxl/bits/config.h>
31 
33 
34 #if STXXL_PARALLEL
35 
36 namespace parallel {
37 
38 //! Subsequence description.
39 template <typename DiffType>
40 struct PMWMSPiece
41 {
42  //! Begin of subsequence.
43  DiffType begin;
44  //! End of subsequence.
45  DiffType end;
46 };
47 
48 /*!
49  * Data accessed by all threads.
50  *
51  * PMWMS = parallel multiway mergesort
52  */
53 template <typename RandomAccessIterator>
54 struct PMWMSSortingData
55 {
56  typedef typename std::iterator_traits<RandomAccessIterator>::value_type
57  ValueType;
58  typedef typename std::iterator_traits<RandomAccessIterator>::difference_type
59  DiffType;
60 
61  //! Input begin.
62  RandomAccessIterator source;
63  //! Start indices, per thread.
64  DiffType* starts;
65 
66  /*!
67  * Temporary arrays for each thread.
68  *
69  * Indirection Allows using the temporary storage in different ways,
70  * without code duplication. \see STXXL_MULTIWAY_MERGESORT_COPY_LAST
71  */
72  ValueType** temporaries;
73 #if STXXL_MULTIWAY_MERGESORT_COPY_LAST
74  /** Storage in which to sort. */
75  RandomAccessIterator* sorting_places;
76  /** Storage into which to merge. */
77  ValueType** merging_places;
78 #else
79  /** Storage in which to sort. */
80  ValueType** sorting_places;
81  /** Storage into which to merge. */
82  RandomAccessIterator* merging_places;
83 #endif
84  /** Samples. */
85  ValueType* samples;
86  /** Offsets to add to the found positions. */
87  DiffType* offsets;
88  /** PMWMSPieces of data to merge \c [thread][sequence] */
89  std::vector<PMWMSPiece<DiffType> >* pieces;
90 };
91 
92 //! Thread local data for PMWMS.
93 template <typename RandomAccessIterator>
94 struct PMWMSSorterPU
95 {
96  /** Total number of thread involved. */
97  thread_index_t num_threads;
98  /** Number of owning thread. */
99  thread_index_t iam;
100  /** Pointer to global data. */
101  PMWMSSortingData<RandomAccessIterator>* sd;
102 };
103 
104 /*!
105  * Select samples from a sequence.
106  * \param d Pointer to thread-local data. Result will be placed in \c d->ds->samples.
107  * \param num_samples Number of samples to select.
108  */
109 template <typename RandomAccessIterator, typename DiffType>
110 inline void determine_samples(PMWMSSorterPU<RandomAccessIterator>* d,
111  DiffType& num_samples)
112 {
113  PMWMSSortingData<RandomAccessIterator>* sd = d->sd;
114 
115  num_samples = SETTINGS::sort_mwms_oversampling * d->num_threads - 1;
116 
117  std::vector<DiffType> es(num_samples + 2);
118  equally_split(sd->starts[d->iam + 1] - sd->starts[d->iam],
119  (thread_index_t)(num_samples + 1), es.begin());
120 
121  for (DiffType i = 0; i < num_samples; i++)
122  sd->samples[d->iam * num_samples + i] = sd->source[sd->starts[d->iam] + es[i + 1]];
123 }
124 
125 /*!
126  * PMWMS code executed by each thread.
127  * \param d Pointer to thread-local data.
128  * \param comp Comparator.
129  */
130 template <bool Stable, typename RandomAccessIterator, typename Comparator>
131 inline void parallel_sort_mwms_pu(PMWMSSorterPU<RandomAccessIterator>* d,
132  Comparator& comp)
133 {
134  typedef typename std::iterator_traits<RandomAccessIterator>::value_type
135  ValueType;
136  typedef typename std::iterator_traits<RandomAccessIterator>::difference_type
137  DiffType;
138 
139  Timing<inactive_tag> t;
140 
141  t.tic();
142 
143  PMWMSSortingData<RandomAccessIterator>* sd = d->sd;
144  thread_index_t iam = d->iam;
145 
146  // length of this thread's chunk, before merging
147  DiffType length_local = sd->starts[iam + 1] - sd->starts[iam];
148 
149 #if STXXL_MULTIWAY_MERGESORT_COPY_LAST
150  typedef RandomAccessIterator SortingPlacesIterator;
151  // sort in input storage
152  sd->sorting_places[iam] = sd->source + sd->starts[iam];
153 #else
154  typedef ValueType* SortingPlacesIterator;
155  // sort in temporary storage, leave space for sentinel
156  sd->sorting_places[iam] = sd->temporaries[iam] = static_cast<ValueType*>(::operator new (sizeof(ValueType) * (length_local + 1)));
157  // copy there
158  std::uninitialized_copy(sd->source + sd->starts[iam], sd->source + sd->starts[iam] + length_local, sd->sorting_places[iam]);
159 #endif
160 
161  // sort locally
162  if (Stable)
163  std::stable_sort(sd->sorting_places[iam], sd->sorting_places[iam] + length_local, comp);
164  else
165  std::sort(sd->sorting_places[iam], sd->sorting_places[iam] + length_local, comp);
166 
167  STXXL_DEBUG_ASSERT(stxxl::is_sorted(sd->sorting_places[iam], sd->sorting_places[iam] + length_local, comp));
168 
169  // invariant: locally sorted subsequence in sd->sorting_places[iam], sd->sorting_places[iam] + length_local
170 
171  t.tic("local sort");
172 
173  if (SETTINGS::sort_splitting == SETTINGS::SAMPLING)
174  {
175  DiffType num_samples;
176  determine_samples(d, num_samples);
177 
178 #pragma omp barrier
179 
180  t.tic("sample/wait");
181 
182 #pragma omp single
183  std::sort(sd->samples, sd->samples + (num_samples * d->num_threads), comp);
184 
185 #pragma omp barrier
186 
187  for (int s = 0; s < d->num_threads; s++)
188  {
189  // for each sequence
190  if (num_samples * iam > 0)
191  sd->pieces[iam][s].begin =
192  std::lower_bound(sd->sorting_places[s],
193  sd->sorting_places[s] + sd->starts[s + 1] - sd->starts[s],
194  sd->samples[num_samples * iam],
195  comp)
196  - sd->sorting_places[s];
197  else
198  // absolute beginning
199  sd->pieces[iam][s].begin = 0;
200 
201  if ((num_samples * (iam + 1)) < (num_samples * d->num_threads))
202  sd->pieces[iam][s].end =
203  std::lower_bound(sd->sorting_places[s],
204  sd->sorting_places[s] + sd->starts[s + 1] - sd->starts[s],
205  sd->samples[num_samples * (iam + 1)],
206  comp)
207  - sd->sorting_places[s];
208  else
209  // absolute end
210  sd->pieces[iam][s].end = sd->starts[s + 1] - sd->starts[s];
211  }
212  }
213  else if (SETTINGS::sort_splitting == SETTINGS::EXACT)
214  {
215 #pragma omp barrier
216 
217  t.tic("wait");
218 
219  std::vector<std::pair<SortingPlacesIterator, SortingPlacesIterator> > seqs(d->num_threads);
220  for (int s = 0; s < d->num_threads; s++)
221  seqs[s] = std::make_pair(sd->sorting_places[s], sd->sorting_places[s] + sd->starts[s + 1] - sd->starts[s]);
222 
223  std::vector<SortingPlacesIterator> offsets(d->num_threads);
224 
225  // if not last thread
226  if (iam < d->num_threads - 1)
227  multiseq_partition(seqs.begin(), seqs.end(), sd->starts[iam + 1], offsets.begin(), comp);
228 
229  for (int seq = 0; seq < d->num_threads; seq++)
230  {
231  // for each sequence
232  if (iam < (d->num_threads - 1))
233  sd->pieces[iam][seq].end = offsets[seq] - seqs[seq].first;
234  else
235  // absolute end of this sequence
236  sd->pieces[iam][seq].end = sd->starts[seq + 1] - sd->starts[seq];
237  }
238 
239 #pragma omp barrier
240 
241  for (int seq = 0; seq < d->num_threads; seq++)
242  {
243  // for each sequence
244  if (iam > 0)
245  sd->pieces[iam][seq].begin = sd->pieces[iam - 1][seq].end;
246  else
247  // absolute beginning
248  sd->pieces[iam][seq].begin = 0;
249  }
250  }
251 
252  t.tic("split");
253 
254  // offset from target begin, length after merging
255  DiffType offset = 0, length_am = 0;
256  for (int s = 0; s < d->num_threads; s++)
257  {
258  length_am += sd->pieces[iam][s].end - sd->pieces[iam][s].begin;
259  offset += sd->pieces[iam][s].begin;
260  }
261 
262 #if STXXL_MULTIWAY_MERGESORT_COPY_LAST
263  // merge to temporary storage, uninitialized creation not possible since
264  // there is no multiway_merge calling the placement new instead of the
265  // assignment operator
266  sd->merging_places[iam] = sd->temporaries[iam] = new ValueType[length_am];
267 #else
268  // merge directly to target
269  sd->merging_places[iam] = sd->source + offset;
270 #endif
271  std::vector<std::pair<SortingPlacesIterator, SortingPlacesIterator> > seqs(d->num_threads);
272 
273  for (int s = 0; s < d->num_threads; s++)
274  {
275  seqs[s] = std::make_pair(sd->sorting_places[s] + sd->pieces[iam][s].begin, sd->sorting_places[s] + sd->pieces[iam][s].end);
276 
277  STXXL_DEBUG_ASSERT(stxxl::is_sorted(seqs[s].first, seqs[s].second, comp));
278  }
279 
280  sequential_multiway_merge<Stable, false>(seqs.begin(), seqs.end(), sd->merging_places[iam], length_am, comp);
281 
282  t.tic("merge");
283 
284  STXXL_DEBUG_ASSERT(stxxl::is_sorted(sd->merging_places[iam], sd->merging_places[iam] + length_am, comp));
285 
286 #pragma omp barrier
287 
288 #if STXXL_MULTIWAY_MERGESORT_COPY_LAST
289  // write back
290  std::copy(sd->merging_places[iam], sd->merging_places[iam] + length_am, sd->source + offset);
291 #endif
292 
293  delete sd->temporaries[iam];
294 
295  t.tic("copy back");
296 
297  t.print();
298 }
299 
300 /*!
301  * PMWMS main call.
302  * \param begin Begin iterator of sequence.
303  * \param end End iterator of sequence.
304  * \param comp Comparator.
305  * \param num_threads Number of threads to use.
306  * \tparam Stable Stable sorting.
307  */
308 template <bool Stable,
309  typename RandomAccessIterator, typename Comparator>
310 inline void
311 parallel_sort_mwms(RandomAccessIterator begin,
312  RandomAccessIterator end,
313  Comparator comp,
314  int num_threads)
315 {
316  STXXL_PARALLEL_PCALL(end - begin)
317 
318  typedef typename std::iterator_traits<RandomAccessIterator>::value_type
319  ValueType;
320  typedef typename std::iterator_traits<RandomAccessIterator>::difference_type
321  DiffType;
322 
323  DiffType n = end - begin;
324 
325  if (n <= 1)
326  return;
327 
328  if (num_threads > n) // at least one element per thread
329  num_threads = static_cast<thread_index_t>(n);
330 
331  PMWMSSortingData<RandomAccessIterator> sd;
332 
333  sd.source = begin;
334  sd.temporaries = new ValueType*[num_threads];
335 #if STXXL_MULTIWAY_MERGESORT_COPY_LAST
336  sd.sorting_places = new RandomAccessIterator[num_threads];
337  sd.merging_places = new ValueType*[num_threads];
338 #else
339  sd.sorting_places = new ValueType*[num_threads];
340  sd.merging_places = new RandomAccessIterator[num_threads];
341 #endif
342  if (SETTINGS::sort_splitting == SETTINGS::SAMPLING)
343  sd.samples = new ValueType[num_threads * (SETTINGS::sort_mwms_oversampling * num_threads - 1)];
344  else
345  sd.samples = NULL;
346  sd.offsets = new DiffType[num_threads - 1];
347  sd.pieces = new std::vector<PMWMSPiece<DiffType> >[num_threads];
348  for (int s = 0; s < num_threads; s++)
349  sd.pieces[s].resize(num_threads);
350  PMWMSSorterPU<RandomAccessIterator>* pus = new PMWMSSorterPU<RandomAccessIterator>[num_threads];
351  DiffType* starts = sd.starts = new DiffType[num_threads + 1];
352 
353  DiffType chunk_length = n / num_threads, split = n % num_threads, start = 0;
354  for (int i = 0; i < num_threads; i++)
355  {
356  starts[i] = start;
357  start += (i < split) ? (chunk_length + 1) : chunk_length;
358  pus[i].num_threads = num_threads;
359  pus[i].iam = i;
360  pus[i].sd = &sd;
361  }
362  starts[num_threads] = start;
363 
364  //now sort in parallel
365 #pragma omp parallel num_threads(num_threads)
366  parallel_sort_mwms_pu<Stable>(&(pus[omp_get_thread_num()]), comp);
367 
368  delete[] starts;
369  delete[] sd.temporaries;
370  delete[] sd.sorting_places;
371  delete[] sd.merging_places;
372 
373  if (SETTINGS::sort_splitting == SETTINGS::SAMPLING)
374  delete[] sd.samples;
375 
376  delete[] sd.offsets;
377  delete[] sd.pieces;
378 
379  delete[] pus;
380 }
381 
382 } // namespace parallel
383 
384 #endif // STXXL_PARALLEL
385 
387 
388 #endif // !STXXL_PARALLEL_MULTIWAY_MERGESORT_HEADER
int thread_index_t
Definition: types.h:37
void multiseq_partition(const RanSeqs &begin_seqs, const RanSeqs &end_seqs, const RankType &rank, RankIterator begin_offsets, Comparator comp=std::less< typename std::iterator_traits< typename std::iterator_traits< RanSeqs >::value_type::first_type >::value_type >())
Splits several sorted sequences at a certain global rank, resulting in a splitting point for each seq...
void sort(ExtIterator first, ExtIterator last, StrictWeakOrdering cmp, unsigned_type M)
Sort records comparison-based, see stxxl::sort -- Sorting Comparison-Based.
Definition: sort.h:676
#define STXXL_DEBUG_ASSERT(condition)
Definition: verbose.h:250
DiffTypeOutputIterator equally_split(DiffType n, thread_index_t p, DiffTypeOutputIterator s)
Split a sequence into parts of almost equal size.
Definition: equally_split.h:41
static std::vector< std::string > split(const std::string &str, const std::string &sep, unsigned int min_fields=0, unsigned int limit_fields=std::numeric_limits< unsigned int >::max())
Split a string by given separator string. Returns a vector of strings with at least min_fields and at...
Definition: utils.h:56
#define STXXL_BEGIN_NAMESPACE
Definition: namespace.h:16
bool is_sorted(ForwardIterator first, ForwardIterator last, StrictWeakOrdering comp)
Definition: is_sorted.h:24
#define STXXL_PARALLEL_PCALL(n)
#define STXXL_END_NAMESPACE
Definition: namespace.h:17