00001
00002
00003
00004
00005
00006
00007
00008
00009
00010
00011
00012
00013
00014
00015 #ifndef STXXL_SORT_HEADER
00016 #define STXXL_SORT_HEADER
00017
00018 #include <list>
00019 #include <functional>
00020
00021 #include <stxxl/bits/mng/mng.h>
00022 #include <stxxl/bits/common/rand.h>
00023 #include <stxxl/bits/mng/adaptor.h>
00024 #include <stxxl/bits/common/simple_vector.h>
00025 #include <stxxl/bits/common/switch.h>
00026 #include <stxxl/bits/common/settings.h>
00027 #include <stxxl/bits/mng/block_alloc_interleaved.h>
00028 #include <stxxl/bits/algo/intksort.h>
00029 #include <stxxl/bits/algo/adaptor.h>
00030 #include <stxxl/bits/algo/async_schedule.h>
00031 #include <stxxl/bits/mng/block_prefetcher.h>
00032 #include <stxxl/bits/mng/buf_writer.h>
00033 #include <stxxl/bits/algo/run_cursor.h>
00034 #include <stxxl/bits/algo/losertree.h>
00035 #include <stxxl/bits/algo/inmemsort.h>
00036 #include <stxxl/bits/parallel.h>
00037
00038
00039
00040
00041
00042
00043 __STXXL_BEGIN_NAMESPACE
00044
00047
00048
00051 namespace sort_local
00052 {
00053 template <typename BIDTp_, typename ValTp_>
00054 struct trigger_entry
00055 {
00056 typedef BIDTp_ bid_type;
00057 typedef ValTp_ value_type;
00058
00059 bid_type bid;
00060 value_type value;
00061
00062 operator bid_type ()
00063 {
00064 return bid;
00065 }
00066 };
00067
00068 template <typename BIDTp_, typename ValTp_, typename ValueCmp_>
00069 struct trigger_entry_cmp : public std::binary_function<trigger_entry<BIDTp_, ValTp_>, trigger_entry<BIDTp_, ValTp_>, bool>
00070 {
00071 typedef trigger_entry<BIDTp_, ValTp_> trigger_entry_type;
00072 ValueCmp_ cmp;
00073 trigger_entry_cmp(ValueCmp_ c) : cmp(c) { }
00074 trigger_entry_cmp(const trigger_entry_cmp & a) : cmp(a.cmp) { }
00075 bool operator () (const trigger_entry_type & a, const trigger_entry_type & b) const
00076 {
00077 return cmp(a.value, b.value);
00078 }
00079 };
00080
00081 template <typename block_type,
00082 typename prefetcher_type,
00083 typename value_cmp>
00084 struct run_cursor2_cmp
00085 {
00086 typedef run_cursor2<block_type, prefetcher_type> cursor_type;
00087 value_cmp cmp;
00088
00089 run_cursor2_cmp(value_cmp c) : cmp(c) { }
00090 run_cursor2_cmp(const run_cursor2_cmp & a) : cmp(a.cmp) { }
00091 inline bool operator () (const cursor_type & a, const cursor_type & b)
00092 {
00093 if (UNLIKELY(b.empty()))
00094 return true;
00095
00096 if (UNLIKELY(a.empty()))
00097 return false;
00098
00099
00100 return (cmp(a.current(), b.current()));
00101 }
00102 };
00103
00104 template <typename block_type, typename bid_type>
00105 struct read_next_after_write_completed
00106 {
00107 block_type * block;
00108 bid_type bid;
00109 request_ptr * req;
00110 void operator () (request * )
00111 {
00112 * req = block->read(bid);
00113 }
00114 };
00115
00116
00117 template <
00118 typename block_type,
00119 typename run_type,
00120 typename input_bid_iterator,
00121 typename value_cmp>
00122 void
00123 create_runs(
00124 input_bid_iterator it,
00125 run_type ** runs,
00126 int_type nruns,
00127 int_type _m,
00128 value_cmp cmp)
00129 {
00130 typedef typename block_type::value_type type;
00131 typedef typename block_type::bid_type bid_type;
00132 STXXL_VERBOSE1("stxxl::create_runs nruns=" << nruns << " m=" << _m);
00133
00134 int_type m2 = _m / 2;
00135 block_manager * bm = block_manager::get_instance();
00136 block_type * Blocks1 = new block_type[m2];
00137 block_type * Blocks2 = new block_type[m2];
00138 bid_type * bids1 = new bid_type[m2];
00139 bid_type * bids2 = new bid_type[m2];
00140 request_ptr * read_reqs1 = new request_ptr[m2];
00141 request_ptr * read_reqs2 = new request_ptr[m2];
00142 request_ptr * write_reqs = new request_ptr[m2];
00143 read_next_after_write_completed<block_type, bid_type> * next_run_reads =
00144 new read_next_after_write_completed<block_type, bid_type>[m2];
00145
00146 disk_queues::get_instance()->set_priority_op(disk_queue::WRITE);
00147
00148 int_type i;
00149 int_type run_size = 0, next_run_size = 0;
00150
00151 assert(nruns >= 2);
00152
00153 run_size = runs[0]->size();
00154 assert(run_size == m2);
00155
00156 for (i = 0; i < run_size; ++i)
00157 {
00158 STXXL_VERBOSE1("stxxl::create_runs posting read " << long(Blocks1[i].elem));
00159 bids1[i] = *(it++);
00160 read_reqs1[i] = Blocks1[i].read(bids1[i]);
00161 }
00162
00163 run_size = runs[1]->size();
00164
00165 for (i = 0; i < run_size; ++i)
00166 {
00167 STXXL_VERBOSE1("stxxl::create_runs posting read " << long(Blocks2[i].elem));
00168 bids2[i] = *(it++);
00169 read_reqs2[i] = Blocks2[i].read(bids2[i]);
00170 }
00171
00172 for (int_type k = 0; k < nruns - 1; ++k)
00173 {
00174 run_type * run = runs[k];
00175 run_size = run->size();
00176 assert(run_size == m2);
00177 next_run_size = runs[k + 1]->size();
00178 assert((next_run_size == m2) || (next_run_size <= m2 && k == nruns - 2));
00179
00180 STXXL_VERBOSE1("stxxl::create_runs start waiting read_reqs1");
00181 wait_all(read_reqs1, run_size);
00182 STXXL_VERBOSE1("stxxl::create_runs finish waiting read_reqs1");
00183 for (i = 0; i < run_size; ++i)
00184 bm->delete_block(bids1[i]);
00185
00186 if (block_type::has_filler)
00187 std::sort(
00188 ArrayOfSequencesIterator<
00189 block_type, typename block_type::value_type, block_type::size
00190 >(Blocks1, 0),
00191 ArrayOfSequencesIterator<
00192 block_type, typename block_type::value_type, block_type::size
00193 >(Blocks1, run_size * block_type::size),
00194 cmp);
00195 else
00196 std::sort(Blocks1[0].elem, Blocks1[run_size].elem, cmp);
00197
00198
00199 STXXL_VERBOSE1("stxxl::create_runs start waiting write_reqs");
00200 if (k > 0)
00201 wait_all(write_reqs, m2);
00202 STXXL_VERBOSE1("stxxl::create_runs finish waiting write_reqs");
00203
00204 int_type runplus2size = (k < nruns - 2) ? runs[k + 2]->size() : 0;
00205 for (i = 0; i < m2; ++i)
00206 {
00207 STXXL_VERBOSE1("stxxl::create_runs posting write " << long(Blocks1[i].elem));
00208 (*run)[i].value = Blocks1[i][0];
00209 if (i >= runplus2size) {
00210 write_reqs[i] = Blocks1[i].write((*run)[i].bid);
00211 }
00212 else
00213 {
00214 next_run_reads[i].block = Blocks1 + i;
00215 next_run_reads[i].req = read_reqs1 + i;
00216 bids1[i] = next_run_reads[i].bid = *(it++);
00217 write_reqs[i] = Blocks1[i].write((*run)[i].bid, next_run_reads[i]);
00218 }
00219 }
00220 std::swap(Blocks1, Blocks2);
00221 std::swap(bids1, bids2);
00222 std::swap(read_reqs1, read_reqs2);
00223 }
00224
00225 run_type * run = runs[nruns - 1];
00226 run_size = run->size();
00227 STXXL_VERBOSE1("stxxl::create_runs start waiting read_reqs1");
00228 wait_all(read_reqs1, run_size);
00229 STXXL_VERBOSE1("stxxl::create_runs finish waiting read_reqs1");
00230 for (i = 0; i < run_size; ++i)
00231 bm->delete_block(bids1[i]);
00232
00233 if (block_type::has_filler) {
00234 std::sort(
00235 ArrayOfSequencesIterator<
00236 block_type, typename block_type::value_type, block_type::size
00237 >(Blocks1, 0),
00238 ArrayOfSequencesIterator<
00239 block_type, typename block_type::value_type, block_type::size
00240 >(Blocks1, run_size * block_type::size),
00241 cmp);
00242 } else {
00243 std::sort(Blocks1[0].elem, Blocks1[run_size].elem, cmp);
00244 }
00245
00246 STXXL_VERBOSE1("stxxl::create_runs start waiting write_reqs");
00247 wait_all(write_reqs, m2);
00248 STXXL_VERBOSE1("stxxl::create_runs finish waiting write_reqs");
00249
00250 for (i = 0; i < run_size; ++i)
00251 {
00252 STXXL_VERBOSE1("stxxl::create_runs posting write " << long(Blocks1[i].elem));
00253 (*run)[i].value = Blocks1[i][0];
00254 write_reqs[i] = Blocks1[i].write((*run)[i].bid);
00255 }
00256
00257 STXXL_VERBOSE1("stxxl::create_runs start waiting write_reqs");
00258 wait_all(write_reqs, run_size);
00259 STXXL_VERBOSE1("stxxl::create_runs finish waiting write_reqs");
00260
00261 delete[] Blocks1;
00262 delete[] Blocks2;
00263 delete[] bids1;
00264 delete[] bids2;
00265 delete[] read_reqs1;
00266 delete[] read_reqs2;
00267 delete[] write_reqs;
00268 delete[] next_run_reads;
00269 }
00270
00271
00272 template <typename block_type, typename run_type, typename value_cmp>
00273 bool check_sorted_runs(run_type ** runs,
00274 unsigned_type nruns,
00275 unsigned_type m,
00276 value_cmp cmp)
00277 {
00278 typedef typename block_type::value_type value_type;
00279
00280
00281 STXXL_MSG("check_sorted_runs Runs: " << nruns);
00282 unsigned_type irun = 0;
00283 for (irun = 0; irun < nruns; ++irun)
00284 {
00285 const unsigned_type nblocks_per_run = runs[irun]->size();
00286 unsigned_type blocks_left = nblocks_per_run;
00287 block_type * blocks = new block_type[m];
00288 request_ptr * reqs = new request_ptr[m];
00289 value_type last = cmp.min_value();
00290
00291 for (unsigned_type off = 0; off < nblocks_per_run; off += m)
00292 {
00293 const unsigned_type nblocks = STXXL_MIN(blocks_left, m);
00294 const unsigned_type nelements = nblocks * block_type::size;
00295 blocks_left -= nblocks;
00296
00297 for (unsigned_type j = 0; j < nblocks; ++j)
00298 {
00299 reqs[j] = blocks[j].read((*runs[irun])[j + off].bid);
00300 }
00301 wait_all(reqs, reqs + nblocks);
00302
00303 if (off && cmp(blocks[0][0], last))
00304 {
00305 STXXL_MSG("check_sorted_runs wrong first value in the run " << irun);
00306 STXXL_MSG(" first value: " << blocks[0][0]);
00307 STXXL_MSG(" last value: " << last);
00308 for (unsigned_type k = 0; k < block_type::size; ++k)
00309 STXXL_MSG("Element " << k << " in the block is :" << blocks[0][k]);
00310
00311 return false;
00312 }
00313
00314 for (unsigned_type j = 0; j < nblocks; ++j)
00315 {
00316 if (!(blocks[j][0] == (*runs[irun])[j + off].value))
00317 {
00318 STXXL_MSG("check_sorted_runs wrong trigger in the run " << irun << " block " << (j + off));
00319 STXXL_MSG(" trigger value: " << (*runs[irun])[j + off].value);
00320 STXXL_MSG("Data in the block:");
00321 for (unsigned_type k = 0; k < block_type::size; ++k)
00322 STXXL_MSG("Element " << k << " in the block is :" << blocks[j][k]);
00323
00324 STXXL_MSG("BIDS:");
00325 for (unsigned_type k = 0; k < nblocks; ++k)
00326 {
00327 if (k == j)
00328 STXXL_MSG("Bad one comes next.");
00329 STXXL_MSG("BID " << (k + off) << " is: " << ((*runs[irun])[k + off].bid));
00330 }
00331
00332 return false;
00333 }
00334 }
00335 if (!stxxl::is_sorted(
00336 ArrayOfSequencesIterator<
00337 block_type, typename block_type::value_type, block_type::size
00338 >(blocks, 0),
00339 ArrayOfSequencesIterator<
00340 block_type, typename block_type::value_type, block_type::size
00341 >(blocks, nelements),
00342 cmp))
00343 {
00344 STXXL_MSG("check_sorted_runs wrong order in the run " << irun);
00345 STXXL_MSG("Data in blocks:");
00346 for (unsigned_type j = 0; j < nblocks; ++j)
00347 {
00348 for (unsigned_type k = 0; k < block_type::size; ++k)
00349 STXXL_MSG(" Element " << k << " in block " << (j + off) << " is :" << blocks[j][k]);
00350 }
00351 STXXL_MSG("BIDS:");
00352 for (unsigned_type k = 0; k < nblocks; ++k)
00353 {
00354 STXXL_MSG("BID " << (k + off) << " is: " << ((*runs[irun])[k + off].bid));
00355 }
00356
00357 return false;
00358 }
00359
00360 last = blocks[nblocks - 1][block_type::size - 1];
00361 }
00362
00363 assert(blocks_left == 0);
00364 delete[] reqs;
00365 delete[] blocks;
00366 }
00367
00368 return true;
00369 }
00370
00371
00372 template <typename block_type, typename run_type, typename value_cmp>
00373 void merge_runs(run_type ** in_runs, int_type nruns, run_type * out_run, unsigned_type _m, value_cmp cmp
00374 )
00375 {
00376 typedef typename block_type::bid_type bid_type;
00377 typedef typename block_type::value_type value_type;
00378 typedef block_prefetcher<block_type, typename run_type::iterator> prefetcher_type;
00379 typedef run_cursor2<block_type, prefetcher_type> run_cursor_type;
00380 typedef run_cursor2_cmp<block_type, prefetcher_type, value_cmp> run_cursor2_cmp_type;
00381
00382 int_type i;
00383 run_type consume_seq(out_run->size());
00384
00385 int_type * prefetch_seq = new int_type[out_run->size()];
00386
00387 typename run_type::iterator copy_start = consume_seq.begin();
00388 for (i = 0; i < nruns; i++)
00389 {
00390
00391 copy_start = std::copy(
00392 in_runs[i]->begin(),
00393 in_runs[i]->end(),
00394 copy_start);
00395 }
00396
00397 std::stable_sort(consume_seq.begin(), consume_seq.end(),
00398 trigger_entry_cmp<bid_type, value_type, value_cmp>(cmp));
00399
00400 int_type disks_number = config::get_instance()->disks_number();
00401
00402 #ifdef PLAY_WITH_OPT_PREF
00403 const int_type n_write_buffers = 4 * disks_number;
00404 #else
00405 const int_type n_prefetch_buffers = STXXL_MAX(2 * disks_number, (3 * (int_type(_m) - nruns) / 4));
00406 const int_type n_write_buffers = STXXL_MAX(2 * disks_number, int_type(_m) - nruns - n_prefetch_buffers);
00407 #ifdef SORT_OPTIMAL_PREFETCHING
00408
00409 const int_type n_opt_prefetch_buffers = 2 * disks_number + (3 * (n_prefetch_buffers - 2 * disks_number)) / 10;
00410 #endif
00411 #endif
00412
00413 #ifdef SORT_OPTIMAL_PREFETCHING
00414 compute_prefetch_schedule(
00415 consume_seq,
00416 prefetch_seq,
00417 n_opt_prefetch_buffers,
00418 disks_number);
00419 #else
00420 for (i = 0; i < out_run->size(); i++)
00421 prefetch_seq[i] = i;
00422
00423 #endif
00424
00425 prefetcher_type prefetcher(consume_seq.begin(),
00426 consume_seq.end(),
00427 prefetch_seq,
00428 nruns + n_prefetch_buffers);
00429
00430 buffered_writer<block_type> writer(n_write_buffers, n_write_buffers / 2);
00431
00432 int_type out_run_size = out_run->size();
00433
00434 block_type * out_buffer = writer.get_free_block();
00435
00436
00437
00438
00439 if (do_parallel_merge())
00440 {
00441 #if STXXL_PARALLEL_MULTIWAY_MERGE
00442
00443
00444
00445 typedef stxxl::int64 diff_type;
00446 typedef std::pair<typename block_type::iterator, typename block_type::iterator> sequence;
00447 typedef typename std::vector<sequence>::size_type seqs_size_type;
00448 std::vector<sequence> seqs(nruns);
00449 std::vector<block_type *> buffers(nruns);
00450
00451 for (int_type i = 0; i < nruns; i++)
00452 {
00453 buffers[i] = prefetcher.pull_block();
00454 seqs[i] = std::make_pair(buffers[i]->begin(), buffers[i]->end());
00455
00456 }
00457
00458 #ifdef STXXL_CHECK_ORDER_IN_SORTS
00459 value_type last_elem = cmp.min_value();
00460 #endif
00461
00462 for (int_type j = 0; j < out_run_size; ++j)
00463 {
00464 diff_type rest = block_type::size;
00465
00466 STXXL_VERBOSE1("output block " << j);
00467 do {
00468 value_type * min_last_element = NULL;
00469 diff_type total_size = 0;
00470
00471 for (seqs_size_type i = 0; i < seqs.size(); i++)
00472 {
00473 if (seqs[i].first == seqs[i].second)
00474 continue;
00475
00476 if (min_last_element == NULL)
00477 min_last_element = &(*(seqs[i].second - 1));
00478 else
00479 min_last_element = cmp(*min_last_element, *(seqs[i].second - 1)) ? min_last_element : &(*(seqs[i].second - 1));
00480
00481 total_size += seqs[i].second - seqs[i].first;
00482 STXXL_VERBOSE1("last " << *(seqs[i].second - 1) << " block size " << (seqs[i].second - seqs[i].first));
00483 }
00484
00485 assert(min_last_element != NULL);
00486
00487 STXXL_VERBOSE1("min_last_element " << min_last_element << " total size " << total_size + (block_type::size - rest));
00488
00489 diff_type less_equal_than_min_last = 0;
00490
00491 for (seqs_size_type i = 0; i < seqs.size(); i++)
00492 {
00493 if (seqs[i].first == seqs[i].second)
00494 continue;
00495
00496 typename block_type::iterator position = std::upper_bound(seqs[i].first, seqs[i].second, *min_last_element, cmp);
00497 STXXL_VERBOSE1("greater equal than " << position - seqs[i].first);
00498 less_equal_than_min_last += position - seqs[i].first;
00499 }
00500
00501 STXXL_VERBOSE1("finished loop");
00502
00503 ptrdiff_t output_size = (std::min)(less_equal_than_min_last, rest);
00504
00505 STXXL_VERBOSE1("before merge" << output_size);
00506
00507 stxxl::parallel::multiway_merge(seqs.begin(), seqs.end(), out_buffer->end() - rest, cmp, output_size);
00508
00509
00510 STXXL_VERBOSE1("after merge");
00511
00512 (*out_run)[j].value = (*out_buffer)[0];
00513
00514 rest -= output_size;
00515
00516 STXXL_VERBOSE1("so long");
00517
00518 for (seqs_size_type i = 0; i < seqs.size(); i++)
00519 {
00520 if (seqs[i].first == seqs[i].second)
00521 {
00522 if (prefetcher.block_consumed(buffers[i]))
00523 {
00524 seqs[i].first = buffers[i]->begin();
00525 seqs[i].second = buffers[i]->end();
00526 STXXL_VERBOSE1("block ran empty " << i);
00527 }
00528 else
00529 {
00530 seqs.erase(seqs.begin() + i);
00531 buffers.erase(buffers.begin() + i);
00532 STXXL_VERBOSE1("seq removed " << i);
00533 }
00534 }
00535 }
00536 } while (rest > 0 && seqs.size() > 0);
00537
00538 #ifdef STXXL_CHECK_ORDER_IN_SORTS
00539 if (!stxxl::is_sorted(out_buffer->begin(), out_buffer->end(), cmp))
00540 {
00541 for (value_type * i = out_buffer->begin() + 1; i != out_buffer->end(); i++)
00542 if (cmp(*i, *(i - 1)))
00543 {
00544 STXXL_VERBOSE1("Error at position " << (i - out_buffer->begin()));
00545 }
00546 assert(false);
00547 }
00548
00549 if (j > 0)
00550 assert(cmp((*out_buffer)[0], last_elem) == false);
00551
00552 last_elem = (*out_buffer)[block_type::size - 1];
00553 #endif
00554
00555
00556 out_buffer = writer.write(out_buffer, (*out_run)[j].bid);
00557 }
00558
00559
00560
00561 #else
00562 assert(false);
00563 #endif
00564 }
00565 else
00566 {
00567
00568
00569 loser_tree<run_cursor_type, run_cursor2_cmp_type, block_type::size>
00570 losers(&prefetcher, nruns, run_cursor2_cmp_type(cmp));
00571
00572 #ifdef STXXL_CHECK_ORDER_IN_SORTS
00573 value_type last_elem = cmp.min_value();
00574 #endif
00575
00576 for (i = 0; i < out_run_size; ++i)
00577 {
00578 losers.multi_merge(out_buffer->elem);
00579 (*out_run)[i].value = *(out_buffer->elem);
00580
00581 #ifdef STXXL_CHECK_ORDER_IN_SORTS
00582 assert(stxxl::is_sorted(
00583 out_buffer->begin(),
00584 out_buffer->end(),
00585 cmp));
00586
00587 if (i)
00588 assert(cmp(*(out_buffer->elem), last_elem) == false);
00589
00590 last_elem = (*out_buffer).elem[block_type::size - 1];
00591 #endif
00592
00593 out_buffer = writer.write(out_buffer, (*out_run)[i].bid);
00594 }
00595
00596
00597 }
00598
00599 delete[] prefetch_seq;
00600
00601 block_manager * bm = block_manager::get_instance();
00602 for (i = 0; i < nruns; ++i)
00603 {
00604 unsigned_type sz = in_runs[i]->size();
00605 for (unsigned_type j = 0; j < sz; ++j)
00606 bm->delete_block((*in_runs[i])[j].bid);
00607
00608
00609 delete in_runs[i];
00610 }
00611 }
00612
00613
00614 template <typename block_type,
00615 typename alloc_strategy,
00616 typename input_bid_iterator,
00617 typename value_cmp>
00618 simple_vector<trigger_entry<typename block_type::bid_type, typename block_type::value_type> > *
00619 sort_blocks(input_bid_iterator input_bids,
00620 unsigned_type _n,
00621 unsigned_type _m,
00622 value_cmp cmp
00623 )
00624 {
00625 typedef typename block_type::value_type type;
00626 typedef typename block_type::bid_type bid_type;
00627 typedef trigger_entry<bid_type, type> trigger_entry_type;
00628 typedef simple_vector<trigger_entry_type> run_type;
00629 typedef typename interleaved_alloc_traits<alloc_strategy>::strategy interleaved_alloc_strategy;
00630
00631 unsigned_type m2 = _m / 2;
00632 unsigned_type full_runs = _n / m2;
00633 unsigned_type partial_runs = ((_n % m2) ? 1 : 0);
00634 unsigned_type nruns = full_runs + partial_runs;
00635 unsigned_type i;
00636
00637 config * cfg = config::get_instance();
00638 block_manager * mng = block_manager::get_instance();
00639 const unsigned_type ndisks = cfg->disks_number();
00640
00641
00642
00643 double begin = timestamp(), after_runs_creation, end;
00644
00645 run_type ** runs = new run_type *[nruns];
00646
00647 for (i = 0; i < full_runs; i++)
00648 runs[i] = new run_type(m2);
00649
00650
00651 if (partial_runs)
00652 runs[i] = new run_type(_n - full_runs * m2);
00653
00654
00655 for (i = 0; i < nruns; ++i)
00656 {
00657
00658 mng->new_blocks(alloc_strategy(0, ndisks),
00659 trigger_entry_iterator<typename run_type::iterator, block_type::raw_size>(runs[i]->begin()),
00660 trigger_entry_iterator<typename run_type::iterator, block_type::raw_size>(runs[i]->end()));
00661 }
00662
00663 sort_local::create_runs<block_type,
00664 run_type,
00665 input_bid_iterator,
00666 value_cmp>(input_bids, runs, nruns, _m, cmp);
00667
00668 after_runs_creation = timestamp();
00669
00670 double io_wait_after_rf = stats::get_instance()->get_io_wait_time();
00671
00672 disk_queues::get_instance()->set_priority_op(disk_queue::WRITE);
00673
00674
00675
00676 const int_type merge_factor = static_cast<int_type>(ceil(pow(nruns, 1. / ceil(log(double(nruns)) /
00677 log(double(_m))))));
00678 run_type ** new_runs;
00679
00680 while (nruns > 1)
00681 {
00682 int_type new_nruns = div_and_round_up(nruns, merge_factor);
00683 STXXL_VERBOSE("Starting new merge phase: nruns: " << nruns <<
00684 " opt_merge_factor: " << merge_factor << " m:" << _m << " new_nruns: " << new_nruns);
00685
00686 new_runs = new run_type *[new_nruns];
00687
00688 int_type runs_left = nruns;
00689 int_type cur_out_run = 0;
00690 int_type blocks_in_new_run = 0;
00691
00692 while (runs_left > 0)
00693 {
00694 int_type runs2merge = STXXL_MIN(runs_left, merge_factor);
00695 blocks_in_new_run = 0;
00696 for (unsigned_type i = nruns - runs_left; i < (nruns - runs_left + runs2merge); i++)
00697 blocks_in_new_run += runs[i]->size();
00698
00699
00700 new_runs[cur_out_run++] = new run_type(blocks_in_new_run);
00701 runs_left -= runs2merge;
00702 }
00703
00704 if (cur_out_run == 1 && blocks_in_new_run == int_type(_n) && (input_bids->storage->get_id() == -1))
00705 {
00706
00707 input_bid_iterator cur = input_bids;
00708 for (int_type i = 0; cur != (input_bids + _n); ++cur)
00709 {
00710 (*new_runs[0])[i++].bid = *cur;
00711 }
00712
00713 bid_type & firstBID = (*new_runs[0])[0].bid;
00714 if (firstBID.storage->get_id() != -1)
00715 {
00716
00717
00718 mng->new_blocks(FR(), &firstBID, (&firstBID) + 1);
00719 }
00720 bid_type & lastBID = (*new_runs[0])[_n - 1].bid;
00721 if (lastBID.storage->get_id() != -1)
00722 {
00723
00724
00725 mng->new_blocks(FR(), &lastBID, (&lastBID) + 1);
00726 }
00727 }
00728 else
00729 {
00730 mng->new_blocks(interleaved_alloc_strategy(new_nruns, 0, ndisks),
00731 RunsToBIDArrayAdaptor2<block_type::raw_size, run_type>(new_runs, 0, new_nruns, blocks_in_new_run),
00732 RunsToBIDArrayAdaptor2<block_type::raw_size, run_type>(new_runs, _n, new_nruns, blocks_in_new_run));
00733 }
00734
00735 runs_left = nruns;
00736 cur_out_run = 0;
00737 while (runs_left > 0)
00738 {
00739 int_type runs2merge = STXXL_MIN(runs_left, merge_factor);
00740 #ifdef STXXL_CHECK_ORDER_IN_SORTS
00741 assert((check_sorted_runs<block_type, run_type, value_cmp>(runs + nruns - runs_left, runs2merge, m2, cmp)));
00742 #endif
00743 STXXL_VERBOSE("Merging " << runs2merge << " runs");
00744 merge_runs<block_type, run_type>(runs + nruns - runs_left,
00745 runs2merge, *(new_runs + (cur_out_run++)), _m, cmp
00746 );
00747 runs_left -= runs2merge;
00748 }
00749
00750 nruns = new_nruns;
00751 delete[] runs;
00752 runs = new_runs;
00753 }
00754
00755 run_type * result = *runs;
00756 delete[] runs;
00757
00758 end = timestamp();
00759
00760 STXXL_VERBOSE("Elapsed time : " << end - begin << " s. Run creation time: " <<
00761 after_runs_creation - begin << " s");
00762 STXXL_VERBOSE("Time in I/O wait(rf): " << io_wait_after_rf << " s");
00763 STXXL_VERBOSE(*stats::get_instance());
00764 UNUSED(begin + io_wait_after_rf);
00765
00766 return result;
00767 }
00768 }
00769
00770
00778 template <typename ExtIterator_, typename StrictWeakOrdering_>
00779 void sort(ExtIterator_ first, ExtIterator_ last, StrictWeakOrdering_ cmp, unsigned_type M)
00780 {
00781 typedef simple_vector<sort_local::trigger_entry<typename ExtIterator_::bid_type,
00782 typename ExtIterator_::vector_type::value_type> > run_type;
00783
00784 typedef typename ExtIterator_::vector_type::value_type value_type;
00785 typedef typename ExtIterator_::block_type block_type;
00786
00787
00788 assert(!cmp(cmp.min_value(), cmp.min_value()));
00789 assert(cmp(cmp.min_value(), cmp.max_value()));
00790 assert(!cmp(cmp.max_value(), cmp.max_value()));
00791
00792 unsigned_type n = 0;
00793
00794 block_manager * mng = block_manager::get_instance();
00795
00796 first.flush();
00797
00798 if ((last - first) * sizeof(value_type) * sort_memory_usage_factor() < M)
00799 {
00800 stl_in_memory_sort(first, last, cmp);
00801 }
00802 else
00803 {
00804 assert(2 * block_type::raw_size * sort_memory_usage_factor() <= M);
00805
00806 if (first.block_offset())
00807 {
00808 if (last.block_offset())
00809
00810 {
00811 typename ExtIterator_::block_type * first_block = new typename ExtIterator_::block_type;
00812 typename ExtIterator_::block_type * last_block = new typename ExtIterator_::block_type;
00813 typename ExtIterator_::bid_type first_bid, last_bid;
00814 request_ptr req;
00815
00816 req = first_block->read(*first.bid());
00817 mng->new_blocks(FR(), &first_bid, (&first_bid) + 1);
00818 mng->new_blocks(FR(), &last_bid, (&last_bid) + 1);
00819 req->wait();
00820
00821
00822 req = last_block->read(*last.bid());
00823
00824 unsigned_type i = 0;
00825 for ( ; i < first.block_offset(); ++i)
00826 {
00827 first_block->elem[i] = cmp.min_value();
00828 }
00829
00830 req->wait();
00831
00832
00833 req = first_block->write(first_bid);
00834 for (i = last.block_offset(); i < block_type::size; ++i)
00835 {
00836 last_block->elem[i] = cmp.max_value();
00837 }
00838
00839 req->wait();
00840
00841
00842 req = last_block->write(last_bid);
00843
00844 n = last.bid() - first.bid() + 1;
00845
00846 std::swap(first_bid, *first.bid());
00847 std::swap(last_bid, *last.bid());
00848
00849 req->wait();
00850
00851
00852 delete first_block;
00853 delete last_block;
00854
00855 run_type * out =
00856 sort_local::sort_blocks<
00857 typename ExtIterator_::block_type,
00858 typename ExtIterator_::vector_type::alloc_strategy,
00859 typename ExtIterator_::bids_container_iterator>
00860 (first.bid(), n, M / sort_memory_usage_factor() / block_type::raw_size, cmp);
00861
00862
00863 first_block = new typename ExtIterator_::block_type;
00864 last_block = new typename ExtIterator_::block_type;
00865 typename ExtIterator_::block_type * sorted_first_block = new typename ExtIterator_::block_type;
00866 typename ExtIterator_::block_type * sorted_last_block = new typename ExtIterator_::block_type;
00867 request_ptr * reqs = new request_ptr[2];
00868
00869 reqs[0] = first_block->read(first_bid);
00870 reqs[1] = sorted_first_block->read((*(out->begin())).bid);
00871
00872 reqs[0]->wait();
00873 reqs[1]->wait();
00874
00875 reqs[0] = last_block->read(last_bid);
00876 reqs[1] = sorted_last_block->read(((*out)[out->size() - 1]).bid);
00877
00878 for (i = first.block_offset(); i < block_type::size; i++)
00879 {
00880 first_block->elem[i] = sorted_first_block->elem[i];
00881 }
00882
00883 reqs[0]->wait();
00884 reqs[1]->wait();
00885
00886 req = first_block->write(first_bid);
00887
00888 for (i = 0; i < last.block_offset(); ++i)
00889 {
00890 last_block->elem[i] = sorted_last_block->elem[i];
00891 }
00892
00893 req->wait();
00894
00895 req = last_block->write(last_bid);
00896
00897 mng->delete_block(out->begin()->bid);
00898 mng->delete_block((*out)[out->size() - 1].bid);
00899
00900 *first.bid() = first_bid;
00901 *last.bid() = last_bid;
00902
00903 typename run_type::iterator it = out->begin();
00904 ++it;
00905 typename ExtIterator_::bids_container_iterator cur_bid = first.bid();
00906 ++cur_bid;
00907
00908 for ( ; cur_bid != last.bid(); ++cur_bid, ++it)
00909 {
00910 *cur_bid = (*it).bid;
00911 }
00912
00913 delete first_block;
00914 delete sorted_first_block;
00915 delete sorted_last_block;
00916 delete[] reqs;
00917 delete out;
00918
00919 req->wait();
00920
00921
00922 delete last_block;
00923 }
00924 else
00925 {
00926
00927
00928 typename ExtIterator_::block_type * first_block = new typename ExtIterator_::block_type;
00929 typename ExtIterator_::bid_type first_bid;
00930 request_ptr req;
00931
00932 req = first_block->read(*first.bid());
00933 mng->new_blocks(FR(), &first_bid, (&first_bid) + 1);
00934 req->wait();
00935
00936
00937 unsigned_type i = 0;
00938 for ( ; i < first.block_offset(); ++i)
00939 {
00940 first_block->elem[i] = cmp.min_value();
00941 }
00942
00943 req = first_block->write(first_bid);
00944
00945 n = last.bid() - first.bid();
00946
00947 std::swap(first_bid, *first.bid());
00948
00949 req->wait();
00950
00951
00952 delete first_block;
00953
00954 run_type * out =
00955 sort_local::sort_blocks<
00956 typename ExtIterator_::block_type,
00957 typename ExtIterator_::vector_type::alloc_strategy,
00958 typename ExtIterator_::bids_container_iterator>
00959 (first.bid(), n, M / sort_memory_usage_factor() / block_type::raw_size, cmp);
00960
00961
00962 first_block = new typename ExtIterator_::block_type;
00963
00964 typename ExtIterator_::block_type * sorted_first_block = new typename ExtIterator_::block_type;
00965
00966 request_ptr * reqs = new request_ptr[2];
00967
00968 reqs[0] = first_block->read(first_bid);
00969 reqs[1] = sorted_first_block->read((*(out->begin())).bid);
00970
00971 reqs[0]->wait();
00972 reqs[1]->wait();
00973
00974 for (i = first.block_offset(); i < block_type::size; ++i)
00975 {
00976 first_block->elem[i] = sorted_first_block->elem[i];
00977 }
00978
00979 req = first_block->write(first_bid);
00980
00981 mng->delete_block(out->begin()->bid);
00982
00983 *first.bid() = first_bid;
00984
00985 typename run_type::iterator it = out->begin();
00986 ++it;
00987 typename ExtIterator_::bids_container_iterator cur_bid = first.bid();
00988 ++cur_bid;
00989
00990 for ( ; cur_bid != last.bid(); ++cur_bid, ++it)
00991 {
00992 *cur_bid = (*it).bid;
00993 }
00994
00995 *cur_bid = (*it).bid;
00996
00997 delete sorted_first_block;
00998 delete[] reqs;
00999 delete out;
01000
01001 req->wait();
01002
01003 delete first_block;
01004 }
01005 }
01006 else
01007 {
01008 if (last.block_offset())
01009
01010 {
01011 typename ExtIterator_::block_type * last_block = new typename ExtIterator_::block_type;
01012 typename ExtIterator_::bid_type last_bid;
01013 request_ptr req;
01014 unsigned_type i;
01015
01016 req = last_block->read(*last.bid());
01017 mng->new_blocks(FR(), &last_bid, (&last_bid) + 1);
01018 req->wait();
01019
01020
01021 for (i = last.block_offset(); i < block_type::size; ++i)
01022 {
01023 last_block->elem[i] = cmp.max_value();
01024 }
01025
01026 req = last_block->write(last_bid);
01027
01028 n = last.bid() - first.bid() + 1;
01029
01030 std::swap(last_bid, *last.bid());
01031
01032 req->wait();
01033
01034
01035 delete last_block;
01036
01037 run_type * out =
01038 sort_local::sort_blocks<
01039 typename ExtIterator_::block_type,
01040 typename ExtIterator_::vector_type::alloc_strategy,
01041 typename ExtIterator_::bids_container_iterator>
01042 (first.bid(), n, M / sort_memory_usage_factor() / block_type::raw_size, cmp);
01043
01044
01045 last_block = new typename ExtIterator_::block_type;
01046 typename ExtIterator_::block_type * sorted_last_block = new typename ExtIterator_::block_type;
01047 request_ptr * reqs = new request_ptr[2];
01048
01049 reqs[0] = last_block->read(last_bid);
01050 reqs[1] = sorted_last_block->read(((*out)[out->size() - 1]).bid);
01051
01052 reqs[0]->wait();
01053 reqs[1]->wait();
01054
01055 for (i = 0; i < last.block_offset(); ++i)
01056 {
01057 last_block->elem[i] = sorted_last_block->elem[i];
01058 }
01059
01060 req = last_block->write(last_bid);
01061
01062 mng->delete_block((*out)[out->size() - 1].bid);
01063
01064 *last.bid() = last_bid;
01065
01066 typename run_type::iterator it = out->begin();
01067 typename ExtIterator_::bids_container_iterator cur_bid = first.bid();
01068
01069 for ( ; cur_bid != last.bid(); ++cur_bid, ++it)
01070 {
01071 *cur_bid = (*it).bid;
01072 }
01073
01074 delete sorted_last_block;
01075 delete[] reqs;
01076 delete out;
01077
01078 req->wait();
01079
01080 delete last_block;
01081 }
01082 else
01083 {
01084
01085 n = last.bid() - first.bid();
01086
01087 run_type * out =
01088 sort_local::sort_blocks<typename ExtIterator_::block_type,
01089 typename ExtIterator_::vector_type::alloc_strategy,
01090 typename ExtIterator_::bids_container_iterator>
01091 (first.bid(), n, M / sort_memory_usage_factor() / block_type::raw_size, cmp);
01092
01093 typename run_type::iterator it = out->begin();
01094 typename ExtIterator_::bids_container_iterator cur_bid = first.bid();
01095
01096 for ( ; cur_bid != last.bid(); ++cur_bid, ++it)
01097 {
01098 *cur_bid = (*it).bid;
01099 }
01100
01101 delete out;
01102 }
01103 }
01104 }
01105
01106 #ifdef STXXL_CHECK_ORDER_IN_SORTS
01107 assert(stxxl::is_sorted(first, last, cmp));
01108 #endif
01109 }
01110
01112
01113 __STXXL_END_NAMESPACE
01114
01115 #endif // !STXXL_SORT_HEADER
01116