bes Updated for version 3.21.1
The Backend Server (BES) is the lower two tiers of the Hyrax data server
DmrppArray.cc
1// -*- mode: c++; c-basic-offset:4 -*-
2
3// This file is part of the BES
4
5// Copyright (c) 2016 OPeNDAP, Inc.
6// Author: James Gallagher <jgallagher@opendap.org>
7//
8// This library is free software; you can redistribute it and/or
9// modify it under the terms of the GNU Lesser General Public
10// License as published by the Free Software Foundation; either
11// version 2.1 of the License, or (at your option) any later version.
12//
13// This library is distributed in the hope that it will be useful,
14// but WITHOUT ANY WARRANTY; without even the implied warranty of
15// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
16// Lesser General Public License for more details.
17//
18// You should have received a copy of the GNU Lesser General Public
19// License along with this library; if not, write to the Free Software
20// Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA
21//
22// You can contact OPeNDAP, Inc. at PO Box 112, Saunderstown, RI. 02874-0112.
23
24#include "config.h"
25
26#include <string>
27#include <sstream>
28#include <vector>
29#include <memory>
30#include <queue>
31#include <iterator>
32#include <thread>
33#include <future> // std::async, std::future
34#include <chrono> // std::chrono::milliseconds
35
36#include <cstring>
37#include <cassert>
38#include <iomanip>
39#include <cmath>
40#include <zlib.h>
41
42#include <libdap/D4Enum.h>
43#include <libdap/D4Attributes.h>
44#include <libdap/D4Maps.h>
45#include <libdap/D4Group.h>
46#include <libdap/Byte.h>
47#include <libdap/util.h>
48
49#include "BESInternalError.h"
50#include "BESInternalFatalError.h"
51#include "BESDebug.h"
52#include "BESLog.h"
53#include "BESStopWatch.h"
54
55#include "byteswap_compat.h"
56#include "float_byteswap.h"
57#include "CurlHandlePool.h"
58#include "Chunk.h"
59#include "DmrppArray.h"
60#include "DmrppStructure.h"
61#include "DmrppRequestHandler.h"
62#include "DmrppNames.h"
63#include "Base64.h"
64#include "vlsa_util.h"
65
66// Used with BESDEBUG
67#define dmrpp_3 "dmrpp:3"
68
69using namespace libdap;
70using namespace std;
71
72#define MB (1024*1024)
73#define prolog std::string("DmrppArray::").append(__func__).append("() - ")
74
75namespace dmrpp {
76
77
78// Transfer Thread Pool state variables.
79std::mutex transfer_thread_pool_mtx; // mutex for critical section
80//atomic_ullong transfer_thread_counter(0);
81atomic_uint transfer_thread_counter(0);
82
83
84
// Poll the list of outstanding std::futures until one completes, then consume its
// result and remove it from the list.
//
// @param futures List of futures for in-flight transfer/compute threads.
// @param thread_counter Decremented when a completed future is consumed; reset to
//        zero if an exception escapes (see catch below).
// @param timeout Per-probe wait time, in milliseconds, passed to future::wait_for().
// @param debug_prefix Prepended to BESDEBUG output to identify the caller.
// @return True if a future finished (it was erased and thread_counter decremented);
//         false if the futures list was (or became) empty without one finishing.
// @throws BESInternalError if a completed future returned false (worker failed).
bool get_next_future(list<std::future<bool>> &futures, atomic_uint &thread_counter, unsigned long timeout, string debug_prefix) {
    bool future_finished = false;
    bool done = false;
    std::chrono::milliseconds timeout_ms (timeout);

    while(!done){
        auto futr = futures.begin();
        auto fend = futures.end();
        bool future_is_valid = true;
        while(!future_finished && future_is_valid && futr != fend){
            future_is_valid = (*futr).valid();
            if(future_is_valid){
                // What happens if wait_for() always returns future_status::timeout for a stuck thread?
                // If that were to happen, the loop would run forever. However, we assume that these
                // threads are never 'stuck.' We assume that their computations always complete, either
                // with success or failure. For the transfer threads, timeouts will stop them if nothing
                // else does and for the decompression threads, the worst case is a segmentation fault.
                // jhrg 2/5/21
                if((*futr).wait_for(timeout_ms) != std::future_status::timeout){
                    try {
                        bool success = (*futr).get();
                        future_finished = true;
                        BESDEBUG(dmrpp_3, debug_prefix << prolog << "Called future::get() on a ready future."
                                                       << " success: " << (success?"true":"false") << endl);
                        if(!success){
                            stringstream msg;
                            msg << debug_prefix << prolog << "The std::future has failed!";
                            msg << " thread_counter: " << thread_counter;
                            throw BESInternalError(msg.str(), __FILE__, __LINE__);
                        }
                    }
                    catch(...){
                        // TODO I had to add this to make the thread counting work when there's errors
                        //  But I think it's primitive because it trashes everything - there's
                        //  surely a way to handle the situation on a per thread basis and maybe even
                        //  retry?
                        futures.clear();
                        thread_counter=0;
                        throw;
                    }
                }
                else {
                    // This future has not completed yet; move on to probe the next one.
                    futr++;
                    BESDEBUG(dmrpp_3, debug_prefix << prolog << "future::wait_for() timed out. (timeout: " <<
                                                   timeout << " ms) There are currently " << futures.size() << " futures in process."
                                                   << " thread_counter: " << thread_counter << endl);
                }
            }
            else {
                // An invalid future has already had get() called (or was default constructed);
                // treat it as finished so it gets erased below.
                BESDEBUG(dmrpp_3, debug_prefix << prolog << "The future was not valid. Dumping... " << endl);
                future_finished = true;
            }
        }

        if (futr!=fend && future_finished) {
            futures.erase(futr);
            thread_counter--;
            BESDEBUG(dmrpp_3, debug_prefix << prolog << "Erased future from futures list. (Erased future was "
                                           << (future_is_valid?"":"not ") << "valid at start.) There are currently " <<
                                           futures.size() << " futures in process. thread_counter: " << thread_counter << endl);
        }

        done = future_finished || futures.empty();
    }

    return future_finished;
}
166
167static void one_child_chunk_thread_new_sanity_check(const one_child_chunk_args_new *args) {
168 if (!args->the_one_chunk->get_rbuf()) {
169 throw BESInternalError("one_child_chunk_thread_new_sanity_check() - the_one_chunk->get_rbuf() is NULL!", __FILE__, __LINE__);
170 }
171 if (!args->child_chunk->get_rbuf()) {
172 throw BESInternalError("one_child_chunk_thread_new_sanity_check() - child_chunk->get_rbuf() is NULL!", __FILE__, __LINE__);
173 }
174 if (args->child_chunk->get_bytes_read() != args->child_chunk->get_size()) {
175 throw BESInternalError("one_child_chunk_thread_new_sanity_check() - child_chunk->get_bytes_read() != child_chunk->get_size()!", __FILE__, __LINE__);
176 }
177}
178
189bool one_child_chunk_thread_new(const unique_ptr<one_child_chunk_args_new> &args)
190{
191 args->child_chunk->read_chunk();
192
193 one_child_chunk_thread_new_sanity_check(args.get());
194
195 // the_one_chunk offset \/
196 // the_one_chunk: mmmmmmmmmmmmmmmm
197 // child chunks: 1111222233334444 (there are four child chunks)
198 // child offsets: ^ ^ ^ ^
199 // For this example, child_1_offset - the_one_chunk_offset == 0 (that's always true)
200 // child_2_offset - the_one_chunk_offset == 4; child_2_offset - the_one_chunk_offset == 8
201 // and child_3_offset - the_one_chunk_offset == 12.
202 // Those are the starting locations with in the data buffer of the the_one_chunk
203 // where that child chunk should be written.
204 // Note: all the offset values start at the beginning of the file.
205
206 unsigned long long offset_within_the_one_chunk = args->child_chunk->get_offset() - args->the_one_chunk->get_offset();
207
208 memcpy(args->the_one_chunk->get_rbuf() + offset_within_the_one_chunk, args->child_chunk->get_rbuf(),
209 args->child_chunk->get_bytes_read());
210
211 return true;
212}
213
219bool one_super_chunk_transfer_thread(const unique_ptr<one_super_chunk_args> &args)
220{
221
222#if DMRPP_ENABLE_THREAD_TIMERS
223 stringstream timer_tag;
224 timer_tag << prolog << "tid: 0x" << std::hex << std::this_thread::get_id() <<
225 " parent_tid: 0x" << std::hex << args->parent_thread_id << " sc_id: " << args->super_chunk->id();
226 BES_STOPWATCH_START(TRANSFER_THREADS, prolog + timer_tag.str());
227#endif
228
229 args->super_chunk->read();
230 return true;
231}
232
238bool one_super_chunk_unconstrained_transfer_thread(const unique_ptr<one_super_chunk_args> &args)
239{
240
241#if DMRPP_ENABLE_THREAD_TIMERS
242 stringstream timer_tag;
243 timer_tag << prolog << "tid: 0x" << std::hex << std::this_thread::get_id() <<
244 " parent_tid: 0x" << std::hex << args->parent_thread_id << " sc_id: " << args->super_chunk->id();
245 BES_STOPWATCH_START(TRANSFER_THREADS, prolog + timer_tag.str());
246#endif
247
248 args->super_chunk->read_unconstrained();
249 return true;
250}
251
252bool one_super_chunk_unconstrained_transfer_thread_dio(const unique_ptr<one_super_chunk_args> &args)
253{
254
255#if DMRPP_ENABLE_THREAD_TIMERS
256 stringstream timer_tag;
257 timer_tag << prolog << "tid: 0x" << std::hex << std::this_thread::get_id() <<
258 " parent_tid: 0x" << std::hex << args->parent_thread_id << " sc_id: " << args->super_chunk->id();
259 BES_STOPWATCH_START(TRANSFER_THREADS, prolog + timer_tag.str());
260#endif
261
262 args->super_chunk->read_unconstrained_dio();
263 return true;
264}
265
266
267bool start_one_child_chunk_thread(list<std::future<bool>> &futures, unique_ptr<one_child_chunk_args_new> args) {
268 bool retval = false;
269 std::unique_lock<std::mutex> lck (transfer_thread_pool_mtx);
270 if (transfer_thread_counter < DmrppRequestHandler::d_max_transfer_threads) {
271 transfer_thread_counter++;
272 futures.push_back(std::async(std::launch::async, one_child_chunk_thread_new, std::move(args)));
273 retval = true;
274
275 // The args may be null after move(args) is called and causes the segmentation fault in the following BESDEBUG.
276 // So remove that part but leave the futures.size() for bookkeeping.
277 BESDEBUG(dmrpp_3, prolog << "Got std::future '" << futures.size() <<endl);
278 }
279 return retval;
280}
281
282
290bool start_super_chunk_transfer_thread(list<std::future<bool>> &futures, unique_ptr<one_super_chunk_args> args) {
291 bool retval = false;
292 std::unique_lock<std::mutex> lck (transfer_thread_pool_mtx);
293 if (transfer_thread_counter < DmrppRequestHandler::d_max_transfer_threads) {
294 transfer_thread_counter++;
295 futures.push_back(std::async(std::launch::async, one_super_chunk_transfer_thread, std::move(args)));
296 retval = true;
297
298 // The args may be null after move(args) is called and causes the segmentation fault in the following BESDEBUG.
299 // So remove that part but leave the futures.size() for bookkeeping.
300 BESDEBUG(dmrpp_3, prolog << "Got std::future '" << futures.size() <<endl);
301
302 }
303 return retval;
304}
305
313bool start_super_chunk_unconstrained_transfer_thread(list<std::future<bool>> &futures, unique_ptr<one_super_chunk_args> args) {
314 bool retval = false;
315 std::unique_lock<std::mutex> lck (transfer_thread_pool_mtx);
316 if(transfer_thread_counter < DmrppRequestHandler::d_max_transfer_threads) {
317 transfer_thread_counter++;
318 futures.push_back(std::async(std::launch::async, one_super_chunk_unconstrained_transfer_thread, std::move(args)));
319 retval = true;
320
321 // The args may be null after move(args) is called and causes the segmentation fault in the following BESDEBUG.
322 // So remove that part but leave the futures.size() for bookkeeping.
323 BESDEBUG(dmrpp_3, prolog << "Got std::future '" << futures.size() <<endl);
324
325 }
326 return retval;
327}
328
329bool start_super_chunk_unconstrained_transfer_thread_dio(list<std::future<bool>> &futures, unique_ptr<one_super_chunk_args> args) {
330 bool retval = false;
331 std::unique_lock<std::mutex> lck (transfer_thread_pool_mtx);
332 if(transfer_thread_counter < DmrppRequestHandler::d_max_transfer_threads) {
333 transfer_thread_counter++;
334 futures.push_back(std::async(std::launch::async, one_super_chunk_unconstrained_transfer_thread_dio, std::move(args)));
335 retval = true;
336 BESDEBUG(dmrpp_3, prolog << "Got std::future '" << futures.size() <<
337 "' from std::async, transfer_thread_counter: " << transfer_thread_counter << endl);
338 }
339 return retval;
340}
341
342
// Read a queue of SuperChunks concurrently (unconstrained case) using the transfer
// thread pool. The loop alternates between harvesting completed futures (which frees
// pool slots) and launching new transfer threads until both the queue and the
// futures list are empty.
//
// @param super_chunks Queue of SuperChunks to read; drained as threads are started.
// @param array The DmrppArray on whose behalf the SuperChunks are read.
// @throws Whatever get_next_future()/future::get() re-throws; all outstanding
//         futures are drained before the exception propagates.
void read_super_chunks_unconstrained_concurrent(queue<shared_ptr<SuperChunk>> &super_chunks, DmrppArray *array)
{
    BES_STOPWATCH_START(MODULE, prolog + "Timing array name: "+array->name());

    // Parallel version based on read_chunks_unconstrained(). There is
    // substantial duplication of the code in read_chunks_unconstrained();
    // removing it is a long-standing TODO.

    // We maintain a list of futures to track our parallel activities.
    list<future<bool>> futures;
    try {
        bool done = false;
        bool future_finished = true;
        while (!done) {

            if(!futures.empty())
                future_finished = get_next_future(futures, transfer_thread_counter, DMRPP_WAIT_FOR_FUTURE_MS, prolog);

            // If future_finished is true this means that the chunk_processing_thread_counter has been decremented,
            // because future::get() was called or a call to future::valid() returned false.
            BESDEBUG(dmrpp_3, prolog << "future_finished: " << (future_finished ? "true" : "false") << endl);

            if (!super_chunks.empty()){
                // Next we try to add a new Chunk compute thread if we can - there might be room.
                bool thread_started = true;
                while(thread_started && !super_chunks.empty()) {
                    auto super_chunk = super_chunks.front();
                    BESDEBUG(dmrpp_3, prolog << "Starting thread for " << super_chunk->to_string(false) << endl);

                    auto args = unique_ptr<one_super_chunk_args>(new one_super_chunk_args(super_chunk, array));
                    thread_started = start_super_chunk_unconstrained_transfer_thread(futures, std::move(args));

                    if (thread_started) {
                        super_chunks.pop();
                        BESDEBUG(dmrpp_3, prolog << "STARTED thread for " << super_chunk->to_string(false) << endl);
                    } else {
                        // Thread did not start, ownership of the arguments was not passed to the thread.
                        BESDEBUG(dmrpp_3, prolog << "Thread not started. args deleted, Chunk remains in queue.)" <<
                                                 " transfer_thread_counter: " << transfer_thread_counter <<
                                                 " futures.size(): " << futures.size() << endl);
                    }
                }
            }
            else {
                // No more Chunks and no futures means we're done here.
                if(futures.empty())
                    done = true;
            }
            future_finished = false;
        }
    }
    catch (...) {
        // Complete all the futures, otherwise we'll have threads out there using up resources
        while(!futures.empty()){
            if(futures.back().valid())
                futures.back().get();
            futures.pop_back();
        }
        // re-throw the exception
        throw;
    }
}
425
426// Clone of read_super_chunks_unconstrained_concurrent for direct IO.
427// Doing this to ensure direct IO won't affect the regular operations.
// Clone of read_super_chunks_unconstrained_concurrent for direct IO.
// Doing this to ensure direct IO won't affect the regular operations.
//
// @param super_chunks Queue of SuperChunks to read; drained as threads are started.
// @param array The DmrppArray on whose behalf the SuperChunks are read.
// @throws Whatever get_next_future()/future::get() re-throws; all outstanding
//         futures are drained before the exception propagates.
void read_super_chunks_unconstrained_concurrent_dio(queue<shared_ptr<SuperChunk>> &super_chunks, DmrppArray *array)
{
    BES_STOPWATCH_START(MODULE, prolog + "Timing array name: "+array->name());

    // Parallel version based on read_chunks_unconstrained(). There is
    // substantial duplication of the code in read_chunks_unconstrained();
    // removing it is a long-standing TODO.

    // We maintain a list of futures to track our parallel activities.
    list<future<bool>> futures;
    try {
        bool done = false;
        bool future_finished = true;
        while (!done) {

            if(!futures.empty())
                future_finished = get_next_future(futures, transfer_thread_counter, DMRPP_WAIT_FOR_FUTURE_MS, prolog);

            // If future_finished is true this means that the chunk_processing_thread_counter has been decremented,
            // because future::get() was called or a call to future::valid() returned false.
            BESDEBUG(dmrpp_3, prolog << "future_finished: " << (future_finished ? "true" : "false") << endl);

            if (!super_chunks.empty()){
                // Next we try to add a new Chunk compute thread if we can - there might be room.
                bool thread_started = true;
                while(thread_started && !super_chunks.empty()) {
                    auto super_chunk = super_chunks.front();
                    BESDEBUG(dmrpp_3, prolog << "Starting thread for " << super_chunk->to_string(false) << endl);

                    auto args = unique_ptr<one_super_chunk_args>(new one_super_chunk_args(super_chunk, array));

                    // direct IO calling
                    thread_started = start_super_chunk_unconstrained_transfer_thread_dio(futures, std::move(args));

                    if (thread_started) {
                        super_chunks.pop();
                        BESDEBUG(dmrpp_3, prolog << "STARTED thread for " << super_chunk->to_string(false) << endl);
                    } else {
                        // Thread did not start, ownership of the arguments was not passed to the thread.
                        BESDEBUG(dmrpp_3, prolog << "Thread not started. args deleted, Chunk remains in queue.)" <<
                                                 " transfer_thread_counter: " << transfer_thread_counter <<
                                                 " futures.size(): " << futures.size() << endl);
                    }
                }
            }
            else {
                // No more Chunks and no futures means we're done here.
                if(futures.empty())
                    done = true;
            }
            future_finished = false;
        }
    }
    catch (...) {
        // Complete all the futures, otherwise we'll have threads out there using up resources
        while(!futures.empty()){
            if(futures.back().valid())
                futures.back().get();
            futures.pop_back();
        }
        // re-throw the exception
        throw;
    }
}
492
493
// Read a queue of SuperChunks concurrently (constrained case) using the transfer
// thread pool. The loop alternates between harvesting completed futures (which frees
// pool slots) and launching new transfer threads until both the queue and the
// futures list are empty.
//
// @param super_chunks Queue of SuperChunks to read; drained as threads are started.
// @param array The DmrppArray on whose behalf the SuperChunks are read.
// @throws Whatever get_next_future()/future::get() re-throws; all outstanding
//         futures are drained before the exception propagates.
void read_super_chunks_concurrent(queue< shared_ptr<SuperChunk> > &super_chunks, DmrppArray *array)
{
    BES_STOPWATCH_START(MODULE, prolog + "Timing array name: "+array->name());

    // Parallel version based on read_chunks_unconstrained(). There is
    // substantial duplication of the code in read_chunks_unconstrained();
    // removing it is a long-standing TODO.

    // We maintain a list of futures to track our parallel activities.
    list<future<bool>> futures;
    try {
        bool done = false;
        bool future_finished = true;
        while (!done) {

            if(!futures.empty())
                future_finished = get_next_future(futures, transfer_thread_counter, DMRPP_WAIT_FOR_FUTURE_MS, prolog);

            // If future_finished is true this means that the chunk_processing_thread_counter has been decremented,
            // because future::get() was called or a call to future::valid() returned false.
            BESDEBUG(dmrpp_3, prolog << "future_finished: " << (future_finished ? "true" : "false") << endl);

            if (!super_chunks.empty()){
                // Next we try to add a new Chunk compute thread if we can - there might be room.
                bool thread_started = true;
                while(thread_started && !super_chunks.empty()) {
                    auto super_chunk = super_chunks.front();
                    BESDEBUG(dmrpp_3, prolog << "Starting thread for " << super_chunk->to_string(false) << endl);

                    auto args = unique_ptr<one_super_chunk_args>(new one_super_chunk_args(super_chunk, array));
                    thread_started = start_super_chunk_transfer_thread(futures, std::move(args));

                    if (thread_started) {
                        super_chunks.pop();
                        BESDEBUG(dmrpp_3, prolog << "STARTED thread for " << super_chunk->to_string(false) << endl);
                    } else {
                        // Thread did not start, ownership of the arguments was not passed to the thread.
                        BESDEBUG(dmrpp_3, prolog << "Thread not started. args deleted, Chunk remains in queue.)" <<
                                                 " transfer_thread_counter: " << transfer_thread_counter <<
                                                 " futures.size(): " << futures.size() << endl);
                    }
                }
            }
            else {
                // No more Chunks and no futures means we're done here.
                if(futures.empty())
                    done = true;
            }
            future_finished = false;
        }
    }
    catch (...) {
        // Complete all the futures, otherwise we'll have threads out there using up resources
        while(!futures.empty()){
            if(futures.back().valid())
                futures.back().get();
            futures.pop_back();
        }
        // re-throw the exception
        throw;
    }
}
576
595static unsigned long long
596get_index(const vector<unsigned long long> &address_in_target, const vector<unsigned long long> &target_shape)
597{
598 if (address_in_target.size() != target_shape.size()) { // ranks must be equal
599 throw BESInternalError("get_index: address_in_target != target_shape", __FILE__, __LINE__);
600 }
601
602 auto shape_index = target_shape.rbegin();
603 auto index = address_in_target.rbegin(), index_end = address_in_target.rend();
604
605 unsigned long long multiplier_var = *shape_index++;
606 unsigned long long offset = *index++;
607
608 while (index != index_end) {
609 if (*index >= *shape_index) {
610 throw BESInternalError("get_index: index >= shape_index", __FILE__, __LINE__);
611 }
612
613 offset += multiplier_var * *index++;
614 multiplier_var *= *shape_index++;
615 }
616
617 return offset;
618}
619
622
636static unsigned long long multiplier(const vector<unsigned long long> &shape, unsigned int k)
637{
638 if (!(shape.size() > k + 1)) {
639 throw BESInternalError("multiplier: !(shape.size() > k + 1)", __FILE__, __LINE__);
640 }
641
642 vector<unsigned long long>::const_iterator i = shape.begin(), e = shape.end();
643 advance(i, k + 1);
644 unsigned long long multiplier = *i++;
645 while (i != e) {
646 multiplier *= *i++;
647 }
648
649 return multiplier;
650}
651
652//#####################################################################################################################
653//
654// DmrppArray code begins here.
655//
656// = = = = = = = = = = = = = = = = = = = = = = = = = = = = = = = = = = = = = = = = = = = = = = = = = = = = = = = = = =
657
// Assignment operator: copies both the libdap::Array state and the DmrppCommon
// (chunk/compression metadata) state from rhs into this object.
DmrppArray::operator=(const DmrppArray &rhs)
{
    // Self-assignment guard.
    if (this == &rhs) return *this;

    dynamic_cast<Array &>(*this) = rhs; // run Constructor=

    dynamic_cast<DmrppCommon &>(*this) = rhs;
    // Removed DmrppCommon::m_duplicate_common(rhs); jhrg 11/12/21

    return *this;
}
670
{
    // NOTE(review): this method's signature line is not visible in this view; from
    // the body it appears to be DmrppArray::is_projected() - confirm against the header.
    // Returns true if any dimension's constrained size differs from its full size,
    // i.e. a projection/hyperslab constraint has been applied to this array.
    for (Dim_iter p = dim_begin(), e = dim_end(); p != e; ++p)
        if (dimension_size_ll(p, true) != dimension_size_ll(p, false)) return true;

    return false;
}
682
689unsigned long long DmrppArray::get_size(bool constrained)
690{
691 // number of array elements in the constrained array
692 unsigned long long asize = 1;
693 for (Dim_iter dim = dim_begin(), end = dim_end(); dim != end; dim++) {
694 auto dim_size = dimension_size_ll(dim, constrained);
695 asize *= dim_size;
696 }
697 return asize;
698}
699
706vector<unsigned long long> DmrppArray::get_shape(bool constrained)
707{
708 auto dim = dim_begin(), edim = dim_end();
709 vector<unsigned long long> shape;
710
711 // For a 3d array, this method took 14ms without reserve(), 5ms with
712 // (when called many times).
713 shape.reserve(edim - dim);
714
715 for (; dim != edim; dim++) {
716 shape.push_back(dimension_size_ll(dim, constrained));
717 }
718
719 return shape;
720}
721
727DmrppArray::dimension DmrppArray::get_dimension(unsigned int i)
728{
729 if (i > (dim_end() - dim_begin())) {
730 throw BESInternalError("get_dimension: i > (dim_end() - dim_begin())", __FILE__, __LINE__);
731 }
732 return *(dim_begin() + i);
733}
734
737
748void DmrppArray::insert_constrained_contiguous(Dim_iter dim_iter, unsigned long *target_index,
749 vector<unsigned long long> &subset_addr,
750 const vector<unsigned long long> &array_shape, char /*Chunk*/*src_buf, char *dest_buf)
751{
752 BESDEBUG("dmrpp", "DmrppArray::" << __func__ << "() - subsetAddress.size(): " << subset_addr.size() << endl);
753
754 uint64_t start = this->dimension_start_ll(dim_iter, true);
755 uint64_t stop = this->dimension_stop_ll(dim_iter, true);
756 uint64_t stride = this->dimension_stride_ll(dim_iter, true);
757
758 dim_iter++;
759
760 // The end case for the recursion is dimIter == dim_end(); stride == 1 is an optimization
761 // See the else clause for the general case.
762 if (dim_iter == dim_end() && stride == 1) {
763 // For the start and stop indexes of the subset, get the matching indexes in the whole array.
764 subset_addr.push_back(start);
765 unsigned long long start_index = get_index(subset_addr, array_shape);
766 subset_addr.pop_back();
767
768 subset_addr.push_back(stop);
769 unsigned long long stop_index = get_index(subset_addr, array_shape);
770 subset_addr.pop_back();
771
772 // Copy data block from start_index to stop_index
773 // TODO Replace this loop with a call to std::memcpy()
774 for (uint64_t source_index = start_index; source_index <= stop_index; source_index++) {
775 uint64_t target_byte = *target_index * bytes_per_element;
776 uint64_t source_byte = source_index * bytes_per_element;
777 // Copy a single value.
778 for (unsigned long i = 0; i < bytes_per_element; i++) {
779 dest_buf[target_byte++] = src_buf[source_byte++];
780 }
781 (*target_index)++;
782 }
783
784 }
785 else {
786 for (uint64_t myDimIndex = start; myDimIndex <= stop; myDimIndex += stride) {
787
788 // Is it the last dimension?
789 if (dim_iter != dim_end()) {
790 // Nope! Then we recurse to the last dimension to read stuff
791 subset_addr.push_back(myDimIndex);
792 insert_constrained_contiguous(dim_iter, target_index, subset_addr, array_shape, src_buf, dest_buf);
793 subset_addr.pop_back();
794 }
795 else {
796 // We are at the last (innermost) dimension, so it's time to copy values.
797 subset_addr.push_back(myDimIndex);
798 unsigned int sourceIndex = get_index(subset_addr, array_shape);
799 subset_addr.pop_back();
800
801 // Copy a single value.
802 uint64_t target_byte = *target_index * bytes_per_element;
803 uint64_t source_byte = sourceIndex * bytes_per_element;
804
805 for (unsigned int i = 0; i < bytes_per_element; i++) {
806 dest_buf[target_byte++] = src_buf[source_byte++];
807 }
808 (*target_index)++;
809 }
810 }
811 }
812}
813
814
834void DmrppArray::read_contiguous()
835{
836
837 BESDEBUG(dmrpp_3, prolog << "NOT using direct IO " << endl);
838 BES_STOPWATCH_START(MODULE, prolog + "Timing array name: "+name());
839
840 // Get the single chunk that makes up this CONTIGUOUS variable.
841 if (get_chunks_size() != 1)
842 throw BESInternalError(string("Expected only a single chunk for variable ") + name(), __FILE__, __LINE__);
843
844 // This is the original chunk for this 'contiguous' variable.
845 auto the_one_chunk = get_immutable_chunks()[0];
846
847 unsigned long long the_one_chunk_offset = the_one_chunk->get_offset();
848 unsigned long long the_one_chunk_size = the_one_chunk->get_size();
849
850 // We only want to read in the Chunk concurrently if:
851 // - Concurrent transfers are enabled (DmrppRequestHandler::d_use_transfer_threads)
852 // - The variable's size is above the threshold value held in DmrppRequestHandler::d_contiguous_concurrent_threshold
853 if (!DmrppRequestHandler::d_use_transfer_threads || the_one_chunk_size <= DmrppRequestHandler::d_contiguous_concurrent_threshold) {
854 // Read the the_one_chunk as is. This is the non-parallel I/O case
855 the_one_chunk->read_chunk();
856
857 }
858 else {
859 // Allocate memory for the 'the_one_chunk' so the transfer threads can transfer data
860 // from the child chunks to it.
861 the_one_chunk->set_rbuf_to_size();
862
863 // We need to load fill values if using fill values. KY 2023-02-17
864 if (the_one_chunk->get_uses_fill_value()) {
865 the_one_chunk->load_fill_values();
866 }
867 // The number of child chunks are determined based on the size of the data.
868 // If the size of the the_one_chunk is 3 MB then 3 chunks will be made. We will round down
869 // when necessary and handle the remainder later on (3.3MB = 3 chunks, 4.2MB = 4 chunks, etc.)
870 unsigned long long num_chunks = floor(the_one_chunk_size / MB);
871 if (num_chunks >= DmrppRequestHandler::d_max_transfer_threads)
872 num_chunks = DmrppRequestHandler::d_max_transfer_threads;
873
874 // Use the original chunk's size and offset to evenly split it into smaller chunks
875 unsigned long long chunk_size = the_one_chunk_size / num_chunks;
876 std::string chunk_byteorder = the_one_chunk->get_byte_order();
877
878 // If the size of the the_one_chunk is not evenly divisible by num_chunks, capture
879 // the remainder here and increase the size of the last chunk by this number of bytes.
880 unsigned long long chunk_remainder = the_one_chunk_size % num_chunks;
881 auto chunk_url = the_one_chunk->get_data_url();
882
883 // Set up a queue to break up the original the_one_chunk and keep track of the pieces
884 queue<shared_ptr<Chunk>> chunks_to_read;
885
886 // Make the Chunk objects
887 unsigned long long chunk_offset = the_one_chunk_offset;
888 for (unsigned int i = 0; i < num_chunks - 1; i++) {
889 if (chunk_url == nullptr) {
890 BESDEBUG(dmrpp_3, "chunk_url is null, this may be a variable that covers the fill values." <<endl);
891 chunks_to_read.push(shared_ptr<Chunk>(new Chunk(chunk_byteorder,the_one_chunk->get_fill_value(),the_one_chunk->get_fill_value_type(), chunk_size, chunk_offset)));
892 }
893 else
894 chunks_to_read.push(shared_ptr<Chunk>(new Chunk(chunk_url, chunk_byteorder, chunk_size, chunk_offset)));
895 chunk_offset += chunk_size;
896 }
897 // Make the remainder Chunk, see above for details.
898 if (chunk_url != nullptr)
899 chunks_to_read.push(shared_ptr<Chunk>(new Chunk(chunk_url, chunk_byteorder, chunk_size + chunk_remainder, chunk_offset)));
900 else
901 chunks_to_read.push(shared_ptr<Chunk>(new Chunk(chunk_byteorder,the_one_chunk->get_fill_value(),the_one_chunk->get_fill_value_type(), chunk_size, chunk_offset)));
902
903 // We maintain a list of futures to track our parallel activities.
904 list<future<bool>> futures;
905 try {
906 bool done = false;
907 bool future_finished = true;
908 while (!done) {
909
910 if (!futures.empty())
911 future_finished = get_next_future(futures, transfer_thread_counter, DMRPP_WAIT_FOR_FUTURE_MS, prolog);
912
913 // If future_finished is true this means that the chunk_processing_thread_counter has been decremented,
914 // because future::get() was called or a call to future::valid() returned false.
915 BESDEBUG(dmrpp_3, prolog << "future_finished: " << (future_finished ? "true" : "false") << endl);
916 if (!chunks_to_read.empty()) {
917 // Next we try to add a new Chunk compute thread if we can - there might be room.
918 bool thread_started = true;
919 while (thread_started && !chunks_to_read.empty()) {
920 auto current_chunk = chunks_to_read.front();
921 BESDEBUG(dmrpp_3, prolog << "Starting thread for " << current_chunk->to_string() << endl);
922
923 auto args = unique_ptr<one_child_chunk_args_new>(new one_child_chunk_args_new(current_chunk, the_one_chunk));
924
925 thread_started = start_one_child_chunk_thread(futures, std::move(args));
926
927 if (thread_started) {
928 chunks_to_read.pop();
929 BESDEBUG(dmrpp_3, prolog << "STARTED thread for " << current_chunk->to_string() << endl);
930 } else {
931 // Thread did not start, ownership of the arguments was not passed to the thread.
932 BESDEBUG(dmrpp_3, prolog << "Thread not started. args deleted, Chunk remains in queue.)" <<
933 " transfer_thread_counter: " << transfer_thread_counter <<
934 " futures.size(): " << futures.size() << endl);
935 }
936 }
937 } else {
938 // No more Chunks and no futures means we're done here.
939 if (futures.empty())
940 done = true;
941 }
942 future_finished = false;
943 }
944 }
945 catch (...) {
946 // Complete all the futures, otherwise we'll have threads out there using up resources
947 while (!futures.empty()) {
948 if (futures.back().valid())
949 futures.back().get();
950 futures.pop_back();
951 }
952 // re-throw the exception
953 throw;
954 }
955 }
956 BESDEBUG(dmrpp_3, prolog << "Before is_filter " << endl);
957
958 // Check if this is a DAP structure we can handle
959 // Now that the_one_chunk has been read, we do what is necessary...
960 if (!is_filters_empty() && !get_one_chunk_fill_value()) {
961 the_one_chunk->filter_chunk(get_filters(), get_chunk_size_in_elements(), bytes_per_element);
962 }
963 // The 'the_one_chunk' now holds the data values. Transfer it to the Array.
964 if (!is_projected()) { // if there is no projection constraint
965 reserve_value_capacity_ll(get_size(false));
966
967 // We need to handle the structure data differently.
968 if (this->var()->type() != dods_structure_c)
969 val2buf(the_one_chunk->get_rbuf()); // yes, it's not type-safe
970 else { // Structure
971 // Check if we can handle this case.
972 // Currently we only handle one-layer simple int/float types.
973 if (is_readable_struct) {
974 // Only "one chunk", we can simply obtain the buf_value.
975 char *buf_value = the_one_chunk->get_rbuf();
976
977 unsigned long long value_size = this->length_ll() * bytes_per_element;
978 vector<char> values(buf_value,buf_value+value_size);
979 read_array_of_structure(values);
980 }
981 else
982 throw InternalErr(__FILE__, __LINE__, "Only handle integer and float base types. Cannot handle the array of complex structure yet.");
983 }
984 }
985 else { // apply the constraint
986
987 if (this->var()->type() != dods_structure_c) {
988
989 vector<unsigned long long> array_shape = get_shape(false);
990 unsigned long target_index = 0;
991 vector<unsigned long long> subset;
992
993 // Reserve space in this array for the constrained size of the data request
994 reserve_value_capacity_ll(get_size(true));
995 char *dest_buf = get_buf();
996 insert_constrained_contiguous(dim_begin(), &target_index, subset, array_shape, the_one_chunk->get_rbuf(),dest_buf);
997 }
998 else {
999 // Currently we only handle one-layer simple int/float types.
1000 if (is_readable_struct) {
1001 unsigned long long value_size = get_size(true)*bytes_per_element;
1002 vector<char> values;
1003 values.resize(value_size);
1004 vector<unsigned long long> array_shape = get_shape(false);
1005 unsigned long target_index = 0;
1006 vector<unsigned long long> subset;
1007 insert_constrained_contiguous(dim_begin(), &target_index, subset, array_shape, the_one_chunk->get_rbuf(),values.data());
1008 read_array_of_structure(values);
1009 }
1010 else
1011 throw InternalErr(__FILE__, __LINE__, "Only handle integer and float base types. Cannot handle the array of complex structure yet.");
1012 }
1013 }
1014
1015 set_read_p(true);
1016
1017 BESDEBUG(dmrpp_3, prolog << " NOT using direct IO : end of this method." << endl);
1018}
1019
1020void DmrppArray::read_one_chunk_dio() {
1021
1022 BESDEBUG(dmrpp_3, prolog << "Using direct IO " << endl);
1023 // Get the single chunk that makes up this one-chunk compressed variable.
1024 if (get_chunks_size() != 1)
1025 throw BESInternalError(string("Expected only a single chunk for variable ") + name(), __FILE__, __LINE__);
1026
1027 // This is the chunk for this variable.
1028 auto the_one_chunk = get_immutable_chunks()[0];
1029
1030 // For this version, we just read the whole chunk all at once.
1031 the_one_chunk->read_chunk_dio();
1032 reserve_value_capacity_ll_byte(get_var_chunks_storage_size());
1033 const char *source_buffer = the_one_chunk->get_rbuf();
1034 char *target_buffer = get_buf();
1035 memcpy(target_buffer, source_buffer , the_one_chunk->get_size());
1036
1037}
1038
/**
 * @brief Recursively copy one chunk's data into this array's buffer (no constraint).
 *
 * Walks the array's dimensions; for the last dimension it memcpy's one contiguous
 * row of the chunk into the correct offset of the destination buffer, otherwise it
 * recurses into the next dimension for every row of the chunk along this dimension.
 *
 * @param chunk The (already read and filtered) chunk to copy from.
 * @param dim The dimension this invocation handles; callers start with 0.
 * @param array_offset Element offset, accumulated over the outer dimensions, into
 *     the destination array.
 * @param array_shape Size, in elements, of each of the array's dimensions.
 * @param chunk_offset Element offset, accumulated over the outer dimensions, into
 *     the chunk's buffer.
 * @param chunk_shape Size, in elements, of each of the chunk's dimensions.
 * @param chunk_origin The chunk's position ("position in array") in element
 *     coordinates of the whole array.
 */
void DmrppArray::insert_chunk_unconstrained(shared_ptr<Chunk> chunk, unsigned int dim, unsigned long long array_offset,
                                            const vector<unsigned long long> &array_shape,
                                            unsigned long long chunk_offset, const vector<unsigned long long> &chunk_shape,
                                            const vector<unsigned long long> &chunk_origin)
{
    // Now we figure out the correct last element. It's possible that a
    // chunk 'extends beyond' the Array bounds. Here 'end_element' is the
    // last element of the destination array
    dimension thisDim = this->get_dimension(dim);
    unsigned long long end_element = chunk_origin[dim] + chunk_shape[dim] - 1;
    if ((unsigned long long ) thisDim.stop < end_element) {
        end_element = thisDim.stop;
    }

    // Last chunk-relative index (inclusive) we will copy along this dimension;
    // clipped above so edge chunks that overhang the array are handled.
    unsigned long long chunk_end = end_element - chunk_origin[dim];

    unsigned int last_dim = chunk_shape.size() - 1;
    if (dim == last_dim) {

        unsigned int elem_width = bytes_per_element;
        array_offset += chunk_origin[dim];

        // Compute how much we are going to copy
        unsigned long long chunk_bytes = (end_element - chunk_origin[dim] + 1) * bytes_per_element;
        char *source_buffer = chunk->get_rbuf();
        char *target_buffer = nullptr;
        // Arrays of (readable) structures are staged in a separate byte buffer
        // that read_array_of_structure() consumes later.
        if (is_readable_struct)
            target_buffer = d_structure_array_buf.data();
        else
            target_buffer = get_buf();
        memcpy(target_buffer + (array_offset * elem_width), source_buffer + (chunk_offset * elem_width), chunk_bytes);

    }
    else {
        // Linear multipliers ("strides" in elements) for flattening an N-dim
        // index at dimension 'dim' into the chunk/array 1-D buffers.
        unsigned long long mc = multiplier(chunk_shape, dim);
        unsigned long long ma = multiplier(array_shape, dim);

        // Not the last dimension, so we continue to proceed down the Recursion Branch.
        for (unsigned long long chunk_index = 0 /*chunk_start*/; chunk_index <= chunk_end; ++chunk_index) {
            unsigned long long next_chunk_offset = chunk_offset + (mc * chunk_index);
            unsigned long long next_array_offset = array_offset + (ma * (chunk_index + chunk_origin[dim]));

            // Re-entry here:
            insert_chunk_unconstrained(chunk, dim + 1, next_array_offset, array_shape, next_chunk_offset, chunk_shape,
                                       chunk_origin);
        }
    }
}
1106
1107// The direct IO routine to insert the unconstrained chunks.
1108void DmrppArray::insert_chunk_unconstrained_dio(shared_ptr<Chunk> chunk) {
1109
1110 const char *source_buffer = chunk->get_rbuf();
1111 char *target_buffer = get_buf();
1112
1113 // copy the chunk buffer to the variable buffer at the right location.
1114 memcpy(target_buffer + chunk->get_direct_io_offset(), source_buffer,chunk->get_size());
1115
1116}
1117
/**
 * @brief Read all chunks of this variable when there is no projection constraint.
 *
 * Groups the variable's chunks into SuperChunks (runs of chunks that are
 * contiguous in the underlying store), then reads the SuperChunks either
 * serially or concurrently depending on DmrppRequestHandler::d_use_transfer_threads.
 *
 * @throws BESInternalError if the variable does not have at least two chunks
 *     (single-chunk variables are handled elsewhere) or if a chunk cannot be
 *     added to a freshly created SuperChunk.
 */
void DmrppArray::read_chunks_unconstrained()
{
    if (get_chunks_size() < 2)
        throw BESInternalError(string("Expected chunks for variable ") + name(), __FILE__, __LINE__);

    // Find all the required chunks to read. I used a queue to preserve the chunk order, which
    // made using a debugger easier. However, order does not matter, AFAIK.

    unsigned long long sc_count=0;
    stringstream sc_id;
    sc_id << name() << "-" << sc_count++;
    queue<shared_ptr<SuperChunk>> super_chunks;
    auto current_super_chunk = std::make_shared<SuperChunk>(sc_id.str(), this) ;
    super_chunks.push(current_super_chunk);

    // Make the SuperChunks using all the chunks.
    for(const auto& chunk: get_immutable_chunks()) {
        bool added = current_super_chunk->add_chunk(chunk);
        if (!added) {
            // This chunk is not contiguous with the current SuperChunk: start a
            // new SuperChunk and add the chunk to it. If that also fails, it is
            // a logic error, not a data-layout issue.
            sc_id.str(std::string());
            sc_id << name() << "-" << sc_count++;
            current_super_chunk = std::make_shared<SuperChunk>(sc_id.str(), this);
            super_chunks.push(current_super_chunk);
            if (!current_super_chunk->add_chunk(chunk)) {
                stringstream msg ;
                msg << prolog << "Failed to add Chunk to new SuperChunk. chunk: " << chunk->to_string();
                throw BESInternalError(msg.str(), __FILE__, __LINE__);
            }
        }
    }

    reserve_value_capacity_ll(get_size());
    // Arrays of readable structures are staged into a flat byte buffer first;
    // read_array_of_structure() unpacks it at the end of this method.
    if (is_readable_struct) {
        d_structure_array_buf.resize(this->length_ll()*bytes_per_element);
    }


    // The size in element of each of the array's dimensions
    const vector<unsigned long long> array_shape = get_shape(true);
    // The size, in elements, of each of the chunk's dimensions
    const vector<unsigned long long> chunk_shape = get_chunk_dimension_sizes();


    BESDEBUG(dmrpp_3, prolog << "d_use_transfer_threads: " << (DmrppRequestHandler::d_use_transfer_threads ? "true" : "false") << endl);
    BESDEBUG(dmrpp_3, prolog << "d_max_transfer_threads: " << DmrppRequestHandler::d_max_transfer_threads << endl);

    if (!DmrppRequestHandler::d_use_transfer_threads) { // Serial transfers
#if DMRPP_ENABLE_THREAD_TIMERS
        BES_STOPWATCH_START(dmrpp_3, prolog + "Serial SuperChunk Processing.");
#endif
        while(!super_chunks.empty()) {
            auto super_chunk = super_chunks.front();
            super_chunks.pop();
            BESDEBUG(dmrpp_3, prolog << super_chunk->to_string(true) << endl );
            super_chunk->read_unconstrained();
        }
    }
    else { // Parallel transfers
#if DMRPP_ENABLE_THREAD_TIMERS
        string timer_name = prolog + "Concurrent SuperChunk Processing. d_max_transfer_threads: " + to_string(DmrppRequestHandler::d_max_transfer_threads);
        BES_STOPWATCH_START(dmrpp_3, timer_name);
#endif
        read_super_chunks_unconstrained_concurrent(super_chunks, this);
    }

    if (is_readable_struct)
        read_array_of_structure(d_structure_array_buf);
    set_read_p(true);

}
1199
// The direct chunk IO version of read_chunks_unconstrained(); mostly copied from the general IO handling routines.
1201void DmrppArray::read_chunks_dio_unconstrained()
1202{
1203
1204 if (get_chunks_size() < 2)
1205 throw BESInternalError(string("Expected chunks for variable ") + name(), __FILE__, __LINE__);
1206
1207 // Find all the required chunks to read. I used a queue to preserve the chunk order, which
1208 // made using a debugger easier. However, order does not matter, AFAIK.
1209
1210 unsigned long long sc_count=0;
1211 stringstream sc_id;
1212 sc_id << name() << "-" << sc_count++;
1213 queue<shared_ptr<SuperChunk>> super_chunks;
1214 auto current_super_chunk = shared_ptr<SuperChunk>(new SuperChunk(sc_id.str(),this)) ;
1215 super_chunks.push(current_super_chunk);
1216
1217 // Make the SuperChunks using all the chunks.
1218 for(const auto& chunk: get_immutable_chunks()) {
1219 bool added = current_super_chunk->add_chunk(chunk);
1220 if (!added) {
1221 sc_id.str(std::string());
1222 sc_id << name() << "-" << sc_count++;
1223 current_super_chunk = shared_ptr<SuperChunk>(new SuperChunk(sc_id.str(),this));
1224 super_chunks.push(current_super_chunk);
1225 if (!current_super_chunk->add_chunk(chunk)) {
1226 stringstream msg ;
1227 msg << prolog << "Failed to add Chunk to new SuperChunk. chunk: " << chunk->to_string();
1228 throw BESInternalError(msg.str(), __FILE__, __LINE__);
1229 }
1230 }
1231 }
1232
1233 //Change to the total storage buffer size to just the compressed buffer size.
1234 reserve_value_capacity_ll_byte(get_var_chunks_storage_size());
1235
1236 // The size in element of each of the array's dimensions
1237 const vector<unsigned long long> array_shape = get_shape(true);
1238 // The size, in elements, of each of the chunk's dimensions
1239 const vector<unsigned long long> chunk_shape = get_chunk_dimension_sizes();
1240
1241 BESDEBUG(dmrpp_3, prolog << "d_use_transfer_threads: " << (DmrppRequestHandler::d_use_transfer_threads ? "true" : "false") << endl);
1242 BESDEBUG(dmrpp_3, prolog << "d_max_transfer_threads: " << DmrppRequestHandler::d_max_transfer_threads << endl);
1243
1244 if (!DmrppRequestHandler::d_use_transfer_threads) { // Serial transfers
1245#if DMRPP_ENABLE_THREAD_TIMERS
1246 BES_STOPWATCH_START(dmrpp_3, prolog + "Serial SuperChunk Processing.");
1247#endif
1248 while(!super_chunks.empty()) {
1249 auto super_chunk = super_chunks.front();
1250 super_chunks.pop();
1251 BESDEBUG(dmrpp_3, prolog << super_chunk->to_string(true) << endl );
1252
1253 // Call direct IO routine
1254 super_chunk->read_unconstrained_dio();
1255 }
1256 }
1257 else { // Parallel transfers
1258#if DMRPP_ENABLE_THREAD_TIMERS
1259 string timer_name = prolog + "Concurrent SuperChunk Processing. d_max_transfer_threads: " + to_string( DmrppRequestHandler::d_max_transfer_threads);
1260 BES_STOPWATCH_START(dmrpp_3, timer_name);
1261#endif
1262 // Call direct IO routine for parallel transfers
1263 read_super_chunks_unconstrained_concurrent_dio(super_chunks, this);
1264 }
1265 set_read_p(true);
1266}
1267
1268
/**
 * @brief Read a variable stored as linked blocks (HDF4 vdata-style storage).
 *
 * Retrieve data from the linked blocks. We don't need to use the super chunk technique
 * since the adjacent blocks are already combined. We just need to read the data
 * from each chunk, combine them and decompress the buffer if necessary.
 * Note: HDF4 vdata doesn't have the alignment issue.
 *
 * @throws BESInternalError if fewer than two linked blocks are recorded.
 * @throws InternalErr for arrays of structures that are compressed or whose
 *     member types are not simple ints/floats.
 */
void DmrppArray::read_linked_blocks(){

    unsigned int num_linked_blocks = this->get_total_linked_blocks();
    if (num_linked_blocks <2)
        throw BESInternalError("The number of linked blocks must be >1 to read the data.", __FILE__, __LINE__);

    // individual_lengths[i]: byte count of linked block i.
    // accumulated_lengths[i]: exclusive prefix sum - the byte offset of block i
    // in the combined buffer.
    vector<unsigned long long> accumulated_lengths;
    accumulated_lengths.resize(num_linked_blocks);
    vector<unsigned long long> individual_lengths;
    individual_lengths.resize(num_linked_blocks);

    // Here we cannot assume that the index of the linked block always increases
    // in the loop of chunks so we use the linked block index.
    // For the HDF4 case, we observe the index of the linked block is always consistent with the
    // chunk it loops, though.
    for(const auto& chunk: get_immutable_chunks()) {
        individual_lengths[chunk->get_linked_block_index()] = chunk->get_size();
    }
    accumulated_lengths[0] = 0;
    for (unsigned int i = 1; i < num_linked_blocks; i++)
        accumulated_lengths[i] = individual_lengths[i-1] + accumulated_lengths[i-1];

    if (this->var()->type() == dods_structure_c) {

        // Check if we can handle this case.
        // Currently we only handle one-layer simple int/float types, and the data is not compressed.
        if (!is_readable_struct)
            throw InternalErr(__FILE__, __LINE__, "Only handle integer and float base types. Cannot handle the array of complex structure yet.");

        string filters_str = this->get_filters();
        if (filters_str.find("deflate")!=string::npos)
            throw InternalErr(__FILE__, __LINE__, "We don't handle compressed array of structure now.");

        // Combine all of the blocks into one staging buffer, each at its
        // accumulated offset, then unpack it as an array of structures.
        vector<char> values;
        values.resize(get_var_chunks_storage_size());
        char *target_buffer = values.data();

        for(const auto& chunk: get_immutable_chunks()) {
            chunk->read_chunk();
            BESDEBUG(dmrpp_3, prolog << "linked_block_index: " << chunk->get_linked_block_index() << endl);
            BESDEBUG(dmrpp_3, prolog << "accumlated_length: " << accumulated_lengths[chunk->get_linked_block_index()] << endl);
            const char *source_buffer = chunk->get_rbuf();
            memcpy(target_buffer + accumulated_lengths[chunk->get_linked_block_index()], source_buffer,chunk->get_size());
        }

        read_array_of_structure(values);
    }
    else {

        //Change to the total storage buffer size to just the compressed buffer size.
        reserve_value_capacity_ll_byte(get_var_chunks_storage_size());

        char *target_buffer = get_buf();

        // Combine the blocks directly into the variable's buffer.
        for(const auto& chunk: get_immutable_chunks()) {
            chunk->read_chunk();
            BESDEBUG(dmrpp_3, prolog << "linked_block_index: " << chunk->get_linked_block_index() << endl);
            BESDEBUG(dmrpp_3, prolog << "accumlated_length: " << accumulated_lengths[chunk->get_linked_block_index()] << endl);
            const char *source_buffer = chunk->get_rbuf();
            memcpy(target_buffer + accumulated_lengths[chunk->get_linked_block_index()], source_buffer,chunk->get_size());
        }
        string filters_string = this->get_filters();
        if (filters_string.find("deflate")!=string::npos) {

            // The combined buffer is deflate-compressed; inflate it and replace
            // the variable's buffer with the decompressed bytes.
            char *in_buf = get_buf();

            char **destp = nullptr;
            char *dest_deflate = nullptr;
            unsigned long long dest_len = get_var_chunks_storage_size();
            unsigned long long src_len = get_var_chunks_storage_size();
            dest_deflate = new char[dest_len];
            destp = &dest_deflate;
            // NOTE(review): if inflate_simple() throws, dest_deflate is leaked
            // here - consider a try/catch or an RAII wrapper around this call.
            // Since destp aliases dest_deflate, any re-allocation done inside
            // inflate_simple() is visible through dest_deflate below.
            inflate_simple(destp, dest_len, in_buf, src_len);

            this->clear_local_data();
            reserve_value_capacity_ll_byte(this->width_ll());
            char *out_buf = get_buf();
            memcpy(out_buf,dest_deflate,this->width_ll());
            delete []dest_deflate;
        }
    }


    set_read_p(true);

    // Leave the following commented code for the time being since we may add this feature in the future. KY 2024-03-20
#if 0
    // The size in element of each of the array's dimensions
    const vector<unsigned long long> array_shape = get_shape(true);

    // The size, in elements, of each of the chunk's dimensions
    const vector<unsigned long long> chunk_shape = get_chunk_dimension_sizes();

    BESDEBUG(dmrpp_3, prolog << "d_use_transfer_threads: " << (DmrppRequestHandler::d_use_transfer_threads ? "true" : "false") << endl);
    BESDEBUG(dmrpp_3, prolog << "d_max_transfer_threads: " << DmrppRequestHandler::d_max_transfer_threads << endl);

    if (!DmrppRequestHandler::d_use_transfer_threads) { // Serial transfers
#if DMRPP_ENABLE_THREAD_TIMERS
        BES_STOPWATCH_START(dmrpp_3, prolog + "Serial SuperChunk Processing.");
#endif
        while(!super_chunks.empty()) {
            auto super_chunk = super_chunks.front();
            super_chunks.pop();
            BESDEBUG(dmrpp_3, prolog << super_chunk->to_string(true) << endl );

            // Call direct IO routine
            super_chunk->read_unconstrained_dio();
        }
    }
    else { // Parallel transfers
#if DMRPP_ENABLE_THREAD_TIMERS
        string timer_name = prolog + "Concurrent SuperChunk Processing. d_max_transfer_threads: " + to_string(DmrppRequestHandler::d_max_transfer_threads);
        BES_STOPWATCH_START(dmrpp_3, timer_name);
#endif
        // Call direct IO routine for parallel transfers
        read_super_chunks_unconstrained_concurrent_dio(super_chunks, this);
    }
    set_read_p(true);
#endif

}
1394
/**
 * @brief Read a linked-block variable and apply the projection constraint.
 *
 * Combines all linked blocks into one buffer, inflates it when a deflate filter
 * is present, then subsets via insert_constrained_contiguous() as in the
 * contiguous-storage constrained case.
 *
 * @throws BESInternalError if fewer than two linked blocks are recorded.
 * @throws InternalErr for constrained arrays of structures (not supported).
 */
void DmrppArray::read_linked_blocks_constrained(){

    unsigned int num_linked_blocks = this->get_total_linked_blocks();
    if (num_linked_blocks <2)
        throw BESInternalError("The number of linked blocks must be >1 to read the data.", __FILE__, __LINE__);

    if (this->var()->type() == dods_structure_c)
        throw InternalErr(__FILE__, __LINE__, "We don't handle constrained array of structure now.");

    // Gather information of linked blocks
    // individual_lengths[i]: byte count of block i; accumulated_lengths[i]:
    // exclusive prefix sum - block i's byte offset in the combined buffer.
    vector<unsigned long long> accumulated_lengths;
    accumulated_lengths.resize(num_linked_blocks);
    vector<unsigned long long> individual_lengths;
    individual_lengths.resize(num_linked_blocks);

    // Here we cannot assume that the index of the linked block always increases
    // in the loop of chunks so we use the linked block index.
    // For the HDF4 case, we observe the index of the linked block is always consistent with the
    // chunk it loops, though.
    for(const auto& chunk: get_immutable_chunks()) {
        individual_lengths[chunk->get_linked_block_index()] = chunk->get_size();
    }
    accumulated_lengths[0] = 0;
    for (unsigned int i = 1; i < num_linked_blocks; i++)
        accumulated_lengths[i] = individual_lengths[i-1] + accumulated_lengths[i-1];

    // Allocate the final constrained buffer
    //size_t array_var_type_size = prototype()->width();
    size_t array_var_type_size = bytes_per_element;
    reserve_value_capacity_ll_byte(array_var_type_size*get_size(true));

    // For the linked block compressed case, we need to obtain the whole buffer to do subsetting.
    // Since this is the major use case, we will not do any optimiziaton for the uncompressed case.

    vector<char> values;
    values.resize(get_var_chunks_storage_size());
    char *target_buffer = values.data();

    // Read every block and place it at its accumulated offset.
    for(const auto& chunk: get_immutable_chunks()) {
        chunk->read_chunk();
        BESDEBUG(dmrpp_3, prolog << "linked_block_index: " << chunk->get_linked_block_index() << endl);
        BESDEBUG(dmrpp_3, prolog << "accumlated_length: " << accumulated_lengths[chunk->get_linked_block_index()] << endl);
        const char *source_buffer = chunk->get_rbuf();
        memcpy(target_buffer + accumulated_lengths[chunk->get_linked_block_index()], source_buffer,chunk->get_size());
    }

    vector<char>uncompressed_values;
    bool is_compressed = false;
    string filters_string = this->get_filters();

    // The linked blocks are compressed.
    if (filters_string.find("deflate")!=string::npos) {

        char **destp = nullptr;
        char *dest_deflate = nullptr;
        unsigned long long dest_len = get_var_chunks_storage_size();
        unsigned long long src_len = get_var_chunks_storage_size();
        dest_deflate = new char[dest_len];
        destp = &dest_deflate;
        // NOTE(review): if inflate_simple() throws, dest_deflate is leaked here -
        // consider a try/catch or an RAII wrapper around this call. destp aliases
        // dest_deflate, so a re-allocation inside inflate_simple() is visible below.
        unsigned long long deflated_length = inflate_simple(destp, dest_len, target_buffer, src_len);
        BESDEBUG(dmrpp_3, prolog << "deflated length: " << deflated_length << endl);
        BESDEBUG(dmrpp_3, prolog << "array size: " << deflated_length << endl);

        uncompressed_values.resize(deflated_length);
        memcpy(uncompressed_values.data(),dest_deflate,deflated_length);
#if 0
        uncompressed_values.resize(this->width_ll());
        memcpy(uncompressed_values.data(),dest_deflate,this->width_ll());
#endif
        delete []dest_deflate;

        is_compressed = true;
    }

    // Now this falls to the contiguous constrained case. We just call the existing method.
    vector<unsigned long long> array_shape = get_shape(false);
    unsigned long target_index = 0;
    vector<unsigned long long> subset;
    char *dest_buf = get_buf();
    if (is_compressed)
        insert_constrained_contiguous(dim_begin(), &target_index, subset, array_shape, uncompressed_values.data(),dest_buf);
    else
        insert_constrained_contiguous(dim_begin(), &target_index, subset, array_shape, values.data(), dest_buf);


    set_read_p(true);
}
1482
1483void DmrppArray::read_chunks_with_linked_blocks() {
1484
1485 reserve_value_capacity_ll(get_size(false));
1486 for(const auto& chunk: get_immutable_chunks()) {
1487 if (chunk->get_multi_linked_blocks()) {
1488 vector<std::pair<unsigned long long, unsigned long long>> cur_chunk_lb_offset_lengths;
1489 chunk->obtain_multi_linked_offset_length(cur_chunk_lb_offset_lengths);
1490 unsigned long long cb_buffer_size =0;
1491 for (const auto &tp:cur_chunk_lb_offset_lengths)
1492 cb_buffer_size +=tp.second;
1493
1494 chunk->set_size(cb_buffer_size);
1495 // Now we get the chunk buffer size, set it.
1496 if (chunk->get_read_buffer_is_mine())
1497 chunk->set_rbuf_to_size();
1498 else
1499 throw BESInternalError("For multi-linked blocks, the chunk buffer ownship must be true", __FILE__, __LINE__);
1500
1501 char *temp_cb_buffer = chunk->get_rbuf();
1502
1503 for (const auto &tp:cur_chunk_lb_offset_lengths) {
1504
1505 // Obtain this chunk block's offset and length.
1506 auto cb_offset = tp.first;
1507 auto cb_length = tp.second;
1508
1509 // Obtain this chunk's other information:byteOrder,URL,chunk position.
1510 // Create a block chunk for each block in order to obtain the data.
1511
1512 auto cb_data_url = chunk->get_data_url();
1513 auto cb_position_in_array = chunk->get_position_in_array();
1514 auto cb_byte_order = chunk->get_byte_order();
1515
1516 // Cannot use the shared pointer here somehow.
1517 Chunk* block_chunk = nullptr;
1518 if (cb_data_url == nullptr)
1519 block_chunk = new Chunk(cb_byte_order,cb_length,cb_offset,cb_position_in_array);
1520 else
1521 block_chunk = new Chunk(cb_data_url,cb_byte_order,cb_length,cb_offset,cb_position_in_array);
1522
1523 block_chunk->read_chunk();
1524 const char *block_chunk_buffer = block_chunk->get_rbuf();
1525 if (block_chunk->get_bytes_read() != cb_length) {
1526 ostringstream oss;
1527 oss << "Wrong number of bytes read for chunk; read: " << block_chunk->get_bytes_read() << ", expected: " << cb_length;
1528 throw BESInternalError(oss.str(), __FILE__, __LINE__);
1529 }
1530 memcpy(temp_cb_buffer,block_chunk_buffer,cb_length);
1531 temp_cb_buffer +=cb_length;
1532 delete block_chunk;
1533
1534 }
1535 chunk->set_is_read(true);
1536
1537 }
1538 else { // General Chunk
1539 chunk->read_chunk();
1540 }
1541 // Now we need to handle the filters.
1542 if (chunk->get_uses_fill_value()) {
1543 //No, we won't handle the filled chunks case since HDF4 doesn't have this.
1544 throw BESInternalError(string("Encounters filled linked-block chunks for variable ") + name(), __FILE__, __LINE__);
1545 }
1546 else if (!is_filters_empty())
1547 chunk->filter_chunk(get_filters(), get_chunk_size_in_elements(), get_bytes_per_element());
1548
1549 // No, HDF4 doesn't have linked-block chunk structure AFAIK
1550 if (var()->type() == libdap::dods_structure_c)
1551 throw BESInternalError(string("Encounters linked-block chunk structures for variable ") + name(), __FILE__, __LINE__);
1552
1553 // Now we go to the insert_chunk routine
1554
1555 vector<unsigned long long> array_shape = get_shape(false);
1556 vector<unsigned long long> chunk_shape = get_chunk_dimension_sizes();
1557 vector<unsigned long long> chunk_origin = chunk->get_position_in_array();
1558
1559 this->insert_chunk_unconstrained(chunk, 0, 0, array_shape, 0,chunk_shape,chunk_origin);
1560
1561 }
1562 set_read_p(true);
1563
1564}
1565
1566void DmrppArray::read_chunks_with_linked_blocks_constrained() {
1567
1568 reserve_value_capacity_ll(get_size(true));
1569 char *dest_buf = this->get_buf();
1570 vector<unsigned long long> constrained_array_shape = this->get_shape(true);
1571 for(const auto& chunk: get_immutable_chunks()){
1572 vector<unsigned long long> chunk_element_address = chunk->get_position_in_array();
1573 auto needed = find_needed_chunks(0 /* dimension */, &chunk_element_address, chunk);
1574 if (needed){
1575 if (chunk->get_multi_linked_blocks()) {
1576 vector<std::pair<unsigned long long, unsigned long long>> cur_chunk_lb_offset_lengths;
1577 chunk->obtain_multi_linked_offset_length(cur_chunk_lb_offset_lengths);
1578 unsigned long long cb_buffer_size =0;
1579 for (const auto &tp:cur_chunk_lb_offset_lengths)
1580 cb_buffer_size +=tp.second;
1581
1582 chunk->set_size(cb_buffer_size);
1583 // Now we get the chunk buffer size, set it.
1584 if (chunk->get_read_buffer_is_mine())
1585 chunk->set_rbuf_to_size();
1586 else
1587 throw BESInternalError("For multi-linked blocks, the chunk buffer ownship must be true", __FILE__, __LINE__);
1588
1589 char *temp_cb_buffer = chunk->get_rbuf();
1590
1591 for (const auto &tp:cur_chunk_lb_offset_lengths) {
1592
1593 // Obtain this chunk block's offset and length.
1594 auto cb_offset = tp.first;
1595 auto cb_length = tp.second;
1596
1597 // Obtain this chunk's other information:byteOrder,URL,chunk position.
1598 // Create a block chunk for each block in order to obtain the data.
1599
1600 auto cb_data_url = chunk->get_data_url();
1601 auto cb_position_in_array = chunk->get_position_in_array();
1602 auto cb_byte_order = chunk->get_byte_order();
1603
1604 // Cannot use the shared pointer here somehow.
1605 Chunk* block_chunk = nullptr;
1606 if (cb_data_url == nullptr)
1607 block_chunk = new Chunk(cb_byte_order,cb_length,cb_offset,cb_position_in_array);
1608 else
1609 block_chunk = new Chunk(cb_data_url,cb_byte_order,cb_length,cb_offset,cb_position_in_array);
1610
1611 block_chunk->read_chunk();
1612 const char *block_chunk_buffer = block_chunk->get_rbuf();
1613 if (block_chunk->get_bytes_read() != cb_length) {
1614 ostringstream oss;
1615 oss << "Wrong number of bytes read for chunk; read: " << block_chunk->get_bytes_read() << ", expected: " << cb_length;
1616 throw BESInternalError(oss.str(), __FILE__, __LINE__);
1617 }
1618 memcpy(temp_cb_buffer,block_chunk_buffer,cb_length);
1619 temp_cb_buffer +=cb_length;
1620 delete block_chunk;
1621
1622 }
1623 chunk->set_is_read(true);
1624
1625 }
1626 else { // General Chunk
1627 chunk->read_chunk();
1628 }
1629 // Now we need to handle the filters.
1630 if (chunk->get_uses_fill_value()) {
1631 //No, we won't handle the filled chunks case since HDF4 doesn't have this.
1632 throw BESInternalError(string("Encounters filled linked-block chunks for variable ") + name(), __FILE__, __LINE__);
1633 }
1634 else if (!is_filters_empty())
1635 chunk->filter_chunk(get_filters(), get_chunk_size_in_elements(), get_bytes_per_element());
1636
1637 // No, HDF4 doesn't have linked-block chunk structure AFAIK
1638 if (var()->type() == libdap::dods_structure_c)
1639 throw BESInternalError(string("Encounters linked-block chunk structures for variable ") + name(), __FILE__, __LINE__);
1640
1641 // Now we go to the insert_chunk routine
1642 vector<unsigned long long> target_element_address = chunk->get_position_in_array();
1643 vector<unsigned long long> chunk_source_address(this->dimensions(), 0);
1644 insert_chunk(0, &target_element_address, &chunk_source_address,chunk, constrained_array_shape, dest_buf);
1645
1646 }
1647 }
1648
1649 set_read_p(true);
1650
1651}
1652
/**
 * @brief Inflate (zlib deflate-decompress) a buffer.
 *
 * @param destp Value-result parameter. On entry *destp is a caller-allocated
 *     (new char[]) buffer of dest_len bytes. On exit *destp points at the
 *     buffer holding the inflated bytes; if the data did not fit, the buffer
 *     was re-allocated (the old one delete[]'d) and *destp updated, so callers
 *     must always free via delete[] *destp, never a stale copy of the pointer.
 * @param dest_len Size, in bytes, of the initial destination buffer.
 * @param src The compressed bytes.
 * @param src_len Number of compressed bytes in src.
 * @return The total number of inflated bytes (z_strm.total_out).
 * @throws BESInternalError if any argument is zero/null.
 * @throws BESError if zlib fails to initialize or reports an inflate error.
 */
unsigned long long DmrppArray::inflate_simple(char **destp, unsigned long long dest_len, char *src, unsigned long long src_len) {

    /* Sanity check */
    if (src_len == 0) {
        string msg = prolog + "ERROR! The number of bytes to inflate is zero.";
        BESDEBUG(MODULE, msg << endl);
        throw BESInternalError(msg, __FILE__, __LINE__);
    }
    if (dest_len == 0) {
        string msg = prolog + "ERROR! The number of bytes to inflate into is zero.";
        BESDEBUG(MODULE, msg << endl);
        throw BESInternalError(msg, __FILE__, __LINE__);
    }
    if (!destp || !*destp) {
        string msg = prolog + "ERROR! The destination buffer is NULL.";
        BESDEBUG(MODULE, msg << endl);
        throw BESInternalError(msg, __FILE__, __LINE__);
    }
    if (!src) {
        string msg = prolog + "ERROR! The source buffer is NULL.";
        BESDEBUG(MODULE, msg << endl);
        throw BESInternalError(msg, __FILE__, __LINE__);
    }

    /* Input; uncompress */
    z_stream z_strm; /* zlib parameters */

    /* Set the decompression parameters */
    memset(&z_strm, 0, sizeof(z_strm));
    z_strm.next_in = (Bytef *) src;
    // NOTE(review): avail_in/avail_out are uInt (32-bit); lengths >= 4 GiB would
    // be truncated here - confirm chunk sizes stay below that limit.
    z_strm.avail_in = src_len;
    z_strm.next_out = (Bytef *) (*destp);
    z_strm.avail_out = dest_len;

    size_t nalloc = dest_len;

    char *outbuf = *destp;

    /* Initialize the decompression routines */
    if (Z_OK != inflateInit(&z_strm))
        throw BESError("Failed to initialize inflate software.", BES_INTERNAL_ERROR, __FILE__, __LINE__);


    /* Loop to uncompress the buffer */
    int status = Z_OK;
    do {
        /* Uncompress some data */
        status = inflate(&z_strm, Z_SYNC_FLUSH);

        /* Check if we are done decompressing data */
        if (Z_STREAM_END == status) break; /*done*/

        /* Check for error */
        if (Z_OK != status) {
            stringstream err_msg;
            err_msg << "Failed to inflate data chunk.";
            char const *err_msg_cstr = z_strm.msg;
            if(err_msg_cstr)
                err_msg << " zlib message: " << err_msg_cstr;
            (void) inflateEnd(&z_strm);
            throw BESError(err_msg.str(), BES_INTERNAL_ERROR, __FILE__, __LINE__);
        }
        else {
            // If we're not done and just ran out of buffer space, we need to extend the buffer.
            // We may encounter this case when the deflate filter is used twice. KY 2022-08-03
            if (0 == z_strm.avail_out) {

                /* Allocate a buffer twice as big */
                size_t outbuf_size = nalloc;
                nalloc *= 2;
                char* new_outbuf = new char[nalloc];
                memcpy((void*)new_outbuf,(void*)outbuf,outbuf_size);
                delete[] outbuf;
                outbuf = new_outbuf;

                // Fix: keep the caller's pointer in sync with the live buffer.
                // Previously *destp was updated only after the loop, so if a later
                // inflate() call failed and threw, *destp still referenced the
                // buffer delete[]'d just above and the caller's cleanup
                // (delete[] dest_deflate) freed already-freed memory.
                *destp = outbuf;

                /* Update pointers to buffer for next set of uncompressed data */
                z_strm.next_out = (unsigned char*) outbuf + z_strm.total_out;
                z_strm.avail_out = (uInt) (nalloc - z_strm.total_out);

            } /* end if */
        } /* end else */
    } while (true /* status == Z_OK */); // Exit via the break statement after the call to inflate(). jhrg 11/8/21

    *destp = outbuf;
    outbuf = nullptr;
    /* Finish decompressing the stream */
    (void) inflateEnd(&z_strm);

    return z_strm.total_out;
}
1745
1746
1747
1750
1763unsigned long long DmrppArray::get_chunk_start(const dimension &thisDim, unsigned long long chunk_origin)
1764{
1765 // What's the first element that we are going to access for this dimension of the chunk?
1766 unsigned long long first_element_offset = 0; // start with 0
1767 if ((unsigned long long) (thisDim.start) < chunk_origin) {
1768 // If the start is behind this chunk, then it's special.
1769 if (thisDim.stride != 1) {
1770 // And if the stride isn't 1, we have to figure our where to begin in this chunk.
1771 first_element_offset = (chunk_origin - thisDim.start) % thisDim.stride;
1772 // If it's zero great!
1773 if (first_element_offset != 0) {
1774 // otherwise, adjust to get correct first element.
1775 first_element_offset = thisDim.stride - first_element_offset;
1776 }
1777 }
1778 }
1779 else {
1780 first_element_offset = thisDim.start - chunk_origin;
1781 }
1782
1783 return first_element_offset;
1784}
1785
/**
 * @brief Decide, recursively over the dimensions, whether a chunk intersects the constraint.
 *
 * @param dim The dimension to test; callers start with 0.
 * @param target_element_address Scratch address vector updated along the way
 *     (entry [dim] is set for each candidate row while recursing).
 * @param chunk The chunk under consideration.
 * @return The chunk itself when some element of it is selected by the current
 *     constraint; nullptr when the chunk can be skipped.
 */
shared_ptr<Chunk>
DmrppArray::find_needed_chunks(unsigned int dim, vector<unsigned long long> *target_element_address, shared_ptr<Chunk> chunk)
{
    BESDEBUG(dmrpp_3, prolog << " BEGIN, dim: " << dim << endl);

    // The size, in elements, of each of the chunk's dimensions.
    const vector<unsigned long long> &chunk_shape = get_chunk_dimension_sizes();

    // The chunk's origin point a.k.a. its "position in array".
    const vector<unsigned long long> &chunk_origin = chunk->get_position_in_array();

    dimension thisDim = this->get_dimension(dim);

    // Do we even want this chunk?
    // NOTE(review): the chunk's valid elements along dim are
    // [origin, origin + shape - 1], so 'start > origin + shape' (rather than >=)
    // lets start == origin + shape slip past this test; the chunk_start check
    // below is similarly inclusive. Confirm whether these lax bounds are
    // intentional or latent off-by-ones.
    if ((unsigned long long) thisDim.start > (chunk_origin[dim] + chunk_shape[dim]) ||
        (unsigned long long) thisDim.stop < chunk_origin[dim]) {
        return nullptr; // No. No, we do not. Skip this chunk.
    }

    // What's the first element that we are going to access for this dimension of the chunk?
    unsigned long long chunk_start = get_chunk_start(thisDim, chunk_origin[dim]);

    // Is the next point to be sent in this chunk at all? If no, return.
    if (chunk_start > chunk_shape[dim]) {
        return nullptr;
    }

    // Now we figure out the correct last element, based on the subset expression
    unsigned long long end_element = chunk_origin[dim] + chunk_shape[dim] - 1;
    if ((unsigned long long) thisDim.stop < end_element) {
        end_element = thisDim.stop;
    }

    // Last chunk-relative index (inclusive) selected along this dimension.
    unsigned long long chunk_end = end_element - chunk_origin[dim];

    unsigned int last_dim = chunk_shape.size() - 1;
    if (dim == last_dim) {
        BESDEBUG(dmrpp_3, prolog << " END, This is the last_dim. chunk: " << chunk->to_string() << endl);
        return chunk;
    }
    else {
        // Not the last dimension, so we continue to proceed down the Recursion Branch.
        for (unsigned long long chunk_index = chunk_start; chunk_index <= chunk_end; chunk_index += thisDim.stride) {
            (*target_element_address)[dim] = (chunk_index + chunk_origin[dim] - thisDim.start) / thisDim.stride;

            // Re-entry here:
            auto needed = find_needed_chunks(dim + 1, target_element_address, chunk);
            if (needed){
                BESDEBUG(dmrpp_3, prolog << " END, Found chunk: " << needed->to_string() << endl);
                return needed;
            }

        }
    }
    BESDEBUG(dmrpp_3, prolog << " END, dim: " << dim << endl);

    return nullptr;
}
1865
/// @brief Copy a chunk's constrained data into the array's target buffer,
///        recursing over the array's dimensions.
///
/// At the last dimension the bytes are copied with memcpy(): one contiguous
/// run when the stride is 1, otherwise one element at a time. Earlier
/// dimensions only compute target/chunk coordinates and recurse.
///
/// @param dim Dimension handled at this recursion level (0-based).
/// @param target_element_address Element coordinates in the constrained target
///        array; updated in place while recursing.
/// @param chunk_element_address Element coordinates within the chunk; updated
///        in place while recursing.
/// @param chunk The chunk whose (already read) data are copied.
/// @param constrained_array_shape Shape of the constrained target array.
/// @param target_buffer Destination byte buffer for the whole array.
void DmrppArray::insert_chunk(
        unsigned int dim,
        vector<unsigned long long> *target_element_address,
        vector<unsigned long long> *chunk_element_address,
        shared_ptr<Chunk> chunk,
        const vector<unsigned long long> &constrained_array_shape,char *target_buffer){

    // The size, in elements, of each of the chunk's dimensions.
    const vector<unsigned long long> &chunk_shape = get_chunk_dimension_sizes();

    // The chunk's origin point a.k.a. its "position in array".
    const vector<unsigned long long> &chunk_origin = chunk->get_position_in_array();

    dimension thisDim = this->get_dimension(dim);

    // What's the first element that we are going to access for this dimension of the chunk?
    unsigned long long chunk_start = get_chunk_start(thisDim, chunk_origin[dim]);

    // Now we figure out the correct last element, based on the subset expression
    unsigned long long end_element = chunk_origin[dim] + chunk_shape[dim] - 1;
    if ((unsigned long long) thisDim.stop < end_element) {
        end_element = thisDim.stop;
    }

    // Last accessed element, as an offset within the chunk.
    unsigned long long chunk_end = end_element - chunk_origin[dim];

    unsigned int last_dim = chunk_shape.size() - 1;
    if (dim == last_dim) {
        char *source_buffer = chunk->get_rbuf();
        // bytes_per_element is a member holding the width, in bytes, of one
        // element; it is assigned before the chunk reads begin.
        unsigned int elem_width = bytes_per_element;

        if (thisDim.stride == 1) {
            // Stride 1: the selected elements are contiguous, so the whole
            // run is moved with a single memcpy().
            // The start element in this array
            unsigned long long start_element = chunk_origin[dim] + chunk_start;
            // Compute how much we are going to copy
            unsigned long long chunk_constrained_inner_dim_bytes = (end_element - start_element + 1) * elem_width;

            // Compute where we need to put it.
            (*target_element_address)[dim] = (start_element - thisDim.start); // / thisDim.stride;
            // Compute where we are going to read it from
            (*chunk_element_address)[dim] = chunk_start;

            // See below re get_index()
            unsigned long long target_char_start_index =
                    get_index(*target_element_address, constrained_array_shape) * elem_width;
            unsigned long long chunk_char_start_index = get_index(*chunk_element_address, chunk_shape) * elem_width;

            memcpy(target_buffer + target_char_start_index, source_buffer + chunk_char_start_index,
                   chunk_constrained_inner_dim_bytes);
        }
        else {
            // Stride != 1: copy the selected elements one at a time.
            for (unsigned int chunk_index = chunk_start; chunk_index <= chunk_end; chunk_index += thisDim.stride) {
                // Compute where we need to put it.
                (*target_element_address)[dim] = (chunk_index + chunk_origin[dim] - thisDim.start) / thisDim.stride;

                // Compute where we are going to read it from
                (*chunk_element_address)[dim] = chunk_index;

                // These calls to get_index() can be removed as with the insert...unconstrained() code.
                unsigned long long target_char_start_index =
                        get_index(*target_element_address, constrained_array_shape) * elem_width;
                unsigned long long chunk_char_start_index = get_index(*chunk_element_address, chunk_shape) * elem_width;

                memcpy(target_buffer + target_char_start_index, source_buffer + chunk_char_start_index, elem_width);
            }
        }
    }
    else {
        // Not the last dimension, so we continue to proceed down the Recursion Branch.
        for (unsigned long long chunk_index = chunk_start; chunk_index <= chunk_end; chunk_index += thisDim.stride) {
            (*target_element_address)[dim] = (chunk_index + chunk_origin[dim] - thisDim.start) / thisDim.stride;
            (*chunk_element_address)[dim] = chunk_index;

            // Re-entry here:
            insert_chunk(dim + 1, target_element_address, chunk_element_address, chunk, constrained_array_shape, target_buffer);
        }
    }
}
1964
1971void DmrppArray::read_chunks()
1972{
1973 if (get_chunks_size() < 2)
1974 throw BESInternalError(string("Expected chunks for variable ") + name(), __FILE__, __LINE__);
1975
1976 // Find all the required chunks to read. I used a queue to preserve the chunk order, which
1977 // made using a debugger easier. However, order does not matter, AFAIK.
1978 unsigned long long sc_count=0;
1979 stringstream sc_id;
1980 sc_id << name() << "-" << sc_count++;
1981 queue<shared_ptr<SuperChunk>> super_chunks;
1982 auto current_super_chunk = shared_ptr<SuperChunk>(new SuperChunk(sc_id.str(), this)) ;
1983 super_chunks.push(current_super_chunk);
1984
1985 // TODO We know that non-contiguous chunks may be forward or backward in the file from
1986 // the current offset. When an add_chunk() call fails, prior to making a new SuperChunk
1987 // we might want try adding the rejected Chunk to the other existing SuperChunks to see
1988 // if it's contiguous there.
1989 // Find the required Chunks and put them into SuperChunks.
1990 bool found_needed_chunks = false;
1991 for(const auto& chunk: get_immutable_chunks()){
1992 vector<unsigned long long> target_element_address = chunk->get_position_in_array();
1993 auto needed = find_needed_chunks(0 /* dimension */, &target_element_address, chunk);
1994 if (needed){
1995 found_needed_chunks = true;
1996 bool added = current_super_chunk->add_chunk(chunk);
1997 if(!added){
1998 sc_id.str(std::string()); // Clears stringstream.
1999 sc_id << name() << "-" << sc_count++;
2000 current_super_chunk = shared_ptr<SuperChunk>(new SuperChunk(sc_id.str(),this));
2001 super_chunks.push(current_super_chunk);
2002 if(!current_super_chunk->add_chunk(chunk)){
2003 stringstream msg ;
2004 msg << prolog << "Failed to add Chunk to new SuperChunk. chunk: " << chunk->to_string();
2005 throw BESInternalError(msg.str(), __FILE__, __LINE__);
2006 }
2007 }
2008 }
2009 }
2010 BESDEBUG(dmrpp_3, prolog << "found_needed_chunks: " << (found_needed_chunks?"true":"false") << endl);
2011 if(!found_needed_chunks){ // Ouch! Something went horribly wrong...
2012 throw BESInternalError("ERROR - Failed to locate any chunks that correspond to the requested data.", __FILE__, __LINE__);
2013 }
2014
2015 reserve_value_capacity_ll(get_size(true));
2016 if (is_readable_struct)
2017 d_structure_array_buf.resize(get_size(true)*bytes_per_element);
2018
2019 BESDEBUG(dmrpp_3, prolog << "d_use_transfer_threads: " << (DmrppRequestHandler::d_use_transfer_threads ? "true" : "false") << endl);
2020 BESDEBUG(dmrpp_3, prolog << "d_max_transfer_threads: " << DmrppRequestHandler::d_max_transfer_threads << endl);
2021 BESDEBUG(dmrpp_3, prolog << "d_use_compute_threads: " << (DmrppRequestHandler::d_use_compute_threads ? "true" : "false") << endl);
2022 BESDEBUG(dmrpp_3, prolog << "d_max_compute_threads: " << DmrppRequestHandler::d_max_compute_threads << endl);
2023 BESDEBUG(dmrpp_3, prolog << "SuperChunks.size(): " << super_chunks.size() << endl);
2024
2025 if (!DmrppRequestHandler::d_use_transfer_threads) {
2026 // This version is the 'serial' version of the code. It reads a chunk, inserts it,
2027 // reads the next one, and so on.
2028#if DMRPP_ENABLE_THREAD_TIMERS
2029 BES_STOPWATCH_START(dmrpp_3, prolog + "Serial SuperChunk Processing.");
2030#endif
2031 while (!super_chunks.empty()) {
2032 auto super_chunk = super_chunks.front();
2033 super_chunks.pop();
2034 BESDEBUG(dmrpp_3, prolog << super_chunk->to_string(true) << endl );
2035 super_chunk->read();
2036 }
2037 }
2038 else {
2039#if DMRPP_ENABLE_THREAD_TIMERS
2040 string timer_name = prolog + "Concurrent SuperChunk Processing. d_max_transfer_threads: " + to_string(DmrppRequestHandler::d_max_transfer_threads);
2041 BES_STOPWATCH_START(dmrpp_3, timer_name);
2042#endif
2043 read_super_chunks_concurrent(super_chunks, this);
2044 }
2045 if (is_readable_struct)
2046 read_array_of_structure(d_structure_array_buf);
2047 set_read_p(true);
2048}
2049
2050void DmrppArray::read_buffer_chunks()
2051{
2052
2053 BESDEBUG(dmrpp_3, prolog << "coming to read_buffer_chunks() " << endl);
2054 if (get_chunks_size() < 2)
2055 throw BESInternalError(string("Expected chunks for variable ") + name(), __FILE__, __LINE__);
2056
2057 // Prepare buffer size.
2058 unsigned long long max_buffer_end_position = 0;
2059
2060 // For highly compressed chunks, we need to make sure the buffer_size is not too big to exceed the file size.
2061 // For this variable we also need to find the maximum value of the end position of all the chunks.
2062 // Here we try to loop through all the needed chunks for the constraint case.
2063 bool first_needed_chunk = true;
2064 unsigned long long first_needed_chunk_offset = 0;
2065 unsigned long long first_needed_chunk_size = 0;
2066 for (const auto &chunk: get_immutable_chunks()) {
2067 vector<unsigned long long> target_element_address = chunk->get_position_in_array();
2068 auto needed = find_needed_chunks(0 /* dimension */, &target_element_address, chunk);
2069 if (needed){
2070 if (first_needed_chunk == true) {
2071 first_needed_chunk_offset = chunk->get_offset();
2072 first_needed_chunk_size = chunk->get_size();
2073 first_needed_chunk = false;
2074 }
2075 unsigned long long temp_max_buffer_end_position= chunk->get_size() + chunk->get_offset();
2076 if(max_buffer_end_position < temp_max_buffer_end_position)
2077 max_buffer_end_position = temp_max_buffer_end_position;
2078 }
2079 }
2080 if (max_buffer_end_position == 0)
2081 throw BESInternalError("ERROR - Failed to locate any chunks that correspond to the requested data.", __FILE__, __LINE__);
2082
2083 // Here we can adjust the buffer size as needed, for now we just use the whole array size as the starting point.
2084 // Note: we can further optimize the buffer_size for the constraint case as needed.
2085 // However, since the buffer_size will be bounded by the offset and length of chunks, it may not be an issue.
2086 // So just choose the the whole array size first.
2087 unsigned long long buffer_size = bytes_per_element * this->get_size(false);
2088
2089 // Make sure buffer_size at least can hold one chunk.
2090 if (buffer_size < first_needed_chunk_size)
2091 buffer_size = first_needed_chunk_size;
2092
2093 // The end position of the buffer should not exceed the max_buffer_end_position.
2094 unsigned long long buffer_end_position = min((buffer_size + first_needed_chunk_offset),max_buffer_end_position);
2095
2096 unsigned long long sc_count=0;
2097 stringstream sc_id;
2098 sc_count++;
2099 sc_id << name() << "-" << sc_count;
2100 queue<shared_ptr<SuperChunk>> super_chunks;
2101 auto current_super_chunk = shared_ptr<SuperChunk>(new SuperChunk(sc_id.str(), this)) ;
2102
2103 // Set the non-contiguous chunk flag
2104 current_super_chunk->set_non_contiguous_chunk_flag(true);
2105 super_chunks.push(current_super_chunk);
2106
2107 // Loop through all the needed chunks and put them to the super chunk.
2108 for(const auto& chunk: get_immutable_chunks()){
2109 vector<unsigned long long> target_element_address = chunk->get_position_in_array();
2110 auto needed = find_needed_chunks(0 /* dimension */, &target_element_address, chunk);
2111 if (needed){
2112 bool added = current_super_chunk->add_chunk_non_contiguous(chunk,buffer_end_position);
2113 if(!added){
2114 sc_id.str(std::string()); // Clears stringstream.
2115 sc_count++;
2116 sc_id << name() << "-" << sc_count;
2117 current_super_chunk = shared_ptr<SuperChunk>(new SuperChunk(sc_id.str(),this));
2118
2119 // We need to mark that this superchunk includes non-contiguous chunks.
2120 current_super_chunk->set_non_contiguous_chunk_flag(true);
2121 super_chunks.push(current_super_chunk);
2122
2123 // Here we need to make sure buffer_size is not too small although this rarely happens.
2124 if (buffer_size < chunk->get_size())
2125 buffer_size = chunk->get_size();
2126 buffer_end_position = min((buffer_size + chunk->get_offset()),max_buffer_end_position);
2127 if(!current_super_chunk->add_chunk_non_contiguous(chunk,buffer_end_position)){
2128 stringstream msg ;
2129 msg << prolog << "Failed to add Chunk to new SuperChunk. chunk: " << chunk->to_string();
2130 throw BESInternalError(msg.str(), __FILE__, __LINE__);
2131 }
2132 }
2133 }
2134 }
2135
2136 reserve_value_capacity_ll(get_size(true));
2137
2138 while(!super_chunks.empty()) {
2139 auto super_chunk = super_chunks.front();
2140 super_chunks.pop();
2141 super_chunk->read();
2142 }
2143
2144 set_read_p(true);
2145
2146}
2147
2148#ifdef USE_READ_SERIAL
2170void DmrppArray::insert_chunk_serial(unsigned int dim, vector<unsigned long long> *target_element_address, vector<unsigned long long> *chunk_element_address,
2171 Chunk *chunk)
2172{
2173 BESDEBUG("dmrpp", __func__ << " dim: "<< dim << " BEGIN "<< endl);
2174
2175 // The size, in elements, of each of the chunk's dimensions.
2176 const vector<unsigned long long> &chunk_shape = get_chunk_dimension_sizes();
2177
2178 // The chunk's origin point a.k.a. its "position in array".
2179 const vector<unsigned long long> &chunk_origin = chunk->get_position_in_array();
2180
2181 dimension thisDim = this->get_dimension(dim);
2182
2183 // Do we even want this chunk?
2184 if ((unsigned long long) thisDim.start > (chunk_origin[dim] + chunk_shape[dim]) || (unsigned long long) thisDim.stop < chunk_origin[dim]) {
2185 return; // No. No, we do not. Skip this.
2186 }
2187
2188 // What's the first element that we are going to access for this dimension of the chunk?
2189 unsigned long long first_element_offset = get_chunk_start(dim, chunk_origin);
2190
2191 // Is the next point to be sent in this chunk at all? If no, return.
2192 if (first_element_offset > chunk_shape[dim]) {
2193 return;
2194 }
2195
2196 // Now we figure out the correct last element, based on the subset expression
2197 unsigned long long end_element = chunk_origin[dim] + chunk_shape[dim] - 1;
2198 if ((unsigned long long) thisDim.stop < end_element) {
2199 end_element = thisDim.stop;
2200 }
2201
2202 unsigned long long chunk_start = first_element_offset; //start_element - chunk_origin[dim];
2203 unsigned long long chunk_end = end_element - chunk_origin[dim];
2204 vector<unsigned long long> constrained_array_shape = get_shape(true);
2205
2206 unsigned int last_dim = chunk_shape.size() - 1;
2207 if (dim == last_dim) {
2208 // Read and Process chunk
2209 chunk->read_chunk();
2210
2211 chunk->inflate_chunk(is_deflate_compression(), is_shuffle_compression(), get_chunk_size_in_elements(), var()->width());
2212
2213 char *source_buffer = chunk->get_rbuf();
2214 char *target_buffer = get_buf();
2215 unsigned int elem_width = prototype()->width();
2216
2217 if (thisDim.stride == 1) {
2218 // The start element in this array
2219 unsigned long long start_element = chunk_origin[dim] + first_element_offset;
2220 // Compute how much we are going to copy
2221 unsigned long long chunk_constrained_inner_dim_bytes = (end_element - start_element + 1) * elem_width;
2222
2223 // Compute where we need to put it.
2224 (*target_element_address)[dim] = (start_element - thisDim.start) / thisDim.stride;
2225 // Compute where we are going to read it from
2226 (*chunk_element_address)[dim] = first_element_offset;
2227
2228 unsigned long long target_char_start_index = get_index(*target_element_address, constrained_array_shape) * elem_width;
2229 unsigned long long chunk_char_start_index = get_index(*chunk_element_address, chunk_shape) * elem_width;
2230
2231 memcpy(target_buffer + target_char_start_index, source_buffer + chunk_char_start_index, chunk_constrained_inner_dim_bytes);
2232 }
2233 else {
2234 // Stride != 1
2235 for (unsigned long long chunk_index = chunk_start; chunk_index <= chunk_end; chunk_index += thisDim.stride) {
2236 // Compute where we need to put it.
2237 (*target_element_address)[dim] = (chunk_index + chunk_origin[dim] - thisDim.start) / thisDim.stride;
2238
2239 // Compute where we are going to read it from
2240 (*chunk_element_address)[dim] = chunk_index;
2241
2242 unsigned long long target_char_start_index = get_index(*target_element_address, constrained_array_shape) * elem_width;
2243 unsigned long long chunk_char_start_index = get_index(*chunk_element_address, chunk_shape) * elem_width;
2244
2245 memcpy(target_buffer + target_char_start_index, source_buffer + chunk_char_start_index, elem_width);
2246 }
2247 }
2248 }
2249 else {
2250 // Not the last dimension, so we continue to proceed down the Recursion Branch.
2251 for (unsigned long long chunk_index = chunk_start; chunk_index <= chunk_end; chunk_index += thisDim.stride) {
2252 (*target_element_address)[dim] = (chunk_index + chunk_origin[dim] - thisDim.start) / thisDim.stride;
2253 (*chunk_element_address)[dim] = chunk_index;
2254
2255 // Re-entry here:
2256 insert_chunk_serial(dim + 1, target_element_address, chunk_element_address, chunk);
2257 }
2258 }
2259}
2260
2261void DmrppArray::read_chunks_serial()
2262{
2263 BESDEBUG("dmrpp", __func__ << " for variable '" << name() << "' - BEGIN" << endl);
2264
2265 vector<Chunk> &chunk_refs = get_chunk_vec();
2266 if (chunk_refs.size() == 0) throw BESInternalError(string("Expected one or more chunks for variable ") + name(), __FILE__, __LINE__);
2267
2268 // Allocate target memory.
2269 reserve_value_capacity_ll(get_size(true));
2270
2271 /*
2272 * Find the chunks to be read, make curl_easy handles for them, and
2273 * stuff them into our curl_multi handle. This is a recursive activity
2274 * which utilizes the same code that copies the data from the chunk to
2275 * the variables.
2276 */
2277 for (unsigned long long i = 0; i < chunk_refs.size(); i++) {
2278 Chunk &chunk = chunk_refs[i];
2279
2280 vector<unsigned long long> chunk_source_address(dimensions(), 0);
2281 vector<unsigned long long> target_element_address = chunk.get_position_in_array();
2282
2283 // Recursive insertion operation.
2284 insert_chunk_serial(0, &target_element_address, &chunk_source_address, &chunk);
2285 }
2286
2287 set_read_p(true);
2288
2289 BESDEBUG("dmrpp", "DmrppArray::"<< __func__ << "() for " << name() << " END"<< endl);
2290}
2291#endif
2292
2293void
2294DmrppArray::set_send_p(bool state)
2295{
2296 if (!get_attributes_loaded())
2297 load_attributes(this);
2298
2299 Array::set_send_p(state);
2300}
2301
/// @brief Read a String variable stored as a single 'contiguous' chunk.
///
/// Reads the one chunk, applies the variable's filters when present (unless
/// the chunk holds only fill values), and loads the result via set_value().
///
/// FIXME (per the original comments below): this only works when the array
/// holds a single element — the whole buffer is treated as one C string.
void DmrppArray::read_contiguous_string()
{
    BES_STOPWATCH_START(MODULE, prolog + "Timing array name: "+name());

    // This is the original chunk for this 'contiguous' variable.
    auto the_one_chunk = get_immutable_chunks()[0];

    // Read the the_one_chunk as is. This is the non-parallel I/O case
    the_one_chunk->read_chunk();

    // Now that the_one_chunk has been read, we do what is necessary...
    if (!is_filters_empty() && !get_one_chunk_fill_value()){
        the_one_chunk->filter_chunk(get_filters(), get_chunk_size_in_elements(), var()->width_ll());
    }

    // FIXME This part will only work if the array contains a single element. See below.
    //  jhrg 3/3/22
    vector < string > ss;   // Prepare for the general case
    string s(reinterpret_cast<char *>(the_one_chunk->get_rbuf()));
    ss.push_back(s);
    set_value(ss, ss.size());

    set_read_p(true);
}
2333
2334string DmrppArray::ingest_fixed_length_string(const char *buf, const unsigned long long fixed_str_len, string_pad_type pad_type)
2335{
2336 string value;
2337 unsigned long long str_len = 0;
2338 switch(pad_type){
2339 case null_pad:
2340 case null_term:
2341 {
2342 while( str_len < fixed_str_len && buf[str_len]!=0 ){
2343 str_len++;
2344 }
2345 BESDEBUG(MODULE, prolog << DmrppArray::pad_type_to_str(pad_type) << " scheme. str_len: " << str_len << endl);
2346 value = string(buf,str_len);
2347 break;
2348 }
2349 case space_pad:
2350 {
2351 str_len = fixed_str_len;
2352 while( str_len>0 && (buf[str_len-1]==' ' || buf[str_len-1]==0)){
2353 str_len--;
2354 }
2355 BESDEBUG(MODULE, prolog << DmrppArray::pad_type_to_str(pad_type) << " scheme. str_len: " << str_len << endl);
2356 value = string(buf,str_len);
2357 break;
2358 }
2359 case not_set:
2360 default:
2361 // Do nothing.
2362 BESDEBUG(MODULE, prolog << "pad_type: NOT_SET" << endl);
2363 break;
2364 }
2365 BESDEBUG(MODULE, prolog << "value: '" << value << "'" << endl);
2366 return value;
2367}
2368
/// @brief Render a list of dimension sizes as a bracketed string, e.g. "[3][4][5]".
/// @param dims The dimension sizes; an empty vector yields "".
/// @return The formatted string.
/// @note The vector is now taken by const reference; the original signature
///       took it by value, copying the whole vector on every call.
std::string dims_to_string(const std::vector<unsigned long long> &dims){
    std::stringstream ss;
    for (auto dim : dims) {
        ss << "[" << dim << "]";
    }
    return ss.str();
}
2376
/// @brief Produce a multi-line debug description of a DmrppArray (name, type,
///        dimensions, sizes, chunk dimension sizes).
/// @param a The array to describe.
///        NOTE(review): 'a' is passed by value, so every call copies the whole
///        DmrppArray just to print it. A reference would avoid the copy —
///        confirm the accessors used here are const (or that no caller passes
///        a temporary) before changing the signature.
/// @param banner Text placed at the top of the description.
/// @return The formatted description.
std::string array_to_str(DmrppArray a, const string &banner) {
    stringstream msg;
    msg << endl << "# - - - - - - - - - - - - - - - - - - -" << endl;
    msg << "# " << banner << endl;
    msg << "# " << a.prototype()->type_name() << " " << a.name();

    // Render each dimension as [name=size] (or [size] when unnamed).
    for(auto dim=a.dim_begin(); dim < a.dim_end(); dim++){
        msg << "[";
        if(!dim->name.empty()){
            msg << dim->name << "=";
        }
        msg << dim->size << "]";
    }
    msg << endl;
    msg << "# " << endl;
    msg << "# a->get_size(true): " << a.get_size(true) << " (The total number of elements in the array instance)" << endl;
    msg << "# a->width(true): " << a.width(true) << " (The number of bytes needed to hold the entire array - prot.width * num_elements)" << endl;
    msg << "# a->length(true): " << a.length() << " (The number of elements in the vector)" << endl;
    msg << "# a->prototype()->width(true): " << a.prototype()->width() << " (Width of the template variable)" << endl;
    msg << "# a->dimensions(true): " << a.dimensions(true) << endl;
    msg << "# a->chunk_dimension_sizes" << dims_to_string(a.get_chunk_dimension_sizes()) << endl;
    msg << "# a->length(): " << a.length() << endl;
    return msg.str();
}
2401
// Format a byte as two zero-padded hex digits (includes the std::hex
// manipulator). The cast through unsigned char prevents sign extension:
// without it a negative char (e.g. 0x9F) is widened to int and prints as
// "ffffff9f" because setw(2) is only a minimum width.
#define HEX( x ) std::setw(2) << std::setfill('0') << std::hex << (int)(unsigned char)( x )

/// @brief Render a buffer of fixed-length strings for debug output.
///
/// Each fixed-length slot becomes "{c, c, ...}"; printable bytes are shown as
/// characters, everything else as 0xNN.
///
/// @param buff The byte buffer to render.
/// @param num_bytes Total number of bytes in the buffer.
/// @param fixed_string_len Width, in bytes, of each string slot.
/// @return The formatted description.
std::string show_string_buff(char *buff, unsigned long long num_bytes, unsigned long long fixed_string_len) {
    std::stringstream ss;
    for (unsigned long long i = 0; i < num_bytes; i += fixed_string_len) {
        char *str_ptr = buff + i;
        if (i) { ss << ", "; }
        ss << "{";
        for (unsigned long long j = 0; j < fixed_string_len; j++) {
            char this_char = *(str_ptr + j);
            if (j) { ss << ", "; }
            if (this_char > 32 && this_char < 126) {
                ss << this_char;
            } else {
                // Non-printable byte: show as 0xNN. HEX already emits std::hex,
                // so no extra manipulator is needed; reset to decimal afterward.
                ss << "0x" << HEX(this_char) << std::dec;
            }
        }
        ss << "}";
    }
    return ss.str();
}
2423
/// @brief Build a proxy DmrppArray of Byte that presents this array's data as raw bytes.
///
/// Duplicates 'array', replaces the prototype with libdap::Byte, and scales the
/// last array dimension, the last chunk dimension, and every chunk's last
/// position-in-array coordinate by the size of one original element, so reading
/// the proxy retrieves exactly the same bytes as the original.
///
/// @param array The source array.
/// @return A newly allocated proxy array; the caller assumes ownership.
/// @exception BESInternalFatalError If ptr_duplicate() does not yield a DmrppArray.
DmrppArray *get_as_byte_array(DmrppArray &array){

    Type var_type;
    var_type = array.prototype()->type();

    auto *byte_array_proxy = dynamic_cast<DmrppArray *>(array.ptr_duplicate());
    if(!byte_array_proxy){
        throw BESInternalFatalError(prolog + "Server encountered internal state ambiguity. "
                                             "Expected valid DmrppArray pointer. Exiting.",
                                    __FILE__, __LINE__);
    }

    // Determine the byte width of one element of the source array.
    unsigned long long item_size=0;
    if ((var_type == dods_str_c || var_type == dods_url_c)) {
        if (array.is_flsa()) {
            BESDEBUG(MODULE, prolog << "Processing Fixed Length String Array data." << endl);
            item_size = byte_array_proxy->get_fixed_string_length();
            BESDEBUG(MODULE, prolog << "get_fixed_string_length(): " << item_size << endl);
        }
        else {
            // VLSA would have a size of one byte??
            item_size = 1;
        }
    }
    else {
        item_size = byte_array_proxy->prototype()->width();
    }

    unsigned long long total_bytes = byte_array_proxy->length() * item_size;
    BESDEBUG(MODULE, prolog << "total_bytes: " << total_bytes << endl);

    BESDEBUG(MODULE, prolog << array_to_str(*byte_array_proxy,"Source DmrppArray") );

    // Replace prototype
    auto *tmp_proto = new libdap::Byte(byte_array_proxy->prototype()->name());
    byte_array_proxy->set_prototype(tmp_proto);

    // bytes_per_element should be updated since the proto is updated.
    byte_array_proxy->set_bytes_per_element(byte_array_proxy->prototype()->width());
    tmp_proto->set_parent(byte_array_proxy);

    // Fiddle Chunk dimension sizes: the innermost chunk dimension now counts
    // bytes, not elements, so scale it by item_size.
    auto cdim_sizes = byte_array_proxy->get_chunk_dimension_sizes();
    if(!cdim_sizes.empty()) {
        BESDEBUG(MODULE, prolog << "original chunk_dimension_sizes.back(): " << dims_to_string(cdim_sizes) << endl);

        auto new_last_cdim_size = cdim_sizes.back() * item_size;
        cdim_sizes.pop_back();
        cdim_sizes.emplace_back(new_last_cdim_size);
        BESDEBUG(MODULE, prolog << "New chunk_dimension_sizes" << dims_to_string(cdim_sizes) << endl);

        byte_array_proxy->set_chunk_dimension_sizes(cdim_sizes);
        BESDEBUG(MODULE, prolog << "Updated chunk_dimension_sizes"
                                << dims_to_string(byte_array_proxy->get_chunk_dimension_sizes()) << endl);
    }

    // Fiddle Each chunk's chunk_position_in_array to reflect the change in array element count
    unsigned long long chunk_index = 0;
    for(const auto &chunk: byte_array_proxy->get_immutable_chunks()){
        auto cpia = chunk->get_position_in_array();
        if (!cpia.empty()) {
            auto new_position = cpia.back() * item_size;
            cpia.pop_back();
            cpia.emplace_back(new_position);
            BESDEBUG(MODULE,
                     prolog << "Chunk[" << chunk_index << "] new chunk_position_in_array" << dims_to_string(cpia)
                            << endl);
            chunk->set_position_in_array(cpia);
            BESDEBUG(MODULE, prolog << "Chunk[" << chunk_index << "] UPDATED chunk_position_in_array"
                                    << dims_to_string(chunk->get_position_in_array()) << endl);
        }
        chunk_index++;
    }

    // Rewrite the last array dimension so the proxy is an unconstrained
    // byte-count dimension: size scaled by item_size, start 0, stride 1.
    auto t_last_dim = byte_array_proxy->dim_end() - 1;

    BESDEBUG(MODULE, prolog << "Orig last_dim->size: " << t_last_dim->size << endl);

    t_last_dim->size = t_last_dim->size * item_size;
    BESDEBUG(MODULE, prolog << "New last_dim->size: " << t_last_dim->size << endl);

    t_last_dim->c_size = t_last_dim->size;
    BESDEBUG(MODULE, prolog << "New last_dim->c_size: " << t_last_dim->c_size << endl);

    t_last_dim->start = 0;
    BESDEBUG(MODULE, prolog << "New last_dim->start: " << t_last_dim->start << endl);

    t_last_dim->stop = t_last_dim->size - 1;
    BESDEBUG(MODULE, prolog << "New last_dim->stop: " << t_last_dim->stop << endl);

    t_last_dim->stride = 1;
    BESDEBUG(MODULE, prolog << "New last_dim->stride: " << t_last_dim->stride << endl);

    byte_array_proxy->set_length(total_bytes);
    t_last_dim = byte_array_proxy->dim_end() - 1;
    BESDEBUG(MODULE, prolog << "Updated last_dim->size: " << t_last_dim->size << endl);

    BESDEBUG(MODULE, prolog << array_to_str(*byte_array_proxy,"New DmrppArray of Byte") );

    return byte_array_proxy;

}
2536
2543void ingest_flsa_data(DmrppArray &flsa, DmrppArray &data)
2544{
2545 if (flsa.is_flsa()) {
2546 BESDEBUG(MODULE, prolog << "Ingesting Fixed Length String Array Data." << endl);
2547 auto fstr_len = flsa.get_fixed_string_length();
2548 BESDEBUG(MODULE, prolog << "flsa.get_fixed_string_length(): " << fstr_len << endl);
2549
2550 auto pad_type = flsa.get_fixed_length_string_pad();
2551 BESDEBUG(MODULE, prolog << "flsa.get_fixed_length_string_pad_str(): " << flsa.get_fixed_length_string_pad_str() << endl);
2552
2553 auto buff = data.get_buf();
2554 BESDEBUG(MODULE, prolog << "data.get_buf(): " << (void *) buff << endl);
2555 if(buff == nullptr){
2556 throw BESInternalError("Failed to acquire byte buffer from which to read string array data.",__FILE__,__LINE__);
2557 }
2558 unsigned long long num_bytes = data.length();
2559 BESDEBUG(MODULE, prolog << "Buffer contains: " << show_string_buff(buff, num_bytes, fstr_len) << endl);
2560
2561 auto begin = buff;
2562 char *end = buff + num_bytes;
2563 while (begin < end) {
2564 string value = DmrppArray::ingest_fixed_length_string(begin, fstr_len, pad_type);
2565 flsa.get_str().push_back(value);
2566 BESDEBUG(MODULE, prolog << "Added String: '" << value << "'" << endl);
2567 begin += fstr_len;
2568 }
2569 }
2570
2571}
2572
2573
2585
2587{
2588 Type var_type = this->var()->type();
2589 // If the chunks are not loaded, load them now. NB: load_chunks()
2590 // reads data for HDF5 COMPACT storage, so read_p() will be true
2591 // (but it does not read any other data). Thus, call load_chunks()
2592 // before testing read_p() to cover that case. jhrg 11/15/21
2593 // String Arrays that use COMPACT storage appear to work. jhrg 3/3/22
2594 if (!get_chunks_loaded())
2595 load_chunks(this);
2596
2597 // It's important to note that w.r.t. the compact data layout the DMZ parser reads the values into the
2598 // DmrppArray at the time it is parsed and the read flag is then set. Thus, the compact layout solution
2599 // does not explicitly appear in this method as it is handled by the parser.
2600 if (read_p()) return true;
2601
2602#if 0
2603 // Here we need to reset the dio_flag to false for the time being before calling the method use_direct_io_opt()
2604 // since the dio_flag may be set to true for reducing the memory usage with a temporary solution.
2605 // TODO: we need to reset the direct io flag to false and change back in the future. KY 2023-11-29
2606 this->set_dio_flag(false);
2607
2608 // Add direct_io offset for each chunk. This will be used to retrieve individal buffer at fileout netCDF.
2609 // Direct io offset is only necessary when the direct IO operation is possible.
2610 if (this->use_direct_io_opt()) {
2611
2612 this->set_dio_flag();
2613 auto chunks = this->get_chunks();
2614
2615 // Need to provide the offset of a chunk in the final data buffer.
2616 for (unsigned int i = 0; i<chunks.size();i++) {
2617 if (i > 0)
2618 chunks[i]->set_direct_io_offset(chunks[i-1]->get_direct_io_offset()+chunks[i-1]->get_size());
2619 BESDEBUG(MODULE, prolog << "direct_io_offset is: " << chunks[i]->get_direct_io_offset() << endl);
2620 }
2621
2622 // Fill in the chunk information so that the fileout netcdf can retrieve.
2623 Array::var_storage_info dmrpp_vs_info;
2624 dmrpp_vs_info.filter = this->get_filters();
2625
2626 // Provide the deflate compression levels.
2627 for (const auto &def_lev:this->get_deflate_levels())
2628 dmrpp_vs_info.deflate_levels.push_back(def_lev);
2629
2630 // Chunk dimension sizes.
2631 for (const auto &chunk_dim:this->get_chunk_dimension_sizes())
2632 dmrpp_vs_info.chunk_dims.push_back(chunk_dim);
2633
2634 // Provide chunk offset/length etc.
2635 auto im_chunks = this->get_immutable_chunks();
2636 for (const auto &chunk:im_chunks) {
2637 Array::var_chunk_info_t vci_t;
2638 vci_t.filter_mask = chunk->get_filter_mask();
2639 vci_t.chunk_direct_io_offset = chunk->get_direct_io_offset();
2640 vci_t.chunk_buffer_size = chunk->get_size();
2641
2642 for (const auto &chunk_coord:chunk->get_position_in_array())
2643 vci_t.chunk_coords.push_back(chunk_coord);
2644 dmrpp_vs_info.var_chunk_info.push_back(vci_t);
2645 }
2646 this->set_var_storage_info(dmrpp_vs_info);
2647 }
2648#endif
2649
2650 if (this->get_dio_flag()) {
2651 BESDEBUG(MODULE, prolog << "dio is turned on" << endl);
2652
2653 Array::var_storage_info dmrpp_vs_info = this->get_var_storage_info();
2654
2655 auto chunks = this->get_chunks();
2656
2657 // Need to provide the offset of a chunk in the final data buffer.
2658 for (unsigned int i = 0; i<chunks.size();i++) {
2659 if (i > 0)
2660 chunks[i]->set_direct_io_offset(chunks[i-1]->get_direct_io_offset()+chunks[i-1]->get_size());
2661 BESDEBUG(MODULE, prolog << "direct_io_offset is: " << chunks[i]->get_direct_io_offset() << endl);
2662 }
2663
2664 // Fill in the chunk information so that the fileout netcdf can retrieve.
2665 // Provide chunk offset/length etc.
2666 auto im_chunks = this->get_immutable_chunks();
2667 for (const auto &chunk:im_chunks) {
2668 Array::var_chunk_info_t vci_t;
2669 vci_t.filter_mask = chunk->get_filter_mask();
2670 vci_t.chunk_direct_io_offset = chunk->get_direct_io_offset();
2671 vci_t.chunk_buffer_size = chunk->get_size();
2672
2673 for (const auto &chunk_coord:chunk->get_position_in_array())
2674 vci_t.chunk_coords.push_back(chunk_coord);
2675 dmrpp_vs_info.var_chunk_info.push_back(vci_t);
2676 }
2677 this->set_var_storage_info(dmrpp_vs_info);
2678 bytes_per_element = this->var()->width_ll();
2679 }
2680 else {
2681 is_readable_struct = check_struct_handling();
2682 if (is_readable_struct) {
2683 vector<unsigned int> s_off = this->get_struct_offsets();
2684 if (s_off.empty())
2685 bytes_per_element = this->var()->width_ll();
2686 else
2687 bytes_per_element = s_off.back();
2688 }
2689 else
2690 bytes_per_element = this->var()->width_ll();
2691 }
2692
2693
2694 DmrppArray *array_to_read = this;
2695 if ((var_type == dods_str_c || var_type == dods_url_c)) {
2696 if (is_flsa()) {
2697 // For fixed length string we use a proxy array of Byte to retrieve the data.
2698 array_to_read = get_as_byte_array(*this);
2699 }
2700 }
2701 try {
2702 if(BESDebug::IsSet(MODULE)) {
2703 string msg = array_to_str(*array_to_read, "Reading Data From DmrppArray");
2704 BESDEBUG(MODULE, prolog << msg << endl);
2705 }
2706 // Single chunk and 'contiguous' are the same for this code.
2707 if (array_to_read->get_chunks_size() == 1) {
2708 BESDEBUG(MODULE, prolog << "Reading data from a single contiguous chunk." << endl);
2709 // KENT: here we need to add the handling of direct chunk IO for one chunk.
2710 if (this->get_dio_flag())
2711 array_to_read->read_one_chunk_dio();
2712 else
2713 array_to_read->read_contiguous(); // Throws on various errors
2714 }
2715 else { // Handle the more complex case where the data is chunked.
2716 if (get_using_linked_block()) {
2717 BESDEBUG(MODULE, prolog << "Reading data linked blocks" << endl);
2718 if (!array_to_read->is_projected()) {
2719 array_to_read->read_linked_blocks();
2720 }
2721 else {
2722 array_to_read->read_linked_blocks_constrained();
2723#if 0
2724 throw BESInternalFatalError(prolog + "Not support data subset when linked blocks are used. ",
2725 __FILE__, __LINE__);
2726#endif
2727
2728 }
2729 }
2730 else if (is_multi_linked_blocks_chunk()) {
2731 if (!array_to_read->is_projected()) {
2732 array_to_read->read_chunks_with_linked_blocks();
2733 }
2734 else {
2735 array_to_read->read_chunks_with_linked_blocks_constrained();
2736#if 0
2737 throw BESInternalFatalError(prolog + "Not support data subset when linked blocks are used. ",
2738 __FILE__, __LINE__);
2739#endif
2740 }
2741 }
2742 else {
2743
2744 bool buffer_chunk_case = array_to_read->use_buffer_chunk();
2745
2746 if (!array_to_read->is_projected()) {
2747 BESDEBUG(MODULE, prolog << "Reading data from chunks, unconstrained." << endl);
2748 // KENT: Only here we need to consider the direct buffer IO.
2749 if (this->get_dio_flag())
2750 array_to_read->read_chunks_dio_unconstrained();
2751 // Also buffer chunks for the non-contiguous chunk case.
2752 else if(buffer_chunk_case)
2753 array_to_read->read_buffer_chunks_unconstrained();
2754 else
2755 array_to_read->read_chunks_unconstrained();
2756 } else {
2757 BESDEBUG(MODULE, prolog << "Reading data from chunks." << endl);
2758
2759 // Also buffer chunks for the non-contiguous chunk case.
2760 if (buffer_chunk_case)
2761 array_to_read->read_buffer_chunks();
2762 else
2763 array_to_read->read_chunks();
2764 }
2765 }
2766 }
2767
2768 if ((var_type == dods_str_c || var_type == dods_url_c)) {
2769 BESDEBUG(MODULE, prolog << "Processing Array of Strings." << endl);
2770 if(array_to_read == this){
2771 throw BESInternalFatalError(prolog + "Server encountered internal state conflict. "
2772 "Expected byte transport array. Exiting.",
2773 __FILE__, __LINE__);
2774 }
2775
2776 if (is_flsa()) {
2777 ingest_flsa_data(*this, *array_to_read);
2778 }
2779 else {
2780 BESDEBUG(MODULE, prolog << "Processing Variable Length String Array data. SKIPPING..." << endl);
2781#if 0 // @TODO Turn this on...
2782 ingest_vlsa_data(*this, *array_to_read);
2783#else
2784 throw BESInternalError("Arrays of variable length strings are not yet supported.",__FILE__,__LINE__);
2785#endif
2786 }
2787 }
2788 if(array_to_read && array_to_read != this) {
2789 delete array_to_read;
2790 array_to_read = nullptr;
2791 }
2792
2793 }
2794 catch(...){
2795 if(array_to_read && array_to_read != this) {
2796 delete array_to_read;
2797 array_to_read = nullptr;
2798 }
2799 throw;
2800 }
2801
2802 if (this->twiddle_bytes()) {
2803
2804 int64_t num = this->length_ll();
2805
2806 switch (var_type) {
2807 case dods_int16_c:
2808 case dods_uint16_c: {
2809 auto *local = reinterpret_cast<dods_uint16*>(this->get_buf());
2810 while (num--) {
2811 *local = bswap_16(*local);
2812 local++;
2813 }
2814 break;
2815 }
2816 case dods_int32_c:
2817 case dods_uint32_c: {
2818 auto *local = reinterpret_cast<dods_uint32*>(this->get_buf());;
2819 while (num--) {
2820 *local = bswap_32(*local);
2821 local++;
2822 }
2823 break;
2824 }
2825 case dods_int64_c:
2826 case dods_uint64_c: {
2827 auto *local = reinterpret_cast<dods_uint64*>(this->get_buf());;
2828 while (num--) {
2829 *local = bswap_64(*local);
2830 local++;
2831 }
2832 break;
2833 }
2834 case dods_float32_c: {
2835 swap_float32(this->get_buf(), num);
2836 break;
2837 }
2838 case dods_float64_c: {
2839 swap_float64(this->get_buf(), num);
2840 break;
2841 }
2842 default: break; // Do nothing for all other types.
2843 }
2844 }
2845
2846 return true;
2847}
2848
2849unsigned long long DmrppArray::set_fixed_string_length(const string &length_str)
2850{
2851 try {
2852 d_fixed_str_length = stoull(length_str);
2853 }
2854 catch(std::invalid_argument e){
2855 stringstream err_msg;
2856 err_msg << "The value of the length string could not be parsed. Message: " << e.what();
2857 throw BESInternalError(err_msg.str(),__FILE__,__LINE__);
2858 }
2859 return d_fixed_str_length;
2860}
2861
2862
2863std::string pad_to_str(string_pad_type pad)
2864{
2865 string pad_str;
2866 switch(pad){
2867 case null_term:
2868 pad_str = "null_term";
2869 break;
2870 case null_pad:
2871 pad_str = "null_pad";
2872 break;
2873 case space_pad:
2874 pad_str = "space_pad";
2875 break;
2876 case not_set:
2877 pad_str = "not_set";
2878 break;
2879 default:
2880 throw BESInternalError("ERROR: Unrecognized HDF5 String Padding Scheme!",__FILE__,__LINE__);
2881 break;
2882 }
2883 return pad_str;
2884}
2885
2886
/// @brief Public wrapper over the file-local pad_to_str() helper.
/// @param pad The padding scheme enumerator.
/// @return The attribute text for the scheme (see pad_to_str()).
std::string DmrppArray::pad_type_to_str(string_pad_type pad)
{
    return pad_to_str(pad);
}
2891
2892string_pad_type str_to_pad_type(const string &pad_str){
2893 string_pad_type pad_type(not_set);
2894 if(pad_str=="null_pad"){
2895 pad_type = null_pad;
2896 }
2897 else if(pad_str=="null_term") {
2898 pad_type = null_term;
2899 }
2900 else if (pad_str == "space_pad"){
2901 pad_type = space_pad;
2902 }
2903 else if (pad_str == "not_set"){
2904 pad_type = not_set;
2905 }
2906 else {
2907 stringstream err_msg;
2908 err_msg << "The value of the pad string was not recognized. pad_str: " << pad_str;
2909 throw BESInternalError(err_msg.str(),__FILE__,__LINE__);
2910 }
2911 return pad_type;
2912}
2913
2914
/// @brief Set this array's fixed-length-string padding scheme from its attribute text.
/// @param pad_str Pad scheme spelling; parsed by str_to_pad_type(), which throws on bad input.
/// @return The parsed padding scheme, also stored in d_fixed_length_string_pad_type.
string_pad_type DmrppArray::set_fixed_length_string_pad_type(const string &pad_str)
{
    d_fixed_length_string_pad_type = str_to_pad_type(pad_str);
    return d_fixed_length_string_pad_type;
}
2920
2921
2922ons::ons(const std::string &ons_pair_str) {
2923 const string colon(":");
2924 size_t colon_pos = ons_pair_str.find(colon);
2925
2926 string offset_str = ons_pair_str.substr(0, colon_pos);
2927 offset = stoull(offset_str);
2928
2929 string size_str = ons_pair_str.substr(colon_pos + 1);
2930 size = stoull(size_str);
2931}
2932
2933
/// @brief Store the serialized offset/size pair list ("o:s,o:s,...") verbatim.
/// @param ons_str The comma-separated "offset:size" string for a variable-length string array.
void DmrppArray::set_ons_string(const std::string &ons_str)
{
    d_vlen_ons_str = ons_str;
}
2938
2939void DmrppArray::set_ons_string(const vector<ons> &ons_pairs)
2940{
2941 stringstream ons_ss;
2942 bool first = true;
2943 for(auto &ons_pair: ons_pairs){
2944 if(!first){
2945 ons_ss << ",";
2946 }
2947 ons_ss << ons_pair.offset << ":" << ons_pair.size;
2948 }
2949 d_vlen_ons_str = ons_ss.str();
2950}
2951
2952
2960void DmrppArray::get_ons_objs(vector<ons> &ons_pairs)
2961{
2962 const string comma(",");
2963 size_t last = 0;
2964 size_t next = 0;
2965
2966 while ((next = d_vlen_ons_str.find(comma, last)) != string::npos) {
2967 string ona_pair_str = d_vlen_ons_str.substr(last, next-last);
2968 ons ons_pair(ona_pair_str);
2969 ons_pairs.push_back(ons_pair);
2970 last = next + 1;
2971 }
2972 // @TODO - Inspect this once we are doing the real implementation
2973 // and make sure the "tail" is handled correctly.
2974 cout << d_vlen_ons_str.substr(last) << endl;
2975}
2976
2983void flsa_xml_element(XMLWriter &xml, DmrppArray &a){
2984
2985 string element_name("dmrpp:FixedLengthStringArray");
2986 string str_len_attr_name("string_length");
2987 string pad_attr_name("pad");
2988
2989 if (xmlTextWriterStartElement(xml.get_writer(), (const xmlChar *) element_name.c_str()) < 0)
2990 throw InternalErr(__FILE__, __LINE__, "Could not write " + element_name + " element");
2991
2992 stringstream strlen_str;
2993 strlen_str << a.get_fixed_string_length();
2994 if (xmlTextWriterWriteAttribute(xml.get_writer(), (const xmlChar *) str_len_attr_name.c_str(),
2995 (const xmlChar *) strlen_str.str().c_str()) < 0)
2996 throw InternalErr(__FILE__, __LINE__, "Could not write attribute for 'string_length'");
2997
2998 if (a.get_fixed_length_string_pad() == not_set) {
2999 throw BESInternalError("ERROR: Padding Scheme Has Not Been Set!", __FILE__, __LINE__);
3000 }
3001 string pad_str = a.pad_type_to_str(a.get_fixed_length_string_pad());
3002 if (xmlTextWriterWriteAttribute(xml.get_writer(), (const xmlChar *) "pad",
3003 (const xmlChar *) pad_str.c_str()) < 0)
3004 throw InternalErr(__FILE__, __LINE__, "Could not write attribute for 'pad'");
3005
3006 if (xmlTextWriterEndElement(xml.get_writer()) < 0)
3007 throw InternalErr(__FILE__, __LINE__, "Could not end " + a.type_name() + " element");
3008}
3009
3010
3016void compact_data_xml_element(XMLWriter &xml, DmrppArray &a) {
3017 switch (a.var()->type()) {
3018 case dods_byte_c:
3019 case dods_char_c:
3020 case dods_int8_c:
3021 case dods_uint8_c:
3022 case dods_int16_c:
3023 case dods_uint16_c:
3024 case dods_int32_c:
3025 case dods_uint32_c:
3026 case dods_int64_c:
3027 case dods_uint64_c:
3028
3029 case dods_enum_c:
3030
3031 case dods_float32_c:
3032 case dods_float64_c: {
3033 uint8_t *values = nullptr;
3034 try {
3035 auto size = a.buf2val(reinterpret_cast<void **>(&values));
3036 string encoded = base64::Base64::encode(values, size);
3037 a.print_compact_element(xml, DmrppCommon::d_ns_prefix, encoded);
3038 delete[] values;
3039 }
3040 catch (...) {
3041 delete[] values;
3042 throw;
3043 }
3044 break;
3045 }
3046
3047 case dods_str_c:
3048 case dods_url_c: {
3049 auto sb = a.compact_str_buffer();
3050 if(!sb.empty()) {
3051 uint8_t *values = nullptr;
3052 try {
3053 auto size = a.buf2val(reinterpret_cast<void **>(&values));
3054 string encoded = base64::Base64::encode(values, size);
3055 a.print_compact_element(xml, DmrppCommon::d_ns_prefix, encoded);
3056 delete[] values;
3057 }
3058 catch (...) {
3059 delete[] values;
3060 throw;
3061 }
3062 }
3063 break;
3064 }
3065
3066 default:
3067 throw InternalErr(__FILE__, __LINE__, "Vector::val2buf: bad type");
3068 }
3069}
3070
/// @brief Deflate-compress (when more than one element) and base64-encode a data buffer.
/// @param num_elms Number of array elements; a single element is encoded without compression.
/// @param encoded_str Output: the base64 text.
/// @param source_data The raw bytes to process.
/// @param source_data_size Length of source_data in bytes.
/// @param err_msg Output: set when compression fails.
/// @return true on success, false when zlib compress() fails.
bool obtain_compress_encode_data(size_t num_elms, string &encoded_str, const Bytef*source_data,size_t source_data_size, string &err_msg) {

    if (num_elms == 1) {
        encoded_str = base64::Base64::encode(source_data,(int)source_data_size);
    }
    else {
        auto ssize = (uLong)source_data_size;
        // Use compressBound() for the worst-case output size. The previous
        // "2 * source size" guess can be smaller than zlib's bound for tiny
        // inputs (e.g. compressBound(1) > 2), which made compress() fail.
        uLongf csize = compressBound(ssize);
        vector<Bytef> compressed_src(csize);

        int retval = compress(compressed_src.data(), &csize, source_data, ssize);
        if (retval != Z_OK) {
            err_msg = "Fail to compress the data";
            return false;
        }
        // compress() shrank csize to the actual compressed length.
        encoded_str = base64::Base64::encode(compressed_src.data(),(int)csize);
    }

    return true;
}
3093
3094void missing_data_xml_element(const XMLWriter &xml, DmrppArray *da) {
3095 switch (da->var()->type()) {
3096 case dods_byte_c:
3097 case dods_char_c:
3098 case dods_int8_c:
3099 case dods_uint8_c:
3100 case dods_int16_c:
3101 case dods_uint16_c:
3102 case dods_int32_c:
3103 case dods_uint32_c:
3104 case dods_int64_c:
3105 case dods_uint64_c:
3106 case dods_float32_c:
3107 case dods_float64_c: {
3108 auto source_data_src = (const Bytef *) (da->get_buf());
3109
3110 size_t source_data_size = da->width_ll();
3111 string encoded_str;
3112 string err_msg;
3113 if (false == obtain_compress_encode_data(da->get_size(false),encoded_str,source_data_src,source_data_size,err_msg)) {
3114 err_msg = "variable name: " + da->name() + " "+err_msg;
3115 throw InternalErr(__FILE__, __LINE__, err_msg);
3116 }
3117
3118 da->print_missing_data_element(xml, DmrppCommon::d_ns_prefix, encoded_str);
3119 break;
3120 }
3121
3122 default:
3123 throw InternalErr(__FILE__, __LINE__, "Vector::val2buf: bad type");
3124 }
3125}
3126
3127void special_structure_array_data_xml_element(const XMLWriter &xml, DmrppArray *da) {
3128
3129 if (da->var()->type() == dods_structure_c) {
3130 vector<char> struct_array_str_buf = da->get_structure_array_str_buffer();
3131 string final_encoded_str = base64::Base64::encode((uint8_t*)(struct_array_str_buf.data()),struct_array_str_buf.size());
3132 da->print_special_structure_element(xml, DmrppCommon::d_ns_prefix, final_encoded_str);
3133 }
3134
3135}
3136
3143static void print_dap4_dimension_helper(const XMLWriter &xml, bool constrained, const Array::dimension &d) {
3144 // This duplicates code in D4Dimensions (where D4Dimension::print_dap4() is defined
3145 // because of the need to print the constrained size of a dimension). I think that
3146 // the constraint information has to be kept here and not in the dimension (since they
3147 // are shared dims). Could hack print_dap4() to take the constrained size, however.
3148 if (xmlTextWriterStartElement(xml.get_writer(), (const xmlChar *) "Dim") < 0)
3149 throw InternalErr(__FILE__, __LINE__, "Could not write Dim element");
3150
3151 string name = (d.dim) ? d.dim->fully_qualified_name() : d.name;
3152 // If there is a name, there must be a Dimension (named dimension) in scope
3153 // so write its name but not its size.
3154 if (!constrained && !name.empty()) {
3155 if (xmlTextWriterWriteAttribute(xml.get_writer(), (const xmlChar *) "name",
3156 (const xmlChar *) name.c_str()) < 0)
3157 throw InternalErr(__FILE__, __LINE__, "Could not write attribute for name");
3158 }
3159 else if (d.use_sdim_for_slice) {
3160 if (xmlTextWriterWriteAttribute(xml.get_writer(), (const xmlChar *) "name",
3161 (const xmlChar *) name.c_str()) < 0)
3162 throw InternalErr(__FILE__, __LINE__, "Could not write attribute for name");
3163 }
3164 else {
3165 if (xmlTextWriterWriteAttribute(xml.get_writer(), (const xmlChar *) "size",
3166 (const xmlChar *) (constrained ? to_string(d.c_size) : to_string(
3167 d.size)).c_str()) < 0)
3168 throw InternalErr(__FILE__, __LINE__, "Could not write attribute for name");
3169 }
3170
3171 if (xmlTextWriterEndElement(xml.get_writer()) < 0)
3172 throw InternalErr(__FILE__, __LINE__, "Could not end Dim element");
3173}
3174
/// @brief Shadow libdap::Array::print_dap4(): emit this array's DMR++ XML,
/// including any compact/missing/special-structure inline data and the
/// fixed/variable-length string array elements.
/// @param xml The XMLWriter to emit into.
/// @param constrained When true, only variables marked for sending are printed
/// and dimension sizes reflect the current constraint.
void DmrppArray::print_dap4(XMLWriter &xml, bool constrained /*false*/) {
    if (constrained && !send_p()) return;

    // Open the element named for the element type (e.g. <Int32 ...>), not "Array".
    if (xmlTextWriterStartElement(xml.get_writer(), (const xmlChar *) var()->type_name().c_str()) < 0)
        throw InternalErr(__FILE__, __LINE__, "Could not write " + type_name() + " element");

    if (!name().empty()) {
        BESDEBUG(MODULE, prolog << "variable full path: " << FQN() << endl);
        if (xmlTextWriterWriteAttribute(xml.get_writer(), (const xmlChar *) "name", (const xmlChar *) name().c_str()) <
            0)
            throw InternalErr(__FILE__, __LINE__, "Could not write attribute for name");
    }

    // Hack job... Copied from D4Enum::print_xml_writer. jhrg 11/12/13
    if (var()->type() == dods_enum_c) {
        D4Enum *e = static_cast<D4Enum *>(var());
        string path = e->enumeration()->name();
        if (e->enumeration()->parent()) {
            // print the FQN for the enum def; D4Group::FQN() includes the trailing '/'
            path = static_cast<D4Group *>(e->enumeration()->parent()->parent())->FQN() + path;
        }
        if (xmlTextWriterWriteAttribute(xml.get_writer(), (const xmlChar *) "enum", (const xmlChar *) path.c_str()) < 0)
            throw InternalErr(__FILE__, __LINE__, "Could not write attribute for enum");
    }

    // Constructor (e.g. Structure) element types print their member variables first.
    if (prototype()->is_constructor_type()) {
        Constructor &c = static_cast<Constructor &>(*prototype());
        for_each(c.var_begin(), c.var_end(), [&xml, constrained](BaseType *btp) { btp->print_dap4(xml, constrained); });
    }

    // Drop the local_constraint which is per-array and use a per-dimension on instead
    for_each(dim_begin(), dim_end(), [&xml, constrained](const Array::dimension &d) {
        print_dap4_dimension_helper(xml, constrained, d);
    });

    attributes()->print_dap4(xml);

    for_each(maps()->map_begin(), maps()->map_end(), [&xml](D4Map *m) { m->print_dap4(xml); });

    // Only print the chunks' info if there. This is the code added to libdap::Array::print_dap4().
    // jhrg 5/10/18
    // Update: print the <chunks> element even if the chinks_size value is zero since this
    // might be a variable with all fill values. jhrg 4/24/22
    // NOTE(review): no print_chunks_element() call is visible here despite the
    // comment above — verify against the repository that the <dmrpp:chunks>
    // emission was not accidentally dropped from this revision.

    // If this variable uses the COMPACT layout, encode the values for
    // the array using base64. Note that strings are a special case; each
    // element of the array is a string and is encoded in its own base64
    // xml element. So, while an array of 10 int32 will be encoded in a
    // single base64 element, an array of 10 strings will use 10 base64
    // elements. This is because the size of each string's value is different.
    // Not so for an int32.
    if (DmrppCommon::d_print_chunks && is_compact_layout() && read_p()) {
        compact_data_xml_element(xml, *this);
    }

    // Variables that describe "missing data" get their values inlined (compressed + base64).
    if (DmrppCommon::d_print_chunks && is_missing_data() && read_p()) {
        missing_data_xml_element(xml, this);
    }

    // Special structure string array.
    if (DmrppCommon::d_print_chunks && get_special_structure_flag() && read_p()) {
        special_structure_array_data_xml_element(xml, this);
    }



    // Is it an array of strings? Those have issues so we treat them special.
    if (var()->type() == dods_str_c) {
        if (is_flsa() && DmrppCommon::d_print_chunks) {

            // Write the dmr++ for Fix Length String Array
            flsa_xml_element(xml, *this);
        }
        else if (is_vlsa() && DmrppCommon::d_print_chunks) {
            // Write the dmr++ for Variable Length String Array
            vlsa::write(xml, *this);
        }
    }
    if (xmlTextWriterEndElement(xml.get_writer()) < 0)
        throw InternalErr(__FILE__, __LINE__, "Could not end " + type_name() + " element");
}
3281
/// @brief Dump this object's state to the given stream (debugging aid).
/// @param strm The output stream to write to.
void DmrppArray::dump(ostream &strm) const
{
    strm << BESIndent::LMarg << "DmrppArray::" << __func__ << "(" << (void *) this << ")" << endl;
    BESIndent::Indent();
    // Dump both parents' state: DMR++ chunk/compression info, then the libdap Array.
    DmrppCommon::dump(strm);
    Array::dump(strm);
    // The raw value buffer is deliberately not printed (could be huge / binary).
    strm << BESIndent::LMarg << "value: " << "----" << /*d_buf <<*/endl;
    BESIndent::UnIndent();
}
3291
3292unsigned int DmrppArray::buf2val(void **val){
3293
3294 if (!val) {
3295 throw BESInternalError("NULL pointer encountered.", __FILE__, __LINE__);
3296 }
3297 if ( var()->type()==dods_str_c || var()->type()==dods_url_c ) {
3298
3299 auto str_buf = compact_str_buffer();
3300 auto buf_size = str_buf.size();
3301 if (str_buf.empty()) {
3302 stringstream msg;
3303 msg << prolog << "Logic error: called when cardinal type data buffer was empty!";
3304 throw BESInternalError(msg.str(), __FILE__, __LINE__);
3305 }
3306 if (!*val) {
3307 *val = new char[buf_size];
3308 }
3309 memcpy(*val, str_buf.data(), buf_size);
3310 return buf_size;
3311 } else {
3312 return (unsigned int)Vector::buf2val_ll(val);
3313 }
3314}
3315
3316// Check if direct chunk IO can be used.
3317bool DmrppArray::use_direct_io_opt() {
3318
3319 bool ret_value = false;
3320 bool is_integer_le_float = false;
3321
3322 if (DmrppRequestHandler::is_netcdf4_enhanced_response && this->is_filters_empty() == false) {
3323 Type t = this->var()->type();
3324 if (libdap::is_simple_type(t) && t != dods_str_c && t != dods_url_c && t!= dods_enum_c && t!=dods_opaque_c) {
3325 is_integer_le_float = true;
3326 if(is_integer_type(t) && this->get_byte_order() =="BE")
3327 is_integer_le_float = false;
3328 }
3329 }
3330
3331 bool no_constraint = false;
3332
3333 // Check if it requires a subset of this variable.
3334 if (is_integer_le_float) {
3335 no_constraint = true;
3336 if (this->is_projected())
3337 no_constraint = false;
3338 }
3339
3340 bool has_deflate_filter = false;
3341
3342 // Check if having the deflate filters.
3343 if (no_constraint) {
3344 string filters_string = this->get_filters();
3345 if (filters_string.find("deflate")!=string::npos)
3346 has_deflate_filter = true;
3347 }
3348
3349 bool is_data_all_fvalues = false;
3350 // This is the check for a rare case: the variable data just contains the filled values.
3351 // If this var's storage size is 0. Then it should be filled with the filled values.
3352 if (has_deflate_filter && this->get_uses_fill_value() && this->get_var_chunks_storage_size() == 0)
3353 is_data_all_fvalues = true;
3354
3355 bool has_dio_filters = false;
3356
3357 // If the deflate level is not provided, we cannot do the direct IO.
3358 if (has_deflate_filter && !is_data_all_fvalues) {
3359 if (this->get_deflate_levels().empty() == false)
3360 has_dio_filters = true;
3361 }
3362
3363 // Check if the chunk size is greater than the dimension size for any dimension.
3364 // If this is the case, we will not use the direct chunk IO since netCDF-4 doesn't allow this.
3365 // TODO later, if the dimension is unlimited, this restriction can be lifted. Current dmrpp doesn't store the
3366 // unlimited dimension information.
3367
3368 if (has_dio_filters && this->get_processing_fv_chunks() == false) {
3369
3370 vector <unsigned long long>chunk_dim_sizes = this->get_chunk_dimension_sizes();
3371 vector <unsigned long long>dim_sizes;
3372 Dim_iter p = dim_begin();
3373 while (p != dim_end()) {
3374 dim_sizes.push_back((unsigned long long)dimension_size_ll(p));
3375 p++;
3376 }
3377
3378 bool chunk_less_dim = true;
3379 if (chunk_dim_sizes.size() == dim_sizes.size()) {
3380 for (unsigned int i = 0; i<dim_sizes.size(); i++) {
3381 if (chunk_dim_sizes[i] > dim_sizes[i]) {
3382 chunk_less_dim = false;
3383 break;
3384 }
3385 }
3386 }
3387 else
3388 chunk_less_dim = false;
3389
3390 ret_value = chunk_less_dim;
3391 }
3392
3393 return ret_value;
3394
3395}
3396
3397// Read the data from the supported array of structure
3398void DmrppArray::read_array_of_structure(vector<char> &values) {
3399
3400 size_t values_offset = 0;
3401 int64_t nelms = this->length_ll();
3402 if (this->twiddle_bytes())
3403 BESDEBUG(dmrpp_3, prolog << "swap bytes " << endl);
3404
3405 vector<unsigned int> s_offs = this->get_struct_offsets();
3406
3407 for (int64_t element = 0; element < nelms; ++element) {
3408
3409 auto dmrpp_s = dynamic_cast<DmrppStructure*>(var()->ptr_duplicate());
3410 if(!dmrpp_s)
3411 throw InternalErr(__FILE__, __LINE__, "Cannot obtain the structure pointer.");
3412 try {
3413 dmrpp_s->set_struct_offsets(s_offs);
3414 dmrpp_s->structure_read(values,values_offset, this->twiddle_bytes());
3415 }
3416 catch(...) {
3417 delete dmrpp_s;
3418 string err_msg = "Cannot read the data of a dmrpp structure variable " + var()->name();
3419 throw InternalErr(__FILE__, __LINE__, err_msg);
3420 }
3421 dmrpp_s->set_read_p(true);
3422 set_vec_ll((uint64_t)element,dmrpp_s);
3423 delete dmrpp_s;
3424 }
3425
3426 set_read_p(true);
3427
3428}
3429
// Check if this DAP4 structure is what we can support.
/// @brief True when this is an array of Structure whose members are all simple
/// numeric scalars or arrays of simple numeric types.
/// @return false for non-Structure element types, or when any member is a
/// string/URL/enum/opaque scalar or an array of such.
bool DmrppArray::check_struct_handling() {

    bool ret_value = true;

    if (this->var()->type() == dods_structure_c) {

        // NOTE(review): the cast result is not null-checked; this assumes var()
        // is always a DmrppStructure when type() == dods_structure_c — confirm.
        auto array_base = dynamic_cast<DmrppStructure*>(this->var());
        Constructor::Vars_iter vi = array_base->var_begin();
        Constructor::Vars_iter ve = array_base->var_end();
        for (; vi != ve; vi++) {

            BaseType *bt = *vi;
            Type t_bt = bt->type();

            // Only support array or scalar of float/int.
            if (libdap::is_simple_type(t_bt) == false) {

                if (t_bt == dods_array_c) {

                    // Array member: its element type must be simple numeric
                    // (string/URL/enum/opaque element types are rejected).
                    auto t_a = dynamic_cast<Array *>(bt);
                    Type t_array_var = t_a->var()->type();
                    if (!libdap::is_simple_type(t_array_var) || t_array_var == dods_str_c || t_array_var == dods_url_c || t_array_var == dods_enum_c || t_array_var==dods_opaque_c) {
                        ret_value = false;
                        break;
                    }
                }
                // NOTE(review): a non-simple, non-array member (e.g. a nested
                // Structure) falls through here without being rejected — verify
                // whether nested structures are intentionally accepted.
            }
            else if (t_bt == dods_str_c || t_bt == dods_url_c || t_bt == dods_enum_c || t_bt == dods_opaque_c) {
                // Simple types we nonetheless cannot handle as scalar members.
                ret_value = false;
                break;
            }
        }
    }
    else
        ret_value = false;

    return ret_value;
}
3469
/// @brief Read all chunks of an unconstrained variable using buffered SuperChunks.
/// Chunks are grouped into SuperChunks whose span in the file does not exceed a
/// computed buffer window; each SuperChunk is then read in one pass.
/// @throws BESInternalError if the variable has fewer than two chunks, or a chunk
/// cannot be placed in a freshly created SuperChunk.
void DmrppArray::read_buffer_chunks_unconstrained() {

    BESDEBUG(dmrpp_3, prolog << "coming to read_buffer_chunks_unconstrained() " << endl);

    // Here we can adjust the buffer size as needed, for now we just use the whole array size as the starting point.
    // (bytes_per_element is a DmrppArray member set earlier by read().)
    unsigned long long buffer_size = bytes_per_element * this->get_size(false);

    if (get_chunks_size() < 2)
        throw BESInternalError(string("Expected chunks for variable ") + name(), __FILE__, __LINE__);

    // Follow the general superchunk way: create the first SuperChunk up front.
    unsigned long long sc_count=0;
    stringstream sc_id;
    sc_count++;
    sc_id << name() << "-" << sc_count;
    queue<shared_ptr<SuperChunk>> super_chunks;
    auto current_super_chunk = std::make_shared<SuperChunk>(sc_id.str(), this) ;

    // Set the non-contiguous chunk flag
    current_super_chunk->set_non_contiguous_chunk_flag(true);
    super_chunks.push(current_super_chunk);

    auto array_chunks = get_immutable_chunks();

    // Make sure buffer_size at least can hold one chunk.
    if (buffer_size < (array_chunks[0])->get_size())
        buffer_size = (array_chunks[0])->get_size();

    unsigned long long max_buffer_end_position = 0;

    // For highly compressed chunks, we need to make sure the buffer_size is not too big to exceed the file size.
    // For this variable we also need to find the maximum value of the end position of all the chunks.
    for (const auto &chunk: array_chunks) {
        unsigned long long temp_max_buffer_end_position= chunk->get_size() + chunk->get_offset();
        if(max_buffer_end_position < temp_max_buffer_end_position)
            max_buffer_end_position = temp_max_buffer_end_position;
    }

    // The end position of the buffer should not exceed the max_buffer_end_position.
    unsigned long long buffer_end_position = min((buffer_size + (array_chunks[0])->get_offset()),max_buffer_end_position);

    BESDEBUG(dmrpp_3, prolog << "variable name: " << this->name() <<endl);
    BESDEBUG(dmrpp_3, prolog << "maximum buffer_end_position: " << max_buffer_end_position <<endl);

    // Make the SuperChunks using all the chunks. A chunk that does not fit in
    // the current buffer window starts a new SuperChunk with a new window.
    for(const auto& chunk: get_immutable_chunks()) {
        bool added = current_super_chunk->add_chunk_non_contiguous(chunk,buffer_end_position);
        if (!added) {
            sc_id.str(std::string());
            sc_count++;
            sc_id << name() << "-" << sc_count;
            current_super_chunk = std::make_shared<SuperChunk>(sc_id.str(), this);
            // We need to mark this superchunk includes non-contiguous chunks.
            current_super_chunk->set_non_contiguous_chunk_flag(true);
            super_chunks.push(current_super_chunk);

            // Here we need to make sure buffer_size is not too small although this rarely happens.
            if (buffer_size < chunk->get_size())
                buffer_size = chunk->get_size();
            buffer_end_position = min((buffer_size + chunk->get_offset()),max_buffer_end_position);
            // The chunk must fit in the fresh, empty SuperChunk; failure here is fatal.
            if (!current_super_chunk->add_chunk_non_contiguous(chunk,buffer_end_position)) {
                stringstream msg ;
                msg << prolog << "Failed to add Chunk to new SuperChunk for non-contiguous chunks. chunk: " << chunk->to_string();
                throw BESInternalError(msg.str(), __FILE__, __LINE__);
            }
        }
    }

    reserve_value_capacity_ll(get_size());

    // Drain the queue, reading each SuperChunk in turn.
    while(!super_chunks.empty()) {
        auto super_chunk = super_chunks.front();
        super_chunks.pop();
        super_chunk->read_unconstrained();
    }

    set_read_p(true);
}
3548
3549bool DmrppArray::use_buffer_chunk() {
3550
3551 bool ret_value = false;
3552 auto chunks = this->get_chunks();
3553
3554 // For our use case, we only need to check if the first chunk and the second chunk are adjacent.
3555 // To make the process clear and simple, we don't handle structure data.
3556 if (chunks.size() >1 && this->var()->type() !=dods_structure_c){
3557 unsigned long long first_chunk_offset = (chunks[0])->get_offset();
3558 unsigned long long first_chunk_size = (chunks[0])->get_size();
3559 unsigned long long second_chunk_offset = (chunks[1])->get_offset();
3560 if ((first_chunk_offset + first_chunk_size) != second_chunk_offset)
3561 ret_value = true;
3562 }
3563
3564 return ret_value;
3565}
3566} // namespace dmrpp
static bool IsSet(const std::string &flagName)
see if the debug context flagName is set to true
Definition BESDebug.h:145
exception thrown if internal error encountered
exception thrown if an internal error is found and is fatal to the BES
Extend libdap::Array so that a handler can read data using a DMR++ file.
Definition DmrppArray.h:77
virtual std::vector< unsigned long long > get_shape(bool constrained)
Get the array shape.
void get_ons_objs(vector< ons > &ons_list)
void print_dap4(libdap::XMLWriter &writer, bool constrained=false) override
Shadow libdap::Array::print_dap4() - optionally prints DMR++ chunk information.
virtual unsigned long long get_size(bool constrained=false)
Return the total number of elements in this Array.
bool is_projected()
Is this Array subset?
bool read() override
Read data for the array.
static std::string d_ns_prefix
The XML namespace prefix to use.
virtual bool twiddle_bytes() const
Returns true if this object utilizes shuffle compression.
virtual bool is_multi_linked_blocks_chunk() const
Returns true if this object contains a chunk that have multiple linked blocks .
static bool d_print_chunks
if true, print_dap4() prints chunk elements
virtual bool is_compact_layout() const
Returns true if this object utilizes COMPACT layout.
virtual void load_attributes(libdap::BaseType *btp)
Load the attribute information for this variable.
virtual bool is_missing_data() const
Returns true if this object describes the missing data.
virtual bool get_chunks_loaded() const
Have the chunks been loaded?
virtual bool get_one_chunk_fill_value() const
virtual size_t get_chunks_size() const
Use this when the number of chunks is needed.
void print_chunks_element(libdap::XMLWriter &xml, const std::string &name_space="")
Print the Chunk information.
virtual const std::vector< std::shared_ptr< Chunk > > & get_immutable_chunks() const
A const reference to the vector of chunks.
virtual bool get_uses_fill_value() const
virtual const std::vector< unsigned long long > & get_chunk_dimension_sizes() const
The chunk dimension sizes held in a const vector.
virtual void load_chunks(libdap::BaseType *btp)
Load chunk information for this variable.
virtual unsigned long long get_chunk_size_in_elements() const
Get the number of elements in this chunk.
virtual std::string get_filters() const
Return the names of all the filters in the order they were applied.
virtual bool get_attributes_loaded() const
Have the attributes been loaded?
Type
Type of JSON value.
Definition rapidjson.h:664