STXXL  1.4.1
 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Friends Macros Groups Pages
buf_writer.h
Go to the documentation of this file.
1 /***************************************************************************
2  * include/stxxl/bits/mng/buf_writer.h
3  *
4  * Part of the STXXL. See http://stxxl.sourceforge.net
5  *
6  * Copyright (C) 2002-2004 Roman Dementiev <[email protected]>
7  *
8  * Distributed under the Boost Software License, Version 1.0.
9  * (See accompanying file LICENSE_1_0.txt or copy at
10  * http://www.boost.org/LICENSE_1_0.txt)
11  **************************************************************************/
12 
13 #ifndef STXXL_MNG_BUF_WRITER_HEADER
14 #define STXXL_MNG_BUF_WRITER_HEADER
15 
16 #include <vector>
17 #include <queue>
18 
21 #include <stxxl/bits/noncopyable.h>
22 
24 
25 //! \defgroup schedlayer Block Scheduling Sublayer
26 //! \ingroup mnglayer
27 //! Group of classes which help in scheduling
28 //! sequences of read and write requests
29 //! via prefetching and buffered writing
30 //! \{
31 
32 //! Encapsulates asynchronous buffered block writing engine.
33 //!
34 //! \c buffered_writer overlaps I/Os with filling of output buffer.
35 template <typename BlockType>
37 {
38  typedef BlockType block_type;
39  typedef typename block_type::bid_type bid_type;
40 
41 protected:
43  block_type* write_buffers;
44  bid_type* write_bids;
47 
48  std::vector<int_type> free_write_blocks; // contains free write blocks
49  std::vector<int_type> busy_write_blocks; // blocks that are in writing, notice that if block is not in free_
50  // an not in busy then block is not yet filled
51 
52  struct batch_entry
53  {
56  batch_entry(stxxl::int64 o, int_type b) : offset(o), ibuffer(b) { }
57  };
59  {
60  bool operator () (const batch_entry& a, const batch_entry& b) const
61  {
62  return (a.offset > b.offset);
63  }
64  };
65 
66  typedef std::priority_queue<batch_entry, std::vector<batch_entry>, batch_entry_cmp> batch_type;
67  batch_type batch_write_blocks; // sorted sequence of blocks to write
68 
69 public:
70  //! Constructs an object.
71  //! \param write_buf_size number of write buffers to use
72  //! \param write_batch_size number of blocks to accumulate in
73  //! order to flush write requests (bulk buffered writing)
74  buffered_writer(unsigned_type write_buf_size, unsigned_type write_batch_size)
75  : nwriteblocks((write_buf_size > 2) ? write_buf_size : 2),
76  writebatchsize(write_batch_size ? write_batch_size : 1)
77  {
78  write_buffers = new block_type[nwriteblocks];
79  write_reqs = new request_ptr[nwriteblocks];
80 
81  write_bids = new bid_type[nwriteblocks];
82 
83  for (unsigned_type i = 0; i < nwriteblocks; i++)
84  free_write_blocks.push_back(i);
85 
86  disk_queues::get_instance()->set_priority_op(request_queue::WRITE);
87  }
88  //! Returns free block from the internal buffer pool.
89  //! \return pointer to the block from the internal buffer pool
90  block_type * get_free_block()
91  {
92  int_type ibuffer;
93  for (std::vector<int_type>::iterator it = busy_write_blocks.begin();
94  it != busy_write_blocks.end(); ++it)
95  {
96  if (write_reqs[ibuffer = (*it)]->poll())
97  {
98  busy_write_blocks.erase(it);
99  free_write_blocks.push_back(ibuffer);
100 
101  break;
102  }
103  }
104  if (UNLIKELY(free_write_blocks.empty()))
105  {
106  int_type size = busy_write_blocks.size();
107  request_ptr* reqs = new request_ptr[size];
108  int_type i = 0;
109  for ( ; i < size; ++i)
110  {
111  reqs[i] = write_reqs[busy_write_blocks[i]];
112  }
113  int_type completed = wait_any(reqs, size);
114  int_type completed_global = busy_write_blocks[completed];
115  delete[] reqs;
116  busy_write_blocks.erase(busy_write_blocks.begin() + completed);
117 
118  return (write_buffers + completed_global);
119  }
120  ibuffer = free_write_blocks.back();
121  free_write_blocks.pop_back();
122 
123  return (write_buffers + ibuffer);
124  }
125  //! Submits block for writing.
126  //! \param filled_block pointer to the block
127  //! \remark parameter \c filled_block must be value returned by \c get_free_block() or \c write() methods
128  //! \param bid block identifier, a place to write data of the \c filled_block
129  //! \return pointer to the new free block from the pool
130  block_type * write(block_type* filled_block, const bid_type& bid) // writes filled_block and returns a new block
131  {
132  if (batch_write_blocks.size() >= writebatchsize)
133  {
134  // flush batch
135  while (!batch_write_blocks.empty())
136  {
137  int_type ibuffer = batch_write_blocks.top().ibuffer;
138  batch_write_blocks.pop();
139 
140  if (write_reqs[ibuffer].valid())
141  write_reqs[ibuffer]->wait();
142 
143  write_reqs[ibuffer] = write_buffers[ibuffer].write(write_bids[ibuffer]);
144 
145  busy_write_blocks.push_back(ibuffer);
146  }
147  }
148  // STXXL_MSG("Adding write request to batch");
149 
150  int_type ibuffer = filled_block - write_buffers;
151  write_bids[ibuffer] = bid;
152  batch_write_blocks.push(batch_entry(bid.offset, ibuffer));
153 
154  return get_free_block();
155  }
156  //! Flushes not yet written buffers.
157  void flush()
158  {
159  int_type ibuffer;
160  while (!batch_write_blocks.empty())
161  {
162  ibuffer = batch_write_blocks.top().ibuffer;
163  batch_write_blocks.pop();
164 
165  if (write_reqs[ibuffer].valid())
166  write_reqs[ibuffer]->wait();
167 
168  write_reqs[ibuffer] = write_buffers[ibuffer].write(write_bids[ibuffer]);
169 
170  busy_write_blocks.push_back(ibuffer);
171  }
172  for (std::vector<int_type>::const_iterator it =
173  busy_write_blocks.begin();
174  it != busy_write_blocks.end(); it++)
175  {
176  ibuffer = *it;
177  write_reqs[ibuffer]->wait();
178  }
179 
180  assert(batch_write_blocks.empty());
181  free_write_blocks.clear();
182  busy_write_blocks.clear();
183 
184  for (unsigned_type i = 0; i < nwriteblocks; i++)
185  free_write_blocks.push_back(i);
186  }
187 
188  //! Flushes not yet written buffers and frees used memory.
190  {
191  int_type ibuffer;
192  while (!batch_write_blocks.empty())
193  {
194  ibuffer = batch_write_blocks.top().ibuffer;
195  batch_write_blocks.pop();
196 
197  if (write_reqs[ibuffer].valid())
198  write_reqs[ibuffer]->wait();
199 
200  write_reqs[ibuffer] = write_buffers[ibuffer].write(write_bids[ibuffer]);
201 
202  busy_write_blocks.push_back(ibuffer);
203  }
204  for (std::vector<int_type>::const_iterator it =
205  busy_write_blocks.begin();
206  it != busy_write_blocks.end(); it++)
207  {
208  ibuffer = *it;
209  write_reqs[ibuffer]->wait();
210  }
211 
212  delete[] write_reqs;
213  delete[] write_buffers;
214  delete[] write_bids;
215  }
216 };
217 
218 //! \}
219 
221 
222 #endif // !STXXL_MNG_BUF_WRITER_HEADER
stxxl::int64 offset
Definition: buf_writer.h:54
long long int int64
Definition: types.h:38
block_type * write(block_type *filled_block, const bid_type &bid)
Submits block for writing.
Definition: buf_writer.h:130
int_type ibuffer
Definition: buf_writer.h:55
~buffered_writer()
Flushes not yet written buffers and frees used memory.
Definition: buf_writer.h:189
request_ptr * write_reqs
Definition: buf_writer.h:45
const unsigned_type writebatchsize
Definition: buf_writer.h:46
block_type * write_buffers
Definition: buf_writer.h:43
batch_type batch_write_blocks
Definition: buf_writer.h:67
batch_entry(stxxl::int64 o, int_type b)
Definition: buf_writer.h:56
choose_int_types< my_pointer_size >::int_type int_type
Definition: types.h:63
const unsigned_type nwriteblocks
Definition: buf_writer.h:42
#define UNLIKELY(c)
Definition: utils.h:225
block_type * get_free_block()
Returns free block from the internal buffer pool.
Definition: buf_writer.h:90
#define STXXL_BEGIN_NAMESPACE
Definition: namespace.h:16
void flush()
Flushes not yet written buffers.
Definition: buf_writer.h:157
std::vector< int_type > busy_write_blocks
Definition: buf_writer.h:49
bid_type * write_bids
Definition: buf_writer.h:44
block_type::bid_type bid_type
Definition: buf_writer.h:39
Definition: buf_writer.h:58
choose_int_types< my_pointer_size >::unsigned_type unsigned_type
Definition: types.h:64
std::vector< int_type > free_write_blocks
Definition: buf_writer.h:48
RequestIterator wait_any(RequestIterator reqs_begin, RequestIterator reqs_end)
Suspends calling thread until any of requests is completed.
Encapsulates asynchronous buffered block writing engine.
Definition: buf_writer.h:36
buffered_writer(unsigned_type write_buf_size, unsigned_type write_batch_size)
Constructs an object.
Definition: buf_writer.h:74
Definition: buf_writer.h:52
#define STXXL_END_NAMESPACE
Definition: namespace.h:17
std::priority_queue< batch_entry, std::vector< batch_entry >, batch_entry_cmp > batch_type
Definition: buf_writer.h:66