• Main Page
  • Related Pages
  • Modules
  • Namespaces
  • Classes
  • Files
  • Examples
  • File List

priority_queue.h

00001 /***************************************************************************
00002  *  include/stxxl/bits/containers/priority_queue.h
00003  *
00004  *  Part of the STXXL. See http://stxxl.sourceforge.net
00005  *
00006  *  Copyright (C) 1999 Peter Sanders <[email protected]>
00007  *  Copyright (C) 2003, 2004, 2007 Roman Dementiev <[email protected]>
00008  *  Copyright (C) 2007-2009 Johannes Singler <[email protected]>
00009  *  Copyright (C) 2007-2010 Andreas Beckmann <[email protected]>
00010  *
00011  *  Distributed under the Boost Software License, Version 1.0.
00012  *  (See accompanying file LICENSE_1_0.txt or copy at
00013  *  http://www.boost.org/LICENSE_1_0.txt)
00014  **************************************************************************/
00015 
00016 #ifndef STXXL_PRIORITY_QUEUE_HEADER
00017 #define STXXL_PRIORITY_QUEUE_HEADER
00018 
00019 #include <vector>
00020 
00021 #include <stxxl/bits/deprecated.h>
00022 #include <stxxl/bits/mng/typed_block.h>
00023 #include <stxxl/bits/mng/block_alloc.h>
00024 #include <stxxl/bits/mng/read_write_pool.h>
00025 #include <stxxl/bits/mng/prefetch_pool.h>
00026 #include <stxxl/bits/mng/write_pool.h>
00027 #include <stxxl/bits/common/tmeta.h>
00028 #include <stxxl/bits/algo/sort_base.h>
00029 #include <stxxl/bits/parallel.h>
00030 #include <stxxl/bits/common/is_sorted.h>
00031 
00032 #if STXXL_PARALLEL
      // Everything up to the matching #endif only takes effect in parallel builds.
00033 
      // When STXXL_PARALLEL_MODE is defined and the GCC version value computed below
      // is smaller than 40400 (i.e. GCC older than 4.4), all three parallel
      // multiway-merge switches are forced to 0, overriding any earlier definition.
00034 #if defined(STXXL_PARALLEL_MODE) && ((__GNUC__ * 10000 + __GNUC_MINOR__ * 100) < 40400)
00035 #undef STXXL_PARALLEL_PQ_MULTIWAY_MERGE_INTERNAL
00036 #undef STXXL_PARALLEL_PQ_MULTIWAY_MERGE_EXTERNAL
00037 #undef STXXL_PARALLEL_PQ_MULTIWAY_MERGE_DELETE_BUFFER
00038 #define STXXL_PARALLEL_PQ_MULTIWAY_MERGE_INTERNAL 0
00039 #define STXXL_PARALLEL_PQ_MULTIWAY_MERGE_EXTERNAL 0
00040 #define STXXL_PARALLEL_PQ_MULTIWAY_MERGE_DELETE_BUFFER 0
00041 #endif
00042 
00043 // enable/disable parallel merging for certain cases, for performance tuning
      // Each switch defaults to 1 unless it was already defined (by the user or by
      // the compiler check above).
00044 #ifndef STXXL_PARALLEL_PQ_MULTIWAY_MERGE_INTERNAL
00045 #define STXXL_PARALLEL_PQ_MULTIWAY_MERGE_INTERNAL 1
00046 #endif
00047 #ifndef STXXL_PARALLEL_PQ_MULTIWAY_MERGE_EXTERNAL
00048 #define STXXL_PARALLEL_PQ_MULTIWAY_MERGE_EXTERNAL 1
00049 #endif
00050 #ifndef STXXL_PARALLEL_PQ_MULTIWAY_MERGE_DELETE_BUFFER
00051 #define STXXL_PARALLEL_PQ_MULTIWAY_MERGE_DELETE_BUFFER 1
00052 #endif
00053 
00054 #endif //STXXL_PARALLEL
00055 
      // STXXL_PQ_EXTERNAL_LOSER_TREE is 1 unless parallel external merging is enabled.
      // NOTE(review): presumably consumed by the pq_* headers included right after
      // this section - they are not visible here, confirm there.
00056 #if STXXL_PARALLEL && STXXL_PARALLEL_PQ_MULTIWAY_MERGE_EXTERNAL
00057 #define STXXL_PQ_EXTERNAL_LOSER_TREE 0 // no loser tree for the external sequences
00058 #else
00059 #define STXXL_PQ_EXTERNAL_LOSER_TREE 1
00060 #endif
00061 
      // Same scheme for the internal sequences: 1 unless parallel internal merging
      // is enabled.
