00001
00002
00003
00004
00005
00006
00007
00008
00009
00010
00011
00012
00013 #ifndef STXXL_BUFFERED_WRITER_HEADER
00014 #define STXXL_BUFFERED_WRITER_HEADER
00015
#include <cassert>
#include <queue>
#include <vector>

#include <stxxl/bits/mng/mng.h>
00019
00020
00021 __STXXL_BEGIN_NAMESPACE
00022
00029
00030
00034 template <typename block_type>
00035 class buffered_writer
00036 {
00037 buffered_writer() { }
00038
00039 protected:
00040 typedef typename block_type::bid_type bid_type;
00041
00042 const unsigned_type nwriteblocks;
00043 block_type * write_buffers;
00044 bid_type * write_bids;
00045 request_ptr * write_reqs;
00046 const unsigned_type writebatchsize;
00047
00048 std::vector<int_type> free_write_blocks;
00049 std::vector<int_type> busy_write_blocks;
00050
00051
00052 struct batch_entry
00053 {
00054 stxxl::int64 offset;
00055 int_type ibuffer;
00056 batch_entry(stxxl::int64 o, int b) : offset(o), ibuffer(b) { }
00057 };
00058 struct batch_entry_cmp
00059 {
00060 bool operator () (const batch_entry & a, const batch_entry & b) const
00061 {
00062 return (a.offset > b.offset);
00063 }
00064 };
00065
00066 typedef std::priority_queue<batch_entry, std::vector<batch_entry>, batch_entry_cmp> batch_type;
00067 batch_type batch_write_blocks;
00068
00069 public:
00074 buffered_writer(unsigned_type write_buf_size, unsigned_type write_batch_size) :
00075 nwriteblocks((write_buf_size > 2) ? write_buf_size : 2),
00076 writebatchsize(write_batch_size ? write_batch_size : 1)
00077 {
00078 write_buffers = new block_type[nwriteblocks];
00079 write_reqs = new request_ptr[nwriteblocks];
00080 write_bids = new bid_type[nwriteblocks];
00081
00082 for (unsigned_type i = 0; i < nwriteblocks; i++)
00083 free_write_blocks.push_back(i);
00084
00085
00086 disk_queues::get_instance()->set_priority_op(disk_queue::WRITE);
00087 }
00090 block_type * get_free_block()
00091 {
00092 int_type ibuffer;
00093 for (std::vector<int_type>::iterator it = busy_write_blocks.begin();
00094 it != busy_write_blocks.end(); ++it)
00095 {
00096 if (write_reqs[ibuffer = (*it)]->poll())
00097 {
00098 busy_write_blocks.erase(it);
00099 free_write_blocks.push_back(ibuffer);
00100
00101 break;
00102 }
00103 }
00104 if (UNLIKELY(free_write_blocks.empty()))
00105 {
00106 int_type size = busy_write_blocks.size();
00107 request_ptr * reqs = new request_ptr[size];
00108 int_type i = 0;
00109 for ( ; i < size; ++i)
00110 {
00111 reqs[i] = write_reqs[busy_write_blocks[i]];
00112 }
00113 int_type completed = wait_any(reqs, size);
00114 int_type completed_global = busy_write_blocks[completed];
00115 delete[] reqs;
00116 busy_write_blocks.erase(busy_write_blocks.begin() + completed);
00117
00118 return (write_buffers + completed_global);
00119 }
00120 ibuffer = free_write_blocks.back();
00121 free_write_blocks.pop_back();
00122
00123 return (write_buffers + ibuffer);
00124 }
00130 block_type * write(block_type * filled_block, const bid_type & bid)
00131 {
00132 if (batch_write_blocks.size() >= writebatchsize)
00133 {
00134
00135 while (!batch_write_blocks.empty())
00136 {
00137 int_type ibuffer = batch_write_blocks.top().ibuffer;
00138 batch_write_blocks.pop();
00139
00140 if (write_reqs[ibuffer].valid())
00141 write_reqs[ibuffer]->wait();
00142
00143 write_reqs[ibuffer] = write_buffers[ibuffer].write(write_bids[ibuffer]);
00144
00145 busy_write_blocks.push_back(ibuffer);
00146 }
00147 }
00148
00149
00150 int_type ibuffer = filled_block - write_buffers;
00151 write_bids[ibuffer] = bid;
00152 batch_write_blocks.push(batch_entry(bid.offset, ibuffer));
00153
00154 return get_free_block();
00155 }
00157 void flush()
00158 {
00159 int_type ibuffer;
00160 while (!batch_write_blocks.empty())
00161 {
00162 ibuffer = batch_write_blocks.top().ibuffer;
00163 batch_write_blocks.pop();
00164
00165 if (write_reqs[ibuffer].valid())
00166 write_reqs[ibuffer]->wait();
00167
00168 write_reqs[ibuffer] = write_buffers[ibuffer].write(write_bids[ibuffer]);
00169
00170 busy_write_blocks.push_back(ibuffer);
00171 }
00172 for (std::vector<int_type>::const_iterator it =
00173 busy_write_blocks.begin();
00174 it != busy_write_blocks.end(); it++)
00175 {
00176 ibuffer = *it;
00177 write_reqs[ibuffer]->wait();
00178 }
00179
00180 assert(batch_write_blocks.empty());
00181 free_write_blocks.clear();
00182 busy_write_blocks.clear();
00183
00184 for (unsigned_type i = 0; i < nwriteblocks; i++)
00185 free_write_blocks.push_back(i);
00186 }
00187
00189 virtual ~buffered_writer()
00190 {
00191 int_type ibuffer;
00192 while (!batch_write_blocks.empty())
00193 {
00194 ibuffer = batch_write_blocks.top().ibuffer;
00195 batch_write_blocks.pop();
00196
00197 if (write_reqs[ibuffer].valid())
00198 write_reqs[ibuffer]->wait();
00199
00200 write_reqs[ibuffer] = write_buffers[ibuffer].write(write_bids[ibuffer]);
00201
00202 busy_write_blocks.push_back(ibuffer);
00203 }
00204 for (std::vector<int_type>::const_iterator it =
00205 busy_write_blocks.begin();
00206 it != busy_write_blocks.end(); it++)
00207 {
00208 ibuffer = *it;
00209 write_reqs[ibuffer]->wait();
00210 }
00211
00212 delete[] write_reqs;
00213 delete[] write_buffers;
00214 delete[] write_bids;
00215 }
00216 };
00217
00219
00220 __STXXL_END_NAMESPACE
00221
00222 #endif // !STXXL_BUFFERED_WRITER_HEADER