Stxxl  1.3.2
buf_writer.h
1 /***************************************************************************
2  * include/stxxl/bits/mng/buf_writer.h
3  *
4  * Part of the STXXL. See http://stxxl.sourceforge.net
5  *
6  * Copyright (C) 2002-2004 Roman Dementiev <[email protected]>
7  *
8  * Distributed under the Boost Software License, Version 1.0.
9  * (See accompanying file LICENSE_1_0.txt or copy at
10  * http://www.boost.org/LICENSE_1_0.txt)
11  **************************************************************************/
12 
13 #ifndef STXXL_BUFFERED_WRITER_HEADER
14 #define STXXL_BUFFERED_WRITER_HEADER
15 
16 #include <vector>
17 #include <queue>
18 
19 #include <stxxl/bits/io/request_operations.h>
20 #include <stxxl/bits/io/disk_queues.h>
21 
22 
23 __STXXL_BEGIN_NAMESPACE
24 
31 
32 
36 template <typename block_type>
38 {
39  buffered_writer() { }
40 
41 protected:
42  typedef typename block_type::bid_type bid_type;
43 
44  const unsigned_type nwriteblocks;
45  block_type * write_buffers;
46  bid_type * write_bids;
47  request_ptr * write_reqs;
48  const unsigned_type writebatchsize;
49 
50  std::vector<int_type> free_write_blocks; // contains free write blocks
51  std::vector<int_type> busy_write_blocks; // blocks that are in writing, notice that if block is not in free_
52  // and not in busy then block is not yet filled
53 
54  struct batch_entry
55  {
56  stxxl::int64 offset;
57  int_type ibuffer;
58  batch_entry(stxxl::int64 o, int b) : offset(o), ibuffer(b) { }
59  };
60  struct batch_entry_cmp
61  {
62  bool operator () (const batch_entry & a, const batch_entry & b) const
63  {
64  return (a.offset > b.offset);
65  }
66  };
67 
68  typedef std::priority_queue<batch_entry, std::vector<batch_entry>, batch_entry_cmp> batch_type;
69  batch_type batch_write_blocks; // sorted sequence of blocks to write
70 
71 public:
76  buffered_writer(unsigned_type write_buf_size, unsigned_type write_batch_size) :
77  nwriteblocks((write_buf_size > 2) ? write_buf_size : 2),
78  writebatchsize(write_batch_size ? write_batch_size : 1)
79  {
80  write_buffers = new block_type[nwriteblocks];
81  write_reqs = new request_ptr[nwriteblocks];
82  write_bids = new bid_type[nwriteblocks];
83 
84  for (unsigned_type i = 0; i < nwriteblocks; i++)
85  free_write_blocks.push_back(i);
86 
87  disk_queues::get_instance()->set_priority_op(request_queue::WRITE);
88  }
91  block_type * get_free_block()
92  {
93  int_type ibuffer;
94  for (std::vector<int_type>::iterator it = busy_write_blocks.begin();
95  it != busy_write_blocks.end(); ++it)
96  {
97  if (write_reqs[ibuffer = (*it)]->poll())
98  {
99  busy_write_blocks.erase(it);
100  free_write_blocks.push_back(ibuffer);
101 
102  break;
103  }
104  }
105  if (UNLIKELY(free_write_blocks.empty()))
106  {
107  int_type size = busy_write_blocks.size();
108  request_ptr * reqs = new request_ptr[size];
109  int_type i = 0;
110  for ( ; i < size; ++i)
111  {
112  reqs[i] = write_reqs[busy_write_blocks[i]];
113  }
114  int_type completed = wait_any(reqs, size);
115  int_type completed_global = busy_write_blocks[completed];
116  delete[] reqs;
117  busy_write_blocks.erase(busy_write_blocks.begin() + completed);
118 
119  return (write_buffers + completed_global);
120  }
121  ibuffer = free_write_blocks.back();
122  free_write_blocks.pop_back();
123 
124  return (write_buffers + ibuffer);
125  }
131  block_type * write(block_type * filled_block, const bid_type & bid) // writes filled_block and returns a new block
132  {
133  if (batch_write_blocks.size() >= writebatchsize)
134  {
135  // flush batch
136  while (!batch_write_blocks.empty())
137  {
138  int_type ibuffer = batch_write_blocks.top().ibuffer;
139  batch_write_blocks.pop();
140 
141  if (write_reqs[ibuffer].valid())
142  write_reqs[ibuffer]->wait();
143 
144  write_reqs[ibuffer] = write_buffers[ibuffer].write(write_bids[ibuffer]);
145 
146  busy_write_blocks.push_back(ibuffer);
147  }
148  }
149  // STXXL_MSG("Adding write request to batch");
150 
151  int_type ibuffer = filled_block - write_buffers;
152  write_bids[ibuffer] = bid;
153  batch_write_blocks.push(batch_entry(bid.offset, ibuffer));
154 
155  return get_free_block();
156  }
158  void flush()
159  {
160  int_type ibuffer;
161  while (!batch_write_blocks.empty())
162  {
163  ibuffer = batch_write_blocks.top().ibuffer;
164  batch_write_blocks.pop();
165 
166  if (write_reqs[ibuffer].valid())
167  write_reqs[ibuffer]->wait();
168 
169  write_reqs[ibuffer] = write_buffers[ibuffer].write(write_bids[ibuffer]);
170 
171  busy_write_blocks.push_back(ibuffer);
172  }
173  for (std::vector<int_type>::const_iterator it =
174  busy_write_blocks.begin();
175  it != busy_write_blocks.end(); it++)
176  {
177  ibuffer = *it;
178  write_reqs[ibuffer]->wait();
179  }
180 
181  assert(batch_write_blocks.empty());
182  free_write_blocks.clear();
183  busy_write_blocks.clear();
184 
185  for (unsigned_type i = 0; i < nwriteblocks; i++)
186  free_write_blocks.push_back(i);
187  }
188 
191  {
192  int_type ibuffer;
193  while (!batch_write_blocks.empty())
194  {
195  ibuffer = batch_write_blocks.top().ibuffer;
196  batch_write_blocks.pop();
197 
198  if (write_reqs[ibuffer].valid())
199  write_reqs[ibuffer]->wait();
200 
201  write_reqs[ibuffer] = write_buffers[ibuffer].write(write_bids[ibuffer]);
202 
203  busy_write_blocks.push_back(ibuffer);
204  }
205  for (std::vector<int_type>::const_iterator it =
206  busy_write_blocks.begin();
207  it != busy_write_blocks.end(); it++)
208  {
209  ibuffer = *it;
210  write_reqs[ibuffer]->wait();
211  }
212 
213  delete[] write_reqs;
214  delete[] write_buffers;
215  delete[] write_bids;
216  }
217 };
218 
220 
221 __STXXL_END_NAMESPACE
222 
223 #endif // !STXXL_BUFFERED_WRITER_HEADER
virtual ~buffered_writer()
Flushes not yet written buffers and frees used memory.
Definition: buf_writer.h:190
Encapsulates asynchronous buffered block writing engine.
Definition: buf_writer.h:37
block_type * get_free_block()
Returns free block from the internal buffer pool.
Definition: buf_writer.h:91
void flush()
Flushes not yet written buffers.
Definition: buf_writer.h:158
block_type * write(block_type *filled_block, const bid_type &bid)
Submits block for writing.
Definition: buf_writer.h:131
virtual bool poll()=0
Polls the status of the request.
Implemented as reference counting smart pointer.
Definition: request_ptr.h:34
buffered_writer(unsigned_type write_buf_size, unsigned_type write_batch_size)
Constructs an object.
Definition: buf_writer.h:76
virtual void wait(bool measure_time=true)=0
Suspends calling thread until completion of the request.
request_iterator_ wait_any(request_iterator_ reqs_begin, request_iterator_ reqs_end)
Suspends calling thread until any of requests is completed.
Definition: request_operations.h:108