56#include "BESInternalError.h"
57#include "BESSyntaxUserError.h"
61#include "TheBESKeys.h"
69static const unsigned long long BYTES_PER_MEG = 1048576ULL;
73static const unsigned long long MAX_CACHE_SIZE_IN_MEGABYTES = (1ULL << 44);
90DAPCache3::DAPCache3(
const string &cache_dir,
const string &prefix,
unsigned long long size)
91 : d_cache_dir(cache_dir), d_prefix(prefix), d_max_cache_size_in_bytes(size) {
92 m_initialize_cache_info();
95void DAPCache3::delete_instance() {
96 DBG(cerr <<
"DAPCache3::delete_instance() - Deleting singleton DAPCache3 instance." << endl);
116BESCache3::get_instance(BESKeys *keys,
const string &cache_dir_key,
const string &prefix_key,
const string &size_key)
119 d_instance =
new BESCache3(keys, cache_dir_key, prefix_key, size_key);
136 if (d_instance == 0) {
137 d_instance =
new DAPCache3(cache_dir, prefix, size);
139 atexit(delete_instance);
150 throw InternalErr(__FILE__, __LINE__,
"Tried to get the DAPCache3 instance, but it hasn't been created yet");
155static inline string get_errno() {
156 char *s_err = strerror(errno);
160 return "Unknown error.";
164static inline struct flock *lock(
int type) {
165 static struct flock lock;
167 lock.l_whence = SEEK_SET;
170 lock.l_pid = getpid();
175inline void DAPCache3::m_record_descriptor(
const string &file,
int fd) {
176 DBG(cerr <<
"DAP Cache: recording descriptor: " << file <<
", " << fd << endl);
177 d_locks.insert(std::pair<string, int>(file, fd));
180inline int DAPCache3::m_get_descriptor(
const string &file) {
181 FilesAndLockDescriptors::iterator i = d_locks.find(file);
183 DBG(cerr <<
"DAP Cache: getting descriptor: " << file <<
", " << fd << endl);
193static void unlock(
int fd) {
194 if (fcntl(fd, F_SETLK, lock(F_UNLCK)) == -1) {
195 throw InternalErr(__FILE__, __LINE__,
"An error occurred trying to unlock the file" + get_errno());
199 throw InternalErr(__FILE__, __LINE__,
"Could not close the (just) unlocked file.");
214static bool getSharedLock(
const string &file_name,
int &ref_fd) {
215 DBG(cerr <<
"getSharedLock: " << file_name << endl);
218 if ((fd = open(file_name.c_str(), O_RDONLY)) < 0) {
224 throw InternalErr(__FILE__, __LINE__, get_errno());
228 struct flock *l = lock(F_RDLCK);
229 if (fcntl(fd, F_SETLKW, l) == -1) {
232 oss <<
"cache process: " << l->l_pid <<
" triggered a locking error: " << get_errno();
236 DBG(cerr <<
"getSharedLock exit: " << file_name << endl);
255static bool getExclusiveLock(
string file_name,
int &ref_fd) {
256 DBG(cerr <<
"getExclusiveLock: " << file_name << endl);
259 if ((fd = open(file_name.c_str(), O_RDWR)) < 0) {
265 throw InternalErr(__FILE__, __LINE__, get_errno());
269 struct flock *l = lock(F_WRLCK);
270 if (fcntl(fd, F_SETLKW, l) == -1) {
273 oss <<
"cache process: " << l->l_pid <<
" triggered a locking error: " << get_errno();
277 DBG(cerr <<
"getExclusiveLock exit: " << file_name << endl);
295static bool getExclusiveLockNB(
string file_name,
int &ref_fd) {
296 DBG(cerr <<
"getExclusiveLock_nonblocking: " << file_name << endl);
299 if ((fd = open(file_name.c_str(), O_RDWR)) < 0) {
305 throw InternalErr(__FILE__, __LINE__, get_errno());
309 struct flock *l = lock(F_WRLCK);
310 if (fcntl(fd, F_SETLK, l) == -1) {
313 DBG(cerr <<
"getExclusiveLock_nonblocking exit (false): " << file_name <<
" by: " << l->l_pid << endl);
320 oss <<
"cache process: " << l->l_pid <<
" triggered a locking error: " << get_errno();
326 DBG(cerr <<
"getExclusiveLock_nonblocking exit (true): " << file_name << endl);
346static bool createLockedFile(
string file_name,
int &ref_fd) {
347 DBG(cerr <<
"createLockedFile: " << file_name << endl);
350 if ((fd = open(file_name.c_str(), O_CREAT | O_EXCL | O_RDWR, 0666)) < 0) {
356 throw InternalErr(__FILE__, __LINE__, get_errno());
360 struct flock *l = lock(F_WRLCK);
361 if (fcntl(fd, F_SETLKW, l) == -1) {
364 oss <<
"cache process: " << l->l_pid <<
" triggered a locking error: " << get_errno();
368 DBG(cerr <<
"createLockedFile exit: " << file_name << endl);
376void DAPCache3::m_check_ctor_params() {
377 if (d_cache_dir.empty()) {
378 string err =
"The cache directory was not specified, must be non-empty";
379 throw InternalErr(__FILE__, __LINE__, err);
384 int statret = stat(d_cache_dir.c_str(), &buf);
385 if (statret != 0 || !S_ISDIR(buf.st_mode)) {
387 int status = mkdir(d_cache_dir.c_str(), 0775);
389 string err =
"The cache directory " + d_cache_dir +
" does not exist or could not be created.";
390 throw InternalErr(__FILE__, __LINE__, err);
394 if (d_prefix.empty()) {
395 string err =
"The cache file prefix was not specified, must not be empty";
396 throw InternalErr(__FILE__, __LINE__, err);
399 if (d_max_cache_size_in_bytes <= 0) {
400 string err =
"The cache size was not specified, must be greater than zero";
401 throw InternalErr(__FILE__, __LINE__, err);
408 if (d_max_cache_size_in_bytes > MAX_CACHE_SIZE_IN_MEGABYTES) {
409 std::ostringstream msg;
410 msg <<
"The specified cache size was larger than the max cache size of: " << MAX_CACHE_SIZE_IN_MEGABYTES
411 <<
" (was " << d_max_cache_size_in_bytes <<
").";
412 throw InternalErr(__FILE__, __LINE__, msg.str());
415 DBG(cerr <<
"DAP Cache: directory " << d_cache_dir <<
", prefix " << d_prefix <<
", max size "
416 << d_max_cache_size_in_bytes << endl);
420void DAPCache3::m_initialize_cache_info() {
423 d_max_cache_size_in_bytes = min(d_max_cache_size_in_bytes, MAX_CACHE_SIZE_IN_MEGABYTES);
424 d_max_cache_size_in_bytes *= BYTES_PER_MEG;
425 d_target_size = d_max_cache_size_in_bytes * 0.8;
427 m_check_ctor_params();
429 d_cache_info = d_cache_dir +
"/dap.cache.info";
433 if (createLockedFile(d_cache_info, d_cache_info_fd)) {
435 unsigned long long size = 0;
436 if (write(d_cache_info_fd, &size,
sizeof(
unsigned long long)) !=
sizeof(
unsigned long long))
437 throw InternalErr(__FILE__, __LINE__,
"Could not write size info to the cache info file in startup!");
442 if ((d_cache_info_fd = open(d_cache_info.c_str(), O_RDWR)) == -1) {
443 throw InternalErr(__FILE__, __LINE__, get_errno());
447 DBG(cerr <<
"d_cache_info_fd: " << d_cache_info_fd << endl);
465BESCache3::BESCache3(BESKeys *keys,
const string &cache_dir_key,
const string &prefix_key,
const string &size_key) :
466 d_max_cache_size_in_bytes(0)
469 keys->get_value(cache_dir_key, d_cache_dir, found);
471 throw BESSyntaxUserError(
"The cache directory key " + cache_dir_key +
" was not found in the BES configuration file", __FILE__, __LINE__);
474 keys->get_value(prefix_key, d_prefix, found);
476 throw BESSyntaxUserError(
"The prefix key " + prefix_key +
" was not found in the BES configuration file", __FILE__, __LINE__);
479 string cache_size_str;
480 keys->get_value(size_key, cache_size_str, found);
482 throw BESSyntaxUserError(
"The size key " + size_key +
" was not found in the BES configuration file", __FILE__, __LINE__);
484 std::istringstream is(cache_size_str);
485 is >> d_max_cache_size_in_bytes;
487 m_initialize_cache_info();
510 if (target.at(0) ==
'/') {
511 target = src.substr(1, target.length() - 1);
513 string::size_type slash = 0;
514 while ((slash = target.find(
'/')) != string::npos) {
515 target.replace(slash, 1, 1, DAPCache3::DAP_CACHE_CHAR);
517 string::size_type last_dot = target.rfind(
'.');
518 if (last_dot != string::npos) {
519 target = target.substr(0, last_dot);
522 DBG(cerr <<
" d_cache_dir: '" << d_cache_dir <<
"'" << endl);
523 DBG(cerr <<
" d_prefix: '" << d_prefix <<
"'" << endl);
524 DBG(cerr <<
" target: '" << target <<
"'" << endl);
526 return d_cache_dir +
"/" + d_prefix + DAPCache3::DAP_CACHE_CHAR + target;
549 bool status = getSharedLock(target, fd);
551 DBG(cerr <<
"DAP Cache: read_lock: " << target <<
"(" << status <<
")" << endl);
554 m_record_descriptor(target, fd);
576 bool status = createLockedFile(target, fd);
578 DBG(cerr <<
"DAP Cache: create_and_lock: " << target <<
"(" << status <<
")" << endl);
581 m_record_descriptor(target, fd);
603 lock.l_type = F_RDLCK;
604 lock.l_whence = SEEK_SET;
607 lock.l_pid = getpid();
609 if (fcntl(fd, F_SETLKW, &lock) == -1) {
610 throw InternalErr(__FILE__, __LINE__, get_errno());
623 DBG(cerr <<
"lock_cache - d_cache_info_fd: " << d_cache_info_fd << endl);
625 if (fcntl(d_cache_info_fd, F_SETLKW, lock(F_WRLCK)) == -1) {
626 throw InternalErr(__FILE__, __LINE__,
"An error occurred trying to lock the cache-control file" + get_errno());
634 DBG(cerr <<
"lock_cache - d_cache_info_fd: " << d_cache_info_fd << endl);
636 if (fcntl(d_cache_info_fd, F_SETLKW, lock(F_RDLCK)) == -1) {
637 throw InternalErr(__FILE__, __LINE__,
"An error occurred trying to lock the cache-control file" + get_errno());
647 DBG(cerr <<
"DAP Cache: unlock: cache_info (fd: " << d_cache_info_fd <<
")" << endl);
649 if (fcntl(d_cache_info_fd, F_SETLK, lock(F_UNLCK)) == -1) {
651 "An error occurred trying to unlock the cache-control file" + get_errno());
667 DBG(cerr <<
"DAP Cache: unlock file: " << file_name << endl);
669 unlock(m_get_descriptor(file_name));
678 DBG(cerr <<
"DAP Cache: unlock fd: " << fd << endl);
682 DBG(cerr <<
"DAP Cache: unlock " << fd <<
" Success" << endl);
699 if (lseek(d_cache_info_fd, 0, SEEK_SET) == -1)
700 throw InternalErr(__FILE__, __LINE__,
"Could not rewind to front of cache info file.");
703 unsigned long long current_size;
704 if (read(d_cache_info_fd, &current_size,
sizeof(
unsigned long long)) !=
sizeof(
unsigned long long))
705 throw InternalErr(__FILE__, __LINE__,
"Could not get read size info from the cache info file!");
708 int statret = stat(target.c_str(), &buf);
710 current_size += buf.st_size;
713 "Could not read the size of the new file: " + target +
" : " + get_errno());
715 DBG(cerr <<
"DAP Cache: cache size updated to: " << current_size << endl);
717 if (lseek(d_cache_info_fd, 0, SEEK_SET) == -1)
718 throw InternalErr(__FILE__, __LINE__,
"Could not rewind to front of cache info file.");
720 if (write(d_cache_info_fd, &current_size,
sizeof(
unsigned long long)) !=
sizeof(
unsigned long long))
721 throw InternalErr(__FILE__, __LINE__,
"Could not write size info from the cache info file!");
736 return current_size > d_max_cache_size_in_bytes;
750 if (lseek(d_cache_info_fd, 0, SEEK_SET) == -1)
751 throw InternalErr(__FILE__, __LINE__,
"Could not rewind to front of cache info file.");
753 unsigned long long current_size;
754 if (read(d_cache_info_fd, &current_size,
sizeof(
unsigned long long)) !=
sizeof(
unsigned long long))
755 throw InternalErr(__FILE__, __LINE__,
"Could not get read size info from the cache info file!");
768unsigned long long DAPCache3::m_collect_cache_dir_info(
CacheFiles &contents) {
769 DIR *dip = opendir(d_cache_dir.c_str());
771 throw InternalErr(__FILE__, __LINE__,
"Unable to open cache directory " + d_cache_dir);
774 vector<string> files;
777 while ((dit = readdir(dip)) != NULL) {
778 string dirEntry = dit->d_name;
779 if (dirEntry.compare(0, d_prefix.length(), d_prefix) == 0) {
780 files.push_back(d_cache_dir +
"/" + dirEntry);
786 unsigned long long current_size = 0;
788 for (vector<string>::iterator file = files.begin(); file != files.end(); ++file) {
789 if (stat(file->c_str(), &buf) == 0) {
790 current_size += buf.st_size;
793 entry.
size = buf.st_size;
794 entry.
time = buf.st_atime;
798 throw InternalErr(__FILE__, __LINE__,
"Zero-byte file found in cache. " + *file);
800 contents.push_back(entry);
805 contents.sort(entry_op);
822 DBG(cerr <<
"purge - starting the purge" << endl);
828 unsigned long long computed_size = m_collect_cache_dir_info(contents);
830 if (BESISDEBUG(
"cache_contents" )) {
831 DBG(endl <<
"BEFORE Purge " << computed_size/BYTES_PER_MEG << endl );
832 CacheFiles::iterator ti = contents.begin();
833 CacheFiles::iterator te = contents.end();
834 for (; ti != te; ti++) {
835 DBG((*ti).time <<
": " << (*ti).name <<
": size " << (*ti).size/BYTES_PER_MEG << endl );
839 DBG(cerr <<
"purge - current and target size (in MB) " << computed_size / BYTES_PER_MEG <<
", "
840 << d_target_size / BYTES_PER_MEG << endl);
847 CacheFiles::iterator i = contents.begin();
848 while (i != contents.end() && computed_size > d_target_size) {
853 if (i->name != new_file && getExclusiveLockNB(i->name, cfile_fd)) {
854 DBG(cerr <<
"purge: " << i->name <<
" removed." << endl);
856 if (unlink(i->name.c_str()) != 0)
858 "Unable to purge the file " + i->name +
" from the cache: " + get_errno());
861 computed_size -= i->size;
866 DBG(cerr <<
"purge: " << i->name <<
" is in use." << endl );
871 DBG(cerr <<
"purge - current and target size (in MB) " << computed_size / BYTES_PER_MEG <<
", "
872 << d_target_size / BYTES_PER_MEG << endl);
876 if (lseek(d_cache_info_fd, 0, SEEK_SET) == -1)
877 throw InternalErr(__FILE__, __LINE__,
"Could not rewind to front of cache info file.");
879 if (write(d_cache_info_fd, &computed_size,
sizeof(
unsigned long long)) !=
sizeof(
unsigned long long))
880 throw InternalErr(__FILE__, __LINE__,
"Could not write size info to the cache info file!");
882 if (BESISDEBUG(
"cache_contents" )) {
884 computed_size = m_collect_cache_dir_info(contents);
885 DBG(endl <<
"AFTER Purge " << computed_size/BYTES_PER_MEG << endl );
886 CacheFiles::iterator ti = contents.begin();
887 CacheFiles::iterator te = contents.end();
888 for (; ti != te; ti++) {
889 DBG((*ti).time <<
": " << (*ti).name <<
": size " << (*ti).size/BYTES_PER_MEG << endl );
912 DBG(cerr <<
"purge_file - starting the purge" << endl);
919 if (getExclusiveLock(file, cfile_fd)) {
921 unsigned long long size = 0;
923 if (stat(file.c_str(), &buf) == 0) {
927 DBG(cerr <<
"purge_file: " << file <<
" removed." << endl);
929 if (unlink(file.c_str()) != 0)
931 "Unable to purge the file " + file +
" from the cache: " + get_errno());
937 if (lseek(d_cache_info_fd, 0, SEEK_SET) == -1)
938 throw InternalErr(__FILE__, __LINE__,
"Could not rewind to front of cache info file.");
940 if (write(d_cache_info_fd, &cache_size,
sizeof(
unsigned long long)) !=
sizeof(
unsigned long long))
941 throw InternalErr(__FILE__, __LINE__,
"Could not write size info to the cache info file!");
959 strm <<
DapIndent::LMarg <<
"DAPCache3::dump - (" << (
void *)
this <<
")" << endl;
963 strm <<
DapIndent::LMarg <<
"size (bytes): " << d_max_cache_size_in_bytes << endl;
Implementation of a caching mechanism for compressed data. This cache uses simple advisory locking fo...
virtual unsigned long long get_cache_size()
Get the cache size. Read the size information from the cache info file and return it....
string get_cache_file_name(const string &src, bool mangle=true)
virtual bool cache_too_big(unsigned long long current_size) const
Look at the cache size; is it too large? Look at the cache size and see if it is too big.
virtual void purge_file(const string &file)
Purge a single file from the cache.
virtual bool get_read_lock(const string &target, int &fd)
Get a read-only lock on the file if it exists.
virtual void unlock_cache()
virtual void dump(ostream &strm) const
Dumps information about this object.
virtual void exclusive_to_shared_lock(int fd)
Transfer from an exclusive lock to a shared lock. If the file has an exclusive write lock on it,...
virtual void lock_cache_read()
virtual void unlock_and_close(const string &target)
virtual unsigned long long update_cache_info(const string &target)
Update the cache info file to include 'target'.
static DAPCache3 * get_instance()
virtual void update_and_purge(const string &new_file)
Purge files from the cache.
virtual void lock_cache_write()
virtual bool create_and_lock(const string &target, int &fd)
Create a file in the cache and lock it for write access. If the file does not exist,...
static ostream & LMarg(ostream &strm)
A class for software fault reporting.
Top-level DAP object to house generic methods.
std::list< cache_entry > CacheFiles