STXXL  1.4-dev
 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Friends Macros Groups Pages
linuxaio_queue.cpp
Go to the documentation of this file.
1 /***************************************************************************
2  * lib/io/linuxaio_queue.cpp
3  *
4  * Part of the STXXL. See http://stxxl.sourceforge.net
5  *
6  * Copyright (C) 2011 Johannes Singler <[email protected]>
7  * Copyright (C) 2014 Timo Bingmann <[email protected]>
8  *
9  * Distributed under the Boost Software License, Version 1.0.
10  * (See accompanying file LICENSE_1_0.txt or copy at
11  * http://www.boost.org/LICENSE_1_0.txt)
12  **************************************************************************/
13 
15 
16 #if STXXL_HAVE_LINUXAIO_FILE
17 
18 #include <unistd.h>
19 #include <sys/syscall.h>
20 
21 #include <stxxl/bits/verbose.h>
26 
27 #include <algorithm>
28 
29 #ifndef STXXL_CHECK_FOR_PENDING_REQUESTS_ON_SUBMISSION
30 #define STXXL_CHECK_FOR_PENDING_REQUESTS_ON_SUBMISSION 1
31 #endif
32 
34 
35 linuxaio_queue::linuxaio_queue(int desired_queue_length)
36  : num_waiting_requests(0), num_free_events(0), num_posted_requests(0),
37  post_thread_state(NOT_RUNNING), wait_thread_state(NOT_RUNNING)
38 {
39  if (desired_queue_length == 0) {
40  // default value, 64 entries per queue (i.e. usually per disk) should
41  // be enough
42  max_events = 64;
43  }
44  else
45  max_events = desired_queue_length;
46 
47  // negotiate maximum number of simultaneous events with the OS
48  context = 0;
49  long result;
50  while ((result = syscall(SYS_io_setup, max_events, &context)) == -1 &&
51  errno == EAGAIN && max_events > 1)
52  {
53  max_events <<= 1; // try with half as many events
54  }
55  if (result != 0) {
56  STXXL_THROW_ERRNO(io_error, "linuxaio_queue::linuxaio_queue"
57  " io_setup() nr_events=" << max_events);
58  }
59 
60  for (int e = 0; e < max_events; ++e)
61  num_free_events++; // cannot set semaphore to value directly
62 
63  STXXL_MSG("Set up an linuxaio queue with " << max_events << " entries.");
64 
65  start_thread(post_async, static_cast<void*>(this), post_thread, post_thread_state);
66  start_thread(wait_async, static_cast<void*>(this), wait_thread, wait_thread_state);
67 }
68 
// Destructor body: tear down the kernel AIO context created by io_setup()
// in the constructor.
// NOTE(review): the scraped listing dropped the destructor's signature line
// and original lines 71-72 — presumably the stop_thread() calls (see the
// stop_thread declaration in this file's index) that shut down the posting
// and waiting threads before the context is destroyed; confirm upstream.
70 {
 73  syscall(SYS_io_destroy, context);
 74 }
75 
// linuxaio_queue::add_request(request_ptr& req): hand a request over to the
// posting thread by appending it to waiting_requests.
// NOTE(review): the scrape dropped the signature line (orig. 76) and
// originals 85/88 — presumably a scoped lock on the waiting queue and the
// num_waiting_requests semaphore increment that wakes the posting thread;
// code below kept byte-identical to the listing.
77 {
// reject a NULL request outright
 78  if (req.empty())
 79  STXXL_THROW_INVALID_ARGUMENT("Empty request submitted to disk_queue.");
// warn (but do not fail) when the posting thread is not running
 80  if (post_thread_state() != RUNNING)
 81  STXXL_ERRMSG("Request submitted to stopped queue.");
// warn when the request is not actually a linuxaio_request
 82  if (!dynamic_cast<linuxaio_request*>(req.get()))
 83  STXXL_ERRMSG("Non-LinuxAIO request submitted to LinuxAIO queue.");
 84 
 86 
// enqueue for the posting thread (consumed in post_requests)
 87  waiting_requests.push_back(req);
 89 }
