• Main Page
  • Related Pages
  • Modules
  • Namespaces
  • Classes
  • Files
  • Examples
  • File List

block_prefetcher.h

00001 /***************************************************************************
00002  *  include/stxxl/bits/mng/block_prefetcher.h
00003  *
00004  *  Part of the STXXL. See http://stxxl.sourceforge.net
00005  *
00006  *  Copyright (C) 2002-2004 Roman Dementiev <[email protected]>
00007  *
00008  *  Distributed under the Boost Software License, Version 1.0.
00009  *  (See accompanying file LICENSE_1_0.txt or copy at
00010  *  http://www.boost.org/LICENSE_1_0.txt)
00011  **************************************************************************/
00012 
00013 #ifndef STXXL_BLOCK_PREFETCHER_HEADER
00014 #define STXXL_BLOCK_PREFETCHER_HEADER
00015 
00016 #include <vector>
00017 #include <queue>
00018 
00019 #include <stxxl/bits/common/switch.h>
00020 #include <stxxl/bits/io/request.h>
00021 
00022 
00023 __STXXL_BEGIN_NAMESPACE
00024 
00027 
00028 
00029 class set_switch_handler
00030 {
00031     onoff_switch & switch_;
00032 
00033 public:
00034     set_switch_handler(onoff_switch & switch__) : switch_(switch__) { }
00035     void operator () (request * /*req*/) { switch_.on(); }
00036 };
00037 
00042 template <typename block_type, typename bid_iterator_type>
00043 class block_prefetcher
00044 {
00045     block_prefetcher() { }
00046     typedef typename block_type::bid_type bid_type;
00047 
00048 public:
00049     typedef void (*callback_type)(const bid_type &);
00050 
00051 protected:
00052     bid_iterator_type consume_seq_begin;
00053     bid_iterator_type consume_seq_end;
00054     unsigned_type seq_length;
00055 
00056     int_type * prefetch_seq;
00057 
00058     unsigned_type nextread;
00059     unsigned_type nextconsume;
00060 
00061     const int_type nreadblocks;
00062 
00063     block_type * read_buffers;
00064     request_ptr * read_reqs;
00065     bid_type * read_bids;
00066 
00067     onoff_switch * completed;
00068     int_type * pref_buffer;
00069 
00070     callback_type do_after_fetch;
00071 
00072     block_type * wait(int_type iblock)
00073     {
00074         STXXL_VERBOSE1("block_prefetcher: waiting block " << iblock);
00075         {
00076             stats::scoped_wait_timer wait_timer(stats::WAIT_OP_READ);
00077 
00078             completed[iblock].wait_for_on();
00079         }
00080         STXXL_VERBOSE1("block_prefetcher: finished waiting block " << iblock);
00081         int_type ibuffer = pref_buffer[iblock];
00082         STXXL_VERBOSE1("block_prefetcher: returning buffer " << ibuffer);
00083         assert(ibuffer >= 0 && ibuffer < nreadblocks);
00084         if (do_after_fetch)
00085             do_after_fetch(read_bids[ibuffer]);
00086         return (read_buffers + ibuffer);
00087     }
00088 
00089 public:
00096     block_prefetcher(
00097         bid_iterator_type _cons_begin,
00098         bid_iterator_type _cons_end,
00099         int_type * _pref_seq,
00100         int_type _prefetch_buf_size,
00101         callback_type do_after_fetch = NULL
00102         ) :
00103         consume_seq_begin(_cons_begin),
00104         consume_seq_end(_cons_end),
00105         seq_length(_cons_end - _cons_begin),
00106         prefetch_seq(_pref_seq),
00107         nextread(STXXL_MIN(unsigned_type(_prefetch_buf_size), seq_length)),
00108         nextconsume(0),
00109         nreadblocks(nextread),
00110         do_after_fetch(do_after_fetch)
00111     {
00112         STXXL_VERBOSE1("block_prefetcher: seq_length=" << seq_length);
00113         STXXL_VERBOSE1("block_prefetcher: _prefetch_buf_size=" << _prefetch_buf_size);
00114         assert(seq_length > 0);
00115         assert(_prefetch_buf_size > 0);
00116         int_type i;
00117         read_buffers = new block_type[nreadblocks];
00118         read_reqs = new request_ptr[nreadblocks];
00119         read_bids = new bid_type[nreadblocks];
00120         pref_buffer = new int_type[seq_length];
00121 
00122         std::fill(pref_buffer, pref_buffer + seq_length, -1);
00123 
00124         completed = new onoff_switch[seq_length];
00125 
00126         for (i = 0; i < nreadblocks; ++i)
00127         {
00128             assert(prefetch_seq[i] < int_type(seq_length));
00129             assert(prefetch_seq[i] >= 0);
00130             read_bids[i] = *(consume_seq_begin + prefetch_seq[i]);
00131             STXXL_VERBOSE1("block_prefetcher: reading block " << i <<
00132                            " prefetch_seq[" << i << "]=" << prefetch_seq[i] <<
00133                            " @ " << &read_buffers[i] <<
00134                            " @ " << read_bids[i]);
00135             read_reqs[i] = read_buffers[i].read(
00136                 read_bids[i],
00137                 set_switch_handler(*(completed + prefetch_seq[i])));
00138             pref_buffer[prefetch_seq[i]] = i;
00139         }
00140     }
00143     block_type * pull_block()
00144     {
00145         STXXL_VERBOSE1("block_prefetcher: pulling a block");
00146         return wait(nextconsume++);
00147     }
00153     bool block_consumed(block_type * & buffer)
00154     {
00155         int_type ibuffer = buffer - read_buffers;
00156         STXXL_VERBOSE1("block_prefetcher: buffer " << ibuffer << " consumed");
00157         if (read_reqs[ibuffer].valid())
00158             read_reqs[ibuffer]->wait();
00159 
00160         read_reqs[ibuffer] = NULL;
00161 
00162         if (nextread < seq_length)
00163         {
00164             assert(ibuffer >= 0 && ibuffer < nreadblocks);
00165             int_type next_2_prefetch = prefetch_seq[nextread++];
00166             STXXL_VERBOSE1("block_prefetcher: prefetching block " << next_2_prefetch);
00167 
00168             assert((next_2_prefetch < int_type(seq_length)) && (next_2_prefetch >= 0));
00169             assert(!completed[next_2_prefetch].is_on());
00170 
00171             pref_buffer[next_2_prefetch] = ibuffer;
00172             read_bids[ibuffer] = *(consume_seq_begin + next_2_prefetch);
00173             read_reqs[ibuffer] = read_buffers[ibuffer].read(
00174                 read_bids[ibuffer],
00175                 set_switch_handler(*(completed + next_2_prefetch))
00176                 );
00177         }
00178 
00179         if (nextconsume >= seq_length)
00180             return false;
00181 
00182 
00183         buffer = wait(nextconsume++);
00184 
00185         return true;
00186     }
00187 
00188     // no more consumable blocks available, but can't delete the prefetcher,
00189     // because not all blocks may have been returned, yet
00190     bool empty() const
00191     {
00192         return nextconsume >= seq_length;
00193     }
00194 
00195     // index of the next element in the consume sequence
00196     unsigned_type pos() const
00197     {
00198         return nextconsume;
00199     }
00200 
00202     ~block_prefetcher()
00203     {
00204         for (int_type i = 0; i < nreadblocks; ++i)
00205             if (read_reqs[i].valid())
00206                 read_reqs[i]->wait();
00207 
00208 
00209         delete[] read_reqs;
00210         delete[] read_bids;
00211         delete[] completed;
00212         delete[] pref_buffer;
00213         delete[] read_buffers;
00214     }
00215 };
00216 
00218 
00219 __STXXL_END_NAMESPACE
00220 
00221 #endif // !STXXL_BLOCK_PREFETCHER_HEADER

Generated by  doxygen 1.7.1