libdap  Updated for version 3.20.6
libdap4 is an implementation of OPeNDAP's DAP protocol.
MarshallerThread.cc
1 // -*- mode: c++; c-basic-offset:4 -*-
2 
3 // This file is part of libdap, A C++ implementation of the OPeNDAP Data
4 // Access Protocol.
5 
6 // Copyright (c) 2015 OPeNDAP, Inc.
7 // Author: James Gallagher <jgallagher@opendap.org>
8 //
9 // This library is free software; you can redistribute it and/or
10 // modify it under the terms of the GNU Lesser General Public
11 // License as published by the Free Software Foundation; either
12 // version 2.1 of the License, or (at your option) any later version.
13 //
14 // This library is distributed in the hope that it will be useful,
15 // but WITHOUT ANY WARRANTY; without even the implied warranty of
16 // MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
17 // Lesser General Public License for more details.
18 //
19 // You should have received a copy of the GNU Lesser General Public
20 // License along with this library; if not, write to the Free Software
21 // Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA
22 //
23 // You can contact OPeNDAP, Inc. at PO Box 112, Saunderstown, RI. 02874-0112.
24 
25 /*
26  * MarshallerThread.cc
27  *
28  * Created on: Aug 27, 2015
29  * Author: jimg
30  */
31 
32 #include "config.h"
33 
34 #include <pthread.h>
35 #include <sys/time.h>
36 #include <fcntl.h>
37 #include <unistd.h>
38 
39 #include <ostream>
40 #include <sstream>
41 
42 #include "MarshallerThread.h"
43 #include "Error.h"
44 #include "InternalErr.h"
45 #include "debug.h"
46 
47 using namespace libdap;
48 using namespace std;
49 
50 #if 0
// Switch tested by the timing diagnostics in the thread write routine. Both
// this definition and those diagnostics are currently compiled out (#if 0).
51 bool MarshallerThread::print_time = false;
52 
/**
 * Compute the elapsed time between two timestamps, in seconds.
 *
 * Despite the name, the result is not rounded to hundredths; it is the full
 * difference (whole seconds plus microseconds / 1,000,000) as a double.
 *
 * The arguments are read-only. Because the seconds and microseconds
 * differences are summed into a single floating point value, no borrow/carry
 * normalization of the timeval fields is required.
 *
 * @param stop The later timestamp.
 * @param start The earlier timestamp.
 * @return stop - start in seconds (negative if stop precedes start).
 */
