• Main Page
  • Related Pages
  • Modules
  • Namespaces
  • Classes
  • Files
  • Examples
  • File List

sort_stream.h

00001 /***************************************************************************
00002  *  include/stxxl/bits/stream/sort_stream.h
00003  *
00004  *  Part of the STXXL. See http://stxxl.sourceforge.net
00005  *
00006  *  Copyright (C) 2002-2005 Roman Dementiev <[email protected]>
00007  *  Copyright (C) 2006-2008 Johannes Singler <[email protected]>
00008  *  Copyright (C) 2008-2010 Andreas Beckmann <[email protected]>
00009  *
00010  *  Distributed under the Boost Software License, Version 1.0.
00011  *  (See accompanying file LICENSE_1_0.txt or copy at
00012  *  http://www.boost.org/LICENSE_1_0.txt)
00013  **************************************************************************/
00014 
00015 #ifndef STXXL_SORT_STREAM_HEADER
00016 #define STXXL_SORT_STREAM_HEADER
00017 
00018 #ifdef STXXL_BOOST_CONFIG
00019  #include <boost/config.hpp>
00020 #endif
00021 
00022 #include <stxxl/bits/stream/stream.h>
00023 #include <stxxl/bits/mng/mng.h>
00024 #include <stxxl/bits/algo/sort_base.h>
00025 #include <stxxl/bits/algo/sort_helper.h>
00026 #include <stxxl/bits/algo/adaptor.h>
00027 #include <stxxl/bits/algo/run_cursor.h>
00028 #include <stxxl/bits/algo/losertree.h>
00029 #include <stxxl/bits/stream/sorted_runs.h>
00030 
00031 
00032 __STXXL_BEGIN_NAMESPACE
00033 
00034 namespace stream
00035 {
00038 
00039 
00041     //     CREATE RUNS                                                    //
00043 
00050     template <
00051         class Input_,
00052         class Cmp_,
00053         unsigned BlockSize_ = STXXL_DEFAULT_BLOCK_SIZE(typename Input_::value_type),
00054         class AllocStr_ = STXXL_DEFAULT_ALLOC_STRATEGY>
00055     class basic_runs_creator : private noncopyable
00056     {
00057     protected:
00058         Input_ & input;
00059         Cmp_ cmp;
00060 
00061     public:
00062         typedef Cmp_ cmp_type;
00063         typedef typename Input_::value_type value_type;
00064         typedef typed_block<BlockSize_, value_type> block_type;
00065         typedef sort_helper::trigger_entry<block_type> trigger_entry_type;
00066         typedef sorted_runs<trigger_entry_type> sorted_runs_type;
00067 
00068     private:
00069         typedef typename sorted_runs_type::run_type run_type;
00070         sorted_runs_type result_; // stores the result (sorted runs)
00071         unsigned_type m_;         // memory for internal use in blocks
00072         bool result_computed;     // true iff result is already computed (used in 'result' method)
00073 
00075         unsigned_type fetch(block_type * blocks, unsigned_type first_idx, unsigned_type last_idx)
00076         {
00077             typename element_iterator_traits<block_type>::element_iterator output =
00078                 make_element_iterator(blocks, first_idx);
00079             unsigned_type curr_idx = first_idx;
00080             while (!input.empty() && curr_idx != last_idx) {
00081                 *output = *input;
00082                 ++input;
00083                 ++output;
00084                 ++curr_idx;
00085             }
00086             return curr_idx;
00087         }
00088 
00089         void fill_with_max_value(block_type * blocks, unsigned_type num_blocks, unsigned_type first_idx)
00090         {
00091             unsigned_type last_idx = num_blocks * block_type::size;
00092             if (first_idx < last_idx) {
00093                 typename element_iterator_traits<block_type>::element_iterator curr =
00094                     make_element_iterator(blocks, first_idx);
00095                 while (first_idx != last_idx) {
00096                     *curr = cmp.max_value();
00097                     ++curr;
00098                     ++first_idx;
00099                 }
00100             }
00101         }
00102 
00104         void sort_run(block_type * run, unsigned_type elements)
00105         {
00106             potentially_parallel::
00107             sort(make_element_iterator(run, 0),
00108                  make_element_iterator(run, elements),
00109                  cmp);
00110         }
00111 
00112         void compute_result();
00113 
00114     public:
00119         basic_runs_creator(Input_ & i, Cmp_ c, unsigned_type memory_to_use) :
00120             input(i), cmp(c), m_(memory_to_use / BlockSize_ / sort_memory_usage_factor()), result_computed(false)
00121         {
00122             sort_helper::verify_sentinel_strict_weak_ordering(cmp);
00123             if (!(2 * BlockSize_ * sort_memory_usage_factor() <= memory_to_use)) {
00124                 throw bad_parameter("stxxl::runs_creator<>:runs_creator(): INSUFFICIENT MEMORY provided, please increase parameter 'memory_to_use'");
00125             }
00126             assert(m_ > 0);
00127         }
00128 
00132         const sorted_runs_type & result()
00133         {
00134             if (!result_computed)
00135             {
00136                 compute_result();
00137                 result_computed = true;
00138 #ifdef STXXL_PRINT_STAT_AFTER_RF
00139                 STXXL_MSG(*stats::get_instance());
00140 #endif //STXXL_PRINT_STAT_AFTER_RF
00141             }
00142             return result_;
00143         }
00144     };
00145 
00149     template <class Input_, class Cmp_, unsigned BlockSize_, class AllocStr_>
00150     void basic_runs_creator<Input_, Cmp_, BlockSize_, AllocStr_>::compute_result()
00151     {
00152         unsigned_type i = 0;
00153         unsigned_type m2 = m_ / 2;
00154         const unsigned_type el_in_run = m2 * block_type::size; // # el in a run
00155         STXXL_VERBOSE1("basic_runs_creator::compute_result m2=" << m2);
00156         unsigned_type blocks1_length = 0, blocks2_length = 0;
00157         block_type * Blocks1 = NULL;
00158 
00159 #ifndef STXXL_SMALL_INPUT_PSORT_OPT
00160         Blocks1 = new block_type[m2 * 2];
00161 #else
00162         while (!input.empty() && blocks1_length != block_type::size)
00163         {
00164             result_.small_.push_back(*input);
00165             ++input;
00166             ++blocks1_length;
00167         }
00168 
00169         if (blocks1_length == block_type::size && !input.empty())
00170         {
00171             Blocks1 = new block_type[m2 * 2];
00172             std::copy(result_.small_.begin(), result_.small_.end(), Blocks1[0].begin());
00173             result_.small_.clear();
00174         }
00175         else
00176         {
00177             STXXL_VERBOSE1("basic_runs_creator: Small input optimization, input length: " << blocks1_length);
00178             result_.elements = blocks1_length;
00179             potentially_parallel::
00180             sort(result_.small_.begin(), result_.small_.end(), cmp);
00181             return;
00182         }
00183 #endif //STXXL_SMALL_INPUT_PSORT_OPT
00184 
00185         // the first block may be there already
00186         blocks1_length = fetch(Blocks1, blocks1_length, el_in_run);
00187 
00188         // sort first run
00189         sort_run(Blocks1, blocks1_length);
00190         if (blocks1_length <= block_type::size && input.empty())
00191         {
00192             // small input, do not flush it on the disk(s)
00193             STXXL_VERBOSE1("basic_runs_creator: Small input optimization, input length: " << blocks1_length);
00194             assert(result_.small_.empty());
00195             result_.small_.insert(result_.small_.end(), Blocks1[0].begin(), Blocks1[0].begin() + blocks1_length);
00196             result_.elements = blocks1_length;
00197             delete[] Blocks1;
00198             return;
00199         }
00200 
00201         block_type * Blocks2 = Blocks1 + m2;
00202         block_manager * bm = block_manager::get_instance();
00203         request_ptr * write_reqs = new request_ptr[m2];
00204         run_type run;
00205 
00206         unsigned_type cur_run_size = div_ceil(blocks1_length, block_type::size);  // in blocks
00207         run.resize(cur_run_size);
00208         bm->new_blocks(AllocStr_(), make_bid_iterator(run.begin()), make_bid_iterator(run.end()));
00209 
00210         disk_queues::get_instance()->set_priority_op(request_queue::WRITE);
00211 
00212         // fill the rest of the last block with max values
00213         fill_with_max_value(Blocks1, cur_run_size, blocks1_length);
00214 
00215         for (i = 0; i < cur_run_size; ++i)
00216         {
00217             run[i].value = Blocks1[i][0];
00218             write_reqs[i] = Blocks1[i].write(run[i].bid);
00219         }
00220         result_.runs.push_back(run);
00221         result_.runs_sizes.push_back(blocks1_length);
00222         result_.elements += blocks1_length;
00223 
00224         if (input.empty())
00225         {
00226             // return
00227             wait_all(write_reqs, write_reqs + cur_run_size);
00228             delete[] write_reqs;
00229             delete[] Blocks1;
00230             return;
00231         }
00232 
00233         STXXL_VERBOSE1("Filling the second part of the allocated blocks");
00234         blocks2_length = fetch(Blocks2, 0, el_in_run);
00235 
00236         if (input.empty())
00237         {
00238             // optimization if the whole set fits into both halves
00239             // (re)sort internally and return
00240             blocks2_length += el_in_run;
00241             sort_run(Blocks1, blocks2_length);  // sort first an second run together
00242             wait_all(write_reqs, write_reqs + cur_run_size);
00243             bm->delete_blocks(make_bid_iterator(run.begin()), make_bid_iterator(run.end()));
00244 
00245             cur_run_size = div_ceil(blocks2_length, block_type::size);
00246             run.resize(cur_run_size);
00247             bm->new_blocks(AllocStr_(), make_bid_iterator(run.begin()), make_bid_iterator(run.end()));
00248 
00249             // fill the rest of the last block with max values
00250             fill_with_max_value(Blocks1, cur_run_size, blocks2_length);
00251 
00252             assert(cur_run_size > m2);
00253 
00254             for (i = 0; i < m2; ++i)
00255             {
00256                 run[i].value = Blocks1[i][0];
00257                 write_reqs[i]->wait();
00258                 write_reqs[i] = Blocks1[i].write(run[i].bid);
00259             }
00260 
00261             request_ptr * write_reqs1 = new request_ptr[cur_run_size - m2];
00262 
00263             for ( ; i < cur_run_size; ++i)
00264             {
00265                 run[i].value = Blocks1[i][0];
00266                 write_reqs1[i - m2] = Blocks1[i].write(run[i].bid);
00267             }
00268 
00269             result_.runs[0] = run;
00270             result_.runs_sizes[0] = blocks2_length;
00271             result_.elements = blocks2_length;
00272 
00273             wait_all(write_reqs, write_reqs + m2);
00274             delete[] write_reqs;
00275             wait_all(write_reqs1, write_reqs1 + cur_run_size - m2);
00276             delete[] write_reqs1;
00277 
00278             delete[] Blocks1;
00279 
00280             return;
00281         }
00282 
00283         // more than 2 runs can be filled, i. e. the general case
00284 
00285         sort_run(Blocks2, blocks2_length);
00286 
00287         cur_run_size = div_ceil(blocks2_length, block_type::size);  // in blocks
00288         run.resize(cur_run_size);
00289         bm->new_blocks(AllocStr_(), make_bid_iterator(run.begin()), make_bid_iterator(run.end()));
00290 
00291         for (i = 0; i < cur_run_size; ++i)
00292         {
00293             run[i].value = Blocks2[i][0];
00294             write_reqs[i]->wait();
00295             write_reqs[i] = Blocks2[i].write(run[i].bid);
00296         }
00297         assert((blocks2_length % el_in_run) == 0);
00298 
00299         result_.runs.push_back(run);
00300         result_.runs_sizes.push_back(blocks2_length);
00301         result_.elements += blocks2_length;
00302 
00303         while (!input.empty())
00304         {
00305             blocks1_length = fetch(Blocks1, 0, el_in_run);
00306             sort_run(Blocks1, blocks1_length);
00307             cur_run_size = div_ceil(blocks1_length, block_type::size);  // in blocks
00308             run.resize(cur_run_size);
00309             bm->new_blocks(AllocStr_(), make_bid_iterator(run.begin()), make_bid_iterator(run.end()));
00310 
00311             // fill the rest of the last block with max values (occurs only on the last run)
00312             fill_with_max_value(Blocks1, cur_run_size, blocks1_length);
00313 
00314             for (i = 0; i < cur_run_size; ++i)
00315             {
00316                 run[i].value = Blocks1[i][0];
00317                 write_reqs[i]->wait();
00318                 write_reqs[i] = Blocks1[i].write(run[i].bid);
00319             }
00320             result_.runs.push_back(run);
00321             result_.runs_sizes.push_back(blocks1_length);
00322             result_.elements += blocks1_length;
00323 
00324             std::swap(Blocks1, Blocks2);
00325             std::swap(blocks1_length, blocks2_length);
00326         }
00327 
00328         wait_all(write_reqs, write_reqs + m2);
00329         delete[] write_reqs;
00330         delete[] ((Blocks1 < Blocks2) ? Blocks1 : Blocks2);
00331     }
00332 
00339     template <
00340         class Input_,
00341         class Cmp_,
00342         unsigned BlockSize_ = STXXL_DEFAULT_BLOCK_SIZE(typename Input_::value_type),
00343         class AllocStr_ = STXXL_DEFAULT_ALLOC_STRATEGY>
00344     class runs_creator : public basic_runs_creator<Input_, Cmp_, BlockSize_, AllocStr_>
00345     {
00346     private:
00347         typedef basic_runs_creator<Input_, Cmp_, BlockSize_, AllocStr_> base;
00348 
00349     public:
00350         typedef typename base::block_type block_type;
00351 
00352     private:
00353         using base::input;
00354 
00355     public:
00360         runs_creator(Input_ & i, Cmp_ c, unsigned_type memory_to_use) : base(i, c, memory_to_use)
00361         { }
00362     };
00363 
00364 
00372     template <class ValueType_>
00373     struct use_push
00374     {
00375         typedef ValueType_ value_type;
00376     };
00377 
00388     template <
00389         class ValueType_,
00390         class Cmp_,
00391         unsigned BlockSize_,
00392         class AllocStr_>
00393     class runs_creator<
00394         use_push<ValueType_>,
00395         Cmp_,
00396         BlockSize_,
00397         AllocStr_>
00398         : private noncopyable
00399     {
00400         Cmp_ cmp;
00401 
00402     public:
00403         typedef Cmp_ cmp_type;
00404         typedef ValueType_ value_type;
00405         typedef typed_block<BlockSize_, value_type> block_type;
00406         typedef sort_helper::trigger_entry<block_type> trigger_entry_type;
00407         typedef sorted_runs<trigger_entry_type> sorted_runs_type;
00408         typedef sorted_runs_type result_type;
00409 
00410     private:
00411         typedef typename sorted_runs_type::run_type run_type;
00412         sorted_runs_type result_; // stores the result (sorted runs)
00413         unsigned_type m_;         // memory for internal use in blocks
00414 
00415         bool result_computed;     // true after the result() method was called for the first time
00416 
00417         const unsigned_type m2;
00418         const unsigned_type el_in_run;
00419         unsigned_type cur_el;
00420         block_type * Blocks1;
00421         block_type * Blocks2;
00422         request_ptr * write_reqs;
00423         run_type run;
00424 
00425         void fill_with_max_value(block_type * blocks, unsigned_type num_blocks, unsigned_type first_idx)
00426         {
00427             unsigned_type last_idx = num_blocks * block_type::size;
00428             if (first_idx < last_idx) {
00429                 typename element_iterator_traits<block_type>::element_iterator curr =
00430                     make_element_iterator(blocks, first_idx);
00431                 while (first_idx != last_idx) {
00432                     *curr = cmp.max_value();
00433                     ++curr;
00434                     ++first_idx;
00435                 }
00436             }
00437         }
00438 
00439         void sort_run(block_type * run, unsigned_type elements)
00440         {
00441             potentially_parallel::
00442             sort(make_element_iterator(run, 0),
00443                  make_element_iterator(run, elements),
00444                  cmp);
00445         }
00446 
00447         void compute_result()
00448         {
00449             if (cur_el == 0)
00450                 return;
00451 
00452             unsigned_type cur_el_reg = cur_el;
00453             sort_run(Blocks1, cur_el_reg);
00454             result_.elements += cur_el_reg;
00455             if (cur_el_reg <= block_type::size && result_.elements == cur_el_reg)
00456             {
00457                 // small input, do not flush it on the disk(s)
00458                 STXXL_VERBOSE1("runs_creator(use_push): Small input optimization, input length: " << cur_el_reg);
00459                 result_.small_.resize(cur_el_reg);
00460                 std::copy(Blocks1[0].begin(), Blocks1[0].begin() + cur_el_reg, result_.small_.begin());
00461                 return;
00462             }
00463 
00464             const unsigned_type cur_run_size = div_ceil(cur_el_reg, block_type::size);     // in blocks
00465             run.resize(cur_run_size);
00466             block_manager * bm = block_manager::get_instance();
00467             bm->new_blocks(AllocStr_(), make_bid_iterator(run.begin()), make_bid_iterator(run.end()));
00468 
00469             disk_queues::get_instance()->set_priority_op(request_queue::WRITE);
00470 
00471             result_.runs_sizes.push_back(cur_el_reg);
00472 
00473             // fill the rest of the last block with max values
00474             fill_with_max_value(Blocks1, cur_run_size, cur_el_reg);
00475 
00476             unsigned_type i = 0;
00477             for ( ; i < cur_run_size; ++i)
00478             {
00479                 run[i].value = Blocks1[i][0];
00480                 if (write_reqs[i].get())
00481                     write_reqs[i]->wait();
00482 
00483                 write_reqs[i] = Blocks1[i].write(run[i].bid);
00484             }
00485             result_.runs.push_back(run);
00486 
00487             for (i = 0; i < m2; ++i)
00488                 if (write_reqs[i].get())
00489                     write_reqs[i]->wait();
00490         }
00491 
00492         void cleanup()
00493         {
00494             delete[] write_reqs;
00495             delete[] ((Blocks1 < Blocks2) ? Blocks1 : Blocks2);
00496             write_reqs = NULL;
00497             Blocks1 = Blocks2 = NULL;
00498         }
00499 
00500     public:
00504         runs_creator(Cmp_ c, unsigned_type memory_to_use) :
00505             cmp(c), m_(memory_to_use / BlockSize_ / sort_memory_usage_factor()), result_computed(false),
00506             m2(m_ / 2),
00507             el_in_run(m2 * block_type::size),
00508             cur_el(0),
00509             Blocks1(new block_type[m2 * 2]),
00510             Blocks2(Blocks1 + m2),
00511             write_reqs(new request_ptr[m2])
00512         {
00513             sort_helper::verify_sentinel_strict_weak_ordering(cmp);
00514             if (!(2 * BlockSize_ * sort_memory_usage_factor() <= memory_to_use)) {
00515                 throw bad_parameter("stxxl::runs_creator<>:runs_creator(): INSUFFICIENT MEMORY provided, please increase parameter 'memory_to_use'");
00516             }
00517             assert(m2 > 0);
00518         }
00519 
00520         ~runs_creator()
00521         {
00522             if (!result_computed)
00523                 cleanup();
00524         }
00525 
00528         void push(const value_type & val)
00529         {
00530             assert(result_computed == false);
00531             unsigned_type cur_el_reg = cur_el;
00532             if (cur_el_reg < el_in_run)
00533             {
00534                 Blocks1[cur_el_reg / block_type::size][cur_el_reg % block_type::size] = val;
00535                 ++cur_el;
00536                 return;
00537             }
00538 
00539             assert(el_in_run == cur_el);
00540             cur_el = 0;
00541 
00542             //sort and store Blocks1
00543             sort_run(Blocks1, el_in_run);
00544             result_.elements += el_in_run;
00545 
00546             const unsigned_type cur_run_size = div_ceil(el_in_run, block_type::size);    // in blocks
00547             run.resize(cur_run_size);
00548             block_manager * bm = block_manager::get_instance();
00549             bm->new_blocks(AllocStr_(), make_bid_iterator(run.begin()), make_bid_iterator(run.end()));
00550 
00551             disk_queues::get_instance()->set_priority_op(request_queue::WRITE);
00552 
00553             result_.runs_sizes.push_back(el_in_run);
00554 
00555             for (unsigned_type i = 0; i < cur_run_size; ++i)
00556             {
00557                 run[i].value = Blocks1[i][0];
00558                 if (write_reqs[i].get())
00559                     write_reqs[i]->wait();
00560 
00561                 write_reqs[i] = Blocks1[i].write(run[i].bid);
00562             }
00563 
00564             result_.runs.push_back(run);
00565 
00566             std::swap(Blocks1, Blocks2);
00567 
00568             push(val);
00569         }
00570 
00574         const sorted_runs_type & result()
00575         {
00576             if (1)
00577             {
00578                 if (!result_computed)
00579                 {
00580                     compute_result();
00581                     result_computed = true;
00582                     cleanup();
00583 #ifdef STXXL_PRINT_STAT_AFTER_RF
00584                     STXXL_MSG(*stats::get_instance());
00585 #endif //STXXL_PRINT_STAT_AFTER_RF
00586                 }
00587             }
00588             return result_;
00589         }
00590     };
00591 
00592 
00599     template <class ValueType_>
00600     struct from_sorted_sequences
00601     {
00602         typedef ValueType_ value_type;
00603     };
00604 
00615     template <
00616         class ValueType_,
00617         class Cmp_,
00618         unsigned BlockSize_,
00619         class AllocStr_>
00620     class runs_creator<
00621         from_sorted_sequences<ValueType_>,
00622         Cmp_,
00623         BlockSize_,
00624         AllocStr_>
00625         : private noncopyable
00626     {
00627         typedef ValueType_ value_type;
00628         typedef typed_block<BlockSize_, value_type> block_type;
00629         typedef sort_helper::trigger_entry<block_type> trigger_entry_type;
00630         typedef AllocStr_ alloc_strategy_type;
00631         Cmp_ cmp;
00632 
00633     public:
00634         typedef Cmp_ cmp_type;
00635         typedef sorted_runs<trigger_entry_type> sorted_runs_type;
00636         typedef sorted_runs_type result_type;
00637 
00638     private:
00639         typedef typename sorted_runs_type::run_type run_type;
00640 
00641         sorted_runs_type result_; // stores the result (sorted runs)
00642         unsigned_type m_;         // memory for internal use in blocks
00643         buffered_writer<block_type> writer;
00644         block_type * cur_block;
00645         unsigned_type offset;
00646         unsigned_type iblock;
00647         unsigned_type irun;
00648         alloc_strategy_type alloc_strategy;  // needs to be reset after each run
00649 
00650     public:
00655         runs_creator(Cmp_ c, unsigned_type memory_to_use) :
00656             cmp(c),
00657             m_(memory_to_use / BlockSize_ / sort_memory_usage_factor()),
00658             writer(m_, m_ / 2),
00659             cur_block(writer.get_free_block()),
00660             offset(0),
00661             iblock(0),
00662             irun(0)
00663         {
00664             sort_helper::verify_sentinel_strict_weak_ordering(cmp);
00665             assert(m_ > 0);
00666             if (!(2 * BlockSize_ * sort_memory_usage_factor() <= memory_to_use)) {
00667                 throw bad_parameter("stxxl::runs_creator<>:runs_creator(): INSUFFICIENT MEMORY provided, please increase parameter 'memory_to_use'");
00668             }
00669         }
00670 
00673         void push(const value_type & val)
00674         {
00675             assert(offset < block_type::size);
00676 
00677             (*cur_block)[offset] = val;
00678             ++offset;
00679 
00680             if (offset == block_type::size)
00681             {
00682                 // write current block
00683 
00684                 block_manager * bm = block_manager::get_instance();
00685                 // allocate space for the block
00686                 result_.runs.resize(irun + 1);
00687                 result_.runs[irun].resize(iblock + 1);
00688                 bm->new_blocks(
00689                     alloc_strategy,
00690                     make_bid_iterator(result_.runs[irun].begin() + iblock),
00691                     make_bid_iterator(result_.runs[irun].end()),
00692                     iblock
00693                     );
00694 
00695                 result_.runs[irun][iblock].value = (*cur_block)[0];         // init trigger
00696                 cur_block = writer.write(cur_block, result_.runs[irun][iblock].bid);
00697                 ++iblock;
00698 
00699                 offset = 0;
00700             }
00701 
00702             ++result_.elements;
00703         }
00704 
00706         void finish()
00707         {
00708             if (offset == 0 && iblock == 0) // current run is empty
00709                 return;
00710 
00711 
00712             result_.runs_sizes.resize(irun + 1);
00713             result_.runs_sizes.back() = iblock * block_type::size + offset;
00714 
00715             if (offset)    // if current block is partially filled
00716             {
00717                 while (offset != block_type::size)
00718                 {
00719                     (*cur_block)[offset] = cmp.max_value();
00720                     ++offset;
00721                 }
00722                 offset = 0;
00723 
00724                 block_manager * bm = block_manager::get_instance();
00725                 // allocate space for the block
00726                 result_.runs.resize(irun + 1);
00727                 result_.runs[irun].resize(iblock + 1);
00728                 bm->new_blocks(
00729                     alloc_strategy,
00730                     make_bid_iterator(result_.runs[irun].begin() + iblock),
00731                     make_bid_iterator(result_.runs[irun].end()),
00732                     iblock
00733                     );
00734 
00735                 result_.runs[irun][iblock].value = (*cur_block)[0];         // init trigger
00736                 cur_block = writer.write(cur_block, result_.runs[irun][iblock].bid);
00737             }
00738             else
00739             { }
00740 
00741             alloc_strategy = alloc_strategy_type();  // reinitialize block allocator for the next run
00742             iblock = 0;
00743             ++irun;
00744         }
00745 
00749         const sorted_runs_type & result()
00750         {
00751             finish();
00752             writer.flush();
00753 
00754             return result_;
00755         }
00756     };
00757 
00758 
00763     template <class RunsType_, class Cmp_>
00764     bool check_sorted_runs(const RunsType_ & sruns, Cmp_ cmp)
00765     {
00766         sort_helper::verify_sentinel_strict_weak_ordering(cmp);
00767         typedef typename RunsType_::block_type block_type;
00768         typedef typename block_type::value_type value_type;
00769         STXXL_VERBOSE2("Elements: " << sruns.elements);
00770         unsigned_type nruns = sruns.runs.size();
00771         STXXL_VERBOSE2("Runs: " << nruns);
00772         unsigned_type irun = 0;
00773         for (irun = 0; irun < nruns; ++irun)
00774         {
00775             const unsigned_type nblocks = sruns.runs[irun].size();
00776             block_type * blocks = new block_type[nblocks];
00777             request_ptr * reqs = new request_ptr[nblocks];
00778             for (unsigned_type j = 0; j < nblocks; ++j)
00779             {
00780                 reqs[j] = blocks[j].read(sruns.runs[irun][j].bid);
00781             }
00782             wait_all(reqs, reqs + nblocks);
00783             for (unsigned_type j = 0; j < nblocks; ++j)
00784             {
00785                 if (cmp(blocks[j][0], sruns.runs[irun][j].value) ||
00786                     cmp(sruns.runs[irun][j].value, blocks[j][0])) 
00787                 {
00788                     STXXL_ERRMSG("check_sorted_runs  wrong trigger in the run");
00789                     return false;
00790                 }
00791             }
00792             if (!stxxl::is_sorted(make_element_iterator(blocks, 0),
00793                                   make_element_iterator(blocks, sruns.runs_sizes[irun]),
00794                                   cmp))
00795             {
00796                 STXXL_ERRMSG("check_sorted_runs  wrong order in the run");
00797                 return false;
00798             }
00799 
00800             delete[] reqs;
00801             delete[] blocks;
00802         }
00803 
00804         STXXL_MSG("Checking runs finished successfully");
00805 
00806         return true;
00807     }
00808 
00809 
00811     //     MERGE RUNS                                                     //
00813 
00820     template <class RunsType_,
00821               class Cmp_,
00822               class AllocStr_ = STXXL_DEFAULT_ALLOC_STRATEGY>
00823     class basic_runs_merger : private noncopyable
00824     {
00825     protected:
00826         typedef RunsType_ sorted_runs_type;
00827         typedef AllocStr_ alloc_strategy;
00828         typedef typename sorted_runs_type::size_type size_type;
00829         typedef Cmp_ value_cmp;
00830         typedef typename sorted_runs_type::run_type run_type;
00831         typedef typename sorted_runs_type::block_type block_type;
00832         typedef block_type out_block_type;
00833         typedef typename run_type::value_type trigger_entry_type;
00834         typedef block_prefetcher<block_type, typename run_type::iterator> prefetcher_type;
00835         typedef run_cursor2<block_type, prefetcher_type> run_cursor_type;
00836         typedef sort_helper::run_cursor2_cmp<block_type, prefetcher_type, value_cmp> run_cursor2_cmp_type;
00837         typedef loser_tree<run_cursor_type, run_cursor2_cmp_type> loser_tree_type;
00838         typedef stxxl::int64 diff_type;
00839         typedef std::pair<typename block_type::iterator, typename block_type::iterator> sequence;
00840         typedef typename std::vector<sequence>::size_type seqs_size_type;
00841 
00842     public:
00844         typedef typename sorted_runs_type::value_type value_type;
00845 
00846     private:
00847         sorted_runs_type sruns;
00848         value_cmp cmp;
00849         size_type elements_remaining;
00850 
00851         out_block_type * current_block;
00852         unsigned_type buffer_pos;
00853         value_type current_value;               // cache for the current value
00854 
00855         run_type consume_seq;
00856         int_type * prefetch_seq;
00857         prefetcher_type * prefetcher;
00858         loser_tree_type * losers;
00859 #if STXXL_PARALLEL_MULTIWAY_MERGE
00860         std::vector<sequence> * seqs;
00861         std::vector<block_type *> * buffers;
00862         diff_type num_currently_mergeable;
00863 #endif
00864 
00865 #if STXXL_CHECK_ORDER_IN_SORTS
00866         value_type last_element;
00867 #endif //STXXL_CHECK_ORDER_IN_SORTS
00868         
00870 
00871         void merge_recursively(unsigned_type memory_to_use);
00872 
00873         void deallocate_prefetcher()
00874         {
00875             if (prefetcher)
00876             {
00877                 delete losers;
00878 #if STXXL_PARALLEL_MULTIWAY_MERGE
00879                 delete seqs;
00880                 delete buffers;
00881 #endif
00882                 delete prefetcher;
00883                 delete[] prefetch_seq;
00884                 prefetcher = NULL;
00885             }
00886             // free blocks in runs , (or the user should do it?)
00887             sruns.deallocate_blocks();
00888         }
00889 
00890         void fill_current_block()
00891         {
00892             STXXL_VERBOSE1("fill_current_block");
00893             if (do_parallel_merge())
00894             {
00895 #if STXXL_PARALLEL_MULTIWAY_MERGE
00896 // begin of STL-style merging
00897                 diff_type rest = out_block_type::size;          // elements still to merge for this output block
00898 
00899                 do                                              // while rest > 0 and still elements available
00900                 {
00901                     if (num_currently_mergeable < rest)
00902                     {
00903                         if (!prefetcher || prefetcher->empty())
00904                         {
00905                             // anything remaining is already in memory
00906                             num_currently_mergeable = elements_remaining;
00907                         }
00908                         else
00909                         {
00910                             num_currently_mergeable = sort_helper::count_elements_less_equal(
00911                                 *seqs, consume_seq[prefetcher->pos()].value, cmp);
00912                         }
00913                     }
00914 
00915                     diff_type output_size = STXXL_MIN(num_currently_mergeable, rest);     // at most rest elements
00916 
00917                     STXXL_VERBOSE1("before merge " << output_size);
00918 
00919                     stxxl::parallel::multiway_merge((*seqs).begin(), (*seqs).end(), current_block->end() - rest, cmp, output_size);
00920                     // sequence iterators are progressed appropriately
00921 
00922                     rest -= output_size;
00923                     num_currently_mergeable -= output_size;
00924 
00925                     STXXL_VERBOSE1("after merge");
00926 
00927                     sort_helper::refill_or_remove_empty_sequences(*seqs, *buffers, *prefetcher);
00928                 } while (rest > 0 && (*seqs).size() > 0);
00929 
00930 #if STXXL_CHECK_ORDER_IN_SORTS
00931                 if (!stxxl::is_sorted(current_block->begin(), current_block->end(), cmp))
00932                 {
00933                     for (value_type * i = current_block->begin() + 1; i != current_block->end(); ++i)
00934                         if (cmp(*i, *(i - 1)))
00935                         {
00936                             STXXL_VERBOSE1("Error at position " << (i - current_block->begin()));
00937                         }
00938                     assert(false);
00939                 }
00940 #endif //STXXL_CHECK_ORDER_IN_SORTS
00941 
00942 // end of STL-style merging
00943 #else
00944                 STXXL_THROW_UNREACHABLE();
00945 #endif //STXXL_PARALLEL_MULTIWAY_MERGE
00946             }
00947             else
00948             {
00949 // begin of native merging procedure
00950                 losers->multi_merge(current_block->elem, current_block->elem + STXXL_MIN<size_type>(out_block_type::size, elements_remaining));
00951 // end of native merging procedure
00952             }
00953             STXXL_VERBOSE1("current block filled");
00954 
00955             if (elements_remaining <= out_block_type::size)
00956                 deallocate_prefetcher();
00957         }
00958 
00959     public:
00964         basic_runs_merger(const sorted_runs_type & r, value_cmp c, unsigned_type memory_to_use) :
00965             sruns(r),
00966             cmp(c),
00967             elements_remaining(sruns.elements),
00968             current_block(NULL),
00969             buffer_pos(0),
00970             prefetch_seq(NULL),
00971             prefetcher(NULL),
00972             losers(NULL)
00973 #if STXXL_PARALLEL_MULTIWAY_MERGE
00974             , seqs(NULL),
00975             buffers(NULL),
00976             num_currently_mergeable(0)
00977 #endif
00978 #if STXXL_CHECK_ORDER_IN_SORTS
00979             , last_element(cmp.min_value())
00980 #endif //STXXL_CHECK_ORDER_IN_SORTS
00981         {
00982             initialize(r, memory_to_use);
00983         }
00984 
00985     protected:
00986         void initialize(const sorted_runs_type & r, unsigned_type memory_to_use)
00987         {
00988             sort_helper::verify_sentinel_strict_weak_ordering(cmp);
00989 
00990             sruns = r;
00991             elements_remaining = r.elements;
00992 
00993             if (empty())
00994                 return;
00995 
00996             if (!sruns.small_run().empty())
00997             {
00998                 // we have a small input <= B, that is kept in the main memory
00999                 STXXL_VERBOSE1("basic_runs_merger: small input optimization, input length: " << elements_remaining);
01000                 assert(elements_remaining == size_type(sruns.small_run().size()));
01001                 assert(sruns.small_run().size() <= out_block_type::size);
01002                 current_block = new out_block_type;
01003                 std::copy(sruns.small_run().begin(), sruns.small_run().end(), current_block->begin());
01004                 current_value = current_block->elem[0];
01005                 buffer_pos = 1;
01006 
01007                 return;
01008             }
01009 
01010 #if STXXL_CHECK_ORDER_IN_SORTS
01011             assert(check_sorted_runs(r, cmp));
01012 #endif //STXXL_CHECK_ORDER_IN_SORTS
01013 
01014             disk_queues::get_instance()->set_priority_op(request_queue::WRITE);
01015 
01016             int_type disks_number = config::get_instance()->disks_number();
01017             unsigned_type min_prefetch_buffers = 2 * disks_number;
01018             unsigned_type input_buffers = (memory_to_use > sizeof(out_block_type) ? memory_to_use - sizeof(out_block_type) : 0) / block_type::raw_size;
01019             unsigned_type nruns = sruns.runs.size();
01020 
01021             if (input_buffers < nruns + min_prefetch_buffers)
01022             {
01023                 // can not merge runs in one pass
01024                 // merge recursively:
01025                 STXXL_WARNMSG_RECURSIVE_SORT("The implementation of sort requires more than one merge pass, therefore for a better");
01026                 STXXL_WARNMSG_RECURSIVE_SORT("efficiency decrease block size of run storage (a parameter of the run_creator)");
01027                 STXXL_WARNMSG_RECURSIVE_SORT("or increase the amount memory dedicated to the merger.");
01028                 STXXL_WARNMSG_RECURSIVE_SORT("m=" << input_buffers << " nruns=" << nruns << " prefetch_blocks=" << min_prefetch_buffers);
01029 
01030                 // check whether we have enough memory to merge recursively
01031                 unsigned_type recursive_merge_buffers = memory_to_use / block_type::raw_size;
01032                 if (recursive_merge_buffers < 2 * min_prefetch_buffers + 1 + 2) {
01033                     // recursive merge uses min_prefetch_buffers for input buffering and min_prefetch_buffers output buffering
01034                     // as well as 1 current output block and at least 2 input blocks
01035                     STXXL_ERRMSG("There are only m=" << recursive_merge_buffers << " blocks available for recursive merging, but "
01036                                  << min_prefetch_buffers << "+" << min_prefetch_buffers << "+1 are needed read-ahead/write-back/output, and");
01037                     STXXL_ERRMSG("the merger requires memory to store at least two input blocks internally. Aborting.");
01038                     throw bad_parameter("basic_runs_merger::sort(): INSUFFICIENT MEMORY provided, please increase parameter 'memory_to_use'");
01039                 }
01040 
01041                 merge_recursively(memory_to_use);
01042 
01043                 nruns = sruns.runs.size();
01044             }
01045 
01046             assert(nruns + min_prefetch_buffers <= input_buffers);
01047 
01048             unsigned_type i;
01049             unsigned_type prefetch_seq_size = 0;
01050             for (i = 0; i < nruns; ++i)
01051             {
01052                 prefetch_seq_size += sruns.runs[i].size();
01053             }
01054 
01055             consume_seq.resize(prefetch_seq_size);
01056             prefetch_seq = new int_type[prefetch_seq_size];
01057 
01058             typename run_type::iterator copy_start = consume_seq.begin();
01059             for (i = 0; i < nruns; ++i)
01060             {
01061                 copy_start = std::copy(
01062                     sruns.runs[i].begin(),
01063                     sruns.runs[i].end(),
01064                     copy_start);
01065             }
01066 
01067             std::stable_sort(consume_seq.begin(), consume_seq.end(),
01068                              sort_helper::trigger_entry_cmp<trigger_entry_type, value_cmp>(cmp) _STXXL_SORT_TRIGGER_FORCE_SEQUENTIAL);
01069 
01070             const unsigned_type n_prefetch_buffers = STXXL_MAX(min_prefetch_buffers, input_buffers - nruns);
01071 
01072 #if STXXL_SORT_OPTIMAL_PREFETCHING
01073             // heuristic
01074             const int_type n_opt_prefetch_buffers = min_prefetch_buffers + (3 * (n_prefetch_buffers - min_prefetch_buffers)) / 10;
01075 
01076             compute_prefetch_schedule(
01077                 consume_seq,
01078                 prefetch_seq,
01079                 n_opt_prefetch_buffers,
01080                 disks_number);
01081 #else
01082             for (i = 0; i < prefetch_seq_size; ++i)
01083                 prefetch_seq[i] = i;
01084 #endif //STXXL_SORT_OPTIMAL_PREFETCHING
01085 
01086             prefetcher = new prefetcher_type(
01087                 consume_seq.begin(),
01088                 consume_seq.end(),
01089                 prefetch_seq,
01090                 STXXL_MIN(nruns + n_prefetch_buffers, prefetch_seq_size));
01091 
01092             if (do_parallel_merge())
01093             {
01094 #if STXXL_PARALLEL_MULTIWAY_MERGE
01095 // begin of STL-style merging
01096                 seqs = new std::vector<sequence>(nruns);
01097                 buffers = new std::vector<block_type *>(nruns);
01098 
01099                 for (unsigned_type i = 0; i < nruns; ++i)                                       //initialize sequences
01100                 {
01101                     (*buffers)[i] = prefetcher->pull_block();                                   //get first block of each run
01102                     (*seqs)[i] = std::make_pair((*buffers)[i]->begin(), (*buffers)[i]->end());  //this memory location stays the same, only the data is exchanged
01103                 }
01104 // end of STL-style merging
01105 #else
01106                 STXXL_THROW_UNREACHABLE();
01107 #endif //STXXL_PARALLEL_MULTIWAY_MERGE
01108             }
01109             else
01110             {
01111 // begin of native merging procedure
01112                 losers = new loser_tree_type(prefetcher, nruns, run_cursor2_cmp_type(cmp));
01113 // end of native merging procedure
01114             }
01115 
01116             current_block = new out_block_type;
01117             fill_current_block();
01118 
01119             current_value = current_block->elem[0];
01120             buffer_pos = 1;
01121         }
01122 
01123     public:
01125         bool empty() const
01126         {
01127             return elements_remaining == 0;
01128         }
01129 
01131         const value_type & operator * () const
01132         {
01133             assert(!empty());
01134             return current_value;
01135         }
01136 
01138         const value_type * operator -> () const
01139         {
01140             return &(operator * ());
01141         }
01142 
01144         basic_runs_merger & operator ++ ()  // preincrement operator
01145         {
01146             assert(!empty());
01147 
01148             --elements_remaining;
01149 
01150             if (buffer_pos != out_block_type::size)
01151             {
01152                 current_value = current_block->elem[buffer_pos];
01153                 ++buffer_pos;
01154             }
01155             else
01156             {
01157                 if (!empty())
01158                 {
01159                     fill_current_block();
01160 
01161 #if STXXL_CHECK_ORDER_IN_SORTS
01162                     assert(stxxl::is_sorted(current_block->elem, current_block->elem + STXXL_MIN<size_type>(elements_remaining, current_block->size), cmp));
01163                     assert(!cmp(current_block->elem[0], current_value));
01164 #endif //STXXL_CHECK_ORDER_IN_SORTS
01165                     current_value = current_block->elem[0];
01166                     buffer_pos = 1;
01167                 }
01168             }
01169 
01170 #if STXXL_CHECK_ORDER_IN_SORTS
01171             if (!empty())
01172             {
01173                 assert(!cmp(current_value, last_element));
01174                 last_element = current_value;
01175             }
01176 #endif //STXXL_CHECK_ORDER_IN_SORTS
01177 
01178             return *this;
01179         }
01180 
01183         virtual ~basic_runs_merger()
01184         {
01185             deallocate_prefetcher();
01186             delete current_block;
01187         }
01188     };
01189 
01190 
01191     template <class RunsType_, class Cmp_, class AllocStr_>
01192     void basic_runs_merger<RunsType_, Cmp_, AllocStr_>::merge_recursively(unsigned_type memory_to_use)
01193     {
01194         block_manager * bm = block_manager::get_instance();
01195         unsigned_type ndisks = config::get_instance()->disks_number();
01196         unsigned_type nwrite_buffers = 2 * ndisks;
01197         unsigned_type memory_for_write_buffers = nwrite_buffers * sizeof(block_type);
01198 
01199         // memory consumption of the recursive merger (uses block_type as out_block_type)
01200         unsigned_type recursive_merger_memory_prefetch_buffers = 2 * ndisks * sizeof(block_type);
01201         unsigned_type recursive_merger_memory_out_block = sizeof(block_type);
01202         unsigned_type memory_for_buffers = memory_for_write_buffers
01203                                            + recursive_merger_memory_prefetch_buffers
01204                                            + recursive_merger_memory_out_block;
01205         // maximum arity in the recursive merger
01206         unsigned_type max_arity = (memory_to_use > memory_for_buffers ? memory_to_use - memory_for_buffers : 0) / block_type::raw_size;
01207 
01208         unsigned_type nruns = sruns.runs.size();
01209         const unsigned_type merge_factor = optimal_merge_factor(nruns, max_arity);
01210         assert(merge_factor > 1);
01211         assert(merge_factor <= max_arity);
01212         while (nruns > max_arity)
01213         {
01214             unsigned_type new_nruns = div_ceil(nruns, merge_factor);
01215             STXXL_VERBOSE("Starting new merge phase: nruns: " << nruns <<
01216                           " opt_merge_factor: " << merge_factor << " max_arity: " << max_arity << " new_nruns: " << new_nruns);
01217 
01218             sorted_runs_type new_runs;
01219             new_runs.runs.resize(new_nruns);
01220             new_runs.runs_sizes.resize(new_nruns);
01221             new_runs.elements = sruns.elements;
01222 
01223             unsigned_type runs_left = nruns;
01224             unsigned_type cur_out_run = 0;
01225             unsigned_type elements_in_new_run = 0;
01226 
01227             while (runs_left > 0)
01228             {
01229                 int_type runs2merge = STXXL_MIN(runs_left, merge_factor);
01230                 elements_in_new_run = 0;
01231                 for (unsigned_type i = nruns - runs_left; i < (nruns - runs_left + runs2merge); ++i)
01232                 {
01233                     elements_in_new_run += sruns.runs_sizes[i];
01234                 }
01235                 const unsigned_type blocks_in_new_run1 = div_ceil(elements_in_new_run, block_type::size);
01236 
01237                 new_runs.runs_sizes[cur_out_run] = elements_in_new_run;
01238                 // allocate run
01239                 new_runs.runs[cur_out_run++].resize(blocks_in_new_run1);
01240                 runs_left -= runs2merge;
01241             }
01242 
01243             // allocate blocks for the new runs
01244             for (unsigned_type i = 0; i < new_runs.runs.size(); ++i)
01245                 bm->new_blocks(alloc_strategy(), make_bid_iterator(new_runs.runs[i].begin()), make_bid_iterator(new_runs.runs[i].end()));
01246 
01247             // merge all
01248             runs_left = nruns;
01249             cur_out_run = 0;
01250             size_type elements_left = sruns.elements;
01251 
01252             while (runs_left > 0)
01253             {
01254                 unsigned_type runs2merge = STXXL_MIN(runs_left, merge_factor);
01255                 STXXL_VERBOSE("Merging " << runs2merge << " runs");
01256 
01257                 sorted_runs_type cur_runs;
01258                 cur_runs.runs.resize(runs2merge);
01259                 cur_runs.runs_sizes.resize(runs2merge);
01260 
01261                 std::copy(sruns.runs.begin() + nruns - runs_left,
01262                           sruns.runs.begin() + nruns - runs_left + runs2merge,
01263                           cur_runs.runs.begin());
01264                 std::copy(sruns.runs_sizes.begin() + nruns - runs_left,
01265                           sruns.runs_sizes.begin() + nruns - runs_left + runs2merge,
01266                           cur_runs.runs_sizes.begin());
01267 
01268                 runs_left -= runs2merge;
01269                 /*
01270                    cur_runs.elements = (runs_left)?
01271                    (new_runs.runs[cur_out_run].size()*block_type::size):
01272                    (elements_left);
01273                  */
01274                 cur_runs.elements = new_runs.runs_sizes[cur_out_run];
01275                 elements_left -= cur_runs.elements;
01276 
01277                 if (runs2merge > 1)
01278                 {
01279                     basic_runs_merger<RunsType_, Cmp_, AllocStr_> merger(cur_runs, cmp, memory_to_use - memory_for_write_buffers);
01280 
01281                     {   // make sure everything is being destroyed in right time
01282                         buf_ostream<block_type, typename run_type::iterator> out(
01283                             new_runs.runs[cur_out_run].begin(),
01284                             nwrite_buffers);
01285 
01286                         size_type cnt = 0;
01287                         const size_type cnt_max = cur_runs.elements;
01288 
01289                         while (cnt != cnt_max)
01290                         {
01291                             *out = *merger;
01292                             if ((cnt % block_type::size) == 0) // have to write the trigger value
01293                                 new_runs.runs[cur_out_run][cnt / size_type(block_type::size)].value = *merger;
01294 
01295                             ++cnt;
01296                             ++out;
01297                             ++merger;
01298                         }
01299                         assert(merger.empty());
01300 
01301                         while (cnt % block_type::size)
01302                         {
01303                             *out = cmp.max_value();
01304                             ++out;
01305                             ++cnt;
01306                         }
01307                     }
01308                 }
01309                 else
01310                 {
01311                     bm->delete_blocks(
01312                         make_bid_iterator(new_runs.runs.back().begin()),
01313                         make_bid_iterator(new_runs.runs.back().end()));
01314 
01315                     assert(cur_runs.runs.size() == 1);
01316 
01317                     std::copy(cur_runs.runs.front().begin(),
01318                               cur_runs.runs.front().end(),
01319                               new_runs.runs.back().begin());
01320                     new_runs.runs_sizes.back() = cur_runs.runs_sizes.front();
01321                 }
01322 
01323                 ++cur_out_run;
01324             }
01325             assert(elements_left == 0);
01326 
01327             nruns = new_nruns;
01328             sruns = new_runs;
01329         }
01330     }
01331 
01332 
01339     template <class RunsType_,
01340               class Cmp_,
01341               class AllocStr_ = STXXL_DEFAULT_ALLOC_STRATEGY>
01342     class runs_merger : public basic_runs_merger<RunsType_, Cmp_, AllocStr_>
01343     {
01344     private:
01345         typedef RunsType_ sorted_runs_type;
01346         typedef basic_runs_merger<RunsType_, Cmp_, AllocStr_> base;
01347         typedef typename base::value_cmp value_cmp;
01348         typedef typename base::block_type block_type;
01349 
01350     public:
01355         runs_merger(const sorted_runs_type & r, value_cmp c, unsigned_type memory_to_use) :
01356             base(r, c, memory_to_use)
01357         { }
01358     };
01359 
01360 
01362     //     SORT                                                           //
01364 
01372     template <class Input_,
01373               class Cmp_,
01374               unsigned BlockSize_ = STXXL_DEFAULT_BLOCK_SIZE(typename Input_::value_type),
01375               class AllocStr_ = STXXL_DEFAULT_ALLOC_STRATEGY,
01376               class runs_creator_type = runs_creator<Input_, Cmp_, BlockSize_, AllocStr_> >
01377     class sort : public noncopyable
01378     {
01379         typedef typename runs_creator_type::sorted_runs_type sorted_runs_type;
01380         typedef runs_merger<sorted_runs_type, Cmp_, AllocStr_> runs_merger_type;
01381 
01382         runs_creator_type creator;
01383         runs_merger_type merger;
01384 
01385     public:
01387         typedef typename Input_::value_type value_type;
01388 
01393         sort(Input_ & in, Cmp_ c, unsigned_type memory_to_use) :
01394             creator(in, c, memory_to_use),
01395             merger(creator.result(), c, memory_to_use)
01396         {
01397             sort_helper::verify_sentinel_strict_weak_ordering(c);
01398         }
01399 
01405         sort(Input_ & in, Cmp_ c, unsigned_type memory_to_use_rc, unsigned_type memory_to_use_m) :
01406             creator(in, c, memory_to_use_rc),
01407             merger(creator.result(), c, memory_to_use_m)
01408         {
01409             sort_helper::verify_sentinel_strict_weak_ordering(c);
01410         }
01411 
01412 
01414         bool empty() const
01415         {
01416             return merger.empty();
01417         }
01418 
01420         const value_type & operator * () const
01421         {
01422             assert(!empty());
01423             return *merger;
01424         }
01425 
01426         const value_type * operator -> () const
01427         {
01428             assert(!empty());
01429             return merger.operator -> ();
01430         }
01431 
01433         sort & operator ++ ()
01434         {
01435             ++merger;
01436             return *this;
01437         }
01438     };
01439 
01444     template <
01445         class ValueType_,
01446         unsigned BlockSize_>
01447     class compute_sorted_runs_type
01448     {
01449         typedef ValueType_ value_type;
01450         typedef BID<BlockSize_> bid_type;
01451         typedef sort_helper::trigger_entry<bid_type, value_type> trigger_entry_type;
01452 
01453     public:
01454         typedef sorted_runs<trigger_entry_type> result;
01455     };
01456 
01458 }
01459 
01462 
01464 
01473 template <unsigned BlockSize,
01474           class RandomAccessIterator,
01475           class CmpType,
01476           class AllocStr>
01477 void sort(RandomAccessIterator begin,
01478           RandomAccessIterator end,
01479           CmpType cmp,
01480           unsigned_type MemSize,
01481           AllocStr AS)
01482 {
01483     STXXL_UNUSED(AS);
01484 #ifdef BOOST_MSVC
01485     typedef typename streamify_traits<RandomAccessIterator>::stream_type InputType;
01486 #else
01487     typedef __typeof__(stream::streamify(begin, end)) InputType;
01488 #endif //BOOST_MSVC
01489     InputType Input(begin, end);
01490     typedef stream::sort<InputType, CmpType, BlockSize, AllocStr> sorter_type;
01491     sorter_type Sort(Input, cmp, MemSize);
01492     stream::materialize(Sort, begin);
01493 }
01494 
01496 
01497 __STXXL_END_NAMESPACE
01498 
01499 #endif // !STXXL_SORT_STREAM_HEADER
01500 // vim: et:ts=4:sw=4

Generated by  doxygen 1.7.1