Stxxl  1.3.2
pq_ext_merger.h
1 /***************************************************************************
2  * include/stxxl/bits/containers/pq_ext_merger.h
3  *
4  * Part of the STXXL. See http://stxxl.sourceforge.net
5  *
6  * Copyright (C) 1999 Peter Sanders <[email protected]>
7  * Copyright (C) 2003, 2004, 2007 Roman Dementiev <[email protected]>
8  * Copyright (C) 2007-2009 Johannes Singler <[email protected]>
9  * Copyright (C) 2007-2010 Andreas Beckmann <[email protected]>
10  *
11  * Distributed under the Boost Software License, Version 1.0.
12  * (See accompanying file LICENSE_1_0.txt or copy at
13  * http://www.boost.org/LICENSE_1_0.txt)
14  **************************************************************************/
15 
16 #ifndef STXXL_PQ_EXT_MERGER_HEADER
17 #define STXXL_PQ_EXT_MERGER_HEADER
18 
19 #include <stxxl/bits/mng/mng.h>
20 
21 __STXXL_BEGIN_NAMESPACE
22 
26 
29 namespace priority_queue_local
30 {
31  template <typename Iterator>
32  class short_sequence : public std::pair<Iterator, Iterator>
33  {
34  typedef std::pair<Iterator, Iterator> pair;
35 
36  public:
37  typedef Iterator iterator;
38  typedef const iterator const_iterator;
39  typedef typename std::iterator_traits<iterator>::value_type value_type;
40  typedef typename std::iterator_traits<iterator>::difference_type size_type;
41  typedef value_type & reference;
42  typedef const value_type & const_reference;
43  typedef unsigned_type origin_type;
44 
45  private:
46  origin_type m_origin;
47 
48  public:
49  short_sequence(Iterator first, Iterator last, origin_type origin) :
50  pair(first, last), m_origin(origin)
51  { }
52 
53  iterator begin()
54  {
55  return this->first;
56  }
57 
58  const_iterator begin() const
59  {
60  return this->first;
61  }
62 
63  const_iterator cbegin() const
64  {
65  return begin();
66  }
67 
68  iterator end()
69  {
70  return this->second;
71  }
72 
73  const_iterator end() const
74  {
75  return this->second;
76  }
77 
78  const_iterator cend() const
79  {
80  return end();
81  }
82 
83  reference front()
84  {
85  return *begin();
86  }
87 
88  const_reference front() const
89  {
90  return *begin();
91  }
92 
93  reference back()
94  {
95  return *(end() - 1);
96  }
97 
98  const_reference back() const
99  {
100  return *(end() - 1);
101  }
102 
103  size_type size() const
104  {
105  return end() - begin();
106  }
107 
108  bool empty() const
109  {
110  return size() == 0;
111  }
112 
113  origin_type origin() const
114  {
115  return m_origin;
116  }
117  };
118 
123  template <class BlockType_,
124  class Cmp_,
125  unsigned Arity_,
126  class AllocStr_ = STXXL_DEFAULT_ALLOC_STRATEGY>
127  class ext_merger : private noncopyable
128  {
129  public:
130  typedef stxxl::uint64 size_type;
131  typedef BlockType_ block_type;
132  typedef typename block_type::bid_type bid_type;
133  typedef typename block_type::value_type value_type;
134  typedef Cmp_ comparator_type;
135  typedef AllocStr_ alloc_strategy;
137 
138  // arity_bound / 2 < arity <= arity_bound
139  enum { arity = Arity_, arity_bound = 1UL << (LOG2<Arity_>::ceil) };
140 
141  protected:
142  comparator_type cmp;
143 
144  bool is_sentinel(const value_type & a) const
145  {
146  return !(cmp(cmp.min_value(), a)); // a <= cmp.min_value()
147  }
148 
149  bool not_sentinel(const value_type & a) const
150  {
151  return cmp(cmp.min_value(), a); // a > cmp.min_value()
152  }
153 
154  struct sequence_state : private noncopyable
155  {
156  block_type * block; //current block
157  unsigned_type current; //current index in current block
158  std::list<bid_type> * bids; //list of blocks forming this sequence
159  comparator_type cmp;
160  ext_merger * merger;
161  bool allocated;
162 
164  const value_type & operator * () const
165  {
166  return (*block)[current];
167  }
168 
169  sequence_state() : bids(NULL), allocated(false)
170  { }
171 
172  ~sequence_state()
173  {
174  STXXL_VERBOSE2("ext_merger sequence_state::~sequence_state()");
175  if (bids != NULL)
176  {
177  block_manager * bm = block_manager::get_instance();
178  bm->delete_blocks(bids->begin(), bids->end());
179  delete bids;
180  }
181  }
182 
183  void make_inf()
184  {
185  current = 0;
186  (*block)[0] = cmp.min_value();
187  }
188 
189  bool is_sentinel(const value_type & a) const
190  {
191  return !(cmp(cmp.min_value(), a));
192  }
193 
194  bool not_sentinel(const value_type & a) const
195  {
196  return cmp(cmp.min_value(), a);
197  }
198 
199  void swap(sequence_state & obj)
200  {
201  if (&obj != this)
202  {
203  std::swap(current, obj.current);
204  std::swap(block, obj.block);
205  std::swap(bids, obj.bids);
206  assert(merger == obj.merger);
207  std::swap(allocated, obj.allocated);
208  }
209  }
210 
211  sequence_state & operator ++ ()
212  {
213  assert(not_sentinel((*block)[current]));
214  assert(current < block->size);
215 
216  ++current;
217 
218  if (current == block->size)
219  {
220  STXXL_VERBOSE2("ext_merger sequence_state operator++ crossing block border ");
221  // go to the next block
222  assert(bids != NULL);
223  if (bids->empty()) // if there is no next block
224  {
225  STXXL_VERBOSE2("ext_merger sequence_state operator++ it was the last block in the sequence ");
226  delete bids;
227  bids = NULL;
228  make_inf();
229  }
230  else
231  {
232  STXXL_VERBOSE2("ext_merger sequence_state operator++ there is another block ");
233  bid_type bid = bids->front();
234  bids->pop_front();
235  merger->pool->hint(bid);
236  if (!(bids->empty()))
237  {
238  STXXL_VERBOSE2("ext_merger sequence_state operator++ more blocks exist in a sequence, hinting the next");
239  merger->pool->hint(bids->front());
240  }
241  merger->pool->read(block, bid)->wait();
242  STXXL_VERBOSE2("first element of read block " << bid << " " << *(block->begin()) << " cached in " << block);
243  if (!(bids->empty()))
244  merger->pool->hint(bids->front()); // re-hint, reading might have made a block free
245  block_manager::get_instance()->delete_block(bid);
246  current = 0;
247  }
248  }
249  return *this;
250  }
251  };
252 
253 
254 #if STXXL_PQ_EXTERNAL_LOSER_TREE
255  struct Entry
256  {
257  value_type key; // key of loser element (winner for 0)
258  unsigned_type index; // the number of the losing segment
259  };
260 #endif //STXXL_PQ_EXTERNAL_LOSER_TREE
261 
262  size_type size_; // total number of elements stored
263  unsigned_type log_k; // log of current tree size
264  unsigned_type k; // invariant (k == 1 << log_k), always a power of 2
265  // only entries 0 .. arity-1 may hold actual sequences, the other
266  // entries arity .. arity_bound-1 are sentinels to make the size of the tree
267  // a power of 2 always
268 
269  // stack of empty segment indices
271 
272 #if STXXL_PQ_EXTERNAL_LOSER_TREE
273  // upper levels of loser trees
274  // entry[0] contains the winner info
275  Entry entry[arity_bound];
276 #endif //STXXL_PQ_EXTERNAL_LOSER_TREE
277 
278  // leaf information
279  // note that Knuth uses indices k..2k-1
280  // while we use 0..k-1
281  sequence_state states[arity_bound]; // sequence including current position, dereference gives current element
282 
283  pool_type * pool;
284 
285  block_type * sentinel_block;
286 
287  public:
288  ext_merger() :
289  size_(0), log_k(0), k(1), pool(0)
290  {
291  init();
292  }
293 
294  ext_merger(pool_type * pool_) :
295  size_(0), log_k(0), k(1),
296  pool(pool_)
297  {
298  init();
299  }
300 
301  virtual ~ext_merger()
302  {
303  STXXL_VERBOSE1("ext_merger::~ext_merger()");
304  for (unsigned_type i = 0; i < arity; ++i)
305  {
306  delete states[i].block;
307  }
308  delete sentinel_block;
309  }
310 
311  void set_pool(pool_type * pool_)
312  {
313  pool = pool_;
314  }
315 
316  private:
317  void init()
318  {
319  STXXL_VERBOSE2("ext_merger::init()");
320  assert(!cmp(cmp.min_value(), cmp.min_value())); // verify strict weak ordering
321 
322  sentinel_block = NULL;
323  if (arity < arity_bound)
324  {
325  sentinel_block = new block_type;
326  for (unsigned_type i = 0; i < block_type::size; ++i)
327  (*sentinel_block)[i] = cmp.min_value();
328  if (arity + 1 == arity_bound) {
329  // same memory consumption, but smaller merge width, better use arity = arity_bound
330  STXXL_ERRMSG("inefficient PQ parameters for ext_merger: arity + 1 == arity_bound");
331  }
332  }
333 
334  for (unsigned_type i = 0; i < arity_bound; ++i)
335  {
336  states[i].merger = this;
337  if (i < arity)
338  states[i].block = new block_type;
339  else
340  states[i].block = sentinel_block;
341 
342  states[i].make_inf();
343  }
344 
345  assert(k == 1);
346  free_segments.push(0); //total state: one free sequence
347 
348  rebuild_loser_tree();
349 #if STXXL_PQ_EXTERNAL_LOSER_TREE
350  assert(is_sentinel(*states[entry[0].index]));
351 #endif //STXXL_PQ_EXTERNAL_LOSER_TREE
352  }
353 
// Recomputes the complete loser tree from the current elements of all
// sequences and stores the overall winner in entry[0].
// Does nothing unless STXXL_PQ_EXTERNAL_LOSER_TREE is enabled.
void rebuild_loser_tree()
{
#if STXXL_PQ_EXTERNAL_LOSER_TREE
    const unsigned_type top = init_winner(1);
    entry[0].key = *(states[top]);
    entry[0].index = top;
#endif //STXXL_PQ_EXTERNAL_LOSER_TREE
}
363 
364 
365 #if STXXL_PQ_EXTERNAL_LOSER_TREE
366  // given any values in the leaves this
367  // routing recomputes upper levels of the tree
368  // from scratch in linear time
369  // initialize entry[root].index and the subtree rooted there
370  // return winner index
371  unsigned_type init_winner(unsigned_type root)
372  {
373  if (root >= k)
374  { // leaf reached
375  return root - k;
376  }
377  else
378  {
379  unsigned_type left = init_winner(2 * root);
380  unsigned_type right = init_winner(2 * root + 1);
381  value_type lk = *(states[left]);
382  value_type rk = *(states[right]);
383  if (!(cmp(lk, rk)))
384  { // right subtree looses
385  entry[root].index = right;
386  entry[root].key = rk;
387  return left;
388  }
389  else
390  {
391  entry[root].index = left;
392  entry[root].key = lk;
393  return right;
394  }
395  }
396  }
397 
398  // first go up the tree all the way to the root
399  // hand down old winner for the respective subtree
400  // based on new value, and old winner and loser
401  // update each node on the path to the root top down.
402  // This is implemented recursively
403  void update_on_insert(
404  unsigned_type node,
405  const value_type & new_key,
406  unsigned_type new_index,
407  value_type * winner_key,
408  unsigned_type * winner_index, // old winner
409  unsigned_type * mask) // 1 << (ceil(log KNK) - dist-from-root)
410  {
411  if (node == 0)
412  { // winner part of root
413  *mask = 1 << (log_k - 1);
414  *winner_key = entry[0].key;
415  *winner_index = entry[0].index;
416  if (cmp(entry[node].key, new_key))
417  {
418  entry[node].key = new_key;
419  entry[node].index = new_index;
420  }
421  }
422  else
423  {
424  update_on_insert(node >> 1, new_key, new_index, winner_key, winner_index, mask);
425  value_type loserKey = entry[node].key;
426  unsigned_type loserIndex = entry[node].index;
427  if ((*winner_index & *mask) != (new_index & *mask))
428  { // different subtrees
429  if (cmp(loserKey, new_key))
430  { // new_key will have influence here
431  if (cmp(*winner_key, new_key))
432  { // old winner loses here
433  entry[node].key = *winner_key;
434  entry[node].index = *winner_index;
435  }
436  else
437  { // new entry looses here
438  entry[node].key = new_key;
439  entry[node].index = new_index;
440  }
441  }
442  *winner_key = loserKey;
443  *winner_index = loserIndex;
444  }
445  // note that nothing needs to be done if
446  // the winner came from the same subtree
447  // a) new_key <= winner_key => even more reason for the other tree to lose
448  // b) new_key > winner_key => the old winner will beat the new
449  // entry further down the tree
450  // also the same old winner is handed down the tree
451 
452  *mask >>= 1; // next level
453  }
454  }
455 #endif //STXXL_PQ_EXTERNAL_LOSER_TREE
456 
457  // make the tree two times as wide
458  void double_k()
459  {
460  STXXL_VERBOSE1("ext_merger::double_k (before) k=" << k << " log_k=" << log_k << " arity_bound=" << arity_bound << " arity=" << arity << " #free=" << free_segments.size());
461  assert(k > 0);
462  assert(k < arity);
463  assert(free_segments.empty()); // stack was free (probably not needed)
464 
465  // make all new entries free
466  // and push them on the free stack
467  for (unsigned_type i = 2 * k - 1; i >= k; i--) //backwards
468  {
469  states[i].make_inf();
470  if (i < arity)
471  free_segments.push(i);
472  }
473 
474  // double the size
475  k *= 2;
476  log_k++;
477 
478  STXXL_VERBOSE1("ext_merger::double_k (after) k=" << k << " log_k=" << log_k << " arity_bound=" << arity_bound << " arity=" << arity << " #free=" << free_segments.size());
479  assert(!free_segments.empty());
480 
481  // recompute loser tree information
482  rebuild_loser_tree();
483  }
484 
485 
486  // compact nonempty segments in the left half of the tree
487  void compact_tree()
488  {
489  STXXL_VERBOSE1("ext_merger::compact_tree (before) k=" << k << " log_k=" << log_k << " #free=" << free_segments.size());
490  assert(log_k > 0);
491 
492  // compact all nonempty segments to the left
493 
494  unsigned_type target = 0;
495  for (unsigned_type from = 0; from < k; from++)
496  {
497  if (!is_segment_empty(from))
498  {
499  assert(is_segment_allocated(from));
500  if (from != target)
501  {
502  assert(!is_segment_allocated(target));
503  states[target].swap(states[from]);
504  }
505  ++target;
506  }
507  }
508 
509  // half degree as often as possible
510  while (k > 1 && target <= (k / 2))
511  {
512  k /= 2;
513  log_k--;
514  }
515 
516  // overwrite garbage and compact the stack of free segment indices
517  free_segments.clear(); // none free
518  for ( ; target < k; target++)
519  {
520  assert(!is_segment_allocated(target));
521  states[target].make_inf();
522  if (target < arity)
523  free_segments.push(target);
524  }
525 
526  STXXL_VERBOSE1("ext_merger::compact_tree (after) k=" << k << " log_k=" << log_k << " #free=" << free_segments.size());
527  assert(k > 0);
528 
529  // recompute loser tree information
530  rebuild_loser_tree();
531  }
532 
533 
534 #if 0
535  void swap(ext_merger & obj)
536  {
537  std::swap(cmp, obj.cmp);
538  std::swap(free_segments, obj.free_segments);
539  std::swap(size_, obj.size_);
540  std::swap(log_k, obj.log_k);
541  std::swap(k, obj.k);
542  swap_1D_arrays(entry, obj.entry, arity_bound);
543  swap_1D_arrays(states, obj.states, arity_bound);
544 
545  // std::swap(pool,obj.pool);
546  }
547 #endif
548 
549  public:
550  unsigned_type mem_cons() const // only rough estimation
551  {
552  return (STXXL_MIN<unsigned_type>(arity + 1, arity_bound) * block_type::raw_size);
553  }
554 
555  // delete the (length = end-begin) smallest elements and write them to [begin..end)
556  // empty segments are deallocated
557  // requires:
558  // - there are at least length elements
559  // - segments are ended by sentinels
560  template <class OutputIterator>
561  void multi_merge(OutputIterator begin, OutputIterator end)
562  {
563  size_type length = end - begin;
564 
565  STXXL_VERBOSE1("ext_merger::multi_merge from " << k << " sequence(s), length = " << length);
566 
567  if (length == 0)
568  return;
569 
570  assert(k > 0);
571  assert(length <= size_);
572 
573  //This is the place to make statistics about external multi_merge calls.
574 
575 #if STXXL_PARALLEL && STXXL_PARALLEL_PQ_MULTIWAY_MERGE_EXTERNAL
576  typedef stxxl::int64 diff_type;
577  typedef std::pair<typename block_type::iterator, typename block_type::iterator> sequence;
578 
579  std::vector<sequence> seqs;
580  std::vector<unsigned_type> orig_seq_index;
581 
582  Cmp_ cmp;
584 
585  for (unsigned_type i = 0; i < k; ++i) //initialize sequences
586  {
587  if (states[i].current == states[i].block->size || is_sentinel(*states[i]))
588  continue;
589 
590  seqs.push_back(std::make_pair(states[i].block->begin() + states[i].current, states[i].block->end()));
591  orig_seq_index.push_back(i);
592 
593 #if STXXL_CHECK_ORDER_IN_SORTS
594  if (!is_sentinel(*seqs.back().first) && !stxxl::is_sorted(seqs.back().first, seqs.back().second, inv_cmp))
595  {
596  STXXL_VERBOSE1("length " << i << " " << (seqs.back().second - seqs.back().first));
597  for (value_type * v = seqs.back().first + 1; v < seqs.back().second; ++v)
598  {
599  if (inv_cmp(*v, *(v - 1)))
600  {
601  STXXL_VERBOSE1("Error at position " << i << "/" << (v - seqs.back().first - 1) << "/" << (v - seqs.back().first) << " " << *(v - 1) << " " << *v);
602  }
603  if (is_sentinel(*v))
604  {
605  STXXL_VERBOSE1("Wrong sentinel at position " << (v - seqs.back().first));
606  }
607  }
608  assert(false);
609  }
610 #endif
611 
612  //Hint first non-internal (actually second) block of this sequence.
613  if (states[i].bids != NULL && !states[i].bids->empty())
614  pool->hint(states[i].bids->front());
615  }
616 
617  assert(seqs.size() > 0);
618 
619 #if STXXL_CHECK_ORDER_IN_SORTS
620  value_type last_elem;
621 #endif
622 
623  diff_type rest = length; //elements still to merge for this output block
624 
625  while (rest > 0)
626  {
627  value_type min_last = cmp.min_value(); // minimum of the sequences' last elements
628  diff_type total_size = 0;
629 
630  for (unsigned_type i = 0; i < seqs.size(); ++i)
631  {
632  diff_type seq_i_size = seqs[i].second - seqs[i].first;
633  if (seq_i_size > 0)
634  {
635  total_size += seq_i_size;
636  if (inv_cmp(*(seqs[i].second - 1), min_last))
637  min_last = *(seqs[i].second - 1);
638 
639  STXXL_VERBOSE1("front block of seq " << i << ": front=" << *(seqs[i].first) << " back=" << *(seqs[i].second - 1) << " len=" << seq_i_size);
640  } else {
641  STXXL_VERBOSE1("front block of seq " << i << ": empty");
642  }
643  }
644 
645  assert(total_size > 0);
646  assert(!is_sentinel(min_last));
647 
648  STXXL_VERBOSE1("min_last " << min_last << " total size " << total_size << " num_seq " << seqs.size());
649 
650  diff_type less_equal_than_min_last = 0;
651  //locate this element in all sequences
652  for (unsigned_type i = 0; i < seqs.size(); ++i)
653  {
654  //assert(seqs[i].first < seqs[i].second);
655 
656  typename block_type::iterator position =
657  std::upper_bound(seqs[i].first, seqs[i].second, min_last, inv_cmp);
658 
659  //no element larger than min_last is merged
660 
661  STXXL_VERBOSE1("seq " << i << ": " << (position - seqs[i].first) << " greater equal than " << min_last);
662 
663  less_equal_than_min_last += (position - seqs[i].first);
664  }
665 
666  diff_type output_size = STXXL_MIN(less_equal_than_min_last, rest); // at most rest elements
667 
668  STXXL_VERBOSE1("output_size=" << output_size << " = min(leq_t_ml=" << less_equal_than_min_last << ", rest=" << rest << ")");
669 
670  assert(output_size > 0);
671 
672  //main call
673 
674  begin = parallel::multiway_merge(seqs.begin(), seqs.end(), begin, inv_cmp, output_size); //sequence iterators are progressed appropriately
675 
676  rest -= output_size;
677  size_ -= output_size;
678 
679  for (unsigned_type i = 0; i < seqs.size(); ++i)
680  {
681  sequence_state & state = states[orig_seq_index[i]];
682 
683  state.current = seqs[i].first - state.block->begin();
684 
685  assert(seqs[i].first <= seqs[i].second);
686 
687  if (seqs[i].first == seqs[i].second) //has run empty
688  {
689  assert(state.current == state.block->size);
690  if (state.bids == NULL || state.bids->empty()) // if there is no next block
691  {
692  STXXL_VERBOSE1("seq " << i << ": ext_merger::multi_merge(...) it was the last block in the sequence ");
693  state.make_inf();
694  }
695  else
696  {
697 #if STXXL_CHECK_ORDER_IN_SORTS
698  last_elem = *(seqs[i].second - 1);
699 #endif
700  STXXL_VERBOSE1("seq " << i << ": ext_merger::multi_merge(...) there is another block ");
701  bid_type bid = state.bids->front();
702  state.bids->pop_front();
703  pool->hint(bid);
704  if (!(state.bids->empty()))
705  {
706  STXXL_VERBOSE2("seq " << i << ": ext_merger::multi_merge(...) more blocks exist, hinting the next");
707  pool->hint(state.bids->front());
708  }
709  pool->read(state.block, bid)->wait();
710  STXXL_VERBOSE1("seq " << i << ": first element of read block " << bid << " " << *(state.block->begin()) << " cached in " << state.block);
711  if (!(state.bids->empty()))
712  pool->hint(state.bids->front()); // re-hint, reading might have made a block free
713  state.current = 0;
714  seqs[i] = std::make_pair(state.block->begin() + state.current, state.block->end());
715  block_manager::get_instance()->delete_block(bid);
716 
717  #if STXXL_CHECK_ORDER_IN_SORTS
718  STXXL_VERBOSE1("before " << last_elem << " after " << *seqs[i].first << " newly loaded block " << bid);
719  if (!stxxl::is_sorted(seqs[i].first, seqs[i].second, inv_cmp))
720  {
721  STXXL_VERBOSE1("length " << i << " " << (seqs[i].second - seqs[i].first));
722  for (value_type * v = seqs[i].first + 1; v < seqs[i].second; ++v)
723  {
724  if (inv_cmp(*v, *(v - 1)))
725  {
726  STXXL_VERBOSE1("Error at position " << i << "/" << (v - seqs[i].first - 1) << "/" << (v - seqs[i].first) << " " << *(v - 1) << " " << *v);
727  }
728  if (is_sentinel(*v))
729  {
730  STXXL_VERBOSE1("Wrong sentinel at position " << (v - seqs[i].first));
731  }
732  }
733  assert(false);
734  }
735  #endif
736  }
737  }
738  }
739  } //while (rest > 1)
740 
741  for (unsigned_type i = 0; i < seqs.size(); ++i)
742  {
743  unsigned_type seg = orig_seq_index[i];
744  if (is_segment_empty(seg))
745  {
746  STXXL_VERBOSE1("deallocated " << seg);
747  deallocate_segment(seg);
748  }
749  }
750 
751 #else // STXXL_PARALLEL && STXXL_PARALLEL_PQ_MULTIWAY_MERGE_EXTERNAL
752 
753  //Hint first non-internal (actually second) block of each sequence.
754  for (unsigned_type i = 0; i < k; ++i)
755  {
756  if (states[i].bids != NULL && !states[i].bids->empty())
757  pool->hint(states[i].bids->front());
758  }
759 
760  switch (log_k) {
761  case 0:
762  assert(k == 1);
763  assert(entry[0].index == 0);
764  assert(free_segments.empty());
765  //memcpy(target, states[0], length * sizeof(value_type));
766  //std::copy(states[0],states[0]+length,target);
767  for (size_type i = 0; i < length; ++i, ++(states[0]), ++begin)
768  *begin = *(states[0]);
769 
770  entry[0].key = **states;
771  if (is_segment_empty(0))
772  deallocate_segment(0);
773 
774  break;
775  case 1:
776  assert(k == 2);
777  merge_iterator(states[0], states[1], begin, length, cmp);
778  rebuild_loser_tree();
779  if (is_segment_empty(0) && is_segment_allocated(0))
780  deallocate_segment(0);
781 
782  if (is_segment_empty(1) && is_segment_allocated(1))
783  deallocate_segment(1);
784 
785  break;
786  case 2:
787  assert(k == 4);
788  if (is_segment_empty(3))
789  merge3_iterator(states[0], states[1], states[2], begin, length, cmp);
790  else
791  merge4_iterator(states[0], states[1], states[2], states[3], begin, length, cmp);
792  rebuild_loser_tree();
793  if (is_segment_empty(0) && is_segment_allocated(0))
794  deallocate_segment(0);
795 
796  if (is_segment_empty(1) && is_segment_allocated(1))
797  deallocate_segment(1);
798 
799  if (is_segment_empty(2) && is_segment_allocated(2))
800  deallocate_segment(2);
801 
802  if (is_segment_empty(3) && is_segment_allocated(3))
803  deallocate_segment(3);
804 
805  break;
806  case 3: multi_merge_f<OutputIterator, 3>(begin, end);
807  break;
808  case 4: multi_merge_f<OutputIterator, 4>(begin, end);
809  break;
810  case 5: multi_merge_f<OutputIterator, 5>(begin, end);
811  break;
812  case 6: multi_merge_f<OutputIterator, 6>(begin, end);
813  break;
814  case 7: multi_merge_f<OutputIterator, 7>(begin, end);
815  break;
816  case 8: multi_merge_f<OutputIterator, 8>(begin, end);
817  break;
818  case 9: multi_merge_f<OutputIterator, 9>(begin, end);
819  break;
820  case 10: multi_merge_f<OutputIterator, 10>(begin, end);
821  break;
822  default: multi_merge_k(begin, end);
823  break;
824  }
825 
826  size_ -= length;
827 #endif
828 
829  // compact tree if it got considerably smaller
830  {
831  const unsigned_type num_segments_used = std::min<unsigned_type>(arity, k) - free_segments.size();
832  const unsigned_type num_segments_trigger = k - (3 * k / 5);
833  // using k/2 would be worst case inefficient (for large k)
834  // for k \in {2, 4, 8} the trigger is k/2 which is good
835  // because we have special mergers for k \in {1, 2, 4}
836  // there is also a special 3-way-merger, that will be
837  // triggered if k == 4 && is_segment_empty(3)
838  STXXL_VERBOSE3("ext_merger compact? k=" << k << " #used=" << num_segments_used
839  << " <= #trigger=" << num_segments_trigger << " ==> "
840  << ((k > 1 && num_segments_used <= num_segments_trigger) ? "yes" : "no ")
841  << " || "
842  << ((k == 4 && !free_segments.empty() && !is_segment_empty(3)) ? "yes" : "no ")
843  << " #free=" << free_segments.size());
844  if (k > 1 && ((num_segments_used <= num_segments_trigger) ||
845  (k == 4 && !free_segments.empty() && !is_segment_empty(3))))
846  {
847  compact_tree();
848  }
849  }
850  }
851 
852  private:
853 #if STXXL_PQ_EXTERNAL_LOSER_TREE
854  // multi-merge for arbitrary K
855  template <class OutputIterator>
856  void multi_merge_k(OutputIterator begin, OutputIterator end)
857  {
858  Entry * current_pos;
859  value_type current_key;
860  unsigned_type current_index; // leaf pointed to by current entry
861  unsigned_type kReg = k;
862  OutputIterator done = end;
863  OutputIterator target = begin;
864  unsigned_type winner_index = entry[0].index;
865  value_type winner_key = entry[0].key;
866 
867  while (target != done)
868  {
869  // write result
870  *target = *(states[winner_index]);
871 
872  // advance winner segment
873  ++(states[winner_index]);
874 
875  winner_key = *(states[winner_index]);
876 
877  // remove winner segment if empty now
878  if (is_sentinel(winner_key)) //
879  deallocate_segment(winner_index);
880 
881 
882  // go up the entry-tree
883  for (unsigned_type i = (winner_index + kReg) >> 1; i > 0; i >>= 1)
884  {
885  current_pos = entry + i;
886  current_key = current_pos->key;
887  if (cmp(winner_key, current_key))
888  {
889  current_index = current_pos->index;
890  current_pos->key = winner_key;
891  current_pos->index = winner_index;
892  winner_key = current_key;
893  winner_index = current_index;
894  }
895  }
896 
897  ++target;
898  }
899  entry[0].index = winner_index;
900  entry[0].key = winner_key;
901  }
902 
903  template <class OutputIterator, unsigned LogK>
904  void multi_merge_f(OutputIterator begin, OutputIterator end)
905  {
906  OutputIterator done = end;
907  OutputIterator target = begin;
908  unsigned_type winner_index = entry[0].index;
909  Entry * reg_entry = entry;
910  sequence_state * reg_states = states;
911  value_type winner_key = entry[0].key;
912 
913  assert(log_k >= LogK);
914  while (target != done)
915  {
916  // write result
917  *target = *(reg_states[winner_index]);
918 
919  // advance winner segment
920  ++(reg_states[winner_index]);
921 
922  winner_key = *(reg_states[winner_index]);
923 
924 
925  // remove winner segment if empty now
926  if (is_sentinel(winner_key))
927  deallocate_segment(winner_index);
928 
929 
930  ++target;
931 
932  // update loser tree
933 #define TreeStep(L) \
934  if (1 << LogK >= 1 << L) { \
935  Entry * pos ## L = reg_entry + ((winner_index + (1 << LogK)) >> (((int(LogK - L) + 1) >= 0) ? ((LogK - L) + 1) : 0)); \
936  value_type key ## L = pos ## L->key; \
937  if (cmp(winner_key, key ## L)) { \
938  unsigned_type index ## L = pos ## L->index; \
939  pos ## L->key = winner_key; \
940  pos ## L->index = winner_index; \
941  winner_key = key ## L; \
942  winner_index = index ## L; \
943  } \
944  }
945  TreeStep(10);
946  TreeStep(9);
947  TreeStep(8);
948  TreeStep(7);
949  TreeStep(6);
950  TreeStep(5);
951  TreeStep(4);
952  TreeStep(3);
953  TreeStep(2);
954  TreeStep(1);
955 #undef TreeStep
956  }
957  reg_entry[0].index = winner_index;
958  reg_entry[0].key = winner_key;
959  }
960 #endif //STXXL_PQ_EXTERNAL_LOSER_TREE
961 
962  public:
963  bool is_space_available() const // for new segment
964  {
965  return k < arity || !free_segments.empty();
966  }
967 
968 
969  // insert segment beginning at target
970  // require: is_space_available() == 1
971  template <class Merger>
972  void insert_segment(Merger & another_merger, size_type segment_size)
973  {
974  STXXL_VERBOSE1("ext_merger::insert_segment(merger,...)" << this);
975 
976  if (segment_size > 0)
977  {
978  // get a free slot
979  if (free_segments.empty())
980  { // tree is too small
981  double_k();
982  }
983  assert(!free_segments.empty());
984  unsigned_type free_slot = free_segments.top();
985  free_segments.pop();
986 
987 
988  // link new segment
989  assert(segment_size);
990  unsigned_type nblocks = segment_size / block_type::size;
991  //assert(nblocks); // at least one block
992  STXXL_VERBOSE1("ext_merger::insert_segment nblocks=" << nblocks);
993  if (nblocks == 0)
994  {
995  STXXL_VERBOSE1("ext_merger::insert_segment(merger,...) WARNING: inserting a segment with " <<
996  nblocks << " blocks");
997  STXXL_VERBOSE1("THIS IS INEFFICIENT: TRY TO CHANGE PRIORITY QUEUE PARAMETERS");
998  }
999  unsigned_type first_size = segment_size % block_type::size;
1000  if (first_size == 0)
1001  {
1002  first_size = block_type::size;
1003  --nblocks;
1004  }
1005  block_manager * bm = block_manager::get_instance();
1006  std::list<bid_type> * bids = new std::list<bid_type>(nblocks);
1007  bm->new_blocks(alloc_strategy(), bids->begin(), bids->end());
1008  block_type * first_block = new block_type;
1009 
1010  another_merger.multi_merge(
1011  first_block->begin() + (block_type::size - first_size),
1012  first_block->end());
1013 
1014  STXXL_VERBOSE1("last element of first block " << *(first_block->end() - 1));
1015  assert(!cmp(*(first_block->begin() + (block_type::size - first_size)), *(first_block->end() - 1)));
1016 
1017  assert(pool->size_write() > 0);
1018 
1019  for (typename std::list<bid_type>::iterator curbid = bids->begin(); curbid != bids->end(); ++curbid)
1020  {
1021  block_type * b = pool->steal();
1022  another_merger.multi_merge(b->begin(), b->end());
1023  STXXL_VERBOSE1("first element of following block " << *curbid << " " << *(b->begin()));
1024  STXXL_VERBOSE1("last element of following block " << *curbid << " " << *(b->end() - 1));
1025  assert(!cmp(*(b->begin()), *(b->end() - 1)));
1026  pool->write(b, *curbid);
1027  STXXL_VERBOSE1("written to block " << *curbid << " cached in " << b);
1028  }
1029 
1030  insert_segment(bids, first_block, first_size, free_slot);
1031 
1032  size_ += segment_size;
1033 
1034 #if STXXL_PQ_EXTERNAL_LOSER_TREE
1035  // propagate new information up the tree
1036  value_type dummyKey;
1037  unsigned_type dummyIndex;
1038  unsigned_type dummyMask;
1039  update_on_insert((free_slot + k) >> 1, *(states[free_slot]), free_slot,
1040  &dummyKey, &dummyIndex, &dummyMask);
1041 #endif //STXXL_PQ_EXTERNAL_LOSER_TREE
1042  }
1043  else
1044  {
1045  // deallocate memory ?
1046  STXXL_VERBOSE1("Merged segment with zero size.");
1047  }
1048  }
1049 
1050  size_type size() const { return size_; }
1051 
1052  protected:
1058  void insert_segment(std::list<bid_type> * bidlist, block_type * first_block,
1059  unsigned_type first_size, unsigned_type slot)
1060  {
1061  STXXL_VERBOSE1("ext_merger::insert_segment(bidlist,...) " << this << " " << bidlist->size() << " " << slot);
1062  assert(!is_segment_allocated(slot));
1063  assert(first_size > 0);
1064 
1065  sequence_state & new_sequence = states[slot];
1066  new_sequence.current = block_type::size - first_size;
1067  std::swap(new_sequence.block, first_block);
1068  delete first_block;
1069  std::swap(new_sequence.bids, bidlist);
1070  if (bidlist) // the old list
1071  {
1072  assert(bidlist->empty());
1073  delete bidlist;
1074  }
1075  new_sequence.allocated = true;
1076  assert(is_segment_allocated(slot));
1077  }
1078 
1079  // free an empty segment .
1080  void deallocate_segment(unsigned_type slot)
1081  {
1082  STXXL_VERBOSE1("ext_merger::deallocate_segment() deleting segment " << slot << " allocated=" << int(is_segment_allocated(slot)));
1083  assert(is_segment_allocated(slot));
1084  states[slot].allocated = false;
1085  states[slot].make_inf();
1086 
1087  // push on the stack of free segment indices
1088  free_segments.push(slot);
1089  }
1090 
1091  // is this segment empty ?
1092  bool is_segment_empty(unsigned_type slot) const
1093  {
1094  return is_sentinel(*(states[slot]));
1095  }
1096 
1097  // Is this segment allocated? Otherwise it's empty,
1098  // already on the stack of free segment indices and can be reused.
1099  bool is_segment_allocated(unsigned_type slot) const
1100  {
1101  return states[slot].allocated;
1102  }
1103  }; // ext_merger
1104 } // priority_queue_local
1105 
1107 
1108 __STXXL_END_NAMESPACE
1109 
1110 #endif // !STXXL_PQ_EXT_MERGER_HEADER
1111 // vim: et:ts=4:sw=4
void insert_segment(std::list< bid_type > *bidlist, block_type *first_block, unsigned_type first_size, unsigned_type slot)
Definition: pq_ext_merger.h:1058
Inverts the order of a comparison functor by swapping its arguments.
Definition: pq_helpers.h:145
Implements dynamically resizable buffered writing and prefetched reading pool.
Definition: read_write_pool.h:28
void delete_blocks(const BIDIteratorClass &bidbegin, const BIDIteratorClass &bidend)
Deallocates blocks.
Definition: mng.h:218
bool hint(bid_type bid)
Gives a hint for prefetching a block.
Definition: read_write_pool.h:135
void new_blocks(const DiskAssignFunctor &functor, BIDIteratorClass bidbegin, BIDIteratorClass bidend, unsigned_type offset=0)
Allocates new blocks.
Definition: mng.h:90
request_ptr write(block_type *&block, bid_type bid)
Passes a block to the pool for writing.
Definition: read_write_pool.h:102
virtual void wait(bool measure_time=true)=0
Suspends calling thread until completion of the request.
request_ptr read(block_type *&block, bid_type bid)
Reads block. If this block is cached block is not read but passed from the cache. ...
Definition: read_write_pool.h:151
Block manager class.
Definition: mng.h:59
block_type * steal()
Take out a block from the pool.
Definition: read_write_pool.h:116
External merger, based on the loser tree data structure. !
Definition: pq_ext_merger.h:127
size_type size_write() const
Returns number of blocks owned by the write_pool.
Definition: read_write_pool.h:75