00001
00002
00003
00004
00005
00006
00007
00008
00009
00010
00011
00012
00013
00014
00015 #ifndef STXXL_BLOCK_PREFETCHER_HEADER
00016 #define STXXL_BLOCK_PREFETCHER_HEADER
00017
00018 #include <vector>
00019 #include <queue>
00020
00021 #include <stxxl/bits/common/switch.h>
00022 #include <stxxl/bits/io/request_ptr.h>
00023 #include <stxxl/bits/io/iostats.h>
00024
00025
00026 __STXXL_BEGIN_NAMESPACE
00027
00030
00031
00032 class set_switch_handler
00033 {
00034 onoff_switch & switch_;
00035 completion_handler on_compl;
00036
00037 public:
00038 set_switch_handler(onoff_switch & switch__, const completion_handler & on_compl)
00039 : switch_(switch__), on_compl(on_compl)
00040 { }
00041
00042 void operator () (request * req)
00043 {
00044 on_compl(req);
00045 switch_.on();
00046 }
00047 };
00048
00053 template <typename block_type, typename bid_iterator_type>
00054 class block_prefetcher
00055 {
00056 block_prefetcher() { }
00057 typedef typename block_type::bid_type bid_type;
00058
00059 protected:
00060 bid_iterator_type consume_seq_begin;
00061 bid_iterator_type consume_seq_end;
00062 unsigned_type seq_length;
00063
00064 int_type * prefetch_seq;
00065
00066 unsigned_type nextread;
00067 unsigned_type nextconsume;
00068
00069 const int_type nreadblocks;
00070
00071 block_type * read_buffers;
00072 request_ptr * read_reqs;
00073 bid_type * read_bids;
00074
00075 onoff_switch * completed;
00076 int_type * pref_buffer;
00077
00078 completion_handler do_after_fetch;
00079
00080 block_type * wait(int_type iblock)
00081 {
00082 STXXL_VERBOSE1("block_prefetcher: waiting block " << iblock);
00083 {
00084 stats::scoped_wait_timer wait_timer(stats::WAIT_OP_READ);
00085
00086 completed[iblock].wait_for_on();
00087 }
00088 STXXL_VERBOSE1("block_prefetcher: finished waiting block " << iblock);
00089 int_type ibuffer = pref_buffer[iblock];
00090 STXXL_VERBOSE1("block_prefetcher: returning buffer " << ibuffer);
00091 assert(ibuffer >= 0 && ibuffer < nreadblocks);
00092 return (read_buffers + ibuffer);
00093 }
00094
00095 public:
00102 block_prefetcher(
00103 bid_iterator_type _cons_begin,
00104 bid_iterator_type _cons_end,
00105 int_type * _pref_seq,
00106 int_type _prefetch_buf_size,
00107 completion_handler do_after_fetch = default_completion_handler()
00108 ) :
00109 consume_seq_begin(_cons_begin),
00110 consume_seq_end(_cons_end),
00111 seq_length(_cons_end - _cons_begin),
00112 prefetch_seq(_pref_seq),
00113 nextread(STXXL_MIN(unsigned_type(_prefetch_buf_size), seq_length)),
00114 nextconsume(0),
00115 nreadblocks(nextread),
00116 do_after_fetch(do_after_fetch)
00117 {
00118 STXXL_VERBOSE1("block_prefetcher: seq_length=" << seq_length);
00119 STXXL_VERBOSE1("block_prefetcher: _prefetch_buf_size=" << _prefetch_buf_size);
00120 assert(seq_length > 0);
00121 assert(_prefetch_buf_size > 0);
00122 int_type i;
00123 read_buffers = new block_type[nreadblocks];
00124 read_reqs = new request_ptr[nreadblocks];
00125 read_bids = new bid_type[nreadblocks];
00126 pref_buffer = new int_type[seq_length];
00127
00128 std::fill(pref_buffer, pref_buffer + seq_length, -1);
00129
00130 completed = new onoff_switch[seq_length];
00131
00132 for (i = 0; i < nreadblocks; ++i)
00133 {
00134 assert(prefetch_seq[i] < int_type(seq_length));
00135 assert(prefetch_seq[i] >= 0);
00136 read_bids[i] = *(consume_seq_begin + prefetch_seq[i]);
00137 STXXL_VERBOSE1("block_prefetcher: reading block " << i <<
00138 " prefetch_seq[" << i << "]=" << prefetch_seq[i] <<
00139 " @ " << &read_buffers[i] <<
00140 " @ " << read_bids[i]);
00141 read_reqs[i] = read_buffers[i].read(
00142 read_bids[i],
00143 set_switch_handler(*(completed + prefetch_seq[i]), do_after_fetch));
00144 pref_buffer[prefetch_seq[i]] = i;
00145 }
00146 }
00149 block_type * pull_block()
00150 {
00151 STXXL_VERBOSE1("block_prefetcher: pulling a block");
00152 return wait(nextconsume++);
00153 }
00159 bool block_consumed(block_type * & buffer)
00160 {
00161 int_type ibuffer = buffer - read_buffers;
00162 STXXL_VERBOSE1("block_prefetcher: buffer " << ibuffer << " consumed");
00163 if (read_reqs[ibuffer].valid())
00164 read_reqs[ibuffer]->wait();
00165
00166 read_reqs[ibuffer] = NULL;
00167
00168 if (nextread < seq_length)
00169 {
00170 assert(ibuffer >= 0 && ibuffer < nreadblocks);
00171 int_type next_2_prefetch = prefetch_seq[nextread++];
00172 STXXL_VERBOSE1("block_prefetcher: prefetching block " << next_2_prefetch);
00173
00174 assert((next_2_prefetch < int_type(seq_length)) && (next_2_prefetch >= 0));
00175 assert(!completed[next_2_prefetch].is_on());
00176
00177 pref_buffer[next_2_prefetch] = ibuffer;
00178 read_bids[ibuffer] = *(consume_seq_begin + next_2_prefetch);
00179 read_reqs[ibuffer] = read_buffers[ibuffer].read(
00180 read_bids[ibuffer],
00181 set_switch_handler(*(completed + next_2_prefetch), do_after_fetch)
00182 );
00183 }
00184
00185 if (nextconsume >= seq_length)
00186 return false;
00187
00188
00189 buffer = wait(nextconsume++);
00190
00191 return true;
00192 }
00193
00194
00195
00196 bool empty() const
00197 {
00198 return nextconsume >= seq_length;
00199 }
00200
00201
00202 unsigned_type pos() const
00203 {
00204 return nextconsume;
00205 }
00206
00208 ~block_prefetcher()
00209 {
00210 for (int_type i = 0; i < nreadblocks; ++i)
00211 if (read_reqs[i].valid())
00212 read_reqs[i]->wait();
00213
00214
00215 delete[] read_reqs;
00216 delete[] read_bids;
00217 delete[] completed;
00218 delete[] pref_buffer;
00219 delete[] read_buffers;
00220 }
00221 };
00222
00224
00225 __STXXL_END_NAMESPACE
00226
00227 #endif // !STXXL_BLOCK_PREFETCHER_HEADER