42#include <libdap/D4Enum.h>
43#include <libdap/D4Attributes.h>
44#include <libdap/D4Maps.h>
45#include <libdap/D4Group.h>
46#include <libdap/Byte.h>
47#include <libdap/util.h>
49#include "BESInternalError.h"
50#include "BESInternalFatalError.h"
53#include "BESStopWatch.h"
55#include "byteswap_compat.h"
56#include "float_byteswap.h"
57#include "CurlHandlePool.h"
59#include "DmrppArray.h"
60#include "DmrppStructure.h"
61#include "DmrppRequestHandler.h"
62#include "DmrppNames.h"
// BESDEBUG channel name used by the debug statements in this file.
67#define dmrpp_3 "dmrpp:3"
// Prefix for log/error messages: "DmrppArray::<enclosing function>() - ".
73#define prolog std::string("DmrppArray::").append(__func__).append("() - ")
// Held by the start_*_thread() helpers while they compare and increment
// transfer_thread_counter, so two callers cannot both claim the last free slot.
79std::mutex transfer_thread_pool_mtx;
// Number of transfer tasks launched by the start_*_thread() helpers; compared
// against DmrppRequestHandler::d_max_transfer_threads before launching another.
// NOTE(review): the decrement is not visible in this excerpt (presumably done when
// get_next_future() erases a finished future) - confirm.
81atomic_uint transfer_thread_counter(0);
/**
 * @brief Wait on the futures in a list until one of them finishes.
 *
 * Walks `futures` from the front. Each valid future gets wait_for(timeout);
 * when one is ready its result is collected with get(). A future that is not
 * valid is also counted as "finished" so it can be discarded.
 *
 * NOTE(review): this definition is missing source lines in this file (braces,
 * else-branches, the declarations of `msg` and `done`, and the statements
 * that advance `futr`, erase the future and adjust the counter), so it does
 * not compile as shown. The notes below only describe the visible statements.
 * NOTE(review): `debug_prefix` is taken by value; `const string &` would avoid
 * a string copy on every call.
 *
 * @param futures In-flight transfer futures; the finished one is reported as
 *   erased from this list (the erase itself is not visible here).
 * @param thread_counter Shared transfer-thread counter (only printed in the
 *   visible lines).
 * @param timeout Per-future wait, in milliseconds.
 * @param debug_prefix Text prepended to every debug/error message.
 * @return true if a future completed or was found invalid, otherwise false.
 * @throws BESInternalError "The std::future has failed!" (the guarding
 *   condition is not visible; presumably a false result from get()).
 */
