00001
00002
00003
00004
00005
00006
00007
00008
00009
00010
00011
00012
00013 #ifndef STXXL_ASYNC_SCHEDULE_HEADER
00014 #define STXXL_ASYNC_SCHEDULE_HEADER
00015
#include <algorithm>
#include <cassert>
#include <functional>
#include <queue>
#include <utility>
#include <vector>

#ifdef STXXL_BOOST_CONFIG
 #include <boost/config.hpp>
#endif
#include <stxxl/bits/io/iobase.h>

#include <stxxl/bits/compat_hash_map.h>
#include <stxxl/bits/compat_hash_set.h>
00027
00028
00029 __STXXL_BEGIN_NAMESPACE
00030
00031 struct sim_event
00032 {
00033 int_type timestamp;
00034 int_type iblock;
00035 inline sim_event(int_type t, int_type b) : timestamp(t), iblock(b) { }
00036 };
00037
00038 struct sim_event_cmp : public std::binary_function<sim_event, sim_event, bool>
00039 {
00040 inline bool operator () (const sim_event & a, const sim_event & b) const
00041 {
00042 return a.timestamp > b.timestamp;
00043 }
00044 };
00045
00046 int_type simulate_async_write(
00047 int * disks,
00048 const int_type L,
00049 const int_type m_init,
00050 const int_type D,
00051 std::pair<int_type, int_type> * o_time);
00052
00053 typedef std::pair<int_type, int_type> write_time_pair;
00054 struct write_time_cmp : public std::binary_function<write_time_pair, write_time_pair, bool>
00055 {
00056 inline bool operator () (const write_time_pair & a, const write_time_pair & b) const
00057 {
00058 return a.second > b.second;
00059 }
00060 };
00061
00062 void compute_prefetch_schedule(
00063 int_type * first,
00064 int_type * last,
00065 int_type * out_first,
00066 int_type m,
00067 int_type D);
00068
00069 template <typename run_type>
00070 void simulate_async_write(
00071 const run_type & input,
00072 const int_type m_init,
00073 const int_type D,
00074 std::pair<int_type, int_type> * o_time)
00075 {
00076 typedef std::priority_queue<sim_event, std::vector<sim_event>, sim_event_cmp> event_queue_type;
00077 typedef std::queue<int_type> disk_queue_type;
00078
00079 const int_type L = input.size();
00080 assert(L >= D);
00081 disk_queue_type * disk_queues = new disk_queue_type[L];
00082 event_queue_type event_queue;
00083
00084 int_type m = m_init;
00085 int_type i = L - 1;
00086 int_type oldtime = 0;
00087 bool * disk_busy = new bool[D];
00088
00089 while (m && (i >= 0))
00090 {
00091 int_type disk = input[i].bid.storage->get_id();
00092 disk_queues[disk].push(i);
00093 i--;
00094 m--;
00095 }
00096
00097 for (int_type ii = 0; ii < D; ii++)
00098 if (!disk_queues[ii].empty())
00099 {
00100 int_type j = disk_queues[ii].front();
00101 disk_queues[ii].pop();
00102 event_queue.push(sim_event(1, j));
00103 }
00104
00105 while (!event_queue.empty())
00106 {
00107 sim_event cur = event_queue.top();
00108 event_queue.pop();
00109 if (oldtime != cur.timestamp)
00110 {
00111
00112 for (int_type i = 0; i < D; i++)
00113 disk_busy[i] = false;
00114
00115
00116 oldtime = cur.timestamp;
00117 }
00118 o_time[cur.iblock] = std::pair<int_type, int_type>(cur.iblock, cur.timestamp + 1);
00119
00120 m++;
00121 if (i >= 0)
00122 {
00123 m--;
00124 int_type disk = input[i].bid.storage->get_id();
00125 if (disk_busy[disk])
00126 {
00127 disk_queues[disk].push(i);
00128 }
00129 else
00130 {
00131 event_queue.push(sim_event(cur.timestamp + 1, i));
00132 disk_busy[disk] = true;
00133 }
00134
00135 i--;
00136 }
00137
00138
00139 int_type disk = input[cur.iblock].bid.storage->get_id();
00140 if (!disk_busy[disk] && !disk_queues[disk].empty())
00141 {
00142 event_queue.push(sim_event(cur.timestamp + 1, disk_queues[disk].front()));
00143 disk_queues[disk].pop();
00144 disk_busy[disk] = true;
00145 }
00146 }
00147
00148 delete[] disk_busy;
00149 delete[] disk_queues;
00150 }
00151
00152
00153 template <typename run_type>
00154 void compute_prefetch_schedule(
00155 const run_type & input,
00156 int_type * out_first,
00157 int_type m,
00158 int_type D)
00159 {
00160 typedef std::pair<int_type, int_type> pair_type;
00161 const int_type L = input.size();
00162 if (L <= D)
00163 {
00164 for (int_type i = 0; i < L; ++i)
00165 out_first[i] = i;
00166
00167 return;
00168 }
00169 pair_type * write_order = new pair_type[L];
00170
00171 simulate_async_write(input, m, D, write_order);
00172
00173 std::stable_sort(write_order, write_order + L, write_time_cmp());
00174
00175 for (int_type i = 0; i < L; i++)
00176 out_first[i] = write_order[i].first;
00177
00178
00179 delete[] write_order;
00180 }
00181
00182
00183 template <typename bid_iterator_type>
00184 void simulate_async_write(
00185 bid_iterator_type input,
00186 const int_type L,
00187 const int_type m_init,
00188 const int_type ,
00189 std::pair<int_type, int_type> * o_time)
00190 {
00191 typedef std::priority_queue<sim_event, std::vector<sim_event>, sim_event_cmp> event_queue_type;
00192 typedef std::queue<int_type> disk_queue_type;
00193
00194
00195 typedef typename compat_hash_map<int, disk_queue_type>::result disk_queues_type;
00196 disk_queues_type disk_queues;
00197 event_queue_type event_queue;
00198
00199 int_type m = m_init;
00200 int_type i = L - 1;
00201 int_type oldtime = 0;
00202
00203 compat_hash_set<int>::result disk_busy;
00204
00205 while (m && (i >= 0))
00206 {
00207 int_type disk = (*(input + i)).storage->get_id();
00208 disk_queues[disk].push(i);
00209 i--;
00210 m--;
00211 }
00212
00213
00214
00215
00216
00217
00218
00219
00220
00221
00222 disk_queues_type::iterator it = disk_queues.begin();
00223 for ( ; it != disk_queues.end(); ++it)
00224 {
00225 int_type j = (*it).second.front();
00226 (*it).second.pop();
00227 event_queue.push(sim_event(1, j));
00228 }
00229
00230 while (!event_queue.empty())
00231 {
00232 sim_event cur = event_queue.top();
00233 event_queue.pop();
00234 if (oldtime != cur.timestamp)
00235 {
00236
00237
00238
00239 disk_busy.clear();
00240
00241 oldtime = cur.timestamp;
00242 }
00243 o_time[cur.iblock] = std::pair<int_type, int_type>(cur.iblock, cur.timestamp + 1);
00244
00245 m++;
00246 if (i >= 0)
00247 {
00248 m--;
00249 int_type disk = (*(input + i)).storage->get_id();
00250 if (disk_busy.find(disk) != disk_busy.end())
00251 {
00252 disk_queues[disk].push(i);
00253 }
00254 else
00255 {
00256 event_queue.push(sim_event(cur.timestamp + 1, i));
00257 disk_busy.insert(disk);
00258 }
00259
00260 i--;
00261 }
00262
00263
00264 int_type disk = (*(input + cur.iblock)).storage->get_id();
00265 if (disk_busy.find(disk) == disk_busy.end() && !disk_queues[disk].empty())
00266 {
00267 event_queue.push(sim_event(cur.timestamp + 1, disk_queues[disk].front()));
00268 disk_queues[disk].pop();
00269 disk_busy.insert(disk);
00270 }
00271 }
00272
00273
00274
00275 }
00276
00277
00278 template <typename bid_iterator_type>
00279 void compute_prefetch_schedule(
00280 bid_iterator_type input_begin,
00281 bid_iterator_type input_end,
00282 int_type * out_first,
00283 int_type m,
00284 int_type D)
00285 {
00286 typedef std::pair<int_type, int_type> pair_type;
00287 const int_type L = input_end - input_begin;
00288 STXXL_VERBOSE1("compute_prefetch_schedule: sequence length=" << L << " disks=" << D);
00289 if (L <= D)
00290 {
00291 for (int_type i = 0; i < L; ++i)
00292 out_first[i] = i;
00293
00294 return;
00295 }
00296 pair_type * write_order = new pair_type[L];
00297
00298 simulate_async_write(input_begin, L, m, D, write_order);
00299
00300 std::stable_sort(write_order, write_order + L, write_time_cmp());
00301
00302 STXXL_VERBOSE1("Computed prefetch order for " << L << " blocks with " <<
00303 D << " disks and " << m << " blocks");
00304 for (int_type i = 0; i < L; i++)
00305 {
00306 out_first[i] = write_order[i].first;
00307 STXXL_VERBOSE1(write_order[i].first);
00308 }
00309
00310 delete[] write_order;
00311 }
00312
00313 __STXXL_END_NAMESPACE
00314
00315 #endif // !STXXL_ASYNC_SCHEDULE_HEADER