libdap Updated for version 3.21.1
libdap4 is an implementation of OPeNDAP's DAP protocol.
MarshallerThread.cc
Go to the documentation of this file.
1// -*- mode: c++; c-basic-offset:4 -*-
2
3// This file is part of libdap, A C++ implementation of the OPeNDAP Data
4// Access Protocol.
5
6// Copyright (c) 2015 OPeNDAP, Inc.
7// Author: James Gallagher <jgallagher@opendap.org>
8//
9// This library is free software; you can redistribute it and/or
10// modify it under the terms of the GNU Lesser General Public
11// License as published by the Free Software Foundation; either
12// version 2.1 of the License, or (at your option) any later version.
13//
14// This library is distributed in the hope that it will be useful,
15// but WITHOUT ANY WARRANTY; without even the implied warranty of
16// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
17// Lesser General Public License for more details.
18//
19// You should have received a copy of the GNU Lesser General Public
20// License along with this library; if not, write to the Free Software
21// Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA
22//
23// You can contact OPeNDAP, Inc. at PO Box 112, Saunderstown, RI. 02874-0112.
24
25/*
26 * MarshallerThread.cc
27 *
28 * Created on: Aug 27, 2015
29 * Author: jimg
30 */
31
32#include "config.h"
33
34#include <fcntl.h>
35#include <pthread.h>
36#include <sys/time.h>
37#include <unistd.h>
38
39#include <ostream>
40#include <sstream>
41
42#include "Error.h"
43#include "InternalErr.h"
44#include "MarshallerThread.h"
45#include "debug.h"
46
47using namespace libdap;
48using namespace std;
49
50// Set TIMING to 1 to enable timing output
51#define TIMING 0
52
53#if TIMING
54bool MarshallerThread::print_time = false;
55
/**
 * Compute the elapsed time between two timeval samples.
 *
 * Neither argument is modified. Despite the function's name, no rounding
 * to hundredths is performed; the full microsecond resolution is kept.
 *
 * @param stop  The later time sample.
 * @param start The earlier time sample.
 * @return The difference (stop - start) in seconds.
 */