static double time_diff_to_hundredths(const struct timeval *stop, const struct timeval *start)
{
    const double whole_seconds = static_cast<double>(stop->tv_sec - start->tv_sec);
    const double micro_seconds = static_cast<double>(stop->tv_usec - start->tv_usec);

    return whole_seconds + micro_seconds / 1000000;
}
76 #endif
77 
78 
88 Locker::Locker(pthread_mutex_t &lock, pthread_cond_t &cond, int &count) :
89  m_mutex(lock)
90 {
91  int status = pthread_mutex_lock(&m_mutex);
92 
93  DBG(cerr << "Locking the mutex! (waiting; " << pthread_self() << ")" << endl);
94 
95  if (status != 0) throw InternalErr(__FILE__, __LINE__, "Could not lock m_mutex");
96  while (count != 0) {
97  status = pthread_cond_wait(&cond, &m_mutex);
98  if (status != 0) throw InternalErr(__FILE__, __LINE__, "Could not wait on m_cond");
99  }
100  if (count != 0) throw InternalErr(__FILE__, __LINE__, "FAIL: left m_cond wait with non-zero child thread count");
101 
102  DBG(cerr << "Locked! (" << pthread_self() << ")" << endl);
103 }
104 
109 {
110  DBG(cerr << "Unlocking the mutex! (" << pthread_self() << ")" << endl);
111 
112  (void) pthread_mutex_unlock(&m_mutex);
113 #if 0
114  int status = pthread_mutex_unlock(&m_mutex);
115  if (status != 0) throw InternalErr(__FILE__, __LINE__, "Could not unlock m_mutex");
116 #endif
117 }
118 
119 
133 ChildLocker::ChildLocker(pthread_mutex_t &lock, pthread_cond_t &cond, int &count) :
134  m_mutex(lock), m_cond(cond), m_count(count)
135 {
136  int status = pthread_mutex_lock(&m_mutex);
137 
138  DBG(cerr << "Locking the mutex! (simple; " << pthread_self() << ")" << endl);
139 
140  if (status != 0) throw InternalErr(__FILE__, __LINE__, "Could not lock m_mutex");
141 
142  DBG(cerr << "Locked! (" << pthread_self() << ")" << endl);
143 }
144 
145 ChildLocker::~ChildLocker()
146 {
147  DBG(cerr << "Unlocking the mutex! (" << pthread_self() << ")" << endl);
148 
149  m_count = 0;
150 
151  (void) pthread_cond_signal(&m_cond);
152  (void) pthread_mutex_unlock(&m_mutex);
153 
154 #if 0
155  int status = pthread_cond_signal(&m_cond);
156  if (status != 0)
157  throw InternalErr(__FILE__, __LINE__, "Could not signal main thread from ChildLocker!");
158 
159  status = pthread_mutex_unlock(&m_mutex);
160  if (status != 0) throw InternalErr(__FILE__, __LINE__, "Could not unlock m_mutex");
161 #endif
162 }
163 
164 MarshallerThread::MarshallerThread() :
165  d_thread(0), d_child_thread_count(0)
166 {
167  if (pthread_attr_init(&d_thread_attr) != 0) throw Error(internal_error, "Failed to initialize pthread attributes.");
168  if (pthread_attr_setdetachstate(&d_thread_attr, PTHREAD_CREATE_DETACHED /*PTHREAD_CREATE_JOINABLE*/) != 0)
169  throw Error(internal_error, "Failed to complete pthread attribute initialization.");
170 
171  if (pthread_mutex_init(&d_out_mutex, 0) != 0) throw Error(internal_error, "Failed to initialize mutex.");
172  if (pthread_cond_init(&d_out_cond, 0) != 0) throw Error(internal_error, "Failed to initialize cond.");
173 }
174 
175 MarshallerThread::~MarshallerThread()
176 {
177  (void) pthread_mutex_lock(&d_out_mutex);
178 #if 0
179  int status = pthread_mutex_lock(&d_out_mutex);
180  if (status != 0) throw InternalErr(__FILE__, __LINE__, "Could not lock m_mutex");
181 #endif
182  // d_child_thread_count is passed into the thread in a structure (see write_thread())
183  // and is decremented by the ChildLocker dtor when write_thread() exits. jhrg 2/7/19
184  if (d_child_thread_count != 0) {
185  (void) pthread_cond_wait(&d_out_cond, &d_out_mutex);
186  d_child_thread_count = 0;
187 #if 0
188  status = pthread_cond_wait(&d_out_cond, &d_out_mutex);
189  if (status != 0) throw InternalErr(__FILE__, __LINE__, "Could not wait on m_cond");
190 #endif
191  }
192 
193  (void) pthread_mutex_unlock(&d_out_mutex);
194 
195 #if 0
196  if (d_child_thread_count != 0)
197  throw InternalErr(__FILE__, __LINE__, "FAIL: left m_cond wait with non-zero child thread count");
198 
199  status = pthread_mutex_unlock(&d_out_mutex);
200  if (status != 0) throw InternalErr(__FILE__, __LINE__, "Could not unlock m_mutex");
201 #endif
202 
203  pthread_mutex_destroy(&d_out_mutex);
204  pthread_cond_destroy(&d_out_cond);
205 
206  pthread_attr_destroy(&d_thread_attr);
207 }
208 
209 // not a static method
215 void MarshallerThread::start_thread(void* (*thread)(void *arg), ostream &out, char *byte_buf,
216  unsigned int bytes)
217 {
218  write_args *args = new write_args(d_out_mutex, d_out_cond, d_child_thread_count, d_thread_error, out, byte_buf,
219  bytes);
220  int status = pthread_create(&d_thread, &d_thread_attr, thread, args);
221  if (status != 0) throw InternalErr(__FILE__, __LINE__, "Could not start child thread");
222 }
223 
227 void MarshallerThread::start_thread(void* (*thread)(void *arg), int fd, char *byte_buf, unsigned int bytes)
228 {
229  write_args *args = new write_args(d_out_mutex, d_out_cond, d_child_thread_count, d_thread_error, fd, byte_buf,
230  bytes);
231  int status = pthread_create(&d_thread, &d_thread_attr, thread, args);
232  if (status != 0) throw InternalErr(__FILE__, __LINE__, "Could not start child thread");
233 }
234 
244 void *
246 {
247  write_args *args = reinterpret_cast<write_args *>(arg);
248 
249  ChildLocker lock(args->d_mutex, args->d_cond, args->d_count); // RAII; will unlock on exit
250 
251 #if 0
252  struct timeval tp_s;
253  if (print_time && gettimeofday(&tp_s, 0) != 0) cerr << "could not read time" << endl;
254 #endif
255 
256  // force an error
257  // return (void*)-1;
258 
259  if (args->d_out_file != -1) {
260  int bytes_written = write(args->d_out_file, args->d_buf, args->d_num);
261  if (bytes_written != args->d_num)
262  return (void*) -1;
263  }
264  else {
265  args->d_out.write(args->d_buf, args->d_num);
266  if (args->d_out.fail()) {
267  ostringstream oss;
268  oss << "Could not write data: " << __FILE__ << ":" << __LINE__;
269  args->d_error = oss.str();
270  return (void*) -1;
271  }
272  }
273 
274  delete [] args->d_buf;
275  delete args;
276 
277 #if 0
278  struct timeval tp_e;
279  if (print_time) {
280  if (gettimeofday(&tp_e, 0) != 0) cerr << "could not read time" << endl;
281 
282  cerr << "time for child thread write: " << time_diff_to_hundredths(&tp_e, &tp_s) << endl;
283  }
284 #endif
285 
286  return 0;
287 }
288 
301 void *
303 {
304  write_args *args = reinterpret_cast<write_args *>(arg);
305 
306  ChildLocker lock(args->d_mutex, args->d_cond, args->d_count); // RAII; will unlock on exit
307 
308  if (args->d_out_file != -1) {
309  int bytes_written = write(args->d_out_file, args->d_buf, args->d_num);
310  if (bytes_written != args->d_num) return (void*) -1;
311  }
312  else {
313  args->d_out.write(args->d_buf + 4, args->d_num);
314  if (args->d_out.fail()) {
315  ostringstream oss;
316  oss << "Could not write data: " << __FILE__ << ":" << __LINE__;
317  args->d_error = oss.str();
318  return (void*) -1;
319  }
320  }
321 
322  delete [] args->d_buf;
323  delete args;
324 
325  return 0;
326 }
327 
static void * write_thread(void *arg)
void start_thread(void *(*thread)(void *arg), std::ostream &out, char *byte_buf, unsigned int bytes_written)
STL namespace.
top level DAP object to house generic methods
Definition: AISConnect.cc:30
A class for software fault reporting.
Definition: InternalErr.h:64
static void * write_thread_part(void *arg)
A class for error processing.
Definition: Error.h:92