00001
00002
00003
00004
00005
00006
00007
00008
00009
00010
00011
00012
00013 #ifndef STXXL_QUEUE_HEADER
00014 #define STXXL_QUEUE_HEADER
00015
00016 #include <vector>
00017 #include <queue>
00018 #include <deque>
00019
00020 #include <stxxl/bits/mng/mng.h>
00021 #include <stxxl/bits/common/simple_vector.h>
00022 #include <stxxl/bits/common/tmeta.h>
00023 #include <stxxl/bits/mng/write_pool.h>
00024 #include <stxxl/bits/mng/prefetch_pool.h>
00025
00026
00027 __STXXL_BEGIN_NAMESPACE
00028
00031
00032
00034
00040 template <class ValTp,
00041 unsigned BlkSz = STXXL_DEFAULT_BLOCK_SIZE(ValTp),
00042 class AllocStr = STXXL_DEFAULT_ALLOC_STRATEGY,
00043 class SzTp = stxxl::uint64>
00044 class queue : private noncopyable
00045 {
00046 public:
00047 typedef ValTp value_type;
00048 typedef AllocStr alloc_strategy;
00049 typedef SzTp size_type;
00050 enum {
00051 block_size = BlkSz
00052 };
00053
00054 typedef typed_block<block_size, value_type> block_type;
00055 typedef BID<block_size> bid_type;
00056
00057 private:
00058 size_type size_;
00059 bool delete_pools;
00060 write_pool<block_type> * w_pool;
00061 prefetch_pool<block_type> * p_pool;
00062 block_type * front_block;
00063 block_type * back_block;
00064 value_type * front_element;
00065 value_type * back_element;
00066
00067 unsigned_type alloc_counter;
00068 std::deque<bid_type> bids;
00069 block_manager * bm;
00070 unsigned_type blocks2prefetch;
00071
00072 public:
00074
00078 queue(unsigned_type w_pool_size, unsigned_type p_pool_size, unsigned_type blocks2prefetch_ = 1) :
00079 size_(0),
00080 delete_pools(true),
00081 alloc_counter(0),
00082 blocks2prefetch(blocks2prefetch_)
00083 {
00084 if (w_pool_size < 2)
00085 w_pool_size = 2;
00086
00087 if (p_pool_size < 1)
00088 p_pool_size = 1;
00089
00090 w_pool = new write_pool<block_type>(w_pool_size);
00091 front_block = back_block = w_pool->steal();
00092 back_element = back_block->elem - 1;
00093 front_element = back_block->elem;
00094 p_pool = new prefetch_pool<block_type>(p_pool_size);
00095 bm = block_manager::get_instance();
00096 }
00097
00099
00105 queue(write_pool<block_type> & w_pool_, prefetch_pool<block_type> & p_pool_, unsigned blocks2prefetch_ = 1) :
00106 size_(0),
00107 delete_pools(false),
00108 w_pool(&w_pool_),
00109 p_pool(&p_pool_),
00110 alloc_counter(0),
00111 blocks2prefetch(blocks2prefetch_)
00112 {
00113 if (w_pool->size() < 2)
00114 w_pool->resize(2);
00115
00116 if (p_pool->size() < 2)
00117 p_pool->resize(1);
00118
00119 front_block = back_block = w_pool->steal();
00120 back_element = back_block->elem - 1;
00121 front_element = back_block->elem;
00122 bm = block_manager::get_instance();
00123 }
00124
00126 void set_prefetch_aggr(unsigned_type blocks2prefetch_)
00127 {
00128 blocks2prefetch = blocks2prefetch_;
00129 }
00130
00132 unsigned_type get_prefetch_aggr() const
00133 {
00134 return blocks2prefetch;
00135 }
00136
00138 void push(const value_type & val)
00139 {
00140 if (back_element == back_block->elem + (block_type::size - 1))
00141 {
00142
00143 if (front_block == back_block)
00144 {
00145
00146 STXXL_VERBOSE1("queue::push Case 1");
00147 }
00148 else
00149 {
00150 STXXL_VERBOSE1("queue::push Case 2");
00151
00152
00153 bid_type newbid;
00154
00155 offset_allocator<alloc_strategy> alloc_str(alloc_counter++);
00156
00157
00158 bm->new_blocks(alloc_str, &newbid, (&newbid) + 1);
00159
00160 bids.push_back(newbid);
00161 w_pool->write(back_block, newbid);
00162 }
00163 back_block = w_pool->steal();
00164
00165 back_element = back_block->elem;
00166 *back_element = val;
00167 ++size_;
00168 return;
00169 }
00170 ++back_element;
00171 *back_element = val;
00172 ++size_;
00173 }
00174
00176 void pop()
00177 {
00178 assert(!empty());
00179
00180 if (front_element == front_block->elem + (block_type::size - 1))
00181 {
00182
00183 if (back_block == front_block)
00184 {
00185 STXXL_VERBOSE1("queue::pop Case 3");
00186 assert(size() == 1);
00187 assert(back_element == front_element);
00188 assert(bids.empty());
00189
00190 back_element = back_block->elem - 1;
00191 front_element = back_block->elem;
00192 size_ = 0;
00193 return;
00194 }
00195
00196 --size_;
00197 if (size_ <= block_type::size)
00198 {
00199 STXXL_VERBOSE1("queue::pop Case 4");
00200 assert(bids.empty());
00201
00202 w_pool->add(front_block);
00203 front_block = back_block;
00204 front_element = back_block->elem;
00205 return;
00206 }
00207 STXXL_VERBOSE1("queue::pop Case 5");
00208
00209 assert(!bids.empty());
00210 request_ptr req = p_pool->read(front_block, bids.front());
00211 for (unsigned_type i = 0; i < blocks2prefetch && i < bids.size() - 1; ++i)
00212 {
00213 STXXL_VERBOSE1("queue::pop Case Hints");
00214 p_pool->hint(bids[i + 1], *w_pool);
00215 }
00216
00217 front_element = front_block->elem;
00218 req->wait();
00219
00220 bm->delete_block(bids.front());
00221 bids.pop_front();
00222 return;
00223 }
00224
00225 ++front_element;
00226 --size_;
00227 }
00228
00230 size_type size() const
00231 {
00232 return size_;
00233 }
00234
00236 bool empty() const
00237 {
00238 return (size_ == 0);
00239 }
00240
00242 value_type & back()
00243 {
00244 assert(!empty());
00245 return *back_element;
00246 }
00247
00249 const value_type & back() const
00250 {
00251 assert(!empty());
00252 return *back_element;
00253 }
00254
00256 value_type & front()
00257 {
00258 assert(!empty());
00259 return *front_element;
00260 }
00261
00263 const value_type & front() const
00264 {
00265 assert(!empty());
00266 return *front_element;
00267 }
00268
00269 ~queue()
00270 {
00271 w_pool->add(front_block);
00272 if (front_block != back_block)
00273 w_pool->add(back_block);
00274
00275
00276 if (delete_pools)
00277 {
00278 delete w_pool;
00279 delete p_pool;
00280 }
00281
00282 if (!bids.empty())
00283 bm->delete_blocks(bids.begin(), bids.end());
00284 }
00285 };
00286
00288
00289 __STXXL_END_NAMESPACE
00290
00291 #endif // !STXXL_QUEUE_HEADER