STXXL  1.4-dev
 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Friends Macros Groups Pages
request_queue_impl_qwqr.cpp
Go to the documentation of this file.
1 /***************************************************************************
2  * lib/io/request_queue_impl_qwqr.cpp
3  *
4  * Part of the STXXL. See http://stxxl.sourceforge.net
5  *
6  * Copyright (C) 2002-2005 Roman Dementiev <[email protected]>
7  * Copyright (C) 2008, 2009 Andreas Beckmann <[email protected]>
8  * Copyright (C) 2009 Johannes Singler <[email protected]>
9  * Copyright (C) 2013 Timo Bingmann <[email protected]>
10  *
11  * Distributed under the Boost Software License, Version 1.0.
12  * (See accompanying file LICENSE_1_0.txt or copy at
13  * http://www.boost.org/LICENSE_1_0.txt)
14  **************************************************************************/
15 
16 #include <algorithm>
17 
21 #include <stxxl/bits/parallel.h>
22 
23 #if STXXL_STD_THREADS && STXXL_MSVC >= 1700
24  #include <windows.h>
25 #endif
26 
27 #ifndef STXXL_CHECK_FOR_PENDING_REQUESTS_ON_SUBMISSION
28 #define STXXL_CHECK_FOR_PENDING_REQUESTS_ON_SUBMISSION 1
29 #endif
30 
32 
33 struct file_offset_match : public std::binary_function<request_ptr, request_ptr, bool>
34 {
35  bool operator () (
36  const request_ptr& a,
37  const request_ptr& b) const
38  {
39  // matching file and offset are enough to cause problems
40  return (a->get_offset() == b->get_offset()) &&
41  (a->get_file() == b->get_file());
42  }
43 };
44 
45 request_queue_impl_qwqr::request_queue_impl_qwqr(int n)
46  : m_thread_state(NOT_RUNNING), m_sem(0)
47 {
48  STXXL_UNUSED(n);
49  start_thread(worker, static_cast<void*>(this), m_thread, m_thread_state);
50 }
51 
53 {
54  if (req.empty())
55  STXXL_THROW_INVALID_ARGUMENT("Empty request submitted to disk_queue.");
56  if (m_thread_state() != RUNNING)
57  STXXL_THROW_INVALID_ARGUMENT("Request submitted to not running queue.");
58  if (!dynamic_cast<serving_request*>(req.get()))
59  STXXL_ERRMSG("Incompatible request submitted to running queue.");
60 
61  if (req.get()->get_type() == request::READ)
62  {
63 #if STXXL_CHECK_FOR_PENDING_REQUESTS_ON_SUBMISSION
64  {
66  if (std::find_if(m_write_queue.begin(), m_write_queue.end(),
67  bind2nd(file_offset_match(), req) _STXXL_FORCE_SEQUENTIAL)
68  != m_write_queue.end())
69  {
70  STXXL_ERRMSG("READ request submitted for a BID with a pending WRITE request");
71  }
72  }
73 #endif
75  m_read_queue.push_back(req);
76  }
77  else
78  {
79 #if STXXL_CHECK_FOR_PENDING_REQUESTS_ON_SUBMISSION
80  {
82  if (std::find_if(m_read_queue.begin(), m_read_queue.end(),
83  bind2nd(file_offset_match(), req) _STXXL_FORCE_SEQUENTIAL)
84  != m_read_queue.end())
85  {
86  STXXL_ERRMSG("WRITE request submitted for a BID with a pending READ request");
87  }
88  }
89 #endif
91  m_write_queue.push_back(req);
92  }
93 
94  m_sem++;
95 }
96 
98 {
99  if (req.empty())
100  STXXL_THROW_INVALID_ARGUMENT("Empty request canceled disk_queue.");
101  if (m_thread_state() != RUNNING)
102  STXXL_THROW_INVALID_ARGUMENT("Request canceled to not running queue.");
103  if (!dynamic_cast<serving_request*>(req.get()))
104  STXXL_ERRMSG("Incompatible request submitted to running queue.");
105 
106  bool was_still_in_queue = false;
107  if (req.get()->get_type() == request::READ)
108  {
110  queue_type::iterator pos
111  = std::find(m_read_queue.begin(), m_read_queue.end(),
113  if (pos != m_read_queue.end())
114  {
115  m_read_queue.erase(pos);
116  was_still_in_queue = true;
117  m_sem--;
118  }
119  }
120  else
121  {
123  queue_type::iterator pos
124  = std::find(m_write_queue.begin(), m_write_queue.end(),
126  if (pos != m_write_queue.end())
127  {
128  m_write_queue.erase(pos);
129  was_still_in_queue = true;
130  m_sem--;
131  }
132  }
133 
134  return was_still_in_queue;
135 }
136 
138 {
140 }
141 
143 {
144  self* pthis = static_cast<self*>(arg);
145 
146  bool write_phase = true;
147  for ( ; ; )
148  {
149  pthis->m_sem--;
150 
151  if (write_phase)
152  {
153  scoped_mutex_lock WriteLock(pthis->m_write_mutex);
154  if (!pthis->m_write_queue.empty())
155  {
156  request_ptr req = pthis->m_write_queue.front();
157  pthis->m_write_queue.pop_front();
158 
159  WriteLock.unlock();
160 
161  //assert(req->get_reference_count()) > 1);
162  dynamic_cast<serving_request*>(req.get())->serve();
163  }
164  else
165  {
166  WriteLock.unlock();
167 
168  pthis->m_sem++;
169 
170  if (pthis->m_priority_op == WRITE)
171  write_phase = false;
172  }
173 
174  if (pthis->m_priority_op == NONE || pthis->m_priority_op == READ)
175  write_phase = false;
176  }
177  else
178  {
179  scoped_mutex_lock ReadLock(pthis->m_read_mutex);
180 
181  if (!pthis->m_read_queue.empty())
182  {
183  request_ptr req = pthis->m_read_queue.front();
184  pthis->m_read_queue.pop_front();
185 
186  ReadLock.unlock();
187 
188  STXXL_VERBOSE2("queue: before serve request has " << req->get_reference_count() << " references ");
189  //assert(req->get_reference_count() > 1);
190  dynamic_cast<serving_request*>(req.get())->serve();
191  STXXL_VERBOSE2("queue: after serve request has " << req->get_reference_count() << " references ");
192  }
193  else
194  {
195  ReadLock.unlock();
196 
197  pthis->m_sem++;
198 
199  if (pthis->m_priority_op == READ)
200  write_phase = true;
201  }
202 
203  if (pthis->m_priority_op == NONE || pthis->m_priority_op == WRITE)
204  write_phase = true;
205  }
206 
207  // terminate if it has been requested and queues are empty
208  if (pthis->m_thread_state() == TERMINATING) {
209  if ((pthis->m_sem--) == 0)
210  break;
211  else
212  pthis->m_sem++;
213  }
214  }
215 
216  pthis->m_thread_state.set_to(TERMINATED);
217 
218 #if STXXL_STD_THREADS && STXXL_MSVC >= 1700
219  // Workaround for deadlock bug in Visual C++ Runtime 2012 and 2013, see
220  // request_queue_impl_worker.cpp. -tb
221  ExitThread(NULL);
222 #else
223  return NULL;
224 #endif
225 }
226 
228 // vim: et:ts=4:sw=4
Implementation of a local request queue having two queues, one for read and one for write requests...
void unlock()
unlock mutex hold prematurely
Definition: mutex.h:144
Type * get() const
return the enclosed pointer.
Definition: counting_ptr.h:122
bool empty() const
test for a NULL pointer
Definition: counting_ptr.h:142
#define STXXL_THROW_INVALID_ARGUMENT(error_message)
Throws std::invalid_argument with &quot;Error in [function] : [error_message]&quot;.
void stop_thread(thread_type &t, state< thread_state > &s, semaphore &sem)
#define STXXL_VERBOSE2(x)
Definition: verbose.h:121
Request which serves an I/O by calling the synchronous routine of the file.
counting_ptr< request > request_ptr
A reference counting pointer for request.
Definition: request.h:113
Aquire a lock that&#39;s valid until the end of scope.
Definition: mutex.h:123
#define STXXL_BEGIN_NAMESPACE
Definition: namespace.h:16
void STXXL_UNUSED(const U &)
Definition: unused.h:22
#define STXXL_ERRMSG(x)
Definition: verbose.h:94
request_type get_type() const
Definition: request.h:67
void start_thread(void *(*worker)(void *), void *arg, thread_type &t, state< thread_state > &s)
#define _STXXL_FORCE_SEQUENTIAL
Definition: parallel.h:34
ExtIterator find(ExtIterator begin, ExtIterator end, const EqualityComparable &value, int_type nbuffers=0)
External equivalent of std::find, see stxxl::find.
Definition: scan.h:289
unsigned_type get_reference_count() const
Return the number of references to this object (for debugging)
Definition: counting_ptr.h:512
#define STXXL_END_NAMESPACE
Definition: namespace.h:17