44#include "BESInternalError.h"
50#include "BESFileLockingCache.h"
54#define LOCK "cache-lock"
55#define LOCK_STATUS "cache-lock-status"
57#define CACHE_CONTROL "cache_control"
59#define prolog std::string("BESFileLockingCache::").append(__func__).append("() - ")
64static const unsigned long long BYTES_PER_MEG = 1048576ULL;
68static const unsigned long long MAX_CACHE_SIZE_IN_MEGABYTES = (1ULL << 44);
70static inline string get_errno() {
71 const char *s_err = strerror(errno);
72 return s_err ? s_err :
"unknown error";
79static inline struct flock *advisory_lock(
int type)
81 static struct flock lock;
83 lock.l_whence = SEEK_SET;
86 lock.l_pid = getpid();
92class AdvisoryLockGuard {
96 AdvisoryLockGuard() =
delete;
97 AdvisoryLockGuard(
const AdvisoryLockGuard &) =
delete;
98 AdvisoryLockGuard &operator=(
const AdvisoryLockGuard &) =
delete;
101 AdvisoryLockGuard(
int fd,
int type) : d_fd(fd) {
102 struct flock *l = advisory_lock(type);
103 if (fcntl(d_fd, F_SETLKW, l) == -1) {
104 ERROR_LOG(
"Could not lock the advisory file lock (fcntl: " + get_errno() +
").");
108 ~AdvisoryLockGuard() {
109 struct flock *l = advisory_lock(F_UNLCK);
110 if (fcntl(d_fd, F_SETLKW, l) == -1) {
111 ERROR_LOG(
"Could not unlock the advisory file lock (fcntl: " + get_errno() +
").");
137BESFileLockingCache::BESFileLockingCache(
string cache_dir,
string prefix,
unsigned long long size) :
138 d_cache_dir(std::move(cache_dir)), d_prefix(std::move(prefix)), d_max_cache_size_in_bytes(size)
140 m_initialize_cache_info();
152static bool createLockedFile(
const string &file_name,
int &ref_fd)
154 BESDEBUG(LOCK, prolog <<
"BEGIN file: " << file_name <<endl);
157 if ((fd = open(file_name.c_str(), O_CREAT | O_EXCL | O_RDWR, 0666)) < 0) {
167 struct flock *l = advisory_lock(F_WRLCK);
169 if (fcntl(fd, F_SETLKW, l) == -1) {
172 oss <<
"cache process: " << l->l_pid <<
" triggered a locking error for '" << file_name <<
"': " << get_errno();
176 BESDEBUG(LOCK, prolog <<
"END file: " << file_name <<endl);
192bool BESFileLockingCache::m_initialize_cache_info()
194 BESDEBUG(CACHE, prolog <<
"BEGIN" << endl);
198 d_max_cache_size_in_bytes = min(d_max_cache_size_in_bytes, MAX_CACHE_SIZE_IN_MEGABYTES);
199 d_max_cache_size_in_bytes *= BYTES_PER_MEG;
200 d_target_size = d_max_cache_size_in_bytes * 0.8;
202 BESDEBUG(CACHE, prolog <<
"d_max_cache_size_in_bytes: "
203 << d_max_cache_size_in_bytes <<
" d_target_size: "<<d_target_size<< endl);
205 bool status = m_check_ctor_params();
209 BESDEBUG(CACHE, prolog <<
"d_cache_info: " << d_cache_info << endl);
213 if (createLockedFile(d_cache_info, d_cache_info_fd)) {
215 unsigned long long size = 0;
216 if (write(d_cache_info_fd, &size,
sizeof(
unsigned long long)) !=
sizeof(
unsigned long long))
217 throw BESInternalError(prolog +
"Could not write size info to the cache info file `" + d_cache_info +
"`",
225 if (fcntl(d_cache_info_fd, F_SETLK, advisory_lock(F_UNLCK)) == -1) {
226 throw BESInternalError(prolog +
"An error occurred trying to unlock the cache-control file" + get_errno(), __FILE__,
231 if ((d_cache_info_fd = open(d_cache_info.c_str(), O_RDWR)) == -1) {
232 throw BESInternalError(prolog +
"Failed to open cache info file: " + d_cache_info +
" errno: " + get_errno(), __FILE__, __LINE__);
236 BESDEBUG(CACHE, prolog <<
"d_cache_info_fd: " << d_cache_info_fd << endl);
239 BESDEBUG(CACHE, prolog <<
"END [" <<
"CACHE IS " << (
cache_enabled()?
"ENABLED]":
"DISABLED]") << endl);
261 d_cache_dir = cache_dir;
263 d_max_cache_size_in_bytes = size;
265 m_initialize_cache_info();
270inline void BESFileLockingCache::m_record_descriptor(
const string &file,
int fd)
272 BESDEBUG(LOCK, prolog <<
"Recording descriptor: " << file <<
", " << fd << endl);
274 d_locks.insert(std::pair<string, int>(file, fd));
277inline int BESFileLockingCache::m_remove_descriptor(
const string &file)
279 BESDEBUG(LOCK, prolog <<
"d_locks size: " << d_locks.size() << endl);
281 FilesAndLockDescriptors::iterator i = d_locks.find(file);
282 if (i == d_locks.end())
return -1;
287 BESDEBUG(LOCK, prolog <<
"Found file descriptor [" << fd <<
"] for file: " << file << endl);
292#if USE_GET_SHARED_LOCK
293inline int BESFileLockingCache::m_find_descriptor(
const string &file)
295 BESDEBUG(LOCK, prolog <<
"d_locks size: " << d_locks.size() << endl);
297 FilesAndLockDescriptors::iterator i = d_locks.find(file);
298 if (i == d_locks.end())
return -1;
300 BESDEBUG(LOCK, prolog <<
"Found file descriptor [" << i->second <<
"] for file: " << file << endl);
311static string lockStatus(
const int fd)
313 struct flock lock_query;
315 lock_query.l_type = F_WRLCK;
316 lock_query.l_start = 0;
317 lock_query.l_whence = SEEK_SET;
318 lock_query.l_len = 0;
319 lock_query.l_pid = 0;
321 int ret = fcntl(fd, F_GETLK, &lock_query);
326 ss <<
"fnctl(" << fd <<
",F_GETLK, &lock) returned: " << ret <<
" errno[" << errno <<
"]: "
327 << strerror(errno) << endl;
330 ss <<
"fnctl(" << fd <<
",F_GETLK, &lock) returned: " << ret << endl;
333 ss <<
"lock_info.l_len: " << lock_query.l_len << endl;
334 ss <<
"lock_info.l_pid: " << lock_query.l_pid << endl;
335 ss <<
"lock_info.l_start: " << lock_query.l_start << endl;
338 switch (lock_query.l_type) {
351 ss <<
"lock_info.l_type: " << type << endl;
352 ss <<
"lock_info.l_whence: " << lock_query.l_whence << endl;
362static void unlock(
int fd)
364 if (fcntl(fd, F_SETLK, advisory_lock(F_UNLCK)) == -1) {
365 throw BESInternalError(prolog +
"An error occurred trying to unlock the file: " + get_errno(), __FILE__, __LINE__);
368 BESDEBUG(LOCK_STATUS, prolog <<
"lock status: " << lockStatus(fd) << endl);
370 if (close(fd) == -1)
throw BESInternalError(prolog +
"Could not close the (just) unlocked file.", __FILE__, __LINE__);
372 BESDEBUG(LOCK, prolog <<
"File Closed. fd: " << fd << endl);
394bool BESFileLockingCache::m_check_ctor_params()
407 BESDEBUG(CACHE, prolog <<
"BEGIN" << endl);
409 if (d_cache_dir.empty()) {
410 BESDEBUG(CACHE, prolog <<
"The cache directory was not specified. CACHE IS DISABLED." << endl);
417 int status = mkdir(d_cache_dir.c_str(), 0775);
420 if (status == -1 && errno != EEXIST) {
421 string err = prolog +
"The cache directory " + d_cache_dir +
" could not be created: " + strerror(errno);
422 throw BESError(err, BES_SYNTAX_USER_ERROR, __FILE__, __LINE__);
425 if (d_prefix.empty()) {
426 string err = prolog +
"The cache file prefix was not specified, must not be empty";
427 throw BESError(err, BES_SYNTAX_USER_ERROR, __FILE__, __LINE__);
437 if (d_max_cache_size_in_bytes < 0) {
438 string err =
"The cache size was not specified, must be greater than zero";
439 throw BESError(err, BES_SYNTAX_USER_ERROR, __FILE__, __LINE__);
444 "BESFileLockingCache::" << __func__ <<
"() -" <<
445 " d_cache_dir: " << d_cache_dir <<
446 " d_prefix: " << d_prefix <<
447 " d_max_cache_size_in_bytes: " << d_max_cache_size_in_bytes << endl);
454static const string chars_excluded_from_filenames = R
"(<>=,/()\"':? []()$)";
476 BESDEBUG(CACHE, prolog <<
"src: '" << src <<
"' mangle: "<< mangle << endl);
481 string::size_type pos = target.find_first_of(chars_excluded_from_filenames);
482 while (pos != string::npos) {
483 target.replace(pos, 1,
"#", 1);
484 pos = target.find_first_of(chars_excluded_from_filenames);
488 if (target.size() > 254) {
490 msg << prolog <<
"Cache filename is longer than 254 characters (name length: ";
491 msg << target.size() <<
", name: " << target;
497 BESDEBUG(CACHE, prolog <<
"target: '" << target <<
"'" << endl);
502#if USE_GET_SHARED_LOCK
516static bool getSharedLock(
const string &file_name,
int &ref_fd)
518 BESDEBUG(LOCK, prolog <<
"Acquiring cache read lock for " << file_name <<endl);
521 if ((fd = open(file_name.c_str(), O_RDONLY)) < 0) {
531 struct flock *l = advisory_lock(F_RDLCK);
532 if (fcntl(fd, F_SETLKW, l) == -1) {
535 oss << prolog <<
"cache process: " << l->l_pid <<
" triggered a locking error for '" << file_name <<
"': " << get_errno();
539 BESDEBUG(LOCK, prolog <<
"SUCCESS Read Lock Acquired For " << file_name <<endl);
567 AdvisoryLockGuard read_alg(d_cache_info_fd, F_RDLCK);
574#if USE_GET_SHARED_LOCK
575 status = getSharedLock(target, fd);
577 if (status) m_record_descriptor(target, fd);
579 fd = m_find_descriptor(target);
581 if ((fd == -1) && (fd = open(target.c_str(), O_RDONLY)) < 0) {
593 struct flock *l = advisory_lock(F_RDLCK);
594 if (fcntl(fd, F_SETLKW, l) == -1) {
598 m_record_descriptor(target, fd);
627 AdvisoryLockGuard write_alg(d_cache_info_fd, F_WRLCK);
632 bool status = createLockedFile(target, fd);
634 BESDEBUG(LOCK, prolog <<
"target: " << target <<
" (status: " << status <<
", fd: " << fd <<
")" << endl);
636 if (status) m_record_descriptor(target, fd);
663 lock.l_type = F_RDLCK;
664 lock.l_whence = SEEK_SET;
667 lock.l_pid = getpid();
669 if (fcntl(fd, F_SETLKW, &lock) == -1) {
673 BESDEBUG(LOCK_STATUS, prolog <<
"lock status: " << lockStatus(fd) << endl);
685void BESFileLockingCache::lock_cache_write()
687 BESDEBUG(LOCK, prolog <<
"d_cache_info_fd: " << d_cache_info_fd << endl);
689 if (fcntl(d_cache_info_fd, F_SETLKW, advisory_lock(F_WRLCK)) == -1) {
690 throw BESInternalError(
"An error occurred trying to lock the cache-control file" + get_errno(), __FILE__,
694 BESDEBUG(LOCK_STATUS, prolog <<
"lock status: " << lockStatus(d_cache_info_fd) << endl);
700void BESFileLockingCache::lock_cache_read()
702 BESDEBUG(LOCK, prolog <<
"d_cache_info_fd: " << d_cache_info_fd << endl);
704 if (fcntl(d_cache_info_fd, F_SETLKW, advisory_lock(F_RDLCK)) == -1) {
705 throw BESInternalError(prolog +
"An error occurred trying to lock the cache-control file" + get_errno(), __FILE__,
709 BESDEBUG(LOCK_STATUS, prolog <<
"lock status: " << lockStatus(d_cache_info_fd) << endl);
717void BESFileLockingCache::unlock_cache()
719 BESDEBUG(LOCK, prolog <<
"d_cache_info_fd: " << d_cache_info_fd << endl);
721 if (fcntl(d_cache_info_fd, F_SETLK, advisory_lock(F_UNLCK)) == -1) {
722 throw BESInternalError(prolog +
"An error occurred trying to unlock the cache-control file" + get_errno(), __FILE__,
726 BESDEBUG(LOCK_STATUS, prolog <<
"lock status: " << lockStatus(d_cache_info_fd) << endl);
747 BESDEBUG(LOCK, prolog <<
"BEGIN file: " << file_name << endl);
749 int fd = m_remove_descriptor(file_name);
752 fd = m_remove_descriptor(file_name);
755 BESDEBUG(LOCK_STATUS, prolog <<
"lock status: " << lockStatus(d_cache_info_fd) << endl);
756 BESDEBUG(LOCK, prolog <<
"END file: "<< file_name<< endl);
771 AdvisoryLockGuard write_alg(d_cache_info_fd, F_WRLCK);
772 unsigned long long current_size;
778 if (lseek(d_cache_info_fd, 0, SEEK_SET) == -1)
779 throw BESInternalError(prolog +
"Could not rewind to front of cache info file.", __FILE__, __LINE__);
782 if (read(d_cache_info_fd, ¤t_size,
sizeof(
unsigned long long)) !=
sizeof(
unsigned long long))
783 throw BESInternalError(prolog +
"Could not get read size info from the cache info file!", __FILE__, __LINE__);
786 int statret = stat(target.c_str(), &buf);
788 current_size += buf.st_size;
790 throw BESInternalError(prolog +
"Could not read the size of the new file: " + target +
" : " + get_errno(), __FILE__,
793 BESDEBUG(CACHE, prolog <<
"cache size updated to: " << current_size << endl);
795 if (lseek(d_cache_info_fd, 0, SEEK_SET) == -1)
796 throw BESInternalError(prolog +
"Could not rewind to front of cache info file.", __FILE__, __LINE__);
798 if (write(d_cache_info_fd, ¤t_size,
sizeof(
unsigned long long)) !=
sizeof(
unsigned long long))
799 throw BESInternalError(prolog +
"Could not write size info from the cache info file!", __FILE__, __LINE__);
819 return current_size > d_max_cache_size_in_bytes;
831 AdvisoryLockGuard read_alg(d_cache_info_fd, F_RDLCK);
832 unsigned long long current_size;
838 if (lseek(d_cache_info_fd, 0, SEEK_SET) == -1)
839 throw BESInternalError(prolog +
"Could not rewind to front of cache info file.", __FILE__, __LINE__);
841 if (read(d_cache_info_fd, ¤t_size,
sizeof(
unsigned long long)) !=
sizeof(
unsigned long long))
842 throw BESInternalError(prolog +
"Could not get read size info from the cache info file!", __FILE__, __LINE__);
858 return e1.time < e2.time;
862unsigned long long BESFileLockingCache::m_collect_cache_dir_info(CacheFiles &contents)
864 DIR *dip = opendir(d_cache_dir.c_str());
865 if (!dip)
throw BESInternalError(prolog +
"Unable to open cache directory " + d_cache_dir, __FILE__, __LINE__);
867 struct dirent *dit =
nullptr;
868 vector<string> files;
871 while ((dit = readdir(dip)) != NULL) {
872 string dirEntry = dit->d_name;
873 if (dirEntry.compare(0, d_prefix.size(), d_prefix) == 0 && dirEntry != d_cache_info) {
874 files.push_back(d_cache_dir +
"/" + dirEntry);
880 unsigned long long current_size = 0;
882 for (vector<string>::iterator file = files.begin(); file != files.end(); ++file) {
883 if (stat(file->c_str(), &buf) == 0) {
884 current_size += buf.st_size;
887 entry.size = buf.st_size;
888 entry.time = buf.st_atime;
892 throw BESInternalError(
"Zero-byte file found in cache. " + *file, __FILE__, __LINE__);
894 contents.push_back(entry);
899 contents.sort(entry_op);
922 BESDEBUG(LOCK, prolog <<
"BEGIN filename: " << file_name <<endl);
925 if ((fd = open(file_name.c_str(), O_RDWR)) < 0) {
931 throw BESInternalError(prolog +
"errno: " + get_errno(), __FILE__, __LINE__);
935 struct flock *l = advisory_lock(F_WRLCK);
936 if (fcntl(fd, F_SETLK, l) == -1) {
940 BESDEBUG(LOCK,prolog <<
"exit (false): " << file_name <<
" by: " << l->l_pid << endl);
947 oss << prolog <<
"cache process: " << l->l_pid <<
" triggered a locking error for '" << file_name <<
"': " << get_errno();
955 m_record_descriptor(file_name,fd);
957 BESDEBUG(LOCK, prolog <<
"END filename: " << file_name <<endl);
979 BESDEBUG(CACHE, prolog <<
"Starting the purge" << endl);
982 BESDEBUG(CACHE, prolog <<
"Size is set to unlimited, so no need to purge." << endl);
986 AdvisoryLockGuard write_alg(d_cache_info_fd, F_WRLCK);
993 unsigned long long computed_size = m_collect_cache_dir_info(contents);
995 if (BESISDEBUG(
"cache_contents" )) {
996 BESDEBUG(CACHE,
"BEFORE Purge " << computed_size/BYTES_PER_MEG << endl );
997 CacheFiles::iterator ti = contents.begin();
998 CacheFiles::iterator te = contents.end();
999 for (; ti != te; ti++) {
1000 BESDEBUG(CACHE, (*ti).time <<
": " << (*ti).name <<
": size " << (*ti).size/BYTES_PER_MEG << endl );
1004 BESDEBUG(CACHE, prolog <<
"Current and target size (in MB) "
1005 << computed_size/BYTES_PER_MEG <<
", " << d_target_size/BYTES_PER_MEG << endl);
1012 CacheFiles::iterator i = contents.begin();
1013 while (i != contents.end() && computed_size > d_target_size) {
1019 BESDEBUG(CACHE, prolog <<
"purge: " << i->name <<
" removed." << endl);
1021 if (unlink(i->name.c_str()) != 0)
1023 prolog +
"Unable to purge the file " + i->name +
" from the cache: " + get_errno(), __FILE__,
1027 computed_size -= i->size;
1031 BESDEBUG(CACHE,prolog <<
"Current and target size (in MB) "
1032 << computed_size/BYTES_PER_MEG <<
", " << d_target_size/BYTES_PER_MEG << endl);
1036 if (lseek(d_cache_info_fd, 0, SEEK_SET) == -1)
1037 throw BESInternalError(prolog +
"Could not rewind to front of cache info file.", __FILE__, __LINE__);
1039 if (write(d_cache_info_fd, &computed_size,
sizeof(
unsigned long long)) !=
sizeof(
unsigned long long))
1040 throw BESInternalError(prolog +
"Could not write size info to the cache info file!", __FILE__, __LINE__);
1042 if (BESISDEBUG(
"cache_contents" )) {
1044 computed_size = m_collect_cache_dir_info(contents);
1045 BESDEBUG(CACHE,
"AFTER Purge " << computed_size/BYTES_PER_MEG << endl );
1046 CacheFiles::iterator ti = contents.begin();
1047 CacheFiles::iterator te = contents.end();
1048 for (; ti != te; ti++) {
1049 BESDEBUG(CACHE, (*ti).time <<
": " << (*ti).name <<
": size " << (*ti).size/BYTES_PER_MEG << endl );
1081 BESDEBUG(LOCK, prolog <<
"BEGIN filename: " << file_name <<endl);
1084 if ((fd = open(file_name.c_str(), O_RDWR)) < 0) {
1092 msg << prolog <<
"FAILED to open file: " << file_name <<
" errno: " << get_errno();
1093 BESDEBUG(LOCK, msg.str() << endl);
1099 struct flock *l = advisory_lock(F_WRLCK);
1100 if (fcntl(fd, F_SETLKW, l) == -1) {
1103 oss <<
"cache process: " << l->l_pid <<
" triggered a locking error for '" << file_name <<
"': " << get_errno();
1108 m_record_descriptor(file_name,fd);
1111 BESDEBUG(LOCK, prolog <<
"END filename: " << file_name <<endl);
1127 AdvisoryLockGuard write_alg(d_cache_info_fd, F_WRLCK);
1128 BESDEBUG(CACHE, prolog <<
"Starting the purge" << endl);
1140 unsigned long long size = 0;
1142 if (stat(file.c_str(), &buf) == 0) {
1146 BESDEBUG(CACHE, prolog <<
"file: " << file <<
" removed." << endl);
1148 if (unlink(file.c_str()) != 0)
1150 prolog +
"Unable to purge the file " + file +
" from the cache: " + get_errno(), __FILE__,
1158 if (lseek(d_cache_info_fd, 0, SEEK_SET) == -1)
1159 throw BESInternalError(prolog +
"Could not rewind to front of cache info file.", __FILE__,
1162 if (write(d_cache_info_fd, &cache_size,
sizeof(
unsigned long long)) !=
sizeof(
unsigned long long))
1163 throw BESInternalError(prolog +
"Could not write size info to the cache info file!", __FILE__,
1194 return (stat(dir.c_str(), &buf) == 0) && (buf.st_mode & S_IFDIR);
1207 strm << BESIndent::LMarg << prolog <<
"(" << (
void *)
this <<
")" << endl;
1208 BESIndent::Indent();
1209 strm << BESIndent::LMarg <<
"cache dir: " << d_cache_dir << endl;
1210 strm << BESIndent::LMarg <<
"prefix: " << d_prefix << endl;
1211 strm << BESIndent::LMarg <<
"size (bytes): " << d_max_cache_size_in_bytes << endl;
1212 BESIndent::UnIndent();
virtual unsigned long long get_cache_size()
Get the cache size.
bool cache_enabled() const
void initialize(const std::string &cache_dir, const std::string &prefix, unsigned long long size)
Initialize an instance of FileLockingCache.
virtual void unlock_and_close(const std::string &target)
std::string get_cache_directory() const
bool is_unlimited() const
Is this cache allowed to store as much as it wants?
virtual unsigned long long update_cache_info(const std::string &target)
Update the cache info file to include 'target'.
void enable()
Enable the cache.
virtual bool create_and_lock(const std::string &target, int &fd)
Create a file in the cache and lock it for write access.
virtual void exclusive_to_shared_lock(int fd)
Transfer from an exclusive lock to a shared lock.
virtual bool get_read_lock(const std::string &target, int &fd)
Get a read-only lock on the file if it exists.
static bool dir_exists(const std::string &dir)
virtual bool get_exclusive_lock(const std::string &target, int &fd)
void disable()
Disable the cache.
virtual bool get_exclusive_lock_nb(const std::string &target, int &fd)
std::string get_cache_file_prefix() const
void dump(std::ostream &strm) const override
dumps information about this object
virtual void purge_file(const std::string &file)
Purge a single file from the cache.
virtual bool cache_too_big(unsigned long long current_size) const
look at the cache size; is it too large? Look at the cache size and see if it is too big.
virtual void update_and_purge(const std::string &new_file)
Purge files from the cache.
virtual std::string get_cache_file_name(const std::string &src, bool mangle=true)
exception thrown if internal error encountered
static std::string assemblePath(const std::string &firstPart, const std::string &secondPart, bool leadingSlash=false, bool trailingSlash=false)
Assemble path fragments making sure that they are separated by a single '/' character.