static double time_diff_to_hundredths(const struct timeval *stop, const struct timeval *start) {
    // Seconds and microseconds are subtracted independently. A negative
    // microsecond difference is absorbed by the floating point sum, so no
    // manual carry between the two fields is required.
    const double whole_seconds = static_cast<double>(stop->tv_sec - start->tv_sec);
    const double fraction = static_cast<double>(stop->tv_usec - start->tv_usec) / 1000000.0;
    return whole_seconds + fraction;
}
78#endif
79
89Locker::Locker(pthread_mutex_t &lock, pthread_cond_t &cond, int &count) : m_mutex(lock) {
90 int status = pthread_mutex_lock(&m_mutex);
91
92 DBG(cerr << "Locking the mutex! (waiting; " << pthread_self() << ")" << endl);
93
94 if (status != 0)
95 throw InternalErr(__FILE__, __LINE__, "Could not lock m_mutex");
96 while (count != 0) {
97 status = pthread_cond_wait(&cond, &m_mutex);
98 if (status != 0)
99 throw InternalErr(__FILE__, __LINE__, "Could not wait on m_cond");
100 }
101
102 DBG(cerr << "Locked! (" << pthread_self() << ")" << endl);
103}
104
109 DBG(cerr << "Unlocking the mutex! (" << pthread_self() << ")" << endl);
110
111 (void)pthread_mutex_unlock(&m_mutex);
112}
113
127
128ChildLocker::ChildLocker(pthread_mutex_t &lock, pthread_cond_t &cond, int &count)
129 : m_mutex(lock), m_cond(cond), m_count(count) {
130 int status = pthread_mutex_lock(&m_mutex);
131
132 DBG(cerr << "Locking the mutex! (simple; " << pthread_self() << ")" << endl);
133
134 if (status != 0)
135 throw InternalErr(__FILE__, __LINE__, "Could not lock m_mutex");
136
137 DBG(cerr << "Locked! (" << pthread_self() << ")" << endl);
138}
139
141 DBG(cerr << "Unlocking the mutex! (" << pthread_self() << ")" << endl);
142
143 m_count = 0;
144
145 (void)pthread_cond_signal(&m_cond);
146 (void)pthread_mutex_unlock(&m_mutex);
147}
148
150 if (pthread_attr_init(&d_thread_attr) != 0)
151 throw Error(internal_error, "Failed to initialize pthread attributes.");
152 if (pthread_attr_setdetachstate(&d_thread_attr, PTHREAD_CREATE_DETACHED /*PTHREAD_CREATE_JOINABLE*/) != 0)
153 throw Error(internal_error, "Failed to complete pthread attribute initialization.");
154
155 if (pthread_mutex_init(&d_out_mutex, nullptr) != 0)
156 throw Error(internal_error, "Failed to initialize mutex.");
157 if (pthread_cond_init(&d_out_cond, nullptr) != 0)
158 throw Error(internal_error, "Failed to initialize cond.");
159}
160
162 (void)pthread_mutex_lock(&d_out_mutex);
163 // d_child_thread_count is passed into the thread in a structure (see write_thread())
164 // and is decremented by the ChildLocker dtor when write_thread() exits. jhrg 2/7/19
165 if (d_child_thread_count != 0) {
166 (void)pthread_cond_wait(&d_out_cond, &d_out_mutex);
167 d_child_thread_count = 0;
168 }
169
170 (void)pthread_mutex_unlock(&d_out_mutex);
171
172 pthread_mutex_destroy(&d_out_mutex);
173 pthread_cond_destroy(&d_out_cond);
174
175 pthread_attr_destroy(&d_thread_attr);
176}
177
178// not a static method
184void MarshallerThread::start_thread(void *(*thread)(void *arg), ostream &out, char *byte_buf, std::streamsize bytes) {
185 auto *args = new write_args(d_out_mutex, d_out_cond, d_child_thread_count, d_thread_error, out, byte_buf, bytes);
186 int status = pthread_create(&d_thread, &d_thread_attr, thread, args);
187 if (status != 0)
188 throw InternalErr(__FILE__, __LINE__, "Could not start child thread");
189}
190
194void MarshallerThread::start_thread(void *(*thread)(void *arg), int fd, char *byte_buf, std::streamsize bytes) {
195 auto *args = new write_args(d_out_mutex, d_out_cond, d_child_thread_count, d_thread_error, fd, byte_buf, bytes);
196 int status = pthread_create(&d_thread, &d_thread_attr, thread, args);
197 if (status != 0)
198 throw InternalErr(__FILE__, __LINE__, "Could not start child thread");
199}
200
211 auto *args = reinterpret_cast<write_args *>(arg);
212
213 ChildLocker lock(args->d_mutex, args->d_cond, args->d_count); // RAII; will unlock on exit
214
215#if TIMING
216 struct timeval tp_s;
217 if (print_time && gettimeofday(&tp_s, 0) != 0)
218 cerr << "could not read time" << endl;
219#endif
220
221 // force an error
222 // return (void*)-1;
223
224 if (args->d_out_file != -1) {
225 auto bytes_written = write(args->d_out_file, args->d_buf, args->d_num);
226 if (bytes_written != args->d_num)
227 return (void *)-1;
228 } else {
229 args->d_out.write(args->d_buf, args->d_num);
230 if (args->d_out.fail()) {
231 ostringstream oss;
232 oss << "Could not write data: " << __FILE__ << ":" << __LINE__;
233 args->d_error = oss.str();
234 return (void *)-1;
235 }
236 }
237
238 delete[] args->d_buf;
239 delete args;
240
241#if TIMING
242 struct timeval tp_e;
243 if (print_time) {
244 if (gettimeofday(&tp_e, 0) != 0)
245 cerr << "could not read time" << endl;
246
247 cerr << "time for child thread write: " << time_diff_to_hundredths(&tp_e, &tp_s) << endl;
248 }
249#endif
250
251 return nullptr;
252}
253
267 auto *args = reinterpret_cast<write_args *>(arg);
268
269 ChildLocker lock(args->d_mutex, args->d_cond, args->d_count); // RAII; will unlock on exit
270
271 if (args->d_out_file != -1) {
272 auto bytes_written = write(args->d_out_file, args->d_buf, args->d_num);
273 if (bytes_written != args->d_num)
274 return (void *)-1;
275 } else {
276 args->d_out.write(args->d_buf + 4, args->d_num);
277 if (args->d_out.fail()) {
278 ostringstream oss;
279 oss << "Could not write data: " << __FILE__ << ":" << __LINE__;
280 args->d_error = oss.str();
281 return (void *)-1;
282 }
283 }
284
285 delete[] args->d_buf;
286 delete args;
287
288 return nullptr;
289}
#define internal_error
Internal server error (500)
Definition Error.h:63
A class for error processing.
Definition Error.h:92
A class for software fault reporting.
Definition InternalErr.h:61
Locker()=delete
static void * write_thread(void *arg)
void start_thread(void *(*thread)(void *arg), std::ostream &out, char *byte_buf, std::streamsize bytes_written)
static void * write_thread_part(void *arg)
#define DBG(x)
Definition debug.h:58
top level DAP object to house generic methods
Definition AISConnect.cc:30