00001
00002
00003
00004
00005
00006
00007
00008
00009
00010
00011
00012
00013 #ifndef STXXL_BLOCK_PREFETCHER_HEADER
00014 #define STXXL_BLOCK_PREFETCHER_HEADER
00015
00016 #include <vector>
00017 #include <queue>
00018
00019 #include <stxxl/bits/common/switch.h>
00020 #include <stxxl/bits/io/request.h>
00021
00022
00023 __STXXL_BEGIN_NAMESPACE
00024
00027
00028
00029 class set_switch_handler
00030 {
00031 onoff_switch & switch_;
00032
00033 public:
00034 set_switch_handler(onoff_switch & switch__) : switch_(switch__) { }
00035 void operator () (request * ) { switch_.on(); }
00036 };
00037
00042 template <typename block_type, typename bid_iterator_type>
00043 class block_prefetcher
00044 {
00045 block_prefetcher() { }
00046 typedef typename block_type::bid_type bid_type;
00047
00048 public:
00049 typedef void (*callback_type)(const bid_type &);
00050
00051 protected:
00052 bid_iterator_type consume_seq_begin;
00053 bid_iterator_type consume_seq_end;
00054 unsigned_type seq_length;
00055
00056 int_type * prefetch_seq;
00057
00058 unsigned_type nextread;
00059 unsigned_type nextconsume;
00060
00061 const int_type nreadblocks;
00062
00063 block_type * read_buffers;
00064 request_ptr * read_reqs;
00065 bid_type * read_bids;
00066
00067 onoff_switch * completed;
00068 int_type * pref_buffer;
00069
00070 callback_type do_after_fetch;
00071
00072 block_type * wait(int_type iblock)
00073 {
00074 STXXL_VERBOSE1("block_prefetcher: waiting block " << iblock);
00075 {
00076 stats::scoped_wait_timer wait_timer(stats::WAIT_OP_READ);
00077
00078 completed[iblock].wait_for_on();
00079 }
00080 STXXL_VERBOSE1("block_prefetcher: finished waiting block " << iblock);
00081 int_type ibuffer = pref_buffer[iblock];
00082 STXXL_VERBOSE1("block_prefetcher: returning buffer " << ibuffer);
00083 assert(ibuffer >= 0 && ibuffer < nreadblocks);
00084 if (do_after_fetch)
00085 do_after_fetch(read_bids[ibuffer]);
00086 return (read_buffers + ibuffer);
00087 }
00088
00089 public:
00096 block_prefetcher(
00097 bid_iterator_type _cons_begin,
00098 bid_iterator_type _cons_end,
00099 int_type * _pref_seq,
00100 int_type _prefetch_buf_size,
00101 callback_type do_after_fetch = NULL
00102 ) :
00103 consume_seq_begin(_cons_begin),
00104 consume_seq_end(_cons_end),
00105 seq_length(_cons_end - _cons_begin),
00106 prefetch_seq(_pref_seq),
00107 nextread(STXXL_MIN(unsigned_type(_prefetch_buf_size), seq_length)),
00108 nextconsume(0),
00109 nreadblocks(nextread),
00110 do_after_fetch(do_after_fetch)
00111 {
00112 STXXL_VERBOSE1("block_prefetcher: seq_length=" << seq_length);
00113 STXXL_VERBOSE1("block_prefetcher: _prefetch_buf_size=" << _prefetch_buf_size);
00114 assert(seq_length > 0);
00115 assert(_prefetch_buf_size > 0);
00116 int_type i;
00117 read_buffers = new block_type[nreadblocks];
00118 read_reqs = new request_ptr[nreadblocks];
00119 read_bids = new bid_type[nreadblocks];
00120 pref_buffer = new int_type[seq_length];
00121
00122 std::fill(pref_buffer, pref_buffer + seq_length, -1);
00123
00124 completed = new onoff_switch[seq_length];
00125
00126 for (i = 0; i < nreadblocks; ++i)
00127 {
00128 assert(prefetch_seq[i] < int_type(seq_length));
00129 assert(prefetch_seq[i] >= 0);
00130 read_bids[i] = *(consume_seq_begin + prefetch_seq[i]);
00131 STXXL_VERBOSE1("block_prefetcher: reading block " << i <<
00132 " prefetch_seq[" << i << "]=" << prefetch_seq[i] <<
00133 " @ " << &read_buffers[i] <<
00134 " @ " << read_bids[i]);
00135 read_reqs[i] = read_buffers[i].read(
00136 read_bids[i],
00137 set_switch_handler(*(completed + prefetch_seq[i])));
00138 pref_buffer[prefetch_seq[i]] = i;
00139 }
00140 }
00143 block_type * pull_block()
00144 {
00145 STXXL_VERBOSE1("block_prefetcher: pulling a block");
00146 return wait(nextconsume++);
00147 }
00153 bool block_consumed(block_type * & buffer)
00154 {
00155 int_type ibuffer = buffer - read_buffers;
00156 STXXL_VERBOSE1("block_prefetcher: buffer " << ibuffer << " consumed");
00157 if (read_reqs[ibuffer].valid())
00158 read_reqs[ibuffer]->wait();
00159
00160 read_reqs[ibuffer] = NULL;
00161
00162 if (nextread < seq_length)
00163 {
00164 assert(ibuffer >= 0 && ibuffer < nreadblocks);
00165 int_type next_2_prefetch = prefetch_seq[nextread++];
00166 STXXL_VERBOSE1("block_prefetcher: prefetching block " << next_2_prefetch);
00167
00168 assert((next_2_prefetch < int_type(seq_length)) && (next_2_prefetch >= 0));
00169 assert(!completed[next_2_prefetch].is_on());
00170
00171 pref_buffer[next_2_prefetch] = ibuffer;
00172 read_bids[ibuffer] = *(consume_seq_begin + next_2_prefetch);
00173 read_reqs[ibuffer] = read_buffers[ibuffer].read(
00174 read_bids[ibuffer],
00175 set_switch_handler(*(completed + next_2_prefetch))
00176 );
00177 }
00178
00179 if (nextconsume >= seq_length)
00180 return false;
00181
00182
00183 buffer = wait(nextconsume++);
00184
00185 return true;
00186 }
00187
00188
00189
00190 bool empty() const
00191 {
00192 return nextconsume >= seq_length;
00193 }
00194
00195
00196 unsigned_type pos() const
00197 {
00198 return nextconsume;
00199 }
00200
00202 ~block_prefetcher()
00203 {
00204 for (int_type i = 0; i < nreadblocks; ++i)
00205 if (read_reqs[i].valid())
00206 read_reqs[i]->wait();
00207
00208
00209 delete[] read_reqs;
00210 delete[] read_bids;
00211 delete[] completed;
00212 delete[] pref_buffer;
00213 delete[] read_buffers;
00214 }
00215 };
00216
00218
00219 __STXXL_END_NAMESPACE
00220
00221 #endif // !STXXL_BLOCK_PREFETCHER_HEADER