99bool get_next_future(list<std::future<bool>> &futures, atomic_uint &thread_counter,
unsigned long timeout,
string debug_prefix) {
100 bool future_finished =
false;
// Express the millisecond timeout as a chrono duration for wait_for().
102 std::chrono::milliseconds timeout_ms (timeout);
105 auto futr = futures.begin();
106 auto fend = futures.end();
107 bool future_is_valid =
true;
// Scan until one future finishes, an invalid future is met, or the list ends.
108 while(!future_finished && future_is_valid && futr != fend){
109 future_is_valid = (*futr).valid();
// Any status other than timeout means the task's result (or exception) is available.
117 if((*futr).wait_for(timeout_ms) != std::future_status::timeout){
// std::future::get() rethrows any exception raised inside the task.
119 bool success = (*futr).get();
120 future_finished =
true;
121 BESDEBUG(dmrpp_3, debug_prefix << prolog <<
"Called future::get() on a ready future."
122 <<
" success: " << (success?
"true":
"false") << endl);
125 msg << debug_prefix << prolog <<
"The std::future has failed!";
126 msg <<
" thread_counter: " << thread_counter;
127 throw BESInternalError(msg.str(), __FILE__, __LINE__);
// This future is still running after `timeout` ms; log and (presumably) move on.
142 BESDEBUG(dmrpp_3, debug_prefix << prolog <<
"future::wait_for() timed out. (timeout: " <<
143 timeout <<
" ms) There are currently " << futures.size() <<
" futures in process."
144 <<
" thread_counter: " << thread_counter << endl);
// A future with no shared state is treated as finished so it gets removed.
148 BESDEBUG(dmrpp_3, debug_prefix << prolog <<
"The future was not valid. Dumping... " << endl);
149 future_finished =
true;
// Remove the finished (or invalid) future from the list.
153 if (futr!=fend && future_finished) {
156 BESDEBUG(dmrpp_3, debug_prefix << prolog <<
"Erased future from futures list. (Erased future was "
157 << (future_is_valid?
"":
"not ") <<
"valid at start.) There are currently " <<
158 futures.size() <<
" futures in process. thread_counter: " << thread_counter << endl);
// Presumably the exit test of an enclosing retry loop (not visible): stop once
// something finished or there is nothing left to wait on.
161 done = future_finished || futures.empty();
164 return future_finished;
// NOTE(review): the signature and braces of this function are not visible in this
// file; the error messages identify it as one_child_chunk_thread_new_sanity_check().
// Validates a child-chunk transfer before its bytes are copied into the parent
// chunk: both read buffers must exist, and the child must have read exactly
// get_size() bytes. Each failure throws BESInternalError.
168 if (!args->the_one_chunk->get_rbuf()) {
169 throw BESInternalError(
"one_child_chunk_thread_new_sanity_check() - the_one_chunk->get_rbuf() is NULL!", __FILE__, __LINE__);
171 if (!args->child_chunk->get_rbuf()) {
172 throw BESInternalError(
"one_child_chunk_thread_new_sanity_check() - child_chunk->get_rbuf() is NULL!", __FILE__, __LINE__);
// A short read would leave a gap in the parent buffer, so treat it as an error.
174 if (args->child_chunk->get_bytes_read() != args->child_chunk->get_size()) {
175 throw BESInternalError(
"one_child_chunk_thread_new_sanity_check() - child_chunk->get_bytes_read() != child_chunk->get_size()!", __FILE__, __LINE__);
/**
 * @brief Transfer-thread body: read one child chunk and copy it into the parent chunk.
 *
 * Reads args->child_chunk, validates it with one_child_chunk_thread_new_sanity_check(),
 * then copies its bytes into args->the_one_chunk's read buffer at the child's
 * offset relative to the parent's offset.
 *
 * NOTE(review): nothing visible here checks that
 * offset_within_the_one_chunk + bytes_read <= the parent's size; the copy relies
 * on the caller having partitioned the parent's byte range correctly.
 * NOTE(review): braces and the return statement are missing from this file.
 */
189bool one_child_chunk_thread_new(
const unique_ptr<one_child_chunk_args_new> &args)
191 args->child_chunk->read_chunk();
193 one_child_chunk_thread_new_sanity_check(args.get());
// Byte position of this child inside the parent's buffer.
206 unsigned long long offset_within_the_one_chunk = args->child_chunk->get_offset() - args->the_one_chunk->get_offset();
208 memcpy(args->the_one_chunk->get_rbuf() + offset_within_the_one_chunk, args->child_chunk->get_rbuf(),
209 args->child_chunk->get_bytes_read());
/**
 * @brief Transfer-thread body: call SuperChunk::read() on args->super_chunk.
 *
 * When DMRPP_ENABLE_THREAD_TIMERS is set, a stopwatch tagged with this thread's
 * id, the parent thread id and the SuperChunk id times the work.
 * NOTE(review): timer_tag already starts with `prolog`, and the label passed to
 * BES_STOPWATCH_START prepends `prolog` again, so the prefix appears twice.
 * NOTE(review): braces, the matching #endif and the return statement are
 * missing from this file.
 */
219bool one_super_chunk_transfer_thread(
const unique_ptr<one_super_chunk_args> &args)
222#if DMRPP_ENABLE_THREAD_TIMERS
223 stringstream timer_tag;
224 timer_tag << prolog <<
"tid: 0x" << std::hex << std::this_thread::get_id() <<
225 " parent_tid: 0x" << std::hex << args->parent_thread_id <<
" sc_id: " << args->super_chunk->id();
226 BES_STOPWATCH_START(TRANSFER_THREADS, prolog + timer_tag.str());
229 args->super_chunk->read();
/**
 * @brief Transfer-thread body: call SuperChunk::read_unconstrained() on args->super_chunk.
 *
 * Same timing scaffold as one_super_chunk_transfer_thread() when
 * DMRPP_ENABLE_THREAD_TIMERS is set; the timer label contains `prolog` twice.
 * NOTE(review): braces, the matching #endif and the return statement are
 * missing from this file.
 */
238bool one_super_chunk_unconstrained_transfer_thread(
const unique_ptr<one_super_chunk_args> &args)
241#if DMRPP_ENABLE_THREAD_TIMERS
242 stringstream timer_tag;
243 timer_tag << prolog <<
"tid: 0x" << std::hex << std::this_thread::get_id() <<
244 " parent_tid: 0x" << std::hex << args->parent_thread_id <<
" sc_id: " << args->super_chunk->id();
245 BES_STOPWATCH_START(TRANSFER_THREADS, prolog + timer_tag.str());
248 args->super_chunk->read_unconstrained();
/**
 * @brief Transfer-thread body: call SuperChunk::read_unconstrained_dio() on args->super_chunk.
 *
 * The direct-IO counterpart of one_super_chunk_unconstrained_transfer_thread();
 * same timing scaffold, including the doubled `prolog` in the timer label.
 * NOTE(review): braces, the matching #endif and the return statement are
 * missing from this file.
 */
252bool one_super_chunk_unconstrained_transfer_thread_dio(
const unique_ptr<one_super_chunk_args> &args)
255#if DMRPP_ENABLE_THREAD_TIMERS
256 stringstream timer_tag;
257 timer_tag << prolog <<
"tid: 0x" << std::hex << std::this_thread::get_id() <<
258 " parent_tid: 0x" << std::hex << args->parent_thread_id <<
" sc_id: " << args->super_chunk->id();
259 BES_STOPWATCH_START(TRANSFER_THREADS, prolog + timer_tag.str());
262 args->super_chunk->read_unconstrained_dio();
/**
 * @brief Launch one_child_chunk_thread_new() with std::async if a transfer slot is free.
 *
 * Holds transfer_thread_pool_mtx while comparing transfer_thread_counter with
 * DmrppRequestHandler::d_max_transfer_threads. Below the limit, it increments the
 * counter and appends the new future to `futures`.
 *
 * @param futures Receives the future of the launched task.
 * @param args Moved into the task when it launches; otherwise destroyed when this
 *   function returns (callers log "args deleted" in that case).
 * @return Presumably true when a task was launched (the return statements are not
 *   visible in this file); callers treat false as "no free slot, retry later".
 */
267bool start_one_child_chunk_thread(list<std::future<bool>> &futures, unique_ptr<one_child_chunk_args_new> args) {
269 std::unique_lock<std::mutex> lck (transfer_thread_pool_mtx);
270 if (transfer_thread_counter < DmrppRequestHandler::d_max_transfer_threads) {
271 transfer_thread_counter++;
272 futures.push_back(std::async(std::launch::async, one_child_chunk_thread_new, std::move(args)));
// NOTE(review): the opening quote after "Got std::future" is never closed in this
// message; the _dio variant of this helper also prints transfer_thread_counter.
277 BESDEBUG(dmrpp_3, prolog <<
"Got std::future '" << futures.size() <<endl);
/**
 * @brief Launch one_super_chunk_transfer_thread() with std::async if a transfer slot is free.
 *
 * Same slot protocol as start_one_child_chunk_thread(): under
 * transfer_thread_pool_mtx, launch only while transfer_thread_counter is below
 * DmrppRequestHandler::d_max_transfer_threads, incrementing it on launch.
 *
 * @param futures Receives the future of the launched task.
 * @param args Moved into the task on launch; destroyed on return otherwise.
 * @return Presumably true when a task was launched (return statements not visible).
 */
290bool start_super_chunk_transfer_thread(list<std::future<bool>> &futures, unique_ptr<one_super_chunk_args> args) {
292 std::unique_lock<std::mutex> lck (transfer_thread_pool_mtx);
293 if (transfer_thread_counter < DmrppRequestHandler::d_max_transfer_threads) {
294 transfer_thread_counter++;
295 futures.push_back(std::async(std::launch::async, one_super_chunk_transfer_thread, std::move(args)));
// NOTE(review): unmatched quote in this debug message.
300 BESDEBUG(dmrpp_3, prolog <<
"Got std::future '" << futures.size() <<endl);
/**
 * @brief Launch one_super_chunk_unconstrained_transfer_thread() with std::async if a slot is free.
 *
 * Same slot protocol as start_one_child_chunk_thread(): under
 * transfer_thread_pool_mtx, launch only while transfer_thread_counter is below
 * DmrppRequestHandler::d_max_transfer_threads, incrementing it on launch.
 *
 * @param futures Receives the future of the launched task.
 * @param args Moved into the task on launch; destroyed on return otherwise.
 * @return Presumably true when a task was launched (return statements not visible).
 */
313bool start_super_chunk_unconstrained_transfer_thread(list<std::future<bool>> &futures, unique_ptr<one_super_chunk_args> args) {
315 std::unique_lock<std::mutex> lck (transfer_thread_pool_mtx);
316 if(transfer_thread_counter < DmrppRequestHandler::d_max_transfer_threads) {
317 transfer_thread_counter++;
318 futures.push_back(std::async(std::launch::async, one_super_chunk_unconstrained_transfer_thread, std::move(args)));
// NOTE(review): unmatched quote in this debug message.
323 BESDEBUG(dmrpp_3, prolog <<
"Got std::future '" << futures.size() <<endl);
/**
 * @brief Launch one_super_chunk_unconstrained_transfer_thread_dio() with std::async if a slot is free.
 *
 * Same slot protocol as start_one_child_chunk_thread(): under
 * transfer_thread_pool_mtx, launch only while transfer_thread_counter is below
 * DmrppRequestHandler::d_max_transfer_threads, incrementing it on launch.
 *
 * @param futures Receives the future of the launched task.
 * @param args Moved into the task on launch; destroyed on return otherwise.
 * @return Presumably true when a task was launched (return statements not visible).
 */
329bool start_super_chunk_unconstrained_transfer_thread_dio(list<std::future<bool>> &futures, unique_ptr<one_super_chunk_args> args) {
331 std::unique_lock<std::mutex> lck (transfer_thread_pool_mtx);
332 if(transfer_thread_counter < DmrppRequestHandler::d_max_transfer_threads) {
333 transfer_thread_counter++;
334 futures.push_back(std::async(std::launch::async, one_super_chunk_unconstrained_transfer_thread_dio, std::move(args)));
336 BESDEBUG(dmrpp_3, prolog <<
"Got std::future '" << futures.size() <<
337 "' from std::async, transfer_thread_counter: " << transfer_thread_counter << endl);
/**
 * @brief Read a queue of SuperChunks concurrently with read_unconstrained().
 *
 * Alternates between collecting one finished transfer (get_next_future) and
 * launching transfers for queued SuperChunks through
 * start_super_chunk_unconstrained_transfer_thread() until no slot is free. A
 * SuperChunk whose thread could not start stays at the front of the queue.
 * Afterwards, any remaining futures are drained with get(), which rethrows task
 * exceptions.
 *
 * NOTE(review): this definition is missing lines in this file (the outer loop,
 * creation of `args`, the queue pop, exception handling, and the drain-loop
 * body), so the exact control flow cannot be confirmed from here.
 *
 * @param super_chunks SuperChunks to read; presumably popped as their threads start.
 * @param array The array being read; its name labels the timer.
 */
363void read_super_chunks_unconstrained_concurrent(queue<shared_ptr<SuperChunk>> &super_chunks,
DmrppArray *array)
365 BES_STOPWATCH_START(MODULE, prolog +
"Timing array name: "+array->name());
372 list<future<bool>> futures;
375 bool future_finished =
true;
// Reap one finished transfer to free a slot before launching more.
379 future_finished = get_next_future(futures, transfer_thread_counter, DMRPP_WAIT_FOR_FUTURE_MS, prolog);
383 BESDEBUG(dmrpp_3, prolog <<
"future_finished: " << (future_finished ?
"true" :
"false") << endl);
385 if (!super_chunks.empty()){
387 bool thread_started =
true;
// Launch until the pool is full or the queue is empty.
388 while(thread_started && !super_chunks.empty()) {
389 auto super_chunk = super_chunks.front();
390 BESDEBUG(dmrpp_3, prolog <<
"Starting thread for " << super_chunk->to_string(
false) << endl);
393 thread_started = start_super_chunk_unconstrained_transfer_thread(futures, std::move(args));
395 if (thread_started) {
397 BESDEBUG(dmrpp_3, prolog <<
"STARTED thread for " << super_chunk->to_string(
false) << endl);
400 BESDEBUG(dmrpp_3, prolog <<
"Thread not started. args deleted, Chunk remains in queue.)" <<
401 " transfer_thread_counter: " << transfer_thread_counter <<
402 " futures.size(): " << futures.size() << endl);
411 future_finished =
false;
// Drain: wait for every outstanding transfer.
416 while(!futures.empty()){
417 if(futures.back().valid())
418 futures.back().get();
/**
 * @brief Read a queue of SuperChunks concurrently with read_unconstrained_dio().
 *
 * The direct-IO counterpart of read_super_chunks_unconstrained_concurrent():
 * reap one finished transfer (get_next_future), launch more via
 * start_super_chunk_unconstrained_transfer_thread_dio() until the pool is full,
 * and finally drain the remaining futures with get().
 *
 * NOTE(review): this definition is missing lines in this file (outer loop,
 * creation of `args`, queue pop, exception handling, drain-loop body).
 *
 * @param super_chunks SuperChunks to read; presumably popped as their threads start.
 * @param array The array being read; its name labels the timer.
 */
428void read_super_chunks_unconstrained_concurrent_dio(queue<shared_ptr<SuperChunk>> &super_chunks,
DmrppArray *array)
430 BES_STOPWATCH_START(MODULE, prolog +
"Timing array name: "+array->name());
437 list<future<bool>> futures;
440 bool future_finished =
true;
// Reap one finished transfer to free a slot before launching more.
444 future_finished = get_next_future(futures, transfer_thread_counter, DMRPP_WAIT_FOR_FUTURE_MS, prolog);
448 BESDEBUG(dmrpp_3, prolog <<
"future_finished: " << (future_finished ?
"true" :
"false") << endl);
450 if (!super_chunks.empty()){
452 bool thread_started =
true;
// Launch until the pool is full or the queue is empty.
453 while(thread_started && !super_chunks.empty()) {
454 auto super_chunk = super_chunks.front();
455 BESDEBUG(dmrpp_3, prolog <<
"Starting thread for " << super_chunk->to_string(
false) << endl);
460 thread_started = start_super_chunk_unconstrained_transfer_thread_dio(futures, std::move(args));
462 if (thread_started) {
464 BESDEBUG(dmrpp_3, prolog <<
"STARTED thread for " << super_chunk->to_string(
false) << endl);
467 BESDEBUG(dmrpp_3, prolog <<
"Thread not started. args deleted, Chunk remains in queue.)" <<
468 " transfer_thread_counter: " << transfer_thread_counter <<
469 " futures.size(): " << futures.size() << endl);
478 future_finished =
false;
// Drain: wait for every outstanding transfer.
483 while(!futures.empty()){
484 if(futures.back().valid())
485 futures.back().get();
/**
 * @brief Read a queue of SuperChunks concurrently with SuperChunk::read() (constrained read).
 *
 * Same scheduling loop as read_super_chunks_unconstrained_concurrent(), but tasks
 * are launched through start_super_chunk_transfer_thread().
 *
 * NOTE(review): this definition is missing lines in this file (outer loop,
 * creation of `args`, queue pop, exception handling, drain-loop body).
 *
 * @param super_chunks SuperChunks to read; presumably popped as their threads start.
 * @param array The array being read; its name labels the timer.
 */
514void read_super_chunks_concurrent(queue< shared_ptr<SuperChunk> > &super_chunks,
DmrppArray *array)
516 BES_STOPWATCH_START(MODULE, prolog +
"Timing array name: "+array->name());
523 list<future<bool>> futures;
526 bool future_finished =
true;
// Reap one finished transfer to free a slot before launching more.
530 future_finished = get_next_future(futures, transfer_thread_counter, DMRPP_WAIT_FOR_FUTURE_MS, prolog);
534 BESDEBUG(dmrpp_3, prolog <<
"future_finished: " << (future_finished ?
"true" :
"false") << endl);
536 if (!super_chunks.empty()){
538 bool thread_started =
true;
// Launch until the pool is full or the queue is empty.
539 while(thread_started && !super_chunks.empty()) {
540 auto super_chunk = super_chunks.front();
541 BESDEBUG(dmrpp_3, prolog <<
"Starting thread for " << super_chunk->to_string(
false) << endl);
544 thread_started = start_super_chunk_transfer_thread(futures, std::move(args));
546 if (thread_started) {
548 BESDEBUG(dmrpp_3, prolog <<
"STARTED thread for " << super_chunk->to_string(
false) << endl);
551 BESDEBUG(dmrpp_3, prolog <<
"Thread not started. args deleted, Chunk remains in queue.)" <<
552 " transfer_thread_counter: " << transfer_thread_counter <<
553 " futures.size(): " << futures.size() << endl);
562 future_finished =
false;
// Drain: wait for every outstanding transfer.
567 while(!futures.empty()){
568 if(futures.back().valid())
569 futures.back().get();
595static unsigned long long
596get_index(
const vector<unsigned long long> &address_in_target,
const vector<unsigned long long> &target_shape)
598 if (address_in_target.size() != target_shape.size()) {
599 throw BESInternalError(
"get_index: address_in_target != target_shape", __FILE__, __LINE__);
602 auto shape_index = target_shape.rbegin();
603 auto index = address_in_target.rbegin(), index_end = address_in_target.rend();
605 unsigned long long multiplier_var = *shape_index++;
606 unsigned long long offset = *index++;
608 while (index != index_end) {
609 if (*index >= *shape_index) {
610 throw BESInternalError(
"get_index: index >= shape_index", __FILE__, __LINE__);
613 offset += multiplier_var * *index++;
614 multiplier_var *= *shape_index++;
/**
 * @brief Element stride of dimension k in a row-major array of the given shape.
 *
 * Presumably returns the product of shape[k+1 .. n-1] (how insert_chunk_unconstrained()
 * uses it to step through dimension k). The statements that advance the
 * iterator and accumulate the product are not visible in this file.
 *
 * @param shape Dimension sizes.
 * @param k Dimension index; there must be at least one dimension after it.
 * @throws BESInternalError if shape.size() <= k + 1.
 * NOTE(review): the local variable `multiplier` shadows this function's name.
 */
636static unsigned long long multiplier(
const vector<unsigned long long> &shape,
unsigned int k)
638 if (!(shape.size() > k + 1)) {
639 throw BESInternalError(
"multiplier: !(shape.size() > k + 1)", __FILE__, __LINE__);
642 vector<unsigned long long>::const_iterator i = shape.begin(), e = shape.end();
644 unsigned long long multiplier = *i++;
// NOTE(review): the signature of this assignment operator is not visible in this
// file. It returns early on self-assignment, then assigns the libdap Array part
// and the DmrppCommon part of `rhs` into this object.
661 if (
this == &rhs)
return *
this;
663 dynamic_cast<Array &
>(*this) = rhs;
665 dynamic_cast<DmrppCommon &
>(*this) = rhs;
// NOTE(review): the enclosing function is not visible in this file. This loop
// returns true as soon as any dimension's constrained size differs from its full
// size, i.e. when a constraint selects a subset of the array.
677 for (Dim_iter p = dim_begin(), e = dim_end(); p != e; ++p)
678 if (dimension_size_ll(p,
true) != dimension_size_ll(p,
false))
return true;
// NOTE(review): the enclosing function is not visible in this file (presumably
// get_size(bool constrained)). It starts an element-count accumulator at 1 and
// reads each dimension's (optionally constrained) size; the multiply/return
// lines are missing.
692 unsigned long long asize = 1;
693 for (Dim_iter dim = dim_begin(), end = dim_end(); dim != end; dim++) {
694 auto dim_size = dimension_size_ll(dim, constrained);
// NOTE(review): the enclosing function is not visible in this file (presumably
// get_shape(bool constrained)). It builds a vector holding each dimension's
// (optionally constrained) size, in dimension order.
708 auto dim = dim_begin(), edim = dim_end();
709 vector<unsigned long long> shape;
// One entry per dimension, so reserve exactly that many.
713 shape.reserve(edim - dim);
715 for (; dim != edim; dim++) {
716 shape.push_back(dimension_size_ll(dim, constrained));
727DmrppArray::dimension DmrppArray::get_dimension(
unsigned int i)
729 if (i > (dim_end() - dim_begin())) {
730 throw BESInternalError(
"get_dimension: i > (dim_end() - dim_begin())", __FILE__, __LINE__);
732 return *(dim_begin() + i);
/**
 * @brief Copy the constrained selection of a contiguous row-major buffer into dest_buf.
 *
 * Recurses over dimensions starting at dim_iter. `subset_addr` holds the indices
 * chosen so far for the outer dimensions. `array_shape` describes the layout of
 * src_buf, and get_index() turns a full address into an element offset. When the
 * innermost dimension has stride 1, the run [start, stop] is copied as one
 * contiguous range of elements; otherwise the elements are copied one at a time,
 * stepping by `stride`. Elements are written at *target_index in dest_buf,
 * bytes_per_element bytes each.
 *
 * NOTE(review): this definition is missing lines in this file (braces, the
 * increment of dim_iter that makes the `dim_iter == dim_end()` tests meaningful,
 * and the updates of *target_index), so it does not compile as shown.
 * NOTE(review): `unsigned int sourceIndex` below truncates the 64-bit result of
 * get_index(); for arrays with more than 2^32 elements the source offset would
 * be wrong. The stride-1 branch uses unsigned long long and does not have this
 * problem.
 * NOTE(review): the byte-by-byte inner loops could be single memcpy() calls.
 */
748void DmrppArray::insert_constrained_contiguous(Dim_iter dim_iter,
unsigned long *target_index,
749 vector<unsigned long long> &subset_addr,
750 const vector<unsigned long long> &array_shape,
char *src_buf,
char *dest_buf)
752 BESDEBUG(
"dmrpp",
"DmrppArray::" << __func__ <<
"() - subsetAddress.size(): " << subset_addr.size() << endl);
// Constrained start/stop/stride of the current dimension.
754 uint64_t start = this->dimension_start_ll(dim_iter,
true);
755 uint64_t stop = this->dimension_stop_ll(dim_iter,
true);
756 uint64_t stride = this->dimension_stride_ll(dim_iter,
true);
// Innermost dimension with unit stride: the selected elements are adjacent in
// src_buf, so compute the first/last source offsets and copy the whole run.
762 if (dim_iter == dim_end() && stride == 1) {
764 subset_addr.push_back(start);
765 unsigned long long start_index = get_index(subset_addr, array_shape);
766 subset_addr.pop_back();
768 subset_addr.push_back(stop);
769 unsigned long long stop_index = get_index(subset_addr, array_shape);
770 subset_addr.pop_back();
774 for (uint64_t source_index = start_index; source_index <= stop_index; source_index++) {
775 uint64_t target_byte = *target_index * bytes_per_element;
776 uint64_t source_byte = source_index * bytes_per_element;
778 for (
unsigned long i = 0; i < bytes_per_element; i++) {
779 dest_buf[target_byte++] = src_buf[source_byte++];
// General case: visit each selected index of this dimension.
786 for (uint64_t myDimIndex = start; myDimIndex <= stop; myDimIndex += stride) {
// More dimensions remain: fix this index and recurse into the next dimension.
789 if (dim_iter != dim_end()) {
791 subset_addr.push_back(myDimIndex);
792 insert_constrained_contiguous(dim_iter, target_index, subset_addr, array_shape, src_buf, dest_buf);
793 subset_addr.pop_back();
// Innermost dimension with a non-unit stride: copy a single element.
797 subset_addr.push_back(myDimIndex);
798 unsigned int sourceIndex = get_index(subset_addr, array_shape);
799 subset_addr.pop_back();
802 uint64_t target_byte = *target_index * bytes_per_element;
803 uint64_t source_byte = sourceIndex * bytes_per_element;
805 for (
unsigned int i = 0; i < bytes_per_element; i++) {
806 dest_buf[target_byte++] = src_buf[source_byte++];
/**
 * @brief Read a variable stored as one contiguous chunk (the non-direct-IO path).
 *
 * Visible steps:
 *  - The variable must have exactly one chunk; otherwise BESInternalError.
 *  - If transfer threads are disabled, or the chunk is no larger than
 *    DmrppRequestHandler::d_contiguous_concurrent_threshold, the chunk is read
 *    serially with read_chunk().
 *  - Otherwise the chunk's byte range is split into
 *    num_chunks = min(size / MB, d_max_transfer_threads) equal pieces (the last
 *    data piece also takes the remainder). Each piece is read on its own
 *    transfer thread by one_child_chunk_thread_new(), which copies into the
 *    parent chunk's buffer.
 *  - The bytes are then moved into the array: the whole buffer via val2buf(), or
 *    the constrained selection via insert_constrained_contiguous() (presumably
 *    chosen by whether a constraint is applied; the condition is not visible).
 *    Arrays of structures go through read_array_of_structure() when
 *    is_readable_struct is set, else InternalErr is thrown.
 *
 * NOTE(review): this definition is missing lines in this file (braces, else
 * branches, the declaration of the_one_chunk, loop headers, pops), so it does
 * not compile as shown.
 */
834void DmrppArray::read_contiguous()
837 BESDEBUG(dmrpp_3, prolog <<
"NOT using direct IO " << endl);
838 BES_STOPWATCH_START(MODULE, prolog +
"Timing array name: "+name());
842 throw BESInternalError(
string(
"Expected only a single chunk for variable ") + name(), __FILE__, __LINE__);
847 unsigned long long the_one_chunk_offset = the_one_chunk->get_offset();
848 unsigned long long the_one_chunk_size = the_one_chunk->get_size();
// Small chunks (or threading disabled) are read in one serial request.
853 if (!DmrppRequestHandler::d_use_transfer_threads || the_one_chunk_size <= DmrppRequestHandler::d_contiguous_concurrent_threshold) {
855 the_one_chunk->read_chunk();
// Size the parent's read buffer; the child transfer threads copy into it.
861 the_one_chunk->set_rbuf_to_size();
864 if (the_one_chunk->get_uses_fill_value()) {
865 the_one_chunk->load_fill_values();
// One piece per MB, capped at the thread limit.
// NOTE(review): the integer division already truncates, so floor() adds nothing.
// If the_one_chunk_size < MB this yields 0, and the divisions below would divide
// by zero (and `num_chunks - 1` would wrap). That can only happen if
// d_contiguous_concurrent_threshold < MB; confirm the configuration rules it out.
870 unsigned long long num_chunks = floor(the_one_chunk_size / MB);
871 if (num_chunks >= DmrppRequestHandler::d_max_transfer_threads)
872 num_chunks = DmrppRequestHandler::d_max_transfer_threads;
875 unsigned long long chunk_size = the_one_chunk_size / num_chunks;
876 std::string chunk_byteorder = the_one_chunk->get_byte_order();
// Bytes left over by the division; added to the last piece.
880 unsigned long long chunk_remainder = the_one_chunk_size % num_chunks;
881 auto chunk_url = the_one_chunk->get_data_url();
884 queue<shared_ptr<Chunk>> chunks_to_read;
887 unsigned long long chunk_offset = the_one_chunk_offset;
// All pieces except the last have size chunk_size.
888 for (
unsigned int i = 0; i < num_chunks - 1; i++) {
889 if (chunk_url ==
nullptr) {
890 BESDEBUG(dmrpp_3,
"chunk_url is null, this may be a variable that covers the fill values." <<endl);
891 chunks_to_read.push(shared_ptr<Chunk>(
new Chunk(chunk_byteorder,the_one_chunk->get_fill_value(),the_one_chunk->get_fill_value_type(), chunk_size, chunk_offset)));
894 chunks_to_read.push(shared_ptr<Chunk>(
new Chunk(chunk_url, chunk_byteorder, chunk_size, chunk_offset)));
895 chunk_offset += chunk_size;
// Last piece. NOTE(review): the data-URL branch sizes it chunk_size + chunk_remainder,
// but the fill-value branch uses only chunk_size, so it looks like the remainder
// bytes are not covered for fill-value variables; confirm.
898 if (chunk_url !=
nullptr)
899 chunks_to_read.push(shared_ptr<Chunk>(
new Chunk(chunk_url, chunk_byteorder, chunk_size + chunk_remainder, chunk_offset)));
901 chunks_to_read.push(shared_ptr<Chunk>(
new Chunk(chunk_byteorder,the_one_chunk->get_fill_value(),the_one_chunk->get_fill_value_type(), chunk_size, chunk_offset)));
904 list<future<bool>> futures;
907 bool future_finished =
true;
// Reap one finished transfer (if any are running) to free a slot.
910 if (!futures.empty())
911 future_finished = get_next_future(futures, transfer_thread_counter, DMRPP_WAIT_FOR_FUTURE_MS, prolog);
915 BESDEBUG(dmrpp_3, prolog <<
"future_finished: " << (future_finished ?
"true" :
"false") << endl);
916 if (!chunks_to_read.empty()) {
918 bool thread_started =
true;
// Launch until the pool is full or every piece has a thread.
919 while (thread_started && !chunks_to_read.empty()) {
920 auto current_chunk = chunks_to_read.front();
921 BESDEBUG(dmrpp_3, prolog <<
"Starting thread for " << current_chunk->to_string() << endl);
923 auto args = unique_ptr<one_child_chunk_args_new>(
new one_child_chunk_args_new(current_chunk, the_one_chunk));
925 thread_started = start_one_child_chunk_thread(futures, std::move(args));
927 if (thread_started) {
// Only dequeue once a thread owns the piece; otherwise retry it later.
928 chunks_to_read.pop();
929 BESDEBUG(dmrpp_3, prolog <<
"STARTED thread for " << current_chunk->to_string() << endl);
932 BESDEBUG(dmrpp_3, prolog <<
"Thread not started. args deleted, Chunk remains in queue.)" <<
933 " transfer_thread_counter: " << transfer_thread_counter <<
934 " futures.size(): " << futures.size() << endl);
942 future_finished =
false;
// Drain: wait for every outstanding piece; get() rethrows task exceptions.
947 while (!futures.empty()) {
948 if (futures.back().valid())
949 futures.back().get();
956 BESDEBUG(dmrpp_3, prolog <<
"Before is_filter " << endl);
// Whole-array copy (presumably the unconstrained case).
965 reserve_value_capacity_ll(
get_size(
false));
968 if (this->var()->type() != dods_structure_c)
969 val2buf(the_one_chunk->get_rbuf());
973 if (is_readable_struct) {
975 char *buf_value = the_one_chunk->get_rbuf();
977 unsigned long long value_size = this->length_ll() * bytes_per_element;
978 vector<char> values(buf_value,buf_value+value_size);
979 read_array_of_structure(values);
982 throw InternalErr(__FILE__, __LINE__,
"Only handle integer and float base types. Cannot handle the array of complex structure yet.");
// Copy only the constrained selection (presumably the constrained case).
987 if (this->var()->type() != dods_structure_c) {
989 vector<unsigned long long> array_shape =
get_shape(
false);
990 unsigned long target_index = 0;
991 vector<unsigned long long> subset;
994 reserve_value_capacity_ll(
get_size(
true));
995 char *dest_buf = get_buf();
996 insert_constrained_contiguous(dim_begin(), &target_index, subset, array_shape, the_one_chunk->get_rbuf(),dest_buf);
1000 if (is_readable_struct) {
1001 unsigned long long value_size =
get_size(
true)*bytes_per_element;
1002 vector<char> values;
1003 values.resize(value_size);
1004 vector<unsigned long long> array_shape =
get_shape(
false);
1005 unsigned long target_index = 0;
1006 vector<unsigned long long> subset;
1007 insert_constrained_contiguous(dim_begin(), &target_index, subset, array_shape, the_one_chunk->get_rbuf(),values.data());
1008 read_array_of_structure(values);
1011 throw InternalErr(__FILE__, __LINE__,
"Only handle integer and float base types. Cannot handle the array of complex structure yet.");
1017 BESDEBUG(dmrpp_3, prolog <<
" NOT using direct IO : end of this method." << endl);
/**
 * @brief Read the variable's single chunk with direct IO and copy its raw bytes into the array buffer.
 *
 * Throws BESInternalError unless the variable has exactly one chunk (the guard's
 * condition is not visible in this file).
 * NOTE(review): the buffer is reserved with get_var_chunks_storage_size() bytes
 * but filled with the_one_chunk->get_size() bytes. This assumes the two agree for
 * a single-chunk variable; if the chunk were larger, memcpy would overrun.
 * Confirm, or copy min(size, storage_size).
 */
1020void DmrppArray::read_one_chunk_dio() {
1022 BESDEBUG(dmrpp_3, prolog <<
"Using direct IO " << endl);
1025 throw BESInternalError(
string(
"Expected only a single chunk for variable ") + name(), __FILE__, __LINE__);
1031 the_one_chunk->read_chunk_dio();
1032 reserve_value_capacity_ll_byte(get_var_chunks_storage_size());
1033 const char *source_buffer = the_one_chunk->get_rbuf();
1034 char *target_buffer = get_buf();
1035 memcpy(target_buffer, source_buffer , the_one_chunk->get_size());
/**
 * @brief Copy the part of a decoded chunk that lies inside the array into the array's buffer.
 *
 * Recurses over dimensions starting at `dim`. For each dimension the chunk's
 * extent [chunk_origin[dim], chunk_origin[dim] + chunk_shape[dim] - 1] is clipped
 * to the dimension's stop index (edge chunks may extend past the array). At the
 * last dimension, one contiguous row of elements is copied with memcpy, into
 * d_structure_array_buf when is_readable_struct is set, else into get_buf().
 * Otherwise the function recurses for every index of this dimension, advancing
 * the chunk and array element offsets by multiplier(chunk_shape, dim) and
 * multiplier(array_shape, dim).
 *
 * @param chunk The chunk whose read buffer is the source.
 * @param dim Current dimension index.
 * @param array_offset Element offset into the array for the indices fixed so far.
 * @param array_shape Full array shape.
 * @param chunk_offset Element offset into the chunk for the indices fixed so far.
 * @param chunk_shape Chunk shape.
 * @param chunk_origin Position of the chunk's first element in the array.
 *
 * NOTE(review): braces and an `else` are missing from this definition in this
 * file. `chunk` is a shared_ptr passed by value on every recursive call, so each
 * call does an atomic refcount increment/decrement; a const reference would avoid it.
 */
1058void DmrppArray::insert_chunk_unconstrained(shared_ptr<Chunk> chunk,
unsigned int dim,
unsigned long long array_offset,
1059 const vector<unsigned long long> &array_shape,
1060 unsigned long long chunk_offset,
const vector<unsigned long long> &chunk_shape,
1061 const vector<unsigned long long> &chunk_origin)
1066 dimension thisDim = this->get_dimension(dim);
// Last array index covered by this chunk in this dimension, clipped to the array.
1067 unsigned long long end_element = chunk_origin[dim] + chunk_shape[dim] - 1;
1068 if ((
unsigned long long ) thisDim.stop < end_element) {
1069 end_element = thisDim.stop;
1072 unsigned long long chunk_end = end_element - chunk_origin[dim];
1074 unsigned int last_dim = chunk_shape.size() - 1;
// Innermost dimension: the covered elements are contiguous in both buffers.
1075 if (dim == last_dim) {
1077 unsigned int elem_width = bytes_per_element;
1078 array_offset += chunk_origin[dim];
1081 unsigned long long chunk_bytes = (end_element - chunk_origin[dim] + 1) * bytes_per_element;
1082 char *source_buffer = chunk->get_rbuf();
1083 char *target_buffer =
nullptr;
1084 if (is_readable_struct)
1085 target_buffer = d_structure_array_buf.data();
1087 target_buffer = get_buf();
1088 memcpy(target_buffer + (array_offset * elem_width), source_buffer + (chunk_offset * elem_width), chunk_bytes);
// Outer dimension: element strides of this dimension in the chunk and in the array.
1092 unsigned long long mc = multiplier(chunk_shape, dim);
1093 unsigned long long ma = multiplier(array_shape, dim);
1096 for (
unsigned long long chunk_index = 0 ; chunk_index <= chunk_end; ++chunk_index) {
1097 unsigned long long next_chunk_offset = chunk_offset + (mc * chunk_index);
1098 unsigned long long next_array_offset = array_offset + (ma * (chunk_index + chunk_origin[dim]));
1101 insert_chunk_unconstrained(chunk, dim + 1, next_array_offset, array_shape, next_chunk_offset, chunk_shape,
/**
 * @brief Direct IO: copy a chunk's raw bytes to its byte offset in the array buffer.
 *
 * The destination is get_buf() + chunk->get_direct_io_offset(), and
 * chunk->get_size() bytes are copied.
 * NOTE(review): there is no check here that offset + size fits within the
 * reserved buffer; it relies on the caller having reserved
 * get_var_chunks_storage_size() bytes and on consistent chunk metadata.
 * NOTE(review): the closing brace is missing from this file.
 */
1108void DmrppArray::insert_chunk_unconstrained_dio(shared_ptr<Chunk> chunk) {
1110 const char *source_buffer = chunk->get_rbuf();
1111 char *target_buffer = get_buf();
1114 memcpy(target_buffer + chunk->get_direct_io_offset(), source_buffer,chunk->get_size());
/**
 * @brief Read every chunk of the variable (no constraint) and assemble the full array.
 *
 * Chunks are grouped into SuperChunks named "<name>-<n>". When add_chunk()
 * rejects a chunk, a new SuperChunk is started; if even the new one rejects it,
 * BESInternalError is thrown. The value buffer (and d_structure_array_buf for
 * readable structures) is sized, then the SuperChunks are read either serially
 * with read_unconstrained() or concurrently via
 * read_super_chunks_unconstrained_concurrent(). Readable structure arrays are
 * then decoded with read_array_of_structure().
 *
 * NOTE(review): this definition is missing lines in this file (braces, the
 * chunk loop, the declarations of sc_id/msg, #endif lines, queue pops).
 * `array_shape` is computed but its use is not visible here.
 */
1129void DmrppArray::read_chunks_unconstrained()
1132 throw BESInternalError(
string(
"Expected chunks for variable ") + name(), __FILE__, __LINE__);
1137 unsigned long long sc_count=0;
1139 sc_id << name() <<
"-" << sc_count++;
1140 queue<shared_ptr<SuperChunk>> super_chunks;
1141 auto current_super_chunk = std::make_shared<SuperChunk>(sc_id.str(),
this) ;
1142 super_chunks.push(current_super_chunk);
// add_chunk() returns false when the chunk cannot join the current SuperChunk.
1146 bool added = current_super_chunk->add_chunk(chunk);
// Start a fresh SuperChunk for the rejected chunk.
1148 sc_id.str(std::string());
1149 sc_id << name() <<
"-" << sc_count++;
1150 current_super_chunk = std::make_shared<SuperChunk>(sc_id.str(),
this);
1151 super_chunks.push(current_super_chunk);
1152 if (!current_super_chunk->add_chunk(chunk)) {
1154 msg << prolog <<
"Failed to add Chunk to new SuperChunk. chunk: " << chunk->to_string();
1155 throw BESInternalError(msg.str(), __FILE__, __LINE__);
1160 reserve_value_capacity_ll(
get_size());
// Structures are assembled in a separate byte buffer, then decoded at the end.
1161 if (is_readable_struct) {
1162 d_structure_array_buf.resize(this->length_ll()*bytes_per_element);
1167 const vector<unsigned long long> array_shape =
get_shape(
true);
1172 BESDEBUG(dmrpp_3, prolog <<
"d_use_transfer_threads: " << (DmrppRequestHandler::d_use_transfer_threads ?
"true" :
"false") << endl);
1173 BESDEBUG(dmrpp_3, prolog <<
"d_max_transfer_threads: " << DmrppRequestHandler::d_max_transfer_threads << endl);
// Serial path: read each SuperChunk in turn on this thread.
1175 if (!DmrppRequestHandler::d_use_transfer_threads) {
1176#if DMRPP_ENABLE_THREAD_TIMERS
1177 BES_STOPWATCH_START(dmrpp_3, prolog +
"Serial SuperChunk Processing.");
1179 while(!super_chunks.empty()) {
1180 auto super_chunk = super_chunks.front();
1182 BESDEBUG(dmrpp_3, prolog << super_chunk->to_string(
true) << endl );
1183 super_chunk->read_unconstrained();
// Concurrent path.
1187#if DMRPP_ENABLE_THREAD_TIMERS
1188 string timer_name = prolog +
"Concurrent SuperChunk Processing. d_max_transfer_threads: " + to_string(DmrppRequestHandler::d_max_transfer_threads);
1189 BES_STOPWATCH_START(dmrpp_3, timer_name);
1191 read_super_chunks_unconstrained_concurrent(super_chunks,
this);
1194 if (is_readable_struct)
1195 read_array_of_structure(d_structure_array_buf);
/**
 * @brief Direct-IO variant of read_chunks_unconstrained(): gather raw chunk bytes into the array buffer.
 *
 * Chunks are grouped into SuperChunks exactly as in read_chunks_unconstrained().
 * The buffer is reserved as get_var_chunks_storage_size() bytes, and each
 * SuperChunk is read with read_unconstrained_dio(), either serially or via
 * read_super_chunks_unconstrained_concurrent_dio().
 *
 * NOTE(review): this function builds SuperChunks with shared_ptr<SuperChunk>(new ...),
 * while read_chunks_unconstrained() uses std::make_shared; make_shared is the
 * single-allocation form.
 * NOTE(review): this definition is missing lines in this file (braces, the chunk
 * loop, sc_id/msg declarations, #endif lines, queue pops). `array_shape` is
 * computed but its use is not visible here.
 */
1201void DmrppArray::read_chunks_dio_unconstrained()
1205 throw BESInternalError(
string(
"Expected chunks for variable ") + name(), __FILE__, __LINE__);
1210 unsigned long long sc_count=0;
1212 sc_id << name() <<
"-" << sc_count++;
1213 queue<shared_ptr<SuperChunk>> super_chunks;
1214 auto current_super_chunk = shared_ptr<SuperChunk>(
new SuperChunk(sc_id.str(),
this)) ;
1215 super_chunks.push(current_super_chunk);
// add_chunk() returns false when the chunk cannot join the current SuperChunk.
1219 bool added = current_super_chunk->add_chunk(chunk);
// Start a fresh SuperChunk for the rejected chunk.
1221 sc_id.str(std::string());
1222 sc_id << name() <<
"-" << sc_count++;
1223 current_super_chunk = shared_ptr<SuperChunk>(
new SuperChunk(sc_id.str(),
this));
1224 super_chunks.push(current_super_chunk);
1225 if (!current_super_chunk->add_chunk(chunk)) {
1227 msg << prolog <<
"Failed to add Chunk to new SuperChunk. chunk: " << chunk->to_string();
1228 throw BESInternalError(msg.str(), __FILE__, __LINE__);
// Direct IO keeps the stored (possibly filtered) bytes, so size by storage size.
1234 reserve_value_capacity_ll_byte(get_var_chunks_storage_size());
1237 const vector<unsigned long long> array_shape =
get_shape(
true);
1241 BESDEBUG(dmrpp_3, prolog <<
"d_use_transfer_threads: " << (DmrppRequestHandler::d_use_transfer_threads ?
"true" :
"false") << endl);
1242 BESDEBUG(dmrpp_3, prolog <<
"d_max_transfer_threads: " << DmrppRequestHandler::d_max_transfer_threads << endl);
// Serial path.
1244 if (!DmrppRequestHandler::d_use_transfer_threads) {
1245#if DMRPP_ENABLE_THREAD_TIMERS
1246 BES_STOPWATCH_START(dmrpp_3, prolog +
"Serial SuperChunk Processing.");
1248 while(!super_chunks.empty()) {
1249 auto super_chunk = super_chunks.front();
1251 BESDEBUG(dmrpp_3, prolog << super_chunk->to_string(
true) << endl );
1254 super_chunk->read_unconstrained_dio();
// Concurrent path.
1258#if DMRPP_ENABLE_THREAD_TIMERS
1259 string timer_name = prolog +
"Concurrent SuperChunk Processing. d_max_transfer_threads: " + to_string( DmrppRequestHandler::d_max_transfer_threads);
1260 BES_STOPWATCH_START(dmrpp_3, timer_name);
1263 read_super_chunks_unconstrained_concurrent_dio(super_chunks,
this);
/**
 * @brief Read a variable whose data is split across several linked blocks (unconstrained).
 *
 * Requires at least two linked blocks (else BESInternalError). Each block's
 * destination offset is the running sum of the sizes of the blocks before it,
 * indexed by get_linked_block_index(). Every block is read and copied to its
 * offset. Structure arrays are assembled in a local vector and decoded with
 * read_array_of_structure(); deflate-filtered structures are rejected. Other
 * types are assembled in the value buffer and, when the filters contain
 * "deflate", inflated with inflate_simple() and copied back as width_ll() bytes.
 *
 * NOTE(review): this definition is missing lines in this file (braces, else
 * branches, the chunk loops, and the declarations of filters_str/filters_string).
 * The two branches test differently named filter strings (filters_str vs
 * filters_string); confirm both hold the same value.
 * NOTE(review): dest_deflate is a raw new[] buffer; if inflate_simple() or the
 * later reserve/memcpy throws, it leaks. A std::vector<char> or
 * std::unique_ptr<char[]> would release it automatically. inflate_simple() takes
 * char**, so it may replace the buffer; confirm delete[] still matches.
 * NOTE(review): the trailing SuperChunk section references `super_chunks`, which
 * is not declared in the visible part of this function; it may belong to a
 * different function whose header is missing from this file.
 */
1273void DmrppArray::read_linked_blocks(){
1275 unsigned int num_linked_blocks = this->get_total_linked_blocks();
1276 if (num_linked_blocks <2)
1277 throw BESInternalError(
"The number of linked blocks must be >1 to read the data.", __FILE__, __LINE__);
1279 vector<unsigned long long> accumulated_lengths;
1280 accumulated_lengths.resize(num_linked_blocks);
1281 vector<unsigned long long> individual_lengths;
1282 individual_lengths.resize(num_linked_blocks);
1289 individual_lengths[chunk->get_linked_block_index()] = chunk->get_size();
// Exclusive prefix sum: block i starts right after blocks 0..i-1.
1291 accumulated_lengths[0] = 0;
1292 for (
unsigned int i = 1; i < num_linked_blocks; i++)
1293 accumulated_lengths[i] = individual_lengths[i-1] + accumulated_lengths[i-1];
// Arrays of structures: only readable, uncompressed ones are supported.
1295 if (this->var()->type() == dods_structure_c) {
1299 if (!is_readable_struct)
1300 throw InternalErr(__FILE__, __LINE__,
"Only handle integer and float base types. Cannot handle the array of complex structure yet.");
1303 if (filters_str.find(
"deflate")!=string::npos)
1304 throw InternalErr(__FILE__, __LINE__,
"We don't handle compressed array of structure now.");
1306 vector<char> values;
1307 values.resize(get_var_chunks_storage_size());
1308 char *target_buffer = values.data();
1311 chunk->read_chunk();
1312 BESDEBUG(dmrpp_3, prolog <<
"linked_block_index: " << chunk->get_linked_block_index() << endl);
1313 BESDEBUG(dmrpp_3, prolog <<
"accumlated_length: " << accumulated_lengths[chunk->get_linked_block_index()] << endl);
1314 const char *source_buffer = chunk->get_rbuf();
1315 memcpy(target_buffer + accumulated_lengths[chunk->get_linked_block_index()], source_buffer,chunk->get_size());
1318 read_array_of_structure(values);
// Non-structure types: assemble the stored bytes directly in the value buffer.
1323 reserve_value_capacity_ll_byte(get_var_chunks_storage_size());
1325 char *target_buffer = get_buf();
1328 chunk->read_chunk();
1329 BESDEBUG(dmrpp_3, prolog <<
"linked_block_index: " << chunk->get_linked_block_index() << endl);
1330 BESDEBUG(dmrpp_3, prolog <<
"accumlated_length: " << accumulated_lengths[chunk->get_linked_block_index()] << endl);
1331 const char *source_buffer = chunk->get_rbuf();
1332 memcpy(target_buffer + accumulated_lengths[chunk->get_linked_block_index()], source_buffer,chunk->get_size());
// Deflated data: inflate the assembled bytes and replace the buffer contents.
1335 if (filters_string.find(
"deflate")!=string::npos) {
1337 char *in_buf = get_buf();
1339 char **destp =
nullptr;
1340 char *dest_deflate =
nullptr;
1341 unsigned long long dest_len = get_var_chunks_storage_size();
1342 unsigned long long src_len = get_var_chunks_storage_size();
1343 dest_deflate =
new char[dest_len];
1344 destp = &dest_deflate;
1345 inflate_simple(destp, dest_len, in_buf, src_len);
1347 this->clear_local_data();
1348 reserve_value_capacity_ll_byte(this->width_ll());
1349 char *out_buf = get_buf();
1350 memcpy(out_buf,dest_deflate,this->width_ll());
1351 delete []dest_deflate;
1361 const vector<unsigned long long> array_shape =
get_shape(
true);
1366 BESDEBUG(dmrpp_3, prolog <<
"d_use_transfer_threads: " << (DmrppRequestHandler::d_use_transfer_threads ?
"true" :
"false") << endl);
1367 BESDEBUG(dmrpp_3, prolog <<
"d_max_transfer_threads: " << DmrppRequestHandler::d_max_transfer_threads << endl);
1369 if (!DmrppRequestHandler::d_use_transfer_threads) {
1370#if DMRPP_ENABLE_THREAD_TIMERS
1371 BES_STOPWATCH_START(dmrpp_3, prolog +
"Serial SuperChunk Processing.");
1373 while(!super_chunks.empty()) {
1374 auto super_chunk = super_chunks.front();
1376 BESDEBUG(dmrpp_3, prolog << super_chunk->to_string(
true) << endl );
1379 super_chunk->read_unconstrained_dio();
1383#if DMRPP_ENABLE_THREAD_TIMERS
1384 string timer_name = prolog +
"Concurrent SuperChunk Processing. d_max_transfer_threads: " + to_string(DmrppRequestHandler::d_max_transfer_threads);
1385 BES_STOPWATCH_START(dmrpp_3, timer_name);
1388 read_super_chunks_unconstrained_concurrent_dio(super_chunks,
this);
/**
 * @brief Read a linked-block variable and copy out only the constrained selection.
 *
 * Requires at least two linked blocks and rejects arrays of structures. The
 * blocks are assembled in a local vector at prefix-sum offsets, as in
 * read_linked_blocks(). When the filters contain "deflate", the assembled bytes
 * are inflated into `uncompressed_values`. The constrained elements are then
 * copied into the value buffer with insert_constrained_contiguous(), from either
 * the inflated or the raw bytes (presumably selected by is_compressed; the
 * condition is not visible).
 *
 * NOTE(review): this definition is missing lines in this file (braces, if/else,
 * the chunk loop, the declaration of filters_string).
 * NOTE(review): the "array size" debug line prints deflated_length; it probably
 * meant this->width_ll().
 * NOTE(review): dest_deflate is a raw new[] buffer that leaks if inflate_simple()
 * throws; std::vector<char> / std::unique_ptr<char[]> would fix that. The
 * width_ll()-byte memcpy from dest_deflate assumes the buffer is at least
 * width_ll() bytes (it was allocated with the storage size); confirm that
 * inflate_simple() grows it through its char** argument.
 */
1395void DmrppArray::read_linked_blocks_constrained(){
1397 unsigned int num_linked_blocks = this->get_total_linked_blocks();
1398 if (num_linked_blocks <2)
1399 throw BESInternalError(
"The number of linked blocks must be >1 to read the data.", __FILE__, __LINE__);
1401 if (this->var()->type() == dods_structure_c)
1402 throw InternalErr(__FILE__, __LINE__,
"We don't handle constrained array of structure now.");
1405 vector<unsigned long long> accumulated_lengths;
1406 accumulated_lengths.resize(num_linked_blocks);
1407 vector<unsigned long long> individual_lengths;
1408 individual_lengths.resize(num_linked_blocks);
1415 individual_lengths[chunk->get_linked_block_index()] = chunk->get_size();
// Exclusive prefix sum: block i starts right after blocks 0..i-1.
1417 accumulated_lengths[0] = 0;
1418 for (
unsigned int i = 1; i < num_linked_blocks; i++)
1419 accumulated_lengths[i] = individual_lengths[i-1] + accumulated_lengths[i-1];
// The value buffer only needs room for the constrained elements.
1423 size_t array_var_type_size = bytes_per_element;
1424 reserve_value_capacity_ll_byte(array_var_type_size*
get_size(
true));
// Assemble all stored bytes in a scratch vector first.
1429 vector<char> values;
1430 values.resize(get_var_chunks_storage_size());
1431 char *target_buffer = values.data();
1434 chunk->read_chunk();
1435 BESDEBUG(dmrpp_3, prolog <<
"linked_block_index: " << chunk->get_linked_block_index() << endl);
1436 BESDEBUG(dmrpp_3, prolog <<
"accumlated_length: " << accumulated_lengths[chunk->get_linked_block_index()] << endl);
1437 const char *source_buffer = chunk->get_rbuf();
1438 memcpy(target_buffer + accumulated_lengths[chunk->get_linked_block_index()], source_buffer,chunk->get_size());
1441 vector<char>uncompressed_values;
1442 bool is_compressed =
false;
1446 if (filters_string.find(
"deflate")!=string::npos) {
1448 char **destp =
nullptr;
1449 char *dest_deflate =
nullptr;
1450 unsigned long long dest_len = get_var_chunks_storage_size();
1451 unsigned long long src_len = get_var_chunks_storage_size();
1452 dest_deflate =
new char[dest_len];
1453 destp = &dest_deflate;
1454 unsigned long long deflated_length = inflate_simple(destp, dest_len, target_buffer, src_len);
1455 BESDEBUG(dmrpp_3, prolog <<
"deflated length: " << deflated_length << endl);
1456 BESDEBUG(dmrpp_3, prolog <<
"array size: " << deflated_length << endl);
1458 uncompressed_values.resize(deflated_length);
1459 memcpy(uncompressed_values.data(),dest_deflate,deflated_length);
1461 uncompressed_values.resize(this->width_ll());
1462 memcpy(uncompressed_values.data(),dest_deflate,this->width_ll());
1464 delete []dest_deflate;
1466 is_compressed =
true;
// Copy the constrained selection out of the full, unconstrained layout.
1470 vector<unsigned long long> array_shape =
get_shape(
false);
1471 unsigned long target_index = 0;
1472 vector<unsigned long long> subset;
1473 char *dest_buf = get_buf();
1475 insert_constrained_contiguous(dim_begin(), &target_index, subset, array_shape, uncompressed_values.data(),dest_buf);
1477 insert_constrained_contiguous(dim_begin(), &target_index, subset, array_shape, values.data(), dest_buf);
/**
 * @brief Read chunks (some made of several linked blocks) and insert them into the unconstrained array.
 *
 * For a chunk with multiple linked blocks, the (offset, length) pairs from
 * obtain_multi_linked_offset_length() are summed to size the chunk's buffer,
 * which the chunk must own (else BESInternalError). Each block is read through a
 * temporary Chunk and appended to that buffer, and the chunk is then marked as
 * read. Other chunks are read normally. Fill-value chunks and structure arrays
 * are rejected with BESInternalError. Finally the chunk is placed into the array
 * with insert_chunk_unconstrained().
 *
 * NOTE(review): this definition is missing lines in this file (braces, else
 * branches, the chunk loop, the declarations of oss and chunk_shape, the filter
 * handling after is_filters_empty()).
 * NOTE(review): block_chunk is a raw `new Chunk`. Its delete is not visible
 * here, and the "Wrong number of bytes read" throw (or an exception from
 * read_chunk()) would skip it anyway, leaking the Chunk.
 * std::unique_ptr<Chunk> would release it on every path.
 * NOTE(review): the error message text "ownship" is a typo for "ownership"
 * (left unchanged here because it is runtime output).
 */
1483void DmrppArray::read_chunks_with_linked_blocks() {
1485 reserve_value_capacity_ll(
get_size(
false));
1487 if (chunk->get_multi_linked_blocks()) {
1488 vector<std::pair<unsigned long long, unsigned long long>> cur_chunk_lb_offset_lengths;
1489 chunk->obtain_multi_linked_offset_length(cur_chunk_lb_offset_lengths);
// Total stored size of the chunk = sum of its block lengths.
1490 unsigned long long cb_buffer_size =0;
1491 for (
const auto &tp:cur_chunk_lb_offset_lengths)
1492 cb_buffer_size +=tp.second;
1494 chunk->set_size(cb_buffer_size);
// The buffer is resized here, which is only allowed when the chunk owns it.
1496 if (chunk->get_read_buffer_is_mine())
1497 chunk->set_rbuf_to_size();
1499 throw BESInternalError(
"For multi-linked blocks, the chunk buffer ownship must be true", __FILE__, __LINE__);
// Write cursor into the chunk's buffer; advanced by each block's length.
1501 char *temp_cb_buffer = chunk->get_rbuf();
1503 for (
const auto &tp:cur_chunk_lb_offset_lengths) {
1506 auto cb_offset = tp.first;
1507 auto cb_length = tp.second;
// NOTE(review): these three values do not change between blocks and could be
// fetched once before the loop.
1512 auto cb_data_url = chunk->get_data_url();
1513 auto cb_position_in_array = chunk->get_position_in_array();
1514 auto cb_byte_order = chunk->get_byte_order();
// Temporary Chunk describing just this block's byte range.
1517 Chunk* block_chunk =
nullptr;
1518 if (cb_data_url ==
nullptr)
1519 block_chunk =
new Chunk(cb_byte_order,cb_length,cb_offset,cb_position_in_array);
1521 block_chunk =
new Chunk(cb_data_url,cb_byte_order,cb_length,cb_offset,cb_position_in_array);
1523 block_chunk->read_chunk();
1524 const char *block_chunk_buffer = block_chunk->get_rbuf();
1525 if (block_chunk->get_bytes_read() != cb_length) {
1527 oss <<
"Wrong number of bytes read for chunk; read: " << block_chunk->get_bytes_read() <<
", expected: " << cb_length;
1528 throw BESInternalError(oss.str(), __FILE__, __LINE__);
1530 memcpy(temp_cb_buffer,block_chunk_buffer,cb_length);
1531 temp_cb_buffer +=cb_length;
// All blocks are in place; mark the parent chunk as read.
1535 chunk->set_is_read(
true);
1539 chunk->read_chunk();
1542 if (chunk->get_uses_fill_value()) {
1544 throw BESInternalError(
string(
"Encounters filled linked-block chunks for variable ") + name(), __FILE__, __LINE__);
1546 else if (!is_filters_empty())
1550 if (var()->type() == libdap::dods_structure_c)
1551 throw BESInternalError(
string(
"Encounters linked-block chunk structures for variable ") + name(), __FILE__, __LINE__);
1555 vector<unsigned long long> array_shape =
get_shape(
false);
1557 vector<unsigned long long> chunk_origin = chunk->get_position_in_array();
1559 this->insert_chunk_unconstrained(chunk, 0, 0, array_shape, 0,chunk_shape,chunk_origin);
// Constrained (subset) counterpart of read_chunks_with_linked_blocks(): for each chunk of a
// variable stored as linked blocks, reads the chunk and copies only the elements selected
// by the constraint into this array's value buffer through insert_chunk().
// `chunk` below is the chunk currently handled by the enclosing per-chunk loop.
// Throws BESInternalError under the same conditions as the unconstrained variant.
1566void DmrppArray::read_chunks_with_linked_blocks_constrained() {
// Size the value buffer for the constrained result and capture its shape.
1568 reserve_value_capacity_ll(
get_size(
true));
1569 char *dest_buf = this->get_buf();
1570 vector<unsigned long long> constrained_array_shape = this->
get_shape(
true);
// find_needed_chunks() reports whether this chunk holds any element the constraint selects.
1572 vector<unsigned long long> chunk_element_address = chunk->get_position_in_array();
1573 auto needed = find_needed_chunks(0 , &chunk_element_address, chunk);
// A chunk split over several linked blocks: collect the (offset, length) pair of every
// block and size the chunk's own read buffer to hold all of them back to back.
1575 if (chunk->get_multi_linked_blocks()) {
1576 vector<std::pair<unsigned long long, unsigned long long>> cur_chunk_lb_offset_lengths;
1577 chunk->obtain_multi_linked_offset_length(cur_chunk_lb_offset_lengths);
1578 unsigned long long cb_buffer_size =0;
1579 for (
const auto &tp:cur_chunk_lb_offset_lengths)
1580 cb_buffer_size +=tp.second;
1582 chunk->set_size(cb_buffer_size);
// The buffer can only be resized when the chunk owns it.
1584 if (chunk->get_read_buffer_is_mine())
1585 chunk->set_rbuf_to_size();
1587 throw BESInternalError(
"For multi-linked blocks, the chunk buffer ownship must be true", __FILE__, __LINE__);
// Read every block into a temporary Chunk and append its bytes, in list order, to the
// parent chunk's buffer.
1589 char *temp_cb_buffer = chunk->get_rbuf();
1591 for (
const auto &tp:cur_chunk_lb_offset_lengths) {
1594 auto cb_offset = tp.first;
1595 auto cb_length = tp.second;
// Each block reuses the parent chunk's data URL, position and byte order.
1600 auto cb_data_url = chunk->get_data_url();
1601 auto cb_position_in_array = chunk->get_position_in_array();
1602 auto cb_byte_order = chunk->get_byte_order();
// NOTE(review): block_chunk is a raw `new`. If read_chunk() or the byte-count check
// below throws, nothing here releases it; holding it in a std::unique_ptr<Chunk> would
// make this path exception-safe.
1605 Chunk* block_chunk =
nullptr;
1606 if (cb_data_url ==
nullptr)
1607 block_chunk =
new Chunk(cb_byte_order,cb_length,cb_offset,cb_position_in_array);
1609 block_chunk =
new Chunk(cb_data_url,cb_byte_order,cb_length,cb_offset,cb_position_in_array);
1611 block_chunk->read_chunk();
1612 const char *block_chunk_buffer = block_chunk->get_rbuf();
// A short (or long) read of a block is an error.
1613 if (block_chunk->get_bytes_read() != cb_length) {
1615 oss <<
"Wrong number of bytes read for chunk; read: " << block_chunk->get_bytes_read() <<
", expected: " << cb_length;
1616 throw BESInternalError(oss.str(), __FILE__, __LINE__);
1618 memcpy(temp_cb_buffer,block_chunk_buffer,cb_length);
1619 temp_cb_buffer +=cb_length;
// All blocks have been copied in, so the parent chunk counts as read.
1623 chunk->set_is_read(
true);
// Chunk stored as a single block: read it directly.
1627 chunk->read_chunk();
// Fill-value chunks are not supported for linked-block storage.
1630 if (chunk->get_uses_fill_value()) {
1632 throw BESInternalError(
string(
"Encounters filled linked-block chunks for variable ") + name(), __FILE__, __LINE__);
1634 else if (!is_filters_empty())
// Structure (compound) element types are not supported for linked-block storage.
1638 if (var()->type() == libdap::dods_structure_c)
1639 throw BESInternalError(
string(
"Encounters linked-block chunk structures for variable ") + name(), __FILE__, __LINE__);
// Copy the selected elements of this chunk into the constrained result buffer, starting the
// walk at dimension 0 with a zeroed address into the chunk.
1642 vector<unsigned long long> target_element_address = chunk->get_position_in_array();
1643 vector<unsigned long long> chunk_source_address(this->dimensions(), 0);
1644 insert_chunk(0, &target_element_address, &chunk_source_address,chunk, constrained_array_shape, dest_buf);
/**
 * @brief Decompress a zlib (deflate) stream into an output buffer that may grow.
 *
 * @param destp    In/out: address of the output buffer pointer; destp and *destp must be
 *                 non-null. (The caller in this file allocates it with new char[dest_len].)
 * @param dest_len Initial size of *destp in bytes; must be non-zero.
 * @param src      Compressed input; must be non-null.
 * @param src_len  Number of compressed input bytes; must be non-zero.
 * @return z_strm.total_out, the number of decompressed bytes produced.
 * @throws BESInternalError when a length is zero or a buffer pointer is null.
 * @throws BESError when inflateInit() fails or inflate() returns anything other than
 *         Z_OK / Z_STREAM_END.
 *
 * NOTE(review): z_stream::avail_in and avail_out are zlib uInt fields, so src_len and
 * dest_len larger than UINT_MAX would be truncated by the assignments below.
 * NOTE(review): when the output buffer fills up, decompression continues into a newly
 * allocated buffer held in `outbuf`. No statement here stores `outbuf` back through *destp,
 * and `outbuf` is not released before the error-path throw. Confirm that the caller ends up
 * with (and frees) the grown buffer rather than the original one.
 */
1653unsigned long long DmrppArray::inflate_simple(
char **destp,
unsigned long long dest_len,
char *src,
unsigned long long src_len) {
// Argument sanity checks: zero lengths and null buffers are internal errors.
1659 string msg = prolog +
"ERROR! The number of bytes to inflate is zero.";
1660 BESDEBUG(MODULE, msg << endl);
1661 throw BESInternalError(msg, __FILE__, __LINE__);
1663 if (dest_len == 0) {
1664 string msg = prolog +
"ERROR! The number of bytes to inflate into is zero.";
1665 BESDEBUG(MODULE, msg << endl);
1666 throw BESInternalError(msg, __FILE__, __LINE__);
1668 if (!destp || !*destp) {
1669 string msg = prolog +
"ERROR! The destination buffer is NULL.";
1670 BESDEBUG(MODULE, msg << endl);
1671 throw BESInternalError(msg, __FILE__, __LINE__);
1674 string msg = prolog +
"ERROR! The source buffer is NULL.";
1675 BESDEBUG(MODULE, msg << endl);
1676 throw BESInternalError(msg, __FILE__, __LINE__);
// Zero the stream state (so zalloc/zfree/opaque default) and point it at input and output.
1683 memset(&z_strm, 0,
sizeof(z_strm));
1684 z_strm.next_in = (Bytef *) src;
1685 z_strm.avail_in = src_len;
1686 z_strm.next_out = (Bytef *) (*destp);
1687 z_strm.avail_out = dest_len;
// nalloc tracks the current capacity of outbuf.
1689 size_t nalloc = dest_len;
1691 char *outbuf = *destp;
1694 if (Z_OK != inflateInit(&z_strm))
1695 throw BESError(
"Failed to initialize inflate software.", BES_INTERNAL_ERROR, __FILE__, __LINE__);
// Decompress until zlib reports the end of the stream.
1702 status = inflate(&z_strm, Z_SYNC_FLUSH);
1705 if (Z_STREAM_END == status)
break;
// Any status other than Z_OK / Z_STREAM_END is fatal; include zlib's message when it has one.
1708 if (Z_OK != status) {
1709 stringstream err_msg;
1710 err_msg <<
"Failed to inflate data chunk.";
1711 char const *err_msg_cstr = z_strm.msg;
1713 err_msg <<
" zlib message: " << err_msg_cstr;
1714 (void) inflateEnd(&z_strm);
1715 throw BESError(err_msg.str(), BES_INTERNAL_ERROR, __FILE__, __LINE__);
// Output space exhausted: move the bytes produced so far into a bigger buffer and resume
// writing after them. (nalloc is presumably enlarged between saving outbuf_size and the
// allocation - confirm; otherwise avail_out would stay 0.)
1720 if (0 == z_strm.avail_out) {
1723 size_t outbuf_size = nalloc;
1725 char* new_outbuf =
new char[nalloc];
1726 memcpy((
void*)new_outbuf,(
void*)outbuf,outbuf_size);
1728 outbuf = new_outbuf;
1731 z_strm.next_out = (
unsigned char*) outbuf + z_strm.total_out;
1732 z_strm.avail_out = (uInt) (nalloc - z_strm.total_out);
// Release zlib's internal state; total_out is still readable afterwards.
1741 (void) inflateEnd(&z_strm);
1743 return z_strm.total_out;
1763unsigned long long DmrppArray::get_chunk_start(
const dimension &thisDim,
unsigned long long chunk_origin)
1766 unsigned long long first_element_offset = 0;
1767 if ((
unsigned long long) (thisDim.start) < chunk_origin) {
1769 if (thisDim.stride != 1) {
1771 first_element_offset = (chunk_origin - thisDim.start) % thisDim.stride;
1773 if (first_element_offset != 0) {
1775 first_element_offset = thisDim.stride - first_element_offset;
1780 first_element_offset = thisDim.start - chunk_origin;
1783 return first_element_offset;
/**
 * @brief Decide, one dimension at a time, whether `chunk` holds any element selected by the
 *        current constraint.
 *
 * Along dimension `dim` the chunk covers [chunk_origin[dim], chunk_origin[dim] + chunk_shape[dim] - 1].
 * Chunks lying entirely outside [thisDim.start, thisDim.stop] are rejected. Otherwise each
 * selected index of this dimension is visited and the search recurses into dimension dim + 1.
 * Reaching the last dimension returns `chunk`.
 *
 * @param dim                    Dimension being examined (callers start at 0).
 * @param target_element_address Scratch index vector; entry `dim` is overwritten with the
 *                               position of the visited element in the constrained result.
 * @param chunk                  Chunk under test.
 *
 * NOTE(review): the chunk's last index is origin + shape - 1, yet the tests
 * `start > origin + shape` and `chunk_start > chunk_shape[dim]` both let
 * start == origin + shape through. In the last dimension such a chunk is then reported as
 * needed (and read) although it holds no selected element; `>=` in both tests would reject it.
 */
1808DmrppArray::find_needed_chunks(
unsigned int dim, vector<unsigned long long> *target_element_address, shared_ptr<Chunk> chunk)
1810 BESDEBUG(dmrpp_3, prolog <<
" BEGIN, dim: " << dim << endl);
1816 const vector<unsigned long long> &chunk_origin = chunk->get_position_in_array();
1818 dimension thisDim = this->get_dimension(dim);
// Reject chunks that lie wholly before or after the selected range of this dimension.
1821 if ((
unsigned long long) thisDim.start > (chunk_origin[dim] + chunk_shape[dim]) ||
1822 (
unsigned long long) thisDim.stop < chunk_origin[dim]) {
// Offset inside the chunk of the first selected element along this dimension.
1827 unsigned long long chunk_start = get_chunk_start(thisDim, chunk_origin[dim]);
1830 if (chunk_start > chunk_shape[dim]) {
// Last index of this chunk along dim, clipped to the selection's stop.
1835 unsigned long long end_element = chunk_origin[dim] + chunk_shape[dim] - 1;
1836 if ((
unsigned long long) thisDim.stop < end_element) {
1837 end_element = thisDim.stop;
1840 unsigned long long chunk_end = end_element - chunk_origin[dim];
// Passing every outer dimension's test down to the last one means the chunk is needed.
1842 unsigned int last_dim = chunk_shape.size() - 1;
1843 if (dim == last_dim) {
1844 BESDEBUG(dmrpp_3, prolog <<
" END, This is the last_dim. chunk: " << chunk->to_string() << endl);
// Visit each selected index of this dimension and recurse into the next one. The debug line
// below dereferences `needed`, so it only runs for a non-null result, which is presumably
// returned to the caller.
1849 for (
unsigned long long chunk_index = chunk_start; chunk_index <= chunk_end; chunk_index += thisDim.stride) {
1850 (*target_element_address)[dim] = (chunk_index + chunk_origin[dim] - thisDim.start) / thisDim.stride;
1853 auto needed = find_needed_chunks(dim + 1, target_element_address, chunk);
1855 BESDEBUG(dmrpp_3, prolog <<
" END, Found chunk: " << needed->to_string() << endl);
1861 BESDEBUG(dmrpp_3, prolog <<
" END, dim: " << dim << endl);
/**
 * @brief Copy the constrained part of one (already read) chunk into the constrained result.
 *
 * Recurses over dimensions, `dim` being the dimension currently processed. At the last
 * dimension, a unit stride is handled with one memcpy of the contiguous run of selected
 * elements, while other strides copy element by element. Byte offsets are
 * get_index(address, shape) * bytes_per_element:
 * - for the destination, using the constrained array shape;
 * - for the source, using the chunk shape.
 *
 * @param target_element_address  Scratch index into the constrained result, updated per dimension.
 * @param chunk_element_address   Scratch index into the chunk, updated per dimension.
 * @param chunk                   Chunk whose read buffer (get_rbuf()) supplies the data.
 * @param constrained_array_shape Shape of the constrained result.
 * @param target_buffer           Destination buffer for the constrained result.
 */
1885void DmrppArray::insert_chunk(
1887 vector<unsigned long long> *target_element_address,
1888 vector<unsigned long long> *chunk_element_address,
1889 shared_ptr<Chunk> chunk,
1890 const vector<unsigned long long> &constrained_array_shape,
char *target_buffer){
1896 const vector<unsigned long long> &chunk_origin = chunk->get_position_in_array();
1898 dimension thisDim = this->get_dimension(dim);
// First selected element of this chunk along dim.
1901 unsigned long long chunk_start = get_chunk_start(thisDim, chunk_origin[dim]);
// Last element of this chunk along dim, clipped to the selection's stop.
1904 unsigned long long end_element = chunk_origin[dim] + chunk_shape[dim] - 1;
1905 if ((
unsigned long long) thisDim.stop < end_element) {
1906 end_element = thisDim.stop;
1909 unsigned long long chunk_end = end_element - chunk_origin[dim];
1911 unsigned int last_dim = chunk_shape.size() - 1;
1912 if (dim == last_dim) {
1913 char *source_buffer = chunk->get_rbuf();
1914 unsigned int elem_width = bytes_per_element;
// Unit stride: the selected elements of the innermost dimension are contiguous in both
// buffers, so copy them with a single memcpy. If chunk_start lies past end_element, the
// unsigned byte count works out to 0 and nothing is copied.
1916 if (thisDim.stride == 1) {
1918 unsigned long long start_element = chunk_origin[dim] + chunk_start;
1920 unsigned long long chunk_constrained_inner_dim_bytes = (end_element - start_element + 1) * elem_width;
1923 (*target_element_address)[dim] = (start_element - thisDim.start);
1925 (*chunk_element_address)[dim] = chunk_start;
1928 unsigned long long target_char_start_index =
1929 get_index(*target_element_address, constrained_array_shape) * elem_width;
1930 unsigned long long chunk_char_start_index = get_index(*chunk_element_address, chunk_shape) * elem_width;
1932 memcpy(target_buffer + target_char_start_index, source_buffer + chunk_char_start_index,
1933 chunk_constrained_inner_dim_bytes);
// Non-unit stride: copy each selected element individually.
// NOTE(review): this loop index is `unsigned int`, unlike the `unsigned long long` index used
// by the other loops here; a chunk extent of 2^32 or more would wrap it.
1937 for (
unsigned int chunk_index = chunk_start; chunk_index <= chunk_end; chunk_index += thisDim.stride) {
1939 (*target_element_address)[dim] = (chunk_index + chunk_origin[dim] - thisDim.start) / thisDim.stride;
1942 (*chunk_element_address)[dim] = chunk_index;
1945 unsigned long long target_char_start_index =
1946 get_index(*target_element_address, constrained_array_shape) * elem_width;
1947 unsigned long long chunk_char_start_index = get_index(*chunk_element_address, chunk_shape) * elem_width;
1949 memcpy(target_buffer + target_char_start_index, source_buffer + chunk_char_start_index, elem_width);
// Outer dimension: fix this dimension's index in both addresses and recurse inward.
1955 for (
unsigned long long chunk_index = chunk_start; chunk_index <= chunk_end; chunk_index += thisDim.stride) {
1956 (*target_element_address)[dim] = (chunk_index + chunk_origin[dim] - thisDim.start) / thisDim.stride;
1957 (*chunk_element_address)[dim] = chunk_index;
1960 insert_chunk(dim + 1, target_element_address, chunk_element_address, chunk, constrained_array_shape, target_buffer);
// Reads the chunks needed by the current constraint.
// 1. Chunks that find_needed_chunks() reports as needed are grouped into SuperChunks, named
//    "<variable name>-<n>".
// 2. Storage is reserved for the constrained result (and, for readable structures, a staging
//    buffer in d_structure_array_buf).
// 3. The SuperChunks are read serially or through read_super_chunks_concurrent(), depending on
//    DmrppRequestHandler::d_use_transfer_threads.
// Throws BESInternalError when the variable has no chunks, when no chunk is needed, or when a
// chunk cannot be added to a fresh SuperChunk.
1971void DmrppArray::read_chunks()
1974 throw BESInternalError(
string(
"Expected chunks for variable ") + name(), __FILE__, __LINE__);
// Start the first SuperChunk.
1978 unsigned long long sc_count=0;
1980 sc_id << name() <<
"-" << sc_count++;
1981 queue<shared_ptr<SuperChunk>> super_chunks;
1982 auto current_super_chunk = shared_ptr<SuperChunk>(
new SuperChunk(sc_id.str(),
this)) ;
1983 super_chunks.push(current_super_chunk);
1990 bool found_needed_chunks =
false;
// Per chunk: ask whether the constraint selects anything from it.
1992 vector<unsigned long long> target_element_address = chunk->get_position_in_array();
1993 auto needed = find_needed_chunks(0 , &target_element_address, chunk);
1995 found_needed_chunks =
true;
// Try the current SuperChunk first; when it does not accept the chunk (presumably when
// add_chunk() returns false), start a new SuperChunk for it. A fresh SuperChunk refusing the
// chunk is an internal error.
1996 bool added = current_super_chunk->add_chunk(chunk);
1998 sc_id.str(std::string());
1999 sc_id << name() <<
"-" << sc_count++;
2000 current_super_chunk = shared_ptr<SuperChunk>(
new SuperChunk(sc_id.str(),
this));
2001 super_chunks.push(current_super_chunk);
2002 if(!current_super_chunk->add_chunk(chunk)){
2004 msg << prolog <<
"Failed to add Chunk to new SuperChunk. chunk: " << chunk->to_string();
2005 throw BESInternalError(msg.str(), __FILE__, __LINE__);
2010 BESDEBUG(dmrpp_3, prolog <<
"found_needed_chunks: " << (found_needed_chunks?
"true":
"false") << endl);
2011 if(!found_needed_chunks){
2012 throw BESInternalError(
"ERROR - Failed to locate any chunks that correspond to the requested data.", __FILE__, __LINE__);
// Reserve the constrained result; structures that can be decoded here are first staged
// as raw bytes (elements * bytes_per_element).
2015 reserve_value_capacity_ll(
get_size(
true));
2016 if (is_readable_struct)
2017 d_structure_array_buf.resize(
get_size(
true)*bytes_per_element);
2019 BESDEBUG(dmrpp_3, prolog <<
"d_use_transfer_threads: " << (DmrppRequestHandler::d_use_transfer_threads ?
"true" :
"false") << endl);
2020 BESDEBUG(dmrpp_3, prolog <<
"d_max_transfer_threads: " << DmrppRequestHandler::d_max_transfer_threads << endl);
2021 BESDEBUG(dmrpp_3, prolog <<
"d_use_compute_threads: " << (DmrppRequestHandler::d_use_compute_threads ?
"true" :
"false") << endl);
2022 BESDEBUG(dmrpp_3, prolog <<
"d_max_compute_threads: " << DmrppRequestHandler::d_max_compute_threads << endl);
2023 BESDEBUG(dmrpp_3, prolog <<
"SuperChunks.size(): " << super_chunks.size() << endl);
// Transfer threads disabled: read the SuperChunks one after another, front of the queue first.
2025 if (!DmrppRequestHandler::d_use_transfer_threads) {
2028#if DMRPP_ENABLE_THREAD_TIMERS
2029 BES_STOPWATCH_START(dmrpp_3, prolog +
"Serial SuperChunk Processing.");
2031 while (!super_chunks.empty()) {
2032 auto super_chunk = super_chunks.front();
2034 BESDEBUG(dmrpp_3, prolog << super_chunk->to_string(
true) << endl );
2035 super_chunk->read();
// Transfer threads enabled: hand the whole queue to read_super_chunks_concurrent().
2039#if DMRPP_ENABLE_THREAD_TIMERS
2040 string timer_name = prolog +
"Concurrent SuperChunk Processing. d_max_transfer_threads: " + to_string(DmrppRequestHandler::d_max_transfer_threads);
2041 BES_STOPWATCH_START(dmrpp_3, timer_name);
2043 read_super_chunks_concurrent(super_chunks,
this);
// Decode the staged structure bytes into the structure array.
2045 if (is_readable_struct)
2046 read_array_of_structure(d_structure_array_buf);
// Reads the chunks needed by the current constraint using "non-contiguous" SuperChunks.
// Each SuperChunk is filled with add_chunk_non_contiguous() up to a byte end position. That
// position is derived from a buffer size of at least bytes_per_element * get_size(false)
// (the unconstrained element count) and never exceeds the furthest end (offset + size) of
// any needed chunk. The SuperChunks are then read serially.
// Throws BESInternalError when the variable has no chunks, when no chunk is needed, or when a
// chunk cannot be added to a fresh SuperChunk.
2050void DmrppArray::read_buffer_chunks()
2053 BESDEBUG(dmrpp_3, prolog <<
"coming to read_buffer_chunks() " << endl);
2055 throw BESInternalError(
string(
"Expected chunks for variable ") + name(), __FILE__, __LINE__);
// First pass: record the offset and size of the first needed chunk and the largest
// offset + size over all needed chunks.
2058 unsigned long long max_buffer_end_position = 0;
2063 bool first_needed_chunk =
true;
2064 unsigned long long first_needed_chunk_offset = 0;
2065 unsigned long long first_needed_chunk_size = 0;
2067 vector<unsigned long long> target_element_address = chunk->get_position_in_array();
2068 auto needed = find_needed_chunks(0 , &target_element_address, chunk);
2070 if (first_needed_chunk ==
true) {
2071 first_needed_chunk_offset = chunk->get_offset();
2072 first_needed_chunk_size = chunk->get_size();
2073 first_needed_chunk =
false;
2075 unsigned long long temp_max_buffer_end_position= chunk->get_size() + chunk->get_offset();
2076 if(max_buffer_end_position < temp_max_buffer_end_position)
2077 max_buffer_end_position = temp_max_buffer_end_position;
// No needed chunk leaves the end position at 0.
2080 if (max_buffer_end_position == 0)
2081 throw BESInternalError(
"ERROR - Failed to locate any chunks that correspond to the requested data.", __FILE__, __LINE__);
// Buffer size in bytes: the whole unconstrained array, but never smaller than the first
// needed chunk.
2087 unsigned long long buffer_size = bytes_per_element * this->
get_size(
false);
2090 if (buffer_size < first_needed_chunk_size)
2091 buffer_size = first_needed_chunk_size;
2094 unsigned long long buffer_end_position = min((buffer_size + first_needed_chunk_offset),max_buffer_end_position);
// Second pass: start the first non-contiguous SuperChunk.
2096 unsigned long long sc_count=0;
2099 sc_id << name() <<
"-" << sc_count;
2100 queue<shared_ptr<SuperChunk>> super_chunks;
2101 auto current_super_chunk = shared_ptr<SuperChunk>(
new SuperChunk(sc_id.str(),
this)) ;
2104 current_super_chunk->set_non_contiguous_chunk_flag(
true);
2105 super_chunks.push(current_super_chunk);
2109 vector<unsigned long long> target_element_address = chunk->get_position_in_array();
2110 auto needed = find_needed_chunks(0 , &target_element_address, chunk);
// Add the needed chunk within the current end position; when it does not fit, start a new
// SuperChunk and recompute the end position from this chunk's offset.
2112 bool added = current_super_chunk->add_chunk_non_contiguous(chunk,buffer_end_position);
2114 sc_id.str(std::string());
2116 sc_id << name() <<
"-" << sc_count;
2117 current_super_chunk = shared_ptr<SuperChunk>(
new SuperChunk(sc_id.str(),
this));
2120 current_super_chunk->set_non_contiguous_chunk_flag(
true);
2121 super_chunks.push(current_super_chunk);
2124 if (buffer_size < chunk->
get_size())
2125 buffer_size = chunk->get_size();
2126 buffer_end_position = min((buffer_size + chunk->get_offset()),max_buffer_end_position);
2127 if(!current_super_chunk->add_chunk_non_contiguous(chunk,buffer_end_position)){
2129 msg << prolog <<
"Failed to add Chunk to new SuperChunk. chunk: " << chunk->to_string();
2130 throw BESInternalError(msg.str(), __FILE__, __LINE__);
// Reserve the constrained result, then read the SuperChunks in queue order.
2136 reserve_value_capacity_ll(
get_size(
true));
2138 while(!super_chunks.empty()) {
2139 auto super_chunk = super_chunks.front();
2141 super_chunk->read();
2148#ifdef USE_READ_SERIAL
// Serial variant of insert_chunk(), compiled under USE_READ_SERIAL. It recursively copies the
// constrained part of `chunk` into get_buf(), reading the chunk lazily (read_chunk()) when the
// last dimension is reached. The element width comes from prototype()->width().
// NOTE(review): the call below, get_chunk_start(dim, chunk_origin), does not match the
// get_chunk_start(const dimension &, unsigned long long) defined in this file. With
// USE_READ_SERIAL enabled it would not compile; get_chunk_start(thisDim, chunk_origin[dim])
// matches insert_chunk().
// NOTE(review): get_shape(true) is recomputed at every recursion level although only the last
// dimension uses it.
2170void DmrppArray::insert_chunk_serial(
unsigned int dim, vector<unsigned long long> *target_element_address, vector<unsigned long long> *chunk_element_address,
2173 BESDEBUG(
"dmrpp", __func__ <<
" dim: "<< dim <<
" BEGIN "<< endl);
2179 const vector<unsigned long long> &chunk_origin = chunk->get_position_in_array();
2181 dimension thisDim = this->get_dimension(dim);
// Skip chunks lying wholly outside the selected range of this dimension.
2184 if ((
unsigned long long) thisDim.start > (chunk_origin[dim] + chunk_shape[dim]) || (
unsigned long long) thisDim.stop < chunk_origin[dim]) {
2189 unsigned long long first_element_offset = get_chunk_start(dim, chunk_origin);
2192 if (first_element_offset > chunk_shape[dim]) {
// Last element of this chunk along dim, clipped to the selection's stop.
2197 unsigned long long end_element = chunk_origin[dim] + chunk_shape[dim] - 1;
2198 if ((
unsigned long long) thisDim.stop < end_element) {
2199 end_element = thisDim.stop;
2202 unsigned long long chunk_start = first_element_offset;
2203 unsigned long long chunk_end = end_element - chunk_origin[dim];
2204 vector<unsigned long long> constrained_array_shape =
get_shape(
true);
2206 unsigned int last_dim = chunk_shape.size() - 1;
2207 if (dim == last_dim) {
2209 chunk->read_chunk();
2213 char *source_buffer = chunk->get_rbuf();
2214 char *target_buffer = get_buf();
2215 unsigned int elem_width = prototype()->width();
// Unit stride: one memcpy for the contiguous run of selected innermost elements.
2217 if (thisDim.stride == 1) {
2219 unsigned long long start_element = chunk_origin[dim] + first_element_offset;
2221 unsigned long long chunk_constrained_inner_dim_bytes = (end_element - start_element + 1) * elem_width;
2224 (*target_element_address)[dim] = (start_element - thisDim.start) / thisDim.stride;
2226 (*chunk_element_address)[dim] = first_element_offset;
2228 unsigned long long target_char_start_index = get_index(*target_element_address, constrained_array_shape) * elem_width;
2229 unsigned long long chunk_char_start_index = get_index(*chunk_element_address, chunk_shape) * elem_width;
2231 memcpy(target_buffer + target_char_start_index, source_buffer + chunk_char_start_index, chunk_constrained_inner_dim_bytes);
// Non-unit stride: copy the selected elements one at a time.
2235 for (
unsigned long long chunk_index = chunk_start; chunk_index <= chunk_end; chunk_index += thisDim.stride) {
2237 (*target_element_address)[dim] = (chunk_index + chunk_origin[dim] - thisDim.start) / thisDim.stride;
2240 (*chunk_element_address)[dim] = chunk_index;
2242 unsigned long long target_char_start_index = get_index(*target_element_address, constrained_array_shape) * elem_width;
2243 unsigned long long chunk_char_start_index = get_index(*chunk_element_address, chunk_shape) * elem_width;
2245 memcpy(target_buffer + target_char_start_index, source_buffer + chunk_char_start_index, elem_width);
// Outer dimension: fix this dimension's index in both addresses and recurse inward.
2251 for (
unsigned long long chunk_index = chunk_start; chunk_index <= chunk_end; chunk_index += thisDim.stride) {
2252 (*target_element_address)[dim] = (chunk_index + chunk_origin[dim] - thisDim.start) / thisDim.stride;
2253 (*chunk_element_address)[dim] = chunk_index;
2256 insert_chunk_serial(dim + 1, target_element_address, chunk_element_address, chunk);
// Serial fallback reader, compiled under USE_READ_SERIAL. For every entry of get_chunk_vec(),
// it walks the chunk with insert_chunk_serial(), which reads the chunk and copies its
// constrained part into this array's buffer.
// Throws BESInternalError when the variable has no chunks.
2261void DmrppArray::read_chunks_serial()
2263 BESDEBUG(
"dmrpp", __func__ <<
" for variable '" << name() <<
"' - BEGIN" << endl);
2265 vector<Chunk> &chunk_refs = get_chunk_vec();
2266 if (chunk_refs.size() == 0)
throw BESInternalError(
string(
"Expected one or more chunks for variable ") + name(), __FILE__, __LINE__);
// Size the value buffer for the constrained result.
2269 reserve_value_capacity_ll(
get_size(
true));
// Start each chunk's walk at dimension 0, with its array position as the initial target
// address and a zeroed address into the chunk.
2277 for (
unsigned long long i = 0; i < chunk_refs.size(); i++) {
2278 Chunk &chunk = chunk_refs[i];
2280 vector<unsigned long long> chunk_source_address(dimensions(), 0);
2281 vector<unsigned long long> target_element_address = chunk.get_position_in_array();
2284 insert_chunk_serial(0, &target_element_address, &chunk_source_address, &chunk);
2289 BESDEBUG(
"dmrpp",
"DmrppArray::"<< __func__ <<
"() for " << name() <<
" END"<< endl);
// Sets whether this array is included in the response; the final step delegates to libdap's
// Array::set_send_p().
2294DmrppArray::set_send_p(
bool state)
2299 Array::set_send_p(state);
// Reads a string variable stored as a single contiguous chunk and stores the chunk's contents
// as this array's string value(s) through set_value(). The read is timed under MODULE.
2309void DmrppArray::read_contiguous_string()
2311 BES_STOPWATCH_START(MODULE, prolog +
"Timing array name: "+name());
2317 the_one_chunk->read_chunk();
2326 vector < string > ss;
// NOTE(review): constructing std::string from a bare char* copies up to the first NUL byte.
// Unless the chunk buffer is guaranteed to be NUL-terminated, this can read past the data
// actually read; a (pointer, length) constructor would bound it.
2327 string s(
reinterpret_cast<char *
>(the_one_chunk->get_rbuf()));
2329 set_value(ss, ss.size());
/**
 * @brief Decode one fixed-length string field of `fixed_str_len` bytes starting at `buf`.
 *
 * Depending on `pad_type`, one of these applies:
 * - A NUL-based scheme: the value ends at the first NUL byte or after fixed_str_len bytes,
 *   whichever comes first.
 * - A trailing-padding scheme: trailing ' ' and NUL bytes are trimmed from the fixed-length
 *   field.
 * - No scheme set ("NOT_SET"): only a debug message is logged, and the value is left as
 *   initialized.
 *
 * @param buf           Start of the fixed-length field; at least fixed_str_len bytes are
 *                      assumed readable.
 * @param fixed_str_len Width of the field in bytes.
 * @param pad_type      Padding scheme of the field.
 * @return The decoded string.
 */
2334string DmrppArray::ingest_fixed_length_string(
const char *buf,
const unsigned long long fixed_str_len, string_pad_type pad_type)
2337 unsigned long long str_len = 0;
// NUL-based scheme: scan forward to the first NUL, never past the field width.
2342 while( str_len < fixed_str_len && buf[str_len]!=0 ){
2345 BESDEBUG(MODULE, prolog << DmrppArray::pad_type_to_str(pad_type) <<
" scheme. str_len: " << str_len << endl);
2346 value = string(buf,str_len);
// Trailing-padding scheme: start at the full width and trim trailing spaces and NULs.
2351 str_len = fixed_str_len;
2352 while( str_len>0 && (buf[str_len-1]==
' ' || buf[str_len-1]==0)){
2355 BESDEBUG(MODULE, prolog << DmrppArray::pad_type_to_str(pad_type) <<
" scheme. str_len: " << str_len << endl);
2356 value = string(buf,str_len);
2362 BESDEBUG(MODULE, prolog <<
"pad_type: NOT_SET" << endl);
2365 BESDEBUG(MODULE, prolog <<
"value: '" << value <<
"'" << endl);
/**
 * @brief Render a list of dimension (or chunk) sizes as "[d0][d1]...[dn]" for debug output.
 * @param dims Sizes, outermost dimension first.
 * @return The bracketed sizes concatenated; the empty string when dims is empty.
 */
string dims_to_string(const vector<unsigned long long> dims){
    ostringstream rendered;
    for (auto it = dims.begin(); it != dims.end(); ++it)
        rendered << '[' << *it << ']';
    return rendered.str();
}
/**
 * @brief Build a multi-line, '#'-prefixed debug description of a DmrppArray.
 *
 * Lists the prototype type name and array name, each dimension (named dimensions as
 * "name=size"), and the sizes reported by get_size(true), width(true), length(),
 * prototype()->width(), dimensions(true) and get_chunk_dimension_sizes().
 *
 * @param a      Array to describe.
 * @param banner Title line written near the top.
 * @return The formatted description.
 *
 * NOTE(review): `a` is taken by value, so every call copies the whole DmrppArray; a reference
 * parameter would avoid that copy.
 * NOTE(review): the labels "a->length(true)" and "a->prototype()->width(true)" do not match
 * the calls they print (a.length() and a.prototype()->width(), both without an argument).
 */
2377std::string array_to_str(
DmrppArray a,
const string &banner) {
2379 msg << endl <<
"# - - - - - - - - - - - - - - - - - - -" << endl;
2380 msg <<
"# " << banner << endl;
2381 msg <<
"# " << a.prototype()->type_name() <<
" " << a.name();
// One entry per dimension; named dimensions are written as name=size.
2383 for(
auto dim=a.dim_begin(); dim < a.dim_end(); dim++){
2385 if(!dim->name.empty()){
2386 msg << dim->name <<
"=";
2388 msg << dim->size <<
"]";
2391 msg <<
"# " << endl;
2392 msg <<
"# a->get_size(true): " << a.get_size(
true) <<
" (The total number of elements in the array instance)" << endl;
2393 msg <<
"# a->width(true): " << a.width(
true) <<
" (The number of bytes needed to hold the entire array - prot.width * num_elements)" << endl;
2394 msg <<
"# a->length(true): " << a.length() <<
" (The number of elements in the vector)" << endl;
2395 msg <<
"# a->prototype()->width(true): " << a.prototype()->width() <<
" (Width of the template variable)" << endl;
2396 msg <<
"# a->dimensions(true): " << a.dimensions(
true) << endl;
2397 msg <<
"# a->chunk_dimension_sizes" << dims_to_string(a.get_chunk_dimension_sizes()) << endl;
2398 msg <<
"# a->length(): " << a.length() << endl;
2402#define HEX( x ) std::setw(2) << std::setfill('0') << std::hex << (int)( x )
/**
 * @brief Debug dump of a buffer of fixed-length strings, character by character.
 *
 * Groups and characters are separated by ", ". Characters strictly between 32 and 126 are
 * written as themselves; every other byte is written as "0x" followed by two hex digits
 * (via the HEX macro).
 *
 * @param buff             Start of the buffer.
 * @param num_bytes        Number of bytes in the buffer.
 * @param fixed_string_len Width of each string in bytes.
 * @return The formatted dump.
 *
 * NOTE(review): with fixed_string_len == 0 and num_bytes > 0 the outer loop never advances
 * (i += 0) and does not terminate.
 * NOTE(review): the inner loop always reads fixed_string_len bytes, so when num_bytes is not a
 * multiple of fixed_string_len the last group reads past buff + num_bytes.
 * NOTE(review): HEX casts the char to int. Where char is signed, bytes >= 0x80 are
 * sign-extended and print as e.g. "ffffff80". The range test also excludes ' ' (32) and '~'
 * (126); confirm whether that is intended.
 */
2404std::string show_string_buff(
char *buff,
unsigned long long num_bytes,
unsigned long long fixed_string_len) {
// One iteration per fixed-length string.
2406 for (
unsigned long long i = 0; i < num_bytes; i += fixed_string_len) {
2407 char *str_ptr = buff + i;
2408 if (i) { ss <<
", "; }
// One iteration per character of the current string.
2410 for (
unsigned long long j = 0; j < fixed_string_len; j++) {
2411 char this_char = *(str_ptr + j);
2412 if (j) { ss <<
", "; }
2413 if (this_char > 32 && this_char < 126) {
2416 ss <<
"0x" << std::hex << HEX(this_char) << std::dec;
2437 var_type = array.prototype()->type();
2439 auto *byte_array_proxy =
dynamic_cast<DmrppArray *
>(array.ptr_duplicate());
2440 if(!byte_array_proxy){
2441 throw BESInternalFatalError(prolog +
"Server encountered internal state ambiguity. "
2442 "Expected valid DmrppArray pointer. Exiting.",
2443 __FILE__, __LINE__);
2446 unsigned long long item_size=0;
2447 if ((var_type == dods_str_c || var_type == dods_url_c)) {
2448 if (array.is_flsa()) {
2449 BESDEBUG(MODULE, prolog <<
"Processing Fixed Length String Array data." << endl);
2450 item_size = byte_array_proxy->get_fixed_string_length();
2451 BESDEBUG(MODULE, prolog <<
"get_fixed_string_length(): " << item_size << endl);
2459 item_size = byte_array_proxy->prototype()->width();
2462 unsigned long long total_bytes = byte_array_proxy->length() * item_size;
2463 BESDEBUG(MODULE, prolog <<
"total_bytes: " << total_bytes << endl);
2465 BESDEBUG(MODULE, prolog << array_to_str(*byte_array_proxy,
"Source DmrppArray") );
2468 auto *tmp_proto =
new libdap::Byte(byte_array_proxy->prototype()->name());
2469 byte_array_proxy->set_prototype(tmp_proto);
2472 byte_array_proxy->set_bytes_per_element(byte_array_proxy->prototype()->width());
2473 tmp_proto->set_parent(byte_array_proxy);
2476 auto cdim_sizes = byte_array_proxy->get_chunk_dimension_sizes();
2477 if(!cdim_sizes.empty()) {
2478 BESDEBUG(MODULE, prolog <<
"original chunk_dimension_sizes.back(): " << dims_to_string(cdim_sizes) << endl);
2480 auto new_last_cdim_size = cdim_sizes.back() * item_size;
2481 cdim_sizes.pop_back();
2482 cdim_sizes.emplace_back(new_last_cdim_size);
2483 BESDEBUG(MODULE, prolog <<
"New chunk_dimension_sizes" << dims_to_string(cdim_sizes) << endl);
2485 byte_array_proxy->set_chunk_dimension_sizes(cdim_sizes);
2486 BESDEBUG(MODULE, prolog <<
"Updated chunk_dimension_sizes"
2487 << dims_to_string(byte_array_proxy->get_chunk_dimension_sizes()) << endl);
2491 unsigned long long chunk_index = 0;
2492 for(
const auto &chunk: byte_array_proxy->get_immutable_chunks()){
2493 auto cpia = chunk->get_position_in_array();
2494 if (!cpia.empty()) {
2495 auto new_position = cpia.back() * item_size;
2497 cpia.emplace_back(new_position);
2499 prolog <<
"Chunk[" << chunk_index <<
"] new chunk_position_in_array" << dims_to_string(cpia)
2501 chunk->set_position_in_array(cpia);
2502 BESDEBUG(MODULE, prolog <<
"Chunk[" << chunk_index <<
"] UPDATED chunk_position_in_array"
2503 << dims_to_string(chunk->get_position_in_array()) << endl);
2508 auto t_last_dim = byte_array_proxy->dim_end() - 1;
2510 BESDEBUG(MODULE, prolog <<
"Orig last_dim->size: " << t_last_dim->size << endl);
2512 t_last_dim->size = t_last_dim->size * item_size;
2513 BESDEBUG(MODULE, prolog <<
"New last_dim->size: " << t_last_dim->size << endl);
2515 t_last_dim->c_size = t_last_dim->size;
2516 BESDEBUG(MODULE, prolog <<
"New last_dim->c_size: " << t_last_dim->c_size << endl);
2518 t_last_dim->start = 0;
2519 BESDEBUG(MODULE, prolog <<
"New last_dim->start: " << t_last_dim->start << endl);
2521 t_last_dim->stop = t_last_dim->size - 1;
2522 BESDEBUG(MODULE, prolog <<
"New last_dim->stop: " << t_last_dim->stop << endl);
2524 t_last_dim->stride = 1;
2525 BESDEBUG(MODULE, prolog <<
"New last_dim->stride: " << t_last_dim->stride << endl);
2527 byte_array_proxy->set_length(total_bytes);
2528 t_last_dim = byte_array_proxy->dim_end() - 1;
2529 BESDEBUG(MODULE, prolog <<
"Updated last_dim->size: " << t_last_dim->size << endl);
2531 BESDEBUG(MODULE, prolog << array_to_str(*byte_array_proxy,
"New DmrppArray of Byte") );
2533 return byte_array_proxy;
2545 if (flsa.is_flsa()) {
2546 BESDEBUG(MODULE, prolog <<
"Ingesting Fixed Length String Array Data." << endl);
2547 auto fstr_len = flsa.get_fixed_string_length();
2548 BESDEBUG(MODULE, prolog <<
"flsa.get_fixed_string_length(): " << fstr_len << endl);
2550 auto pad_type = flsa.get_fixed_length_string_pad();
2551 BESDEBUG(MODULE, prolog <<
"flsa.get_fixed_length_string_pad_str(): " << flsa.get_fixed_length_string_pad_str() << endl);
2553 auto buff = data.get_buf();
2554 BESDEBUG(MODULE, prolog <<
"data.get_buf(): " << (
void *) buff << endl);
2555 if(buff ==
nullptr){
2556 throw BESInternalError(
"Failed to acquire byte buffer from which to read string array data.",__FILE__,__LINE__);
2558 unsigned long long num_bytes = data.length();
2559 BESDEBUG(MODULE, prolog <<
"Buffer contains: " << show_string_buff(buff, num_bytes, fstr_len) << endl);
2562 char *end = buff + num_bytes;
2563 while (begin < end) {
2564 string value = DmrppArray::ingest_fixed_length_string(begin, fstr_len, pad_type);
2565 flsa.get_str().push_back(value);
2566 BESDEBUG(MODULE, prolog <<
"Added String: '" << value <<
"'" << endl);
2588 Type var_type = this->var()->type();
2600 if (read_p())
return true;
2606 this->set_dio_flag(
false);
2610 if (this->use_direct_io_opt()) {
2612 this->set_dio_flag();
2613 auto chunks = this->get_chunks();
2616 for (
unsigned int i = 0; i<chunks.size();i++) {
2618 chunks[i]->set_direct_io_offset(chunks[i-1]->get_direct_io_offset()+chunks[i-1]->
get_size());
2619 BESDEBUG(MODULE, prolog <<
"direct_io_offset is: " << chunks[i]->get_direct_io_offset() << endl);
2623 Array::var_storage_info dmrpp_vs_info;
2627 for (
const auto &def_lev:this->get_deflate_levels())
2628 dmrpp_vs_info.deflate_levels.push_back(def_lev);
2632 dmrpp_vs_info.chunk_dims.push_back(chunk_dim);
2636 for (
const auto &chunk:im_chunks) {
2637 Array::var_chunk_info_t vci_t;
2638 vci_t.filter_mask = chunk->get_filter_mask();
2639 vci_t.chunk_direct_io_offset = chunk->get_direct_io_offset();
2640 vci_t.chunk_buffer_size = chunk->get_size();
2642 for (
const auto &chunk_coord:chunk->get_position_in_array())
2643 vci_t.chunk_coords.push_back(chunk_coord);
2644 dmrpp_vs_info.var_chunk_info.push_back(vci_t);
2646 this->set_var_storage_info(dmrpp_vs_info);
2650 if (this->get_dio_flag()) {
2651 BESDEBUG(MODULE, prolog <<
"dio is turned on" << endl);
2653 Array::var_storage_info dmrpp_vs_info = this->get_var_storage_info();
2655 auto chunks = this->get_chunks();
2658 for (
unsigned int i = 0; i<chunks.size();i++) {
2660 chunks[i]->set_direct_io_offset(chunks[i-1]->get_direct_io_offset()+chunks[i-1]->
get_size());
2661 BESDEBUG(MODULE, prolog <<
"direct_io_offset is: " << chunks[i]->get_direct_io_offset() << endl);
2667 for (
const auto &chunk:im_chunks) {
2668 Array::var_chunk_info_t vci_t;
2669 vci_t.filter_mask = chunk->get_filter_mask();
2670 vci_t.chunk_direct_io_offset = chunk->get_direct_io_offset();
2671 vci_t.chunk_buffer_size = chunk->get_size();
2673 for (
const auto &chunk_coord:chunk->get_position_in_array())
2674 vci_t.chunk_coords.push_back(chunk_coord);
2675 dmrpp_vs_info.var_chunk_info.push_back(vci_t);
2677 this->set_var_storage_info(dmrpp_vs_info);
2678 bytes_per_element = this->var()->width_ll();
2681 is_readable_struct = check_struct_handling();
2682 if (is_readable_struct) {
2683 vector<unsigned int> s_off = this->get_struct_offsets();
2685 bytes_per_element = this->var()->width_ll();
2687 bytes_per_element = s_off.back();
2690 bytes_per_element = this->var()->width_ll();
2694 DmrppArray *array_to_read =
this;
2695 if ((var_type == dods_str_c || var_type == dods_url_c)) {
2698 array_to_read = get_as_byte_array(*
this);
2703 string msg = array_to_str(*array_to_read,
"Reading Data From DmrppArray");
2704 BESDEBUG(MODULE, prolog << msg << endl);
2708 BESDEBUG(MODULE, prolog <<
"Reading data from a single contiguous chunk." << endl);
2710 if (this->get_dio_flag())
2711 array_to_read->read_one_chunk_dio();
2713 array_to_read->read_contiguous();
2716 if (get_using_linked_block()) {
2717 BESDEBUG(MODULE, prolog <<
"Reading data linked blocks" << endl);
2719 array_to_read->read_linked_blocks();
2722 array_to_read->read_linked_blocks_constrained();
2725 __FILE__, __LINE__);
2732 array_to_read->read_chunks_with_linked_blocks();
2735 array_to_read->read_chunks_with_linked_blocks_constrained();
2738 __FILE__, __LINE__);
2744 bool buffer_chunk_case = array_to_read->use_buffer_chunk();
2747 BESDEBUG(MODULE, prolog <<
"Reading data from chunks, unconstrained." << endl);
2749 if (this->get_dio_flag())
2750 array_to_read->read_chunks_dio_unconstrained();
2752 else if(buffer_chunk_case)
2753 array_to_read->read_buffer_chunks_unconstrained();
2755 array_to_read->read_chunks_unconstrained();
2757 BESDEBUG(MODULE, prolog <<
"Reading data from chunks." << endl);
2760 if (buffer_chunk_case)
2761 array_to_read->read_buffer_chunks();
2763 array_to_read->read_chunks();
2768 if ((var_type == dods_str_c || var_type == dods_url_c)) {
2769 BESDEBUG(MODULE, prolog <<
"Processing Array of Strings." << endl);
2770 if(array_to_read ==
this){
2772 "Expected byte transport array. Exiting.",
2773 __FILE__, __LINE__);
2777 ingest_flsa_data(*
this, *array_to_read);
2780 BESDEBUG(MODULE, prolog <<
"Processing Variable Length String Array data. SKIPPING..." << endl);
2782 ingest_vlsa_data(*
this, *array_to_read);
2784 throw BESInternalError(
"Arrays of variable length strings are not yet supported.",__FILE__,__LINE__);
2788 if(array_to_read && array_to_read !=
this) {
2789 delete array_to_read;
2790 array_to_read =
nullptr;
2795 if(array_to_read && array_to_read !=
this) {
2796 delete array_to_read;
2797 array_to_read =
nullptr;
2804 int64_t num = this->length_ll();
2808 case dods_uint16_c: {
2809 auto *local =
reinterpret_cast<dods_uint16*
>(this->get_buf());
2811 *local = bswap_16(*local);
2817 case dods_uint32_c: {
2818 auto *local =
reinterpret_cast<dods_uint32*
>(this->get_buf());;
2820 *local = bswap_32(*local);
2826 case dods_uint64_c: {
2827 auto *local =
reinterpret_cast<dods_uint64*
>(this->get_buf());;
2829 *local = bswap_64(*local);
2834 case dods_float32_c: {
2835 swap_float32(this->get_buf(), num);
2838 case dods_float64_c: {
2839 swap_float64(this->get_buf(), num);
2849unsigned long long DmrppArray::set_fixed_string_length(
const string &length_str)
2852 d_fixed_str_length = stoull(length_str);
2854 catch(std::invalid_argument e){
2855 stringstream err_msg;
2856 err_msg <<
"The value of the length string could not be parsed. Message: " << e.what();
2859 return d_fixed_str_length;
2863std::string pad_to_str(string_pad_type pad)
2868 pad_str =
"null_term";
2871 pad_str =
"null_pad";
2874 pad_str =
"space_pad";
2877 pad_str =
"not_set";
2880 throw BESInternalError(
"ERROR: Unrecognized HDF5 String Padding Scheme!",__FILE__,__LINE__);
2887std::string DmrppArray::pad_type_to_str(string_pad_type pad)
2889 return pad_to_str(pad);
2892string_pad_type str_to_pad_type(
const string &pad_str){
2893 string_pad_type pad_type(not_set);
2894 if(pad_str==
"null_pad"){
2895 pad_type = null_pad;
2897 else if(pad_str==
"null_term") {
2898 pad_type = null_term;
2900 else if (pad_str ==
"space_pad"){
2901 pad_type = space_pad;
2903 else if (pad_str ==
"not_set"){
2907 stringstream err_msg;
2908 err_msg <<
"The value of the pad string was not recognized. pad_str: " << pad_str;
2909 throw BESInternalError(err_msg.str(),__FILE__,__LINE__);
2915string_pad_type DmrppArray::set_fixed_length_string_pad_type(
const string &pad_str)
2917 d_fixed_length_string_pad_type = str_to_pad_type(pad_str);
2918 return d_fixed_length_string_pad_type;
2922ons::ons(
const std::string &ons_pair_str) {
2923 const string colon(
":");
2924 size_t colon_pos = ons_pair_str.find(colon);
2926 string offset_str = ons_pair_str.substr(0, colon_pos);
2927 offset = stoull(offset_str);
2929 string size_str = ons_pair_str.substr(colon_pos + 1);
2930 size = stoull(size_str);
2934void DmrppArray::set_ons_string(
const std::string &ons_str)
2936 d_vlen_ons_str = ons_str;
2939void DmrppArray::set_ons_string(
const vector<ons> &ons_pairs)
2941 stringstream ons_ss;
2943 for(
auto &ons_pair: ons_pairs){
2947 ons_ss << ons_pair.offset <<
":" << ons_pair.size;
2949 d_vlen_ons_str = ons_ss.str();
2962 const string comma(
",");
2966 while ((next = d_vlen_ons_str.find(comma, last)) != string::npos) {
2967 string ona_pair_str = d_vlen_ons_str.substr(last, next-last);
2968 ons ons_pair(ona_pair_str);
2969 ons_pairs.push_back(ons_pair);
2974 cout << d_vlen_ons_str.substr(last) << endl;
2983void flsa_xml_element(XMLWriter &xml,
DmrppArray &a){
2985 string element_name(
"dmrpp:FixedLengthStringArray");
2986 string str_len_attr_name(
"string_length");
2987 string pad_attr_name(
"pad");
2989 if (xmlTextWriterStartElement(xml.get_writer(), (
const xmlChar *) element_name.c_str()) < 0)
2990 throw InternalErr(__FILE__, __LINE__,
"Could not write " + element_name +
" element");
2992 stringstream strlen_str;
2993 strlen_str << a.get_fixed_string_length();
2994 if (xmlTextWriterWriteAttribute(xml.get_writer(), (
const xmlChar *) str_len_attr_name.c_str(),
2995 (
const xmlChar *) strlen_str.str().c_str()) < 0)
2996 throw InternalErr(__FILE__, __LINE__,
"Could not write attribute for 'string_length'");
2998 if (a.get_fixed_length_string_pad() == not_set) {
2999 throw BESInternalError(
"ERROR: Padding Scheme Has Not Been Set!", __FILE__, __LINE__);
3001 string pad_str = a.pad_type_to_str(a.get_fixed_length_string_pad());
3002 if (xmlTextWriterWriteAttribute(xml.get_writer(), (
const xmlChar *)
"pad",
3003 (
const xmlChar *) pad_str.c_str()) < 0)
3004 throw InternalErr(__FILE__, __LINE__,
"Could not write attribute for 'pad'");
3006 if (xmlTextWriterEndElement(xml.get_writer()) < 0)
3007 throw InternalErr(__FILE__, __LINE__,
"Could not end " + a.type_name() +
" element");
3016void compact_data_xml_element(XMLWriter &xml,
DmrppArray &a) {
3017 switch (a.var()->type()) {
3031 case dods_float32_c:
3032 case dods_float64_c: {
3033 uint8_t *values =
nullptr;
3035 auto size = a.buf2val(
reinterpret_cast<void **
>(&values));
3036 string encoded = base64::Base64::encode(values, size);
3049 auto sb = a.compact_str_buffer();
3051 uint8_t *values =
nullptr;
3053 auto size = a.buf2val(
reinterpret_cast<void **
>(&values));
3054 string encoded = base64::Base64::encode(values, size);
3067 throw InternalErr(__FILE__, __LINE__,
"Vector::val2buf: bad type");
3071bool obtain_compress_encode_data(
size_t num_elms,
string &encoded_str,
const Bytef*source_data,
size_t source_data_size,
string &err_msg) {
3073 if (num_elms == 1) {
3074 encoded_str = base64::Base64::encode(source_data,(
int)source_data_size);
3077 auto ssize = (uLong)source_data_size;
3078 auto csize = (uLongf)ssize*2;
3079 vector<Bytef> compressed_src;
3080 compressed_src.resize(source_data_size*2);
3082 int retval = compress(compressed_src.data(), &csize, source_data, ssize);
3084 err_msg =
"Fail to compress the data";
3087 encoded_str = base64::Base64::encode(compressed_src.data(),(
int)csize);
3094void missing_data_xml_element(
const XMLWriter &xml,
DmrppArray *da) {
3095 switch (da->var()->type()) {
3106 case dods_float32_c:
3107 case dods_float64_c: {
3108 auto source_data_src = (
const Bytef *) (da->get_buf());
3110 size_t source_data_size = da->width_ll();
3113 if (
false == obtain_compress_encode_data(da->get_size(
false),encoded_str,source_data_src,source_data_size,err_msg)) {
3114 err_msg =
"variable name: " + da->name() +
" "+err_msg;
3115 throw InternalErr(__FILE__, __LINE__, err_msg);
3123 throw InternalErr(__FILE__, __LINE__,
"Vector::val2buf: bad type");
3127void special_structure_array_data_xml_element(
const XMLWriter &xml,
DmrppArray *da) {
3129 if (da->var()->type() == dods_structure_c) {
3130 vector<char> struct_array_str_buf = da->get_structure_array_str_buffer();
3131 string final_encoded_str = base64::Base64::encode((uint8_t*)(struct_array_str_buf.data()),struct_array_str_buf.size());
3143static void print_dap4_dimension_helper(
const XMLWriter &xml,
bool constrained,
const Array::dimension &d) {
3148 if (xmlTextWriterStartElement(xml.get_writer(), (
const xmlChar *)
"Dim") < 0)
3149 throw InternalErr(__FILE__, __LINE__,
"Could not write Dim element");
3151 string name = (d.dim) ? d.dim->fully_qualified_name() : d.name;
3154 if (!constrained && !name.empty()) {
3155 if (xmlTextWriterWriteAttribute(xml.get_writer(), (
const xmlChar *)
"name",
3156 (
const xmlChar *) name.c_str()) < 0)
3157 throw InternalErr(__FILE__, __LINE__,
"Could not write attribute for name");
3159 else if (d.use_sdim_for_slice) {
3160 if (xmlTextWriterWriteAttribute(xml.get_writer(), (
const xmlChar *)
"name",
3161 (
const xmlChar *) name.c_str()) < 0)
3162 throw InternalErr(__FILE__, __LINE__,
"Could not write attribute for name");
3165 if (xmlTextWriterWriteAttribute(xml.get_writer(), (
const xmlChar *)
"size",
3166 (
const xmlChar *) (constrained ? to_string(d.c_size) : to_string(
3167 d.size)).c_str()) < 0)
3168 throw InternalErr(__FILE__, __LINE__,
"Could not write attribute for name");
3171 if (xmlTextWriterEndElement(xml.get_writer()) < 0)
3172 throw InternalErr(__FILE__, __LINE__,
"Could not end Dim element");
3199 if (constrained && !send_p())
return;
3201 if (xmlTextWriterStartElement(xml.get_writer(), (
const xmlChar *) var()->type_name().c_str()) < 0)
3202 throw InternalErr(__FILE__, __LINE__,
"Could not write " + type_name() +
" element");
3204 if (!name().empty()) {
3205 BESDEBUG(MODULE, prolog <<
"variable full path: " << FQN() << endl);
3206 if (xmlTextWriterWriteAttribute(xml.get_writer(), (
const xmlChar *)
"name", (
const xmlChar *) name().c_str()) <
3208 throw InternalErr(__FILE__, __LINE__,
"Could not write attribute for name");
3212 if (var()->type() == dods_enum_c) {
3213 D4Enum *e =
static_cast<D4Enum *
>(var());
3214 string path = e->enumeration()->name();
3215 if (e->enumeration()->parent()) {
3217 path =
static_cast<D4Group *
>(e->enumeration()->parent()->parent())->FQN() + path;
3219 if (xmlTextWriterWriteAttribute(xml.get_writer(), (
const xmlChar *)
"enum", (
const xmlChar *) path.c_str()) < 0)
3220 throw InternalErr(__FILE__, __LINE__,
"Could not write attribute for enum");
3223 if (prototype()->is_constructor_type()) {
3224 Constructor &c =
static_cast<Constructor &
>(*prototype());
3225 for_each(c.var_begin(), c.var_end(), [&xml, constrained](BaseType *btp) { btp->print_dap4(xml, constrained); });
3229 for_each(dim_begin(), dim_end(), [&xml, constrained](
const Array::dimension &d) {
3230 print_dap4_dimension_helper(xml, constrained, d);
3233 attributes()->print_dap4(xml);
3235 for_each(maps()->map_begin(), maps()->map_end(), [&xml](D4Map *m) { m->print_dap4(xml); });
3252 compact_data_xml_element(xml, *
this);
3256 missing_data_xml_element(xml,
this);
3261 special_structure_array_data_xml_element(xml,
this);
3267 if (var()->type() == dods_str_c) {
3271 flsa_xml_element(xml, *
this);
3275 vlsa::write(xml, *
this);
3278 if (xmlTextWriterEndElement(xml.get_writer()) < 0)
3279 throw InternalErr(__FILE__, __LINE__,
"Could not end " + type_name() +
" element");
3282void DmrppArray::dump(ostream &strm)
const
3284 strm << BESIndent::LMarg <<
"DmrppArray::" << __func__ <<
"(" << (
void *)
this <<
")" << endl;
3285 BESIndent::Indent();
3286 DmrppCommon::dump(strm);
3288 strm << BESIndent::LMarg <<
"value: " <<
"----" << endl;
3289 BESIndent::UnIndent();
3292unsigned int DmrppArray::buf2val(
void **val){
3297 if ( var()->type()==dods_str_c || var()->type()==dods_url_c ) {
3299 auto str_buf = compact_str_buffer();
3300 auto buf_size = str_buf.size();
3301 if (str_buf.empty()) {
3303 msg << prolog <<
"Logic error: called when cardinal type data buffer was empty!";
3304 throw BESInternalError(msg.str(), __FILE__, __LINE__);
3307 *val =
new char[buf_size];
3309 memcpy(*val, str_buf.data(), buf_size);
3312 return (
unsigned int)Vector::buf2val_ll(val);
3317bool DmrppArray::use_direct_io_opt() {
3319 bool ret_value =
false;
3320 bool is_integer_le_float =
false;
3322 if (DmrppRequestHandler::is_netcdf4_enhanced_response && this->is_filters_empty() ==
false) {
3323 Type t = this->var()->type();
3324 if (libdap::is_simple_type(t) && t != dods_str_c && t != dods_url_c && t!= dods_enum_c && t!=dods_opaque_c) {
3325 is_integer_le_float =
true;
3326 if(is_integer_type(t) && this->get_byte_order() ==
"BE")
3327 is_integer_le_float =
false;
3331 bool no_constraint =
false;
3334 if (is_integer_le_float) {
3335 no_constraint =
true;
3337 no_constraint =
false;
3340 bool has_deflate_filter =
false;
3343 if (no_constraint) {
3345 if (filters_string.find(
"deflate")!=string::npos)
3346 has_deflate_filter =
true;
3349 bool is_data_all_fvalues =
false;
3352 if (has_deflate_filter && this->
get_uses_fill_value() && this->get_var_chunks_storage_size() == 0)
3353 is_data_all_fvalues =
true;
3355 bool has_dio_filters =
false;
3358 if (has_deflate_filter && !is_data_all_fvalues) {
3359 if (this->get_deflate_levels().empty() ==
false)
3360 has_dio_filters =
true;
3368 if (has_dio_filters && this->get_processing_fv_chunks() ==
false) {
3371 vector <unsigned long long>dim_sizes;
3372 Dim_iter p = dim_begin();
3373 while (p != dim_end()) {
3374 dim_sizes.push_back((
unsigned long long)dimension_size_ll(p));
3378 bool chunk_less_dim =
true;
3379 if (chunk_dim_sizes.size() == dim_sizes.size()) {
3380 for (
unsigned int i = 0; i<dim_sizes.size(); i++) {
3381 if (chunk_dim_sizes[i] > dim_sizes[i]) {
3382 chunk_less_dim =
false;
3388 chunk_less_dim =
false;
3390 ret_value = chunk_less_dim;
3398void DmrppArray::read_array_of_structure(vector<char> &values) {
3400 size_t values_offset = 0;
3401 int64_t nelms = this->length_ll();
3403 BESDEBUG(dmrpp_3, prolog <<
"swap bytes " << endl);
3405 vector<unsigned int> s_offs = this->get_struct_offsets();
3407 for (int64_t element = 0; element < nelms; ++element) {
3409 auto dmrpp_s =
dynamic_cast<DmrppStructure*
>(var()->ptr_duplicate());
3411 throw InternalErr(__FILE__, __LINE__,
"Cannot obtain the structure pointer.");
3413 dmrpp_s->set_struct_offsets(s_offs);
3414 dmrpp_s->structure_read(values,values_offset, this->
twiddle_bytes());
3418 string err_msg =
"Cannot read the data of a dmrpp structure variable " + var()->name();
3419 throw InternalErr(__FILE__, __LINE__, err_msg);
3421 dmrpp_s->set_read_p(
true);
3422 set_vec_ll((uint64_t)element,dmrpp_s);
3431bool DmrppArray::check_struct_handling() {
3433 bool ret_value =
true;
3435 if (this->var()->type() == dods_structure_c) {
3437 auto array_base =
dynamic_cast<DmrppStructure*
>(this->var());
3438 Constructor::Vars_iter vi = array_base->var_begin();
3439 Constructor::Vars_iter ve = array_base->var_end();
3440 for (; vi != ve; vi++) {
3443 Type t_bt = bt->type();
3446 if (libdap::is_simple_type(t_bt) ==
false) {
3448 if (t_bt == dods_array_c) {
3450 auto t_a =
dynamic_cast<Array *
>(bt);
3451 Type t_array_var = t_a->var()->type();
3452 if (!libdap::is_simple_type(t_array_var) || t_array_var == dods_str_c || t_array_var == dods_url_c || t_array_var == dods_enum_c || t_array_var==dods_opaque_c) {
3458 else if (t_bt == dods_str_c || t_bt == dods_url_c || t_bt == dods_enum_c || t_bt == dods_opaque_c) {
3470void DmrppArray::read_buffer_chunks_unconstrained() {
3472 BESDEBUG(dmrpp_3, prolog <<
"coming to read_buffer_chunks_unconstrained() " << endl);
3475 unsigned long long buffer_size = bytes_per_element * this->
get_size(
false);
3478 throw BESInternalError(
string(
"Expected chunks for variable ") + name(), __FILE__, __LINE__);
3481 unsigned long long sc_count=0;
3484 sc_id << name() <<
"-" << sc_count;
3485 queue<shared_ptr<SuperChunk>> super_chunks;
3486 auto current_super_chunk = std::make_shared<SuperChunk>(sc_id.str(),
this) ;
3489 current_super_chunk->set_non_contiguous_chunk_flag(
true);
3490 super_chunks.push(current_super_chunk);
3495 if (buffer_size < (array_chunks[0])->
get_size())
3496 buffer_size = (array_chunks[0])->
get_size();
3498 unsigned long long max_buffer_end_position = 0;
3502 for (
const auto &chunk: array_chunks) {
3503 unsigned long long temp_max_buffer_end_position= chunk->get_size() + chunk->get_offset();
3504 if(max_buffer_end_position < temp_max_buffer_end_position)
3505 max_buffer_end_position = temp_max_buffer_end_position;
3509 unsigned long long buffer_end_position = min((buffer_size + (array_chunks[0])->get_offset()),max_buffer_end_position);
3511 BESDEBUG(dmrpp_3, prolog <<
"variable name: " << this->name() <<endl);
3512 BESDEBUG(dmrpp_3, prolog <<
"maximum buffer_end_position: " << max_buffer_end_position <<endl);
3516 bool added = current_super_chunk->add_chunk_non_contiguous(chunk,buffer_end_position);
3518 sc_id.str(std::string());
3520 sc_id << name() <<
"-" << sc_count;
3521 current_super_chunk = std::make_shared<SuperChunk>(sc_id.str(),
this);
3523 current_super_chunk->set_non_contiguous_chunk_flag(
true);
3524 super_chunks.push(current_super_chunk);
3527 if (buffer_size < chunk->
get_size())
3528 buffer_size = chunk->get_size();
3529 buffer_end_position = min((buffer_size + chunk->get_offset()),max_buffer_end_position);
3530 if (!current_super_chunk->add_chunk_non_contiguous(chunk,buffer_end_position)) {
3532 msg << prolog <<
"Failed to add Chunk to new SuperChunk for non-contiguous chunks. chunk: " << chunk->to_string();
3533 throw BESInternalError(msg.str(), __FILE__, __LINE__);
3538 reserve_value_capacity_ll(
get_size());
3540 while(!super_chunks.empty()) {
3541 auto super_chunk = super_chunks.front();
3543 super_chunk->read_unconstrained();
3549bool DmrppArray::use_buffer_chunk() {
3551 bool ret_value =
false;
3552 auto chunks = this->get_chunks();
3556 if (chunks.size() >1 && this->var()->type() !=dods_structure_c){
3557 unsigned long long first_chunk_offset = (chunks[0])->get_offset();
3558 unsigned long long first_chunk_size = (chunks[0])->
get_size();
3559 unsigned long long second_chunk_offset = (chunks[1])->get_offset();
3560 if ((first_chunk_offset + first_chunk_size) != second_chunk_offset)
static bool IsSet(const std::string &flagName)
see if the debug context flagName is set to true
exception thrown if internal error encountered
exception thrown if an internal error is found and is fatal to the BES
Extend libdap::Array so that a handler can read data using a DMR++ file.
virtual std::vector< unsigned long long > get_shape(bool constrained)
Get the array shape.
void get_ons_objs(vector< ons > &ons_list)
void print_dap4(libdap::XMLWriter &writer, bool constrained=false) override
Shadow libdap::Array::print_dap4() - optionally prints DMR++ chunk information.
virtual unsigned long long get_size(bool constrained=false)
Return the total number of elements in this Array.
bool is_projected()
Is this Array subset?
bool read() override
Read data for the array.
static std::string d_ns_prefix
The XML namespace prefix to use.
virtual bool twiddle_bytes() const
Returns true if this object utilizes shuffle compression.
virtual bool is_multi_linked_blocks_chunk() const
Returns true if this object contains a chunk that have multiple linked blocks .
static bool d_print_chunks
if true, print_dap4() prints chunk elements
virtual bool is_compact_layout() const
Returns true if this object utilizes COMPACT layout.
virtual void load_attributes(libdap::BaseType *btp)
Load the attribute information for this variable.
virtual bool is_missing_data() const
Returns true if this object describes the missing data.
virtual bool get_chunks_loaded() const
Have the chunks been loaded?
virtual bool get_one_chunk_fill_value() const
virtual size_t get_chunks_size() const
Use this when the number of chunks is needed.
void print_chunks_element(libdap::XMLWriter &xml, const std::string &name_space="")
Print the Chunk information.
virtual const std::vector< std::shared_ptr< Chunk > > & get_immutable_chunks() const
A const reference to the vector of chunks.
virtual bool get_uses_fill_value() const
virtual const std::vector< unsigned long long > & get_chunk_dimension_sizes() const
The chunk dimension sizes held in a const vector.
virtual void load_chunks(libdap::BaseType *btp)
Load chunk information for this variable.
virtual unsigned long long get_chunk_size_in_elements() const
Get the number of elements in this chunk.
virtual std::string get_filters() const
Return the names of all the filters in the order they were applied.
virtual bool get_attributes_loaded() const
Have the attributes been loaded?