90 
// linuxaio_queue::cancel_request(request_ptr& req): attempt to cancel a
// request that is either still waiting to be posted or already submitted to
// the kernel. Returns true iff the request was removed/canceled here.
// NOTE(review): the scrape dropped the signature line (orig. 91) plus
// originals 102/105 and 119/122 — presumably the scoped locks guarding the
// two queues and the trailing `req);` argument of each std::find call;
// code below kept byte-identical to the listing.
92 {
 93  if (req.empty())
 94  STXXL_THROW_INVALID_ARGUMENT("Empty request canceled disk_queue.");
 95  if (post_thread_state() != RUNNING)
 96  STXXL_ERRMSG("Request canceled in stopped queue.");
 97  if (!dynamic_cast<linuxaio_request*>(req.get()))
 98  STXXL_ERRMSG("Non-LinuxAIO request submitted to LinuxAIO queue.");
 99 
 100  queue_type::iterator pos;
// Case 1: request still sits in waiting_requests — it was never posted,
// so it can simply be removed from the queue.
 101  {
 103 
 104  pos = std::find(waiting_requests.begin(), waiting_requests.end(),
 106  if (pos != waiting_requests.end())
 107  {
 108  waiting_requests.erase(pos);
 109 
 110  // polymorphic_downcast to linuxaio_request,
 111  // request is canceled, but was not yet posted.
 112  dynamic_cast<linuxaio_request*>(req.get())->completed(false, true);
 113 
 114  num_waiting_requests--; // will never block
 115  return true;
 116  }
 117  }
 118 
// Case 2: request was already posted — ask the kernel to cancel the
// in-flight AIO operation via cancel_aio().
 120 
 121  pos = std::find(posted_requests.begin(), posted_requests.end(),
 123  if (pos != posted_requests.end())
 124  {
 125  // polymorphic_downcast to linuxaio_request,
 126  bool canceled_io_operation = (dynamic_cast<linuxaio_request*>(req.get()))->cancel_aio();
 127 
 128  if (canceled_io_operation)
 129  {
 130  posted_requests.erase(pos);
 131 
 132  // polymorphic_downcast to linuxaio_request,
 133 
 134  // request is canceled, already posted
 135  dynamic_cast<linuxaio_request*>(req.get())->completed(true, true);
 136 
// the event slot occupied by the canceled request becomes free again
 137  num_free_events++;
 138  num_posted_requests--; // will never block
 139  return true;
 140  }
 141  }
 142 
// not found in either queue, or the kernel refused the cancellation
 143  return false;
 144 }
145 
146 // internal routines, run by the posting thread
// linuxaio_queue::post_requests(): main loop of the posting thread — pops
// requests from waiting_requests, submits them to the kernel via
// linuxaio_request::post(), and moves them to posted_requests.
// NOTE(review): the scrape dropped the signature line (orig. 147) and
// originals 161/189/191/199 — presumably the scoped lock whose
// `lock.unlock()` appears below, the lock around posted_requests, and the
// compensating num_waiting_requests++; code kept byte-identical.
 148 {
 149  request_ptr req;
// scratch buffer for completions drained while the submission ring is full
 150  io_event* events = new io_event[max_events];
 151 
 152  for ( ; ; ) // as long as thread is running
 153  {
 154  // might block until next request or message comes in
// semaphore decrement doubles as "wait until a request is waiting"
 155  int num_currently_waiting_requests = num_waiting_requests--;
 156 
 157  // terminate if termination has been requested
 158  if (post_thread_state() == TERMINATING && num_currently_waiting_requests == 0)
 159  break;
 160 
 162  if (!waiting_requests.empty())
 163  {
 164  req = waiting_requests.front();
 165  waiting_requests.pop_front();
// `lock` was acquired on the elided line 161 guarding waiting_requests
 166  lock.unlock();
 167 
 168  num_free_events--; // might block because too many requests are posted
 169 
 170  // polymorphic_downcast
 171  while (!dynamic_cast<linuxaio_request*>(req.get())->post())
 172  {
 173  // post failed, so first handle events to make queues (more)
 174  // empty, then try again.
 175 
 176  // wait for at least one event to complete, no time limit
 177  long num_events = syscall(SYS_io_getevents, context, 1, max_events, events, NULL);
 178  if (num_events < 0) {
 179  STXXL_THROW_ERRNO(io_error, "linuxaio_queue::post_requests"
 180  " io_getevents() nr_events=" << num_events);
 181  }
 182 
 183  handle_events(events, num_events, false);
 184  }
 185 
 186  // request is finally posted
 187 
 188  {
 190  posted_requests.push_back(req);
 192  }
 193  }
 194  else
 195  {
 196  lock.unlock();
 197 
 198  // num_waiting_requests-- was premature, compensate for that
 200  }
 201  }
 202 
 203  delete[] events;
 204 }
205 
206 void linuxaio_queue::handle_events(io_event* events, long num_events, bool canceled)
207 {
208  for (int e = 0; e < num_events; ++e)
209  {
210  // unsigned_type is as long as a pointer, and like this, we avoid an icpc warning
211  request_ptr* r = reinterpret_cast<request_ptr*>(static_cast<unsigned_type>(events[e].data));
212  r->get()->completed(canceled);
213  delete r; // release auto_ptr reference
214  num_free_events++;
215  num_posted_requests--; // will never block
216  }
217 }
218 
219 // internal routines, run by the waiting thread
// linuxaio_queue::wait_requests(): main loop of the waiting thread — blocks
// in io_getevents() for completions of posted requests and dispatches them
// via handle_events(). (Signature line, orig. 220, elided by the scrape.)
 221 {
 222  request_ptr req;
 223  io_event* events = new io_event[max_events];
 224 
 225  for ( ; ; ) // as long as thread is running
 226  {
 227  // might block until next request is posted or message comes in
// semaphore decrement doubles as "wait until a request is in flight"
 228  int num_currently_posted_requests = num_posted_requests--;
 229 
 230  // terminate if termination has been requested
 231  if (wait_thread_state() == TERMINATING && num_currently_posted_requests == 0)
 232  break;
 233 
 234  // wait for at least one of them to finish
 235  long num_events;
 236  while (1) {
 237  num_events = syscall(SYS_io_getevents, context, 1, max_events, events, NULL);
 238  if (num_events < 0) {
 239  if (errno == EINTR) {
 240  // io_getevents may return prematurely in case a signal is received
 241  continue;
 242  }
 243 
 244  STXXL_THROW_ERRNO(io_error, "linuxaio_queue::wait_requests"
 245  " io_getevents() nr_events=" << max_events);
 246  }
 247  break;
 248  }
 249 
// handle_events() below decrements num_posted_requests once per completed
// event, so undo the extra decrement performed at the top of the loop
 250  num_posted_requests++; // compensate for the one eaten prematurely above
 251 
 252  handle_events(events, num_events, false);
 253  }
 254 
 255  delete[] events;
 256 }
257 
// linuxaio_queue::post_async(void* arg): pthread-style trampoline that runs
// post_requests() on the queue instance passed via arg.
// NOTE(review): the scrape dropped the signature line (orig. 258) and
// original 263 — presumably a call through `pthis` such as
// post_thread_state.set_to(TERMINATED), which is why `pthis` appears unused
// below; confirm against the upstream source.
259 {
 260  (static_cast<linuxaio_queue*>(arg))->post_requests();
 261 
 262  self_type* pthis = static_cast<self_type*>(arg);
 264 
 265 #if STXXL_STD_THREADS && STXXL_MSVC >= 1700
 266  // Workaround for deadlock bug in Visual C++ Runtime 2012 and 2013, see
 267  // request_queue_impl_worker.cpp. -tb
 268  ExitThread(NULL);
 269 #else
 270  return NULL;
 271 #endif
 272 }
273 
// linuxaio_queue::wait_async(void* arg): pthread-style trampoline that runs
// wait_requests() on the queue instance passed via arg.
// NOTE(review): the scrape dropped the signature line (orig. 274) and
// original 279 — presumably a call through `pthis` such as
// wait_thread_state.set_to(TERMINATED), which is why `pthis` appears unused
// below; confirm against the upstream source.
275 {
 276  (static_cast<linuxaio_queue*>(arg))->wait_requests();
 277 
 278  self_type* pthis = static_cast<self_type*>(arg);
 280 
 281 #if STXXL_STD_THREADS && STXXL_MSVC >= 1700
 282  // Workaround for deadlock bug in Visual C++ Runtime 2012 and 2013, see
 283  // request_queue_impl_worker.cpp. -tb
 284  ExitThread(NULL);
 285 #else
 286  return NULL;
 287 #endif
 288 }
289 
291 
292 #endif // #if STXXL_HAVE_LINUXAIO_FILE
293 // vim: et:ts=4:sw=4
bool cancel_request(request_ptr &req)
static void * wait_async(void *arg)
void set_to(const value_type &new_state)
Definition: state.h:43
state< thread_state > post_thread_state
void unlock()
unlock a held mutex prematurely, before the end of the scope
Definition: mutex.h:144
Type * get() const
return the enclosed pointer.
Definition: counting_ptr.h:122
bool empty() const
test for a NULL pointer
Definition: counting_ptr.h:142
#define STXXL_THROW_INVALID_ARGUMENT(error_message)
Throws std::invalid_argument with "Error in [function] : [error_message]".
void stop_thread(thread_type &t, state< thread_state > &s, semaphore &sem)
void add_request(request_ptr &req)
int max_events
max number of OS requests
static void * post_async(void *arg)
semaphore num_posted_requests
void handle_events(io_event *events, long num_events, bool canceled)
aio_context_t context
OS context.
Acquire a lock that's valid until the end of scope.
Definition: mutex.h:123
#define STXXL_BEGIN_NAMESPACE
Definition: namespace.h:16
queue_type waiting_requests
#define STXXL_ERRMSG(x)
Definition: verbose.h:94
queue_type posted_requests
state< thread_state > wait_thread_state
semaphore num_waiting_requests
number of requests in waitings_requests
choose_int_types< my_pointer_size >::unsigned_type unsigned_type
Definition: types.h:64
#define STXXL_THROW_ERRNO(exception_type, error_message)
Throws exception_type with "Error in [function] : [error_message] : [errno message]".
#define STXXL_MSG(x)
Definition: verbose.h:73
void start_thread(void *(*worker)(void *), void *arg, thread_type &t, state< thread_state > &s)
#define _STXXL_FORCE_SEQUENTIAL
Definition: parallel.h:34
Request for an linuxaio_file.
Queue for linuxaio_file(s)
ExtIterator find(ExtIterator begin, ExtIterator end, const EqualityComparable &value, int_type nbuffers=0)
External equivalent of std::find, see stxxl::find.
Definition: scan.h:289
virtual void completed(bool canceled)=0
#define STXXL_END_NAMESPACE
Definition: namespace.h:17