13 #ifndef STXXL_MNG_BUF_WRITER_HEADER
14 #define STXXL_MNG_BUF_WRITER_HEADER
37 template <
typename block_type>
41 typedef typename block_type::bid_type
bid_type;
67 typedef std::priority_queue<batch_entry, std::vector<batch_entry>, batch_entry_cmp>
batch_type;
76 nwriteblocks((write_buf_size > 2) ? write_buf_size : 2),
77 writebatchsize(write_batch_size ? write_batch_size : 1)
79 write_buffers =
new block_type[nwriteblocks];
81 write_bids =
new bid_type[nwriteblocks];
84 free_write_blocks.push_back(i);
86 disk_queues::get_instance()->set_priority_op(request_queue::WRITE);
93 for (std::vector<int_type>::iterator it = busy_write_blocks.begin();
94 it != busy_write_blocks.end(); ++it)
96 if (write_reqs[ibuffer = (*it)]->poll())
98 busy_write_blocks.erase(it);
99 free_write_blocks.push_back(ibuffer);
104 if (
UNLIKELY(free_write_blocks.empty()))
109 for ( ; i < size; ++i)
111 reqs[i] = write_reqs[busy_write_blocks[i]];
114 int_type completed_global = busy_write_blocks[completed];
116 busy_write_blocks.erase(busy_write_blocks.begin() + completed);
118 return (write_buffers + completed_global);
120 ibuffer = free_write_blocks.back();
121 free_write_blocks.pop_back();
123 return (write_buffers + ibuffer);
130 block_type *
write(block_type* filled_block,
const bid_type& bid)
132 if (batch_write_blocks.size() >= writebatchsize)
135 while (!batch_write_blocks.empty())
137 int_type ibuffer = batch_write_blocks.top().ibuffer;
138 batch_write_blocks.pop();
140 if (write_reqs[ibuffer].valid())
141 write_reqs[ibuffer]->wait();
143 write_reqs[ibuffer] = write_buffers[ibuffer].
write(write_bids[ibuffer]);
145 busy_write_blocks.push_back(ibuffer);
150 int_type ibuffer = filled_block - write_buffers;
151 write_bids[ibuffer] = bid;
152 batch_write_blocks.push(batch_entry(bid.offset, ibuffer));
154 return get_free_block();
160 while (!batch_write_blocks.empty())
162 ibuffer = batch_write_blocks.top().ibuffer;
163 batch_write_blocks.pop();
165 if (write_reqs[ibuffer].valid())
166 write_reqs[ibuffer]->wait();
168 write_reqs[ibuffer] = write_buffers[ibuffer].write(write_bids[ibuffer]);
170 busy_write_blocks.push_back(ibuffer);
172 for (std::vector<int_type>::const_iterator it =
173 busy_write_blocks.begin();
174 it != busy_write_blocks.end(); it++)
177 write_reqs[ibuffer]->wait();
180 assert(batch_write_blocks.empty());
181 free_write_blocks.clear();
182 busy_write_blocks.clear();
185 free_write_blocks.push_back(i);
192 while (!batch_write_blocks.empty())
194 ibuffer = batch_write_blocks.top().ibuffer;
195 batch_write_blocks.pop();
197 if (write_reqs[ibuffer].valid())
198 write_reqs[ibuffer]->wait();
200 write_reqs[ibuffer] = write_buffers[ibuffer].write(write_bids[ibuffer]);
202 busy_write_blocks.push_back(ibuffer);
204 for (std::vector<int_type>::const_iterator it =
205 busy_write_blocks.begin();
206 it != busy_write_blocks.end(); it++)
209 write_reqs[ibuffer]->wait();
213 delete[] write_buffers;
222 #endif // !STXXL_MNG_BUF_WRITER_HEADER
block_type * write(block_type *filled_block, const bid_type &bid)
Submits block for writing.
virtual ~buffered_writer()
Flushes not yet written buffers and frees used memory.
buffered_writer(unsigned_type write_buf_size, unsigned_type write_batch_size)
Constructs an object.
const unsigned_type nwriteblocks
void flush()
Flushes not yet written buffers.
const unsigned_type writebatchsize
block_type * write_buffers
block_type::bid_type bid_type
block_type * get_free_block()
Returns free block from the internal buffer pool.
choose_int_types< my_pointer_size >::int_type int_type
#define STXXL_BEGIN_NAMESPACE
batch_type batch_write_blocks
std::priority_queue< batch_entry, std::vector< batch_entry >, batch_entry_cmp > batch_type
std::vector< int_type > busy_write_blocks
choose_int_types< my_pointer_size >::unsigned_type unsigned_type
number of elements in block
request_iterator_ wait_any(request_iterator_ reqs_begin, request_iterator_ reqs_end)
Suspends calling thread until any of requests is completed.
request_ptr write(const bid_type &bid, completion_handler on_cmpl=default_completion_handler())
Writes block to the disk(s).
Encapsulates asynchronous buffered block writing engine.
batch_entry(stxxl::int64 o, int_type b)
std::vector< int_type > free_write_blocks
#define STXXL_END_NAMESPACE