00001
00002
00003
00004
00005
00006
00007
00008
00009
00010
00011
00012
00013 #ifndef STXXL_QUEUE_HEADER
00014 #define STXXL_QUEUE_HEADER
00015
00016 #include <vector>
00017 #include <queue>
00018 #include <deque>
00019
00020 #include <stxxl/bits/mng/mng.h>
00021 #include <stxxl/bits/mng/typed_block.h>
00022 #include <stxxl/bits/common/simple_vector.h>
00023 #include <stxxl/bits/common/tmeta.h>
00024 #include <stxxl/bits/mng/read_write_pool.h>
00025 #include <stxxl/bits/mng/write_pool.h>
00026 #include <stxxl/bits/mng/prefetch_pool.h>
00027
00028
00029 __STXXL_BEGIN_NAMESPACE
00030
00031 #ifndef STXXL_VERBOSE_QUEUE
00032 #define STXXL_VERBOSE_QUEUE STXXL_VERBOSE2
00033 #endif
00034
00037
00038
00040
00046 template <class ValTp,
00047 unsigned BlkSz = STXXL_DEFAULT_BLOCK_SIZE(ValTp),
00048 class AllocStr = STXXL_DEFAULT_ALLOC_STRATEGY,
00049 class SzTp = stxxl::uint64>
00050 class queue : private noncopyable
00051 {
00052 public:
00053 typedef ValTp value_type;
00054 typedef AllocStr alloc_strategy_type;
00055 typedef SzTp size_type;
00056 enum {
00057 block_size = BlkSz
00058 };
00059
00060 typedef typed_block<block_size, value_type> block_type;
00061 typedef BID<block_size> bid_type;
00062
00063 private:
00064 typedef read_write_pool<block_type> pool_type;
00065
00066 size_type size_;
00067 bool delete_pool;
00068 pool_type * pool;
00069 block_type * front_block;
00070 block_type * back_block;
00071 value_type * front_element;
00072 value_type * back_element;
00073 alloc_strategy_type alloc_strategy;
00074 unsigned_type alloc_count;
00075 std::deque<bid_type> bids;
00076 block_manager * bm;
00077 unsigned_type blocks2prefetch;
00078
00079 public:
00081
00086 explicit queue(unsigned_type w_pool_size = 3, unsigned_type p_pool_size = 1, int blocks2prefetch_ = -1) :
00087 size_(0),
00088 delete_pool(true),
00089 alloc_count(0),
00090 bm(block_manager::get_instance())
00091 {
00092 STXXL_VERBOSE_QUEUE("queue[" << this << "]::queue(sizes)");
00093 pool = new pool_type(p_pool_size, w_pool_size);
00094 init(blocks2prefetch_);
00095 }
00096
00098
00105 _STXXL_DEPRECATED(
00106 queue(write_pool<block_type> & w_pool, prefetch_pool<block_type> & p_pool, int blocks2prefetch_ = -1)) :
00107 size_(0),
00108 delete_pool(true),
00109 alloc_count(0),
00110 bm(block_manager::get_instance())
00111 {
00112 STXXL_VERBOSE_QUEUE("queue[" << this << "]::queue(pools)");
00113 pool = new pool_type(w_pool, p_pool);
00114 init(blocks2prefetch_);
00115 }
00116
00118
00124 queue(pool_type & pool_, int blocks2prefetch_ = -1) :
00125 size_(0),
00126 delete_pool(false),
00127 pool(&pool_),
00128 alloc_count(0),
00129 bm(block_manager::get_instance())
00130 {
00131 STXXL_VERBOSE_QUEUE("queue[" << this << "]::queue(pool)");
00132 init(blocks2prefetch_);
00133 }
00134
00135 private:
00136 void init(int blocks2prefetch_)
00137 {
00138 if (pool->size_write() < 2) {
00139 STXXL_ERRMSG("queue: invalid configuration, not enough blocks (" << pool->size_write() <<
00140 ") in write pool, at least 2 are needed, resizing to 3");
00141 pool->resize_write(3);
00142 }
00143
00144 if (pool->size_write() < 3) {
00145 STXXL_MSG("queue: inefficient configuration, no blocks for buffered writing available");
00146 }
00147
00148 if (pool->size_prefetch() < 1) {
00149 STXXL_MSG("queue: inefficient configuration, no blocks for prefetching available");
00150 }
00151
00152 front_block = back_block = pool->steal();
00153 back_element = back_block->elem - 1;
00154 front_element = back_block->elem;
00155 set_prefetch_aggr(blocks2prefetch_);
00156 }
00157
00158 public:
00163 void set_prefetch_aggr(int_type blocks2prefetch_)
00164 {
00165 if (blocks2prefetch_ < 0)
00166 blocks2prefetch = pool->size_prefetch();
00167 else
00168 blocks2prefetch = blocks2prefetch_;
00169 }
00170
00172 unsigned_type get_prefetch_aggr() const
00173 {
00174 return blocks2prefetch;
00175 }
00176
00178 void push(const value_type & val)
00179 {
00180 if (back_element == back_block->elem + (block_type::size - 1))
00181 {
00182
00183 if (front_block == back_block)
00184 {
00185
00186 STXXL_VERBOSE1("queue::push Case 1");
00187 }
00188 else
00189 {
00190 STXXL_VERBOSE1("queue::push Case 2");
00191
00192
00193 bid_type newbid;
00194
00195 bm->new_block(alloc_strategy, newbid, alloc_count++);
00196
00197 STXXL_VERBOSE_QUEUE("queue[" << this << "]: push block " << back_block << " @ " << FMT_BID(newbid));
00198 bids.push_back(newbid);
00199 pool->write(back_block, newbid);
00200 if (bids.size() <= blocks2prefetch) {
00201 STXXL_VERBOSE1("queue::push Case Hints");
00202 pool->hint(newbid);
00203 }
00204 }
00205 back_block = pool->steal();
00206
00207 back_element = back_block->elem;
00208 *back_element = val;
00209 ++size_;
00210 return;
00211 }
00212 ++back_element;
00213 *back_element = val;
00214 ++size_;
00215 }
00216
00218 void pop()
00219 {
00220 assert(!empty());
00221
00222 if (front_element == front_block->elem + (block_type::size - 1))
00223 {
00224
00225 if (back_block == front_block)
00226 {
00227 STXXL_VERBOSE1("queue::pop Case 3");
00228 assert(size() == 1);
00229 assert(back_element == front_element);
00230 assert(bids.empty());
00231
00232 back_element = back_block->elem - 1;
00233 front_element = back_block->elem;
00234 size_ = 0;
00235 return;
00236 }
00237
00238 --size_;
00239 if (size_ <= block_type::size)
00240 {
00241 STXXL_VERBOSE1("queue::pop Case 4");
00242 assert(bids.empty());
00243
00244 pool->add(front_block);
00245 front_block = back_block;
00246 front_element = back_block->elem;
00247 return;
00248 }
00249 STXXL_VERBOSE1("queue::pop Case 5");
00250
00251 assert(!bids.empty());
00252 request_ptr req = pool->read(front_block, bids.front());
00253 STXXL_VERBOSE_QUEUE("queue[" << this << "]: pop block " << front_block << " @ " << FMT_BID(bids.front()));
00254
00255
00256 for (unsigned_type i = 0; i < blocks2prefetch && i < bids.size() - 1; ++i)
00257 {
00258 STXXL_VERBOSE1("queue::pop Case Hints");
00259 pool->hint(bids[i + 1]);
00260 }
00261
00262 front_element = front_block->elem;
00263 req->wait();
00264
00265 bm->delete_block(bids.front());
00266 bids.pop_front();
00267 return;
00268 }
00269
00270 ++front_element;
00271 --size_;
00272 }
00273
00275 size_type size() const
00276 {
00277 return size_;
00278 }
00279
00281 bool empty() const
00282 {
00283 return (size_ == 0);
00284 }
00285
00287 value_type & back()
00288 {
00289 assert(!empty());
00290 return *back_element;
00291 }
00292
00294 const value_type & back() const
00295 {
00296 assert(!empty());
00297 return *back_element;
00298 }
00299
00301 value_type & front()
00302 {
00303 assert(!empty());
00304 return *front_element;
00305 }
00306
00308 const value_type & front() const
00309 {
00310 assert(!empty());
00311 return *front_element;
00312 }
00313
00314 ~queue()
00315 {
00316 pool->add(front_block);
00317 if (front_block != back_block)
00318 pool->add(back_block);
00319
00320
00321 if (delete_pool)
00322 {
00323 delete pool;
00324 }
00325
00326 if (!bids.empty())
00327 bm->delete_blocks(bids.begin(), bids.end());
00328 }
00329 };
00330
00332
00333 __STXXL_END_NAMESPACE
00334
00335 #endif // !STXXL_QUEUE_HEADER
00336