00001
00002
00003
00004
00005
00006
00007
00008
00009
00010
00011
00012
00013
00014 #ifndef STXXL_REQUEST_HEADER
00015 #define STXXL_REQUEST_HEADER
00016
00017 #include <iostream>
00018 #include <memory>
00019 #include <cassert>
00020
00021 #include <stxxl/bits/namespace.h>
00022 #include <stxxl/bits/noncopyable.h>
00023 #include <stxxl/bits/io/iostats.h>
00024 #include <stxxl/bits/common/mutex.h>
00025 #include <stxxl/bits/common/switch.h>
00026 #include <stxxl/bits/common/exceptions.h>
00027 #include <stxxl/bits/io/completion_handler.h>
00028 #include <stxxl/bits/compat_unique_ptr.h>
00029 #include <stxxl/bits/common/error_handling.h>
00030 #include <stxxl/bits/verbose.h>
00031
00032
00033 __STXXL_BEGIN_NAMESPACE
00034
00037
//! Alignment constant (4096) used for I/O blocks; presumably the byte
//! alignment that request::check_alignment() verifies -- TODO confirm in the
//! implementation file.
#define BLOCK_ALIGN 4096

// Forward declarations: request_ptr is defined further down in this header,
// file is defined elsewhere. Both are only used through pointers/parameter
// declarations before their full definitions are required.
class request_ptr;
class file;
00042
00044
00048 class request_base : private noncopyable
00049 {
00050 public:
00051 typedef stxxl::uint64 offset_type;
00052 typedef stxxl::unsigned_type size_type;
00053 enum request_type { READ, WRITE };
00054
00055 protected:
00056 virtual bool add_waiter(onoff_switch * sw) = 0;
00057 virtual void delete_waiter(onoff_switch * sw) = 0;
00058 virtual void notify_waiters() = 0;
00059
00060 public:
00061
00062 virtual void serve() = 0;
00063
00064 protected:
00065 virtual void completed() = 0;
00066
00067 public:
00069 virtual void wait(bool measure_time = true) = 0;
00070
00077 virtual bool cancel() = 0;
00078
00081 virtual bool poll() = 0;
00082
00085 virtual const char * io_type() const = 0;
00086
00087 virtual ~request_base()
00088 { }
00089 };
00090
00092 class request : virtual public request_base
00093 {
00094 friend int wait_any(request_ptr req_array[], int count);
00095 template <class request_iterator_>
00096 friend
00097 request_iterator_ wait_any(request_iterator_ reqs_begin, request_iterator_ reqs_end);
00098 friend class request_queue_impl_qwqr;
00099 friend class request_ptr;
00100
00101 protected:
00102 completion_handler on_complete;
00103 int ref_cnt;
00104 compat_unique_ptr<stxxl::io_error>::result error;
00105
00106 mutex ref_cnt_mutex;
00107
00108 protected:
00109 file * file_;
00110 void * buffer;
00111 offset_type offset;
00112 size_type bytes;
00113 request_type type;
00114
00115 void completed();
00116
00117
00118 int nref()
00119 {
00120 scoped_mutex_lock Lock(ref_cnt_mutex);
00121 return ref_cnt;
00122 }
00123
00124 public:
00125 request(const completion_handler & on_compl,
00126 file * file__,
00127 void * buffer_,
00128 offset_type offset_,
00129 size_type bytes_,
00130 request_type type_);
00131
00132 virtual ~request();
00133
00134 file * get_file() const { return file_; }
00135 void * get_buffer() const { return buffer; }
00136 offset_type get_offset() const { return offset; }
00137 size_type get_size() const { return bytes; }
00138 request_type get_type() const { return type; }
00139
00140 void check_alignment() const;
00141
00142 virtual std::ostream & print(std::ostream & out) const
00143 {
00144 out << "File object address: " << (void *)get_file();
00145 out << " Buffer address: " << (void *)get_buffer();
00146 out << " File offset: " << get_offset();
00147 out << " Transfer size: " << get_size() << " bytes";
00148 out << " Type of transfer: " << ((get_type() == READ) ? "READ" : "WRITE");
00149 return out;
00150 }
00151
00154 void error_occured(const char * msg)
00155 {
00156 error.reset(new stxxl::io_error(msg));
00157 }
00158
00161 void error_occured(const std::string & msg)
00162 {
00163 error.reset(new stxxl::io_error(msg));
00164 }
00165
00167 void check_errors() throw (stxxl::io_error)
00168 {
00169 if (error.get())
00170 throw * (error.get());
00171 }
00172
00173 private:
00174 void add_ref()
00175 {
00176 scoped_mutex_lock Lock(ref_cnt_mutex);
00177 ref_cnt++;
00178 STXXL_VERBOSE3("request add_ref() " << static_cast<void *>(this) << ": adding reference, cnt: " << ref_cnt);
00179 }
00180
00181 bool sub_ref()
00182 {
00183 int val;
00184 {
00185 scoped_mutex_lock Lock(ref_cnt_mutex);
00186 val = --ref_cnt;
00187 STXXL_VERBOSE3("request sub_ref() " << static_cast<void *>(this) << ": subtracting reference cnt: " << ref_cnt);
00188 }
00189 assert(val >= 0);
00190 return (val == 0);
00191 }
00192
00193 protected:
00194 void check_nref(bool after = false)
00195 {
00196 if (nref() < 2)
00197 check_nref_failed(after);
00198 }
00199
00200 private:
00201 void check_nref_failed(bool after = false);
00202 };
00203
00204 inline std::ostream & operator << (std::ostream & out, const request & req)
00205 {
00206 return req.print(out);
00207 }
00208
00210
00212 class request_ptr
00213 {
00214 request * ptr;
00215 void add_ref()
00216 {
00217 if (ptr)
00218 {
00219 ptr->add_ref();
00220 }
00221 }
00222 void sub_ref()
00223 {
00224 if (ptr)
00225 {
00226 if (ptr->sub_ref())
00227 {
00228 STXXL_VERBOSE3("the last copy " << static_cast<void *>(ptr) << " this=" << static_cast<void *>(this));
00229 delete ptr;
00230 ptr = NULL;
00231 }
00232 else
00233 {
00234 STXXL_VERBOSE3("more copies " << static_cast<void *>(ptr) << " this=" << static_cast<void *>(this));
00235 }
00236 }
00237 }
00238
00239 public:
00241 request_ptr(request * ptr_ = NULL) : ptr(ptr_)
00242 {
00243 STXXL_VERBOSE3("create constructor (request =" << static_cast<void *>(ptr) << ") this=" << static_cast<void *>(this));
00244 add_ref();
00245 }
00247 request_ptr(const request_ptr & p) : ptr(p.ptr)
00248 {
00249 STXXL_VERBOSE3("copy constructor (copying " << static_cast<void *>(ptr) << ") this=" << static_cast<void *>(this));
00250 add_ref();
00251 }
00253 ~request_ptr()
00254 {
00255 STXXL_VERBOSE3("Destructor of a request_ptr pointing to " << static_cast<void *>(ptr) << " this=" << static_cast<void *>(this));
00256 sub_ref();
00257 }
00260 request_ptr & operator = (const request_ptr & p)
00261 {
00262
00263 return (*this = p.ptr);
00264 }
00267 request_ptr & operator = (request * p)
00268 {
00269 STXXL_VERBOSE3("assign operator begin (assigning " << static_cast<void *>(p) << ") this=" << static_cast<void *>(this));
00270 if (p != ptr)
00271 {
00272 sub_ref();
00273 ptr = p;
00274 add_ref();
00275 }
00276 STXXL_VERBOSE3("assign operator end (assigning " << static_cast<void *>(p) << ") this=" << static_cast<void *>(this));
00277 return *this;
00278 }
00281 request & operator * () const
00282 {
00283 assert(ptr);
00284 return *ptr;
00285 }
00288 request * operator -> () const
00289 {
00290 assert(ptr);
00291 return ptr;
00292 }
00293
00294 bool operator == (const request_ptr & rp2) const
00295 {
00296 return ptr == rp2.ptr;
00297 }
00298
00303 request * get() const { return ptr; }
00304
00306 bool valid() const { return ptr; }
00307
00309 bool empty() const { return !ptr; }
00310 };
00311
00313
00314
00318 template <class request_iterator_>
00319 void wait_all(request_iterator_ reqs_begin, request_iterator_ reqs_end)
00320 {
00321 for ( ; reqs_begin != reqs_end; ++reqs_begin)
00322 (request_ptr(*reqs_begin))->wait();
00323 }
00324
00328 inline void wait_all(request_ptr req_array[], int count)
00329 {
00330 wait_all(req_array, req_array + count);
00331 }
00332
00341 template <class request_iterator_>
00342 typename std::iterator_traits<request_iterator_>::difference_type cancel_all(request_iterator_ reqs_begin, request_iterator_ reqs_end)
00343 {
00344 typename std::iterator_traits<request_iterator_>::difference_type num_cancelled = 0;
00345 while (reqs_begin != reqs_end)
00346 {
00347 if ((request_ptr(*reqs_begin))->cancel())
00348 ++num_cancelled;
00349 ++reqs_begin;
00350 }
00351 return num_cancelled;
00352 }
00353
00359 template <class request_iterator_>
00360 request_iterator_ poll_any(request_iterator_ reqs_begin, request_iterator_ reqs_end)
00361 {
00362 while (reqs_begin != reqs_end)
00363 {
00364 if ((request_ptr(*reqs_begin))->poll())
00365 return reqs_begin;
00366
00367 ++reqs_begin;
00368 }
00369 return reqs_end;
00370 }
00371
00372
00378 inline bool poll_any(request_ptr req_array[], int count, int & index)
00379 {
00380 request_ptr * res = poll_any(req_array, req_array + count);
00381 index = res - req_array;
00382 return res != (req_array + count);
00383 }
00384
00385
00390 template <class request_iterator_>
00391 request_iterator_ wait_any(request_iterator_ reqs_begin, request_iterator_ reqs_end)
00392 {
00393 stats::scoped_wait_timer wait_timer(stats::WAIT_OP_ANY);
00394
00395 onoff_switch sw;
00396
00397 request_iterator_ cur = reqs_begin, result = reqs_end;
00398
00399 for ( ; cur != reqs_end; cur++)
00400 {
00401 if ((request_ptr(*cur))->add_waiter(&sw))
00402 {
00403
00404 result = cur;
00405
00406 if (cur != reqs_begin)
00407 {
00408 while (--cur != reqs_begin)
00409 (request_ptr(*cur))->delete_waiter(&sw);
00410
00411 (request_ptr(*cur))->delete_waiter(&sw);
00412 }
00413
00414 (request_ptr(*result))->check_errors();
00415
00416 return result;
00417 }
00418 }
00419
00420 sw.wait_for_on();
00421
00422 for (cur = reqs_begin; cur != reqs_end; cur++)
00423 {
00424 (request_ptr(*cur))->delete_waiter(&sw);
00425 if (result == reqs_end && (request_ptr(*cur))->poll())
00426 result = cur;
00427 }
00428
00429 return result;
00430 }
00431
00432
00437 inline int wait_any(request_ptr req_array[], int count)
00438 {
00439 return wait_any(req_array, req_array + count) - req_array;
00440 }
00441
00443
00444 __STXXL_END_NAMESPACE
00445
00446 #endif // !STXXL_REQUEST_HEADER
00447