44 #include "D4StreamMarshaller.h" 45 #ifdef USE_POSIX_THREADS 46 #include "MarshallerThread.h" 49 #if USE_XDR_FOR_IEEE754_ENCODING 54 #include "DapIndent.h" 66 inline uint8_t* WriteVarint64ToArrayInline(uint64_t value, uint8_t* target) {
69 uint32_t part0 =
static_cast<uint32_t
>(value );
70 uint32_t part1 =
static_cast<uint32_t
>(value >> 28);
71 uint32_t part2 =
static_cast<uint32_t
>(value >> 56);
84 if (part0 < (1 << 14)) {
85 if (part0 < (1 << 7)) {
91 if (part0 < (1 << 21)) {
98 if (part1 < (1 << 14)) {
99 if (part1 < (1 << 7)) {
100 size = 5;
goto size5;
102 size = 6;
goto size6;
105 if (part1 < (1 << 21)) {
106 size = 7;
goto size7;
108 size = 8;
goto size8;
113 if (part2 < (1 << 7)) {
114 size = 9;
goto size9;
116 size = 10;
goto size10;
122 size10: target[9] =
static_cast<uint8_t
>((part2 >> 7) | 0x80);
123 size9 : target[8] =
static_cast<uint8_t
>((part2 ) | 0x80);
124 size8 : target[7] =
static_cast<uint8_t
>((part1 >> 21) | 0x80);
125 size7 : target[6] =
static_cast<uint8_t
>((part1 >> 14) | 0x80);
126 size6 : target[5] =
static_cast<uint8_t
>((part1 >> 7) | 0x80);
127 size5 : target[4] =
static_cast<uint8_t
>((part1 ) | 0x80);
128 size4 : target[3] =
static_cast<uint8_t
>((part0 >> 21) | 0x80);
129 size3 : target[2] =
static_cast<uint8_t
>((part0 >> 14) | 0x80);
130 size2 : target[1] =
static_cast<uint8_t
>((part0 >> 7) | 0x80);
131 size1 : target[0] =
static_cast<uint8_t
>((part0 ) | 0x80);
133 target[size-1] &= 0x7F;
134 return target + size;
138 #if USE_XDR_FOR_IEEE754_ENCODING 139 void D4StreamMarshaller::m_serialize_reals(
char *val,
unsigned int num,
int width, Type type)
141 dods_uint64 size = num * width;
143 char *buf =
new char[size];
145 xdrmem_create(&xdr, &buf[0], size, XDR_ENCODE);
147 if(!xdr_array(&xdr, &val, (
unsigned int *)&num, size, width, XDRUtils::xdr_coder(type)))
148 throw InternalErr(__FILE__, __LINE__,
"Error serializing a Float64 array");
150 if (xdr_getpos(&xdr) != size)
151 throw InternalErr(__FILE__, __LINE__,
"Error serializing a Float64 array");
157 dods_float32 *lbuf =
reinterpret_cast<dods_float32*
>(&buf[0]);
159 dods_int32 *i =
reinterpret_cast<dods_int32*
>(lbuf++);
164 dods_float64 *lbuf =
reinterpret_cast<dods_float64*
>(&buf[0]);
166 dods_int64 *i =
reinterpret_cast<dods_int64*
>(lbuf++);
171 #ifdef USE_POSIX_THREADS 172 Locker lock(tm->get_mutex(), tm->get_cond(), tm->get_child_thread_count());
174 tm->increment_child_thread_count();
175 tm->start_thread(MarshallerThread::write_thread, d_out, buf, size);
180 d_out.write(&buf[0], size);
201 D4StreamMarshaller::D4StreamMarshaller(ostream &out,
bool write_data) :
202 d_out(out), d_write_data(write_data), tm(0)
204 assert(
sizeof(std::streamsize) >=
sizeof(int64_t));
206 #if USE_XDR_FOR_IEEE754_ENCODING 210 xdrmem_create(&d_scalar_sink, d_ieee754_buf,
sizeof(dods_float64), XDR_ENCODE);
213 #ifdef USE_POSIX_THREADS 219 out.exceptions(ostream::failbit | ostream::badbit);
222 D4StreamMarshaller::~D4StreamMarshaller()
224 #if USE_XDR_FOR_IEEE754_ENCODING 225 xdr_destroy(&d_scalar_sink);
251 oss.setf(ios::hex, ios::basefield);
252 oss << setfill(
'0') << setw(8) << d_checksum.
GetCrc32();
265 Crc32::checksum chk = d_checksum.
GetCrc32();
266 #ifdef USE_POSIX_THREADS 267 Locker lock(tm->get_mutex(), tm->get_cond(), tm->get_child_thread_count());
269 d_out.write(reinterpret_cast<char*>(&chk),
sizeof(Crc32::checksum));
278 d_checksum.
AddData(reinterpret_cast<const uint8_t*>(data), len);
281 void D4StreamMarshaller::put_byte(dods_byte val)
286 DBG( std::cerr <<
"put_byte: " << val << std::endl );
287 #ifdef USE_POSIX_THREADS 289 Locker lock(tm->get_mutex(), tm->get_cond(), tm->get_child_thread_count());
291 d_out.write(reinterpret_cast<char*>(&val),
sizeof(dods_byte));
295 void D4StreamMarshaller::put_int8(dods_int8 val)
300 DBG( std::cerr <<
"put_int8: " << val << std::endl );
301 #ifdef USE_POSIX_THREADS 302 Locker lock(tm->get_mutex(), tm->get_cond(), tm->get_child_thread_count());
304 d_out.write(reinterpret_cast<char*>(&val),
sizeof(dods_int8));
308 void D4StreamMarshaller::put_int16(dods_int16 val)
313 #ifdef USE_POSIX_THREADS 314 Locker lock(tm->get_mutex(), tm->get_cond(), tm->get_child_thread_count());
316 d_out.write(reinterpret_cast<char*>(&val),
sizeof(dods_int16));
320 void D4StreamMarshaller::put_int32(dods_int32 val)
325 #ifdef USE_POSIX_THREADS 326 Locker lock(tm->get_mutex(), tm->get_cond(), tm->get_child_thread_count());
328 d_out.write(reinterpret_cast<char*>(&val),
sizeof(dods_int32));
332 void D4StreamMarshaller::put_int64(dods_int64 val)
337 #ifdef USE_POSIX_THREADS 338 Locker lock(tm->get_mutex(), tm->get_cond(), tm->get_child_thread_count());
340 d_out.write(reinterpret_cast<const char*>(&val),
sizeof(dods_int64));
344 void D4StreamMarshaller::put_float32(dods_float32 val)
346 #if !USE_XDR_FOR_IEEE754_ENCODING 347 assert(std::numeric_limits<float>::is_iec559);
352 #ifdef USE_POSIX_THREADS 353 Locker lock(tm->get_mutex(), tm->get_cond(), tm->get_child_thread_count());
355 d_out.write(reinterpret_cast<const char*>(&val),
sizeof(dods_float32));
365 if (std::numeric_limits<float>::is_iec559 ) {
366 #ifdef USE_POSIX_THREADS 367 Locker lock(tm->get_mutex(), tm->get_cond(), tm->get_child_thread_count());
369 d_out.write(reinterpret_cast<char*>(&val),
sizeof(dods_float32));
372 if (!xdr_setpos(&d_scalar_sink, 0))
373 throw InternalErr(__FILE__, __LINE__,
"Error serializing a Float32 variable");
375 if (!xdr_float(&d_scalar_sink, &val))
376 throw InternalErr(__FILE__, __LINE__,
"Error serializing a Float32 variable");
378 if (xdr_getpos(&d_scalar_sink) !=
sizeof(dods_float32))
379 throw InternalErr(__FILE__, __LINE__,
"Error serializing a Float32 variable");
384 dods_int32 *i =
reinterpret_cast<dods_int32*
>(&d_ieee754_buf);
387 #ifdef USE_POSIX_THREADS 388 Locker lock(tm->get_mutex(), tm->get_cond(), tm->get_child_thread_count());
390 d_out.write(d_ieee754_buf,
sizeof(dods_float32));
396 void D4StreamMarshaller::put_float64(dods_float64 val)
398 #if !USE_XDR_FOR_IEEE754_ENCODING 399 assert(std::numeric_limits<double>::is_iec559);
404 #ifdef USE_POSIX_THREADS 405 Locker lock(tm->get_mutex(), tm->get_cond(), tm->get_child_thread_count());
407 d_out.write(reinterpret_cast<const char*>(&val),
sizeof(dods_float64));
413 if (std::numeric_limits<double>::is_iec559) {
414 #ifdef USE_POSIX_THREADS 415 Locker lock(tm->get_mutex(), tm->get_cond(), tm->get_child_thread_count());
417 d_out.write(reinterpret_cast<char*>(&val),
sizeof(dods_float64));}
420 if (!xdr_setpos(&d_scalar_sink, 0))
421 throw InternalErr(__FILE__, __LINE__,
"Error serializing a Float64 variable");
423 if (!xdr_double(&d_scalar_sink, &val))
424 throw InternalErr(__FILE__, __LINE__,
"Error serializing a Float64 variable");
426 if (xdr_getpos(&d_scalar_sink) !=
sizeof(dods_float64))
427 throw InternalErr(__FILE__, __LINE__,
"Error serializing a Float64 variable");
432 dods_int64 *i =
reinterpret_cast<dods_int64*
>(&d_ieee754_buf);
436 #ifdef USE_POSIX_THREADS 437 Locker lock(tm->get_mutex(), tm->get_cond(), tm->get_child_thread_count());
439 d_out.write(d_ieee754_buf,
sizeof(dods_float64));
445 void D4StreamMarshaller::put_uint16(dods_uint16 val)
450 #ifdef USE_POSIX_THREADS 451 Locker lock(tm->get_mutex(), tm->get_cond(), tm->get_child_thread_count());
453 d_out.write(reinterpret_cast<char*>(&val),
sizeof(dods_uint16));
457 void D4StreamMarshaller::put_uint32(dods_uint32 val)
462 #ifdef USE_POSIX_THREADS 463 Locker lock(tm->get_mutex(), tm->get_cond(), tm->get_child_thread_count());
465 d_out.write(reinterpret_cast<char*>(&val),
sizeof(dods_uint32));
469 void D4StreamMarshaller::put_uint64(dods_uint64 val)
474 #ifdef USE_POSIX_THREADS 475 Locker lock(tm->get_mutex(), tm->get_cond(), tm->get_child_thread_count());
477 d_out.write(reinterpret_cast<char*>(&val),
sizeof(dods_uint64));
491 #ifdef USE_POSIX_THREADS 492 Locker lock(tm->get_mutex(), tm->get_cond(), tm->get_child_thread_count());
494 d_out.write(reinterpret_cast<const char*>(&count),
sizeof(int64_t));
497 void D4StreamMarshaller::put_str(
const string &val)
502 int64_t len = val.length();
503 #ifdef USE_POSIX_THREADS 504 Locker lock(tm->get_mutex(), tm->get_cond(), tm->get_child_thread_count());
506 d_out.write(reinterpret_cast<const char*>(&len),
sizeof(int64_t));
507 d_out.write(val.data(), val.length());
511 void D4StreamMarshaller::put_url(
const string &val)
516 void D4StreamMarshaller::put_opaque_dap4(
const char *val, int64_t len)
524 #ifdef USE_POSIX_THREADS 525 Locker lock(tm->get_mutex(), tm->get_cond(), tm->get_child_thread_count());
527 d_out.write(reinterpret_cast<const char*>(&len),
sizeof(int64_t));
529 char *byte_buf =
new char[len];
530 memcpy(byte_buf, val, len);
532 tm->increment_child_thread_count();
535 d_out.write(reinterpret_cast<const char*>(&len),
sizeof(int64_t));
536 d_out.write(val, len);
549 assert(num_bytes >= 0);
554 #ifdef USE_POSIX_THREADS 555 Locker lock(tm->get_mutex(), tm->get_cond(), tm->get_child_thread_count());
557 char *buf =
new char[num_bytes];
558 memcpy(buf, val, num_bytes);
560 tm->increment_child_thread_count();
563 d_out.write(val, num_bytes);
571 assert(num_elem >= 0);
572 assert(elem_size > 0);
578 assert(!
"Don't call this method for bytes, use put_vector(val, bytes) instead");
583 assert(!(num_elem & 0x4000000000000000));
584 bytes = num_elem << 1;
587 assert(!(num_elem & 0x6000000000000000));
588 bytes = num_elem << 2;
591 assert(!(num_elem & 0x7000000000000000));
592 bytes = num_elem << 3;
595 bytes = num_elem * elem_size;
602 #ifdef USE_POSIX_THREADS 603 Locker lock(tm->get_mutex(), tm->get_cond(), tm->get_child_thread_count());
605 char *buf =
new char[bytes];
606 memcpy(buf, val, bytes);
608 tm->increment_child_thread_count();
611 d_out.write(val, bytes);
627 #if !USE_XDR_FOR_IEEE754_ENCODING 629 assert(std::numeric_limits<float>::is_iec559);
631 assert(num_elem >= 0);
636 assert(!(num_elem & 0xe000000000000000));
638 num_elem = num_elem << 2;
643 #ifdef USE_POSIX_THREADS 644 Locker lock(tm->get_mutex(), tm->get_cond(), tm->get_child_thread_count());
646 char *buf =
new char[num_elem];
647 memcpy(buf, val, num_elem);
649 tm->increment_child_thread_count();
652 d_out.write(val, num_elem);
658 assert(num_elem >= 0);
663 assert(!(num_elem & 0xe000000000000000));
665 int64_t bytes = num_elem << 2;
670 if (!std::numeric_limits<float>::is_iec559) {
672 m_serialize_reals(val, num_elem, 4, type);
675 #ifdef USE_POSIX_THREADS 676 Locker lock(tm->get_mutex(), tm->get_cond(), tm->get_child_thread_count());
678 char *buf =
new char[bytes];
679 memcpy(buf, val, bytes);
681 tm->increment_child_thread_count();
684 d_out.write(val, bytes);
701 #if !USE_XDR_FOR_IEEE754_ENCODING 703 assert(std::numeric_limits<double>::is_iec559);
705 assert(num_elem >= 0);
707 assert(!(num_elem & 0xf000000000000000));
709 num_elem = num_elem << 3;
714 #ifdef USE_POSIX_THREADS 715 Locker lock(tm->get_mutex(), tm->get_cond(), tm->get_child_thread_count());
717 char *buf =
new char[num_elem];
718 memcpy(buf, val, num_elem);
720 tm->increment_child_thread_count();
723 d_out.write(val, num_elem);
728 assert(num_elem >= 0);
733 assert(!(num_elem & 0xe000000000000000));
735 int64_t bytes = num_elem << 3;
740 if (!std::numeric_limits<double>::is_iec559) {
742 m_serialize_reals(val, num_elem, 8, type);
745 #ifdef USE_POSIX_THREADS 746 Locker lock(tm->get_mutex(), tm->get_cond(), tm->get_child_thread_count());
748 char *buf =
new char[bytes];
749 memcpy(buf, val, bytes);
751 tm->increment_child_thread_count();
754 d_out.write(val, bytes);
802 throw InternalErr(__FILE__, __LINE__,
"Array of String should not be passed to put_vector.");
805 throw InternalErr(__FILE__, __LINE__,
"Array of Array not allowed.");
808 case dods_structure_c:
809 case dods_sequence_c:
810 throw InternalErr(__FILE__, __LINE__,
"Array of String should not be passed to put_vector.");
813 throw InternalErr(__FILE__, __LINE__,
"Grid is not part of DAP4.");
816 throw InternalErr(__FILE__, __LINE__,
"Unknown datatype.");
822 strm << DapIndent::LMarg <<
"D4StreamMarshaller::dump - (" << (
void *)
this <<
")" << endl;
static void * write_thread(void *arg)
virtual void put_count(int64_t count)
void start_thread(void *(*thread)(void *arg), std::ostream &out, char *byte_buf, unsigned int bytes_written)
virtual void dump(std::ostream &strm) const
Dump the contents of this object to the specified ostream.
virtual void put_vector_float32(char *val, int64_t num_elem)
Write a fixed size vector of float32s.
checksum GetCrc32() const
Type
Identifies the data type.
top level DAP object to house generic methods
A class for software fault reporting.
void AddData(const uint8_t *pData, const uint32_t length)
virtual void put_vector_part(char *, unsigned int, int, Type)
virtual void checksum_update(const void *data, unsigned long len)
virtual void put_vector(char *val, int64_t num_bytes)
Write a fixed size vector.
virtual string get_checksum()
virtual void put_vector_float64(char *val, int64_t num_elem)
Write a fixed size vector of float64s.
virtual void put_checksum()
Write the checksum for the data sent since the last call to reset_checksum() to the output stream.
bool is_host_big_endian()
Does this host use big-endian byte order?
virtual void reset_checksum()