• Main Page
  • Related Pages
  • Modules
  • Namespaces
  • Classes
  • Files
  • Examples
  • File List

sort.h

00001 /***************************************************************************
00002  *  include/stxxl/bits/algo/sort.h
00003  *
00004  *  Part of the STXXL. See http://stxxl.sourceforge.net
00005  *
00006  *  Copyright (C) 2002-2003 Roman Dementiev <[email protected]>
00007  *  Copyright (C) 2006 Johannes Singler <[email protected]>
00008  *  Copyright (C) 2008-2011 Andreas Beckmann <[email protected]>
00009  *
00010  *  Distributed under the Boost Software License, Version 1.0.
00011  *  (See accompanying file LICENSE_1_0.txt or copy at
00012  *  http://www.boost.org/LICENSE_1_0.txt)
00013  **************************************************************************/
00014 
00015 #ifndef STXXL_SORT_HEADER
00016 #define STXXL_SORT_HEADER
00017 
00018 #include <functional>
00019 
00020 #include <stxxl/bits/mng/mng.h>
00021 #include <stxxl/bits/common/rand.h>
00022 #include <stxxl/bits/mng/adaptor.h>
00023 #include <stxxl/bits/common/simple_vector.h>
00024 #include <stxxl/bits/common/settings.h>
00025 #include <stxxl/bits/mng/block_alloc_interleaved.h>
00026 #include <stxxl/bits/io/request_operations.h>
00027 #include <stxxl/bits/algo/sort_base.h>
00028 #include <stxxl/bits/algo/sort_helper.h>
00029 #include <stxxl/bits/algo/intksort.h>
00030 #include <stxxl/bits/algo/adaptor.h>
00031 #include <stxxl/bits/algo/async_schedule.h>
00032 #include <stxxl/bits/mng/block_prefetcher.h>
00033 #include <stxxl/bits/mng/buf_writer.h>
00034 #include <stxxl/bits/algo/run_cursor.h>
00035 #include <stxxl/bits/algo/losertree.h>
00036 #include <stxxl/bits/algo/inmemsort.h>
00037 #include <stxxl/bits/parallel.h>
00038 #include <stxxl/bits/common/is_sorted.h>
00039 
00040 
00041 __STXXL_BEGIN_NAMESPACE
00042 
00045 
00046 
00049 namespace sort_local
00050 {
//        Completion handler used by create_runs(): an instance is passed as the
//        second argument of block_type::write(), and calling it issues a read of
//        block id `bid` on `*block`, storing the returned request in `*req`.
//        All three members are filled in by the caller before the handler is used.
//        Presumably the I/O layer invokes it once the write has completed (see the
//        struct name and the commented-out parameter name) -- confirm against the
//        request implementation.
00051     template <typename block_type, typename bid_type>
00052     struct read_next_after_write_completed
00053     {
00054         block_type * block;      // buffer the read is issued on
00055         bid_type bid;            // id of the block to read
00056         request_ptr * req;       // slot that receives the request returned by read()
//            The argument is ignored; only the stored members are used.
00057         void operator () (request * /*completed_req*/)
00058         {
00059             *req = block->read(bid);
00060         }
00061     };
00062 
00063 
//        Run formation: reads the input blocks delivered by `it` one run at a time,
//        sorts each run's elements in memory with `cmp`, and writes the sorted
//        blocks to the block ids already stored in runs[k]. The first element of
//        every written block is recorded in the `value` field of the matching run
//        entry. Input block ids are handed to block_manager::delete_block() once
//        the reads of their run have been waited for.
//
//        Two sets of m2 = _m / 2 block buffers are used alternately (they are
//        swapped at the end of every loop iteration); the reads for the next run
//        are already posted while the current run is sorted.
//
//        \param it     iterator yielding the block ids of the input, in order
//        \param runs   array of nruns run descriptors; the bid of every entry must
//                      already be set, the value of every entry is filled in here
//        \param nruns  number of runs; asserted to be at least 2
//        \param _m     number of block buffers that may be used; every run except
//                      the last is asserted to hold exactly _m / 2 blocks
//        \param cmp    comparison object handed to the in-memory sort
//
//        NOTE(review): the arrays allocated below are released only by the delete[]
//        calls at the end of the function; nothing visible here frees them if an
//        exception propagates from the I/O or sort calls.
00064     template <
00065         typename block_type,
00066         typename run_type,
00067         typename input_bid_iterator,
00068         typename value_cmp>
00069     void
00070     create_runs(
00071         input_bid_iterator it,
00072         run_type ** runs,
00073         int_type nruns,
00074         int_type _m,
00075         value_cmp cmp)
00076     {
00077         typedef typename block_type::value_type type;
00078         typedef typename block_type::bid_type bid_type;
00079         STXXL_VERBOSE1("stxxl::create_runs nruns=" << nruns << " m=" << _m);
00080 
//            m2 is the number of blocks in each of the two buffer sets
00081         int_type m2 = _m / 2;
00082         block_manager * bm = block_manager::get_instance();
00083         block_type * Blocks1 = new block_type[m2];
00084         block_type * Blocks2 = new block_type[m2];
00085         bid_type * bids1 = new bid_type[m2];
00086         bid_type * bids2 = new bid_type[m2];
00087         request_ptr * read_reqs1 = new request_ptr[m2];
00088         request_ptr * read_reqs2 = new request_ptr[m2];
00089         request_ptr * write_reqs = new request_ptr[m2];
00090         read_next_after_write_completed<block_type, bid_type> * next_run_reads =
00091             new read_next_after_write_completed<block_type, bid_type>[m2];
00092 
//            presumably makes the disk queues favour writes -- confirm in disk_queues
00093         disk_queues::get_instance()->set_priority_op(request_queue::WRITE);
00094 
00095         int_type i;
00096         int_type run_size = 0;
00097 
00098         assert(nruns >= 2);
00099 
00100         run_size = runs[0]->size();
00101         assert(run_size == m2);
00102 
//            post the reads for the whole first run into buffer set 1
00103         for (i = 0; i < run_size; ++i)
00104         {
00105             STXXL_VERBOSE1("stxxl::create_runs posting read " << long(Blocks1[i].elem));
00106             bids1[i] = *(it++);
00107             read_reqs1[i] = Blocks1[i].read(bids1[i]);
00108         }
00109 
00110         run_size = runs[1]->size();
00111 
//            post the reads for the second run into buffer set 2
00112         for (i = 0; i < run_size; ++i)
00113         {
00114             STXXL_VERBOSE1("stxxl::create_runs posting read " << long(Blocks2[i].elem));
00115             bids2[i] = *(it++);
00116             read_reqs2[i] = Blocks2[i].read(bids2[i]);
00117         }
00118 
//            all runs except the last one; at the top of each iteration buffer set 1
//            (Blocks1 / bids1 / read_reqs1) is the one belonging to run k
00119         for (int_type k = 0; k < nruns - 1; ++k)
00120         {
00121             run_type * run = runs[k];
00122             run_size = run->size();
00123             assert(run_size == m2);
//                next_run_size is only needed by the assert below, hence the guard
00124             #ifndef NDEBUG
00125             int_type next_run_size = runs[k + 1]->size();
00126             #endif
00127             assert((next_run_size == m2) || (next_run_size <= m2 && k == nruns - 2));
00128 
00129             STXXL_VERBOSE1("stxxl::create_runs start waiting read_reqs1");
00130             wait_all(read_reqs1, run_size);
00131             STXXL_VERBOSE1("stxxl::create_runs finish waiting read_reqs1");
//                the input blocks of this run have been waited for; hand their ids back
00132             for (i = 0; i < run_size; ++i)
00133                 bm->delete_block(bids1[i]);
00134 
00135             potentially_parallel::
00136             sort(make_element_iterator(Blocks1, 0),
00137                  make_element_iterator(Blocks1, run_size * block_type::size),
00138                  cmp);
00139 
//                write_reqs is overwritten below, so the writes posted in the
//                previous iteration are waited for first
00140             STXXL_VERBOSE1("stxxl::create_runs start waiting write_reqs");
00141             if (k > 0)
00142                 wait_all(write_reqs, m2);
00143             STXXL_VERBOSE1("stxxl::create_runs finish waiting write_reqs");
00144 
//                size of run k + 2 (0 if there is none): buffers with an index below
//                this value get a follow-up read attached to their write
00145             int_type runplus2size = (k < nruns - 2) ? runs[k + 2]->size() : 0;
00146             for (i = 0; i < m2; ++i)
00147             {
00148                 STXXL_VERBOSE1("stxxl::create_runs posting write " << long(Blocks1[i].elem));
//                    remember the first element of the block in the run entry
00149                 (*run)[i].value = Blocks1[i][0];
00150                 if (i >= runplus2size) {
//                        this buffer is not needed for run k + 2: plain write
00151                     write_reqs[i] = Blocks1[i].write((*run)[i].bid);
00152                 }
00153                 else
00154                 {
//                        pass a handler to write() that reads the next input block
//                        (for run k + 2) into this same buffer
00155                     next_run_reads[i].block = Blocks1 + i;
00156                     next_run_reads[i].req = read_reqs1 + i;
00157                     bids1[i] = next_run_reads[i].bid = *(it++);
00158                     write_reqs[i] = Blocks1[i].write((*run)[i].bid, next_run_reads[i]);
00159                 }
00160             }
//                make buffer set 1 refer to the next run
00161             std::swap(Blocks1, Blocks2);
00162             std::swap(bids1, bids2);
00163             std::swap(read_reqs1, read_reqs2);
00164         }
00165 
//            last run: it may hold fewer than m2 blocks and no further reads follow
00166         run_type * run = runs[nruns - 1];
00167         run_size = run->size();
00168         STXXL_VERBOSE1("stxxl::create_runs start waiting read_reqs1");
00169         wait_all(read_reqs1, run_size);
00170         STXXL_VERBOSE1("stxxl::create_runs finish waiting read_reqs1");
00171         for (i = 0; i < run_size; ++i)
00172             bm->delete_block(bids1[i]);
00173 
00174         potentially_parallel::
00175         sort(make_element_iterator(Blocks1, 0),
00176              make_element_iterator(Blocks1, run_size * block_type::size),
00177              cmp);
00178 
//            wait for the m2 writes posted in the final loop iteration before
//            write_reqs is reused
00179         STXXL_VERBOSE1("stxxl::create_runs start waiting write_reqs");
00180         wait_all(write_reqs, m2);
00181         STXXL_VERBOSE1("stxxl::create_runs finish waiting write_reqs");
00182 
00183         for (i = 0; i < run_size; ++i)
00184         {
00185             STXXL_VERBOSE1("stxxl::create_runs posting write " << long(Blocks1[i].elem));
00186             (*run)[i].value = Blocks1[i][0];
00187             write_reqs[i] = Blocks1[i].write((*run)[i].bid);
00188         }
00189 
//            the buffers are deleted below, so wait for the writes of the last run
00190         STXXL_VERBOSE1("stxxl::create_runs start waiting write_reqs");
00191         wait_all(write_reqs, run_size);
00192         STXXL_VERBOSE1("stxxl::create_runs finish waiting write_reqs");
00193 
00194         delete[] Blocks1;
00195         delete[] Blocks2;
00196         delete[] bids1;
00197         delete[] bids2;
00198         delete[] read_reqs1;
00199         delete[] read_reqs2;
00200         delete[] write_reqs;
00201         delete[] next_run_reads;
00202     }
00203 
00204 
//        Debugging aid: reads every block of the given runs back and verifies them.
//        Each run is processed in chunks of at most m blocks; for every chunk
//          - the first element must not compare less than the last element of the
//            previous chunk of the same run,
//          - the first element of every block must equal the trigger value stored
//            in the corresponding run entry,
//          - all elements of the chunk must be sorted with respect to cmp.
//        On the first violation diagnostics are printed through STXXL_MSG and
//        false is returned; the temporary buffers are released on every exit path.
//
//        \param runs   array of run descriptors to check
//        \param nruns  number of entries in runs
//        \param m      number of blocks read and checked at a time; must be
//                      non-zero because the chunk loop advances by m
//        \param cmp    comparison object; also provides min_value()
//        \return true if all runs passed the checks, false otherwise
00205     template <typename block_type, typename run_type, typename value_cmp>
00206     bool check_sorted_runs(run_type ** runs,
00207                            unsigned_type nruns,
00208                            unsigned_type m,
00209                            value_cmp cmp)
00210     {
00211         typedef typename block_type::value_type value_type;
00212 
00213         STXXL_MSG("check_sorted_runs  Runs: " << nruns);
00214         unsigned_type irun = 0;
00215         for (irun = 0; irun < nruns; ++irun)
00216         {
00217             const unsigned_type nblocks_per_run = runs[irun]->size();
00218             unsigned_type blocks_left = nblocks_per_run;
00219             block_type * blocks = new block_type[m];
00220             request_ptr * reqs = new request_ptr[m];
00221             value_type last = cmp.min_value();
00222 
00223             for (unsigned_type off = 0; off < nblocks_per_run; off += m)
00224             {
00225                 const unsigned_type nblocks = STXXL_MIN(blocks_left, m);
00226                 const unsigned_type nelements = nblocks * block_type::size;
00227                 blocks_left -= nblocks;
00228 
//                    read the next chunk of the run
00229                 for (unsigned_type j = 0; j < nblocks; ++j)
00230                 {
00231                     reqs[j] = blocks[j].read((*runs[irun])[j + off].bid);
00232                 }
00233                 wait_all(reqs, reqs + nblocks);
00234 
//                    chunk boundary check (skipped for the first chunk of a run)
00235                 if (off && cmp(blocks[0][0], last))
00236                 {
00237                     STXXL_MSG("check_sorted_runs  wrong first value in the run " << irun);
00238                     STXXL_MSG(" first value: " << blocks[0][0]);
00239                     STXXL_MSG(" last  value: " << last);
00240                     for (unsigned_type k = 0; k < block_type::size; ++k)
00241                         STXXL_MSG("Element " << k << " in the block is :" << blocks[0][k]);
00242 
//                        release the buffers before bailing out (they used to leak here)
                          delete[] reqs;
                          delete[] blocks;
00243                     return false;
00244                 }
00245 
//                    every stored trigger value must match the first element of its block
00246                 for (unsigned_type j = 0; j < nblocks; ++j)
00247                 {
00248                     if (!(blocks[j][0] == (*runs[irun])[j + off].value))
00249                     {
00250                         STXXL_MSG("check_sorted_runs  wrong trigger in the run " << irun << " block " << (j + off));
00251                         STXXL_MSG("                   trigger value: " << (*runs[irun])[j + off].value);
00252                         STXXL_MSG("Data in the block:");
00253                         for (unsigned_type k = 0; k < block_type::size; ++k)
00254                             STXXL_MSG("Element " << k << " in the block is :" << blocks[j][k]);
00255 
00256                         STXXL_MSG("BIDS:");
00257                         for (unsigned_type k = 0; k < nblocks; ++k)
00258                         {
00259                             if (k == j)
00260                                 STXXL_MSG("Bad one comes next.");
00261                             STXXL_MSG("BID " << (k + off) << " is: " << ((*runs[irun])[k + off].bid));
00262                         }
00263 
//                            release the buffers before bailing out (they used to leak here)
                              delete[] reqs;
                              delete[] blocks;
00264                         return false;
00265                     }
00266                 }
//                    the elements of the whole chunk must be in order
00267                 if (!stxxl::is_sorted(make_element_iterator(blocks, 0),
00268                                       make_element_iterator(blocks, nelements),
00269                                       cmp))
00270                 {
00271                     STXXL_MSG("check_sorted_runs  wrong order in the run " << irun);
00272                     STXXL_MSG("Data in blocks:");
00273                     for (unsigned_type j = 0; j < nblocks; ++j)
00274                     {
00275                         for (unsigned_type k = 0; k < block_type::size; ++k)
00276                             STXXL_MSG("     Element " << k << " in block " << (j + off) << " is :" << blocks[j][k]);
00277                     }
00278                     STXXL_MSG("BIDS:");
00279                     for (unsigned_type k = 0; k < nblocks; ++k)
00280                     {
00281                         STXXL_MSG("BID " << (k + off) << " is: " << ((*runs[irun])[k + off].bid));
00282                     }
00283 
//                        release the buffers before bailing out (they used to leak here)
                          delete[] reqs;
                          delete[] blocks;
00284                     return false;
00285                 }
00286 
//                    remember the last element for the boundary check of the next chunk
00287                 last = blocks[nblocks - 1][block_type::size - 1];
00288             }
00289 
00290             assert(blocks_left == 0);
00291             delete[] reqs;
00292             delete[] blocks;
00293         }
00294 
00295         return true;
00296     }
00297 
00298 
//        Merges the nruns sorted runs in_runs[0 .. nruns) into out_run.
//        The entries of all input runs are copied into consume_seq and ordered by
//        their trigger values; the blocks are then fetched through a
//        block_prefetcher in that order. Merged elements are collected block by
//        block in buffers obtained from a buffered_writer and written to the block
//        ids already stored in out_run; the first element of each output block is
//        stored in the value field of the matching out_run entry.
//        Before returning, every input block id is passed to
//        block_manager::delete_block() and every in_runs[i] object is deleted.
//
//        \param in_runs  array of input run descriptors; the objects are deleted here
//        \param nruns    number of input runs
//        \param out_run  output run descriptor with its bids already set; its size
//                        (in blocks) is used as the total number of input blocks
//        \param _m       number of block buffers available; split between the
//                        prefetcher and the writer below
//        \param cmp      comparison object for the elements
//
//        NOTE(review): when PLAY_WITH_OPT_PREF is defined only n_write_buffers is
//        declared in this function, yet n_prefetch_buffers (and, with
//        STXXL_SORT_OPTIMAL_PREFETCHING, n_opt_prefetch_buffers) are used further
//        down -- confirm whether that configuration is still meant to build.
//        NOTE(review): prefetch_seq is released only by the delete[] near the end;
//        nothing visible here frees it if an exception propagates.
00299     template <typename block_type, typename run_type, typename value_cmp>
00300     void merge_runs(run_type ** in_runs, int_type nruns, run_type * out_run, unsigned_type _m, value_cmp cmp)
00301     {
00302         typedef typename block_type::value_type value_type;
00303         typedef typename run_type::value_type trigger_entry_type;
00304         typedef block_prefetcher<block_type, typename run_type::iterator> prefetcher_type;
00305         typedef run_cursor2<block_type, prefetcher_type> run_cursor_type;
00306         typedef sort_helper::run_cursor2_cmp<block_type, prefetcher_type, value_cmp> run_cursor2_cmp_type;
00307 
//            consume_seq receives a copy of every entry of every input run
00308         run_type consume_seq(out_run->size());
00309 
00310         int_type * prefetch_seq = new int_type[out_run->size()];
00311 
00312         typename run_type::iterator copy_start = consume_seq.begin();
00313         for (int_type i = 0; i < nruns; i++)
00314         {
00315             // TODO: try to avoid copy
00316             copy_start = std::copy(
00317                 in_runs[i]->begin(),
00318                 in_runs[i]->end(),
00319                 copy_start);
00320         }
00321 
//            order the block entries with trigger_entry_cmp (built from cmp)
00322         std::stable_sort(consume_seq.begin(), consume_seq.end(),
00323                          sort_helper::trigger_entry_cmp<trigger_entry_type, value_cmp>(cmp) _STXXL_SORT_TRIGGER_FORCE_SEQUENTIAL);
00324 
00325         int_type disks_number = config::get_instance()->disks_number();
00326 
//            buffer budget: at least 2 buffers per disk for each of prefetching and
//            writing; otherwise 3/4 of (_m - nruns) for prefetching and the rest
//            for writing
00327 #ifdef PLAY_WITH_OPT_PREF
00328         const int_type n_write_buffers = 4 * disks_number;
00329 #else
00330         const int_type n_prefetch_buffers = STXXL_MAX(2 * disks_number, (3 * (int_type(_m) - nruns) / 4));
00331         const int_type n_write_buffers = STXXL_MAX(2 * disks_number, int_type(_m) - nruns - n_prefetch_buffers);
00332  #if STXXL_SORT_OPTIMAL_PREFETCHING
00333         // heuristic
00334         const int_type n_opt_prefetch_buffers = 2 * disks_number + (3 * (n_prefetch_buffers - 2 * disks_number)) / 10;
00335  #endif
00336 #endif
00337 
//            fill prefetch_seq: either a computed schedule or simply 0, 1, 2, ...
00338 #if STXXL_SORT_OPTIMAL_PREFETCHING
00339         compute_prefetch_schedule(
00340             consume_seq,
00341             prefetch_seq,
00342             n_opt_prefetch_buffers,
00343             disks_number);
00344 #else
00345         for (unsigned_type i = 0; i < out_run->size(); i++)
00346             prefetch_seq[i] = i;
00347 
00348 #endif
00349 
//            the prefetcher is given nruns + n_prefetch_buffers buffers
00350         prefetcher_type prefetcher(consume_seq.begin(),
00351                                    consume_seq.end(),
00352                                    prefetch_seq,
00353                                    nruns + n_prefetch_buffers);
00354 
00355         buffered_writer<block_type> writer(n_write_buffers, n_write_buffers / 2);
00356 
00357         int_type out_run_size = out_run->size();
00358 
00359         block_type * out_buffer = writer.get_free_block();
00360 
00361 //If parallelism is activated, one can still fall back to the
00362 //native merge routine by setting stxxl::SETTINGS::native_merge = true; otherwise, it is used anyway.
00363 
00364         if (do_parallel_merge())
00365         {
00366 #if STXXL_PARALLEL_MULTIWAY_MERGE
00367 
00368 // begin of STL-style merging
00369 
00370             typedef stxxl::int64 diff_type;
00371             typedef std::pair<typename block_type::iterator, typename block_type::iterator> sequence;
00372             typedef typename std::vector<sequence>::size_type seqs_size_type;
00373             std::vector<sequence> seqs(nruns);
00374             std::vector<block_type *> buffers(nruns);
00375 
00376             for (int_type i = 0; i < nruns; i++)                // initialize sequences
00377             {
00378                 buffers[i] = prefetcher.pull_block();           // get first block of each run
00379                 seqs[i] = std::make_pair(buffers[i]->begin(), buffers[i]->end());
00380                 // this memory location stays the same, only the data is exchanged
00381             }
00382 
00383  #if STXXL_CHECK_ORDER_IN_SORTS
00384             value_type last_elem = cmp.min_value();
00385  #endif
//                number of elements known to be mergeable from the blocks in memory
00386             diff_type num_currently_mergeable = 0;
00387 
00388             for (int_type j = 0; j < out_run_size; ++j)                 // for the whole output run, out_run_size is in blocks
00389             {
00390                 diff_type rest = block_type::size;                      // elements still to merge for this output block
00391 
00392                 STXXL_VERBOSE1("output block " << j);
00393                 do {
//                        recompute the mergeable count once it no longer covers the
//                        rest of the current output block
00394                     if (num_currently_mergeable < rest)
00395                     {
00396                         if (prefetcher.empty())
00397                         {
00398                             // anything remaining is already in memory
00399                             num_currently_mergeable = (out_run_size - j) * block_type::size
00400                                                       - (block_type::size - rest);
00401                         }
00402                         else
00403                         {
//                                count against the trigger value of the entry at
//                                the prefetcher's current position
00404                             num_currently_mergeable = sort_helper::count_elements_less_equal(
00405                                 seqs, consume_seq[prefetcher.pos()].value, cmp);
00406                         }
00407                     }
00408 
00409                     diff_type output_size = STXXL_MIN(num_currently_mergeable, rest);   // at most rest elements
00410 
00411                     STXXL_VERBOSE1("before merge " << output_size);
00412 
00413                     stxxl::parallel::multiway_merge(seqs.begin(), seqs.end(), out_buffer->end() - rest, cmp, output_size);
00414                     // sequence iterators are progressed appropriately
00415 
00416                     rest -= output_size;
00417                     num_currently_mergeable -= output_size;
00418 
00419                     STXXL_VERBOSE1("after merge");
00420 
00421                     sort_helper::refill_or_remove_empty_sequences(seqs, buffers, prefetcher);
00422                 } while (rest > 0 && seqs.size() > 0);
00423 
00424  #if STXXL_CHECK_ORDER_IN_SORTS
00425                 if (!stxxl::is_sorted(out_buffer->begin(), out_buffer->end(), cmp))
00426                 {
00427                     for (value_type * i = out_buffer->begin() + 1; i != out_buffer->end(); i++)
00428                         if (cmp(*i, *(i - 1)))
00429                         {
00430                             STXXL_VERBOSE1("Error at position " << (i - out_buffer->begin()));
00431                         }
00432                     assert(false);
00433                 }
00434 
00435                 if (j > 0) // do not check in first iteration
00436                     assert(cmp((*out_buffer)[0], last_elem) == false);
00437 
00438                 last_elem = (*out_buffer)[block_type::size - 1];
00439  #endif
00440 
00441                 (*out_run)[j].value = (*out_buffer)[0];                              // save smallest value
00442 
//                    hand the full block to the writer and continue with the buffer it returns
00443                 out_buffer = writer.write(out_buffer, (*out_run)[j].bid);
00444             }
00445 
00446 // end of STL-style merging
00447 
00448 #else
00449             STXXL_THROW_UNREACHABLE();
00450 #endif
00451         }
00452         else
00453         {
00454 // begin of native merging procedure
00455 
00456             loser_tree<run_cursor_type, run_cursor2_cmp_type>
00457             losers(&prefetcher, nruns, run_cursor2_cmp_type(cmp));
00458 
00459 #if STXXL_CHECK_ORDER_IN_SORTS
00460             value_type last_elem = cmp.min_value();
00461 #endif
00462 
//                one multi_merge call per output block
00463             for (int_type i = 0; i < out_run_size; ++i)
00464             {
00465                 losers.multi_merge(out_buffer->elem, out_buffer->elem + block_type::size);
//                    record the first element of the block as its trigger value
00466                 (*out_run)[i].value = *(out_buffer->elem);
00467 
00468 #if STXXL_CHECK_ORDER_IN_SORTS
00469                 assert(stxxl::is_sorted(
00470                            out_buffer->begin(),
00471                            out_buffer->end(),
00472                            cmp));
00473 
00474                 if (i)
00475                     assert(cmp(*(out_buffer->elem), last_elem) == false);
00476 
00477                 last_elem = (*out_buffer).elem[block_type::size - 1];
00478 #endif
00479 
00480                 out_buffer = writer.write(out_buffer, (*out_run)[i].bid);
00481             }
00482 
00483 // end of native merging procedure
00484         }
00485 
00486         delete[] prefetch_seq;
00487 
//            the input runs are consumed: give their block ids back and delete the
//            run descriptors themselves
00488         block_manager * bm = block_manager::get_instance();
00489         for (int_type i = 0; i < nruns; ++i)
00490         {
00491             unsigned_type sz = in_runs[i]->size();
00492             for (unsigned_type j = 0; j < sz; ++j)
00493                 bm->delete_block((*in_runs[i])[j].bid);
00494 
00495 
00496             delete in_runs[i];
00497         }
00498     }
00499 
00500 
//        Sorts the _n blocks whose ids are delivered by input_bids.
//        The input is split into runs of m2 = _m / 2 blocks (plus one shorter run
//        if _n is not a multiple of m2), blocks for the runs are allocated, and
//        create_runs() produces the sorted runs. The runs are then merged in
//        phases of at most merge_factor runs each until a single run remains.
//
//        \param input_bids  iterator to the first input block id; _n ids are used
//        \param _n          number of input blocks
//        \param _m          number of block buffers that may be used
//        \param cmp         comparison object for the elements
//        \return pointer to the run descriptor of the final sorted run; it was
//                allocated with new here and is not deleted by this function
//                (sort() deletes the pointer it receives)
00501     template <typename block_type,
00502               typename alloc_strategy,
00503               typename input_bid_iterator,
00504               typename value_cmp>
00505     simple_vector<sort_helper::trigger_entry<block_type> > *
00506     sort_blocks(input_bid_iterator input_bids,
00507                 unsigned_type _n,
00508                 unsigned_type _m,
00509                 value_cmp cmp
00510                 )
00511     {
00512         typedef typename block_type::value_type type;
00513         typedef typename block_type::bid_type bid_type;
00514         typedef sort_helper::trigger_entry<block_type> trigger_entry_type;
00515         typedef simple_vector<trigger_entry_type> run_type;
00516         typedef typename interleaved_alloc_traits<alloc_strategy>::strategy interleaved_alloc_strategy;
00517 
//            run layout: full_runs runs of m2 blocks plus at most one partial run
00518         unsigned_type m2 = _m / 2;
00519         unsigned_type full_runs = _n / m2;
00520         unsigned_type partial_runs = ((_n % m2) ? 1 : 0);
00521         unsigned_type nruns = full_runs + partial_runs;
00522         unsigned_type i;
00523 
00524         block_manager * mng = block_manager::get_instance();
00525 
00526         //STXXL_VERBOSE ("n=" << _n << " nruns=" << nruns << "=" << full_runs << "+" << partial_runs);
00527 
00528         double begin = timestamp(), after_runs_creation, end;
00529 
00530         run_type ** runs = new run_type *[nruns];
00531 
00532         for (i = 0; i < full_runs; i++)
00533             runs[i] = new run_type(m2);
00534 
00535 
//            i == full_runs here, i.e. the slot of the partial run
00536         if (partial_runs)
00537             runs[i] = new run_type(_n - full_runs * m2);
00538 
//            allocate the blocks the sorted runs will be written to
00539         for (i = 0; i < nruns; ++i)
00540             mng->new_blocks(alloc_strategy(), make_bid_iterator(runs[i]->begin()), make_bid_iterator(runs[i]->end()));
00541 
00542         sort_local::create_runs<block_type,
00543                                 run_type,
00544                                 input_bid_iterator,
00545                                 value_cmp>(input_bids, runs, nruns, _m, cmp);
00546 
00547         after_runs_creation = timestamp();
00548 
00549         double io_wait_after_rf = stats::get_instance()->get_io_wait_time();
00550 
00551         disk_queues::get_instance()->set_priority_op(request_queue::WRITE);
00552 
//            the merge factor is computed once from the initial number of runs
00553         const int_type merge_factor = optimal_merge_factor(nruns, _m);
00554         run_type ** new_runs;
00555 
//            one merge phase per iteration, until a single run is left
00556         while (nruns > 1)
00557         {
00558             int_type new_nruns = div_ceil(nruns, merge_factor);
00559             STXXL_VERBOSE("Starting new merge phase: nruns: " << nruns <<
00560                           " opt_merge_factor: " << merge_factor << " m:" << _m << " new_nruns: " << new_nruns);
00561 
00562             new_runs = new run_type *[new_nruns];
00563 
00564             int_type runs_left = nruns;
00565             int_type cur_out_run = 0;
00566             int_type blocks_in_new_run = 0;
00567 
//                first pass: create the output run descriptors, each sized as the
//                sum of the up to merge_factor input runs it will receive
00568             while (runs_left > 0)
00569             {
00570                 int_type runs2merge = STXXL_MIN(runs_left, merge_factor);
00571                 blocks_in_new_run = 0;
00572                 for (unsigned_type i = nruns - runs_left; i < (nruns - runs_left + runs2merge); i++)
00573                     blocks_in_new_run += runs[i]->size();
00574 
00575                 // allocate run
00576                 new_runs[cur_out_run++] = new run_type(blocks_in_new_run);
00577                 runs_left -= runs2merge;
00578             }
00579             // allocate blocks for the new runs
//                special case: this phase yields one run covering all _n blocks and
//                the first input bid is not managed
00580             if (cur_out_run == 1 && blocks_in_new_run == int_type(_n) && !input_bids->is_managed())
00581             {
00582                 // if we sort a file we can reuse the input bids for the output
00583                 input_bid_iterator cur = input_bids;
00584                 for (int_type i = 0; cur != (input_bids + _n); ++cur)
00585                 {
00586                     (*new_runs[0])[i++].bid = *cur;
00587                 }
00588 
00589                 bid_type & firstBID = (*new_runs[0])[0].bid;
00590                 if (firstBID.is_managed())
00591                 {
00592                     // the first block does not belong to the file
00593                     // need to reallocate it
00594                     mng->new_block(FR(), firstBID);
00595                 }
00596                 bid_type & lastBID = (*new_runs[0])[_n - 1].bid;
00597                 if (lastBID.is_managed())
00598                 {
00599                     // the last block does not belong to the file
00600                     // need to reallocate it
00601                     mng->new_block(FR(), lastBID);
00602                 }
00603             }
00604             else
00605             {
00606                 mng->new_blocks(interleaved_alloc_strategy(new_nruns, alloc_strategy()),
00607                                 RunsToBIDArrayAdaptor2<block_type::raw_size, run_type>(new_runs, 0, new_nruns, blocks_in_new_run),
00608                                 RunsToBIDArrayAdaptor2<block_type::raw_size, run_type>(new_runs, _n, new_nruns, blocks_in_new_run));
00609             }
00610             // merge all
00611             runs_left = nruns;
00612             cur_out_run = 0;
//                second pass: merge each group of input runs into its output run;
//                merge_runs() deletes the input run objects and their blocks
00613             while (runs_left > 0)
00614             {
00615                 int_type runs2merge = STXXL_MIN(runs_left, merge_factor);
00616 #if STXXL_CHECK_ORDER_IN_SORTS
00617                 assert((check_sorted_runs<block_type, run_type, value_cmp>(runs + nruns - runs_left, runs2merge, m2, cmp)));
00618 #endif
00619                 STXXL_VERBOSE("Merging " << runs2merge << " runs");
00620                 merge_runs<block_type, run_type>(runs + nruns - runs_left,
00621                                                  runs2merge, *(new_runs + (cur_out_run++)), _m, cmp
00622                                                  );
00623                 runs_left -= runs2merge;
00624             }
00625 
//                only the pointer array is freed here; the output runs become the
//                input of the next phase
00626             nruns = new_nruns;
00627             delete[] runs;
00628             runs = new_runs;
00629         }
00630 
//            a single run is left: keep the run object, drop the pointer array
00631         run_type * result = *runs;
00632         delete[] runs;
00633 
00634         end = timestamp();
00635 
00636         STXXL_VERBOSE("Elapsed time        : " << end - begin << " s. Run creation time: " <<
00637                       after_runs_creation - begin << " s");
00638         STXXL_VERBOSE("Time in I/O wait(rf): " << io_wait_after_rf << " s");
00639         STXXL_VERBOSE(*stats::get_instance());
//            presumably silences unused-variable warnings when STXXL_VERBOSE expands to nothing
00640         STXXL_UNUSED(begin + after_runs_creation + end + io_wait_after_rf);
00641 
00642         return result;
00643     }
00644 }
00645 
00646 
00692 
00693 
00694 
00695 
00696 
00697 
00698 
00699 template <typename ExtIterator_, typename StrictWeakOrdering_>
00700 void sort(ExtIterator_ first, ExtIterator_ last, StrictWeakOrdering_ cmp, unsigned_type M)
00701 {
00702     sort_helper::verify_sentinel_strict_weak_ordering(cmp);
00703 
00704     typedef simple_vector<sort_helper::trigger_entry<typename ExtIterator_::block_type> > run_type;
00705 
00706     typedef typename ExtIterator_::vector_type::value_type value_type;
00707     typedef typename ExtIterator_::block_type block_type;
00708 
00709     unsigned_type n = 0;
00710 
00711     block_manager * mng = block_manager::get_instance();
00712 
00713     first.flush();
00714 
00715     if ((last - first) * sizeof(value_type) * sort_memory_usage_factor() < M)
00716     {
00717         stl_in_memory_sort(first, last, cmp);
00718     }
00719     else
00720     {
00721         if (!(2 * block_type::raw_size * sort_memory_usage_factor() <= M)) {
00722             throw bad_parameter("stxxl::sort(): INSUFFICIENT MEMORY provided, please increase parameter 'M'");
00723         }
00724 
00725         if (first.block_offset())
00726         {
00727             if (last.block_offset())              // first and last element are
00728             // not the first elements of their block
00729             {
00730                 typename ExtIterator_::block_type * first_block = new typename ExtIterator_::block_type;
00731                 typename ExtIterator_::block_type * last_block = new typename ExtIterator_::block_type;
00732                 typename ExtIterator_::bid_type first_bid, last_bid;
00733                 request_ptr req;
00734 
00735                 req = first_block->read(*first.bid());
00736                 mng->new_block(FR(), first_bid);                // try to overlap
00737                 mng->new_block(FR(), last_bid);
00738                 req->wait();
00739 
00740 
00741                 req = last_block->read(*last.bid());
00742 
00743                 unsigned_type i = 0;
00744                 for ( ; i < first.block_offset(); ++i)
00745                 {
00746                     first_block->elem[i] = cmp.min_value();
00747                 }
00748 
00749                 req->wait();
00750 
00751 
00752                 req = first_block->write(first_bid);
00753                 for (i = last.block_offset(); i < block_type::size; ++i)
00754                 {
00755                     last_block->elem[i] = cmp.max_value();
00756                 }
00757 
00758                 req->wait();
00759 
00760 
00761                 req = last_block->write(last_bid);
00762 
00763                 n = last.bid() - first.bid() + 1;
00764 
00765                 std::swap(first_bid, *first.bid());
00766                 std::swap(last_bid, *last.bid());
00767 
00768                 req->wait();
00769 
00770 
00771                 delete first_block;
00772                 delete last_block;
00773 
00774                 run_type * out =
00775                     sort_local::sort_blocks<
00776                         typename ExtIterator_::block_type,
00777                         typename ExtIterator_::vector_type::alloc_strategy_type,
00778                         typename ExtIterator_::bids_container_iterator>
00779                         (first.bid(), n, M / sort_memory_usage_factor() / block_type::raw_size, cmp);
00780 
00781 
00782                 first_block = new typename ExtIterator_::block_type;
00783                 last_block = new typename ExtIterator_::block_type;
00784                 typename ExtIterator_::block_type * sorted_first_block = new typename ExtIterator_::block_type;
00785                 typename ExtIterator_::block_type * sorted_last_block = new typename ExtIterator_::block_type;
00786                 request_ptr * reqs = new request_ptr[2];
00787 
00788                 reqs[0] = first_block->read(first_bid);
00789                 reqs[1] = sorted_first_block->read((*(out->begin())).bid);
00790 
00791                 reqs[0]->wait();
00792                 reqs[1]->wait();
00793 
00794                 reqs[0] = last_block->read(last_bid);
00795                 reqs[1] = sorted_last_block->read(((*out)[out->size() - 1]).bid);
00796 
00797                 for (i = first.block_offset(); i < block_type::size; i++)
00798                 {
00799                     first_block->elem[i] = sorted_first_block->elem[i];
00800                 }
00801 
00802                 reqs[0]->wait();
00803                 reqs[1]->wait();
00804 
00805                 req = first_block->write(first_bid);
00806 
00807                 for (i = 0; i < last.block_offset(); ++i)
00808                 {
00809                     last_block->elem[i] = sorted_last_block->elem[i];
00810                 }
00811 
00812                 req->wait();
00813 
00814                 req = last_block->write(last_bid);
00815 
00816                 mng->delete_block(out->begin()->bid);
00817                 mng->delete_block((*out)[out->size() - 1].bid);
00818 
00819                 *first.bid() = first_bid;
00820                 *last.bid() = last_bid;
00821 
00822                 typename run_type::iterator it = out->begin();
00823                 ++it;
00824                 typename ExtIterator_::bids_container_iterator cur_bid = first.bid();
00825                 ++cur_bid;
00826 
00827                 for ( ; cur_bid != last.bid(); ++cur_bid, ++it)
00828                 {
00829                     *cur_bid = (*it).bid;
00830                 }
00831 
00832                 delete first_block;
00833                 delete sorted_first_block;
00834                 delete sorted_last_block;
00835                 delete[] reqs;
00836                 delete out;
00837 
00838                 req->wait();
00839 
00840 
00841                 delete last_block;
00842             }
00843             else
00844             {
00845                 // first element is
00846                 // not the first element of its block
00847                 typename ExtIterator_::block_type * first_block = new typename ExtIterator_::block_type;
00848                 typename ExtIterator_::bid_type first_bid;
00849                 request_ptr req;
00850 
00851                 req = first_block->read(*first.bid());
00852                 mng->new_block(FR(), first_bid);                // try to overlap
00853                 req->wait();
00854 
00855 
00856                 unsigned_type i = 0;
00857                 for ( ; i < first.block_offset(); ++i)
00858                 {
00859                     first_block->elem[i] = cmp.min_value();
00860                 }
00861 
00862                 req = first_block->write(first_bid);
00863 
00864                 n = last.bid() - first.bid();
00865 
00866                 std::swap(first_bid, *first.bid());
00867 
00868                 req->wait();
00869 
00870 
00871                 delete first_block;
00872 
00873                 run_type * out =
00874                     sort_local::sort_blocks<
00875                         typename ExtIterator_::block_type,
00876                         typename ExtIterator_::vector_type::alloc_strategy_type,
00877                         typename ExtIterator_::bids_container_iterator>
00878                         (first.bid(), n, M / sort_memory_usage_factor() / block_type::raw_size, cmp);
00879 
00880 
00881                 first_block = new typename ExtIterator_::block_type;
00882 
00883                 typename ExtIterator_::block_type * sorted_first_block = new typename ExtIterator_::block_type;
00884 
00885                 request_ptr * reqs = new request_ptr[2];
00886 
00887                 reqs[0] = first_block->read(first_bid);
00888                 reqs[1] = sorted_first_block->read((*(out->begin())).bid);
00889 
00890                 reqs[0]->wait();
00891                 reqs[1]->wait();
00892 
00893                 for (i = first.block_offset(); i < block_type::size; ++i)
00894                 {
00895                     first_block->elem[i] = sorted_first_block->elem[i];
00896                 }
00897 
00898                 req = first_block->write(first_bid);
00899 
00900                 mng->delete_block(out->begin()->bid);
00901 
00902                 *first.bid() = first_bid;
00903 
00904                 typename run_type::iterator it = out->begin();
00905                 ++it;
00906                 typename ExtIterator_::bids_container_iterator cur_bid = first.bid();
00907                 ++cur_bid;
00908 
00909                 for ( ; cur_bid != last.bid(); ++cur_bid, ++it)
00910                 {
00911                     *cur_bid = (*it).bid;
00912                 }
00913 
00914                 *cur_bid = (*it).bid;
00915 
00916                 delete sorted_first_block;
00917                 delete[] reqs;
00918                 delete out;
00919 
00920                 req->wait();
00921 
00922                 delete first_block;
00923             }
00924         }
00925         else
00926         {
00927             if (last.block_offset())            // last is
00928             // not the first element of its block
00929             {
00930                 typename ExtIterator_::block_type * last_block = new typename ExtIterator_::block_type;
00931                 typename ExtIterator_::bid_type last_bid;
00932                 request_ptr req;
00933                 unsigned_type i;
00934 
00935                 req = last_block->read(*last.bid());
00936                 mng->new_block(FR(), last_bid);
00937                 req->wait();
00938 
00939 
00940                 for (i = last.block_offset(); i < block_type::size; ++i)
00941                 {
00942                     last_block->elem[i] = cmp.max_value();
00943                 }
00944 
00945                 req = last_block->write(last_bid);
00946 
00947                 n = last.bid() - first.bid() + 1;
00948 
00949                 std::swap(last_bid, *last.bid());
00950 
00951                 req->wait();
00952 
00953 
00954                 delete last_block;
00955 
00956                 run_type * out =
00957                     sort_local::sort_blocks<
00958                         typename ExtIterator_::block_type,
00959                         typename ExtIterator_::vector_type::alloc_strategy_type,
00960                         typename ExtIterator_::bids_container_iterator>
00961                         (first.bid(), n, M / sort_memory_usage_factor() / block_type::raw_size, cmp);
00962 
00963 
00964                 last_block = new typename ExtIterator_::block_type;
00965                 typename ExtIterator_::block_type * sorted_last_block = new typename ExtIterator_::block_type;
00966                 request_ptr * reqs = new request_ptr[2];
00967 
00968                 reqs[0] = last_block->read(last_bid);
00969                 reqs[1] = sorted_last_block->read(((*out)[out->size() - 1]).bid);
00970 
00971                 reqs[0]->wait();
00972                 reqs[1]->wait();
00973 
00974                 for (i = 0; i < last.block_offset(); ++i)
00975                 {
00976                     last_block->elem[i] = sorted_last_block->elem[i];
00977                 }
00978 
00979                 req = last_block->write(last_bid);
00980 
00981                 mng->delete_block((*out)[out->size() - 1].bid);
00982 
00983                 *last.bid() = last_bid;
00984 
00985                 typename run_type::iterator it = out->begin();
00986                 typename ExtIterator_::bids_container_iterator cur_bid = first.bid();
00987 
00988                 for ( ; cur_bid != last.bid(); ++cur_bid, ++it)
00989                 {
00990                     *cur_bid = (*it).bid;
00991                 }
00992 
00993                 delete sorted_last_block;
00994                 delete[] reqs;
00995                 delete out;
00996 
00997                 req->wait();
00998 
00999                 delete last_block;
01000             }
01001             else
01002             {
01003                 // first and last element are first elements of their blocks
01004                 n = last.bid() - first.bid();
01005 
01006                 run_type * out =
01007                     sort_local::sort_blocks<typename ExtIterator_::block_type,
01008                                             typename ExtIterator_::vector_type::alloc_strategy_type,
01009                                             typename ExtIterator_::bids_container_iterator>
01010                         (first.bid(), n, M / sort_memory_usage_factor() / block_type::raw_size, cmp);
01011 
01012                 typename run_type::iterator it = out->begin();
01013                 typename ExtIterator_::bids_container_iterator cur_bid = first.bid();
01014 
01015                 for ( ; cur_bid != last.bid(); ++cur_bid, ++it)
01016                 {
01017                     *cur_bid = (*it).bid;
01018                 }
01019 
01020                 delete out;
01021             }
01022         }
01023     }
01024 
01025 #if STXXL_CHECK_ORDER_IN_SORTS
01026     typedef typename ExtIterator_::const_iterator const_iterator;
01027     assert(stxxl::is_sorted(const_iterator(first), const_iterator(last), cmp));
01028 #endif
01029 }
01030 
01032 
01033 __STXXL_END_NAMESPACE
01034 
01035 #endif // !STXXL_SORT_HEADER
01036 // vim: et:ts=4:sw=4

Generated by  doxygen 1.7.1