00001
00002
00003
00004
00005
00006
00007
00008
00009
00010
00011
00012
00013
00014 #ifndef STXXL_QUEUE_HEADER
00015 #define STXXL_QUEUE_HEADER
00016
00017 #include <vector>
00018 #include <queue>
00019 #include <deque>
00020
00021 #include <stxxl/bits/deprecated.h>
00022 #include <stxxl/bits/mng/mng.h>
00023 #include <stxxl/bits/mng/typed_block.h>
00024 #include <stxxl/bits/common/simple_vector.h>
00025 #include <stxxl/bits/common/tmeta.h>
00026 #include <stxxl/bits/mng/read_write_pool.h>
00027 #include <stxxl/bits/mng/write_pool.h>
00028 #include <stxxl/bits/mng/prefetch_pool.h>
00029
00030
00031 __STXXL_BEGIN_NAMESPACE
00032
// Logging macro used for this queue's block-level diagnostic messages.
// It falls back to STXXL_VERBOSE2 unless STXXL_VERBOSE_QUEUE has already
// been defined before this point.
00033 #ifndef STXXL_VERBOSE_QUEUE
00034 #define STXXL_VERBOSE_QUEUE STXXL_VERBOSE2
00035 #endif
00036
00039
00040
00042
00047 template <class ValTp,
00048 unsigned BlkSz = STXXL_DEFAULT_BLOCK_SIZE(ValTp),
00049 class AllocStr = STXXL_DEFAULT_ALLOC_STRATEGY,
00050 class SzTp = stxxl::uint64>
00051 class queue : private noncopyable
00052 {
00053 public:
00054 typedef ValTp value_type;
00055 typedef AllocStr alloc_strategy_type;
00056 typedef SzTp size_type;
00057 enum {
00058 block_size = BlkSz
00059 };
00060
00061 typedef typed_block<block_size, value_type> block_type;
00062 typedef BID<block_size> bid_type;
00063
00064 private:
00065 typedef read_write_pool<block_type> pool_type;
00066
00067 size_type size_;
00068 bool delete_pool;
00069 pool_type * pool;
00070 block_type * front_block;
00071 block_type * back_block;
00072 value_type * front_element;
00073 value_type * back_element;
00074 alloc_strategy_type alloc_strategy;
00075 unsigned_type alloc_count;
00076 std::deque<bid_type> bids;
00077 block_manager * bm;
00078 unsigned_type blocks2prefetch;
00079
00080 public:
00082
00087 explicit queue(unsigned_type w_pool_size = 3, unsigned_type p_pool_size = 1, int blocks2prefetch_ = -1) :
00088 size_(0),
00089 delete_pool(true),
00090 alloc_count(0),
00091 bm(block_manager::get_instance())
00092 {
00093 STXXL_VERBOSE_QUEUE("queue[" << this << "]::queue(sizes)");
00094 pool = new pool_type(p_pool_size, w_pool_size);
00095 init(blocks2prefetch_);
00096 }
00097
00099
00106 _STXXL_DEPRECATED(
00107 queue(write_pool<block_type> & w_pool, prefetch_pool<block_type> & p_pool, int blocks2prefetch_ = -1)) :
00108 size_(0),
00109 delete_pool(true),
00110 alloc_count(0),
00111 bm(block_manager::get_instance())
00112 {
00113 STXXL_VERBOSE_QUEUE("queue[" << this << "]::queue(pools)");
00114 pool = new pool_type(w_pool, p_pool);
00115 init(blocks2prefetch_);
00116 }
00117
00119
00125 queue(pool_type & pool_, int blocks2prefetch_ = -1) :
00126 size_(0),
00127 delete_pool(false),
00128 pool(&pool_),
00129 alloc_count(0),
00130 bm(block_manager::get_instance())
00131 {
00132 STXXL_VERBOSE_QUEUE("queue[" << this << "]::queue(pool)");
00133 init(blocks2prefetch_);
00134 }
00135
00136 private:
00137 void init(int blocks2prefetch_)
00138 {
00139 if (pool->size_write() < 2) {
00140 STXXL_ERRMSG("queue: invalid configuration, not enough blocks (" << pool->size_write() <<
00141 ") in write pool, at least 2 are needed, resizing to 3");
00142 pool->resize_write(3);
00143 }
00144
00145 if (pool->size_write() < 3) {
00146 STXXL_MSG("queue: inefficient configuration, no blocks for buffered writing available");
00147 }
00148
00149 if (pool->size_prefetch() < 1) {
00150 STXXL_MSG("queue: inefficient configuration, no blocks for prefetching available");
00151 }
00152
00153 front_block = back_block = pool->steal();
00154 back_element = back_block->begin() - 1;
00155 front_element = back_block->begin();
00156 set_prefetch_aggr(blocks2prefetch_);
00157 }
00158
00159 public:
00164 void set_prefetch_aggr(int_type blocks2prefetch_)
00165 {
00166 if (blocks2prefetch_ < 0)
00167 blocks2prefetch = pool->size_prefetch();
00168 else
00169 blocks2prefetch = blocks2prefetch_;
00170 }
00171
00173 unsigned_type get_prefetch_aggr() const
00174 {
00175 return blocks2prefetch;
00176 }
00177
00179 void push(const value_type & val)
00180 {
00181 if (back_element == back_block->begin() + (block_type::size - 1))
00182 {
00183
00184 if (front_block == back_block)
00185 {
00186
00187 STXXL_VERBOSE1("queue::push Case 1");
00188 }
00189 else
00190 {
00191 STXXL_VERBOSE1("queue::push Case 2");
00192
00193
00194 bid_type newbid;
00195
00196 bm->new_block(alloc_strategy, newbid, alloc_count++);
00197
00198 STXXL_VERBOSE_QUEUE("queue[" << this << "]: push block " << back_block << " @ " << FMT_BID(newbid));
00199 bids.push_back(newbid);
00200 pool->write(back_block, newbid);
00201 if (bids.size() <= blocks2prefetch) {
00202 STXXL_VERBOSE1("queue::push Case Hints");
00203 pool->hint(newbid);
00204 }
00205 }
00206 back_block = pool->steal();
00207
00208 back_element = back_block->begin();
00209 *back_element = val;
00210 ++size_;
00211 return;
00212 }
00213 ++back_element;
00214 *back_element = val;
00215 ++size_;
00216 }
00217
00219 void pop()
00220 {
00221 assert(!empty());
00222
00223 if (front_element == front_block->begin() + (block_type::size - 1))
00224 {
00225
00226 if (back_block == front_block)
00227 {
00228 STXXL_VERBOSE1("queue::pop Case 3");
00229 assert(size() == 1);
00230 assert(back_element == front_element);
00231 assert(bids.empty());
00232
00233 back_element = back_block->begin() - 1;
00234 front_element = back_block->begin();
00235 size_ = 0;
00236 return;
00237 }
00238
00239 --size_;
00240 if (size_ <= block_type::size)
00241 {
00242 STXXL_VERBOSE1("queue::pop Case 4");
00243 assert(bids.empty());
00244
00245 pool->add(front_block);
00246 front_block = back_block;
00247 front_element = back_block->begin();
00248 return;
00249 }
00250 STXXL_VERBOSE1("queue::pop Case 5");
00251
00252 assert(!bids.empty());
00253 request_ptr req = pool->read(front_block, bids.front());
00254 STXXL_VERBOSE_QUEUE("queue[" << this << "]: pop block " << front_block << " @ " << FMT_BID(bids.front()));
00255
00256
00257 for (unsigned_type i = 0; i < blocks2prefetch && i < bids.size() - 1; ++i)
00258 {
00259 STXXL_VERBOSE1("queue::pop Case Hints");
00260 pool->hint(bids[i + 1]);
00261 }
00262
00263 front_element = front_block->begin();
00264 req->wait();
00265
00266 bm->delete_block(bids.front());
00267 bids.pop_front();
00268 return;
00269 }
00270
00271 ++front_element;
00272 --size_;
00273 }
00274
00276 size_type size() const
00277 {
00278 return size_;
00279 }
00280
00282 bool empty() const
00283 {
00284 return (size_ == 0);
00285 }
00286
00288 value_type & back()
00289 {
00290 assert(!empty());
00291 return *back_element;
00292 }
00293
00295 const value_type & back() const
00296 {
00297 assert(!empty());
00298 return *back_element;
00299 }
00300
00302 value_type & front()
00303 {
00304 assert(!empty());
00305 return *front_element;
00306 }
00307
00309 const value_type & front() const
00310 {
00311 assert(!empty());
00312 return *front_element;
00313 }
00314
00315 ~queue()
00316 {
00317 pool->add(front_block);
00318 if (front_block != back_block)
00319 pool->add(back_block);
00320
00321
00322 if (delete_pool)
00323 {
00324 delete pool;
00325 }
00326
00327 if (!bids.empty())
00328 bm->delete_blocks(bids.begin(), bids.end());
00329 }
00330 };
00331
00333
00334 __STXXL_END_NAMESPACE
00335
00336 #endif // !STXXL_QUEUE_HEADER
00337