Source code for file_manager

"""
This module performs QAQC methods in a batch. Methods were developed to process csv files created by HOBO sensors at
`meteorological sites`_ on the HJ Andrews experimental forest. It also performs other file storage and management
functions. For a specified directory, it processes all files and creates a directory of new, processed csv files.

QAQC methods are imported from :meth:`hobo_qaqc.HOBOdata.reformat_HOBO_csv`.

When module is called :meth:`FileHandling.manage` is executed.

This module is designed to minimize any read/write times by copying all files locally, performing all processes, and
then transferring files to final directories. This is ideal with external or network drives, but if all directories are
local, it will create a final directory which duplicates file names from the source directory.

.. _`meteorological sites`: https://andrewsforest.oregonstate.edu/research/infrastructure/climate
"""

import hobo_qaqc, subprocess
from sys import platform
from os import listdir, makedirs, remove
from os.path import isdir, basename, isfile, abspath, normpath, join
from datetime import datetime
import zipfile as zp
from numpy import unique
from shutil import rmtree
import atexit

__authors__ = 'Greg Cohn'
__version__ = '1.0'

class FileHandling:
    """
    Processes all files in assigned directory for timezone, units, and timestep sync, and converts values where
    necessary. Contains methods for archiving using .zip, wiping directories after processing, and adding to
    existing directory structure: ./<FileArchive>/<Project>/<Site>.

    .. Warning:: Executes ./MET_hobo/file_path.config as Python file and saves variables to class object.

    .. todo::
        possible change from sys.platform to os.name to decrease package dependencies

        possible change from shutil.rmtrees to os.remove os.rmdir
    """

    def __init__(self, config='../file_path.config'):
        """
        Read the config file, validate the configured paths and create the temporary working folders.

        The config file must define ``dir_local_processing``, ``dir_final_storage``, ``dir_source_files`` and
        ``time_step``. ``map_fname2dir`` (dict mapping a filename prefix to a project name) is optional.

        :param config: str. Path to the config file. The default is relative and assumes that the current working
            directory is ./MET_hobo/MET_hobo.
        :raises SystemExit: if a required variable is missing from the config file.
        :raises SyntaxError: if a configured path contains a control character (usually a mistyped ``\\``).
        :raises OSError: if the platform is not Windows (robocopy is the only copy command supported).
        """
        self.start_date = datetime.now().strftime('%Y%m%d_%H%M%S')

        # partial path is a weak point and assumes that pwd is ./MET_hobo/MET_hobo
        with open(config) as f:
            lines = f.read()

        # NOTE(security): the config file is executed as Python code, so it must come from a trusted location.
        # It is executed into an explicit namespace: a bare exec() inside a function cannot create or rebind
        # function locals on Python 3, so the settings would never become visible here.
        settings = {}
        exec(compile(lines, config, 'exec'), settings)

        # optional setting; default to an empty mapping so _get_projname always has something to search
        self.map_fname2dir = settings.get('map_fname2dir') or {}

        # save directory paths to class instance
        try:
            # normpath converts /, \\ and \ to os specific path separators
            # normpath will not convert special characters \t, \r, \n, \x, etc.
            wdir = normpath(settings['dir_local_processing'])
            final_dir = normpath(settings['dir_final_storage'])
            src_dir = normpath(settings['dir_source_files'])
            time_step = settings['time_step']
        except KeyError:
            raise SystemExit('Ooops!!!\nYou forgot to setup ./MET_hobo/file_path.config')
        self.wdir = wdir
        self.final_dir = final_dir
        self.src_dir = src_dir
        self.time_step = time_step

        # Check paths for special characters like tab or newline (\t, \n)
        for d in (wdir, final_dir, src_dir):
            if self._is_spec_char_in_path(d):
                raise SyntaxError('Special character used in file path\nUse / NOT \\ \n{}'.format(d))

        self.logs = []
        self.files = {'.hobo': [], '.csv': [], '.log': [], 'unk_ext': [], 'sites': []}

        OS = platform
        # A plain substring test for 'win' would also match 'darwin' (macOS), which has no robocopy.
        if OS.startswith('win') or OS == 'cygwin':
            # options mov: cuts, mir: copies, TEE: prints to screen, e: includes sub-dir, XD *: excludes dir *,
            # XX: exclude (don't delete) files in dest. not in src.
            self.copy = {'cmd': 'robocopy',
                         'opt_mirror_all': '/MIR /TEE /XX /XD * ',
                         'opt_cut_files': '/mov /TEE /XX'}
            self.sep = '\\'
        else:
            raise OSError('This module does not support %s at this time\n' % OS)

        data_dir = join(wdir, '_data')
        self._mkdirs_exist_ok(data_dir)
        self.data_dir = data_dir

        processed = join(wdir, '_processed')
        self._mkdirs_exist_ok(processed)
        self.proc_dir = processed

        # whenever, the process exits (errors/complete) write log
        atexit.register(self.write_log)

    @staticmethod
    def _mkdirs_exist_ok(dpath):
        """
        Private function. Obsolete in Python >=3.2. Create directory if it does not exist.

        :param dpath: str. Absolute path to directory.

        .. todo:: In update to >=3.2, makedirs(exist_ok=True)
        """
        if isdir(dpath):
            return
        try:
            makedirs(dpath)
        except OSError:
            # another process may have created the directory between the check and the call
            if not isdir(dpath):
                raise

    @staticmethod
    def _is_spec_char_in_path(path_name):
        """
        Search for special (control) characters in file path.

        The backslash is an escape character in Python string literals, but it is also the path separator in
        Windows. A path typed with backslashes in the config file can therefore silently turn into a tab,
        newline, etc.

        :param path_name: str containing path name
        :return: Boolean. True if a control character is found, else False.
        """
        spec_char = ('\n', '\r', '\t', '\a', '\f', '\v', '\b',
                     '\0', '\1', '\2', '\3', '\4', '\5', '\6', '\7')
        return any(this_char in path_name for this_char in spec_char)

    def _get_projname(self, f):
        """
        Private function. Use map_fname2dir (from file_path.config) to key filename to project name.

        The prefix of the file name (text left of the first "_" or space) is searched for each key of
        map_fname2dir; the value of the first key found inside the prefix is returned.

        :param f: str. Filename (a leading directory path is ignored)
        :return: str. Project name, or 'UnknownProject' when no key matches
        """
        fname2dir = getattr(self, 'map_fname2dir', None) or {}
        # compute the prefix once instead of once per key
        prefix = basename(f).split('_')[0].split(' ')[0]
        for key, project in fname2dir.items():
            if key in prefix:
                return project
        return 'UnknownProject'

    def set_log_header(self):
        """
        Create header for log file. Assigns first items to list self.logs.
        """
        self.logs.extend(['Process CSV output from HOBOWARE for time zone, timestep, and units\n',
                          '=====================================================================\n',
                          'MET_hobo module\n',
                          'Date: %s\n' % self.start_date])

    def _run_and_log(self, cmd):
        """
        Private function. Run an OS copy command and append its stdout and stderr to self.logs.

        :param cmd: str. Complete command line.
        """
        # universal_newlines=True makes communicate() return text on Python 3 as well; bytes in self.logs would
        # make write_log fail because the log file is opened in text mode.
        proc = subprocess.Popen(cmd, stdout=subprocess.PIPE, stderr=subprocess.PIPE, universal_newlines=True)
        self.logs.extend(proc.communicate())

    def copy_src_to_wdir(self):
        """
        Copies source files to local working directory using OS specific DOS, bash, or shell command. Results are
        output to log file. Directory paths assigned to instance from file_path.config when instance is initialized.

        `cp <dir_source_files> <wdir/_data>`
        """
        cp = self.copy
        # Copy files from server to local machine (or to processing folder)
        cmd = '%s "%s" "%s" %s' % (cp['cmd'], self.src_dir, self.data_dir, cp['opt_mirror_all'])
        self._run_and_log(cmd)

    def index_files(self):
        """
        Identify files in the working data directory (./_data), including its sub-directories.

        Create list of .csv, .hobo, .log files, and any other file type encountered. All entries are absolute
        paths. Identify site as any prefix to the left of "_", space or "." in each .csv filename and generate a
        sorted array of unique sites. Results are stored in self.files.
        """
        index_files = self.files
        sites = []

        # breadth-first walk: top level files first, then the content of each sub-directory
        pending = [self.data_dir]
        while pending:
            folder = pending.pop(0)
            for name in listdir(folder):
                fp = abspath(join(folder, name))
                if isdir(fp):
                    pending.append(fp)
                elif name.endswith('.csv'):
                    index_files['.csv'].append(fp)
                    # read first segment from file name. E.g. RS05 from path/RS05_*.csv
                    sites.append(name.split('_')[0].split(' ')[0].split('.')[0])
                elif name.endswith('.hobo'):
                    index_files['.hobo'].append(fp)
                elif name.endswith('.log'):
                    index_files['.log'].append(fp)
                else:
                    index_files['unk_ext'].append(fp)

        index_files['sites'] = unique(sites)
        self.files = index_files

    def qaqc_csv(self, time_step=None, units='SI', tz=-8):
        """
        Attempt to QAQC all csv files for timezone, timestep sync, and units.

        For list of .csv files generated by :meth:`index_files`, call
        :meth:`hobo_qaqc.HOBOdata.reformat_HOBO_csv`. Output is written to ./_processed under the same file name.

        :param time_step: time step passed on as ``tstep``. Defaults to the value from the config file.
        :param units: passed on unchanged to ``reformat_HOBO_csv``.
        :param tz: passed on unchanged to ``reformat_HOBO_csv``.
        :return: list. strings of filenames processed, each ending with a newline.
        :return: int. number of csv files
        :return: int. number of files processed
        """
        if time_step is None:
            time_step = self.time_step

        # Reformat and QAQC all CSV files
        proc_dir = self.proc_dir
        fcsv = self.files['.csv']
        fproc = []
        for f in fcsv:
            # a fresh HOBOdata object per file so nothing is carried over between files
            hobo_qaqc.HOBOdata().reformat_HOBO_csv(f, join(proc_dir, basename(f)),
                                                   tstep=time_step, units=units, tz=tz)
            fproc.append(f + '\n')

        return fproc, len(fcsv), len(fproc)

    def zip_hobo_files(self):
        """
        Collect all files with .hobo extension and write to a zip file in the temp directory _processed.

        Naming convention is <site>_<start date>.zip, where site is any filename prefix to the left of "_".
        Files are taken from the list of .hobo files generated by :meth:`index_files`.

        :return: List of strings of each filename and its zipped filename, each ending with a newline
        :return: int. Count of hobo files
        :return: int. Count of zipped files
        """
        proc_dir = self.proc_dir
        date = self.start_date
        fhobo = self.files['.hobo']

        zproc = []
        zipped = 0  # zproc holds two lines per file, so its length is not the file count
        for f in fhobo:
            fname = basename(f)
            site = fname.split('_')[0].split('.')[0]
            fzip = join(proc_dir, site + '_' + date + '.zip')
            with zp.ZipFile(fzip, 'a') as zhobo:
                zhobo.write(f, fname, compress_type=zp.ZIP_DEFLATED)
            zproc.append(f + '\n')
            zproc.append(fzip + '\n')
            zipped += 1

        return zproc, len(fhobo), zipped

    def copy_processed_to_final_dir(self):
        """
        Moves processed (QC'd) files from local working directory to final directory using OS specific DOS, bash,
        or shell command. Results are output to log file. Directory paths assigned to instance from
        file_path.config when instance is initialized.

        `mv <wdir/_processed> <dir_final_storage>`
        """
        cp = self.copy
        # Cut files from local machine (or processing folder) to final storage
        cmd = '%s "%s" "%s" %s' % (cp['cmd'], self.proc_dir, self.final_dir, cp['opt_cut_files'])
        self._run_and_log(cmd)

    def copy_selected_to_site_dir(self, file_list, subdir, loc):
        """
        Call OS specific system command to move desired files from temporary working directory to final storage.
        Selects files by site using wildcard selection.

        An entry of file_list that is an existing file is moved to <dir_final_storage>/<subdir>. Any other entry
        is treated as a site name and moved to <dir_final_storage>/<project>/<site>/<subdir>.

        Example::

            `cp <wdir/_processed/site*> <dir_final_storage/proj_name/site_name/subdir>`
            `cp //NewServer/hoboQA/_processed/RS12* //DataServer/REFSTANDS/RS12/_bulk_export_clean`

        .. Warning:: This method was modified per bitbucket issue #10 to create a simpler work flow where file
            movement is more manually controlled. At 6ec103b it was superseded by `copy_processed_to_final_dir`,
            removing it from the default workflow. It remains as a legacy method still in BETA testing.

        :param file_list: List of str to select files from.
            `Example: ['RS12','RS04'] copies files 'RS12*' and 'RS04*'`
        :param subdir: str. Destination subdirectory within final storage directory. Files are moved to here.
        :param loc: str. Directory where files are currently located.
        :return: List of strings of each entry of file_list, each ending with a newline
        """
        cp = self.copy
        fin_dir = self.final_dir
        sep = self.sep

        fproc = []
        for s in file_list:
            if isfile(s):
                storage = fin_dir + sep + subdir
            else:
                storage = fin_dir + sep + self._get_projname(s) + sep + s + sep + subdir
            self._mkdirs_exist_ok(storage)

            # Cut files from local machine (or processing folder) to final storage (server).
            # The wildcard is quoted so a name containing spaces stays a single file argument.
            cmd = '%s "%s" "%s" "%s*" %s' % (cp['cmd'], loc, storage, basename(s), cp['opt_cut_files'])
            self._run_and_log(cmd)
            fproc.append(s + '\n')

        return fproc

    def del_temp_folders(self):
        """
        This is to wipe temporary processing folders in the working directory. The convention maintained by this
        module is that all temp folders have the "_" prefix.

        ./_data is always removed. If any files are still in _processed, and have not been copied to a final
        storage directory, deletion of _processed is aborted and a warning is logged.

        .. Warning:: This uses destructive methods which will erase any and all contents of the target directory
            and any sub-directories within. shutil.rmtree()
        """
        output = self.proc_dir
        self.logs.extend(['\n\nDeleting Temporary DIR from Working DIR\n************************************\n',
                          '%s\n%s\n' % (output, self.data_dir)])
        rmtree(self.data_dir)
        if not listdir(output):
            # All processed files output from MET_hobo.hobo_qaqc should be transferred to final storage.
            rmtree(output)
        else:
            self.logs.append('!!!WARNING!!! %s is not empty!\nABORT DIRECTORY CLEAN\n' % output)

    def del_files_frm_srcdir(self):
        """
        Wipe all files from the src_dir, defined in file_path.config as dir_source_files. All files and
        sub-folders in this directory will be wiped. If source directory and final directory are the same, this
        process will abort.

        .. Warning:: This uses destructive methods which will erase any and all contents of the target directory
            and any sub-directories within. shutil.rmtree()

        :return: List of strings of each filename wiped from the source directory (empty if aborted)
        """
        sdir = self.src_dir
        if sdir == self.final_dir:
            self.logs.append('WARNING!!!: Source direcotry is same as final directory.\nABORT DIRECTORY CLEAN\n')
            return []

        f_wipe = []
        for f in listdir(sdir):
            f_name = join(sdir, f)
            if isfile(f_name):
                remove(f_name)
            elif isdir(f_name):
                rmtree(f_name)
            else:
                # neither a file nor a directory (e.g. a broken link): leave it alone and do not report it
                self.logs.append('WARNING!!!: %s was not wiped\n' % f_name)
                continue
            f_wipe.append(f + '\n')
        return f_wipe

    def write_log(self):
        """
        Write log to file: <final storage directory>/logs/hobo_qaqc_<date>.log.

        Log is a list of strings until this function is called. The file is opened in append mode.
        """
        log_dir = join(self.final_dir, 'logs')
        self._mkdirs_exist_ok(log_dir)
        flog = join(log_dir, 'hobo_qaqc_' + self.start_date + '.log')
        with open(flog, 'a') as f:
            f.writelines(self.logs)

    @staticmethod
    def _log_chg(proc, start, end, dir_frm, dir_to, tot_cnt, chg_cnt, f_list):
        """
        Private function. Creates a list of strings to add bunches of files to a log.

        :param proc: str. What process was performed
        :param start: start time
        :param end: end time
        :param dir_frm: directory of source files
        :param dir_to: directory of final files
        :param tot_cnt: count of all files
        :param chg_cnt: count of manipulated files
        :param f_list: list of file names or file paths
        :return: list of strings: one header string, the entries of f_list, one footer string
        """
        head = ('\n\n----------------------------\nStart {proc} - {start}\n----------------------------'
                '\n---------Read from {dir_frm}'
                '\n---------Ouput to {dir_to}'
                '\n--------- {tot_cnt} files total'
                '\n--------- {chg_cnt} files processed'
                '\n').format(proc=proc, start=start, dir_frm=dir_frm, dir_to=dir_to,
                             tot_cnt=tot_cnt, chg_cnt=chg_cnt)
        tail = '\n----------------------------\nEnd %s- %s\n----------------------------\n' % (proc, end)

        log = [head]
        log.extend(f_list)
        log.append(tail)
        return log

    def manage(self, time_step=None, units='SI', tz=-8, final_subdirs=False):
        """
        Execute file management.

        #. Copy files to working directory (./_data).
        #. Create list of .csv, .hobo, and .log files in working directory.
        #. Attempt to perform QAQC on all .csv files and transfer to ./_processed.
        #. Create a .zip file for all .hobo files from each site. Disabled per bitbucket `issue #10`_ .
        #. Copy all files with .csv, .log, and unknown extension to final storage.
        #. Delete temporary folders in working directory.
        #. Wipe original source directory. This directory contains files where QAQC was not performed.
           Disabled per bitbucket `issue #10`_ .
        #. Write log file (done by :meth:`write_log`, which is registered to run when the process exits).

        3 keyword variables are defined to allow the user to alter :meth:`~hobo_qaqc.HOBOdata.format_QAQC_data`
        settings. `units`, and `tz` (time zone) are set to default values, SI units and PST (GMT-8). To change
        these values, :meth:`manage` must be called directly, through the terminal, or through Python.
        `time_step`, is defined in the :doc:`config file <file_path>`. This argument only needs to be defined
        here if the user wants to override the config file at the command line.

        :param time_step: overrides the time step of the config file when not None.
        :param units: passed on to :meth:`qaqc_csv`.
        :param tz: passed on to :meth:`qaqc_csv`.
        :param final_subdirs: bool. If True, use the legacy (BETA) :meth:`copy_selected_to_site_dir` work flow
            which files data under ./<Project>/<Site>; otherwise move everything in ./_processed to final storage.

        .. _`issue #10`: https://bitbucket.org/hjandrews/im_hobo/issues/10/task-request-simplify-file-management
        """
        # Copy files to a working directory and index file types and study sites
        self.set_log_header()
        self.copy_src_to_wdir()
        self.index_files()

        # Run QAQC and log results
        if time_step is not None:
            self.logs.extend(['**********************************',
                              '\nTime step used is %s\nTime step in config file was (%s) and was overrided at '
                              'terminal\n' % (time_step, self.time_step)])
        start = datetime.now().strftime('%H:%M:%S')
        c_proc, c_count, cproc_count = self.qaqc_csv(time_step=time_step, units=units, tz=tz)
        end = datetime.now().strftime('%H:%M:%S')
        self.logs.extend(self._log_chg('csv reformat', start, end, self.data_dir, self.proc_dir,
                                       c_count, cproc_count, c_proc))

        # NOTE: archiving of .hobo files was disabled per bitbucket issue #10 to create a simpler file storage
        # where file movement is more manually controlled. To re-enable, call self.zip_hobo_files() here and log
        # its result with self._log_chg('archive .hobo in ZIP', ...). Until then .hobo files are handled like
        # any other unrecognized file.
        self.files['unk_ext'].extend(self.files['.hobo'])

        # Copy files to final storage location
        self.logs.append('\n\n Start copy DATA TO FINAL STORAGE\n***********************************************\n\n')
        if final_subdirs:
            self.copy_selected_to_site_dir(self.files['sites'], '_bulk_exp_clean', self.proc_dir)

            # Store any files that are not recognized as data files
            if self.files['unk_ext']:
                self.logs.append(
                    '\n\n Start copy UNRECOGNIZED FILE .EXT\n***********************************************\n\n')
                fproc = self.copy_selected_to_site_dir(self.files['unk_ext'], 'UNK_FILE', self.data_dir)
                self.logs.extend(fproc)
            if self.files['.log']:
                # pass the indexed log files themselves; a literal '.log' entry would be treated as a site name
                fproc = self.copy_selected_to_site_dir(self.files['.log'], 'logs', self.data_dir)
                self.logs.extend(fproc)
        else:
            self.copy_processed_to_final_dir()

        # Clean directories
        self.del_temp_folders()

        # NOTE: wiping of the original source directory was disabled per bitbucket issue #10 (see above).
        # To re-enable, call self.del_files_frm_srcdir() here and add the returned list to self.logs.
if __name__ == '__main__':
    # Script entry point: build the handler from the default config file and run the whole work flow.
    handler = FileHandling()
    handler.manage()