00001
00002
00003
00004
00005
00006
00007
00008
00009
00010
00011
00012
00013 #ifndef STXXL_STABLE_KSORT_HEADER
00014 #define STXXL_STABLE_KSORT_HEADER
00015
00016
00017
00018
00019
00020 #include <cmath>
00021
00022 #include <stxxl/bits/mng/mng.h>
00023 #include <stxxl/bits/mng/buf_istream.h>
00024 #include <stxxl/bits/mng/buf_ostream.h>
00025 #include <stxxl/bits/common/simple_vector.h>
00026 #include <stxxl/bits/algo/intksort.h>
00027
00028 #ifndef STXXL_VERBOSE_STABLE_KSORT
00029 #define STXXL_VERBOSE_STABLE_KSORT STXXL_VERBOSE1
00030 #endif
00031
00032
00033 __STXXL_BEGIN_NAMESPACE
00034
00037
00040 namespace stable_ksort_local
00041 {
00042 template <class type_, class type_key>
00043 void classify_block(type_ * begin, type_ * end, type_key * & out, int_type * bucket, unsigned_type offset, unsigned shift)
00044 {
00045 for (type_ * p = begin; p < end; p++, out++)
00046 {
00047 out->ptr = p;
00048 typename type_::key_type key = p->key();
00049 int_type ibucket = (key - offset) >> shift;
00050 out->key = key;
00051 bucket[ibucket]++;
00052 }
00053 }
00054
//! \brief Key/pointer pair cached during bucket classification.
//!
//! Stores a copy of an element's key next to a pointer back to the element,
//! so that sorting and comparisons only touch this small record instead of
//! the full element.
template <typename type>
struct type_key
{
    typedef typename type::key_type key_type;
    key_type key; //!< cached copy of ptr->key()
    type * ptr;   //!< element the key was taken from

    //! Intentionally trivial: arrays of type_key are allocated in bulk and
    //! overwritten before use, so members stay uninitialized here.
    type_key() { }
    //! Constructs an explicit key/element pair.
    type_key(key_type k, type * p)
        : key(k), ptr(p)
    { }
};

//! Orders type_key records ascending by their cached keys.
template <typename type>
bool operator < (const type_key<type> & a, const type_key<type> & b)
{
    return a.key < b.key;
}

//! Orders type_key records descending by their cached keys.
template <typename type>
bool operator > (const type_key<type> & a, const type_key<type> & b)
{
    return a.key > b.key;
}
00078
00079
00080 template <typename BIDType_, typename AllocStrategy_>
00081 class bid_sequence
00082 {
00083 public:
00084 typedef BIDType_ bid_type;
00085 typedef bid_type & reference;
00086 typedef AllocStrategy_ alloc_strategy;
00087 typedef typename simple_vector<bid_type>::size_type size_type;
00088 typedef typename simple_vector<bid_type>::iterator iterator;
00089
00090 protected:
00091 simple_vector<bid_type> * bids;
00092 alloc_strategy alloc_strategy_;
00093
00094 public:
00095 bid_sequence() { }
00096 bid_sequence(size_type size_)
00097 {
00098 bids = new simple_vector<bid_type>(size_);
00099 block_manager * mng = block_manager::get_instance();
00100 mng->new_blocks(alloc_strategy_, bids->begin(), bids->end());
00101 }
00102 void init(size_type size_)
00103 {
00104 bids = new simple_vector<bid_type>(size_);
00105 block_manager * mng = block_manager::get_instance();
00106 mng->new_blocks(alloc_strategy_, bids->begin(), bids->end());
00107 }
00108 reference operator [] (size_type i)
00109 {
00110 size_type size_ = size();
00111 if (i < size_)
00112 return *(bids->begin() + i);
00113
00114 block_manager * mng = block_manager::get_instance();
00115 simple_vector<bid_type> * larger_bids = new simple_vector<bid_type>((i + 1) * 2);
00116 std::copy(bids->begin(), bids->end(), larger_bids->begin());
00117 mng->new_blocks(alloc_strategy_, larger_bids->begin() + size_, larger_bids->end());
00118 delete bids;
00119 bids = larger_bids;
00120 return *(larger_bids->begin() + i);
00121 }
00122 size_type size() { return bids->size(); }
00123 iterator begin() { return bids->begin(); }
00124 ~bid_sequence()
00125 {
00126 block_manager::get_instance()->delete_blocks(bids->begin(), bids->end());
00127 delete bids;
00128 }
00129 };
00130
//! \brief Distribution phase of stable_ksort: scans [first, last) once and
//! scatters every record into one of nbuckets external-memory buckets,
//! selected by the top lognbuckets bits of its key.
//! \param bucket_bids   per-bucket sequences of block IDs that receive the bucket blocks
//! \param bucket_sizes  output: exact number of records placed in each bucket
//! \param nbuckets      number of buckets (callers pass 1 << lognbuckets)
//! \param lognbuckets   number of leading key bits used for bucket selection
//! \param first,last    input range (external vector iterators)
//! \param nread_buffers  prefetch buffers for the sequential input stream
//! \param nwrite_buffers write buffers for the buffered writer
template <typename ExtIterator_>
void distribute(
    bid_sequence<typename ExtIterator_::vector_type::block_type::bid_type,
                 typename ExtIterator_::vector_type::alloc_strategy> * bucket_bids,
    int64 * bucket_sizes,
    const int_type nbuckets,
    const int_type lognbuckets,
    ExtIterator_ first,
    ExtIterator_ last,
    const int_type nread_buffers,
    const int_type nwrite_buffers)
{
    typedef typename ExtIterator_::vector_type::value_type value_type;
    typedef typename value_type::key_type key_type;
    typedef typename ExtIterator_::block_type block_type;
    typedef typename block_type::bid_type bid_type;
    typedef buf_istream<typename ExtIterator_::block_type,
                        typename ExtIterator_::bids_container_iterator> buf_istream_type;

    int_type i = 0;

    // Sequential input stream over all blocks touched by the range; one extra
    // block is included when 'first' does not start on a block boundary.
    buf_istream_type in(first.bid(), last.bid() + ((first.block_offset()) ? 1 : 0),
                        nread_buffers);

    // One in-flight output block per bucket plus the shared write-buffer pool.
    buffered_writer<block_type> out(
        nbuckets + nwrite_buffers,
        nwrite_buffers);

    unsigned_type * bucket_block_offsets = new unsigned_type[nbuckets]; // fill level of each bucket's current block
    unsigned_type * bucket_iblock = new unsigned_type[nbuckets];        // number of completely filled blocks per bucket
    block_type ** bucket_blocks = new block_type *[nbuckets];           // current (partially filled) block per bucket

    std::fill(bucket_sizes, bucket_sizes + nbuckets, 0);
    std::fill(bucket_iblock, bucket_iblock + nbuckets, 0);
    std::fill(bucket_block_offsets, bucket_block_offsets + nbuckets, 0);

    for (i = 0; i < nbuckets; i++)
        bucket_blocks[i] = out.get_free_block();

    // Rewind to the block boundary preceding 'first' ...
    ExtIterator_ cur = first - first.block_offset();

    // ... then advance the stream past the records that precede 'first'.
    for ( ; cur != first; cur++)
        ++in;

    // A record's bucket index is its key's top lognbuckets bits.
    const int_type shift = sizeof(key_type) * 8 - lognbuckets;

    STXXL_VERBOSE_STABLE_KSORT("Shift by: " << shift << " bits, lognbuckets: " << lognbuckets);
    for ( ; cur != last; cur++)
    {
        key_type cur_key = in.current().key();
        int_type ibucket = cur_key >> shift;

        int_type block_offset = bucket_block_offsets[ibucket];
        in >> (bucket_blocks[ibucket]->elem[block_offset++]);
        if (block_offset == block_type::size)
        {
            // Block full: submit it for writing and continue with a fresh one.
            block_offset = 0;
            int_type iblock = bucket_iblock[ibucket]++;
            bucket_blocks[ibucket] = out.write(bucket_blocks[ibucket], bucket_bids[ibucket][iblock]);
        }
        bucket_block_offsets[ibucket] = block_offset;
    }
    // Flush each bucket's partially filled last block and record the exact
    // bucket sizes for the sorting phase.
    for (i = 0; i < nbuckets; i++)
    {
        if (bucket_block_offsets[i])
        {
            out.write(bucket_blocks[i], bucket_bids[i][bucket_iblock[i]]);
        }
        bucket_sizes[i] = int64(block_type::size) * bucket_iblock[i] +
                          bucket_block_offsets[i];
        STXXL_VERBOSE_STABLE_KSORT("Bucket " << i << " has size " << bucket_sizes[i] <<
                                   ", estimated size: " << ((last - first) / int64(nbuckets)));
    }

    // The blocks themselves are owned by the buffered_writer; only the local
    // bookkeeping arrays are freed here.
    delete[] bucket_blocks;
    delete[] bucket_block_offsets;
    delete[] bucket_iblock;
}
00212 }
00213
//! \brief Stable external sort of records with integer keys.
//! \param first begin of the range to sort (external vector iterator)
//! \param last  end of the range to sort
//! \param M     amount of internal memory to use, in bytes
//!
//! Elements must provide a method key() returning the integer sort key.
//! Phase 1 distributes records into buckets by their top key bits (assuming
//! a uniform key distribution, as the warning below states); phase 2 reads
//! each bucket back, sorts it internally via two levels of counting/radix
//! classification, and streams the result over the original range.
template <typename ExtIterator_>
void stable_ksort(ExtIterator_ first, ExtIterator_ last, unsigned_type M)
{
    STXXL_MSG("Warning: stable_ksort is not yet fully implemented, it assumes that the keys are uniformly distributed between [0,(std::numeric_limits<key_type>::max)()]");
    typedef typename ExtIterator_::vector_type::value_type value_type;
    typedef typename value_type::key_type key_type;
    typedef typename ExtIterator_::block_type block_type;
    typedef typename block_type::bid_type bid_type;
    typedef typename ExtIterator_::vector_type::alloc_strategy alloc_strategy;
    typedef stable_ksort_local::bid_sequence<bid_type, alloc_strategy> bucket_bids_type;
    typedef stable_ksort_local::type_key<value_type> type_key_;

    // make sure pending container writes reach disk before reading blocks
    first.flush();

    double begin = timestamp();

    unsigned_type i = 0;
    config * cfg = config::get_instance();
    // number of blocks that fit into the memory budget
    const unsigned_type m = M / block_type::raw_size;
    assert(2 * block_type::raw_size <= M);
    const unsigned_type write_buffers_multiple = 2;
    const unsigned_type read_buffers_multiple = 2;
    const unsigned_type ndisks = cfg->disks_number();
    const unsigned_type min_num_read_write_buffers = (write_buffers_multiple + read_buffers_multiple) * ndisks;
    const unsigned_type nmaxbuckets = m - min_num_read_write_buffers;
    // round the bucket count down to a power of two so bucket selection is a shift
    const unsigned_type lognbuckets = static_cast<unsigned_type>(log2(double(nmaxbuckets)));
    const unsigned_type nbuckets = 1 << lognbuckets;
    // expected bucket size in blocks, assuming uniformly distributed keys
    const unsigned_type est_bucket_size = div_and_round_up((last - first) / int64(nbuckets),
                                                           int64(block_type::size));

    if (m < min_num_read_write_buffers + 2 || nbuckets < 2) {
        STXXL_ERRMSG("stxxl::stable_ksort: Not enough memory. Blocks available: " << m <<
                     ", required for r/w buffers: " << min_num_read_write_buffers <<
                     ", required for buckets: 2, nbuckets: " << nbuckets);
        abort();
    }
    STXXL_VERBOSE_STABLE_KSORT("Elements to sort: " << (last - first));
    STXXL_VERBOSE_STABLE_KSORT("Number of buckets has to be reduced from " << nmaxbuckets << " to " << nbuckets);
    // split the remaining blocks between read and write buffers by their multiples
    const unsigned_type nread_buffers = (m - nbuckets) * read_buffers_multiple / (read_buffers_multiple + write_buffers_multiple);
    const unsigned_type nwrite_buffers = (m - nbuckets) * write_buffers_multiple / (read_buffers_multiple + write_buffers_multiple);

    STXXL_VERBOSE_STABLE_KSORT("Read buffers in distribution phase: " << nread_buffers);
    STXXL_VERBOSE_STABLE_KSORT("Write buffers in distribution phase: " << nwrite_buffers);

    // per-bucket block ID sequences, pre-sized to the estimated bucket size
    bucket_bids_type * bucket_bids = new bucket_bids_type[nbuckets];
    for (i = 0; i < nbuckets; ++i)
        bucket_bids[i].init(est_bucket_size);

    int64 * bucket_sizes = new int64[nbuckets];

    disk_queues::get_instance()->set_priority_op(disk_queue::WRITE);

    // phase 1: scatter the input into the buckets
    stable_ksort_local::distribute(
        bucket_bids,
        bucket_sizes,
        nbuckets,
        lognbuckets,
        first,
        last,
        nread_buffers,
        nwrite_buffers);

    double dist_end = timestamp(), end;
    double io_wait_after_d = stats::get_instance()->get_io_wait_time();

    // phase 2: sort each bucket internally and write the output back
    {
        // memory layout: two bucket-sized block arrays (read of bucket k+1
        // overlaps sorting of bucket k) plus write buffers
        unsigned_type write_buffers_multiple_bs = 2;
        unsigned_type max_bucket_size_bl = (m - write_buffers_multiple_bs * ndisks) / 2;        // in number of blocks
        int64 max_bucket_size_rec = int64(max_bucket_size_bl) * block_type::size;               // in number of records
        int64 max_bucket_size_act = 0;                                                          // actual largest bucket size in records

        // check that every bucket fits into the per-bucket memory budget
        for (i = 0; i < nbuckets; i++)
        {
            max_bucket_size_act = STXXL_MAX(bucket_sizes[i], max_bucket_size_act);
            if (bucket_sizes[i] > max_bucket_size_rec)
            {
                STXXL_ERRMSG("Bucket " << i << " is too large: " << bucket_sizes[i] <<
                             " records, maximum: " << max_bucket_size_rec);
                STXXL_ERRMSG("Recursion on buckets is not yet implemented, aborting.");
                abort();
            }
        }

        // shrink the budget to the actual maximum, freeing blocks for write buffers
        const int_type max_bucket_size_act_bl = div_and_round_up(max_bucket_size_act, block_type::size);
        STXXL_VERBOSE_STABLE_KSORT("Reducing required number of required blocks per bucket from " <<
                                   max_bucket_size_bl << " to " << max_bucket_size_act_bl);
        max_bucket_size_rec = max_bucket_size_act;
        max_bucket_size_bl = max_bucket_size_act_bl;
        const unsigned_type nwrite_buffers_bs = m - 2 * max_bucket_size_bl;
        STXXL_VERBOSE_STABLE_KSORT("Write buffers in bucket sorting phase: " << nwrite_buffers_bs);

        // buffered output stream writing the sorted records over the input range
        typedef buf_ostream<block_type, typename ExtIterator_::bids_container_iterator> buf_ostream_type;
        buf_ostream_type out(first.bid(), nwrite_buffers_bs);

        disk_queues::get_instance()->set_priority_op(disk_queue::READ);

        if (first.block_offset())
        {
            // first block of the range is shared with records before 'first':
            // copy its leading (untouched) records through to the output
            block_type * block = new block_type;
            request_ptr req;
            req = block->read(*first.bid());
            req->wait();

            for (i = 0; i < first.block_offset(); i++)
            {
                out << block->elem[i];
            }
            delete block;
        }
        // double-buffered working set: blocks1/reqs1 hold the bucket being
        // sorted, blocks2/reqs2 the bucket being prefetched; swapped each round
        block_type * blocks1 = new block_type[max_bucket_size_bl];
        block_type * blocks2 = new block_type[max_bucket_size_bl];
        request_ptr * reqs1 = new request_ptr[max_bucket_size_bl];
        request_ptr * reqs2 = new request_ptr[max_bucket_size_bl];
        type_key_ * refs1 = new type_key_[max_bucket_size_rec];
        type_key_ * refs2 = new type_key_[max_bucket_size_rec];

        // submit reading of bucket 0 (nbuckets >= 2 is guaranteed above)
        unsigned_type nbucket_blocks = div_and_round_up(bucket_sizes[0], block_type::size);
        for (i = 0; i < nbucket_blocks; i++)
            reqs1[i] = blocks1[i].read(bucket_bids[0][i]);

        // submit reading of bucket 1
        nbucket_blocks = div_and_round_up(bucket_sizes[1], block_type::size);
        for (i = 0; i < nbucket_blocks; i++)
            reqs2[i] = blocks2[i].read(bucket_bids[1][i]);

        // first-level classification: split a bucket into k1 = 2^log_k1
        // sub-buckets sized to fit the L2 cache
        key_type offset = 0;
        const unsigned log_k1 =
            (std::max)(static_cast<unsigned>(ceil(log2(double(
                                                           max_bucket_size_rec * sizeof(type_key_) / STXXL_L2_SIZE)))), 1U);
        unsigned_type k1 = 1 << log_k1;
        int_type * bucket1 = new int_type[k1];

        // 'shift' strips the phase-1 bucket bits; 'shift1' selects the next log_k1 bits
        const unsigned shift = sizeof(key_type) * 8 - lognbuckets;
        const unsigned shift1 = shift - log_k1;

        STXXL_VERBOSE_STABLE_KSORT("Classifying " << nbuckets << " buckets, max size:" << max_bucket_size_rec <<
                                   " block size:" << block_type::size << " log_k1:" << log_k1);

        for (unsigned_type k = 0; k < nbuckets; k++)
        {
            nbucket_blocks = div_and_round_up(bucket_sizes[k], block_type::size);
            // per-bucket sub-bucket count, bounded by the global log_k1
            const unsigned log_k1_k =
                (std::max)(static_cast<unsigned>(ceil(log2(
                                                          double(bucket_sizes[k] * sizeof(type_key_) / STXXL_L2_SIZE)))), 1U);
            assert(log_k1_k <= log_k1);
            k1 = 1 << log_k1_k;
            std::fill(bucket1, bucket1 + k1, 0);

            STXXL_VERBOSE_STABLE_KSORT("Classifying bucket " << k << " size:" << bucket_sizes[k] <<
                                       " blocks:" << nbucket_blocks << " log_k1:" << log_k1_k);

            // classify all full blocks of bucket k as their reads complete
            // NOTE(review): if bucket_sizes[k] == 0, nbucket_blocks - 1
            // underflows (unsigned) — the code relies on every bucket being
            // non-empty, consistent with the uniform-keys assumption; verify.
            type_key_ * ref_ptr = refs1;
            key_type offset1 = offset + (key_type(1) << key_type(shift)) * key_type(k);
            for (i = 0; i < nbucket_blocks - 1; i++)
            {
                reqs1[i]->wait();
                stable_ksort_local::classify_block(blocks1[i].begin(), blocks1[i].end(), ref_ptr, bucket1, offset1, shift1);
            }

            // classify the (possibly partial) last block
            const unsigned_type last_block_size = bucket_sizes[k] - int64(nbucket_blocks - 1) * block_type::size;
            reqs1[i]->wait();

            classify_block(blocks1[i].begin(), blocks1[i].begin() + last_block_size, ref_ptr, bucket1, offset1, shift1);

            // turn counts into start offsets and scatter refs1 into refs2
            exclusive_prefix_sum(bucket1, k1);
            classify(refs1, refs1 + bucket_sizes[k], refs2, bucket1, offset1, shift1);

            // second-level: sort each sub-bucket in cache and emit the records
            type_key_ * c = refs2;
            type_key_ * d = refs1;
            for (i = 0; i < k1; i++)
            {
                type_key_ * cEnd = refs2 + bucket1[i];
                type_key_ * dEnd = refs1 + bucket1[i];

                // NOTE(review): log2 of a sub-bucket count <= 1 makes
                // log_k2 underflow / go negative before the unsigned cast;
                // again this assumes reasonably filled sub-buckets — verify.
                const unsigned log_k2 = static_cast<unsigned>(log2(double(bucket1[i]))) - 1;    // adaptive bucket size
                const unsigned_type k2 = 1 << log_k2;
                int_type * bucket2 = new int_type[k2];
                const unsigned shift2 = shift1 - log_k2;

                // sort sub-bucket [c, cEnd) into [d, dEnd) by the next log_k2 key bits
                l1sort(c, cEnd, d, bucket2, k2,
                       offset1 + (key_type(1) << key_type(shift1)) * key_type(i),
                       shift2);

                // write the sorted records (via their cached pointers) to the output
                for (type_key_ * p = d; p < dEnd; p++)
                    out << (*(p->ptr));

                delete[] bucket2;
                c = cEnd;
                d = dEnd;
            }

            // prefetch bucket k+2 into the array just freed by bucket k
            const unsigned_type bucket2submit = k + 2;
            if (bucket2submit < nbuckets)
            {
                nbucket_blocks = div_and_round_up(bucket_sizes[bucket2submit], block_type::size);
                for (i = 0; i < nbucket_blocks; i++)
                    reqs1[i] = blocks1[i].read(bucket_bids[bucket2submit][i]);
            }

            std::swap(blocks1, blocks2);
            std::swap(reqs1, reqs2);
        }

        delete[] bucket1;
        delete[] refs1;
        delete[] refs2;
        delete[] blocks1;
        delete[] blocks2;
        delete[] reqs1;
        delete[] reqs2;
        delete[] bucket_bids;
        delete[] bucket_sizes;

        if (last.block_offset())
        {
            // last block of the range is shared with records after 'last':
            // copy its trailing (untouched) records through to the output
            block_type * block = new block_type;
            request_ptr req = block->read(*last.bid());
            req->wait();

            for (i = last.block_offset(); i < block_type::size; i++)
            {
                out << block->elem[i];
            }
            delete block;
        }

        end = timestamp();
    }

    STXXL_VERBOSE("Elapsed time        : " << end - begin << " s. Distribution time: " <<
                  dist_end - begin << " s");
    STXXL_VERBOSE("Time in I/O wait(ds): " << io_wait_after_d << " s");
    STXXL_VERBOSE(*stats::get_instance());
    UNUSED(begin + dist_end + io_wait_after_d);
}
00461
00463
00464 __STXXL_END_NAMESPACE
00465
00466 #endif // !STXXL_STABLE_KSORT_HEADER