00001
00002
00003
00004
00005
00006
00007
00008
00009
00010
00011
00012
00013 #ifndef STXXL_BLOCK_PREFETCHER_HEADER
00014 #define STXXL_BLOCK_PREFETCHER_HEADER
00015
00016 #include <vector>
00017 #include <queue>
00018
00019 #include <stxxl/bits/common/switch.h>
00020 #include <stxxl/bits/io/iobase.h>
00021
00022
00023 __STXXL_BEGIN_NAMESPACE
00024
00027
00028
00029 class set_switch_handler
00030 {
00031 onoff_switch & switch_;
00032
00033 public:
00034 set_switch_handler(onoff_switch & switch__) : switch_(switch__) { }
00035 void operator () (request * ) { switch_.on(); }
00036 };
00037
00042 template <typename block_type, typename bid_iterator_type>
00043 class block_prefetcher
00044 {
00045 block_prefetcher() { }
00046
00047 protected:
00048 bid_iterator_type consume_seq_begin;
00049 bid_iterator_type consume_seq_end;
00050 unsigned_type seq_length;
00051
00052 int_type * prefetch_seq;
00053
00054 unsigned_type nextread;
00055 unsigned_type nextconsume;
00056
00057 const int_type nreadblocks;
00058
00059 block_type * read_buffers;
00060 request_ptr * read_reqs;
00061
00062 onoff_switch * completed;
00063 int_type * pref_buffer;
00064
00065 block_type * wait(int_type iblock)
00066 {
00067 STXXL_VERBOSE1("block_prefetcher: waiting block " << iblock);
00068 {
00069 stats::scoped_wait_timer wait_timer;
00070
00071 #ifdef NO_OVERLAPPING
00072 read_reqs[pref_buffer[iblock]]->poll();
00073 #endif
00074
00075 completed[iblock].wait_for_on();
00076 }
00077 STXXL_VERBOSE1("block_prefetcher: finished waiting block " << iblock);
00078 int_type ibuffer = pref_buffer[iblock];
00079 STXXL_VERBOSE1("block_prefetcher: returning buffer " << ibuffer);
00080 assert(ibuffer >= 0 && ibuffer < nreadblocks);
00081 return (read_buffers + ibuffer);
00082 }
00083
00084 public:
00091 block_prefetcher(
00092 bid_iterator_type _cons_begin,
00093 bid_iterator_type _cons_end,
00094 int_type * _pref_seq,
00095 int_type _prefetch_buf_size
00096 ) :
00097 consume_seq_begin(_cons_begin),
00098 consume_seq_end(_cons_end),
00099 seq_length(_cons_end - _cons_begin),
00100 prefetch_seq(_pref_seq),
00101 nextread(STXXL_MIN(unsigned_type(_prefetch_buf_size), seq_length)),
00102 nextconsume(0),
00103 nreadblocks(nextread)
00104 {
00105 STXXL_VERBOSE1("block_prefetcher: seq_length=" << seq_length);
00106 STXXL_VERBOSE1("block_prefetcher: _prefetch_buf_size=" << _prefetch_buf_size);
00107 assert(seq_length > 0);
00108 assert(_prefetch_buf_size > 0);
00109 int_type i;
00110 read_buffers = new block_type[nreadblocks];
00111 read_reqs = new request_ptr[nreadblocks];
00112 pref_buffer = new int_type[seq_length];
00113
00114 std::fill(pref_buffer, pref_buffer + seq_length, -1);
00115
00116 completed = new onoff_switch[seq_length];
00117
00118 for (i = 0; i < nreadblocks; ++i)
00119 {
00120 STXXL_VERBOSE1("block_prefetcher: reading block " << i
00121 << " prefetch_seq[" << i << "]=" << prefetch_seq[i]);
00122 assert(prefetch_seq[i] < int_type(seq_length));
00123 assert(prefetch_seq[i] >= 0);
00124 read_reqs[i] = read_buffers[i].read(
00125 *(consume_seq_begin + prefetch_seq[i]),
00126 set_switch_handler(*(completed + prefetch_seq[i])));
00127 pref_buffer[prefetch_seq[i]] = i;
00128 }
00129 }
00132 block_type * pull_block()
00133 {
00134 STXXL_VERBOSE1("block_prefetcher: pulling a block");
00135 return wait(nextconsume++);
00136 }
00142 bool block_consumed(block_type * & buffer)
00143 {
00144 int_type ibuffer = buffer - read_buffers;
00145 STXXL_VERBOSE1("block_prefetcher: buffer " << ibuffer << " consumed");
00146 if (read_reqs[ibuffer].valid())
00147 read_reqs[ibuffer]->wait();
00148
00149 read_reqs[ibuffer] = NULL;
00150
00151 if (nextread < seq_length)
00152 {
00153 assert(ibuffer >= 0 && ibuffer < nreadblocks);
00154 int_type next_2_prefetch = prefetch_seq[nextread++];
00155 STXXL_VERBOSE1("block_prefetcher: prefetching block " << next_2_prefetch);
00156
00157 assert(next_2_prefetch < int_type(seq_length) && next_2_prefetch >= 0);
00158 assert(!completed[next_2_prefetch].is_on());
00159
00160 pref_buffer[next_2_prefetch] = ibuffer;
00161 read_reqs[ibuffer] = read_buffers[ibuffer].read(
00162 *(consume_seq_begin + next_2_prefetch),
00163 set_switch_handler(*(completed + next_2_prefetch))
00164 );
00165 }
00166
00167 if (nextconsume >= seq_length)
00168 return false;
00169
00170
00171 buffer = wait(nextconsume++);
00172
00173 return true;
00174 }
00176 ~block_prefetcher()
00177 {
00178 for (int_type i = 0; i < nreadblocks; ++i)
00179 if (read_reqs[i].valid())
00180 read_reqs[i]->wait();
00181
00182
00183 delete[] read_reqs;
00184 delete[] completed;
00185 delete[] pref_buffer;
00186 delete[] read_buffers;
00187 }
00188 };
00189
00191
00192 __STXXL_END_NAMESPACE
00193
00194 #endif // !STXXL_BLOCK_PREFETCHER_HEADER