libdap  Updated for version 3.20.6
libdap4 is an implementation of OPeNDAP's DAP protocol.
D4StreamMarshaller.cc
1 // D4StreamMarshaller.cc
2 
3 // -*- mode: c++; c-basic-offset:4 -*-
4 
5 // This file is part of libdap, A C++ implementation of the OPeNDAP Data
6 // Access Protocol.
7 
8 // Copyright (c) 2012 OPeNDAP, Inc.
9 // Author: James Gallagher <jgallagher@opendap.org>
10 //
11 // This library is free software; you can redistribute it and/or
12 // modify it under the terms of the GNU Lesser General Public
13 // License as published by the Free Software Foundation; either
14 // version 2.1 of the License, or (at your option) any later version.
15 //
16 // This library is distributed in the hope that it will be useful,
17 // but WITHOUT ANY WARRANTY; without even the implied warranty of
18 // MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
19 // Lesser General Public License for more details.
20 //
21 // You should have received a copy of the GNU Lesser General Public
22 // License along with this library; if not, write to the Free Software
23 // Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
24 //
25 // You can contact OPeNDAP, Inc. at PO Box 112, Saunderstown, RI. 02874-0112.
26 
27 #include "config.h"
28 
29 #include <byteswap.h>
30 #include <cassert>
31 #include <cstring>
32 
33 #include <iostream>
34 #include <sstream>
35 #include <iomanip>
36 #include <limits>
37 
38 //#define DODS_DEBUG 1
39 
40 #ifdef HAVE_PTHREAD_H
41 #include <pthread.h>
42 #endif
43 
44 #include "D4StreamMarshaller.h"
45 #ifdef USE_POSIX_THREADS
46 #include "MarshallerThread.h"
47 #endif
48 
49 #if USE_XDR_FOR_IEEE754_ENCODING
50 #include "XDRUtils.h"
51 #include "util.h"
52 #endif
53 
54 #include "DapIndent.h"
55 #include "debug.h"
56 
57 using namespace std;
58 
59 namespace libdap {
60 
61 #if 0
62 // We decided to use int64_t to represent sizes of both arrays and strings,
63 // So this code is not used. jhrg 10/4/13
64 
65 // From the Google protobuf library
66 inline uint8_t* WriteVarint64ToArrayInline(uint64_t value, uint8_t* target) {
67  // Splitting into 32-bit pieces gives better performance on 32-bit
68  // processors.
69  uint32_t part0 = static_cast<uint32_t>(value );
70  uint32_t part1 = static_cast<uint32_t>(value >> 28);
71  uint32_t part2 = static_cast<uint32_t>(value >> 56);
72 
73  int size;
74 
75  // Here we can't really optimize for small numbers, since the value is
76  // split into three parts. Checking for numbers < 128, for instance,
77  // would require three comparisons, since you'd have to make sure part1
78  // and part2 are zero. However, if the caller is using 64-bit integers,
79  // it is likely that they expect the numbers to often be very large, so
80  // we probably don't want to optimize for small numbers anyway. Thus,
81  // we end up with a hard coded binary search tree...
82  if (part2 == 0) {
83  if (part1 == 0) {
84  if (part0 < (1 << 14)) {
85  if (part0 < (1 << 7)) {
86  size = 1; goto size1;
87  } else {
88  size = 2; goto size2;
89  }
90  } else {
91  if (part0 < (1 << 21)) {
92  size = 3; goto size3;
93  } else {
94  size = 4; goto size4;
95  }
96  }
97  } else {
98  if (part1 < (1 << 14)) {
99  if (part1 < (1 << 7)) {
100  size = 5; goto size5;
101  } else {
102  size = 6; goto size6;
103  }
104  } else {
105  if (part1 < (1 << 21)) {
106  size = 7; goto size7;
107  } else {
108  size = 8; goto size8;
109  }
110  }
111  }
112  } else {
113  if (part2 < (1 << 7)) {
114  size = 9; goto size9;
115  } else {
116  size = 10; goto size10;
117  }
118  }
119 
120  // GOOGLE_LOG(FATAL) << "Can't get here.";
121 
122  size10: target[9] = static_cast<uint8_t>((part2 >> 7) | 0x80);
123  size9 : target[8] = static_cast<uint8_t>((part2 ) | 0x80);
124  size8 : target[7] = static_cast<uint8_t>((part1 >> 21) | 0x80);
125  size7 : target[6] = static_cast<uint8_t>((part1 >> 14) | 0x80);
126  size6 : target[5] = static_cast<uint8_t>((part1 >> 7) | 0x80);
127  size5 : target[4] = static_cast<uint8_t>((part1 ) | 0x80);
128  size4 : target[3] = static_cast<uint8_t>((part0 >> 21) | 0x80);
129  size3 : target[2] = static_cast<uint8_t>((part0 >> 14) | 0x80);
130  size2 : target[1] = static_cast<uint8_t>((part0 >> 7) | 0x80);
131  size1 : target[0] = static_cast<uint8_t>((part0 ) | 0x80);
132 
133  target[size-1] &= 0x7F;
134  return target + size;
135 }
136 #endif
137 
138 #if USE_XDR_FOR_IEEE754_ENCODING
139 void D4StreamMarshaller::m_serialize_reals(char *val, unsigned int num, int width, Type type)
140 {
141  dods_uint64 size = num * width;
142 
143  char *buf = new char[size];
144  XDR xdr;
145  xdrmem_create(&xdr, &buf[0], size, XDR_ENCODE);
146  try {
147  if(!xdr_array(&xdr, &val, (unsigned int *)&num, size, width, XDRUtils::xdr_coder(type)))
148  throw InternalErr(__FILE__, __LINE__, "Error serializing a Float64 array");
149 
150  if (xdr_getpos(&xdr) != size)
151  throw InternalErr(__FILE__, __LINE__, "Error serializing a Float64 array");
152 
153  // If this is a little-endian host, twiddle the bytes
154  static bool twiddle_bytes = !is_host_big_endian();
155  if (twiddle_bytes) {
156  if (width == 4) {
157  dods_float32 *lbuf = reinterpret_cast<dods_float32*>(&buf[0]);
158  while (num--) {
159  dods_int32 *i = reinterpret_cast<dods_int32*>(lbuf++);
160  *i = bswap_32(*i);
161  }
162  }
163  else { // width == 8
164  dods_float64 *lbuf = reinterpret_cast<dods_float64*>(&buf[0]);
165  while (num--) {
166  dods_int64 *i = reinterpret_cast<dods_int64*>(lbuf++);
167  *i = bswap_64(*i);
168  }
169  }
170  }
171 #ifdef USE_POSIX_THREADS
172  Locker lock(tm->get_mutex(), tm->get_cond(), tm->get_child_thread_count());
173 
174  tm->increment_child_thread_count();
175  tm->start_thread(MarshallerThread::write_thread, d_out, buf, size);
176 
177  // The child thread will delete buf when it's done
178  xdr_destroy(&xdr);
179 #else
180  d_out.write(&buf[0], size);
181  xdr_destroy(&xdr);
182  delete [] buf;
183 #endif
184  }
185  catch (...) {
186  xdr_destroy(&xdr);
187  delete [] buf;
188 
189  throw;
190  }
191 }
192 #endif
193 
201 D4StreamMarshaller::D4StreamMarshaller(ostream &out, bool write_data) :
202  d_out(out), d_write_data(write_data), tm(0)
203 {
204  assert(sizeof(std::streamsize) >= sizeof(int64_t));
205 
206 #if USE_XDR_FOR_IEEE754_ENCODING
207  // XDR is used if the call std::numeric_limits<double>::is_iec559()
208  // returns false indicating that the compiler is not using IEEE 754.
209  // If it is, we just write out the bytes.
210  xdrmem_create(&d_scalar_sink, d_ieee754_buf, sizeof(dods_float64), XDR_ENCODE);
211 #endif
212 
213 #ifdef USE_POSIX_THREADS
214  tm = new MarshallerThread;
215 #endif
216 
217  // This will cause exceptions to be thrown on i/o errors. The exception
218  // will be ostream::failure
219  out.exceptions(ostream::failbit | ostream::badbit);
220 }
221 
222 D4StreamMarshaller::~D4StreamMarshaller()
223 {
224 #if USE_XDR_FOR_IEEE754_ENCODING
225  xdr_destroy(&d_scalar_sink);
226 #endif
227 
228  delete tm;
229 }
230 
234 {
235  d_checksum.Reset();
236 }
237 
249 {
250  ostringstream oss;
251  oss.setf(ios::hex, ios::basefield);
252  oss << setfill('0') << setw(8) << d_checksum.GetCrc32();
253 
254  return oss.str();
255 }
256 
264 {
265  Crc32::checksum chk = d_checksum.GetCrc32();
266 #ifdef USE_POSIX_THREADS
267  Locker lock(tm->get_mutex(), tm->get_cond(), tm->get_child_thread_count());
268 #endif
269  d_out.write(reinterpret_cast<char*>(&chk), sizeof(Crc32::checksum));
270 }
271 
276 void D4StreamMarshaller::checksum_update(const void *data, unsigned long len)
277 {
278  d_checksum.AddData(reinterpret_cast<const uint8_t*>(data), len);
279 }
280 
281 void D4StreamMarshaller::put_byte(dods_byte val)
282 {
283  checksum_update(&val, sizeof(dods_byte));
284 
285  if (d_write_data) {
286  DBG( std::cerr << "put_byte: " << val << std::endl );
287 #ifdef USE_POSIX_THREADS
288  // make sure that a child thread is not writing to d_out.
289  Locker lock(tm->get_mutex(), tm->get_cond(), tm->get_child_thread_count());
290 #endif
291  d_out.write(reinterpret_cast<char*>(&val), sizeof(dods_byte));
292  }
293 }
294 
295 void D4StreamMarshaller::put_int8(dods_int8 val)
296 {
297  checksum_update(&val, sizeof(dods_int8));
298 
299  if (d_write_data) {
300  DBG( std::cerr << "put_int8: " << val << std::endl );
301 #ifdef USE_POSIX_THREADS
302  Locker lock(tm->get_mutex(), tm->get_cond(), tm->get_child_thread_count());
303 #endif
304  d_out.write(reinterpret_cast<char*>(&val), sizeof(dods_int8));
305  }
306 }
307 
308 void D4StreamMarshaller::put_int16(dods_int16 val)
309 {
310  checksum_update(&val, sizeof(dods_int16));
311 
312  if (d_write_data) {
313 #ifdef USE_POSIX_THREADS
314  Locker lock(tm->get_mutex(), tm->get_cond(), tm->get_child_thread_count());
315 #endif
316  d_out.write(reinterpret_cast<char*>(&val), sizeof(dods_int16));
317  }
318 }
319 
320 void D4StreamMarshaller::put_int32(dods_int32 val)
321 {
322  checksum_update(&val, sizeof(dods_int32));
323 
324  if (d_write_data) {
325 #ifdef USE_POSIX_THREADS
326  Locker lock(tm->get_mutex(), tm->get_cond(), tm->get_child_thread_count());
327 #endif
328  d_out.write(reinterpret_cast<char*>(&val), sizeof(dods_int32));
329  }
330 }
331 
332 void D4StreamMarshaller::put_int64(dods_int64 val)
333 {
334  checksum_update(&val, sizeof(dods_int64));
335 
336  if (d_write_data) {
337 #ifdef USE_POSIX_THREADS
338  Locker lock(tm->get_mutex(), tm->get_cond(), tm->get_child_thread_count());
339 #endif
340  d_out.write(reinterpret_cast<const char*>(&val), sizeof(dods_int64));
341  }
342 }
343 
344 void D4StreamMarshaller::put_float32(dods_float32 val)
345 {
346 #if !USE_XDR_FOR_IEEE754_ENCODING
347  assert(std::numeric_limits<float>::is_iec559);
348 
349  checksum_update(&val, sizeof(dods_float32));
350 
351  if (d_write_data) {
352 #ifdef USE_POSIX_THREADS
353  Locker lock(tm->get_mutex(), tm->get_cond(), tm->get_child_thread_count());
354 #endif
355  d_out.write(reinterpret_cast<const char*>(&val), sizeof(dods_float32));
356  }
357 
358 #else
359  // This code uses XDR to convert from a local representation to IEEE754;
360  // The extra 'twiddle' operation makes the byte-order correct for this
361  // host should it not be big-endian. Also note the assert() at the
362  // start of the method.
363 
364  if (d_write_data) {
365  if (std::numeric_limits<float>::is_iec559 ) {
366 #ifdef USE_POSIX_THREADS
367  Locker lock(tm->get_mutex(), tm->get_cond(), tm->get_child_thread_count());
368 #endif
369  d_out.write(reinterpret_cast<char*>(&val), sizeof(dods_float32));
370  }
371  else {
372  if (!xdr_setpos(&d_scalar_sink, 0))
373  throw InternalErr(__FILE__, __LINE__, "Error serializing a Float32 variable");
374 
375  if (!xdr_float(&d_scalar_sink, &val))
376  throw InternalErr(__FILE__, __LINE__, "Error serializing a Float32 variable");
377 
378  if (xdr_getpos(&d_scalar_sink) != sizeof(dods_float32))
379  throw InternalErr(__FILE__, __LINE__, "Error serializing a Float32 variable");
380 
381  // If this is a little-endian host, twiddle the bytes
382  static bool twiddle_bytes = !is_host_big_endian();
383  if (twiddle_bytes) {
384  dods_int32 *i = reinterpret_cast<dods_int32*>(&d_ieee754_buf);
385  *i = bswap_32(*i);
386  }
387 #ifdef USE_POSIX_THREADS
388  Locker lock(tm->get_mutex(), tm->get_cond(), tm->get_child_thread_count());
389 #endif
390  d_out.write(d_ieee754_buf, sizeof(dods_float32));
391  }
392  }
393 #endif
394 }
395 
396 void D4StreamMarshaller::put_float64(dods_float64 val)
397 {
398 #if !USE_XDR_FOR_IEEE754_ENCODING
399  assert(std::numeric_limits<double>::is_iec559);
400 
401  checksum_update(&val, sizeof(dods_float64));
402 
403  if (d_write_data) {
404 #ifdef USE_POSIX_THREADS
405  Locker lock(tm->get_mutex(), tm->get_cond(), tm->get_child_thread_count());
406 #endif
407  d_out.write(reinterpret_cast<const char*>(&val), sizeof(dods_float64));
408  }
409 
410 #else
411  // See the comment above in put_float32()
412  if (d_write_data) {
413  if (std::numeric_limits<double>::is_iec559) {
414 #ifdef USE_POSIX_THREADS
415  Locker lock(tm->get_mutex(), tm->get_cond(), tm->get_child_thread_count());
416 #endif
417  d_out.write(reinterpret_cast<char*>(&val), sizeof(dods_float64));}
418  }
419  else {
420  if (!xdr_setpos(&d_scalar_sink, 0))
421  throw InternalErr(__FILE__, __LINE__, "Error serializing a Float64 variable");
422 
423  if (!xdr_double(&d_scalar_sink, &val))
424  throw InternalErr(__FILE__, __LINE__, "Error serializing a Float64 variable");
425 
426  if (xdr_getpos(&d_scalar_sink) != sizeof(dods_float64))
427  throw InternalErr(__FILE__, __LINE__, "Error serializing a Float64 variable");
428 
429  // If this is a little-endian host, twiddle the bytes
430  static bool twiddle_bytes = !is_host_big_endian();
431  if (twiddle_bytes) {
432  dods_int64 *i = reinterpret_cast<dods_int64*>(&d_ieee754_buf);
433  *i = bswap_64(*i);
434  }
435 
436 #ifdef USE_POSIX_THREADS
437  Locker lock(tm->get_mutex(), tm->get_cond(), tm->get_child_thread_count());
438 #endif
439  d_out.write(d_ieee754_buf, sizeof(dods_float64));
440  }
441  }
442 #endif
443 }
444 
445 void D4StreamMarshaller::put_uint16(dods_uint16 val)
446 {
447  checksum_update(&val, sizeof(dods_uint16));
448 
449  if (d_write_data) {
450 #ifdef USE_POSIX_THREADS
451  Locker lock(tm->get_mutex(), tm->get_cond(), tm->get_child_thread_count());
452 #endif
453  d_out.write(reinterpret_cast<char*>(&val), sizeof(dods_uint16));
454  }
455 }
456 
457 void D4StreamMarshaller::put_uint32(dods_uint32 val)
458 {
459  checksum_update(&val, sizeof(dods_uint32));
460 
461  if (d_write_data) {
462 #ifdef USE_POSIX_THREADS
463  Locker lock(tm->get_mutex(), tm->get_cond(), tm->get_child_thread_count());
464 #endif
465  d_out.write(reinterpret_cast<char*>(&val), sizeof(dods_uint32));
466  }
467 }
468 
469 void D4StreamMarshaller::put_uint64(dods_uint64 val)
470 {
471  checksum_update(&val, sizeof(dods_uint64));
472 
473  if (d_write_data) {
474 #ifdef USE_POSIX_THREADS
475  Locker lock(tm->get_mutex(), tm->get_cond(), tm->get_child_thread_count());
476 #endif
477  d_out.write(reinterpret_cast<char*>(&val), sizeof(dods_uint64));
478  }
479 }
480 
489 void D4StreamMarshaller::put_count(int64_t count)
490 {
491 #ifdef USE_POSIX_THREADS
492  Locker lock(tm->get_mutex(), tm->get_cond(), tm->get_child_thread_count());
493 #endif
494  d_out.write(reinterpret_cast<const char*>(&count), sizeof(int64_t));
495 }
496 
497 void D4StreamMarshaller::put_str(const string &val)
498 {
499  checksum_update(val.c_str(), val.length());
500 
501  if (d_write_data) {
502  int64_t len = val.length();
503 #ifdef USE_POSIX_THREADS
504  Locker lock(tm->get_mutex(), tm->get_cond(), tm->get_child_thread_count());
505 #endif
506  d_out.write(reinterpret_cast<const char*>(&len), sizeof(int64_t));
507  d_out.write(val.data(), val.length());
508  }
509 }
510 
511 void D4StreamMarshaller::put_url(const string &val)
512 {
513  put_str(val);
514 }
515 
516 void D4StreamMarshaller::put_opaque_dap4(const char *val, int64_t len)
517 {
518  assert(val);
519  assert(len >= 0);
520 
521  checksum_update(val, len);
522 
523  if (d_write_data) {
524 #ifdef USE_POSIX_THREADS
525  Locker lock(tm->get_mutex(), tm->get_cond(), tm->get_child_thread_count());
526 
527  d_out.write(reinterpret_cast<const char*>(&len), sizeof(int64_t));
528 
529  char *byte_buf = new char[len];
530  memcpy(byte_buf, val, len);
531 
532  tm->increment_child_thread_count();
533  tm->start_thread(MarshallerThread::write_thread, d_out, byte_buf, len);
534 #else
535  d_out.write(reinterpret_cast<const char*>(&len), sizeof(int64_t));
536  d_out.write(val, len);
537 #endif
538  }
539 }
540 
546 void D4StreamMarshaller::put_vector(char *val, int64_t num_bytes)
547 {
548  assert(val);
549  assert(num_bytes >= 0);
550 
551  checksum_update(val, num_bytes);
552 
553  if (d_write_data) {
554 #ifdef USE_POSIX_THREADS
555  Locker lock(tm->get_mutex(), tm->get_cond(), tm->get_child_thread_count());
556 
557  char *buf = new char[num_bytes];
558  memcpy(buf, val, num_bytes);
559 
560  tm->increment_child_thread_count();
561  tm->start_thread(MarshallerThread::write_thread, d_out, buf, num_bytes);
562 #else
563  d_out.write(val, num_bytes);
564 #endif
565  }
566 }
567 
568 void D4StreamMarshaller::put_vector(char *val, int64_t num_elem, int elem_size)
569 {
570  assert(val);
571  assert(num_elem >= 0);
572  assert(elem_size > 0);
573 
574  int64_t bytes;
575 
576  switch (elem_size) {
577  case 1:
578  assert(!"Don't call this method for bytes, use put_vector(val, bytes) instead");
579  bytes = num_elem;
580  break;
581  case 2:
582  // Don't bother testing the sign bit
583  assert(!(num_elem & 0x4000000000000000)); // 0x 40 00 --> 0100 0000
584  bytes = num_elem << 1;
585  break;
586  case 4:
587  assert(!(num_elem & 0x6000000000000000)); // 0x 60 00 --> 0110 0000
588  bytes = num_elem << 2;
589  break;
590  case 8:
591  assert(!(num_elem & 0x7000000000000000)); // 0111 0000
592  bytes = num_elem << 3;
593  break;
594  default:
595  bytes = num_elem * elem_size;
596  break;
597  }
598 
599  checksum_update(val, bytes);
600 
601  if (d_write_data) {
602 #ifdef USE_POSIX_THREADS
603  Locker lock(tm->get_mutex(), tm->get_cond(), tm->get_child_thread_count());
604 
605  char *buf = new char[bytes];
606  memcpy(buf, val, bytes);
607 
608  tm->increment_child_thread_count();
609  tm->start_thread(MarshallerThread::write_thread, d_out, buf, bytes);
610 #else
611  d_out.write(val, bytes);
612 #endif
613  }
614 }
615 
625 void D4StreamMarshaller::put_vector_float32(char *val, int64_t num_elem)
626 {
627 #if !USE_XDR_FOR_IEEE754_ENCODING
628 
629  assert(std::numeric_limits<float>::is_iec559);
630  assert(val);
631  assert(num_elem >= 0);
632  // sizeof() a 32-bit float is 4, so we're going to send 4 * num_elem bytes, so
633  // make sure that doesn't overflow a 63-bit integer (the max positive value in
634  // a signed int64; use 1110 0000 0.. (0xe000 ...) to mask for non-zero bits
635  // to test that num can be multiplied by 4. A
636  assert(!(num_elem & 0xe000000000000000));
637 
638  num_elem = num_elem << 2; // num_elem is now the number of bytes
639 
640  checksum_update(val, num_elem);
641 
642  if (d_write_data) {
643 #ifdef USE_POSIX_THREADS
644  Locker lock(tm->get_mutex(), tm->get_cond(), tm->get_child_thread_count());
645 
646  char *buf = new char[num_elem];
647  memcpy(buf, val, num_elem);
648 
649  tm->increment_child_thread_count();
650  tm->start_thread(MarshallerThread::write_thread, d_out, buf, num_elem);
651 #else
652  d_out.write(val, num_elem);
653 #endif
654  }
655 
656 #else
657  assert(val);
658  assert(num_elem >= 0);
659  // sizeof() a 32-bit float is 4, so we're going to send 4 * num_elem bytes, so
660  // make sure that doesn't overflow a 63-bit integer (the max positive value in
661  // a signed int64; use 1110 0000 0.. (0xe000 ...) to mask for non-zero bits
662  // to test that num can be multiplied by 4. A
663  assert(!(num_elem & 0xe000000000000000));
664 
665  int64_t bytes = num_elem << 2; // num_elem is now the number of bytes
666 
667  checksum_update(val, bytes);
668 
669  if (d_write_data) {
670  if (!std::numeric_limits<float>::is_iec559) {
671  // If not using IEEE 754, use XDR to get it that way.
672  m_serialize_reals(val, num_elem, 4, type);
673  }
674  else {
675 #ifdef USE_POSIX_THREADS
676  Locker lock(tm->get_mutex(), tm->get_cond(), tm->get_child_thread_count());
677 
678  char *buf = new char[bytes];
679  memcpy(buf, val, bytes);
680 
681  tm->increment_child_thread_count();
682  tm->start_thread(MarshallerThread::write_thread, d_out, buf, bytes);
683 #else
684  d_out.write(val, bytes);
685 #endif
686  }
687  }
688 #endif
689 }
690 
699 void D4StreamMarshaller::put_vector_float64(char *val, int64_t num_elem)
700 {
701 #if !USE_XDR_FOR_IEEE754_ENCODING
702 
703  assert(std::numeric_limits<double>::is_iec559);
704  assert(val);
705  assert(num_elem >= 0);
706  // See comment above
707  assert(!(num_elem & 0xf000000000000000));
708 
709  num_elem = num_elem << 3; // num_elem is now the number of bytes
710 
711  checksum_update(val, num_elem);
712 
713  if (d_write_data) {
714 #ifdef USE_POSIX_THREADS
715  Locker lock(tm->get_mutex(), tm->get_cond(), tm->get_child_thread_count());
716 
717  char *buf = new char[num_elem];
718  memcpy(buf, val, num_elem);
719 
720  tm->increment_child_thread_count();
721  tm->start_thread(MarshallerThread::write_thread, d_out, buf, num_elem);
722 #else
723  d_out.write(val, num_elem);
724 #endif
725  }
726 #else
727  assert(val);
728  assert(num_elem >= 0);
729  // sizeof() a 32-bit float is 4, so we're going to send 4 * num_elem bytes, so
730  // make sure that doesn't overflow a 63-bit integer (the max positive value in
731  // a signed int64; use 1110 0000 0.. (0xe000 ...) to mask for non-zero bits
732  // to test that num can be multiplied by 4. A
733  assert(!(num_elem & 0xe000000000000000));
734 
735  int64_t bytes = num_elem << 3; // num_elem is now the number of bytes
736 
737  checksum_update(val, bytes);
738 
739  if (d_write_data) {
740  if (!std::numeric_limits<double>::is_iec559) {
741  // If not using IEEE 754, use XDR to get it that way.
742  m_serialize_reals(val, num_elem, 8, type);
743  }
744  else {
745 #ifdef USE_POSIX_THREADS
746  Locker lock(tm->get_mutex(), tm->get_cond(), tm->get_child_thread_count());
747 
748  char *buf = new char[bytes];
749  memcpy(buf, val, bytes);
750 
751  tm->increment_child_thread_count();
752  tm->start_thread(MarshallerThread::write_thread, d_out, buf, bytes);
753 #else
754  d_out.write(val, bytes);
755 #endif
756  }
757  }
758 #endif
759 
760 }
761 
762 void D4StreamMarshaller::put_vector_part(char *val, unsigned int num, int width, Type type)
763 {
764  assert(val);
765  assert(num >= 0);
766  assert(width > 0);
767 
768  switch(type) {
769  case dods_byte_c:
770  case dods_char_c:
771  case dods_int8_c:
772  case dods_uint8_c:
773  put_vector(val, num);
774  break;
775 
776  case dods_int16_c:
777  case dods_uint16_c:
778  case dods_int32_c:
779  case dods_uint32_c:
780  case dods_int64_c:
781  case dods_uint64_c:
782  put_vector(val, num, width);
783  break;
784 
785  case dods_enum_c:
786  if (width == 1)
787  put_vector(val, num);
788  else
789  put_vector(val, num, width);
790  break;
791 
792  case dods_float32_c:
793  put_vector_float32(val, num);
794  break;
795 
796  case dods_float64_c:
797  put_vector_float32(val, num);
798  break;
799 
800  case dods_str_c:
801  case dods_url_c:
802  throw InternalErr(__FILE__, __LINE__, "Array of String should not be passed to put_vector.");
803 
804  case dods_array_c:
805  throw InternalErr(__FILE__, __LINE__, "Array of Array not allowed.");
806 
807  case dods_opaque_c:
808  case dods_structure_c:
809  case dods_sequence_c:
810  throw InternalErr(__FILE__, __LINE__, "Array of String should not be passed to put_vector.");
811 
812  case dods_grid_c:
813  throw InternalErr(__FILE__, __LINE__, "Grid is not part of DAP4.");
814 
815  default:
816  throw InternalErr(__FILE__, __LINE__, "Unknown datatype.");
817  }
818 }
819 
820 void D4StreamMarshaller::dump(ostream &strm) const
821 {
822  strm << DapIndent::LMarg << "D4StreamMarshaller::dump - (" << (void *) this << ")" << endl;
823 }
824 
825 } // namespace libdap
826 
static void * write_thread(void *arg)
virtual void put_count(int64_t count)
void start_thread(void *(*thread)(void *arg), std::ostream &out, char *byte_buf, unsigned int bytes_written)
STL namespace.
virtual void dump(std::ostream &strm) const
dump the contents of this object to the specified ostream
virtual void put_vector_float32(char *val, int64_t num_elem)
Write a fixed size vector.
checksum GetCrc32() const
Definition: crc.h:110
Type
Identifies the data type.
Definition: Type.h:94
top level DAP object to house generic methods
Definition: AISConnect.cc:30
A class for software fault reporting.
Definition: InternalErr.h:64
void AddData(const uint8_t *pData, const uint32_t length)
Definition: crc.h:98
virtual void put_vector_part(char *, unsigned int, int, Type)
virtual void checksum_update(const void *data, unsigned long len)
virtual void put_vector(char *val, int64_t num_bytes)
Write a fixed size vector.
virtual void put_vector_float64(char *val, int64_t num_elem)
Write a fixed size vector of float64s.
virtual void put_checksum()
Write the checksum Write the checksum for the data sent since the last call to reset_checksum() to th...
bool is_host_big_endian()
Does this host use big-endian byte order?
Definition: util.cc:94
void Reset()
Definition: crc.h:92