STXXL  1.4.1
 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Friends Macros Groups Pages
request_queue_impl_1q.cpp
Go to the documentation of this file.
1 /***************************************************************************
2  * lib/io/request_queue_impl_1q.cpp
3  *
4  * Part of the STXXL. See http://stxxl.sourceforge.net
5  *
6  * Copyright (C) 2002-2005 Roman Dementiev <[email protected]>
7  * Copyright (C) 2009 Andreas Beckmann <[email protected]>
8  * Copyright (C) 2009 Johannes Singler <[email protected]>
9  * Copyright (C) 2013 Timo Bingmann <[email protected]>
10  *
11  * Distributed under the Boost Software License, Version 1.0.
12  * (See accompanying file LICENSE_1_0.txt or copy at
13  * http://www.boost.org/LICENSE_1_0.txt)
14  **************************************************************************/
15 
16 #include <algorithm>
17 
18 #include <stxxl/bits/config.h>
22 #include <stxxl/bits/parallel.h>
23 
24 #if STXXL_STD_THREADS && STXXL_MSVC >= 1700
25  #include <windows.h>
26 #endif
27 
28 #ifndef STXXL_CHECK_FOR_PENDING_REQUESTS_ON_SUBMISSION
29 #define STXXL_CHECK_FOR_PENDING_REQUESTS_ON_SUBMISSION 1
30 #endif
31 
33 
34 struct file_offset_match : public std::binary_function<request_ptr, request_ptr, bool>
35 {
36  bool operator () (
37  const request_ptr& a,
38  const request_ptr& b) const
39  {
40  // matching file and offset are enough to cause problems
41  return (a->get_offset() == b->get_offset()) &&
42  (a->get_file() == b->get_file());
43  }
44 };
45 
46 request_queue_impl_1q::request_queue_impl_1q(int n)
47  : m_thread_state(NOT_RUNNING), m_sem(0)
48 {
49  STXXL_UNUSED(n);
50  start_thread(worker, static_cast<void*>(this), m_thread, m_thread_state);
51 }
52 
54 {
55  if (req.empty())
56  STXXL_THROW_INVALID_ARGUMENT("Empty request submitted to disk_queue.");
57  if (m_thread_state() != RUNNING)
58  STXXL_THROW_INVALID_ARGUMENT("Request submitted to not running queue.");
59  if (!dynamic_cast<serving_request*>(req.get()))
60  STXXL_ERRMSG("Incompatible request submitted to running queue.");
61 
62 #if STXXL_CHECK_FOR_PENDING_REQUESTS_ON_SUBMISSION
63  {
65  if (std::find_if(m_queue.begin(), m_queue.end(),
66  bind2nd(file_offset_match(), req) _STXXL_FORCE_SEQUENTIAL)
67  != m_queue.end())
68  {
69  STXXL_ERRMSG("request submitted for a BID with a pending request");
70  }
71  }
72 #endif
74  m_queue.push_back(req);
75 
76  m_sem++;
77 }
78 
80 {
81  if (req.empty())
82  STXXL_THROW_INVALID_ARGUMENT("Empty request canceled disk_queue.");
83  if (m_thread_state() != RUNNING)
84  STXXL_THROW_INVALID_ARGUMENT("Request canceled to not running queue.");
85  if (!dynamic_cast<serving_request*>(req.get()))
86  STXXL_ERRMSG("Incompatible request submitted to running queue.");
87 
88  bool was_still_in_queue = false;
89  {
91  queue_type::iterator pos
92  = std::find(m_queue.begin(), m_queue.end(),
94 
95  if (pos != m_queue.end())
96  {
97  m_queue.erase(pos);
98  was_still_in_queue = true;
99  m_sem--;
100  }
101  }
102 
103  return was_still_in_queue;
104 }
105 
107 {
109 }
110 
112 {
113  self* pthis = static_cast<self*>(arg);
114 
115  for ( ; ; )
116  {
117  pthis->m_sem--;
118 
119  {
120  scoped_mutex_lock Lock(pthis->m_queue_mutex);
121  if (!pthis->m_queue.empty())
122  {
123  request_ptr req = pthis->m_queue.front();
124  pthis->m_queue.pop_front();
125 
126  Lock.unlock();
127 
128  //assert(req->nref() > 1);
129  dynamic_cast<serving_request*>(req.get())->serve();
130  }
131  else
132  {
133  Lock.unlock();
134 
135  pthis->m_sem++;
136  }
137  }
138 
139  // terminate if it has been requested and queues are empty
140  if (pthis->m_thread_state() == TERMINATING) {
141  if ((pthis->m_sem--) == 0)
142  break;
143  else
144  pthis->m_sem++;
145  }
146  }
147 
148  pthis->m_thread_state.set_to(TERMINATED);
149 
150 #if STXXL_STD_THREADS && STXXL_MSVC >= 1700
151  // Workaround for deadlock bug in Visual C++ Runtime 2012 and 2013, see
152  // request_queue_impl_worker.cpp. -tb
153  ExitThread(NULL);
154 #else
155  return NULL;
156 #endif
157 }
158 
160 // vim: et:ts=4:sw=4
Implementation of a local request queue having only one queue for both read and write requests...
void unlock()
unlock mutex hold prematurely
Definition: mutex.h:126
Type * get() const
return the enclosed pointer.
Definition: counting_ptr.h:122
void add_request(request_ptr &req)
bool empty() const
test for a NULL pointer
Definition: counting_ptr.h:142
#define STXXL_THROW_INVALID_ARGUMENT(error_message)
Throws std::invalid_argument with &quot;Error in [function] : [error_message]&quot;.
void stop_thread(thread_type &t, state< thread_state > &s, semaphore &sem)
Request which serves an I/O by calling the synchronous routine of the file.
counting_ptr< request > request_ptr
A reference counting pointer for request.
Definition: request.h:113
Aquire a lock that&#39;s valid until the end of scope.
Definition: mutex.h:105
#define STXXL_BEGIN_NAMESPACE
Definition: namespace.h:16
void STXXL_UNUSED(const U &)
Definition: unused.h:22
state< thread_state > m_thread_state
#define STXXL_ERRMSG(x)
Definition: verbose.h:80
void start_thread(void *(*worker)(void *), void *arg, thread_type &t, state< thread_state > &s)
#define _STXXL_FORCE_SEQUENTIAL
Definition: parallel.h:50
bool cancel_request(request_ptr &req)
ExtIterator find(ExtIterator begin, ExtIterator end, const EqualityComparable &value, int_type nbuffers=0)
External equivalent of std::find, see stxxl::find.
Definition: scan.h:289
#define STXXL_END_NAMESPACE
Definition: namespace.h:17