Stxxl  1.3.2
write_pool.h
1 /***************************************************************************
2  * include/stxxl/bits/mng/write_pool.h
3  *
4  * Part of the STXXL. See http://stxxl.sourceforge.net
5  *
6  * Copyright (C) 2003-2004 Roman Dementiev <[email protected]>
7  * Copyright (C) 2009 Andreas Beckmann <[email protected]>
8  *
9  * Distributed under the Boost Software License, Version 1.0.
10  * (See accompanying file LICENSE_1_0.txt or copy at
11  * http://www.boost.org/LICENSE_1_0.txt)
12  **************************************************************************/
13 
14 #ifndef STXXL_WRITE_POOL_HEADER
15 #define STXXL_WRITE_POOL_HEADER
16 
17 #include <list>
18 
19 #ifdef STXXL_BOOST_CONFIG
20  #include <boost/config.hpp>
21 #endif
22 
23 #include <stxxl/bits/noncopyable.h>
24 #include <stxxl/bits/deprecated.h>
25 #include <stxxl/bits/io/request_operations.h>
26 
27 
28 __STXXL_BEGIN_NAMESPACE
29 
32 
33 
35 template <class BlockType>
36 class write_pool : private noncopyable
37 {
38 public:
39  typedef BlockType block_type;
40  typedef typename block_type::bid_type bid_type;
41 
42  // a hack to make wait_any work with busy_entry type
43  struct busy_entry
44  {
45  block_type * block;
46  request_ptr req;
47  bid_type bid;
48 
49  busy_entry() : block(NULL) { }
50  busy_entry(const busy_entry & a) : block(a.block), req(a.req), bid(a.bid) { }
51  busy_entry(block_type * & bl, request_ptr & r, bid_type & bi) :
52  block(bl), req(r), bid(bi) { }
53 
54  operator request_ptr () { return req; }
55  };
56  typedef typename std::list<block_type *>::iterator free_blocks_iterator;
57  typedef typename std::list<busy_entry>::iterator busy_blocks_iterator;
58 
59 protected:
60  // contains free write blocks
61  std::list<block_type *> free_blocks;
62  // blocks that are in writing
63  std::list<busy_entry> busy_blocks;
64 
65  unsigned_type free_blocks_size, busy_blocks_size;
66 
67 public:
70  explicit write_pool(unsigned_type init_size = 1) : free_blocks_size(init_size), busy_blocks_size(0)
71  {
72  unsigned_type i = 0;
73  for ( ; i < init_size; ++i)
74  free_blocks.push_back(new block_type);
75  }
76 
77  void swap(write_pool & obj)
78  {
79  std::swap(free_blocks, obj.free_blocks);
80  std::swap(busy_blocks, obj.busy_blocks);
81  std::swap(free_blocks_size, obj.free_blocks_size);
82  std::swap(busy_blocks_size, busy_blocks_size);
83  }
84 
86  virtual ~write_pool()
87  {
88  STXXL_VERBOSE2("write_pool::~write_pool free_blocks_size: " <<
89  free_blocks_size << " busy_blocks_size: " << busy_blocks_size);
90  while (!free_blocks.empty())
91  {
92  delete free_blocks.back();
93  free_blocks.pop_back();
94  }
95 
96  try
97  {
98  for (busy_blocks_iterator i2 = busy_blocks.begin(); i2 != busy_blocks.end(); ++i2)
99  {
100  i2->req->wait();
101  delete i2->block;
102  }
103  }
104  catch (...)
105  { }
106  }
107 
109  unsigned_type size() const { return free_blocks_size + busy_blocks_size; }
110 
117  request_ptr write(block_type * & block, bid_type bid)
118  {
119  STXXL_VERBOSE1("write_pool::write: " << block << " @ " << bid);
120  for (busy_blocks_iterator i2 = busy_blocks.begin(); i2 != busy_blocks.end(); ++i2)
121  {
122  if (i2->bid == bid) {
123  assert(i2->block != block);
124  STXXL_VERBOSE1("WAW dependency");
125  // try to cancel the obsolete request
126  i2->req->cancel();
127  // invalidate the bid of the stale write request,
128  // prevents prefetch_pool from stealing a stale block
129  i2->bid.storage = 0;
130  }
131  }
132  request_ptr result = block->write(bid);
133  ++busy_blocks_size;
134  busy_blocks.push_back(busy_entry(block, result, bid));
135  block = NULL; // prevent caller from using the block any further
136  return result;
137  }
138 
141  block_type * steal()
142  {
143  assert(size() > 0);
144  if (free_blocks_size)
145  {
146  STXXL_VERBOSE1("write_pool::steal : " << free_blocks_size << " free blocks available");
147  --free_blocks_size;
148  block_type * p = free_blocks.back();
149  free_blocks.pop_back();
150  return p;
151  }
152  STXXL_VERBOSE1("write_pool::steal : all " << busy_blocks_size << " are busy");
153  busy_blocks_iterator completed = wait_any(busy_blocks.begin(), busy_blocks.end());
154  assert(completed != busy_blocks.end()); // we got something reasonable from wait_any
155  assert(completed->req->poll()); // and it is *really* completed
156  block_type * p = completed->block;
157  busy_blocks.erase(completed);
158  --busy_blocks_size;
159  check_all_busy(); // for debug
160  return p;
161  }
162 
163  // deprecated name for the steal()
164  _STXXL_DEPRECATED(block_type * get())
165  {
166  return steal();
167  }
168 
171  void resize(unsigned_type new_size)
172  {
173  int_type diff = int_type(new_size) - int_type(size());
174  if (diff > 0)
175  {
176  free_blocks_size += diff;
177  while (--diff >= 0)
178  free_blocks.push_back(new block_type);
179 
180  return;
181  }
182 
183  while (++diff <= 0)
184  delete steal();
185  }
186 
187  _STXXL_DEPRECATED(request_ptr get_request(bid_type bid))
188  {
189  busy_blocks_iterator i2 = busy_blocks.begin();
190  for ( ; i2 != busy_blocks.end(); ++i2)
191  {
192  if (i2->bid == bid)
193  return i2->req;
194  }
195  return request_ptr();
196  }
197 
198  bool has_request(bid_type bid)
199  {
200  for (busy_blocks_iterator i2 = busy_blocks.begin(); i2 != busy_blocks.end(); ++i2)
201  {
202  if (i2->bid == bid)
203  return true;
204  }
205  return false;
206  }
207 
208  _STXXL_DEPRECATED(block_type * steal(bid_type bid))
209  {
210  busy_blocks_iterator i2 = busy_blocks.begin();
211  for ( ; i2 != busy_blocks.end(); ++i2)
212  {
213  if (i2->bid == bid)
214  {
215  block_type * p = i2->block;
216  i2->req->wait();
217  busy_blocks.erase(i2);
218  --busy_blocks_size;
219  return p;
220  }
221  }
222  return NULL;
223  }
224 
225  // returns a block and a (potentially unfinished) I/O request associated with it
226  std::pair<block_type *, request_ptr> steal_request(bid_type bid)
227  {
228  for (busy_blocks_iterator i2 = busy_blocks.begin(); i2 != busy_blocks.end(); ++i2)
229  {
230  if (i2->bid == bid)
231  {
232  // remove busy block from list, request has not yet been waited for!
233  block_type * blk = i2->block;
234  request_ptr req = i2->req;
235  busy_blocks.erase(i2);
236  --busy_blocks_size;
237 
238  // hand over block and (unfinished) request to caller
239  return std::pair<block_type *, request_ptr>(blk, req);
240  }
241  }
242  // not matching request found, return a dummy
243  return std::pair<block_type *, request_ptr>((block_type *)NULL, request_ptr());
244  }
245 
246  void add(block_type * & block)
247  {
248  free_blocks.push_back(block);
249  ++free_blocks_size;
250  block = NULL; // prevent caller from using the block any further
251  }
252 
253 protected:
254  void check_all_busy()
255  {
256  busy_blocks_iterator cur = busy_blocks.begin();
257  int_type cnt = 0;
258  while (cur != busy_blocks.end())
259  {
260  if (cur->req->poll())
261  {
262  free_blocks.push_back(cur->block);
263  cur = busy_blocks.erase(cur);
264  ++cnt;
265  --busy_blocks_size;
266  ++free_blocks_size;
267  continue;
268  }
269  ++cur;
270  }
271  STXXL_VERBOSE1("write_pool::check_all_busy : " << cnt <<
272  " are completed out of " << busy_blocks_size + cnt << " busy blocks");
273  }
274 };
275 
277 
278 __STXXL_END_NAMESPACE
279 
280 
281 namespace std
282 {
283  template <class BlockType>
284  void swap(stxxl::write_pool<BlockType> & a,
285  stxxl::write_pool<BlockType> & b)
286  {
287  a.swap(b);
288  }
289 }
290 
291 #endif // !STXXL_WRITE_POOL_HEADER
292 // vim: et:ts=4:sw=4
request_ptr write(block_type *&block, bid_type bid)
Passes a block to the pool for writing.
Definition: write_pool.h:117
Implements dynamically resizable buffered writing pool.
Definition: write_pool.h:36
void resize(unsigned_type new_size)
Resizes size of the pool.
Definition: write_pool.h:171
virtual ~write_pool()
Waits for completion of all ongoing write requests and frees memory.
Definition: write_pool.h:86
block_type * steal()
Take out a block from the pool.
Definition: write_pool.h:141
unsigned_type size() const
Returns number of owned blocks.
Definition: write_pool.h:109
Implemented as reference counting smart pointer.
Definition: request_ptr.h:34
request_iterator_ wait_any(request_iterator_ reqs_begin, request_iterator_ reqs_end)
Suspends calling thread until any of requests is completed.
Definition: request_operations.h:108
write_pool(unsigned_type init_size=1)
Constructs pool.
Definition: write_pool.h:70