00062 #if STXXL_PARALLEL && STXXL_PARALLEL_PQ_MULTIWAY_MERGE_INTERNAL
00063 #define STXXL_PQ_INTERNAL_LOSER_TREE 0 // no loser tree for the internal sequences
00064 #else
00065 #define STXXL_PQ_INTERNAL_LOSER_TREE 1
00066 #endif
00067 
00068 #include <stxxl/bits/containers/pq_helpers.h>
00069 #include <stxxl/bits/containers/pq_mergers.h>
00070 #include <stxxl/bits/containers/pq_ext_merger.h>
00071 #include <stxxl/bits/containers/pq_losertree.h>
00072 
00073 __STXXL_BEGIN_NAMESPACE
00074 
00075 /*
00076    KNBufferSize1 = 32;
00077    KNN = 512; // length of group 1 sequences
00078    KNKMAX = 64;  // maximal arity
00079    LogKNKMAX = 6;  // ceil(log KNKMAX)
00080    KNLevels = 4; // overall capacity >= KNN*KNKMAX^KNLevels
00081  */
00082 
00083 // internal memory consumption >= N_*(KMAX_^IntLevels_) + ext
00084 
00085 template <
00086     class Tp_,
00087     class Cmp_,
00088     unsigned BufferSize1_ = 32,                    // equalize procedure call overheads etc.
00089     unsigned N_ = 512,                             // length of group 1 sequences
00090     unsigned IntKMAX_ = 64,                        // maximal arity for internal mergers
00091     unsigned IntLevels_ = 4,                       // number of internal groups
00092     unsigned BlockSize_ = (2 * 1024 * 1024),       // external block size
00093     unsigned ExtKMAX_ = 64,                        // maximal arity for external mergers
00094     unsigned ExtLevels_ = 2,                       // number of external groups
00095     class AllocStr_ = STXXL_DEFAULT_ALLOC_STRATEGY
00096     >
00097 struct priority_queue_config
00098 {
00099     typedef Tp_ value_type;
00100     typedef Cmp_ comparator_type;
00101     typedef AllocStr_ alloc_strategy_type;
00102     enum
00103     {
00104         delete_buffer_size = BufferSize1_,
00105         N = N_,
00106         IntKMAX = IntKMAX_,
00107         num_int_groups = IntLevels_,
00108         num_ext_groups = ExtLevels_,
00109         BlockSize = BlockSize_,
00110         ExtKMAX = ExtKMAX_,
00111         element_size = sizeof(Tp_)
00112     };
00113 };
00114 
00115 __STXXL_END_NAMESPACE
00116 
00117 namespace std
00118 {
00119     template <class BlockType_,
00120               class Cmp_,
00121               unsigned Arity_,
00122               class AllocStr_>
00123     void swap(stxxl::priority_queue_local::ext_merger<BlockType_, Cmp_, Arity_, AllocStr_> & a,
00124               stxxl::priority_queue_local::ext_merger<BlockType_, Cmp_, Arity_, AllocStr_> & b)
00125     {
00126         a.swap(b);
00127     }
00128     template <class ValTp_, class Cmp_, unsigned KNKMAX>
00129     void swap(stxxl::priority_queue_local::loser_tree<ValTp_, Cmp_, KNKMAX> & a,
00130               stxxl::priority_queue_local::loser_tree<ValTp_, Cmp_, KNKMAX> & b)
00131     {
00132         a.swap(b);
00133     }
00134 }
00135 
00136 __STXXL_BEGIN_NAMESPACE
00137 
00139 template <class Config_>
00140 class priority_queue : private noncopyable
00141 {
00142 public:
00143     typedef Config_ Config;
00144     enum
00145     {
00146         delete_buffer_size = Config::delete_buffer_size,
00147         N = Config::N,
00148         IntKMAX = Config::IntKMAX,
00149         num_int_groups = Config::num_int_groups,
00150         num_ext_groups = Config::num_ext_groups,
00151         total_num_groups = Config::num_int_groups + Config::num_ext_groups,
00152         BlockSize = Config::BlockSize,
00153         ExtKMAX = Config::ExtKMAX
00154     };
00155 
00157     typedef typename Config::value_type value_type;
00159     typedef typename Config::comparator_type comparator_type;
00160     typedef typename Config::alloc_strategy_type alloc_strategy_type;
00162     typedef stxxl::uint64 size_type;
00163     typedef typed_block<BlockSize, value_type> block_type;
00164     typedef read_write_pool<block_type> pool_type;
00165 
00166 protected:
00167     typedef priority_queue_local::internal_priority_queue<value_type, std::vector<value_type>, comparator_type>
00168     insert_heap_type;
00169 
00170     typedef priority_queue_local::loser_tree<
00171         value_type,
00172         comparator_type,
00173         IntKMAX> int_merger_type;
00174 
00175     typedef priority_queue_local::ext_merger<
00176         block_type,
00177         comparator_type,
00178         ExtKMAX,
00179         alloc_strategy_type> ext_merger_type;
00180 
00181 
00182     int_merger_type int_mergers[num_int_groups];
00183     pool_type * pool;
00184     bool pool_owned;
00185     ext_merger_type * ext_mergers;
00186 
00187     // one delete buffer for each tree => group buffer
00188     value_type group_buffers[total_num_groups][N + 1];          // tree->group_buffers->delete_buffer (extra space for sentinel)
00189     value_type * group_buffer_current_mins[total_num_groups];   // group_buffer_current_mins[i] is current start of group_buffers[i], end is group_buffers[i] + N
00190 
00191     // overall delete buffer
00192     value_type delete_buffer[delete_buffer_size + 1];
00193     value_type * delete_buffer_current_min;                     // current start of delete_buffer
00194     value_type * delete_buffer_end;                             // end of delete_buffer
00195 
00196     comparator_type cmp;
00197 
00198     // insert buffer
00199     insert_heap_type insert_heap;
00200 
00201     // how many groups are active
00202     unsigned_type num_active_groups;
00203 
00204     // total size not counting insert_heap and delete_buffer
00205     size_type size_;
00206 
00207 private:
00208     void init();
00209 
00210     void refill_delete_buffer();
00211     unsigned_type refill_group_buffer(unsigned_type k);
00212 
00213     unsigned_type make_space_available(unsigned_type level);
00214     void empty_insert_heap();
00215 
00216     value_type get_supremum() const { return cmp.min_value(); } //{ return group_buffers[0][KNN].key; }
00217     unsigned_type current_delete_buffer_size() const { return delete_buffer_end - delete_buffer_current_min; }
00218     unsigned_type current_group_buffer_size(unsigned_type i) const { return &(group_buffers[i][N]) - group_buffer_current_mins[i]; }
00219 
00220 public:
00226     priority_queue(pool_type & pool_);
00227 
00237     _STXXL_DEPRECATED(priority_queue(prefetch_pool<block_type> & p_pool_, write_pool<block_type> & w_pool_));
00238 
00248     priority_queue(unsigned_type p_pool_mem, unsigned_type w_pool_mem);
00249 
00252     void swap(priority_queue & obj)
00253     {
00254         //swap_1D_arrays(int_mergers,obj.int_mergers,num_int_groups); // does not work in g++ 3.4.3 :( bug?
00255         for (unsigned_type i = 0; i < num_int_groups; ++i)
00256             std::swap(int_mergers[i], obj.int_mergers[i]);
00257 
00258         //std::swap(pool,obj.pool);
00259         //std::swap(pool_owned, obj.pool_owned);
00260         std::swap(ext_mergers, obj.ext_mergers);
00261         for (unsigned_type i1 = 0; i1 < total_num_groups; ++i1)
00262             for (unsigned_type i2 = 0; i2 < (N + 1); ++i2)
00263                 std::swap(group_buffers[i1][i2], obj.group_buffers[i1][i2]);
00264 
00265         swap_1D_arrays(group_buffer_current_mins, obj.group_buffer_current_mins, total_num_groups);
00266         swap_1D_arrays(delete_buffer, obj.delete_buffer, delete_buffer_size + 1);
00267         std::swap(delete_buffer_current_min, obj.delete_buffer_current_min);
00268         std::swap(delete_buffer_end, obj.delete_buffer_end);
00269         std::swap(cmp, obj.cmp);
00270         std::swap(insert_heap, obj.insert_heap);
00271         std::swap(num_active_groups, obj.num_active_groups);
00272         std::swap(size_, obj.size_);
00273     }
00274 
00275     virtual ~priority_queue();
00276 
00279     size_type size() const;
00280 
00283     bool empty() const { return (size() == 0); }
00284 
00296     const value_type & top() const;
00297 
00304     void pop();
00305 
00310     void push(const value_type & obj);
00311 
00316     unsigned_type mem_cons() const
00317     {
00318         unsigned_type dynam_alloc_mem = 0;
00319         //dynam_alloc_mem += w_pool.mem_cons();
00320         //dynam_alloc_mem += p_pool.mem_cons();
00321         for (int i = 0; i < num_int_groups; ++i)
00322             dynam_alloc_mem += int_mergers[i].mem_cons();
00323 
00324         for (int i = 0; i < num_ext_groups; ++i)
00325             dynam_alloc_mem += ext_mergers[i].mem_cons();
00326 
00327 
00328         return (sizeof(*this) +
00329                 sizeof(ext_merger_type) * num_ext_groups +
00330                 dynam_alloc_mem);
00331     }
00332 };
00333 
00334 
00335 template <class Config_>
00336 inline typename priority_queue<Config_>::size_type priority_queue<Config_>::size() const
00337 {
00338     return size_ +
00339            insert_heap.size() - 1 +
00340            (delete_buffer_end - delete_buffer_current_min);
00341 }
00342 
00343 
00344 template <class Config_>
00345 inline const typename priority_queue<Config_>::value_type & priority_queue<Config_>::top() const
00346 {
00347     assert(!insert_heap.empty());
00348 
00349     const typename priority_queue<Config_>::value_type & t = insert_heap.top();
00350     if (/*(!insert_heap.empty()) && */ cmp(*delete_buffer_current_min, t))
00351         return t;
00352     else
00353         return *delete_buffer_current_min;
00354 }
00355 
00356 template <class Config_>
00357 inline void priority_queue<Config_>::pop()
00358 {
00359     //STXXL_VERBOSE1("priority_queue::pop()");
00360     assert(!insert_heap.empty());
00361 
00362     if (/*(!insert_heap.empty()) && */ cmp(*delete_buffer_current_min, insert_heap.top()))
00363         insert_heap.pop();
00364     else
00365     {
00366         assert(delete_buffer_current_min < delete_buffer_end);
00367         ++delete_buffer_current_min;
00368         if (delete_buffer_current_min == delete_buffer_end)
00369             refill_delete_buffer();
00370     }
00371 }
00372 
00373 template <class Config_>
00374 inline void priority_queue<Config_>::push(const value_type & obj)
00375 {
00376     //STXXL_VERBOSE3("priority_queue::push("<< obj <<")");
00377     assert(int_mergers->not_sentinel(obj));
00378     if (insert_heap.size() == N + 1)
00379         empty_insert_heap();
00380 
00381 
00382     assert(!insert_heap.empty());
00383 
00384     insert_heap.push(obj);
00385 }
00386 
00387 
00389 
00390 template <class Config_>
00391 priority_queue<Config_>::priority_queue(pool_type & pool_) :
00392     pool(&pool_),
00393     pool_owned(false),
00394     delete_buffer_end(delete_buffer + delete_buffer_size),
00395     insert_heap(N + 2),
00396     num_active_groups(0), size_(0)
00397 {
00398     STXXL_VERBOSE2("priority_queue::priority_queue(pool)");
00399     init();
00400 }
00401 
00402 // DEPRECATED
00403 template <class Config_>
00404 priority_queue<Config_>::priority_queue(prefetch_pool<block_type> & p_pool_, write_pool<block_type> & w_pool_) :
00405     pool(new pool_type(p_pool_, w_pool_)),
00406     pool_owned(true),
00407     delete_buffer_end(delete_buffer + delete_buffer_size),
00408     insert_heap(N + 2),
00409     num_active_groups(0), size_(0)
00410 {
00411     STXXL_VERBOSE2("priority_queue::priority_queue(p_pool, w_pool)");
00412     init();
00413 }
00414 
00415 template <class Config_>
00416 priority_queue<Config_>::priority_queue(unsigned_type p_pool_mem, unsigned_type w_pool_mem) :
00417     pool(new pool_type(p_pool_mem / BlockSize, w_pool_mem / BlockSize)),
00418     pool_owned(true),
00419     delete_buffer_end(delete_buffer + delete_buffer_size),
00420     insert_heap(N + 2),
00421     num_active_groups(0), size_(0)
00422 {
00423     STXXL_VERBOSE2("priority_queue::priority_queue(pool sizes)");
00424     init();
00425 }
00426 
00427 template <class Config_>
00428 void priority_queue<Config_>::init()
00429 {
00430     assert(!cmp(cmp.min_value(), cmp.min_value())); // verify strict weak ordering
00431 
00432     ext_mergers = new ext_merger_type[num_ext_groups];
00433     for (unsigned_type j = 0; j < num_ext_groups; ++j)
00434         ext_mergers[j].set_pool(pool);
00435 
00436     value_type sentinel = cmp.min_value();
00437     insert_heap.push(sentinel);                                // always keep the sentinel
00438     delete_buffer[delete_buffer_size] = sentinel;              // sentinel
00439     delete_buffer_current_min = delete_buffer_end;             // empty
00440     for (unsigned_type i = 0; i < total_num_groups; i++)
00441     {
00442         group_buffers[i][N] = sentinel;                        // sentinel
00443         group_buffer_current_mins[i] = &(group_buffers[i][N]); // empty
00444     }
00445 }
00446 
00447 template <class Config_>
00448 priority_queue<Config_>::~priority_queue()
00449 {
00450     STXXL_VERBOSE2("priority_queue::~priority_queue()");
00451     if (pool_owned)
00452         delete pool;
00453 
00454     delete[] ext_mergers;
00455 }
00456 
00457 //--------------------- Buffer refilling -------------------------------
00458 
00459 // refill group_buffers[group] and return the number of elements the buffer holds afterwards
00460 template <class Config_>
00461 unsigned_type priority_queue<Config_>::refill_group_buffer(unsigned_type group)
00462 {
00463     STXXL_VERBOSE2("priority_queue::refill_group_buffer(" << group << ")");
00464 
00465     value_type * target;
00466     unsigned_type length;
00467     size_type group_size = (group < num_int_groups) ?
00468                            int_mergers[group].size() :
00469                            ext_mergers[group - num_int_groups].size();                         //elements left in segments
00470     unsigned_type left_elements = group_buffers[group] + N - group_buffer_current_mins[group]; //elements left in target buffer
00471     if (group_size + left_elements >= size_type(N))
00472     {                                                                                          // buffer will be filled completely
00473         target = group_buffers[group];
00474         length = N - left_elements;
00475     }
00476     else
00477     {
00478         target = group_buffers[group] + N - group_size - left_elements;
00479         length = group_size;
00480     }
00481 
00482     if (length > 0)
00483     {
00484         // shift remaininig elements to front
00485         memmove(target, group_buffer_current_mins[group], left_elements * sizeof(value_type));
00486         group_buffer_current_mins[group] = target;
00487 
00488         // fill remaining space from group
00489         if (group < num_int_groups)
00490             int_mergers[group].multi_merge(target + left_elements, length);
00491         else
00492             ext_mergers[group - num_int_groups].multi_merge(
00493                 target + left_elements,
00494                 target + left_elements + length);
00495     }
00496 
00497     //STXXL_MSG(length + left_elements);
00498     //std::copy(target,target + length + left_elements,std::ostream_iterator<value_type>(std::cout, "\n"));
00499 #if STXXL_CHECK_ORDER_IN_SORTS
00500     priority_queue_local::invert_order<typename Config::comparator_type, value_type, value_type> inv_cmp(cmp);
00501     if (!stxxl::is_sorted(group_buffer_current_mins[group], group_buffers[group] + N, inv_cmp))
00502     {
00503         STXXL_VERBOSE2("length: " << length << " left_elements: " << left_elements);
00504         for (value_type * v = group_buffer_current_mins[group] + 1; v < group_buffer_current_mins[group] + left_elements; ++v)
00505         {
00506             if (inv_cmp(*v, *(v - 1)))
00507             {
00508                 STXXL_MSG("Error in buffer " << group << " at position " << (v - group_buffer_current_mins[group] - 1) << "/" << (v - group_buffer_current_mins[group]) << "   " << *(v - 2) << " " << *(v - 1) << " " << *v << " " << *(v + 1));
00509             }
00510         }
00511         assert(false);
00512     }
00513 #endif
00514 
00515     return length + left_elements;
00516 }
00517 
00518 
00519 template <class Config_>
00520 void priority_queue<Config_>::refill_delete_buffer()
00521 {
00522     STXXL_VERBOSE2("priority_queue::refill_delete_buffer()");
00523 
00524     size_type total_group_size = 0;
00525     //num_active_groups is <= 4
00526     for (int i = num_active_groups - 1; i >= 0; i--)
00527     {
00528         if ((group_buffers[i] + N) - group_buffer_current_mins[i] < delete_buffer_size)
00529         {
00530             unsigned_type length = refill_group_buffer(i);
00531             // max active level dry now?
00532             if (length == 0 && unsigned(i) == num_active_groups - 1)
00533                 --num_active_groups;
00534 
00535             total_group_size += length;
00536         }
00537         else
00538             total_group_size += delete_buffer_size;  // actually only a sufficient lower bound
00539     }
00540 
00541     unsigned_type length;
00542     if (total_group_size >= delete_buffer_size)      // buffer can be filled completely
00543     {
00544         length = delete_buffer_size;                 // amount to be copied
00545         size_ -= size_type(delete_buffer_size);      // amount left in group_buffers
00546     }
00547     else
00548     {
00549         length = total_group_size;
00550         assert(size_ == size_type(length)); // trees and group_buffers get empty
00551         size_ = 0;
00552     }
00553 
00554     priority_queue_local::invert_order<typename Config::comparator_type, value_type, value_type> inv_cmp(cmp);
00555 
00556     // now call simplified refill routines
00557     // which can make the assumption that
00558     // they find all they are asked in the buffers
00559     delete_buffer_current_min = delete_buffer_end - length;
00560     STXXL_VERBOSE2("Active groups = " << num_active_groups);
00561     switch (num_active_groups)
00562     {
00563     case 0:
00564         break;
00565     case 1:
00566         std::copy(group_buffer_current_mins[0], group_buffer_current_mins[0] + length, delete_buffer_current_min);
00567         group_buffer_current_mins[0] += length;
00568         break;
00569     case 2:
00570 #if STXXL_PARALLEL && STXXL_PARALLEL_PQ_MULTIWAY_MERGE_DELETE_BUFFER
00571         {
00572             std::pair<value_type *, value_type *> seqs[2] =
00573             {
00574                 std::make_pair(group_buffer_current_mins[0], group_buffers[0] + N),
00575                 std::make_pair(group_buffer_current_mins[1], group_buffers[1] + N)
00576             };
00577             parallel::multiway_merge_sentinel(seqs, seqs + 2, delete_buffer_current_min, inv_cmp, length); //sequence iterators are progressed appropriately
00578 
00579             group_buffer_current_mins[0] = seqs[0].first;
00580             group_buffer_current_mins[1] = seqs[1].first;
00581         }
00582 #else
00583         priority_queue_local::merge_iterator(
00584             group_buffer_current_mins[0],
00585             group_buffer_current_mins[1], delete_buffer_current_min, length, cmp);
00586 #endif
00587         break;
00588     case 3:
00589 #if STXXL_PARALLEL && STXXL_PARALLEL_PQ_MULTIWAY_MERGE_DELETE_BUFFER
00590         {
00591             std::pair<value_type *, value_type *> seqs[3] =
00592             {
00593                 std::make_pair(group_buffer_current_mins[0], group_buffers[0] + N),
00594                 std::make_pair(group_buffer_current_mins[1], group_buffers[1] + N),
00595                 std::make_pair(group_buffer_current_mins[2], group_buffers[2] + N)
00596             };
00597             parallel::multiway_merge_sentinel(seqs, seqs + 3, delete_buffer_current_min, inv_cmp, length); //sequence iterators are progressed appropriately
00598 
00599             group_buffer_current_mins[0] = seqs[0].first;
00600             group_buffer_current_mins[1] = seqs[1].first;
00601             group_buffer_current_mins[2] = seqs[2].first;
00602         }
00603 #else
00604         priority_queue_local::merge3_iterator(
00605             group_buffer_current_mins[0],
00606             group_buffer_current_mins[1],
00607             group_buffer_current_mins[2], delete_buffer_current_min, length, cmp);
00608 #endif
00609         break;
00610     case 4:
00611 #if STXXL_PARALLEL && STXXL_PARALLEL_PQ_MULTIWAY_MERGE_DELETE_BUFFER
00612         {
00613             std::pair<value_type *, value_type *> seqs[4] =
00614             {
00615                 std::make_pair(group_buffer_current_mins[0], group_buffers[0] + N),
00616                 std::make_pair(group_buffer_current_mins[1], group_buffers[1] + N),
00617                 std::make_pair(group_buffer_current_mins[2], group_buffers[2] + N),
00618                 std::make_pair(group_buffer_current_mins[3], group_buffers[3] + N)
00619             };
00620             parallel::multiway_merge_sentinel(seqs, seqs + 4, delete_buffer_current_min, inv_cmp, length); //sequence iterators are progressed appropriately
00621 
00622             group_buffer_current_mins[0] = seqs[0].first;
00623             group_buffer_current_mins[1] = seqs[1].first;
00624             group_buffer_current_mins[2] = seqs[2].first;
00625             group_buffer_current_mins[3] = seqs[3].first;
00626         }
00627 #else
00628         priority_queue_local::merge4_iterator(
00629             group_buffer_current_mins[0],
00630             group_buffer_current_mins[1],
00631             group_buffer_current_mins[2],
00632             group_buffer_current_mins[3], delete_buffer_current_min, length, cmp); //side effect free
00633 #endif
00634         break;
00635     default:
00636         STXXL_THROW(std::runtime_error, "priority_queue<...>::refill_delete_buffer()",
00637                     "Overflow! The number of buffers on 2nd level in stxxl::priority_queue is currently limited to 4");
00638     }
00639 
00640 #if STXXL_CHECK_ORDER_IN_SORTS
00641     if (!stxxl::is_sorted(delete_buffer_current_min, delete_buffer_end, inv_cmp))
00642     {
00643         for (value_type * v = delete_buffer_current_min + 1; v < delete_buffer_end; ++v)
00644         {
00645             if (inv_cmp(*v, *(v - 1)))
00646             {
00647                 STXXL_MSG("Error at position " << (v - delete_buffer_current_min - 1) << "/" << (v - delete_buffer_current_min) << "   " << *(v - 1) << " " << *v);
00648             }
00649         }
00650         assert(false);
00651     }
00652 #endif
00653     //std::copy(delete_buffer_current_min,delete_buffer_current_min + length,std::ostream_iterator<value_type>(std::cout, "\n"));
00654 }
00655 
00656 //--------------------------------------------------------------------
00657 
00658 // check if space is available on level k and
00659 // empty this level if necessary leading to a recursive call.
00660 // return the level where space was finally available
00661 template <class Config_>
00662 unsigned_type priority_queue<Config_>::make_space_available(unsigned_type level)
00663 {
00664     STXXL_VERBOSE2("priority_queue::make_space_available(" << level << ")");
00665     unsigned_type finalLevel;
00666     assert(level < total_num_groups);
00667     assert(level <= num_active_groups);
00668 
00669     if (level == num_active_groups)
00670         ++num_active_groups;
00671 
00672     const bool spaceIsAvailable_ =
00673         (level < num_int_groups) ? int_mergers[level].is_space_available()
00674         : ((level == total_num_groups - 1) ? true : (ext_mergers[level - num_int_groups].is_space_available()));
00675 
00676     if (spaceIsAvailable_)
00677     {
00678         finalLevel = level;
00679     }
00680     else
00681     {
00682         finalLevel = make_space_available(level + 1);
00683 
00684         if (level < num_int_groups - 1)                                  // from internal to internal tree
00685         {
00686             unsigned_type segmentSize = int_mergers[level].size();
00687             value_type * newSegment = new value_type[segmentSize + 1];
00688             int_mergers[level].multi_merge(newSegment, segmentSize);     // empty this level
00689 
00690             newSegment[segmentSize] = delete_buffer[delete_buffer_size]; // sentinel
00691             // for queues where size << #inserts
00692             // it might make sense to stay in this level if
00693             // segmentSize < alpha * KNN * k^level for some alpha < 1
00694             int_mergers[level + 1].insert_segment(newSegment, segmentSize);
00695         }
00696         else
00697         {
00698             if (level == num_int_groups - 1) // from internal to external tree
00699             {
00700                 const unsigned_type segmentSize = int_mergers[num_int_groups - 1].size();
00701                 STXXL_VERBOSE1("Inserting segment into first level external: " << level << " " << segmentSize);
00702                 ext_mergers[0].insert_segment(int_mergers[num_int_groups - 1], segmentSize);
00703             }
00704             else // from external to external tree
00705             {
00706                 const size_type segmentSize = ext_mergers[level - num_int_groups].size();
00707                 STXXL_VERBOSE1("Inserting segment into second level external: " << level << " " << segmentSize);
00708                 ext_mergers[level - num_int_groups + 1].insert_segment(ext_mergers[level - num_int_groups], segmentSize);
00709             }
00710         }
00711     }
00712     return finalLevel;
00713 }
00714 
00715 
00716 // empty the insert heap into the main data structure
00717 template <class Config_>
00718 void priority_queue<Config_>::empty_insert_heap()
00719 {
00720     STXXL_VERBOSE2("priority_queue::empty_insert_heap()");
00721     assert(insert_heap.size() == (N + 1));
00722 
00723     const value_type sup = get_supremum();
00724 
00725     // build new segment
00726     value_type * newSegment = new value_type[N + 1];
00727     value_type * newPos = newSegment;
00728 
00729     // put the new data there for now
00730     //insert_heap.sortTo(newSegment);
00731     value_type * SortTo = newSegment;
00732 
00733     insert_heap.sort_to(SortTo);
00734 
00735     SortTo = newSegment + N;
00736     insert_heap.clear();
00737     insert_heap.push(*SortTo);
00738 
00739     assert(insert_heap.size() == 1);
00740 
00741     newSegment[N] = sup; // sentinel
00742 
00743     // copy the delete_buffer and group_buffers[0] to temporary storage
00744     // (the temporary can be eliminated using some dirty tricks)
00745     const unsigned_type tempSize = N + delete_buffer_size;
00746     value_type temp[tempSize + 1];
00747     unsigned_type sz1 = current_delete_buffer_size();
00748     unsigned_type sz2 = current_group_buffer_size(0);
00749     value_type * pos = temp + tempSize - sz1 - sz2;
00750     std::copy(delete_buffer_current_min, delete_buffer_current_min + sz1, pos);
00751     std::copy(group_buffer_current_mins[0], group_buffer_current_mins[0] + sz2, pos + sz1);
00752     temp[tempSize] = sup; // sentinel
00753 
00754     // refill delete_buffer
00755     // (using more complicated code it could be made somewhat fuller
00756     // in certain circumstances)
00757     priority_queue_local::merge_iterator(pos, newPos, delete_buffer_current_min, sz1, cmp);
00758 
00759     // refill group_buffers[0]
00760     // (as above we might want to take the opportunity
00761     // to make group_buffers[0] fuller)
00762     priority_queue_local::merge_iterator(pos, newPos, group_buffer_current_mins[0], sz2, cmp);
00763 
00764     // merge the rest to the new segment
00765     // note that merge exactly trips into the footsteps
00766     // of itself
00767     priority_queue_local::merge_iterator(pos, newPos, newSegment, N, cmp);
00768 
00769     // and insert it
00770     unsigned_type freeLevel = make_space_available(0);
00771     assert(freeLevel == 0 || int_mergers[0].size() == 0);
00772     int_mergers[0].insert_segment(newSegment, N);
00773 
00774     // get rid of invalid level 2 buffers
00775     // by inserting them into tree 0 (which is almost empty in this case)
00776     if (freeLevel > 0)
00777     {
00778         for (int_type i = freeLevel; i >= 0; i--)
00779         {
00780             // reverse order not needed
00781             // but would allow immediate refill
00782 
00783             newSegment = new value_type[current_group_buffer_size(i) + 1]; // with sentinel
00784             std::copy(group_buffer_current_mins[i], group_buffer_current_mins[i] + current_group_buffer_size(i) + 1, newSegment);
00785             int_mergers[0].insert_segment(newSegment, current_group_buffer_size(i));
00786             group_buffer_current_mins[i] = group_buffers[i] + N;           // empty
00787         }
00788     }
00789 
00790     // update size
00791     size_ += size_type(N);
00792 
00793     // special case if the tree was empty before
00794     if (delete_buffer_current_min == delete_buffer_end)
00795         refill_delete_buffer();
00796 }
00797 
00798 namespace priority_queue_local
00799 {
00800     struct Parameters_for_priority_queue_not_found_Increase_IntM
00801     {
00802         enum { fits = false };
00803         typedef Parameters_for_priority_queue_not_found_Increase_IntM result;
00804     };
00805 
00806     struct dummy
00807     {
00808         enum { fits = false };
00809         typedef dummy result;
00810     };
00811 
00812     template <unsigned_type E_, unsigned_type IntM_, unsigned_type MaxS_, unsigned_type B_, unsigned_type m_, bool stop = false>
00813     struct find_B_m
00814     {
00815         typedef find_B_m<E_, IntM_, MaxS_, B_, m_, stop> Self;
00816         enum {
00817             k = IntM_ / B_,    // number of blocks that fit into M
00818             element_size = E_, // element size
00819             IntM = IntM_,
00820             B = B_,            // block size
00821             m = m_,            // number of blocks fitting into buffers
00822             c = k - m_,
00823             // memory occ. by block must be at least 10 times larger than size of ext sequence
00824             // && satisfy memory req && if we have two ext mergers their degree must be at least 64=m/2
00825             fits = c > 10 && ((k - m) * (m) * (m * B / (element_size * 4 * 1024))) >= MaxS_
00826                    && ((MaxS_ < ((k - m) * m / (2 * element_size)) * 1024) || m >= 128),
00827             step = 1
00828         };
00829 
00830         typedef typename find_B_m<element_size, IntM, MaxS_, B, m + step, fits || (m >= k - step)>::result candidate1;
00831         typedef typename find_B_m<element_size, IntM, MaxS_, B / 2, 1, fits || candidate1::fits>::result candidate2;
00832         typedef typename IF<fits, Self, typename IF<candidate1::fits, candidate1, candidate2>::result>::result result;
00833     };
00834 
00835     // specialization for the case when no valid parameters are found
00836     template <unsigned_type E_, unsigned_type IntM_, unsigned_type MaxS_, bool stop>
00837     struct find_B_m<E_, IntM_, MaxS_, 2048, 1, stop>
00838     {
00839         enum { fits = false };
00840         typedef Parameters_for_priority_queue_not_found_Increase_IntM result;
00841     };
00842 
00843     // to speedup search
00844     template <unsigned_type E_, unsigned_type IntM_, unsigned_type MaxS_, unsigned_type B_, unsigned_type m_>
00845     struct find_B_m<E_, IntM_, MaxS_, B_, m_, true>
00846     {
00847         enum { fits = false };
00848         typedef dummy result;
00849     };
00850 
00851     // E_ size of element in bytes
00852     template <unsigned_type E_, unsigned_type IntM_, unsigned_type MaxS_>
00853     struct find_settings
00854     {
00855         // start from block size (8*1024*1024) bytes
00856         typedef typename find_B_m<E_, IntM_, MaxS_, (8 * 1024 * 1024), 1>::result result;
00857     };
00858 
00859     struct Parameters_not_found_Try_to_change_the_Tune_parameter
00860     {
00861         typedef Parameters_not_found_Try_to_change_the_Tune_parameter result;
00862     };
00863 
00864 
00865     template <unsigned_type AI_, unsigned_type X_, unsigned_type CriticalSize_>
00866     struct compute_N
00867     {
00868         typedef compute_N<AI_, X_, CriticalSize_> Self;
00869         enum
00870         {
00871             X = X_,
00872             AI = AI_,
00873             N = X / (AI * AI) // two stage internal
00874         };
00875         typedef typename IF<(N >= CriticalSize_), Self, typename compute_N<AI / 2, X, CriticalSize_>::result>::result result;
00876     };
00877 
00878     template <unsigned_type X_, unsigned_type CriticalSize_>
00879     struct compute_N<1, X_, CriticalSize_>
00880     {
00881         typedef Parameters_not_found_Try_to_change_the_Tune_parameter result;
00882     };
00883 }
00884 
00886 
00889 
00891 
00954 template <class Tp_, class Cmp_, unsigned_type IntM_, unsigned MaxS_, unsigned Tune_ = 6>
00955 class PRIORITY_QUEUE_GENERATOR
00956 {
00957 public:
00958     // actual calculation of B, m, k and element_size
00959     typedef typename priority_queue_local::find_settings<sizeof(Tp_), IntM_, MaxS_>::result settings;
00960     enum {
00961         B = settings::B,
00962         m = settings::m,
00963         X = B * (settings::k - m) / settings::element_size,  // interpretation of result
00964         Buffer1Size = 32                                     // fixed
00965     };
00966     // derivation of N, AI, AE
00967     typedef typename priority_queue_local::compute_N<(1 << Tune_), X, 4 * Buffer1Size>::result ComputeN;
00968     enum
00969     {
00970         N = ComputeN::N,
00971         AI = ComputeN::AI,
00972         AE = (m / 2 < 2) ? 2 : (m / 2)            // at least 2
00973     };
00974     enum {
00975         // Estimation of maximum internal memory consumption (in bytes)
00976         EConsumption = X * settings::element_size + settings::B * AE + ((MaxS_ / X) / AE) * settings::B * 1024
00977     };
00978     /*
00979         unsigned BufferSize1_ = 32, // equalize procedure call overheads etc.
00980         unsigned N_ = 512,          // bandwidth
00981         unsigned IntKMAX_ = 64,     // maximal arity for internal mergers
00982         unsigned IntLevels_ = 4,
00983         unsigned BlockSize_ = (2*1024*1024),
00984         unsigned ExtKMAX_ = 64,     // maximal arity for external mergers
00985         unsigned ExtLevels_ = 2,
00986      */
00987     typedef priority_queue<priority_queue_config<Tp_, Cmp_, Buffer1Size, N, AI, 2, B, AE, 2> > result;
00988 };
00989 
00991 
00992 __STXXL_END_NAMESPACE
00993 
00994 
00995 namespace std
00996 {
00997     template <class Config_>
00998     void swap(stxxl::priority_queue<Config_> & a,
00999               stxxl::priority_queue<Config_> & b)
01000     {
01001         a.swap(b);
01002     }
01003 }
01004 
01005 #endif // !STXXL_PRIORITY_QUEUE_HEADER
01006 // vim: et:ts=4:sw=4

Generated by  doxygen 1.7.1