13 #ifndef STXXL_BUFFERED_WRITER_HEADER
14 #define STXXL_BUFFERED_WRITER_HEADER
19 #include <stxxl/bits/io/request_operations.h>
20 #include <stxxl/bits/io/disk_queues.h>
23 __STXXL_BEGIN_NAMESPACE
36 template <
typename block_type>
// Block identifier type, taken from the block type this writer is instantiated with.
42 typedef typename block_type::bid_type bid_type;
// Number of write buffers; used as the length of write_buffers and write_bids.
44 const unsigned_type nwriteblocks;
// Array of block buffers handed out to callers and written asynchronously.
45 block_type * write_buffers;
// write_bids[i] holds the target block identifier for write_buffers[i].
46 bid_type * write_bids;
// Number of queued blocks that triggers issuing the pending batch in write().
48 const unsigned_type writebatchsize;
// Indices of buffers that are currently available to callers.
50 std::vector<int_type> free_write_blocks;
// Indices of buffers for which a write request has been issued.
51 std::vector<int_type> busy_write_blocks;
// Constructs a batch entry from a disk offset `o` and a buffer index `b`.
// NOTE(review): the enclosing struct declaration and its fields are not visible in this listing.
58 batch_entry(stxxl::int64 o,
int b) : offset(o), ibuffer(b) { }
// Ordering functor for the priority queue below. Because it compares with
// `a.offset > b.offset`, std::priority_queue::top() yields the entry with the
// smallest offset.
60 struct batch_entry_cmp
62 bool operator () (
const batch_entry & a,
const batch_entry & b)
const
64 return (a.offset > b.offset);
// Queue of blocks that were submitted through write() but whose write request
// has not been issued yet, ordered by offset.
68 typedef std::priority_queue<batch_entry, std::vector<batch_entry>, batch_entry_cmp> batch_type;
69 batch_type batch_write_blocks;
// Member initialisers: the buffer count is clamped to a minimum of 2 and the
// batch size to a minimum of 1, whatever the caller passed in.
77 nwriteblocks((write_buf_size > 2) ? write_buf_size : 2),
78 writebatchsize(write_batch_size ? write_batch_size : 1)
// Allocate one block buffer and one bid slot per write buffer.
80 write_buffers =
new block_type[nwriteblocks];
82 write_bids =
new bid_type[nwriteblocks];
// Initially every buffer index is on the free list.
84 for (unsigned_type i = 0; i < nwriteblocks; i++)
85 free_write_blocks.push_back(i);
// Sets the disk queues' priority operation to WRITE
// (presumably so that writes are served preferentially -- confirm against disk_queues).
87 disk_queues::get_instance()->set_priority_op(request_queue::WRITE);
// Step 1: scan the busy buffers and poll their write requests. A buffer whose
// poll() returns true is moved from the busy list to the free list.
// NOTE(review): erase(it) invalidates `it`; the loop is presumably left right
// after the move on a line not visible in this listing -- confirm.
94 for (std::vector<int_type>::iterator it = busy_write_blocks.begin();
95 it != busy_write_blocks.end(); ++it)
97 if (write_reqs[ibuffer = (*it)]->
poll())
99 busy_write_blocks.erase(it);
100 free_write_blocks.push_back(ibuffer);
// Step 2: no buffer is free, so block until one of the outstanding writes finishes.
105 if (UNLIKELY(free_write_blocks.empty()))
107 int_type size = busy_write_blocks.size();
// Gather the requests of all busy buffers, in busy-list order, for wait_any().
// (The declarations of `reqs` and `i` are not visible in this listing.)
110 for ( ; i < size; ++i)
112 reqs[i] = write_reqs[busy_write_blocks[i]];
// `completed` is a position within the busy list; translate it to a buffer index.
114 int_type completed =
wait_any(reqs, size);
115 int_type completed_global = busy_write_blocks[completed];
// The buffer goes straight to the caller, so it is only removed from the busy list.
117 busy_write_blocks.erase(busy_write_blocks.begin() + completed);
119 return (write_buffers + completed_global);
// Step 3: at least one buffer is free; hand out the most recently freed one.
121 ibuffer = free_write_blocks.back();
122 free_write_blocks.pop_back();
124 return (write_buffers + ibuffer);
// Submits `filled_block` for writing to the location identified by `bid`.
// `filled_block` must point into write_buffers, because its buffer index is
// derived by pointer subtraction below.
// NOTE(review): the return statement is not visible in this listing.
131 block_type *
write(block_type * filled_block,
const bid_type & bid)
// Once the pending batch has reached writebatchsize, issue all queued blocks
// before queuing the new one.
133 if (batch_write_blocks.size() >= writebatchsize)
// Pop entries in the queue's order (smallest offset first).
136 while (!batch_write_blocks.empty())
138 int_type ibuffer = batch_write_blocks.top().ibuffer;
139 batch_write_blocks.pop();
// Wait for any earlier request still attached to this buffer slot before replacing it.
141 if (write_reqs[ibuffer].valid())
142 write_reqs[ibuffer]->
wait();
// Start the write of this buffer to its recorded bid and keep the request handle.
144 write_reqs[ibuffer] = write_buffers[ibuffer].write(write_bids[ibuffer]);
146 busy_write_blocks.push_back(ibuffer);
// Queue the newly filled block: record its bid and enqueue it keyed by bid.offset.
151 int_type ibuffer = filled_block - write_buffers;
152 write_bids[ibuffer] = bid;
153 batch_write_blocks.push(batch_entry(bid.offset, ibuffer));
// Issue write requests for every block still waiting in the batch queue.
161 while (!batch_write_blocks.empty())
163 ibuffer = batch_write_blocks.top().ibuffer;
164 batch_write_blocks.pop();
// Wait for any earlier request still attached to this buffer slot before replacing it.
166 if (write_reqs[ibuffer].valid())
167 write_reqs[ibuffer]->
wait();
169 write_reqs[ibuffer] = write_buffers[ibuffer].write(write_bids[ibuffer]);
171 busy_write_blocks.push_back(ibuffer);
// Wait on the requests of the busy buffers.
// NOTE(review): the statement that sets `ibuffer` from the iterator is not
// visible in this listing -- presumably `ibuffer = *it;`.
173 for (std::vector<int_type>::const_iterator it =
174 busy_write_blocks.begin();
175 it != busy_write_blocks.end(); it++)
178 write_reqs[ibuffer]->
wait();
// Reset the bookkeeping: nothing is queued or busy, and every buffer is free again.
181 assert(batch_write_blocks.empty());
182 free_write_blocks.clear();
183 busy_write_blocks.clear();
185 for (unsigned_type i = 0; i < nwriteblocks; i++)
186 free_write_blocks.push_back(i);
// Issue write requests for every block still waiting in the batch queue.
193 while (!batch_write_blocks.empty())
195 ibuffer = batch_write_blocks.top().ibuffer;
196 batch_write_blocks.pop();
// Wait for any earlier request still attached to this buffer slot before replacing it.
198 if (write_reqs[ibuffer].valid())
199 write_reqs[ibuffer]->
wait();
201 write_reqs[ibuffer] = write_buffers[ibuffer].write(write_bids[ibuffer]);
203 busy_write_blocks.push_back(ibuffer);
// Wait on the requests of the busy buffers before the buffers are released.
// NOTE(review): the statement that sets `ibuffer` from the iterator is not
// visible in this listing -- presumably `ibuffer = *it;`.
205 for (std::vector<int_type>::const_iterator it =
206 busy_write_blocks.begin();
207 it != busy_write_blocks.end(); it++)
210 write_reqs[ibuffer]->
wait();
// Release the block buffers allocated with new[].
// NOTE(review): deallocation of write_bids is not visible in this listing -- confirm.
214 delete[] write_buffers;
221 __STXXL_END_NAMESPACE
223 #endif // !STXXL_BUFFERED_WRITER_HEADER
virtual ~buffered_writer()
Flushes buffers that have not yet been written and frees the memory used.
Definition: buf_writer.h:190
Encapsulates an asynchronous buffered block-writing engine.
Definition: buf_writer.h:37
block_type * get_free_block()
Returns a free block from the internal buffer pool.
Definition: buf_writer.h:91
void flush()
Flushes buffers that have not yet been written.
Definition: buf_writer.h:158
block_type * write(block_type *filled_block, const bid_type &bid)
Submits block for writing.
Definition: buf_writer.h:131
virtual bool poll()=0
Polls the status of the request.
Implemented as a reference-counting smart pointer.
Definition: request_ptr.h:34
buffered_writer(unsigned_type write_buf_size, unsigned_type write_batch_size)
Constructs an object.
Definition: buf_writer.h:76
virtual void wait(bool measure_time=true)=0
Suspends the calling thread until the request has completed.
request_iterator_ wait_any(request_iterator_ reqs_begin, request_iterator_ reqs_end)
Suspends the calling thread until any one of the requests has completed.
Definition: request_operations.h:108