• Main Page
  • Related Pages
  • Modules
  • Namespaces
  • Classes
  • Files
  • Examples
  • File List

iobase.h

00001 /***************************************************************************
00002  *  include/stxxl/bits/io/iobase.h
00003  *
00004  *  Part of the STXXL. See http://stxxl.sourceforge.net
00005  *
00006  *  Copyright (C) 2002 Roman Dementiev <[email protected]>
00007  *  Copyright (C) 2008 Andreas Beckmann <[email protected]>
00008  *
00009  *  Distributed under the Boost Software License, Version 1.0.
00010  *  (See accompanying file LICENSE_1_0.txt or copy at
00011  *  http://www.boost.org/LICENSE_1_0.txt)
00012  **************************************************************************/
00013 
00014 #ifndef STXXL_IOBASE_HEADER
00015 #define STXXL_IOBASE_HEADER
00016 
00017 #ifdef STXXL_BOOST_CONFIG
00018  #include <boost/config.hpp>
00019 #endif
00020 
00021 #if defined (__linux__)
00022  #define STXXL_CHECK_BLOCK_ALIGNING
00023 #endif
00024 
00025 //#ifdef __sun__
00026 //#define O_DIRECT 0
00027 //#endif
00028 
00029 
00030 #include <cstdlib>
00031 #include <cstdio>
00032 #include <cstring>
00033 #include <cerrno>
00034 
00035 #include <fcntl.h>
00036 #include <sys/types.h>
00037 #include <sys/stat.h>
00038 
00039 #include <iostream>
00040 #include <algorithm>
00041 #include <string>
00042 #include <queue>
00043 #include <map>
00044 #include <set>
00045 
00046 #ifdef BOOST_MSVC
00047 // this is not stxxl/bits/io/io.h !
00048  #include <io.h>
00049 #else
00050  #include <unistd.h>
00051  #include <sys/resource.h>
00052  #include <sys/wait.h>
00053 #endif
00054 
00055 #ifdef STXXL_BOOST_THREADS // Use Portable Boost threads
00056 // Boost.Threads headers
00057  #include <boost/thread/thread.hpp>
00058  #include <boost/thread/mutex.hpp>
00059  #include <boost/bind.hpp>
00060 #else
00061  #include <pthread.h>
00062 #endif
00063 
00064 
00065 #ifndef O_SYNC
00066  #define O_SYNC 0
00067 #endif
00068 #ifndef O_RSYNC
00069  #define O_RSYNC 0
00070 #endif
00071 #ifndef O_DSYNC
00072  #define O_DSYNC 0
00073 #endif
00074 
00075 
00076 #if defined (__linux__)
00077 //#include <asm/fcntl.h>
00078  #if !defined (O_DIRECT) && (defined (__alpha__) || defined (__i386__))
00079   #define O_DIRECT 040000       /* direct disk access */
00080  #endif
00081 #endif
00082 
00083 
00084 #ifndef O_DIRECT
00085  #define O_DIRECT O_SYNC
00086 #endif
00087 
00088 
00089 #include <stxxl/bits/namespace.h>
00090 #include <stxxl/bits/io/iostats.h>
00091 #include <stxxl/bits/common/semaphore.h>
00092 #include <stxxl/bits/common/mutex.h>
00093 #include <stxxl/bits/common/switch.h>
00094 #include <stxxl/bits/common/state.h>
00095 #include <stxxl/bits/common/exceptions.h>
00096 #include <stxxl/bits/io/completion_handler.h>
00097 
00098 
00099 __STXXL_BEGIN_NAMESPACE
00100 
00105 
00106 #define BLOCK_ALIGN 4096
00107 
00108 typedef void * (*thread_function_t)(void *);
00109 typedef stxxl::int64 DISKID;
00110 
00111 class request;
00112 class request_ptr;
00113 
00115 
00116 struct default_completion_handler
00117 {
00119     void operator () (request *) { }
00120 };
00121 
00123 
00126 class file : private noncopyable
00127 {
00128 protected:
00129     int id;
00130 
00134     file(int _id) : id(_id) { }
00135 
00136 public:
00138 
00141     enum open_mode
00142     {
00143         RDONLY = 1,                         
00144         WRONLY = 2,                         
00145         RDWR = 4,                           
00146         CREAT = 8,                          
00147         DIRECT = 16,                        
00148         TRUNC = 32                          
00149     };
00150 
00157     virtual request_ptr aread(void * buffer, stxxl::int64 pos, size_t bytes,
00158                               completion_handler on_cmpl) = 0;
00165     virtual request_ptr awrite(void * buffer, stxxl::int64 pos, size_t bytes,
00166                                completion_handler on_cmpl) = 0;
00167 
00170     virtual void set_size(stxxl::int64 newsize) = 0;
00173     virtual stxxl::int64 size() = 0;
00175     __STXXL_DEPRECATED(int get_disk_number())
00176     {
00177         return id;
00178     }
00182     int get_id()
00183     {
00184         return id;
00185     }
00186 
00188     virtual void lock() { }
00189 
00191     virtual void delete_region(int64 offset, unsigned_type size)
00192     {
00193         UNUSED(offset);
00194         UNUSED(size);
00195     }
00196 
00197     virtual ~file() { }
00198 };
00199 
00200 class mc;
00201 class disk_queue;
00202 class disk_queues;
00203 
00205 
00209 class request : private noncopyable
00210 {
00211     friend int wait_any(request_ptr req_array[], int count);
00212     template <class request_iterator_>
00213     friend
00214     request_iterator_ wait_any(request_iterator_ reqs_begin, request_iterator_ reqs_end);
00215     friend class file;
00216     friend class disk_queue;
00217     friend class disk_queues;
00218     friend class request_ptr;
00219 
00220 protected:
00221     virtual bool add_waiter(onoff_switch * sw) = 0;
00222     virtual void delete_waiter(onoff_switch * sw) = 0;
00223     //virtual void enqueue () = 0;
00224     virtual void serve() = 0;
00225     //virtual unsigned size() const;
00226 
00227     completion_handler on_complete;
00228     int ref_cnt;
00229     std::auto_ptr<stxxl::io_error> error;
00230 
00231     mutex ref_cnt_mutex;
00232 
00233 public:
00234     enum request_type { READ, WRITE };
00235 
00236 protected:
00237     file * file_;
00238     void * buffer;
00239     stxxl::int64 offset;
00240     size_t bytes;
00241     request_type type;
00242 
00243     void completed()
00244     {
00245         on_complete(this);
00246     }
00247 
00248     // returns number of references
00249     int nref()
00250     {
00251         scoped_mutex_lock Lock(ref_cnt_mutex);
00252         return ref_cnt;
00253     }
00254 
00255 public:
00256     request(completion_handler on_compl,
00257             file * file__,
00258             void * buffer_,
00259             stxxl::int64 offset_,
00260             size_t bytes_,
00261             request_type type_) :
00262         on_complete(on_compl), ref_cnt(0),
00263         file_(file__),
00264         buffer(buffer_),
00265         offset(offset_),
00266         bytes(bytes_),
00267         type(type_)
00268     {
00269         STXXL_VERBOSE3("request " << static_cast<void *>(this) << ": creation, cnt: " << ref_cnt);
00270     }
00272     virtual void wait() = 0;
00275     virtual bool poll() = 0;
00278     virtual const char * io_type()
00279     {
00280         return "none";
00281     }
00282     virtual ~request()
00283     {
00284         STXXL_VERBOSE3("request " << static_cast<void *>(this) << ": deletion, cnt: " << ref_cnt);
00285     }
00286     file * get_file() const { return file_; }
00287     void * get_buffer() const { return buffer; }
00288     stxxl::int64 get_offset() const { return offset; }
00289     size_t get_size() const { return bytes; }
00290     size_t size() const { return bytes; }
00291     request_type get_type() const { return type; }
00292 
00293     virtual std::ostream & print(std::ostream & out) const
00294     {
00295         out << "File object address: " << (void *)get_file();
00296         out << " Buffer address: " << (void *)get_buffer();
00297         out << " File offset: " << get_offset();
00298         out << " Transfer size: " << get_size() << " bytes";
00299         out << " Type of transfer: " << ((get_type() == READ) ? "READ" : "WRITE");
00300         return out;
00301     }
00302 
00305     void error_occured(const char * msg)
00306     {
00307         error.reset(new stxxl::io_error(msg));
00308     }
00309 
00312     void error_occured(const std::string & msg)
00313     {
00314         error.reset(new stxxl::io_error(msg));
00315     }
00316 
00318     void check_errors() throw (stxxl::io_error)
00319     {
00320         if (error.get())
00321             throw * (error.get());
00322     }
00323 
00324 private:
00325     void add_ref()
00326     {
00327         scoped_mutex_lock Lock(ref_cnt_mutex);
00328         ref_cnt++;
00329         STXXL_VERBOSE3("request add_ref() " << static_cast<void *>(this) << ": adding reference, cnt: " << ref_cnt);
00330     }
00331 
00332     bool sub_ref()
00333     {
00334         int val;
00335         {
00336             scoped_mutex_lock Lock(ref_cnt_mutex);
00337             val = --ref_cnt;
00338             STXXL_VERBOSE3("request sub_ref() " << static_cast<void *>(this) << ": subtracting reference cnt: " << ref_cnt);
00339         }
00340         assert(val >= 0);
00341         return (val == 0);
00342     }
00343 };
00344 
00345 inline std::ostream & operator << (std::ostream & out, const request & req)
00346 {
00347     return req.print(out);
00348 }
00349 
00351 
00353 class request_ptr
00354 {
00355     request * ptr;
00356     void add_ref()
00357     {
00358         if (ptr)
00359         {
00360             ptr->add_ref();
00361         }
00362     }
00363     void sub_ref()
00364     {
00365         if (ptr)
00366         {
00367             if (ptr->sub_ref())
00368             {
00369                 STXXL_VERBOSE3("the last copy " << static_cast<void *>(ptr) << " this=" << static_cast<void *>(this));
00370                 delete ptr;
00371                 ptr = NULL;
00372             }
00373             else
00374             {
00375                 STXXL_VERBOSE3("more copies " << static_cast<void *>(ptr) << " this=" << static_cast<void *>(this));
00376             }
00377         }
00378     }
00379 
00380 public:
00382     request_ptr(request * ptr_ = NULL) : ptr(ptr_)
00383     {
00384         STXXL_VERBOSE3("create constructor (request =" << static_cast<void *>(ptr) << ") this=" << static_cast<void *>(this));
00385         add_ref();
00386     }
00388     request_ptr(const request_ptr & p) : ptr(p.ptr)
00389     {
00390         STXXL_VERBOSE3("copy constructor (copying " << static_cast<void *>(ptr) << ") this=" << static_cast<void *>(this));
00391         add_ref();
00392     }
00394     ~request_ptr()
00395     {
00396         STXXL_VERBOSE3("Destructor of a request_ptr pointing to " << static_cast<void *>(ptr) << " this=" << static_cast<void *>(this));
00397         sub_ref();
00398     }
00401     request_ptr & operator = (const request_ptr & p)
00402     {
00403         // assert(p.ptr);
00404         return (*this = p.ptr);
00405     }
00408     request_ptr & operator = (request * p)
00409     {
00410         STXXL_VERBOSE3("assign operator begin (assigning " << static_cast<void *>(p) << ") this=" << static_cast<void *>(this));
00411         if (p != ptr)
00412         {
00413             sub_ref();
00414             ptr = p;
00415             add_ref();
00416         }
00417         STXXL_VERBOSE3("assign operator end (assigning " << static_cast<void *>(p) << ") this=" << static_cast<void *>(this));
00418         return *this;
00419     }
00422     request & operator * () const
00423     {
00424         assert(ptr);
00425         return *ptr;
00426     }
00429     request * operator -> () const
00430     {
00431         assert(ptr);
00432         return ptr;
00433     }
00438     request * get() const { return ptr; }
00439 
00441     bool valid() const { return ptr; }
00442 
00444     bool empty() const { return !ptr; }
00445 };
00446 
00448 
00453 inline int wait_any(request_ptr req_array[], int count);
00457 inline void wait_all(request_ptr req_array[], int count);
00463 inline bool poll_any(request_ptr req_array[], int count, int & index);
00464 
00465 
00466 void wait_all(request_ptr req_array[], int count)
00467 {
00468     for (int i = 0; i < count; i++)
00469     {
00470         req_array[i]->wait();
00471     }
00472 }
00473 
00474 template <class request_iterator_>
00475 void wait_all(request_iterator_ reqs_begin, request_iterator_ reqs_end)
00476 {
00477     while (reqs_begin != reqs_end)
00478     {
00479         (request_ptr(*reqs_begin))->wait();
00480         ++reqs_begin;
00481     }
00482 }
00483 
00484 bool poll_any(request_ptr req_array[], int count, int & index)
00485 {
00486     index = -1;
00487     for (int i = 0; i < count; i++)
00488     {
00489         if (req_array[i]->poll())
00490         {
00491             index = i;
00492             return true;
00493         }
00494     }
00495     return false;
00496 }
00497 
00498 template <class request_iterator_>
00499 request_iterator_ poll_any(request_iterator_ reqs_begin, request_iterator_ reqs_end)
00500 {
00501     while (reqs_begin != reqs_end)
00502     {
00503         if ((request_ptr(*reqs_begin))->poll())
00504             return reqs_begin;
00505 
00506         ++reqs_begin;
00507     }
00508     return reqs_end;
00509 }
00510 
00511 
00512 int wait_any(request_ptr req_array[], int count)
00513 {
00514     stats::scoped_wait_timer wait_timer;
00515 
00516     onoff_switch sw;
00517     int i = 0, index = -1;
00518 
00519     for ( ; i < count; i++)
00520     {
00521         if (req_array[i]->add_waiter(&sw))
00522         {
00523             // already done
00524             index = i;
00525 
00526             while (--i >= 0)
00527                 req_array[i]->delete_waiter(&sw);
00528 
00529             req_array[index]->check_errors();
00530 
00531             return index;
00532         }
00533     }
00534 
00535     sw.wait_for_on();
00536 
00537     for (i = 0; i < count; i++)
00538     {
00539         req_array[i]->delete_waiter(&sw);
00540         if (index < 0 && req_array[i]->poll())
00541             index = i;
00542     }
00543 
00544     return index;
00545 }
00546 
00547 template <class request_iterator_>
00548 request_iterator_ wait_any(request_iterator_ reqs_begin, request_iterator_ reqs_end)
00549 {
00550     stats::scoped_wait_timer wait_timer;
00551 
00552     onoff_switch sw;
00553 
00554     request_iterator_ cur = reqs_begin, result = reqs_end;
00555 
00556     for ( ; cur != reqs_end; cur++)
00557     {
00558         if ((request_ptr(*cur))->add_waiter(&sw))
00559         {
00560             // already done
00561             result = cur;
00562 
00563             if (cur != reqs_begin)
00564             {
00565                 while (--cur != reqs_begin)
00566                     (request_ptr(*cur))->delete_waiter(&sw);
00567 
00568                 (request_ptr(*cur))->delete_waiter(&sw);
00569             }
00570 
00571             (request_ptr(*result))->check_errors();
00572 
00573             return result;
00574         }
00575     }
00576 
00577     sw.wait_for_on();
00578 
00579     for (cur = reqs_begin; cur != reqs_end; cur++)
00580     {
00581         (request_ptr(*cur))->delete_waiter(&sw);
00582         if (result == reqs_end && (request_ptr(*cur))->poll())
00583             result = cur;
00584     }
00585 
00586     return result;
00587 }
00588 
00589 class disk_queue : private noncopyable
00590 {
00591 public:
00592     enum priority_op { READ, WRITE, NONE };
00593 
00594 private:
00595     mutex write_mutex;
00596     mutex read_mutex;
00597     std::queue<request_ptr> write_queue;
00598     std::queue<request_ptr> read_queue;
00599 
00600     semaphore sem;
00601 
00602     priority_op _priority_op;
00603 
00604 #ifdef STXXL_BOOST_THREADS
00605     boost::thread thread;
00606 #else
00607     pthread_t thread;
00608 #endif
00609 
00610 
00611     static void * worker(void * arg);
00612 
00613 public:
00614     disk_queue(int n = 1);             // max number of requests simultaneously submitted to disk
00615 
00616     void set_priority_op(priority_op op)
00617     {
00618         _priority_op = op;
00619     }
00620     void add_readreq(request_ptr & req);
00621     void add_writereq(request_ptr & req);
00622     ~disk_queue();
00623 };
00624 
00627 class disk_queues : public singleton<disk_queues>
00628 {
00629     friend class singleton<disk_queues>;
00630 
00631 protected:
00632     std::map<DISKID, disk_queue *> queues;
00633     disk_queues() { }
00634 
00635 public:
00636     void add_readreq(request_ptr & req, DISKID disk)
00637     {
00638         if (queues.find(disk) == queues.end())
00639         {
00640             // create new disk queue
00641             queues[disk] = new disk_queue();
00642         }
00643         queues[disk]->add_readreq(req);
00644     }
00645     void add_writereq(request_ptr & req, DISKID disk)
00646     {
00647         if (queues.find(disk) == queues.end())
00648         {
00649             // create new disk queue
00650             queues[disk] = new disk_queue();
00651         }
00652         queues[disk]->add_writereq(req);
00653     }
00654     ~disk_queues()
00655     {
00656         // deallocate all queues
00657         for (std::map<DISKID, disk_queue *>::iterator i =
00658                  queues.begin(); i != queues.end(); i++)
00659             delete (*i).second;
00660     }
00666     void set_priority_op(disk_queue::priority_op op)
00667     {
00668         for (std::map<DISKID, disk_queue *>::iterator i =
00669                  queues.begin(); i != queues.end(); i++)
00670             i->second->set_priority_op(op);
00671     }
00672 };
00673 
00675 
00676 __STXXL_END_NAMESPACE
00677 
00678 #endif // !STXXL_IOBASE_HEADER

Generated by  doxygen 1.7.1