bes Updated for version 3.21.1
The Backend Server (BES) comprises the lower two tiers of the Hyrax data server.
SuperChunk.cc
// -*- mode: c++; c-basic-offset:4 -*-

// This file is part of the BES

// Copyright (c) 2018 OPeNDAP, Inc.
// Author: Nathan Potter<ndp@opendap.org>
//
// This library is free software; you can redistribute it and/or
// modify it under the terms of the GNU Lesser General Public
// License as published by the Free Software Foundation; either
// version 2.1 of the License, or (at your option) any later version.
//
// This library is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
// Lesser General Public License for more details.
//
// You should have received a copy of the GNU Lesser General Public
// License along with this library; if not, write to the Free Software
// Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA
//
// You can contact OPeNDAP, Inc. at PO Box 112, Saunderstown, RI. 02874-0112.

#include "config.h"

#include <atomic>
#include <future>
#include <list>
#include <memory>
#include <mutex>
#include <queue>
#include <sstream>
#include <string>
#include <thread>
#include <utility>
#include <vector>

#include "BESInternalError.h"
#include "BESDebug.h"

#include "DmrppRequestHandler.h"
#include "CurlHandlePool.h"
#include "DmrppArray.h"
#include "DmrppNames.h"
#include "Chunk.h"
#include "SuperChunk.h"

#define prolog std::string("SuperChunk::").append(__func__).append("() - ")

#define SUPER_CHUNK_MODULE "dmrpp:3"

using std::atomic_uint;
using std::endl;
using std::future;
using std::list;
using std::ostream;
using std::ostringstream;
using std::queue;
using std::shared_ptr;
using std::string;
using std::stringstream;
using std::unique_ptr;
using std::vector;

namespace dmrpp {

// ThreadPool state variables.
std::mutex chunk_processing_thread_pool_mtx; // mutex for critical section
atomic_uint chunk_processing_thread_counter(0);
#define COMPUTE_THREADS "compute_threads"

#define DMRPP_ENABLE_THREAD_TIMERS 0

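// Illustrative sketch (documentation annotation, not in the original source):
// the functions below gate std::async() launches with the atomic counter and
// mutex declared above, so at most DmrppRequestHandler::d_max_compute_threads
// chunk tasks run at once. The pattern, with a hypothetical task 'do_work',
// reduces to:
//
//     std::list<std::future<bool>> futures;
//     {
//         std::unique_lock<std::mutex> lck(chunk_processing_thread_pool_mtx);
//         if (chunk_processing_thread_counter < DmrppRequestHandler::d_max_compute_threads) {
//             chunk_processing_thread_counter++;
//             futures.push_back(std::async(std::launch::async, do_work));
//         }
//     }
//
// Harvesting a finished future (see get_next_future(), used by the
// process_chunks_*_concurrent() loops) decrements the counter again.
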
void process_one_chunk(shared_ptr<Chunk> chunk, DmrppArray *array, const vector<unsigned long long> &constrained_array_shape)
{
    BESDEBUG(SUPER_CHUNK_MODULE, prolog << "BEGIN" << endl );

    // TODO If this is part of a SuperChunk, hasn't the data been read by SuperChunk::retrieve_data()?
    //  If so, calling read() here is not needed. Same question below. jhrg 5/7/22
    chunk->read_chunk();

    if (array) {
        // If this chunk used/uses hdf5 fill values, do not attempt to deflate, etc., its
        // values since the fill value code makes the chunks 'fully formed.' jhrg 5/16/22
        if (!chunk->get_uses_fill_value() && !array->is_filters_empty())
            chunk->filter_chunk(array->get_filters(), array->get_chunk_size_in_elements(), array->get_bytes_per_element());

        vector<unsigned long long> target_element_address = chunk->get_position_in_array();
        vector<unsigned long long> chunk_source_address(array->dimensions(), 0);

        char *dest_buf = array->get_buf();
        if (array->var()->type() == libdap::dods_structure_c) {
            dest_buf = array->get_structure_array_buf_ptr();
        }
        array->insert_chunk(0, &target_element_address, &chunk_source_address,
                            chunk, constrained_array_shape, dest_buf);
    }

    BESDEBUG(SUPER_CHUNK_MODULE, prolog << "END" << endl );
}

void process_one_chunk_unconstrained(shared_ptr<Chunk> chunk, const vector<unsigned long long> &chunk_shape,
                                     DmrppArray *array, const vector<unsigned long long> &array_shape)
{
    BESDEBUG(SUPER_CHUNK_MODULE, prolog << "BEGIN" << endl );

    chunk->read_chunk();

    if (array) {
        if (!chunk->get_uses_fill_value() && !array->is_filters_empty())
            chunk->filter_chunk(array->get_filters(), array->get_chunk_size_in_elements(), array->get_bytes_per_element());

        array->insert_chunk_unconstrained(chunk, 0, 0, array_shape, 0, chunk_shape, chunk->get_position_in_array());
    }

    BESDEBUG(SUPER_CHUNK_MODULE, prolog << "END" << endl );
}

void process_one_chunk_unconstrained_dio(shared_ptr<Chunk> chunk, const vector<unsigned long long> &chunk_shape,
                                         DmrppArray *array, const vector<unsigned long long> &array_shape)
{
    BESDEBUG(SUPER_CHUNK_MODULE, prolog << "BEGIN" << endl );

    chunk->read_chunk();

    if (array) {
        array->insert_chunk_unconstrained_dio(chunk);
    }

    BESDEBUG(SUPER_CHUNK_MODULE, prolog << "END" << endl );
}

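// Illustrative sketch (documentation annotation, not in the original source):
// a concrete reading of the shape arguments used by the unconstrained
// functions above, for a hypothetical 100x100 array stored as 10x10 chunks:
//
//     array_shape == {100, 100}       // elements per array dimension
//     chunk_shape == {10, 10}         // elements per chunk dimension
//     chunk->get_position_in_array()  // element coordinates of the chunk's
//                                     // first value, e.g. {20, 30}
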
bool one_chunk_compute_thread(unique_ptr<one_chunk_args> args)
{
#if DMRPP_ENABLE_THREAD_TIMERS
    stringstream timer_tag;
    timer_tag << prolog << "tid: 0x" << std::hex << std::this_thread::get_id() <<
        " parent_tid: 0x" << std::hex << args->parent_thread_id << " parent_sc: " << args->parent_super_chunk_id;
    BES_STOPWATCH_START(COMPUTE_THREADS, timer_tag.str());
#endif

    process_one_chunk(args->chunk, args->array, args->array_shape);
    return true;
}

bool one_chunk_unconstrained_compute_thread(unique_ptr<one_chunk_unconstrained_args> args)
{
#if DMRPP_ENABLE_THREAD_TIMERS
    stringstream timer_tag;
    timer_tag << prolog << "tid: 0x" << std::hex << std::this_thread::get_id() <<
        " parent_tid: 0x" << std::hex << args->parent_thread_id << " parent_sc: " << args->parent_super_chunk_id;
    BES_STOPWATCH_START(COMPUTE_THREADS, timer_tag.str());
#endif

    process_one_chunk_unconstrained(args->chunk, args->chunk_shape, args->array, args->array_shape);
    return true;
}

bool one_chunk_unconstrained_compute_thread_dio(unique_ptr<one_chunk_unconstrained_args> args)
{
#if DMRPP_ENABLE_THREAD_TIMERS
    stringstream timer_tag;
    timer_tag << prolog << "tid: 0x" << std::hex << std::this_thread::get_id() <<
        " parent_tid: 0x" << std::hex << args->parent_thread_id << " parent_sc: " << args->parent_super_chunk_id;
    BES_STOPWATCH_START(COMPUTE_THREADS, timer_tag.str());
#endif

    process_one_chunk_unconstrained_dio(args->chunk, args->chunk_shape, args->array, args->array_shape);
    return true;
}

bool start_one_chunk_compute_thread(list<std::future<bool>> &futures, unique_ptr<one_chunk_args> args) {
    bool retval = false;
    std::unique_lock<std::mutex> lck(chunk_processing_thread_pool_mtx);
    BESDEBUG(SUPER_CHUNK_MODULE, prolog << "d_max_compute_threads: " << DmrppRequestHandler::d_max_compute_threads << " chunk_processing_thread_counter: " << chunk_processing_thread_counter << endl);
    if (chunk_processing_thread_counter < DmrppRequestHandler::d_max_compute_threads) {
        chunk_processing_thread_counter++;
        futures.push_back(std::async(std::launch::async, one_chunk_compute_thread, std::move(args)));
        retval = true;
        BESDEBUG(SUPER_CHUNK_MODULE, prolog << "Got std::future '" << futures.size() <<
            "' from std::async, chunk_processing_thread_counter: " << chunk_processing_thread_counter << endl);
    }
    return retval;
}

bool start_one_chunk_unconstrained_compute_thread(list<std::future<bool>> &futures, unique_ptr<one_chunk_unconstrained_args> args) {
    bool retval = false;
    std::unique_lock<std::mutex> lck(chunk_processing_thread_pool_mtx);
    if (chunk_processing_thread_counter < DmrppRequestHandler::d_max_compute_threads) {
        futures.push_back(std::async(std::launch::async, one_chunk_unconstrained_compute_thread, std::move(args)));
        chunk_processing_thread_counter++;
        retval = true;
        BESDEBUG(SUPER_CHUNK_MODULE, prolog << "Got std::future '" << futures.size() <<
            "' from std::async, chunk_processing_thread_counter: " << chunk_processing_thread_counter << endl);
    }
    return retval;
}

bool start_one_chunk_unconstrained_compute_thread_dio(list<std::future<bool>> &futures, unique_ptr<one_chunk_unconstrained_args> args) {
    bool retval = false;
    std::unique_lock<std::mutex> lck(chunk_processing_thread_pool_mtx);
    if (chunk_processing_thread_counter < DmrppRequestHandler::d_max_compute_threads) {
        futures.push_back(std::async(std::launch::async, one_chunk_unconstrained_compute_thread_dio, std::move(args)));
        chunk_processing_thread_counter++;
        retval = true;
        BESDEBUG(SUPER_CHUNK_MODULE, prolog << "Got std::future '" << futures.size() <<
            "' from std::async, chunk_processing_thread_counter: " << chunk_processing_thread_counter << endl);
    }
    return retval;
}
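
// Illustrative sketch (documentation annotation, not in the original source):
// a worked trace of the launch/harvest pairing used by the three functions
// above. Suppose DmrppRequestHandler::d_max_compute_threads is 2 and three
// argument blocks args1..args3 (hypothetical names) are ready:
//
//     start_one_chunk_compute_thread(futures, std::move(args1)); // counter 0 -> 1, returns true
//     start_one_chunk_compute_thread(futures, std::move(args2)); // counter 1 -> 2, returns true
//     start_one_chunk_compute_thread(futures, std::move(args3)); // counter at max: returns false
//                                                                // and args3 is destroyed
//     get_next_future(futures, chunk_processing_thread_counter,
//                     DMRPP_WAIT_FOR_FUTURE_MS, prolog);         // harvests one future, counter 2 -> 1
//     // The caller rebuilds the argument block for the still-queued chunk and
//     // retries; this is what the process_chunks_*_concurrent() loops below do.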

void process_chunks_concurrent(
        const string &super_chunk_id,
        queue<shared_ptr<Chunk>> &chunks,
        DmrppArray *array,
        const vector<unsigned long long> &constrained_array_shape) {

    // We maintain a list of futures to track our parallel activities.
    list<future<bool>> futures;
    try {
        bool done = false;
        bool future_finished = true;
        while (!done) {

            if (!futures.empty())
                future_finished = get_next_future(futures, chunk_processing_thread_counter, DMRPP_WAIT_FOR_FUTURE_MS, prolog);

            // If future_finished is true, the chunk_processing_thread_counter has been decremented,
            // because future::get() was called or a call to future::valid() returned false.
            BESDEBUG(SUPER_CHUNK_MODULE, prolog << "future_finished: " << (future_finished ? "true" : "false") << endl);

            if (!chunks.empty()) {
                // Next we try to add a new Chunk compute thread if we can - there might be room.
                bool thread_started = true;
                while (thread_started && !chunks.empty()) {
                    auto chunk = chunks.front();
                    BESDEBUG(SUPER_CHUNK_MODULE, prolog << "Starting thread for " << chunk->to_string() << endl);

                    auto args = unique_ptr<one_chunk_args>(new one_chunk_args(super_chunk_id, chunk, array, constrained_array_shape));
                    thread_started = start_one_chunk_compute_thread(futures, std::move(args));

                    if (thread_started) {
                        chunks.pop();
                        BESDEBUG(SUPER_CHUNK_MODULE, prolog << "STARTED thread for " << chunk->to_string() << endl);
                    } else {
                        // The thread did not start; ownership of the arguments was not passed to the thread.
                        BESDEBUG(SUPER_CHUNK_MODULE, prolog << "Thread not started; args deleted, Chunk remains in queue. " <<
                            "chunk_processing_thread_counter: " << chunk_processing_thread_counter << " futures.size(): " << futures.size() << endl);
                    }
                }
            }
            else {
                // No more Chunks and no futures means we're done here.
                if (futures.empty())
                    done = true;
            }
            future_finished = false;
        }
    }
    catch (...) {
        // Complete all of the futures, otherwise we'll have threads out there using up resources.
        while (!futures.empty()) {
            if (futures.back().valid())
                futures.back().get();
            futures.pop_back();
        }
        // Re-throw the exception.
        throw;
    }
}
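
// Illustrative sketch (documentation annotation, not in the original source):
// how a caller drives process_chunks_concurrent(). SuperChunk::process_child_chunks(),
// later in this file, does essentially this once the SuperChunk's bytes are in
// memory; 'sc_id', 'my_chunks', and 'my_array' are hypothetical names:
//
//     std::queue<std::shared_ptr<Chunk>> to_process;
//     for (const auto &chunk : my_chunks)
//         to_process.push(chunk);
//     process_chunks_concurrent(sc_id, to_process, my_array, my_array->get_shape(true));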

void process_chunks_unconstrained_concurrent(
        const string &super_chunk_id,
        queue<shared_ptr<Chunk>> &chunks,
        const vector<unsigned long long> &chunk_shape,
        DmrppArray *array,
        const vector<unsigned long long> &array_shape) {

    // We maintain a list of futures to track our parallel activities.
    list<future<bool>> futures;
    try {
        bool done = false;
        while (!done) {

            if (!futures.empty())
                get_next_future(futures, chunk_processing_thread_counter, DMRPP_WAIT_FOR_FUTURE_MS, prolog);

            // When get_next_future() returns true, the chunk_processing_thread_counter has been
            // decremented, because future::get() was called or a call to future::valid() returned false.

            if (!chunks.empty()) {
                // Next we try to add a new Chunk compute thread if we can - there might be room.
                bool thread_started = true;
                while (thread_started && !chunks.empty()) {
                    auto chunk = chunks.front();

                    auto args = unique_ptr<one_chunk_unconstrained_args>(
                        new one_chunk_unconstrained_args(super_chunk_id, chunk, array, array_shape, chunk_shape) );
                    thread_started = start_one_chunk_unconstrained_compute_thread(futures, std::move(args));

                    if (thread_started) {
                        chunks.pop();
                    } else {
                        // The thread did not start; ownership of the arguments was not passed to the thread.
                        BESDEBUG(SUPER_CHUNK_MODULE, prolog << "Thread not started; args deleted, Chunk remains in queue." <<
                            " chunk_processing_thread_counter: " << chunk_processing_thread_counter <<
                            " futures.size(): " << futures.size() << endl);
                    }
                }
            }
            else {
                // No more Chunks and no futures means we're done here.
                if (futures.empty())
                    done = true;
            }
        }
    }
    catch (...) {
        // Complete all the futures, otherwise we'll have threads out there using up resources.
        while (!futures.empty()) {
            if (futures.back().valid())
                futures.back().get();
            futures.pop_back();
        }
        // Re-throw the exception.
        throw;
    }
}

// Direct IO routine for processing chunks when the variable is not constrained.
void process_chunks_unconstrained_concurrent_dio(
        const string &super_chunk_id,
        queue<shared_ptr<Chunk>> &chunks,
        const vector<unsigned long long> &chunk_shape,
        DmrppArray *array,
        const vector<unsigned long long> &array_shape) {

    // We maintain a list of futures to track our parallel activities.
    list<future<bool>> futures;
    try {
        bool done = false;
        while (!done) {

            if (!futures.empty())
                get_next_future(futures, chunk_processing_thread_counter, DMRPP_WAIT_FOR_FUTURE_MS, prolog);

            // When get_next_future() returns true, the chunk_processing_thread_counter has been
            // decremented, because future::get() was called or a call to future::valid() returned false.

            if (!chunks.empty()) {
                // Next we try to add a new Chunk compute thread if we can - there might be room.
                bool thread_started = true;
                while (thread_started && !chunks.empty()) {
                    auto chunk = chunks.front();

                    auto args = unique_ptr<one_chunk_unconstrained_args>(
                        new one_chunk_unconstrained_args(super_chunk_id, chunk, array, array_shape, chunk_shape) );

                    // Call the direct IO routine.
                    thread_started = start_one_chunk_unconstrained_compute_thread_dio(futures, std::move(args));

                    if (thread_started) {
                        chunks.pop();
                    } else {
                        // The thread did not start; ownership of the arguments was not passed to the thread.
                        BESDEBUG(SUPER_CHUNK_MODULE, prolog << "Thread not started; args deleted, Chunk remains in queue." <<
                            " chunk_processing_thread_counter: " << chunk_processing_thread_counter <<
                            " futures.size(): " << futures.size() << endl);
                    }
                }
            }
            else {
                // No more Chunks and no futures means we're done here.
                if (futures.empty())
                    done = true;
            }
        }
    }
    catch (...) {
        // Complete all the futures, otherwise we'll have threads out there using up resources.
        while (!futures.empty()) {
            if (futures.back().valid())
                futures.back().get();
            futures.pop_back();
        }
        // Re-throw the exception.
        throw;
    }
}


//#####################################################################################################################
//#####################################################################################################################
//#####################################################################################################################
//
// SuperChunk Code Begins Here
//
// = = = = = = = = = = = = = = = = = = = = = = = = = = = = = = = = = = = = = = = = = = = = = = = = = = = = = = = = = =

// TODO There are (at least) two ways to handle 'fill value chunks.' The code can group
//  them all together as one big SuperChunk or store each FV chunk in its own SuperChunk.
//  (Of course, there are alternatives...) Using one SuperChunk is probably faster but
//  will require more work on the SuperChunk code. I think we should postpone that for now
//  to focus on getting the values correct (because that problem has yet to be solved).
//  I will add a ticket to return to this code and make that modification. jhrg 5/7/22
//
bool SuperChunk::add_chunk(const std::shared_ptr<Chunk> candidate_chunk) {
    bool chunk_was_added = false;
    if (d_chunks.empty()) {
        d_chunks.push_back(candidate_chunk);
        d_offset = candidate_chunk->get_offset();
        d_size = candidate_chunk->get_size();
        // When get_uses_fill_value() is true, get_data_url() returns a shared_ptr initialized to nullptr. jhrg 5/7/22
        d_uses_fill_value = candidate_chunk->get_uses_fill_value();
        if (!d_uses_fill_value)
            d_data_url = candidate_chunk->get_data_url();
        else
            d_data_url = nullptr;
        chunk_was_added = true;
    }
    // For now, if a chunk uses fill values, it gets its own SuperChunk. jhrg 5/7/22
    else if (!candidate_chunk->get_uses_fill_value() && is_contiguous(candidate_chunk)) {
        this->d_chunks.push_back(candidate_chunk);
        d_size += candidate_chunk->get_size();
        chunk_was_added = true;
    }
    return chunk_was_added;
}
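
// Illustrative sketch (documentation annotation, not in the original source):
// how a caller coalesces chunks into SuperChunks with add_chunk(). A false
// return means the candidate is not contiguous with this SuperChunk, so the
// caller starts a new one. 'super_chunks' and 'all_chunks' are hypothetical
// names; the constructor call follows the pattern used by DmrppArray:
//
//     vector<shared_ptr<SuperChunk>> super_chunks;
//     auto sc = std::make_shared<SuperChunk>(prolog, this);
//     super_chunks.push_back(sc);
//     for (const auto &chunk : all_chunks) {
//         if (!sc->add_chunk(chunk)) {
//             sc = std::make_shared<SuperChunk>(prolog, this);
//             super_chunks.push_back(sc);
//             if (!sc->add_chunk(chunk))
//                 throw BESInternalError("Could not add a chunk to a new SuperChunk.", __FILE__, __LINE__);
//         }
//     }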

// This method handles non-contiguous chunks. It follows the logic of add_chunk() but covers the non-contiguous case.
bool SuperChunk::add_chunk_non_contiguous(const std::shared_ptr<Chunk> candidate_chunk, unsigned long long &buffer_end_position) {

    BESDEBUG(SUPER_CHUNK_MODULE, prolog << "Coming to add_chunk_non_contiguous" << endl);
    bool chunk_was_added = false;
    if (d_chunks.empty()) {
        d_chunks.push_back(candidate_chunk);
        d_offset = candidate_chunk->get_offset();
        d_size = buffer_end_position - d_offset;

        BESDEBUG(SUPER_CHUNK_MODULE, prolog << "add_chunk_non d_size: " << d_size << endl);
        BESDEBUG(SUPER_CHUNK_MODULE, prolog << "add_chunk_non d_offset: " << d_offset << endl);

        // When get_uses_fill_value() is true, get_data_url() returns a shared_ptr initialized to nullptr.
        d_uses_fill_value = candidate_chunk->get_uses_fill_value();
        if (!d_uses_fill_value)
            d_data_url = candidate_chunk->get_data_url();
        else
            d_data_url = nullptr;
        chunk_was_added = true;
    }
    // For now, if a chunk uses fill values, it gets its own SuperChunk.
    else if (!candidate_chunk->get_uses_fill_value()) {
        if (candidate_chunk->get_offset() >= d_offset) {
            // Add the chunk only if it ends within this SuperChunk's buffer; a chunk
            // that extends past buffer_end_position belongs in a new SuperChunk.
            if ((candidate_chunk->get_offset() + candidate_chunk->get_size()) <= buffer_end_position) {
                this->d_chunks.push_back(candidate_chunk);
                chunk_was_added = true;
            }
        }
    }

    return chunk_was_added;
}

bool SuperChunk::is_contiguous(const std::shared_ptr<Chunk> candidate_chunk) {
    // Are the URLs the same?
    bool contiguous = candidate_chunk->get_data_url()->str() == d_data_url->str();
    if (contiguous) {
        // If the URLs match, check whether the candidate starts where this SuperChunk ends.
        contiguous = (d_offset + d_size) == candidate_chunk->get_offset();
    }
    return contiguous;
}
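
// Illustrative sketch (documentation annotation, not in the original source):
// a worked example of the contiguity test. If this SuperChunk starts at byte
// offset 1000 and currently holds 512 bytes (d_offset = 1000, d_size = 512),
// a candidate chunk from the same URL is contiguous only when its offset is
// exactly 1000 + 512 = 1512:
//
//     candidate->get_offset() == 1512  // contiguous; add_chunk() grows d_size
//     candidate->get_offset() == 1600  // not contiguous; start a new SuperChunk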

void SuperChunk::map_chunks_to_buffer()
{
    unsigned long long bindex = 0;
    for (const auto &chunk : d_chunks) {
        chunk->set_read_buffer(d_read_buffer + bindex, chunk->get_size(), 0, false);
        bindex += chunk->get_size();
        if (bindex > d_size) {
            stringstream msg;
            msg << "ERROR The computed buffer index, " << bindex << ", is larger than the expected size of the SuperChunk. ";
            msg << "d_size: " << d_size;
            throw BESInternalError(msg.str(), __FILE__, __LINE__);
        }
    }
}

void SuperChunk::map_non_contiguous_chunks_to_buffer()
{
    unsigned long long bindex = 0;

    for (unsigned i = 0; i < d_chunks.size(); i++) {

        (d_chunks[i])->set_read_buffer(d_read_buffer + bindex, (d_chunks[i])->get_size(), 0, false);

        // Use the next chunk's offset to figure out the starting position of that chunk's buffer.
        if (i < (d_chunks.size() - 1)) {
            long long temp_bindex = (d_chunks[i+1])->get_offset() - d_offset;
            if (temp_bindex < 0)
                throw BESInternalError("The non-contiguous chunk offset cannot be smaller than the SuperChunk offset. ", __FILE__, __LINE__);
            bindex = (unsigned long long)temp_bindex;
            if (bindex > d_size) {
                stringstream msg;
                msg << "ERROR The computed buffer index, " << bindex << ", is larger than the expected size of the SuperChunk. ";
                msg << "d_size: " << d_size;
                throw BESInternalError(msg.str(), __FILE__, __LINE__);
            }
        }
    }
}
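
// Illustrative sketch (documentation annotation, not in the original source):
// a worked example of the non-contiguous mapping. Suppose d_offset = 1000 and
// the SuperChunk holds chunk 0 (offset 1000, size 300) and chunk 1 (offset 1400,
// size 200), with buffer_end_position = 1600, so d_size = 600. Then:
//
//     chunk 0 reads into d_read_buffer + 0    // first bindex is 0
//     chunk 1 reads into d_read_buffer + 400  // bindex = 1400 - 1000
//     bytes 300..399 of d_read_buffer span the gap between the chunks and go unused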

void SuperChunk::read_aggregate_bytes()
{
    // Since we already have a good infrastructure for reading Chunks, we just make a big-ol-Chunk to
    // use for grabbing bytes. Then, once read, we'll use the child Chunks to do the dirty work of inflating
    // and moving the results into the DmrppCommon object.
    Chunk chunk(d_data_url, "NOT_USED", d_size, d_offset);

    chunk.set_read_buffer(d_read_buffer, d_size, 0, false);

    dmrpp_easy_handle *handle = DmrppRequestHandler::curl_handle_pool->get_easy_handle(&chunk);
    if (!handle)
        throw BESInternalError(prolog + "No more libcurl handles.", __FILE__, __LINE__);

    try {
        handle->read_data(); // throws if error
        DmrppRequestHandler::curl_handle_pool->release_handle(handle);
    }
    catch (...) {
        DmrppRequestHandler::curl_handle_pool->release_handle(handle);
        throw;
    }

    // If the expected byte count was not read, it's an error.
    if (d_size != chunk.get_bytes_read()) {
        ostringstream oss;
        oss << "Wrong number of bytes read for chunk; read: " << chunk.get_bytes_read() << ", expected: " << d_size;
        throw BESInternalError(oss.str(), __FILE__, __LINE__);
    }

    d_is_read = true;
}
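
// Illustrative sketch (documentation annotation, not in the original source):
// the arithmetic behind read_aggregate_bytes(). For a SuperChunk holding three
// contiguous chunks of sizes 100, 200, and 300 starting at offset 5000, the
// synthesized Chunk requests bytes 5000..5599 (d_offset = 5000, d_size = 600)
// in a single ranged read instead of three separate requests;
// map_chunks_to_buffer() has already pointed the child chunks at
// d_read_buffer + 0, + 100, and + 300 respectively.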

void SuperChunk::read_fill_value_chunk()
{
    if (d_chunks.size() != 1)
        throw BESInternalError("Found a SuperChunk with uses_fill_value true but more than one child chunk.", __FILE__, __LINE__);

    d_chunks.front()->read_chunk();

    d_is_read = true;
}

void SuperChunk::retrieve_data() {
    // TODO I think this code should set d_is_read. It sets it for the Chunk, which may be redundant. jhrg 5/9/22
    if (d_is_read) {
        BESDEBUG(SUPER_CHUNK_MODULE, prolog << "SuperChunk (" << (void **) this << ") has already been read! Returning." << endl);
        return;
    }

    // TODO Move this into read_aggregate_bytes(), move map_chunks_to_buffer()
    //  after read_aggregate_bytes() and modify map_chunks_to_buffer() to set
    //  the chunk size and read state so the last for loop can be removed.
    //  jhrg 5/6/22
    if (!d_read_buffer) {
        // Allocate memory for the SuperChunk receive buffer;
        // the memory is released in the destructor.
        d_read_buffer = new char[d_size];
    }

    // Massage the chunks so that their read/receive/intern data buffer
    // points to the correct section of the d_read_buffer memory.
    // "Slice it up!"
    // Non-contiguous chunks must be mapped differently.
    if (non_contiguous_chunk)
        map_non_contiguous_chunks_to_buffer();
    else
        map_chunks_to_buffer();

    // Read the bytes from the target URL. (pthreads, maybe depends on size...)
    // Use one (or possibly more) thread(s) depending on d_size
    // and utilize our friend cURL to stuff the bytes into d_read_buffer.
    //
    // TODO Replace or improve this way of handling fill value chunks. jhrg 5/7/22
    if (d_uses_fill_value)
        read_fill_value_chunk();
    else
        read_aggregate_bytes();

    // TODO Check if Chunk::read() sets these. jhrg 5/9/22
    // Set each Chunk's read state to true and set each chunk's byte count
    // to the expected size for the chunk - because upstream events
    // have assured this to be true.
    for (const auto &chunk : d_chunks) {
        chunk->set_is_read(true);
        chunk->set_bytes_read(chunk->get_size());
    }
}

// Direct chunk IO version of retrieve_data(). It is cloned from retrieve_data();
// a separate method keeps the regular (non direct-IO) code path unchanged.
void SuperChunk::retrieve_data_dio() {

    // This comment is copied from retrieve_data().
    // TODO I think this code should set d_is_read. It sets it for the Chunk, which may be redundant. jhrg 5/9/22
    if (d_is_read) {
        BESDEBUG(SUPER_CHUNK_MODULE, prolog << "SuperChunk (" << (void **) this << ") has already been read! Returning." << endl);
        return;
    }

    if (!d_read_buffer) {
        // Allocate memory for the SuperChunk receive buffer;
        // the memory is released in the destructor.
        d_read_buffer = new char[d_size];
    }

    // Massage the chunks so that their read/receive/intern data buffer
    // points to the correct section of the d_read_buffer memory.
    // "Slice it up!"
    map_chunks_to_buffer();

    // Read the bytes from the target URL. (pthreads, maybe depends on size...)
    // Use one (or possibly more) thread(s) depending on d_size
    // and utilize our friend cURL to stuff the bytes into d_read_buffer.
    //
    // TODO Replace or improve this way of handling fill value chunks. jhrg 5/7/22
    read_aggregate_bytes();

    // TODO Check if Chunk::read() sets these. jhrg 5/9/22
    // Set each Chunk's read state to true and set each chunk's byte count
    // to the expected size for the chunk - because upstream events
    // have assured this to be true.
    for (const auto &chunk : d_chunks) {
        chunk->set_is_read(true);
        chunk->set_bytes_read(chunk->get_size());
    }
}


void SuperChunk::process_child_chunks() {
    BESDEBUG(SUPER_CHUNK_MODULE, prolog << "BEGIN" << endl);
    retrieve_data();

    vector<unsigned long long> constrained_array_shape = d_parent_array->get_shape(true);
    BESDEBUG(SUPER_CHUNK_MODULE, prolog << "d_use_compute_threads: " << (DmrppRequestHandler::d_use_compute_threads ? "true" : "false") << endl);
    BESDEBUG(SUPER_CHUNK_MODULE, prolog << "d_max_compute_threads: " << DmrppRequestHandler::d_max_compute_threads << endl);

    if (!DmrppRequestHandler::d_use_compute_threads) {
#if DMRPP_ENABLE_THREAD_TIMERS
        BES_STOPWATCH_START(SUPER_CHUNK_MODULE, prolog + "Serial Chunk Processing. id: " + d_id);
#endif
        for (const auto &chunk : d_chunks) {
            process_one_chunk(chunk, d_parent_array, constrained_array_shape);
        }
    }
    else {
#if DMRPP_ENABLE_THREAD_TIMERS
        stringstream timer_tag;
        timer_tag << prolog << "Concurrent Chunk Processing. id: " << d_id;
        BES_STOPWATCH_START(SUPER_CHUNK_MODULE, timer_tag.str());
#endif
        queue<shared_ptr<Chunk>> chunks_to_process;
        for (const auto &chunk : d_chunks)
            chunks_to_process.push(chunk);

        process_chunks_concurrent(d_id, chunks_to_process, d_parent_array, constrained_array_shape);
    }
    BESDEBUG(SUPER_CHUNK_MODULE, prolog << "END" << endl );
}


void SuperChunk::process_child_chunks_unconstrained() {
    BESDEBUG(SUPER_CHUNK_MODULE, prolog << "BEGIN" << endl);
    retrieve_data();

    // The size, in elements, of each of the array's dimensions.
    const vector<unsigned long long> array_shape = d_parent_array->get_shape(true);
    // The size, in elements, of each of the chunk's dimensions.
    const vector<unsigned long long> chunk_shape = d_parent_array->get_chunk_dimension_sizes();

    if (!DmrppRequestHandler::d_use_compute_threads) {
#if DMRPP_ENABLE_THREAD_TIMERS
        BES_STOPWATCH_START(SUPER_CHUNK_MODULE, prolog + "Serial Chunk Processing. sc_id: " + d_id);
#endif
        for (const auto &chunk : d_chunks) {
            process_one_chunk_unconstrained(chunk, chunk_shape, d_parent_array, array_shape);
        }
    }
    else {
#if DMRPP_ENABLE_THREAD_TIMERS
        stringstream timer_tag;
        timer_tag << prolog << "Concurrent Chunk Processing. sc_id: " << d_id;
        BES_STOPWATCH_START(SUPER_CHUNK_MODULE, timer_tag.str());
#endif
        queue<shared_ptr<Chunk>> chunks_to_process;
        for (const auto &chunk : d_chunks) {
            chunks_to_process.push(chunk);
        }
        process_chunks_unconstrained_concurrent(d_id, chunks_to_process, chunk_shape, d_parent_array, array_shape);
    }
}


string SuperChunk::to_string(bool verbose) const {
    stringstream msg;
    msg << "[SuperChunk: " << (void **)this;
    msg << " offset: " << d_offset;
    msg << " size: " << d_size;
    msg << " chunk_count: " << d_chunks.size();
    //msg << " parent: " << d_parent->name();
    msg << "]";
    if (verbose) {
        msg << endl;
        for (const auto &chunk : d_chunks) {
            msg << chunk->to_string() << endl;
        }
    }
    return msg.str();
}

void SuperChunk::dump(ostream &strm) const {
    strm << to_string(false);
}

// Direct chunk IO method to read unconstrained variables.
void SuperChunk::read_unconstrained_dio() {

    // Retrieve data for the direct IO case.
    retrieve_data_dio();

    // The size, in elements, of each of the array's dimensions.
    const vector<unsigned long long> array_shape = d_parent_array->get_shape(true);
    // The size, in elements, of each of the chunk's dimensions.
    const vector<unsigned long long> chunk_shape = d_parent_array->get_chunk_dimension_sizes();

    if (!DmrppRequestHandler::d_use_compute_threads) {
#if DMRPP_ENABLE_THREAD_TIMERS
        BES_STOPWATCH_START(SUPER_CHUNK_MODULE, prolog + "Serial Chunk Processing. sc_id: " + d_id);
#endif
        for (const auto &chunk : d_chunks) {
            process_one_chunk_unconstrained_dio(chunk, chunk_shape, d_parent_array, array_shape);
        }
    }
    else {
#if DMRPP_ENABLE_THREAD_TIMERS
        stringstream timer_tag;
        timer_tag << prolog << "Concurrent Chunk Processing. sc_id: " << d_id;
        BES_STOPWATCH_START(SUPER_CHUNK_MODULE, timer_tag.str());
#endif
        queue<shared_ptr<Chunk>> chunks_to_process;
        for (const auto &chunk : d_chunks)
            chunks_to_process.push(chunk);

        process_chunks_unconstrained_concurrent_dio(d_id, chunks_to_process, chunk_shape, d_parent_array, array_shape);
    }
}

} // namespace dmrpp