13 #ifndef STXXL_MNG_BUF_WRITER_HEADER
14 #define STXXL_MNG_BUF_WRITER_HEADER
35 template <
typename BlockType>
39 typedef typename block_type::bid_type
bid_type;
66 typedef std::priority_queue<batch_entry, std::vector<batch_entry>, batch_entry_cmp>
batch_type;
75 : nwriteblocks((write_buf_size > 2) ? write_buf_size : 2),
76 writebatchsize(write_batch_size ? write_batch_size : 1)
78 write_buffers =
new block_type[nwriteblocks];
81 write_bids =
new bid_type[nwriteblocks];
84 free_write_blocks.push_back(i);
86 disk_queues::get_instance()->set_priority_op(request_queue::WRITE);
93 for (std::vector<int_type>::iterator it = busy_write_blocks.begin();
94 it != busy_write_blocks.end(); ++it)
96 if (write_reqs[ibuffer = (*it)]->poll())
98 busy_write_blocks.erase(it);
99 free_write_blocks.push_back(ibuffer);
104 if (
UNLIKELY(free_write_blocks.empty()))
106 int_type size = busy_write_blocks.size();
109 for ( ; i < size; ++i)
111 reqs[i] = write_reqs[busy_write_blocks[i]];
114 int_type completed_global = busy_write_blocks[completed];
116 busy_write_blocks.erase(busy_write_blocks.begin() + completed);
118 return (write_buffers + completed_global);
120 ibuffer = free_write_blocks.back();
121 free_write_blocks.pop_back();
123 return (write_buffers + ibuffer);
//! Queues \c filled_block for writing to \c bid and hands back another block.
//!
//! \param filled_block pointer to the block to be written; its index is taken
//!        as the pointer difference to \c write_buffers, so it presumably
//!        has to point into that array -- TODO confirm against callers
//! \param bid block identifier stored as the write target for that buffer
//! \return whatever \c get_free_block() returns
//!
//! NOTE(review): braces are not visible in this listing; the \c while loop is
//! presumably nested inside the preceding \c if -- confirm against the real file.
130 block_type *
write(block_type* filled_block,
const bid_type& bid)
// Once the pending batch holds at least writebatchsize entries, drain it.
132 if (batch_write_blocks.size() >= writebatchsize)
// Pop entries until the batch is empty; the pop order is decided by the
// comparator of batch_type (batch_entry_cmp, not shown here).
135 while (!batch_write_blocks.empty())
137 int_type ibuffer = batch_write_blocks.top().ibuffer;
138 batch_write_blocks.pop();
// If this buffer slot still holds a valid request, wait for it before
// the slot's request handle is overwritten below.
140 if (write_reqs[ibuffer].valid())
141 write_reqs[ibuffer]->wait();
// Issue the write of this buffer to the bid recorded for it.
143 write_reqs[ibuffer] = write_buffers[ibuffer].write(write_bids[ibuffer]);
// Record the buffer as having a write request issued.
145 busy_write_blocks.push_back(ibuffer);
// Index of the caller's block within write_buffers (pointer difference).
150 int_type ibuffer = filled_block - write_buffers;
// Remember the target bid; it is read back when the batch is drained.
151 write_bids[ibuffer] = bid;
// Add the block to the pending batch, tagged with bid.offset and its index.
152 batch_write_blocks.push(batch_entry(bid.offset, ibuffer));
// Return a block obtained from get_free_block().
154 return get_free_block();
160 while (!batch_write_blocks.empty())
162 ibuffer = batch_write_blocks.top().ibuffer;
163 batch_write_blocks.pop();
165 if (write_reqs[ibuffer].valid())
166 write_reqs[ibuffer]->wait();
168 write_reqs[ibuffer] = write_buffers[ibuffer].write(write_bids[ibuffer]);
170 busy_write_blocks.push_back(ibuffer);
172 for (std::vector<int_type>::const_iterator it =
173 busy_write_blocks.begin();
174 it != busy_write_blocks.end(); it++)
177 write_reqs[ibuffer]->wait();
180 assert(batch_write_blocks.empty());
181 free_write_blocks.clear();
182 busy_write_blocks.clear();
185 free_write_blocks.push_back(i);
192 while (!batch_write_blocks.empty())
194 ibuffer = batch_write_blocks.top().ibuffer;
195 batch_write_blocks.pop();
197 if (write_reqs[ibuffer].valid())
198 write_reqs[ibuffer]->wait();
200 write_reqs[ibuffer] = write_buffers[ibuffer].write(write_bids[ibuffer]);
202 busy_write_blocks.push_back(ibuffer);
204 for (std::vector<int_type>::const_iterator it =
205 busy_write_blocks.begin();
206 it != busy_write_blocks.end(); it++)
209 write_reqs[ibuffer]->wait();
213 delete[] write_buffers;
222 #endif // !STXXL_MNG_BUF_WRITER_HEADER
block_type * write(block_type *filled_block, const bid_type &bid)
Submits block for writing.
~buffered_writer()
Flushes buffers that have not yet been written and frees the memory used.
const unsigned_type writebatchsize
block_type * write_buffers
batch_type batch_write_blocks
batch_entry(stxxl::int64 o, int_type b)
choose_int_types< my_pointer_size >::int_type int_type
const unsigned_type nwriteblocks
block_type * get_free_block()
Returns free block from the internal buffer pool.
#define STXXL_BEGIN_NAMESPACE
void flush()
Flushes buffers that have not yet been written.
std::vector< int_type > busy_write_blocks
block_type::bid_type bid_type
choose_int_types< my_pointer_size >::unsigned_type unsigned_type
std::vector< int_type > free_write_blocks
RequestIterator wait_any(RequestIterator reqs_begin, RequestIterator reqs_end)
Suspends the calling thread until any of the given requests is completed.
Encapsulates asynchronous buffered block writing engine.
buffered_writer(unsigned_type write_buf_size, unsigned_type write_batch_size)
Constructs an object.
#define STXXL_END_NAMESPACE
std::priority_queue< batch_entry, std::vector< batch_entry >, batch_entry_cmp > batch_type