30#include "BESInternalError.h"
33#include "DmrppRequestHandler.h"
34#include "CurlHandlePool.h"
35#include "DmrppArray.h"
36#include "DmrppNames.h"
38#include "SuperChunk.h"
40#define prolog std::string("SuperChunk::").append(__func__).append("() - ")
42#define SUPER_CHUNK_MODULE "dmrpp:3"
44using std::stringstream;
51std::mutex chunk_processing_thread_pool_mtx;
52atomic_uint chunk_processing_thread_counter(0);
53#define COMPUTE_THREADS "compute_threads"
55#define DMRPP_ENABLE_THREAD_TIMERS 0
75void process_one_chunk(shared_ptr<Chunk> chunk,
DmrppArray *array,
const vector<unsigned long long> &constrained_array_shape)
77 BESDEBUG(SUPER_CHUNK_MODULE, prolog <<
"BEGIN" << endl );
86 if (!chunk->get_uses_fill_value() && !array->is_filters_empty())
87 chunk->filter_chunk(array->get_filters(), array->get_chunk_size_in_elements(), array->get_bytes_per_element());
89 vector<unsigned long long> target_element_address = chunk->get_position_in_array();
90 vector<unsigned long long> chunk_source_address(array->dimensions(), 0);
92 char *dest_buf = array->get_buf();
93 if (array->var()->type() == libdap::dods_structure_c) {
94 dest_buf = array->get_structure_array_buf_ptr();
97 array->insert_chunk(0, &target_element_address, &chunk_source_address,
98 chunk, constrained_array_shape, dest_buf);
101 BESDEBUG(SUPER_CHUNK_MODULE, prolog <<
"END" << endl );
123void process_one_chunk_unconstrained(shared_ptr<Chunk> chunk,
const vector<unsigned long long> &chunk_shape,
124 DmrppArray *array,
const vector<unsigned long long> &array_shape)
126 BESDEBUG(SUPER_CHUNK_MODULE, prolog <<
"BEGIN" << endl );
131 if (!chunk->get_uses_fill_value() && !array->is_filters_empty())
132 chunk->filter_chunk(array->get_filters(), array->get_chunk_size_in_elements(), array->get_bytes_per_element());
134 array->insert_chunk_unconstrained(chunk, 0, 0, array_shape, 0, chunk_shape, chunk->get_position_in_array());
137 BESDEBUG(SUPER_CHUNK_MODULE, prolog <<
"END" << endl );
140void process_one_chunk_unconstrained_dio(shared_ptr<Chunk> chunk,
const vector<unsigned long long> &chunk_shape,
141 DmrppArray *array,
const vector<unsigned long long> &array_shape)
143 BESDEBUG(SUPER_CHUNK_MODULE, prolog <<
"BEGIN" << endl );
148 array->insert_chunk_unconstrained_dio(chunk);
151 BESDEBUG(SUPER_CHUNK_MODULE, prolog <<
"END" << endl );
159bool one_chunk_compute_thread(unique_ptr<one_chunk_args> args)
161#if DMRPP_ENABLE_THREAD_TIMERS
162 stringstream timer_tag;
163 timer_tag << prolog <<
"tid: 0x" << std::hex << std::this_thread::get_id() <<
164 " parent_tid: 0x" << std::hex << args->parent_thread_id <<
" parent_sc: " << args->parent_super_chunk_id;
165 BES_STOPWATCH_START(COMPUTE_THREADS, timer_tag.str());
168 process_one_chunk(args->chunk, args->array, args->array_shape);
177bool one_chunk_unconstrained_compute_thread(unique_ptr<one_chunk_unconstrained_args> args)
179#if DMRPP_ENABLE_THREAD_TIMERS
180 stringstream timer_tag;
181 timer_tag << prolog <<
"tid: 0x" << std::hex << std::this_thread::get_id() <<
182 " parent_tid: 0x" << std::hex << args->parent_thread_id <<
" parent_sc: " << args->parent_super_chunk_id ;
183 BES_STOPWATCH_START(COMPUTE_THREADS, timer_tag.str());
186 process_one_chunk_unconstrained(args->chunk, args->chunk_shape, args->array, args->array_shape);
190bool one_chunk_unconstrained_compute_thread_dio(unique_ptr<one_chunk_unconstrained_args> args)
192#if DMRPP_ENABLE_THREAD_TIMERS
193 stringstream timer_tag;
194 timer_tag << prolog <<
"tid: 0x" << std::hex << std::this_thread::get_id() <<
195 " parent_tid: 0x" << std::hex << args->parent_thread_id <<
" parent_sc: " << args->parent_super_chunk_id ;
196 BES_STOPWATCH_START(COMPUTE_THREADS, timer_tag.str());
199 process_one_chunk_unconstrained_dio(args->chunk, args->chunk_shape, args->array, args->array_shape);
212bool start_one_chunk_compute_thread(list<std::future<bool>> &futures, unique_ptr<one_chunk_args> args) {
214 std::unique_lock<std::mutex> lck (chunk_processing_thread_pool_mtx);
215 BESDEBUG(SUPER_CHUNK_MODULE, prolog <<
"d_max_compute_threads: " << DmrppRequestHandler::d_max_compute_threads <<
" chunk_processing_thread_counter: " << chunk_processing_thread_counter << endl);
216 if (chunk_processing_thread_counter < DmrppRequestHandler::d_max_compute_threads) {
217 chunk_processing_thread_counter++;
218 futures.push_back(std::async(std::launch::async, one_chunk_compute_thread, std::move(args)));
220 BESDEBUG(SUPER_CHUNK_MODULE, prolog <<
"Got std::future '" << futures.size() <<
221 "' from std::async, chunk_processing_thread_counter: " << chunk_processing_thread_counter << endl);
236bool start_one_chunk_unconstrained_compute_thread(list<std::future<bool>> &futures, unique_ptr<one_chunk_unconstrained_args> args) {
238 std::unique_lock<std::mutex> lck (chunk_processing_thread_pool_mtx);
239 if (chunk_processing_thread_counter < DmrppRequestHandler::d_max_compute_threads) {
240 futures.push_back(std::async(std::launch::async, one_chunk_unconstrained_compute_thread, std::move(args)));
241 chunk_processing_thread_counter++;
243 BESDEBUG(SUPER_CHUNK_MODULE, prolog <<
"Got std::future '" << futures.size() <<
244 "' from std::async, chunk_processing_thread_counter: " << chunk_processing_thread_counter << endl);
249bool start_one_chunk_unconstrained_compute_thread_dio(list<std::future<bool>> &futures, unique_ptr<one_chunk_unconstrained_args> args) {
251 std::unique_lock<std::mutex> lck (chunk_processing_thread_pool_mtx);
252 if (chunk_processing_thread_counter < DmrppRequestHandler::d_max_compute_threads) {
253 futures.push_back(std::async(std::launch::async, one_chunk_unconstrained_compute_thread_dio, std::move(args)));
254 chunk_processing_thread_counter++;
256 BESDEBUG(SUPER_CHUNK_MODULE, prolog <<
"Got std::future '" << futures.size() <<
257 "' from std::async, chunk_processing_thread_counter: " << chunk_processing_thread_counter << endl);
// Processes a queue of Chunks for a constrained array using compute tasks
// started through start_one_chunk_compute_thread(). Each started task is
// tracked by a std::future kept in a local list.
//
//   super_chunk_id           - stored in every one_chunk_args bundle.
//   chunks                   - work queue; the front element is used for each launch attempt.
//   constrained_array_shape  - stored in every one_chunk_args bundle.
//
// NOTE(review): `array` is used below but is not declared in the visible
// signature, and the loop/exception scaffolding around these statements is
// not visible in this excerpt - confirm against the complete file.
284void process_chunks_concurrent(
285 const string &super_chunk_id,
286 queue<shared_ptr<Chunk>> &chunks,
288 const vector<unsigned long long> &constrained_array_shape ){
291 list<future<bool>> futures;
294 bool future_finished =
true;
// Wait (up to DMRPP_WAIT_FOR_FUTURE_MS) for one outstanding future. The shared
// thread counter is handed to get_next_future(); presumably it is decremented
// there when a future completes - TODO confirm.
298 future_finished = get_next_future(futures, chunk_processing_thread_counter, DMRPP_WAIT_FOR_FUTURE_MS, prolog);
302 BESDEBUG(SUPER_CHUNK_MODULE, prolog <<
"future_finished: " << (future_finished ?
"true" :
"false") << endl);
// While chunks remain, keep trying to launch tasks until a launch is refused.
304 if (!chunks.empty()){
306 bool thread_started =
true;
307 while(thread_started && !chunks.empty()) {
308 auto chunk = chunks.front();
309 BESDEBUG(SUPER_CHUNK_MODULE, prolog <<
"Starting thread for " << chunk->to_string() << endl);
// The bundle is moved into the launcher; if no task starts it is destroyed there.
311 auto args = unique_ptr<one_chunk_args>(
new one_chunk_args(super_chunk_id, chunk, array, constrained_array_shape));
312 thread_started = start_one_chunk_compute_thread(futures, std::move(args));
314 if (thread_started) {
316 BESDEBUG(SUPER_CHUNK_MODULE, prolog <<
"STARTED thread for " << chunk->to_string() << endl);
319 BESDEBUG(SUPER_CHUNK_MODULE, prolog <<
"Thread not started. args deleted, Chunk remains in queue.) " <<
320 "chunk_processing_thread_counter: " << chunk_processing_thread_counter <<
" futures.size(): " << futures.size() << endl);
329 future_finished =
false;
// Drain every outstanding future. NOTE(review): this looks like the cleanup
// path; std::future::get() rethrows an exception stored by the task, which
// would end the drain early - confirm that is acceptable here.
334 while(!futures.empty()){
335 if(futures.back().valid())
336 futures.back().get();
// Processes a queue of Chunks for an unconstrained array using compute tasks
// started through start_one_chunk_unconstrained_compute_thread(). Each started
// task is tracked by a std::future kept in a local list.
//
//   super_chunk_id - identifier of the owning SuperChunk.
//   chunks         - work queue; the front element is used for each launch attempt.
//   chunk_shape    - chunk shape passed in by the caller.
//   array_shape    - array shape passed in by the caller.
//
// NOTE(review): parts of this definition (a parameter between chunk_shape and
// array_shape, the argument-bundle constructor call, and the loop/exception
// scaffolding) are not visible in this excerpt - confirm against the complete file.
366void process_chunks_unconstrained_concurrent(
367 const string &super_chunk_id,
368 queue<shared_ptr<Chunk>> &chunks,
369 const vector<unsigned long long> &chunk_shape,
371 const vector<unsigned long long> &array_shape){
374 list<future<bool>> futures;
// Wait (up to DMRPP_WAIT_FOR_FUTURE_MS) for one outstanding future; the
// return value is not used on this line.
380 get_next_future(futures, chunk_processing_thread_counter, DMRPP_WAIT_FOR_FUTURE_MS, prolog);
// While chunks remain, keep trying to launch tasks until a launch is refused.
385 if (!chunks.empty()){
387 bool thread_started =
true;
388 while(thread_started && !chunks.empty()) {
389 auto chunk = chunks.front();
391 auto args = unique_ptr<one_chunk_unconstrained_args>(
393 thread_started = start_one_chunk_unconstrained_compute_thread(futures, std::move(args));
395 if (thread_started) {
399 BESDEBUG(SUPER_CHUNK_MODULE, prolog <<
"Thread not started. args deleted, Chunk remains in queue.)" <<
400 " chunk_processing_thread_counter: " << chunk_processing_thread_counter <<
401 " futures.size(): " << futures.size() << endl);
// Drain every outstanding future. NOTE(review): std::future::get() rethrows an
// exception stored by the task, which would end the drain early - confirm.
414 while(!futures.empty()){
415 if(futures.back().valid())
416 futures.back().get();
// "dio" variant of the unconstrained concurrent processor: compute tasks are
// started through start_one_chunk_unconstrained_compute_thread_dio() and
// tracked by std::futures kept in a local list.
//
//   super_chunk_id - identifier of the owning SuperChunk.
//   chunks         - work queue; the front element is used for each launch attempt.
//   chunk_shape    - chunk shape passed in by the caller.
//   array_shape    - array shape passed in by the caller.
//
// NOTE(review): parts of this definition (a parameter between chunk_shape and
// array_shape, the argument-bundle constructor call, and the loop/exception
// scaffolding) are not visible in this excerpt - confirm against the complete file.
425void process_chunks_unconstrained_concurrent_dio(
426 const string &super_chunk_id,
427 queue<shared_ptr<Chunk>> &chunks,
428 const vector<unsigned long long> &chunk_shape,
430 const vector<unsigned long long> &array_shape){
433 list<future<bool>> futures;
// Wait (up to DMRPP_WAIT_FOR_FUTURE_MS) for one outstanding future; the
// return value is not used on this line.
439 get_next_future(futures, chunk_processing_thread_counter, DMRPP_WAIT_FOR_FUTURE_MS, prolog);
// While chunks remain, keep trying to launch tasks until a launch is refused.
444 if (!chunks.empty()){
446 bool thread_started =
true;
447 while(thread_started && !chunks.empty()) {
448 auto chunk = chunks.front();
450 auto args = unique_ptr<one_chunk_unconstrained_args>(
454 thread_started = start_one_chunk_unconstrained_compute_thread_dio(futures, std::move(args));
456 if (thread_started) {
460 BESDEBUG(SUPER_CHUNK_MODULE, prolog <<
"Thread not started. args deleted, Chunk remains in queue.)" <<
461 " chunk_processing_thread_counter: " << chunk_processing_thread_counter <<
462 " futures.size(): " << futures.size() << endl);
// Drain every outstanding future. NOTE(review): std::future::get() rethrows an
// exception stored by the task, which would end the drain early - confirm.
475 while(!futures.empty()){
476 if(futures.back().valid())
477 futures.back().get();
// Body of the routine that tries to append candidate_chunk to this SuperChunk
// and reports whether it was added.
// NOTE(review): the signature line is not visible in this excerpt; this
// appears to be SuperChunk::add_chunk() - confirm against the complete file.
525 bool chunk_was_added =
false;
// First chunk: it defines the SuperChunk's offset, size, fill-value flag and data URL.
526 if(d_chunks.empty()){
527 d_chunks.push_back(candidate_chunk);
528 d_offset = candidate_chunk->get_offset();
529 d_size = candidate_chunk->get_size();
531 d_uses_fill_value = candidate_chunk->get_uses_fill_value();
// A fill-value chunk leaves the SuperChunk without a data URL (nullptr).
532 if (!d_uses_fill_value)
533 d_data_url = candidate_chunk->get_data_url();
535 d_data_url =
nullptr;
536 chunk_was_added =
true;
// Later chunks: only non-fill-value chunks that is_contiguous() accepts are
// appended, and the SuperChunk grows by the candidate's size.
// NOTE(review): if the first chunk used fill values, d_data_url is nullptr
// when is_contiguous() runs - verify that case is handled.
539 else if(!candidate_chunk->get_uses_fill_value() && is_contiguous(candidate_chunk)){
540 this->d_chunks.push_back(candidate_chunk);
541 d_size += candidate_chunk->get_size();
542 chunk_was_added =
true;
544 return chunk_was_added;
548bool SuperChunk::add_chunk_non_contiguous(
const std::shared_ptr<Chunk> candidate_chunk,
unsigned long long &buffer_end_position) {
550 BESDEBUG(SUPER_CHUNK_MODULE, prolog <<
"Coming to add_chunk_non_contiguous" << endl);
551 bool chunk_was_added =
false;
552 if(d_chunks.empty()){
553 d_chunks.push_back(candidate_chunk);
554 d_offset = candidate_chunk->get_offset();
555 d_size = buffer_end_position-d_offset;
557 BESDEBUG(SUPER_CHUNK_MODULE, prolog <<
"add_chunk_non d_size: "<<d_size<< endl);
558 BESDEBUG(SUPER_CHUNK_MODULE, prolog <<
"add_chunk_non d_offset: "<<d_offset<< endl);
561 d_uses_fill_value = candidate_chunk->get_uses_fill_value();
562 if (!d_uses_fill_value)
563 d_data_url = candidate_chunk->get_data_url();
565 d_data_url =
nullptr;
566 chunk_was_added =
true;
569 else if(!candidate_chunk->get_uses_fill_value()){
570 if (candidate_chunk->get_offset() >=d_offset) {
572 if ((candidate_chunk->get_offset() + candidate_chunk->get_size())<=buffer_end_position) {
573 this->d_chunks.push_back(candidate_chunk);
574 chunk_was_added =
true;
579 return chunk_was_added;
596bool SuperChunk::is_contiguous(
const std::shared_ptr<Chunk> candidate_chunk) {
598 bool contiguous = candidate_chunk->get_data_url()->str() == d_data_url->str();
601 contiguous = (d_offset + d_size) == candidate_chunk->get_offset();
614void SuperChunk::map_chunks_to_buffer()
616 unsigned long long bindex = 0;
617 for(
const auto &chunk : d_chunks){
618 chunk->set_read_buffer(d_read_buffer + bindex, chunk->get_size(),0,
false);
619 bindex += chunk->get_size();
620 if (bindex > d_size) {
622 msg <<
"ERROR The computed buffer index, " << bindex <<
" is larger than expected size of the SuperChunk. ";
623 msg <<
"d_size: " << d_size;
624 throw BESInternalError(msg.str(), __FILE__, __LINE__);
629void SuperChunk::map_non_contiguous_chunks_to_buffer()
631 unsigned long long bindex = 0;
633 for (
unsigned i = 0; i <d_chunks.size(); i++){
635 (d_chunks[i])->set_read_buffer(d_read_buffer + bindex, (d_chunks[i])->get_size(),0,
false);
638 if (i <(d_chunks.size()-1)) {
639 long long temp_bindex = (d_chunks[i+1])->get_offset()-d_offset;
641 throw BESInternalError(
"The non-contiguous chunk offset cannot be smaller than the SuperChunk offset. ", __FILE__, __LINE__);
642 bindex = (
unsigned long long)temp_bindex;
643 if (bindex > d_size) {
645 msg <<
"ERROR The computed buffer index, " << bindex <<
" is larger than expected size of the SuperChunk. ";
646 msg <<
"d_size: " << d_size;
647 throw BESInternalError(msg.str(), __FILE__, __LINE__);
// Reads the SuperChunk's whole byte range (d_size bytes at d_offset of
// d_data_url) into d_read_buffer by describing it as one temporary Chunk.
// Throws BESInternalError when no curl handle is available or when the byte
// count read differs from d_size.
// NOTE(review): the statements that perform the transfer and hand the handle
// back to the pool are not visible in this excerpt - confirm the handle is
// released on every path, including when the transfer throws.
656void SuperChunk::read_aggregate_bytes()
// Temporary Chunk spanning the whole SuperChunk, with d_read_buffer as its read buffer.
661 Chunk chunk(d_data_url,
"NOT_USED", d_size, d_offset);
663 chunk.set_read_buffer(d_read_buffer, d_size,0,
false);
665 dmrpp_easy_handle *handle = DmrppRequestHandler::curl_handle_pool->get_easy_handle(&chunk);
667 throw BESInternalError(prolog +
"No more libcurl handles.", __FILE__, __LINE__);
// A short (or long) read is reported as an error.
679 if (d_size != chunk.get_bytes_read()) {
681 oss <<
"Wrong number of bytes read for chunk; read: " << chunk.get_bytes_read() <<
", expected: " << d_size;
682 throw BESInternalError(oss.str(), __FILE__, __LINE__);
695void SuperChunk::read_fill_value_chunk()
697 if (d_chunks.size() != 1)
698 throw BESInternalError(
"Found a SuperChunk with uses_fill_value true but more than one child chunk.", __FILE__, __LINE__);
700 d_chunks.front()->read_chunk();
// Body of the routine that reads this SuperChunk's bytes and marks every
// child Chunk as read.
// NOTE(review): the signature line and the condition guarding the
// "already been read" message are not visible in this excerpt; this appears
// to be SuperChunk::retrieve_data() - confirm against the complete file.
711 BESDEBUG(SUPER_CHUNK_MODULE, prolog <<
"SuperChunk (" << (
void **)
this <<
") has already been read! Returning." << endl);
// Allocate the receive buffer on first use. NOTE(review): raw new[]; the
// matching delete[] is not visible here - confirm where it is released.
719 if (!d_read_buffer) {
722 d_read_buffer =
new char[d_size];
// Give each child Chunk its slice of d_read_buffer, using the gap-preserving
// layout when non_contiguous_chunk is set.
729 if (non_contiguous_chunk)
730 map_non_contiguous_chunks_to_buffer();
732 map_chunks_to_buffer();
// Fill-value SuperChunks read their single child; all others read the whole byte range.
739 if (d_uses_fill_value)
740 read_fill_value_chunk();
742 read_aggregate_bytes();
// Mark every child as read, with a byte count equal to its own size.
749 for(
const auto& chunk : d_chunks){
750 chunk->set_is_read(
true);
751 chunk->set_bytes_read(chunk->get_size());
// "dio" variant of the data retrieval: allocates the receive buffer if
// needed, maps the child Chunks onto it back to back, reads the whole byte
// range, and marks every child Chunk as read.
// NOTE(review): the condition guarding the "already been read" message is not
// visible in this excerpt - confirm against the complete file.
757void SuperChunk::retrieve_data_dio() {
762 BESDEBUG(SUPER_CHUNK_MODULE, prolog <<
"SuperChunk (" << (
void **)
this <<
") has already been read! Returning." << endl);
// Allocate the receive buffer on first use. NOTE(review): raw new[]; the
// matching delete[] is not visible here - confirm where it is released.
766 if (!d_read_buffer) {
769 d_read_buffer =
new char[d_size];
// Give each child Chunk its slice of d_read_buffer.
775 map_chunks_to_buffer();
// Read the SuperChunk's whole byte range into d_read_buffer.
782 read_aggregate_bytes();
// Mark every child as read, with a byte count equal to its own size.
789 for(
const auto& chunk : d_chunks){
790 chunk->set_is_read(
true);
791 chunk->set_bytes_read(chunk->get_size());
// Body of the routine that processes every child Chunk into the parent array
// using the constrained shape, either serially or with compute tasks.
// NOTE(review): the signature line is not visible in this excerpt; this
// appears to be SuperChunk::process_child_chunks() - confirm against the
// complete file, including whether the data is retrieved before this point.
801 BESDEBUG(SUPER_CHUNK_MODULE, prolog <<
"BEGIN" << endl);
// Shape of the parent array, requested with the `constrained` flag set to true.
804 vector<unsigned long long> constrained_array_shape = d_parent_array->get_shape(
true);
805 BESDEBUG(SUPER_CHUNK_MODULE, prolog <<
"d_use_compute_threads: " << (DmrppRequestHandler::d_use_compute_threads ?
"true" :
"false") << endl);
806 BESDEBUG(SUPER_CHUNK_MODULE, prolog <<
"d_max_compute_threads: " << DmrppRequestHandler::d_max_compute_threads << endl);
// Serial path: process each child in order on the calling thread.
808 if (!DmrppRequestHandler::d_use_compute_threads) {
809#if DMRPP_ENABLE_THREAD_TIMERS
810 BES_STOPWATCH_START(SUPER_CHUNK_MODULE, prolog+
"Serial Chunk Processing. id: " + d_id);
812 for(
const auto &chunk: d_chunks){
813 process_one_chunk(chunk,d_parent_array,constrained_array_shape);
817#if DMRPP_ENABLE_THREAD_TIMERS
818 stringstream timer_tag;
819 timer_tag << prolog <<
"Concurrent Chunk Processing. id: " << d_id;
820 BES_STOPWATCH_START(SUPER_CHUNK_MODULE, timer_tag.str());
// Concurrent path: copy the children into a work queue and hand it to the
// concurrent processor.
822 queue< shared_ptr<Chunk> > chunks_to_process;
823 for(
const auto &chunk: d_chunks)
824 chunks_to_process.push(chunk);
826 process_chunks_concurrent(d_id, chunks_to_process, d_parent_array, constrained_array_shape);
828 BESDEBUG(SUPER_CHUNK_MODULE, prolog <<
"END" << endl );
// Body of the routine that processes every child Chunk into the parent array
// using the unconstrained insert path, either serially or with compute tasks.
// NOTE(review): the signature line is not visible in this excerpt; this
// appears to be SuperChunk::process_child_chunks_unconstrained() - confirm
// against the complete file, including whether the data is retrieved before
// this point.
838 BESDEBUG(SUPER_CHUNK_MODULE, prolog <<
"BEGIN" << endl);
// Array shape (requested with the `constrained` flag set to true) and the
// chunk dimension sizes of the parent array.
842 const vector<unsigned long long> array_shape = d_parent_array->get_shape(
true);
844 const vector<unsigned long long> chunk_shape = d_parent_array->get_chunk_dimension_sizes();
// Serial path: process each child in order on the calling thread.
846 if (!DmrppRequestHandler::d_use_compute_threads) {
847#if DMRPP_ENABLE_THREAD_TIMERS
848 BES_STOPWATCH_START(SUPER_CHUNK_MODULE, prolog +
"Serial Chunk Processing. sc_id: " + d_id);
850 for(
const auto &chunk: d_chunks){
851 process_one_chunk_unconstrained(chunk, chunk_shape, d_parent_array, array_shape);
855#if DMRPP_ENABLE_THREAD_TIMERS
856 stringstream timer_tag;
857 timer_tag << prolog <<
"Concurrent Chunk Processing. sc_id: " << d_id;
858 BES_STOPWATCH_START(SUPER_CHUNK_MODULE, timer_tag.str());
// Concurrent path: copy the children into a work queue and hand it to the
// concurrent processor.
860 queue<shared_ptr<Chunk>> chunks_to_process;
861 for (
const auto &chunk: d_chunks) {
862 chunks_to_process.push(chunk);
864 process_chunks_unconstrained_concurrent(d_id, chunks_to_process, chunk_shape, d_parent_array, array_shape);
// Body of the routine that builds a text description of this SuperChunk: its
// address, offset, size and chunk count, followed by one line per child Chunk.
// NOTE(review): the signature line, the declaration of `msg`, and any
// condition around the per-chunk loop are not visible in this excerpt; this
// appears to be SuperChunk::to_string() - confirm against the complete file.
876 msg <<
"[SuperChunk: " << (
void **)
this;
877 msg <<
" offset: " << d_offset;
878 msg <<
" size: " << d_size ;
879 msg <<
" chunk_count: " << d_chunks.size();
// NOTE(review): `auto chunk` copies each element on every iteration;
// `const auto &` would avoid the copy.
884 for (
auto chunk: d_chunks) {
885 msg << chunk->to_string() << endl;
// Processes every child Chunk into the parent array through the "dio"
// unconstrained path, either serially or with compute tasks.
// NOTE(review): `chunk_shape` is used below but its declaration is not
// visible in this excerpt, and it is not visible whether the data is
// retrieved before processing - confirm against the complete file.
900void SuperChunk::read_unconstrained_dio() {
// Shape of the parent array, requested with the `constrained` flag set to true.
906 const vector<unsigned long long> array_shape = d_parent_array->
get_shape(
true);
// Serial path: process each child in order on the calling thread.
910 if(!DmrppRequestHandler::d_use_compute_threads){
911#if DMRPP_ENABLE_THREAD_TIMERS
912 BES_STOPWATCH_START(SUPER_CHUNK_MODULE, prolog +
"Serial Chunk Processing. sc_id: " + d_id);
914 for(
const auto &chunk : d_chunks){
915 process_one_chunk_unconstrained_dio(chunk, chunk_shape, d_parent_array, array_shape);
919#if DMRPP_ENABLE_THREAD_TIMERS
920 stringstream timer_tag;
921 timer_tag << prolog <<
"Concurrent Chunk Processing. sc_id: " << d_id;
922 BES_STOPWATCH_START(SUPER_CHUNK_MODULE, timer_tag.str());
// Concurrent path: copy the children into a work queue and hand it to the
// "dio" concurrent processor.
924 queue<shared_ptr<Chunk>> chunks_to_process;
925 for (
const auto &chunk : d_chunks)
926 chunks_to_process.push(chunk);
928 process_chunks_unconstrained_concurrent_dio(d_id,chunks_to_process, chunk_shape, d_parent_array, array_shape);
static void release_handle(dmrpp_easy_handle *h)
Extend libdap::Array so that a handler can read data using a DMR++ file.
virtual std::vector< unsigned long long > get_shape(bool constrained)
Get the array shape.
virtual const std::vector< unsigned long long > & get_chunk_dimension_sizes() const
The chunk dimension sizes held in a const vector.
virtual void retrieve_data()
Cause the SuperChunk and all of its subordinate Chunks to be read.
virtual bool add_chunk(std::shared_ptr< Chunk > candidate_chunk)
Attempts to add a new Chunk to this SuperChunk.
std::string to_string(bool verbose) const
Makes a string representation of the SuperChunk.
virtual void dump(std::ostream &strm) const
Writes the to_string() output to the stream strm.
virtual void process_child_chunks()
Reads the SuperChunk, inflates/de-shuffles the subordinate chunks as required and copies the values i...
virtual void process_child_chunks_unconstrained()
Reads the SuperChunk, inflates/deshuffles the subordinate chunks as required and copies the values in...
Single argument structure for a thread that will process a single Chunk for a constrained array....
Single argument structure for a thread that will process a single Chunk for an unconstrained array....