00001
00002
00003
00004
00005
00006
00007
00008
00009
00010
00011
00012
00013 #ifndef STXXL_BUFFERED_WRITER_HEADER
00014 #define STXXL_BUFFERED_WRITER_HEADER
00015
00016 #include <vector>
00017 #include <queue>
00018
00019 #include <stxxl/bits/io/request_operations.h>
00020 #include <stxxl/bits/io/disk_queues.h>
00021
00022
00023 __STXXL_BEGIN_NAMESPACE
00024
00031
00032
00036 template <typename block_type>
00037 class buffered_writer
00038 {
00039 buffered_writer() { }
00040
00041 protected:
00042 typedef typename block_type::bid_type bid_type;
00043
00044 const unsigned_type nwriteblocks;
00045 block_type * write_buffers;
00046 bid_type * write_bids;
00047 request_ptr * write_reqs;
00048 const unsigned_type writebatchsize;
00049
00050 std::vector<int_type> free_write_blocks;
00051 std::vector<int_type> busy_write_blocks;
00052
00053
00054 struct batch_entry
00055 {
00056 stxxl::int64 offset;
00057 int_type ibuffer;
00058 batch_entry(stxxl::int64 o, int b) : offset(o), ibuffer(b) { }
00059 };
00060 struct batch_entry_cmp
00061 {
00062 bool operator () (const batch_entry & a, const batch_entry & b) const
00063 {
00064 return (a.offset > b.offset);
00065 }
00066 };
00067
00068 typedef std::priority_queue<batch_entry, std::vector<batch_entry>, batch_entry_cmp> batch_type;
00069 batch_type batch_write_blocks;
00070
00071 public:
00076 buffered_writer(unsigned_type write_buf_size, unsigned_type write_batch_size) :
00077 nwriteblocks((write_buf_size > 2) ? write_buf_size : 2),
00078 writebatchsize(write_batch_size ? write_batch_size : 1)
00079 {
00080 write_buffers = new block_type[nwriteblocks];
00081 write_reqs = new request_ptr[nwriteblocks];
00082 write_bids = new bid_type[nwriteblocks];
00083
00084 for (unsigned_type i = 0; i < nwriteblocks; i++)
00085 free_write_blocks.push_back(i);
00086
00087 disk_queues::get_instance()->set_priority_op(request_queue::WRITE);
00088 }
00091 block_type * get_free_block()
00092 {
00093 int_type ibuffer;
00094 for (std::vector<int_type>::iterator it = busy_write_blocks.begin();
00095 it != busy_write_blocks.end(); ++it)
00096 {
00097 if (write_reqs[ibuffer = (*it)]->poll())
00098 {
00099 busy_write_blocks.erase(it);
00100 free_write_blocks.push_back(ibuffer);
00101
00102 break;
00103 }
00104 }
00105 if (UNLIKELY(free_write_blocks.empty()))
00106 {
00107 int_type size = busy_write_blocks.size();
00108 request_ptr * reqs = new request_ptr[size];
00109 int_type i = 0;
00110 for ( ; i < size; ++i)
00111 {
00112 reqs[i] = write_reqs[busy_write_blocks[i]];
00113 }
00114 int_type completed = wait_any(reqs, size);
00115 int_type completed_global = busy_write_blocks[completed];
00116 delete[] reqs;
00117 busy_write_blocks.erase(busy_write_blocks.begin() + completed);
00118
00119 return (write_buffers + completed_global);
00120 }
00121 ibuffer = free_write_blocks.back();
00122 free_write_blocks.pop_back();
00123
00124 return (write_buffers + ibuffer);
00125 }
00131 block_type * write(block_type * filled_block, const bid_type & bid)
00132 {
00133 if (batch_write_blocks.size() >= writebatchsize)
00134 {
00135
00136 while (!batch_write_blocks.empty())
00137 {
00138 int_type ibuffer = batch_write_blocks.top().ibuffer;
00139 batch_write_blocks.pop();
00140
00141 if (write_reqs[ibuffer].valid())
00142 write_reqs[ibuffer]->wait();
00143
00144 write_reqs[ibuffer] = write_buffers[ibuffer].write(write_bids[ibuffer]);
00145
00146 busy_write_blocks.push_back(ibuffer);
00147 }
00148 }
00149
00150
00151 int_type ibuffer = filled_block - write_buffers;
00152 write_bids[ibuffer] = bid;
00153 batch_write_blocks.push(batch_entry(bid.offset, ibuffer));
00154
00155 return get_free_block();
00156 }
00158 void flush()
00159 {
00160 int_type ibuffer;
00161 while (!batch_write_blocks.empty())
00162 {
00163 ibuffer = batch_write_blocks.top().ibuffer;
00164 batch_write_blocks.pop();
00165
00166 if (write_reqs[ibuffer].valid())
00167 write_reqs[ibuffer]->wait();
00168
00169 write_reqs[ibuffer] = write_buffers[ibuffer].write(write_bids[ibuffer]);
00170
00171 busy_write_blocks.push_back(ibuffer);
00172 }
00173 for (std::vector<int_type>::const_iterator it =
00174 busy_write_blocks.begin();
00175 it != busy_write_blocks.end(); it++)
00176 {
00177 ibuffer = *it;
00178 write_reqs[ibuffer]->wait();
00179 }
00180
00181 assert(batch_write_blocks.empty());
00182 free_write_blocks.clear();
00183 busy_write_blocks.clear();
00184
00185 for (unsigned_type i = 0; i < nwriteblocks; i++)
00186 free_write_blocks.push_back(i);
00187 }
00188
00190 virtual ~buffered_writer()
00191 {
00192 int_type ibuffer;
00193 while (!batch_write_blocks.empty())
00194 {
00195 ibuffer = batch_write_blocks.top().ibuffer;
00196 batch_write_blocks.pop();
00197
00198 if (write_reqs[ibuffer].valid())
00199 write_reqs[ibuffer]->wait();
00200
00201 write_reqs[ibuffer] = write_buffers[ibuffer].write(write_bids[ibuffer]);
00202
00203 busy_write_blocks.push_back(ibuffer);
00204 }
00205 for (std::vector<int_type>::const_iterator it =
00206 busy_write_blocks.begin();
00207 it != busy_write_blocks.end(); it++)
00208 {
00209 ibuffer = *it;
00210 write_reqs[ibuffer]->wait();
00211 }
00212
00213 delete[] write_reqs;
00214 delete[] write_buffers;
00215 delete[] write_bids;
00216 }
00217 };
00218
00220
00221 __STXXL_END_NAMESPACE
00222
00223 #endif // !STXXL_BUFFERED_WRITER_HEADER