00001
00002
00003
00004
00005
00006
00007
00008
00009
00010
00011
00012
00013
00014 #ifndef STXXL_IOBASE_HEADER
00015 #define STXXL_IOBASE_HEADER
00016
00017 #ifdef STXXL_BOOST_CONFIG
00018 #include <boost/config.hpp>
00019 #endif
00020
00021 #if defined (__linux__)
00022 #define STXXL_CHECK_BLOCK_ALIGNING
00023 #endif
00024
00025
00026
00027
00028
00029
00030 #include <cstdlib>
00031 #include <cstdio>
00032 #include <cstring>
00033 #include <cerrno>
00034
00035 #include <fcntl.h>
00036 #include <sys/types.h>
00037 #include <sys/stat.h>
00038
00039 #include <iostream>
00040 #include <algorithm>
00041 #include <string>
00042 #include <queue>
00043 #include <map>
00044 #include <set>
00045
00046 #ifdef BOOST_MSVC
00047
00048 #include <io.h>
00049 #else
00050 #include <unistd.h>
00051 #include <sys/resource.h>
00052 #include <sys/wait.h>
00053 #endif
00054
00055 #ifdef STXXL_BOOST_THREADS // Use Portable Boost threads
00056
00057 #include <boost/thread/thread.hpp>
00058 #include <boost/thread/mutex.hpp>
00059 #include <boost/bind.hpp>
00060 #else
00061 #include <pthread.h>
00062 #endif
00063
00064
00065 #ifndef O_SYNC
00066 #define O_SYNC 0
00067 #endif
00068 #ifndef O_RSYNC
00069 #define O_RSYNC 0
00070 #endif
00071 #ifndef O_DSYNC
00072 #define O_DSYNC 0
00073 #endif
00074
00075
00076 #if defined (__linux__)
00077
00078 #if !defined (O_DIRECT) && (defined (__alpha__) || defined (__i386__))
00079 #define O_DIRECT 040000
00080 #endif
00081 #endif
00082
00083
00084 #ifndef O_DIRECT
00085 #define O_DIRECT O_SYNC
00086 #endif
00087
00088
00089 #include <stxxl/bits/namespace.h>
00090 #include <stxxl/bits/io/iostats.h>
00091 #include <stxxl/bits/common/semaphore.h>
00092 #include <stxxl/bits/common/mutex.h>
00093 #include <stxxl/bits/common/switch.h>
00094 #include <stxxl/bits/common/state.h>
00095 #include <stxxl/bits/common/exceptions.h>
00096 #include <stxxl/bits/io/completion_handler.h>
00097
00098
00099 __STXXL_BEGIN_NAMESPACE
00100
00105
//! Alignment constant with the value 4096.
//! NOTE(review): presumably the byte alignment required for I/O buffers and
//! offsets; no use of it is visible in this header -- confirm.
00106 #define BLOCK_ALIGN 4096
00107
//! Pointer to a function that takes a void * and returns a void *
//! (the same signature as disk_queue::worker declared further down).
00108 typedef void * (*thread_function_t)(void *);
//! Key type of the disk_queues map: one disk_queue is kept per DISKID.
00109 typedef stxxl::int64 DISKID;
00110
//! Forward declarations; both classes are defined later in this header.
00111 class request;
00112 class request_ptr;
00113
00115
00116 struct default_completion_handler
00117 {
00119 void operator () (request *) { }
00120 };
00121
00123
00126 class file : private noncopyable
00127 {
00128 protected:
00129 int id;
00130
00134 file(int _id) : id(_id) { }
00135
00136 public:
00138
00141 enum open_mode
00142 {
00143 RDONLY = 1,
00144 WRONLY = 2,
00145 RDWR = 4,
00146 CREAT = 8,
00147 DIRECT = 16,
00148 TRUNC = 32
00149 };
00150
00157 virtual request_ptr aread(void * buffer, stxxl::int64 pos, size_t bytes,
00158 completion_handler on_cmpl) = 0;
00165 virtual request_ptr awrite(void * buffer, stxxl::int64 pos, size_t bytes,
00166 completion_handler on_cmpl) = 0;
00167
00170 virtual void set_size(stxxl::int64 newsize) = 0;
00173 virtual stxxl::int64 size() = 0;
00175 __STXXL_DEPRECATED(int get_disk_number())
00176 {
00177 return id;
00178 }
00182 int get_id()
00183 {
00184 return id;
00185 }
00186
00188 virtual void lock() { }
00189
00191 virtual void delete_region(int64 offset, unsigned_type size)
00192 {
00193 UNUSED(offset);
00194 UNUSED(size);
00195 }
00196
00197 virtual ~file() { }
00198 };
00199
//! Forward declarations. disk_queue and disk_queues are defined at the end
//! of this header; mc is not defined or used anywhere in the visible part.
00200 class mc;
00201 class disk_queue;
00202 class disk_queues;
00203
00205
00209 class request : private noncopyable
00210 {
00211 friend int wait_any(request_ptr req_array[], int count);
00212 template <class request_iterator_>
00213 friend
00214 request_iterator_ wait_any(request_iterator_ reqs_begin, request_iterator_ reqs_end);
00215 friend class file;
00216 friend class disk_queue;
00217 friend class disk_queues;
00218 friend class request_ptr;
00219
00220 protected:
00221 virtual bool add_waiter(onoff_switch * sw) = 0;
00222 virtual void delete_waiter(onoff_switch * sw) = 0;
00223
00224 virtual void serve() = 0;
00225
00226
00227 completion_handler on_complete;
00228 int ref_cnt;
00229 std::auto_ptr<stxxl::io_error> error;
00230
00231 mutex ref_cnt_mutex;
00232
00233 public:
00234 enum request_type { READ, WRITE };
00235
00236 protected:
00237 file * file_;
00238 void * buffer;
00239 stxxl::int64 offset;
00240 size_t bytes;
00241 request_type type;
00242
00243 void completed()
00244 {
00245 on_complete(this);
00246 }
00247
00248
00249 int nref()
00250 {
00251 scoped_mutex_lock Lock(ref_cnt_mutex);
00252 return ref_cnt;
00253 }
00254
00255 public:
00256 request(completion_handler on_compl,
00257 file * file__,
00258 void * buffer_,
00259 stxxl::int64 offset_,
00260 size_t bytes_,
00261 request_type type_) :
00262 on_complete(on_compl), ref_cnt(0),
00263 file_(file__),
00264 buffer(buffer_),
00265 offset(offset_),
00266 bytes(bytes_),
00267 type(type_)
00268 {
00269 STXXL_VERBOSE3("request " << static_cast<void *>(this) << ": creation, cnt: " << ref_cnt);
00270 }
00272 virtual void wait() = 0;
00275 virtual bool poll() = 0;
00278 virtual const char * io_type()
00279 {
00280 return "none";
00281 }
00282 virtual ~request()
00283 {
00284 STXXL_VERBOSE3("request " << static_cast<void *>(this) << ": deletion, cnt: " << ref_cnt);
00285 }
00286 file * get_file() const { return file_; }
00287 void * get_buffer() const { return buffer; }
00288 stxxl::int64 get_offset() const { return offset; }
00289 size_t get_size() const { return bytes; }
00290 size_t size() const { return bytes; }
00291 request_type get_type() const { return type; }
00292
00293 virtual std::ostream & print(std::ostream & out) const
00294 {
00295 out << "File object address: " << (void *)get_file();
00296 out << " Buffer address: " << (void *)get_buffer();
00297 out << " File offset: " << get_offset();
00298 out << " Transfer size: " << get_size() << " bytes";
00299 out << " Type of transfer: " << ((get_type() == READ) ? "READ" : "WRITE");
00300 return out;
00301 }
00302
00305 void error_occured(const char * msg)
00306 {
00307 error.reset(new stxxl::io_error(msg));
00308 }
00309
00312 void error_occured(const std::string & msg)
00313 {
00314 error.reset(new stxxl::io_error(msg));
00315 }
00316
00318 void check_errors() throw (stxxl::io_error)
00319 {
00320 if (error.get())
00321 throw * (error.get());
00322 }
00323
00324 private:
00325 void add_ref()
00326 {
00327 scoped_mutex_lock Lock(ref_cnt_mutex);
00328 ref_cnt++;
00329 STXXL_VERBOSE3("request add_ref() " << static_cast<void *>(this) << ": adding reference, cnt: " << ref_cnt);
00330 }
00331
00332 bool sub_ref()
00333 {
00334 int val;
00335 {
00336 scoped_mutex_lock Lock(ref_cnt_mutex);
00337 val = --ref_cnt;
00338 STXXL_VERBOSE3("request sub_ref() " << static_cast<void *>(this) << ": subtracting reference cnt: " << ref_cnt);
00339 }
00340 assert(val >= 0);
00341 return (val == 0);
00342 }
00343 };
00344
00345 inline std::ostream & operator << (std::ostream & out, const request & req)
00346 {
00347 return req.print(out);
00348 }
00349
00351
00353 class request_ptr
00354 {
00355 request * ptr;
00356 void add_ref()
00357 {
00358 if (ptr)
00359 {
00360 ptr->add_ref();
00361 }
00362 }
00363 void sub_ref()
00364 {
00365 if (ptr)
00366 {
00367 if (ptr->sub_ref())
00368 {
00369 STXXL_VERBOSE3("the last copy " << static_cast<void *>(ptr) << " this=" << static_cast<void *>(this));
00370 delete ptr;
00371 ptr = NULL;
00372 }
00373 else
00374 {
00375 STXXL_VERBOSE3("more copies " << static_cast<void *>(ptr) << " this=" << static_cast<void *>(this));
00376 }
00377 }
00378 }
00379
00380 public:
00382 request_ptr(request * ptr_ = NULL) : ptr(ptr_)
00383 {
00384 STXXL_VERBOSE3("create constructor (request =" << static_cast<void *>(ptr) << ") this=" << static_cast<void *>(this));
00385 add_ref();
00386 }
00388 request_ptr(const request_ptr & p) : ptr(p.ptr)
00389 {
00390 STXXL_VERBOSE3("copy constructor (copying " << static_cast<void *>(ptr) << ") this=" << static_cast<void *>(this));
00391 add_ref();
00392 }
00394 ~request_ptr()
00395 {
00396 STXXL_VERBOSE3("Destructor of a request_ptr pointing to " << static_cast<void *>(ptr) << " this=" << static_cast<void *>(this));
00397 sub_ref();
00398 }
00401 request_ptr & operator = (const request_ptr & p)
00402 {
00403
00404 return (*this = p.ptr);
00405 }
00408 request_ptr & operator = (request * p)
00409 {
00410 STXXL_VERBOSE3("assign operator begin (assigning " << static_cast<void *>(p) << ") this=" << static_cast<void *>(this));
00411 if (p != ptr)
00412 {
00413 sub_ref();
00414 ptr = p;
00415 add_ref();
00416 }
00417 STXXL_VERBOSE3("assign operator end (assigning " << static_cast<void *>(p) << ") this=" << static_cast<void *>(this));
00418 return *this;
00419 }
00422 request & operator * () const
00423 {
00424 assert(ptr);
00425 return *ptr;
00426 }
00429 request * operator -> () const
00430 {
00431 assert(ptr);
00432 return ptr;
00433 }
00438 request * get() const { return ptr; }
00439
00441 bool valid() const { return ptr; }
00442
00444 bool empty() const { return !ptr; }
00445 };
00446
00448
//! Prototypes of the array-based helpers defined right below. They are
//! declared inline here; the definitions do not repeat the keyword.
//!
//! wait_any: returns the index of a finished request in req_array
//! (or -1 if none is found after waking up).
00453 inline int wait_any(request_ptr req_array[], int count);
//! wait_all: calls wait() on each of the count requests in turn.
00457 inline void wait_all(request_ptr req_array[], int count);
//! poll_any: stores in index the position of the first request whose
//! poll() returns true and returns true; otherwise index is -1 and the
//! result is false.
00463 inline bool poll_any(request_ptr req_array[], int count, int & index);
00464
00465
00466 void wait_all(request_ptr req_array[], int count)
00467 {
00468 for (int i = 0; i < count; i++)
00469 {
00470 req_array[i]->wait();
00471 }
00472 }
00473
00474 template <class request_iterator_>
00475 void wait_all(request_iterator_ reqs_begin, request_iterator_ reqs_end)
00476 {
00477 while (reqs_begin != reqs_end)
00478 {
00479 (request_ptr(*reqs_begin))->wait();
00480 ++reqs_begin;
00481 }
00482 }
00483
00484 bool poll_any(request_ptr req_array[], int count, int & index)
00485 {
00486 index = -1;
00487 for (int i = 0; i < count; i++)
00488 {
00489 if (req_array[i]->poll())
00490 {
00491 index = i;
00492 return true;
00493 }
00494 }
00495 return false;
00496 }
00497
00498 template <class request_iterator_>
00499 request_iterator_ poll_any(request_iterator_ reqs_begin, request_iterator_ reqs_end)
00500 {
00501 while (reqs_begin != reqs_end)
00502 {
00503 if ((request_ptr(*reqs_begin))->poll())
00504 return reqs_begin;
00505
00506 ++reqs_begin;
00507 }
00508 return reqs_end;
00509 }
00510
00511
00512 int wait_any(request_ptr req_array[], int count)
00513 {
00514 stats::scoped_wait_timer wait_timer;
00515
00516 onoff_switch sw;
00517 int i = 0, index = -1;
00518
00519 for ( ; i < count; i++)
00520 {
00521 if (req_array[i]->add_waiter(&sw))
00522 {
00523
00524 index = i;
00525
00526 while (--i >= 0)
00527 req_array[i]->delete_waiter(&sw);
00528
00529 req_array[index]->check_errors();
00530
00531 return index;
00532 }
00533 }
00534
00535 sw.wait_for_on();
00536
00537 for (i = 0; i < count; i++)
00538 {
00539 req_array[i]->delete_waiter(&sw);
00540 if (index < 0 && req_array[i]->poll())
00541 index = i;
00542 }
00543
00544 return index;
00545 }
00546
00547 template <class request_iterator_>
00548 request_iterator_ wait_any(request_iterator_ reqs_begin, request_iterator_ reqs_end)
00549 {
00550 stats::scoped_wait_timer wait_timer;
00551
00552 onoff_switch sw;
00553
00554 request_iterator_ cur = reqs_begin, result = reqs_end;
00555
00556 for ( ; cur != reqs_end; cur++)
00557 {
00558 if ((request_ptr(*cur))->add_waiter(&sw))
00559 {
00560
00561 result = cur;
00562
00563 if (cur != reqs_begin)
00564 {
00565 while (--cur != reqs_begin)
00566 (request_ptr(*cur))->delete_waiter(&sw);
00567
00568 (request_ptr(*cur))->delete_waiter(&sw);
00569 }
00570
00571 (request_ptr(*result))->check_errors();
00572
00573 return result;
00574 }
00575 }
00576
00577 sw.wait_for_on();
00578
00579 for (cur = reqs_begin; cur != reqs_end; cur++)
00580 {
00581 (request_ptr(*cur))->delete_waiter(&sw);
00582 if (result == reqs_end && (request_ptr(*cur))->poll())
00583 result = cur;
00584 }
00585
00586 return result;
00587 }
00588
//! Holds one queue of read requests and one queue of write requests
//! together with a thread handle. Only the interface is visible here; the
//! constructor, destructor, worker() and the add_* functions are defined
//! elsewhere. Copying is disabled through the private noncopyable base.
//! NOTE(review): presumably one instance serves one disk, with worker()
//! running in the thread and draining both queues -- confirm against the
//! implementation file.
00589 class disk_queue : private noncopyable
00590 {
00591 public:
//! Value accepted by set_priority_op().
//! NOTE(review): presumably selects which queue the worker prefers.
00592 enum priority_op { READ, WRITE, NONE };
00593
00594 private:
// One mutex and one queue per direction.
// NOTE(review): presumably each mutex guards the queue of the same name.
00595 mutex write_mutex;
00596 mutex read_mutex;
00597 std::queue<request_ptr> write_queue;
00598 std::queue<request_ptr> read_queue;
00599
// NOTE(review): presumably counts queued requests so the worker can sleep
// while both queues are empty -- usage is not visible here.
00600 semaphore sem;
00601
// Last value passed to set_priority_op(); it is not initialised in this header.
00602 priority_op _priority_op;
00603
// Thread handle: a boost::thread when STXXL_BOOST_THREADS is defined,
// a pthread_t otherwise.
00604 #ifdef STXXL_BOOST_THREADS
00605 boost::thread thread;
00606 #else
00607 pthread_t thread;
00608 #endif
00609
00610
// Has the thread_function_t signature (void * in, void * out).
// NOTE(review): presumably the thread entry point, with arg being the
// disk_queue instance.
00611 static void * worker(void * arg);
00612
00613 public:
// Defined elsewhere; the meaning of n is not visible here. Not explicit,
// so an int converts implicitly to a disk_queue argument.
00614 disk_queue(int n = 1);
00615
// Stores op in _priority_op; no lock is taken.
00616 void set_priority_op(priority_op op)
00617 {
00618 _priority_op = op;
00619 }
// Defined elsewhere. NOTE(review): presumably append req to the read
// respectively write queue.
00620 void add_readreq(request_ptr & req);
00621 void add_writereq(request_ptr & req);
// Defined elsewhere.
00622 ~disk_queue();
00623 };
00624
00627 class disk_queues : public singleton<disk_queues>
00628 {
00629 friend class singleton<disk_queues>;
00630
00631 protected:
00632 std::map<DISKID, disk_queue *> queues;
00633 disk_queues() { }
00634
00635 public:
00636 void add_readreq(request_ptr & req, DISKID disk)
00637 {
00638 if (queues.find(disk) == queues.end())
00639 {
00640
00641 queues[disk] = new disk_queue();
00642 }
00643 queues[disk]->add_readreq(req);
00644 }
00645 void add_writereq(request_ptr & req, DISKID disk)
00646 {
00647 if (queues.find(disk) == queues.end())
00648 {
00649
00650 queues[disk] = new disk_queue();
00651 }
00652 queues[disk]->add_writereq(req);
00653 }
00654 ~disk_queues()
00655 {
00656
00657 for (std::map<DISKID, disk_queue *>::iterator i =
00658 queues.begin(); i != queues.end(); i++)
00659 delete (*i).second;
00660 }
00666 void set_priority_op(disk_queue::priority_op op)
00667 {
00668 for (std::map<DISKID, disk_queue *>::iterator i =
00669 queues.begin(); i != queues.end(); i++)
00670 i->second->set_priority_op(op);
00671 }
00672 };
00673
00675
00676 __STXXL_END_NAMESPACE
00677
00678 #endif // !STXXL_IOBASE_HEADER