00001
00002
00003
00004
00005
00006
00007
00008
00009
00010
00011
00012
00013 #ifndef STXXL_BLOCK_PREFETCHER_HEADER
00014 #define STXXL_BLOCK_PREFETCHER_HEADER
00015
00016 #include <vector>
00017 #include <queue>
00018
00019 #include <stxxl/bits/common/switch.h>
00020 #include <stxxl/bits/io/iobase.h>
00021
00022
00023 __STXXL_BEGIN_NAMESPACE
00024
00027
00028
00029 class set_switch_handler
00030 {
00031 onoff_switch & switch_;
00032
00033 public:
00034 set_switch_handler(onoff_switch & switch__) : switch_(switch__) { }
00035 void operator () (request * ) { switch_.on(); }
00036 };
00037
00042 template <typename block_type, typename bid_iterator_type>
00043 class block_prefetcher
00044 {
00045 block_prefetcher() { }
00046
00047 protected:
00048 bid_iterator_type consume_seq_begin;
00049 bid_iterator_type consume_seq_end;
00050 unsigned_type seq_length;
00051
00052 int_type * prefetch_seq;
00053
00054 unsigned_type nextread;
00055 unsigned_type nextconsume;
00056
00057 const int_type nreadblocks;
00058
00059 block_type * read_buffers;
00060 request_ptr * read_reqs;
00061
00062 onoff_switch * completed;
00063 int_type * pref_buffer;
00064
00065 block_type * wait(int_type iblock)
00066 {
00067 STXXL_VERBOSE1("block_prefetcher: waiting block " << iblock);
00068 START_COUNT_WAIT_TIME
00069
00070 #ifdef NO_OVERLAPPING
00071 read_reqs[pref_buffer[iblock]]->poll();
00072 #endif
00073
00074 completed[iblock].wait_for_on();
00075 END_COUNT_WAIT_TIME
00076 STXXL_VERBOSE1("block_prefetcher: finished waiting block " << iblock);
00077 int_type ibuffer = pref_buffer[iblock];
00078 STXXL_VERBOSE1("block_prefetcher: returning buffer " << ibuffer);
00079 assert(ibuffer >= 0 && ibuffer < nreadblocks);
00080 return (read_buffers + ibuffer);
00081 }
00082
00083 public:
00090 block_prefetcher(
00091 bid_iterator_type _cons_begin,
00092 bid_iterator_type _cons_end,
00093 int_type * _pref_seq,
00094 int_type _prefetch_buf_size
00095 ) :
00096 consume_seq_begin(_cons_begin),
00097 consume_seq_end(_cons_end),
00098 seq_length(_cons_end - _cons_begin),
00099 prefetch_seq(_pref_seq),
00100 nextread(STXXL_MIN(unsigned_type(_prefetch_buf_size), seq_length)),
00101 nextconsume(0),
00102 nreadblocks(nextread)
00103 {
00104 STXXL_VERBOSE1("block_prefetcher: seq_length=" << seq_length);
00105 STXXL_VERBOSE1("block_prefetcher: _prefetch_buf_size=" << _prefetch_buf_size);
00106 assert(seq_length > 0);
00107 assert(_prefetch_buf_size > 0);
00108 int_type i;
00109 read_buffers = new block_type[nreadblocks];
00110 read_reqs = new request_ptr[nreadblocks];
00111 pref_buffer = new int_type[seq_length];
00112
00113 std::fill(pref_buffer, pref_buffer + seq_length, -1);
00114
00115 completed = new onoff_switch[seq_length];
00116
00117 for (i = 0; i < nreadblocks; ++i)
00118 {
00119 STXXL_VERBOSE1("block_prefetcher: reading block " << i
00120 << " prefetch_seq[" << i << "]=" << prefetch_seq[i]);
00121 assert(prefetch_seq[i] < int_type(seq_length));
00122 assert(prefetch_seq[i] >= 0);
00123 read_reqs[i] = read_buffers[i].read(
00124 *(consume_seq_begin + prefetch_seq[i]),
00125 set_switch_handler(*(completed + prefetch_seq[i])));
00126 pref_buffer[prefetch_seq[i]] = i;
00127 }
00128 }
00131 block_type * pull_block()
00132 {
00133 STXXL_VERBOSE1("block_prefetcher: pulling a block");
00134 return wait(nextconsume++);
00135 }
00141 bool block_consumed(block_type * & buffer)
00142 {
00143 int_type ibuffer = buffer - read_buffers;
00144 STXXL_VERBOSE1("block_prefetcher: buffer " << ibuffer << " consumed");
00145 if (read_reqs[ibuffer].valid())
00146 read_reqs[ibuffer]->wait();
00147
00148 read_reqs[ibuffer] = NULL;
00149
00150 if (nextread < seq_length)
00151 {
00152 assert(ibuffer >= 0 && ibuffer < nreadblocks);
00153 int_type next_2_prefetch = prefetch_seq[nextread++];
00154 STXXL_VERBOSE1("block_prefetcher: prefetching block " << next_2_prefetch);
00155
00156 assert(next_2_prefetch < int_type(seq_length) && next_2_prefetch >= 0);
00157 assert(!completed[next_2_prefetch].is_on());
00158
00159 pref_buffer[next_2_prefetch] = ibuffer;
00160 read_reqs[ibuffer] = read_buffers[ibuffer].read(
00161 *(consume_seq_begin + next_2_prefetch),
00162 set_switch_handler(*(completed + next_2_prefetch))
00163 );
00164 }
00165
00166 if (nextconsume >= seq_length)
00167 return false;
00168
00169
00170 buffer = wait(nextconsume++);
00171
00172 return true;
00173 }
00175 ~block_prefetcher()
00176 {
00177 for (int_type i = 0; i < nreadblocks; ++i)
00178 if (read_reqs[i].valid())
00179 read_reqs[i]->wait();
00180
00181
00182 delete[] read_reqs;
00183 delete[] completed;
00184 delete[] pref_buffer;
00185 delete[] read_buffers;
00186 }
00187 };
00188
00190
00191 __STXXL_END_NAMESPACE
00192
00193 #endif // !STXXL_BLOCK_PREFETCHER_HEADER