00001
00002
00003
00004
00005
00006
00007
00008
00009
00010
00011
00012
00013
00014 #ifndef STXXL_KSORT_HEADER
00015 #define STXXL_KSORT_HEADER
00016
00017 #include <list>
00018
00019 #include <stxxl/bits/mng/mng.h>
00020 #include <stxxl/bits/common/rand.h>
00021 #include <stxxl/bits/mng/adaptor.h>
00022 #include <stxxl/bits/common/simple_vector.h>
00023 #include <stxxl/bits/common/switch.h>
00024 #include <stxxl/bits/mng/block_alloc_interleaved.h>
00025 #include <stxxl/bits/algo/intksort.h>
00026 #include <stxxl/bits/algo/adaptor.h>
00027 #include <stxxl/bits/algo/async_schedule.h>
00028 #include <stxxl/bits/mng/block_prefetcher.h>
00029 #include <stxxl/bits/mng/buf_writer.h>
00030 #include <stxxl/bits/algo/run_cursor.h>
00031 #include <stxxl/bits/algo/losertree.h>
00032 #include <stxxl/bits/algo/inmemsort.h>
00033
00034
00035
00036
00037
00038 #define OPT_MERGING
00039
00040 __STXXL_BEGIN_NAMESPACE
00041
00044
00049
00052 namespace ksort_local
00053 {
00054 template <typename _BIDTp, typename _KeyTp>
00055 struct trigger_entry
00056 {
00057 typedef _BIDTp bid_type;
00058 typedef _KeyTp key_type;
00059
00060 bid_type bid;
00061 key_type key;
00062
00063 operator bid_type ()
00064 {
00065 return bid;
00066 }
00067 };
00068
00069
00070 template <typename _BIDTp, typename _KeyTp>
00071 inline bool operator < (const trigger_entry<_BIDTp, _KeyTp> & a,
00072 const trigger_entry<_BIDTp, _KeyTp> & b)
00073 {
00074 return (a.key < b.key);
00075 }
00076
00077 template <typename _BIDTp, typename _KeyTp>
00078 inline bool operator > (const trigger_entry<_BIDTp, _KeyTp> & a,
00079 const trigger_entry<_BIDTp, _KeyTp> & b)
00080 {
00081 return (a.key > b.key);
00082 }
00083
00084 template <typename type, typename key_type1>
00085 struct type_key
00086 {
00087 typedef key_type1 key_type;
00088 key_type key;
00089 type * ptr;
00090
00091 type_key() { }
00092 type_key(key_type k, type * p) : key(k), ptr(p)
00093 { }
00094 };
00095
00096 template <typename type, typename key1>
00097 bool operator < (const type_key<type, key1> & a, const type_key<type, key1> & b)
00098 {
00099 return a.key < b.key;
00100 }
00101
00102 template <typename type, typename key1>
00103 bool operator > (const type_key<type, key1> & a, const type_key<type, key1> & b)
00104 {
00105 return a.key > b.key;
00106 }
00107
00108
00109 template <typename block_type, typename bid_type>
00110 struct write_completion_handler
00111 {
00112 block_type * block;
00113 bid_type bid;
00114 request_ptr * req;
00115 void operator () (request * )
00116 {
00117 * req = block->read(bid);
00118 }
00119 };
00120
00121 template <typename type_key_,
00122 typename block_type,
00123 typename run_type,
00124 typename input_bid_iterator,
00125 typename key_extractor>
00126 inline void write_out(
00127 type_key_ * begin,
00128 type_key_ * end,
00129 block_type * & cur_blk,
00130 const block_type * end_blk,
00131 int_type & out_block,
00132 int_type & out_pos,
00133 run_type & run,
00134 write_completion_handler<block_type, typename block_type::bid_type> * & next_read,
00135 typename block_type::bid_type * & bids,
00136 request_ptr * write_reqs,
00137 request_ptr * read_reqs,
00138 input_bid_iterator & it,
00139 key_extractor keyobj)
00140 {
00141 typedef typename block_type::bid_type bid_type;
00142 typedef typename block_type::type type;
00143
00144 type * elem = cur_blk->elem;
00145 for (type_key_ * p = begin; p < end; p++)
00146 {
00147 elem[out_pos++] = *(p->ptr);
00148
00149 if (out_pos >= block_type::size)
00150 {
00151 run[out_block].key = keyobj(*(cur_blk->elem));
00152
00153 if (cur_blk < end_blk)
00154 {
00155 next_read->block = cur_blk;
00156 next_read->req = read_reqs + out_block;
00157 read_reqs[out_block] = NULL;
00158 bids[out_block] = next_read->bid = *(it++);
00159
00160 write_reqs[out_block] = cur_blk->write(
00161 run[out_block].bid,
00162
00163
00164 *(next_read++));
00165 }
00166 else
00167 {
00168 write_reqs[out_block] = cur_blk->write(run[out_block].bid);
00169 }
00170
00171 cur_blk++;
00172 elem = cur_blk->elem;
00173 out_block++;
00174 out_pos = 0;
00175 }
00176 }
00177 }
00178
00179 template <
00180 typename block_type,
00181 typename run_type,
00182 typename input_bid_iterator,
00183 typename key_extractor>
00184 void
00185 create_runs(
00186 input_bid_iterator it,
00187 run_type ** runs,
00188 const unsigned_type nruns,
00189 const unsigned_type m2,
00190 key_extractor keyobj)
00191 {
00192 typedef typename block_type::value_type type;
00193 typedef typename block_type::bid_type bid_type;
00194 typedef typename key_extractor::key_type key_type;
00195 typedef type_key<type, key_type> type_key_;
00196
00197 block_manager * bm = block_manager::get_instance();
00198 block_type * Blocks1 = new block_type[m2];
00199 block_type * Blocks2 = new block_type[m2];
00200 bid_type * bids = new bid_type[m2];
00201 type_key_ * refs1 = new type_key_[m2 * Blocks1->size];
00202 type_key_ * refs2 = new type_key_[m2 * Blocks1->size];
00203 request_ptr * read_reqs = new request_ptr[m2];
00204 request_ptr * write_reqs = new request_ptr[m2];
00205 write_completion_handler<block_type, bid_type> * next_run_reads =
00206 new write_completion_handler<block_type, bid_type>[m2];
00207
00208 run_type * run;
00209 run = *runs;
00210 int_type run_size = (*runs)->size();
00211 key_type offset = 0;
00212 const int log_k1 =
00213 static_cast<int>(ceil(log2((m2 * block_type::size * sizeof(type_key_) / STXXL_L2_SIZE) ?
00214 (double(m2 * block_type::size * sizeof(type_key_) / STXXL_L2_SIZE)) : 2.)));
00215 const int log_k2 = int(log2(double(m2 * Blocks1->size))) - log_k1 - 1;
00216 STXXL_VERBOSE("log_k1: " << log_k1 << " log_k2:" << log_k2);
00217 const int_type k1 = 1 << log_k1;
00218 const int_type k2 = 1 << log_k2;
00219 int_type * bucket1 = new int_type[k1];
00220 int_type * bucket2 = new int_type[k2];
00221 int_type i;
00222
00223 disk_queues::get_instance()->set_priority_op(disk_queue::WRITE);
00224
00225 for (i = 0; i < run_size; i++)
00226 {
00227 bids[i] = *(it++);
00228 read_reqs[i] = Blocks1[i].read(bids[i]);
00229 }
00230
00231 unsigned_type k = 0;
00232 const int shift1 = sizeof(key_type) * 8 - log_k1;
00233 const int shift2 = shift1 - log_k2;
00234 STXXL_VERBOSE("shift1: " << shift1 << " shift2:" << shift2);
00235
00236 for ( ; k < nruns; k++)
00237 {
00238 run = runs[k];
00239 run_size = run->size();
00240
00241 std::fill(bucket1, bucket1 + k1, 0);
00242
00243 type_key_ * ref_ptr = refs1;
00244 for (i = 0; i < run_size; i++)
00245 {
00246 if (k)
00247 write_reqs[i]->wait();
00248
00249 read_reqs[i]->wait();
00250 bm->delete_block(bids[i]);
00251
00252 classify_block(Blocks1[i].begin(), Blocks1[i].end(), ref_ptr, bucket1, offset, shift1, keyobj);
00253 }
00254
00255 exclusive_prefix_sum(bucket1, k1);
00256 classify(refs1, refs1 + run_size * Blocks1->size, refs2, bucket1,
00257 offset, shift1);
00258
00259 int_type out_block = 0;
00260 int_type out_pos = 0;
00261 unsigned_type next_run_size = (k < nruns - 1) ? (runs[k + 1]->size()) : 0;
00262
00263
00264 type_key_ * c = refs2;
00265 type_key_ * d = refs1;
00266 block_type * cur_blk = Blocks2;
00267 block_type * end_blk = Blocks2 + next_run_size;
00268 write_completion_handler<block_type, bid_type> * next_read = next_run_reads;
00269
00270 for (i = 0; i < k1; i++)
00271 {
00272 type_key_ * cEnd = refs2 + bucket1[i];
00273 type_key_ * dEnd = refs1 + bucket1[i];
00274
00275 l1sort(c, cEnd, d, bucket2, k2,
00276 offset + (key_type(1) << key_type(shift1)) * key_type(i), shift2);
00277
00278 write_out(
00279 d, dEnd, cur_blk, end_blk,
00280 out_block, out_pos, *run, next_read, bids,
00281 write_reqs, read_reqs, it, keyobj);
00282
00283 c = cEnd;
00284 d = dEnd;
00285 }
00286
00287 std::swap(Blocks1, Blocks2);
00288 }
00289
00290 wait_all(write_reqs, m2);
00291
00292 delete[] bucket1;
00293 delete[] bucket2;
00294 delete[] refs1;
00295 delete[] refs2;
00296 delete[] Blocks1;
00297 delete[] Blocks2;
00298 delete[] bids;
00299 delete[] next_run_reads;
00300 delete[] read_reqs;
00301 delete[] write_reqs;
00302 }
00303
00304 template <typename block_type,
00305 typename prefetcher_type,
00306 typename key_extractor>
00307 struct run_cursor2_cmp
00308 {
00309 typedef run_cursor2<block_type, prefetcher_type> cursor_type;
00310 key_extractor keyobj;
00311 run_cursor2_cmp(key_extractor keyobj_)
00312 {
00313 keyobj = keyobj_;
00314 }
00315 inline bool operator () (const cursor_type & a, const cursor_type & b)
00316 {
00317 if (UNLIKELY(b.empty()))
00318 return true;
00319
00320 if (UNLIKELY(a.empty()))
00321 return false;
00322
00323
00324 return (keyobj(a.current()) < keyobj(b.current()));
00325 }
00326
00327 private:
00328 run_cursor2_cmp() { }
00329 };
00330
00331
00332 template <typename record_type, typename key_extractor>
00333 class key_comparison : public std::binary_function<record_type, record_type, bool>
00334 {
00335 key_extractor ke;
00336
00337 public:
00338 key_comparison() { }
00339 key_comparison(key_extractor ke_) : ke(ke_) { }
00340 bool operator () (const record_type & a, const record_type & b) const
00341 {
00342 return ke(a) < ke(b);
00343 }
00344 };
00345
00346
00347 template <typename block_type, typename run_type, typename key_ext_>
00348 bool check_ksorted_runs(run_type ** runs,
00349 unsigned_type nruns,
00350 unsigned_type m,
00351 key_ext_ keyext)
00352 {
00353 typedef typename block_type::value_type value_type;
00354
00355
00356 STXXL_MSG("check_sorted_runs Runs: " << nruns);
00357 unsigned_type irun = 0;
00358 for (irun = 0; irun < nruns; ++irun)
00359 {
00360 const unsigned_type nblocks_per_run = runs[irun]->size();
00361 unsigned_type blocks_left = nblocks_per_run;
00362 block_type * blocks = new block_type[m];
00363 request_ptr * reqs = new request_ptr[m];
00364 value_type last = keyext.min_value();
00365
00366 for (unsigned_type off = 0; off < nblocks_per_run; off += m)
00367 {
00368 const unsigned_type nblocks = STXXL_MIN(blocks_left, m);
00369 const unsigned_type nelements = nblocks * block_type::size;
00370 blocks_left -= nblocks;
00371
00372 for (unsigned_type j = 0; j < nblocks; ++j)
00373 {
00374 reqs[j] = blocks[j].read((*runs[irun])[off + j].bid);
00375 }
00376 wait_all(reqs, reqs + nblocks);
00377
00378 if (off && (keyext(blocks[0][0]) < keyext(last)))
00379 {
00380 STXXL_MSG("check_sorted_runs wrong first value in the run " << irun);
00381 STXXL_MSG(" first value: " << blocks[0][0] << " with key" << keyext(blocks[0][0]));
00382 STXXL_MSG(" last value: " << last << " with key" << keyext(last));
00383 for (unsigned_type k = 0; k < block_type::size; ++k)
00384 STXXL_MSG("Element " << k << " in the block is :" << blocks[0][k] << " key: " << keyext(blocks[0][k]));
00385 return false;
00386 }
00387
00388 for (unsigned_type j = 0; j < nblocks; ++j)
00389 {
00390 if (keyext(blocks[j][0]) != (*runs[irun])[off + j].key)
00391 {
00392 STXXL_MSG("check_sorted_runs wrong trigger in the run " << irun << " block " << (off + j));
00393 STXXL_MSG(" trigger value: " << (*runs[irun])[off + j].key);
00394 STXXL_MSG("Data in the block:");
00395 for (unsigned_type k = 0; k < block_type::size; ++k)
00396 STXXL_MSG("Element " << k << " in the block is :" << blocks[j][k] << " with key: " << keyext(blocks[j][k]));
00397
00398 STXXL_MSG("BIDS:");
00399 for (unsigned_type k = 0; k < nblocks; ++k)
00400 {
00401 if (k == j)
00402 STXXL_MSG("Bad one comes next.");
00403 STXXL_MSG("BID " << (off + k) << " is: " << ((*runs[irun])[off + k].bid));
00404 }
00405
00406 return false;
00407 }
00408 }
00409 if (!stxxl::is_sorted(
00410 #if 1
00411 ArrayOfSequencesIterator<
00412 block_type, typename block_type::value_type, block_type::size
00413 >(blocks, 0),
00414 ArrayOfSequencesIterator<
00415 block_type, typename block_type::value_type, block_type::size
00416 >(blocks, nelements),
00417 #else
00418 TwoToOneDimArrayRowAdaptor<
00419 block_type, value_type, block_type::size
00420 >(blocks, 0),
00421 TwoToOneDimArrayRowAdaptor<
00422 block_type, value_type, block_type::size
00423 >(blocks, nelements),
00424 #endif
00425 key_comparison<value_type, key_ext_>()))
00426 {
00427 STXXL_MSG("check_sorted_runs wrong order in the run " << irun);
00428 STXXL_MSG("Data in blocks:");
00429 for (unsigned_type j = 0; j < nblocks; ++j)
00430 {
00431 for (unsigned_type k = 0; k < block_type::size; ++k)
00432 STXXL_MSG(" Element " << k << " in block " << (off + j) << " is :" << blocks[j][k] << " with key: " << keyext(blocks[j][k]));
00433 }
00434 STXXL_MSG("BIDS:");
00435 for (unsigned_type k = 0; k < nblocks; ++k)
00436 {
00437 STXXL_MSG("BID " << (k + off) << " is: " << ((*runs[irun])[k + off].bid));
00438 }
00439
00440 return false;
00441 }
00442 last = blocks[nblocks - 1][block_type::size - 1];
00443 }
00444
00445 assert(blocks_left == 0);
00446 delete[] reqs;
00447 delete[] blocks;
00448 }
00449
00450 return true;
00451 }
00452
00453
00454 template <typename block_type, typename run_type, typename key_extractor>
00455 void merge_runs(run_type ** in_runs, unsigned_type nruns, run_type * out_run, unsigned_type _m, key_extractor keyobj)
00456 {
00457 typedef block_prefetcher<block_type, typename run_type::iterator> prefetcher_type;
00458 typedef run_cursor2<block_type, prefetcher_type> run_cursor_type;
00459
00460 unsigned_type i;
00461 run_type consume_seq(out_run->size());
00462
00463 int_type * prefetch_seq = new int_type[out_run->size()];
00464
00465 typename run_type::iterator copy_start = consume_seq.begin();
00466 for (i = 0; i < nruns; i++)
00467 {
00468
00469 copy_start = std::copy(
00470 in_runs[i]->begin(),
00471 in_runs[i]->end(),
00472 copy_start);
00473 }
00474 std::stable_sort(consume_seq.begin(), consume_seq.end());
00475
00476 unsigned disks_number = config::get_instance()->disks_number();
00477
00478 #ifdef PLAY_WITH_OPT_PREF
00479 const int_type n_write_buffers = 4 * disks_number;
00480 #else
00481 const int_type n_prefetch_buffers = STXXL_MAX(int_type(2 * disks_number), (3 * (int_type(_m) - int_type(nruns)) / 4));
00482 STXXL_VERBOSE("Prefetch buffers " << n_prefetch_buffers);
00483 const int_type n_write_buffers = STXXL_MAX(int_type(2 * disks_number), int_type(_m) - int_type(nruns) - int_type(n_prefetch_buffers));
00484 STXXL_VERBOSE("Write buffers " << n_write_buffers);
00485
00486 const int_type n_opt_prefetch_buffers = 2 * int_type(disks_number) + (3 * (int_type(n_prefetch_buffers) - int_type(2 * disks_number))) / 10;
00487 STXXL_VERBOSE("Prefetch buffers " << n_opt_prefetch_buffers);
00488 #endif
00489
00490 #ifdef SORT_OPTIMAL_PREFETCHING
00491 compute_prefetch_schedule(
00492 consume_seq,
00493 prefetch_seq,
00494 n_opt_prefetch_buffers,
00495 disks_number);
00496 #else
00497 for (i = 0; i < out_run->size(); i++)
00498 prefetch_seq[i] = i;
00499
00500 #endif
00501
00502
00503 prefetcher_type prefetcher(consume_seq.begin(),
00504 consume_seq.end(),
00505 prefetch_seq,
00506 nruns + n_prefetch_buffers);
00507
00508 buffered_writer<block_type> writer(n_write_buffers, n_write_buffers / 2);
00509
00510 unsigned_type out_run_size = out_run->size();
00511
00512 run_cursor2_cmp<block_type, prefetcher_type, key_extractor> cmp(keyobj);
00513 loser_tree<
00514 run_cursor_type,
00515 run_cursor2_cmp<block_type, prefetcher_type, key_extractor>,
00516 block_type::size> losers(&prefetcher, nruns, cmp);
00517
00518
00519 block_type * out_buffer = writer.get_free_block();
00520
00521 for (i = 0; i < out_run_size; i++)
00522 {
00523 losers.multi_merge(out_buffer->elem);
00524 (*out_run)[i].key = keyobj(out_buffer->elem[0]);
00525 out_buffer = writer.write(out_buffer, (*out_run)[i].bid);
00526 }
00527
00528 delete[] prefetch_seq;
00529
00530 block_manager * bm = block_manager::get_instance();
00531 for (i = 0; i < nruns; i++)
00532 {
00533 unsigned_type sz = in_runs[i]->size();
00534 for (unsigned_type j = 0; j < sz; j++)
00535 bm->delete_block((*in_runs[i])[j].bid);
00536
00537 delete in_runs[i];
00538 }
00539 }
00540
00541
00542 template <typename block_type,
00543 typename alloc_strategy,
00544 typename input_bid_iterator,
00545 typename key_extractor>
00546
00547 simple_vector<trigger_entry<typename block_type::bid_type, typename key_extractor::key_type> > *
00548 ksort_blocks(input_bid_iterator input_bids, unsigned_type _n, unsigned_type _m, key_extractor keyobj)
00549 {
00550 typedef typename block_type::value_type type;
00551 typedef typename key_extractor::key_type key_type;
00552 typedef typename block_type::bid_type bid_type;
00553 typedef trigger_entry<bid_type, typename key_extractor::key_type> trigger_entry_type;
00554 typedef simple_vector<trigger_entry_type> run_type;
00555 typedef typename interleaved_alloc_traits<alloc_strategy>::strategy interleaved_alloc_strategy;
00556
00557 unsigned_type m2 = div_and_round_up(_m, 2);
00558 const unsigned_type m2_rf = m2 * block_type::raw_size /
00559 (block_type::raw_size + block_type::size * sizeof(type_key<type, key_type>));
00560 STXXL_VERBOSE("Reducing number of blocks in a run from " << m2 << " to " <<
00561 m2_rf << " due to key size: " << sizeof(typename key_extractor::key_type) << " bytes");
00562 m2 = m2_rf;
00563 unsigned_type full_runs = _n / m2;
00564 unsigned_type partial_runs = ((_n % m2) ? 1 : 0);
00565 unsigned_type nruns = full_runs + partial_runs;
00566 unsigned_type i;
00567
00568 config * cfg = config::get_instance();
00569 block_manager * mng = block_manager::get_instance();
00570 int ndisks = cfg->disks_number();
00571
00572 STXXL_VERBOSE("n=" << _n << " nruns=" << nruns << "=" << full_runs << "+" << partial_runs);
00573
00574 double begin = timestamp(), after_runs_creation, end;
00575
00576 run_type ** runs = new run_type *[nruns];
00577
00578 for (i = 0; i < full_runs; i++)
00579 runs[i] = new run_type(m2);
00580
00581
00582 #ifdef INTERLEAVED_ALLOC
00583 if (partial_runs)
00584 {
00585 unsigned_type last_run_size = _n - full_runs * m2;
00586 runs[i] = new run_type(last_run_size);
00587
00588 mng->new_blocks(interleaved_alloc_strategy(nruns, 0, ndisks),
00589 RunsToBIDArrayAdaptor2<block_type::raw_size, run_type>
00590 (runs, 0, nruns, last_run_size),
00591 RunsToBIDArrayAdaptor2<block_type::raw_size, run_type>
00592 (runs, _n, nruns, last_run_size));
00593 }
00594 else
00595 mng->new_blocks(interleaved_alloc_strategy(nruns, 0, ndisks),
00596 RunsToBIDArrayAdaptor<block_type::raw_size, run_type>
00597 (runs, 0, nruns),
00598 RunsToBIDArrayAdaptor<block_type::raw_size, run_type>
00599 (runs, _n, nruns));
00600
00601 #else
00602 if (partial_runs)
00603 runs[i] = new run_type(_n - full_runs * m2);
00604
00605
00606 for (i = 0; i < nruns; i++)
00607 {
00608 mng->new_blocks(alloc_strategy(0, ndisks),
00609 trigger_entry_iterator<typename run_type::iterator, block_type::raw_size>(runs[i]->begin()),
00610 trigger_entry_iterator<typename run_type::iterator, block_type::raw_size>(runs[i]->end()));
00611 }
00612 #endif
00613
00614 create_runs<block_type,
00615 run_type,
00616 input_bid_iterator,
00617 key_extractor>(input_bids, runs, nruns, m2, keyobj);
00618
00619 after_runs_creation = timestamp();
00620
00621 double io_wait_after_rf = stats::get_instance()->get_io_wait_time();
00622
00623 disk_queues::get_instance()->set_priority_op(disk_queue::WRITE);
00624
00625
00626
00627 const int_type merge_factor = static_cast<int_type>(ceil(pow(nruns, 1. / ceil(log(double(nruns)) / log(double(_m))))));
00628 run_type ** new_runs;
00629
00630 while (nruns > 1)
00631 {
00632 int_type new_nruns = div_and_round_up(nruns, merge_factor);
00633 STXXL_VERBOSE("Starting new merge phase: nruns: " << nruns <<
00634 " opt_merge_factor: " << merge_factor << " m:" << _m << " new_nruns: " << new_nruns);
00635
00636 new_runs = new run_type *[new_nruns];
00637
00638 int_type runs_left = nruns;
00639 int_type cur_out_run = 0;
00640 int_type blocks_in_new_run = 0;
00641
00642 while (runs_left > 0)
00643 {
00644 int_type runs2merge = STXXL_MIN(runs_left, merge_factor);
00645 blocks_in_new_run = 0;
00646 for (unsigned_type i = nruns - runs_left; i < (nruns - runs_left + runs2merge); i++)
00647 blocks_in_new_run += runs[i]->size();
00648
00649
00650 new_runs[cur_out_run++] = new run_type(blocks_in_new_run);
00651 runs_left -= runs2merge;
00652 }
00653
00654 if (cur_out_run == 1 && blocks_in_new_run == int_type(_n) && (input_bids->storage->get_id() == -1))
00655 {
00656
00657 input_bid_iterator cur = input_bids;
00658 for (int_type i = 0; cur != (input_bids + _n); ++cur)
00659 {
00660 (*new_runs[0])[i++].bid = *cur;
00661 }
00662
00663 bid_type & firstBID = (*new_runs[0])[0].bid;
00664 if (firstBID.storage->get_id() != -1)
00665 {
00666
00667
00668 mng->new_blocks(FR(), &firstBID, (&firstBID) + 1);
00669 }
00670 bid_type & lastBID = (*new_runs[0])[_n - 1].bid;
00671 if (lastBID.storage->get_id() != -1)
00672 {
00673
00674
00675 mng->new_blocks(FR(), &lastBID, (&lastBID) + 1);
00676 }
00677 }
00678 else
00679 {
00680 mng->new_blocks(interleaved_alloc_strategy(new_nruns, 0, ndisks),
00681 RunsToBIDArrayAdaptor2<block_type::raw_size, run_type>(new_runs, 0, new_nruns, blocks_in_new_run),
00682 RunsToBIDArrayAdaptor2<block_type::raw_size, run_type>(new_runs, _n, new_nruns, blocks_in_new_run));
00683 }
00684
00685
00686
00687 runs_left = nruns;
00688 cur_out_run = 0;
00689 while (runs_left > 0)
00690 {
00691 int_type runs2merge = STXXL_MIN(runs_left, merge_factor);
00692 #ifdef STXXL_CHECK_ORDER_IN_SORTS
00693 assert((check_ksorted_runs<block_type, run_type, key_extractor>(runs + nruns - runs_left, runs2merge, m2, keyobj)));
00694 #endif
00695 STXXL_VERBOSE("Merging " << runs2merge << " runs");
00696 merge_runs<block_type, run_type, key_extractor>(runs + nruns - runs_left,
00697 runs2merge, *(new_runs + (cur_out_run++)), _m, keyobj);
00698 runs_left -= runs2merge;
00699 }
00700
00701 nruns = new_nruns;
00702 delete[] runs;
00703 runs = new_runs;
00704 }
00705
00706 run_type * result = *runs;
00707 delete[] runs;
00708
00709 end = timestamp();
00710
00711 STXXL_VERBOSE("Elapsed time : " << end - begin << " s. Run creation time: " <<
00712 after_runs_creation - begin << " s");
00713 STXXL_VERBOSE("Time in I/O wait(rf): " << io_wait_after_rf << " s");
00714 STXXL_VERBOSE(*stats::get_instance());
00715 UNUSED(begin + io_wait_after_rf);
00716
00717 return result;
00718 }
00719 }
00720
00721
00757
00758
00759
00760
00761
00762
00763
00764 template <typename ExtIterator_, typename KeyExtractor_>
00765 void ksort(ExtIterator_ first_, ExtIterator_ last_, KeyExtractor_ keyobj, unsigned_type M__)
00766 {
00767 typedef simple_vector<ksort_local::trigger_entry<typename ExtIterator_::bid_type,
00768 typename KeyExtractor_::key_type> > run_type;
00769 typedef typename ExtIterator_::vector_type::value_type value_type;
00770 typedef typename ExtIterator_::block_type block_type;
00771
00772
00773 unsigned_type n = 0;
00774 block_manager * mng = block_manager::get_instance();
00775
00776 first_.flush();
00777
00778 if ((last_ - first_) * sizeof(value_type) < M__)
00779 {
00780 stl_in_memory_sort(first_, last_, ksort_local::key_comparison<value_type, KeyExtractor_>(keyobj));
00781 }
00782 else
00783 {
00784 assert(2 * block_type::raw_size <= M__);
00785
00786 if (first_.block_offset())
00787 {
00788 if (last_.block_offset())
00789
00790 {
00791 typename ExtIterator_::block_type * first_block = new typename ExtIterator_::block_type;
00792 typename ExtIterator_::block_type * last_block = new typename ExtIterator_::block_type;
00793 typename ExtIterator_::bid_type first_bid, last_bid;
00794 request_ptr req;
00795
00796 req = first_block->read(*first_.bid());
00797 mng->new_blocks(FR(), &first_bid, (&first_bid) + 1);
00798 mng->new_blocks(FR(), &last_bid, (&last_bid) + 1);
00799 req->wait();
00800
00801
00802 req = last_block->read(*last_.bid());
00803
00804 unsigned_type i = 0;
00805 for ( ; i < first_.block_offset(); i++)
00806 {
00807 first_block->elem[i] = keyobj.min_value();
00808 }
00809
00810 req->wait();
00811
00812 req = first_block->write(first_bid);
00813 for (i = last_.block_offset(); i < block_type::size; i++)
00814 {
00815 last_block->elem[i] = keyobj.max_value();
00816 }
00817
00818 req->wait();
00819
00820 req = last_block->write(last_bid);
00821
00822 n = last_.bid() - first_.bid() + 1;
00823
00824 std::swap(first_bid, *first_.bid());
00825 std::swap(last_bid, *last_.bid());
00826
00827 req->wait();
00828
00829 delete first_block;
00830 delete last_block;
00831
00832 run_type * out =
00833 ksort_local::ksort_blocks<
00834 typename ExtIterator_::block_type,
00835 typename ExtIterator_::vector_type::alloc_strategy,
00836 typename ExtIterator_::bids_container_iterator,
00837 KeyExtractor_>
00838 (first_.bid(), n, M__ / block_type::raw_size, keyobj);
00839
00840
00841 first_block = new typename ExtIterator_::block_type;
00842 last_block = new typename ExtIterator_::block_type;
00843 typename ExtIterator_::block_type * sorted_first_block = new typename ExtIterator_::block_type;
00844 typename ExtIterator_::block_type * sorted_last_block = new typename ExtIterator_::block_type;
00845 request_ptr * reqs = new request_ptr[2];
00846
00847 reqs[0] = first_block->read(first_bid);
00848 reqs[1] = sorted_first_block->read((*(out->begin())).bid);
00849 wait_all(reqs, 2);
00850
00851 reqs[0] = last_block->read(last_bid);
00852 reqs[1] = sorted_last_block->read(((*out)[out->size() - 1]).bid);
00853
00854 for (i = first_.block_offset(); i < block_type::size; i++)
00855 {
00856 first_block->elem[i] = sorted_first_block->elem[i];
00857 }
00858 wait_all(reqs, 2);
00859
00860 req = first_block->write(first_bid);
00861
00862 for (i = 0; i < last_.block_offset(); i++)
00863 {
00864 last_block->elem[i] = sorted_last_block->elem[i];
00865 }
00866
00867 req->wait();
00868
00869
00870 req = last_block->write(last_bid);
00871
00872 mng->delete_block(out->begin()->bid);
00873 mng->delete_block((*out)[out->size() - 1].bid);
00874
00875 *first_.bid() = first_bid;
00876 *last_.bid() = last_bid;
00877
00878 typename run_type::iterator it = out->begin();
00879 it++;
00880 typename ExtIterator_::bids_container_iterator cur_bid = first_.bid();
00881 cur_bid++;
00882
00883 for ( ; cur_bid != last_.bid(); cur_bid++, it++)
00884 {
00885 *cur_bid = (*it).bid;
00886 }
00887
00888 delete first_block;
00889 delete sorted_first_block;
00890 delete sorted_last_block;
00891 delete[] reqs;
00892 delete out;
00893
00894 req->wait();
00895
00896 delete last_block;
00897 }
00898 else
00899 {
00900
00901
00902
00903 typename ExtIterator_::block_type * first_block = new typename ExtIterator_::block_type;
00904 typename ExtIterator_::bid_type first_bid;
00905 request_ptr req;
00906
00907 req = first_block->read(*first_.bid());
00908 mng->new_blocks(FR(), &first_bid, (&first_bid) + 1);
00909 req->wait();
00910
00911
00912 unsigned_type i = 0;
00913 for ( ; i < first_.block_offset(); i++)
00914 {
00915 first_block->elem[i] = keyobj.min_value();
00916 }
00917
00918 req = first_block->write(first_bid);
00919
00920 n = last_.bid() - first_.bid();
00921
00922 std::swap(first_bid, *first_.bid());
00923
00924 req->wait();
00925
00926 delete first_block;
00927
00928 run_type * out =
00929 ksort_local::ksort_blocks<
00930 typename ExtIterator_::block_type,
00931 typename ExtIterator_::vector_type::alloc_strategy,
00932 typename ExtIterator_::bids_container_iterator,
00933 KeyExtractor_>
00934 (first_.bid(), n, M__ / block_type::raw_size, keyobj);
00935
00936
00937 first_block = new typename ExtIterator_::block_type;
00938
00939 typename ExtIterator_::block_type * sorted_first_block = new typename ExtIterator_::block_type;
00940
00941 request_ptr * reqs = new request_ptr[2];
00942
00943 reqs[0] = first_block->read(first_bid);
00944 reqs[1] = sorted_first_block->read((*(out->begin())).bid);
00945 wait_all(reqs, 2);
00946
00947 for (i = first_.block_offset(); i < block_type::size; i++)
00948 {
00949 first_block->elem[i] = sorted_first_block->elem[i];
00950 }
00951
00952 req = first_block->write(first_bid);
00953
00954 mng->delete_block(out->begin()->bid);
00955
00956 *first_.bid() = first_bid;
00957
00958 typename run_type::iterator it = out->begin();
00959 it++;
00960 typename ExtIterator_::bids_container_iterator cur_bid = first_.bid();
00961 cur_bid++;
00962
00963 for ( ; cur_bid != last_.bid(); cur_bid++, it++)
00964 {
00965 *cur_bid = (*it).bid;
00966 }
00967
00968 *cur_bid = (*it).bid;
00969
00970 delete sorted_first_block;
00971 delete[] reqs;
00972 delete out;
00973
00974 req->wait();
00975
00976 delete first_block;
00977 }
00978 }
00979 else
00980 {
00981 if (last_.block_offset())
00982
00983 {
00984 typename ExtIterator_::block_type * last_block = new typename ExtIterator_::block_type;
00985 typename ExtIterator_::bid_type last_bid;
00986 request_ptr req;
00987 unsigned_type i;
00988
00989 req = last_block->read(*last_.bid());
00990 mng->new_blocks(FR(), &last_bid, (&last_bid) + 1);
00991 req->wait();
00992
00993 for (i = last_.block_offset(); i < block_type::size; i++)
00994 {
00995 last_block->elem[i] = keyobj.max_value();
00996 }
00997
00998 req = last_block->write(last_bid);
00999
01000 n = last_.bid() - first_.bid() + 1;
01001
01002 std::swap(last_bid, *last_.bid());
01003
01004 req->wait();
01005
01006 delete last_block;
01007
01008 run_type * out =
01009 ksort_local::ksort_blocks<
01010 typename ExtIterator_::block_type,
01011 typename ExtIterator_::vector_type::alloc_strategy,
01012 typename ExtIterator_::bids_container_iterator,
01013 KeyExtractor_>
01014 (first_.bid(), n, M__ / block_type::raw_size, keyobj);
01015
01016
01017 last_block = new typename ExtIterator_::block_type;
01018 typename ExtIterator_::block_type * sorted_last_block = new typename ExtIterator_::block_type;
01019 request_ptr * reqs = new request_ptr[2];
01020
01021 reqs[0] = last_block->read(last_bid);
01022 reqs[1] = sorted_last_block->read(((*out)[out->size() - 1]).bid);
01023 wait_all(reqs, 2);
01024
01025 for (i = 0; i < last_.block_offset(); i++)
01026 {
01027 last_block->elem[i] = sorted_last_block->elem[i];
01028 }
01029
01030 req = last_block->write(last_bid);
01031
01032 mng->delete_block((*out)[out->size() - 1].bid);
01033
01034 *last_.bid() = last_bid;
01035
01036 typename run_type::iterator it = out->begin();
01037 typename ExtIterator_::bids_container_iterator cur_bid = first_.bid();
01038
01039 for ( ; cur_bid != last_.bid(); cur_bid++, it++)
01040 {
01041 *cur_bid = (*it).bid;
01042 }
01043
01044 delete sorted_last_block;
01045 delete[] reqs;
01046 delete out;
01047
01048 req->wait();
01049
01050 delete last_block;
01051 }
01052 else
01053 {
01054
01055 n = last_.bid() - first_.bid();
01056
01057 run_type * out =
01058 ksort_local::ksort_blocks<
01059 typename ExtIterator_::block_type,
01060 typename ExtIterator_::vector_type::alloc_strategy,
01061 typename ExtIterator_::bids_container_iterator,
01062 KeyExtractor_>
01063 (first_.bid(), n, M__ / block_type::raw_size, keyobj);
01064
01065 typename run_type::iterator it = out->begin();
01066 typename ExtIterator_::bids_container_iterator cur_bid = first_.bid();
01067
01068 for ( ; cur_bid != last_.bid(); cur_bid++, it++)
01069 {
01070 *cur_bid = (*it).bid;
01071 }
01072
01073 delete out;
01074 }
01075 }
01076 }
01077
01078 #ifdef STXXL_CHECK_ORDER_IN_SORTS
01079 assert(stxxl::is_sorted(first_, last_, ksort_local::key_comparison<value_type, KeyExtractor_>()));
01080 #endif
01081 }
01082
01083 template <typename record_type>
01084 struct ksort_defaultkey
01085 {
01086 typedef typename record_type::key_type key_type;
01087 key_type operator () (const record_type & obj) const
01088 {
01089 return obj.key();
01090 }
01091 record_type max_value() const
01092 {
01093 return record_type::max_value();
01094 }
01095 record_type min_value() const
01096 {
01097 return record_type::min_value();
01098 }
01099 };
01100
01101
01108
01117 template <typename ExtIterator_>
01118 void ksort(ExtIterator_ first_, ExtIterator_ last_, unsigned_type M__)
01119 {
01120 ksort(first_, last_,
01121 ksort_defaultkey<typename ExtIterator_::vector_type::value_type>(), M__);
01122 }
01123
01124
01126
01127 __STXXL_END_NAMESPACE
01128
01129 #endif // !STXXL_KSORT_HEADER
01130