00001
00002
00003
00004
00005
00006
00007
00008
00009
00010
00011
00012
00013 #ifndef STXXL_IOBASE_HEADER
00014 #define STXXL_IOBASE_HEADER
00015
00016 #ifndef STXXL_IO_STATS
00017 #define STXXL_IO_STATS 1
00018 #endif
00019
00020
00021 #ifdef STXXL_BOOST_CONFIG
00022 #include <boost/config.hpp>
00023 #endif
00024
00025 #if defined (__linux__)
00026 #define STXXL_CHECK_BLOCK_ALIGNING
00027 #endif
00028
00029
00030
00031
00032
00033
00034 #include <cstdlib>
00035 #include <cstdio>
00036 #include <cstring>
00037 #include <cerrno>
00038
00039 #include <fcntl.h>
00040 #include <sys/types.h>
00041 #include <sys/stat.h>
00042
00043 #include <iostream>
00044 #include <algorithm>
00045 #include <string>
00046 #include <queue>
00047 #include <map>
00048 #include <set>
00049
00050 #ifdef BOOST_MSVC
00051
00052 #include <io.h>
00053 #else
00054 #include <unistd.h>
00055 #include <sys/resource.h>
00056 #include <sys/wait.h>
00057 #endif
00058
00059 #ifdef STXXL_BOOST_THREADS // Use Portable Boost threads
00060
00061 #include <boost/thread/thread.hpp>
00062 #include <boost/thread/mutex.hpp>
00063 #include <boost/bind.hpp>
00064 #else
00065 #include <pthread.h>
00066 #endif
00067
00068
00069 #ifndef O_SYNC
00070 #define O_SYNC 0
00071 #endif
00072 #ifndef O_RSYNC
00073 #define O_RSYNC 0
00074 #endif
00075 #ifndef O_DSYNC
00076 #define O_DSYNC 0
00077 #endif
00078
00079
00080 #if defined (__linux__)
00081
00082 #if !defined (O_DIRECT) && (defined (__alpha__) || defined (__i386__))
00083 #define O_DIRECT 040000
00084 #endif
00085 #endif
00086
00087
00088 #ifndef O_DIRECT
00089 #define O_DIRECT O_SYNC
00090 #endif
00091
00092
00093 #include <stxxl/bits/namespace.h>
00094 #include <stxxl/bits/io/iostats.h>
00095 #include <stxxl/bits/common/semaphore.h>
00096 #include <stxxl/bits/common/mutex.h>
00097 #include <stxxl/bits/common/switch.h>
00098 #include <stxxl/bits/common/state.h>
00099 #include <stxxl/bits/common/exceptions.h>
00100 #include <stxxl/bits/io/completion_handler.h>
00101
00102
00103 __STXXL_BEGIN_NAMESPACE
00104
00109
00110 #define BLOCK_ALIGN 4096
00111
00112 typedef void * (*thread_function_t)(void *);
00113 typedef stxxl::int64 DISKID;
00114
00115 class request;
00116 class request_ptr;
00117
00119
00120 struct default_completion_handler
00121 {
00123 void operator () (request *) { }
00124 };
00125
00127
00130 class file : private noncopyable
00131 {
00132 protected:
00133 int id;
00134
00138 file(int _id) : id(_id) { }
00139
00140 public:
00142
00145 enum open_mode
00146 {
00147 RDONLY = 1,
00148 WRONLY = 2,
00149 RDWR = 4,
00150 CREAT = 8,
00151 DIRECT = 16,
00152 TRUNC = 32
00153 };
00154
00161 virtual request_ptr aread(void * buffer, stxxl::int64 pos, size_t bytes,
00162 completion_handler on_cmpl) = 0;
00169 virtual request_ptr awrite(void * buffer, stxxl::int64 pos, size_t bytes,
00170 completion_handler on_cmpl) = 0;
00171
00174 virtual void set_size(stxxl::int64 newsize) = 0;
00177 virtual stxxl::int64 size() = 0;
00179 int get_disk_number()
00180 {
00181 return id;
00182 }
00186 int get_id()
00187 {
00188 return id;
00189 }
00190
00192 virtual void lock() { }
00193
00194 virtual ~file() { }
00195 };
00196
00197 class mc;
00198 class disk_queue;
00199 class disk_queues;
00200
00202
00206 class request : private noncopyable
00207 {
00208 friend int wait_any(request_ptr req_array[], int count);
00209 template <class request_iterator_>
00210 friend
00211 request_iterator_ wait_any(request_iterator_ reqs_begin, request_iterator_ reqs_end);
00212 friend class file;
00213 friend class disk_queue;
00214 friend class disk_queues;
00215 friend class request_ptr;
00216
00217 protected:
00218 virtual bool add_waiter(onoff_switch * sw) = 0;
00219 virtual void delete_waiter(onoff_switch * sw) = 0;
00220
00221 virtual void serve() = 0;
00222
00223
00224 completion_handler on_complete;
00225 int ref_cnt;
00226 std::auto_ptr<stxxl::io_error> error;
00227
00228 #ifdef STXXL_BOOST_THREADS
00229 boost::mutex ref_cnt_mutex;
00230 #else
00231 mutex ref_cnt_mutex;
00232 #endif
00233
00234 public:
00235 enum request_type { READ, WRITE };
00236
00237 protected:
00238 file * file_;
00239 void * buffer;
00240 stxxl::int64 offset;
00241 size_t bytes;
00242 request_type type;
00243
00244 void completed()
00245 {
00246 on_complete(this);
00247 }
00248
00249
00250 int nref()
00251 {
00252 #ifdef STXXL_BOOST_THREADS
00253 boost::mutex::scoped_lock Lock(ref_cnt_mutex);
00254 return ref_cnt;
00255 #else
00256 ref_cnt_mutex.lock();
00257 int ref_cnt_ = ref_cnt;
00258 ref_cnt_mutex.unlock();
00259 return ref_cnt_;
00260 #endif
00261 }
00262
00263 public:
00264 request(completion_handler on_compl,
00265 file * file__,
00266 void * buffer_,
00267 stxxl::int64 offset_,
00268 size_t bytes_,
00269 request_type type_) :
00270 on_complete(on_compl), ref_cnt(0),
00271 file_(file__),
00272 buffer(buffer_),
00273 offset(offset_),
00274 bytes(bytes_),
00275 type(type_)
00276 {
00277 STXXL_VERBOSE3("request " << static_cast<void *>(this) << ": creation, cnt: " << ref_cnt);
00278 }
00280 virtual void wait() = 0;
00283 virtual bool poll() = 0;
00286 virtual const char * io_type()
00287 {
00288 return "none";
00289 }
00290 virtual ~request()
00291 {
00292 STXXL_VERBOSE3("request " << static_cast<void *>(this) << ": deletion, cnt: " << ref_cnt);
00293 }
00294 file * get_file() const { return file_; }
00295 void * get_buffer() const { return buffer; }
00296 stxxl::int64 get_offset() const { return offset; }
00297 size_t get_size() const { return bytes; }
00298 size_t size() const { return bytes; }
00299 request_type get_type() const { return type; }
00300
00301 virtual std::ostream & print(std::ostream & out) const
00302 {
00303 out << "File object address: " << (void *)get_file();
00304 out << " Buffer address: " << (void *)get_buffer();
00305 out << " File offset: " << get_offset();
00306 out << " Transfer size: " << get_size() << " bytes";
00307 out << " Type of transfer: " << ((get_type() == READ) ? "READ" : "WRITE");
00308 return out;
00309 }
00310
00313 void error_occured(const char * msg)
00314 {
00315 error.reset(new stxxl::io_error(msg));
00316 }
00317
00320 void error_occured(const std::string & msg)
00321 {
00322 error.reset(new stxxl::io_error(msg));
00323 }
00324
00326 void check_errors() throw (stxxl::io_error)
00327 {
00328 if (error.get())
00329 throw * (error.get());
00330 }
00331
00332 private:
00333 void add_ref()
00334 {
00335 #ifdef STXXL_BOOST_THREADS
00336 boost::mutex::scoped_lock Lock(ref_cnt_mutex);
00337 ref_cnt++;
00338 STXXL_VERBOSE3("request add_ref() " << static_cast<void *>(this) << ": adding reference, cnt: " << ref_cnt);
00339
00340 #else
00341 ref_cnt_mutex.lock();
00342 ref_cnt++;
00343 STXXL_VERBOSE3("request add_ref() " << static_cast<void *>(this) << ": adding reference, cnt: " << ref_cnt);
00344 ref_cnt_mutex.unlock();
00345 #endif
00346 }
00347 bool sub_ref()
00348 {
00349 #ifdef STXXL_BOOST_THREADS
00350 boost::mutex::scoped_lock Lock(ref_cnt_mutex);
00351 int val = --ref_cnt;
00352 STXXL_VERBOSE3("request sub_ref() " << static_cast<void *>(this) << ": subtracting reference cnt: " << ref_cnt);
00353 Lock.unlock();
00354
00355 #else
00356 ref_cnt_mutex.lock();
00357 int val = --ref_cnt;
00358 STXXL_VERBOSE3("request sub_ref() " << static_cast<void *>(this) << ": subtracting reference cnt: " << ref_cnt);
00359
00360 ref_cnt_mutex.unlock();
00361 #endif
00362 assert(val >= 0);
00363 return (val == 0);
00364 }
00365 };
00366
00367 inline std::ostream & operator << (std::ostream & out, const request & req)
00368 {
00369 return req.print(out);
00370 }
00371
00373
00375 class request_ptr
00376 {
00377 request * ptr;
00378 void add_ref()
00379 {
00380 if (ptr)
00381 {
00382 ptr->add_ref();
00383 }
00384 }
00385 void sub_ref()
00386 {
00387 if (ptr)
00388 {
00389 if (ptr->sub_ref())
00390 {
00391 STXXL_VERBOSE3("the last copy " << static_cast<void *>(ptr) << " this=" << static_cast<void *>(this));
00392 delete ptr;
00393 ptr = NULL;
00394 }
00395 else
00396 {
00397 STXXL_VERBOSE3("more copies " << static_cast<void *>(ptr) << " this=" << static_cast<void *>(this));
00398 }
00399 }
00400 }
00401
00402 public:
00404 request_ptr(request * ptr_ = NULL) : ptr(ptr_)
00405 {
00406 STXXL_VERBOSE3("create constructor (request =" << static_cast<void *>(ptr) << ") this=" << static_cast<void *>(this));
00407 add_ref();
00408 }
00410 request_ptr(const request_ptr & p) : ptr(p.ptr)
00411 {
00412 STXXL_VERBOSE3("copy constructor (copying " << static_cast<void *>(ptr) << ") this=" << static_cast<void *>(this));
00413 add_ref();
00414 }
00416 ~request_ptr()
00417 {
00418 STXXL_VERBOSE3("Destructor of a request_ptr pointing to " << static_cast<void *>(ptr) << " this=" << static_cast<void *>(this));
00419 sub_ref();
00420 }
00423 request_ptr & operator = (const request_ptr & p)
00424 {
00425
00426 return (*this = p.ptr);
00427 }
00430 request_ptr & operator = (request * p)
00431 {
00432 STXXL_VERBOSE3("assign operator begin (assigning " << static_cast<void *>(p) << ") this=" << static_cast<void *>(this));
00433 if (p != ptr)
00434 {
00435 sub_ref();
00436 ptr = p;
00437 add_ref();
00438 }
00439 STXXL_VERBOSE3("assign operator end (assigning " << static_cast<void *>(p) << ") this=" << static_cast<void *>(this));
00440 return *this;
00441 }
00444 request & operator * () const
00445 {
00446 assert(ptr);
00447 return *ptr;
00448 }
00451 request * operator -> () const
00452 {
00453 assert(ptr);
00454 return ptr;
00455 }
00460 request * get() const { return ptr; }
00461
00463 bool valid() const { return ptr; }
00464
00466 bool empty() const { return !ptr; }
00467 };
00468
00470
00475 inline int wait_any(request_ptr req_array[], int count);
00479 inline void wait_all(request_ptr req_array[], int count);
00485 inline bool poll_any(request_ptr req_array[], int count, int & index);
00486
00487
00488 void wait_all(request_ptr req_array[], int count)
00489 {
00490 for (int i = 0; i < count; i++)
00491 {
00492 req_array[i]->wait();
00493 }
00494 }
00495
00496 template <class request_iterator_>
00497 void wait_all(request_iterator_ reqs_begin, request_iterator_ reqs_end)
00498 {
00499 while (reqs_begin != reqs_end)
00500 {
00501 (request_ptr(*reqs_begin))->wait();
00502 ++reqs_begin;
00503 }
00504 }
00505
00506 bool poll_any(request_ptr req_array[], int count, int & index)
00507 {
00508 index = -1;
00509 for (int i = 0; i < count; i++)
00510 {
00511 if (req_array[i]->poll())
00512 {
00513 index = i;
00514 return true;
00515 }
00516 }
00517 return false;
00518 }
00519
00520 template <class request_iterator_>
00521 request_iterator_ poll_any(request_iterator_ reqs_begin, request_iterator_ reqs_end)
00522 {
00523 while (reqs_begin != reqs_end)
00524 {
00525 if ((request_ptr(*reqs_begin))->poll())
00526 return reqs_begin;
00527
00528 ++reqs_begin;
00529 }
00530 return reqs_end;
00531 }
00532
00533
00534 int wait_any(request_ptr req_array[], int count)
00535 {
00536 START_COUNT_WAIT_TIME
00537 onoff_switch sw;
00538 int i = 0, index = -1;
00539
00540 for ( ; i < count; i++)
00541 {
00542 if (req_array[i]->add_waiter(&sw))
00543 {
00544
00545 index = i;
00546
00547 while (--i >= 0)
00548 req_array[i]->delete_waiter(&sw);
00549
00550
00551 END_COUNT_WAIT_TIME
00552
00553 req_array[index]->check_errors();
00554
00555 return index;
00556 }
00557 }
00558
00559 sw.wait_for_on();
00560
00561 for (i = 0; i < count; i++)
00562 {
00563 req_array[i]->delete_waiter(&sw);
00564 if (index < 0 && req_array[i]->poll())
00565 index = i;
00566 }
00567
00568 END_COUNT_WAIT_TIME
00569 return index;
00570 }
00571
00572 template <class request_iterator_>
00573 request_iterator_ wait_any(request_iterator_ reqs_begin, request_iterator_ reqs_end)
00574 {
00575 START_COUNT_WAIT_TIME
00576 onoff_switch sw;
00577
00578 request_iterator_ cur = reqs_begin, result = reqs_end;
00579
00580 for ( ; cur != reqs_end; cur++)
00581 {
00582 if ((request_ptr(*cur))->add_waiter(&sw))
00583 {
00584
00585 result = cur;
00586
00587 if (cur != reqs_begin)
00588 {
00589 while (--cur != reqs_begin)
00590 (request_ptr(*cur))->delete_waiter(&sw);
00591
00592 (request_ptr(*cur))->delete_waiter(&sw);
00593 }
00594
00595 END_COUNT_WAIT_TIME
00596
00597 (request_ptr(*result))->check_errors();
00598
00599 return result;
00600 }
00601 }
00602
00603 sw.wait_for_on();
00604
00605 for (cur = reqs_begin; cur != reqs_end; cur++)
00606 {
00607 (request_ptr(*cur))->delete_waiter(&sw);
00608 if (result == reqs_end && (request_ptr(*cur))->poll())
00609 result = cur;
00610 }
00611
00612 END_COUNT_WAIT_TIME
00613 return result;
00614 }
00615
00616 class disk_queue : private noncopyable
00617 {
00618 public:
00619 enum priority_op { READ, WRITE, NONE };
00620
00621 private:
00622 #ifdef STXXL_BOOST_THREADS
00623 boost::mutex write_mutex;
00624 boost::mutex read_mutex;
00625 #else
00626 mutex write_mutex;
00627 mutex read_mutex;
00628 #endif
00629 std::queue<request_ptr> write_queue;
00630 std::queue<request_ptr> read_queue;
00631
00632 semaphore sem;
00633
00634 priority_op _priority_op;
00635
00636 #ifdef STXXL_BOOST_THREADS
00637 boost::thread thread;
00638 #else
00639 pthread_t thread;
00640 #endif
00641
00642
00643 #if STXXL_IO_STATS
00644 stats * iostats;
00645 #endif
00646
00647 static void * worker(void * arg);
00648
00649 public:
00650 disk_queue(int n = 1);
00651
00652 void set_priority_op(priority_op op)
00653 {
00654 _priority_op = op;
00655 }
00656 void add_readreq(request_ptr & req);
00657 void add_writereq(request_ptr & req);
00658 ~disk_queue();
00659 };
00660
00663 class disk_queues : private noncopyable
00664 {
00665 protected:
00666 std::map<DISKID, disk_queue *> queues;
00667 disk_queues() { }
00668
00669 public:
00670 void add_readreq(request_ptr & req, DISKID disk)
00671 {
00672 if (queues.find(disk) == queues.end())
00673 {
00674
00675 queues[disk] = new disk_queue();
00676 }
00677 queues[disk]->add_readreq(req);
00678 }
00679 void add_writereq(request_ptr & req, DISKID disk)
00680 {
00681 if (queues.find(disk) == queues.end())
00682 {
00683
00684 queues[disk] = new disk_queue();
00685 }
00686 queues[disk]->add_writereq(req);
00687 }
00688 ~disk_queues()
00689 {
00690
00691 for (std::map<DISKID, disk_queue *>::iterator i =
00692 queues.begin(); i != queues.end(); i++)
00693 delete (*i).second;
00694 }
00695 static disk_queues * get_instance()
00696 {
00697 if (!instance)
00698 instance = new disk_queues();
00699
00700
00701 return instance;
00702 }
00708 void set_priority_op(disk_queue::priority_op op)
00709 {
00710 for (std::map<DISKID, disk_queue *>::iterator i =
00711 queues.begin(); i != queues.end(); i++)
00712 i->second->set_priority_op(op);
00713 }
00714
00715 private:
00716 static disk_queues * instance;
00717 };
00718
00720
00721 __STXXL_END_NAMESPACE
00722
00723 #endif // !STXXL_IOBASE_HEADER