# Source code for file_manager

"""
This module makes a batch call to QAQC methods developed to process csv files created by HOBO sensors at
`meteorological sites`_ on the HJ Andrews Experimental Forest. It also performs other file storage and management
functions. For a specified directory, it processes all files and creates a directory of new, processed csv files.

QAQC methods are imported from :meth:`hobo_qaqc.HOBOdata.reformat_HOBO_csv`.

When the module is run as a script, :meth:`FileHandling.manage` is executed.

This module is designed to minimize read/write times by copying all files locally, performing all processes, and
then transferring files to final directories. This is ideal for external or network drives, but if all directories are
local, it will create source and final directories with duplicate file names.

.. _`meteorological sites`: https://andrewsforest.oregonstate.edu/research/infrastructure/climate
"""

import hobo_qaqc, subprocess
from sys import platform
from os import listdir, makedirs, remove
from os.path import isdir, basename, isfile, abspath
from datetime import datetime
import zipfile as zp
from numpy import unique
from shutil import rmtree
import atexit

class FileHandling:
    """
    Processes all files in assigned directory for timezone, units, and timestep sync, and converts values where
    necessary. Contains methods for archiving using .zip, wiping directories after processing, and adding to
    .//metdat directory structure.

    .. todo::
        possible change from sys.platform to os.name to decrease package dependencies
        possible change from shutil.rmtrees to os.remove os.rmdir
    """

    def __init__(self, config_path='../file_path.config'):
        """
        Load settings from the configuration file and create the temporary working directories.

        The configuration file must define ``dir_local_processing``, ``dir_final_storage``, ``dir_source_files``,
        ``map_fname2dir`` and ``time_step``. The directories ``<dir_local_processing>_data/`` and
        ``<dir_local_processing>_processed/`` are created if missing. :meth:`write_log` is registered with
        :mod:`atexit` so the log is written when the process exits.

        .. Warning:: Executes the configuration file as Python code (see :meth:`_read_config`).

        :param config_path: str. Path to the configuration file. The default is a partial path and assumes the
            current working directory is ./MET_hobo/MET_hobo.
        :raises KeyError: if the configuration file does not define one of the required names.
        """
        self.start_date = datetime.now().strftime('%Y%m%d_%H%M%S')

        # save directory paths and settings to class instance
        cfg = self._read_config(config_path)
        wdir = cfg['dir_local_processing']
        self.wdir = wdir
        self.final_dir = cfg['dir_final_storage']
        self.src_dir = cfg['dir_source_files']
        self.map_fname2dir = cfg['map_fname2dir']
        self.time_step = cfg['time_step']

        self.logs = []
        self.files = {'.hobo': [], '.csv': [], '.log': [], 'unk_ext': [], 'sites': []}

        # sys.platform is 'win32' on Windows. A substring test ('win' in platform) would also match 'darwin'.
        if platform.startswith('win'):
            # robocopy options mov: cuts, mir: copies, TEE: prints to screen, e: includes sub-dir, XX: excludes dir
            self.copy = {'cmd': 'robocopy', 'opt_mirror_all': '/MIR /TEE /e', 'opt_cut_files': '/mov /TEE /XX'}
            self.sep = '\\'
        else:
            # No copy command is defined for other platforms; _run_copy raises a clear error if one is needed.
            self.copy = None
            self.sep = '/'

        data_dir = wdir + '_data/'
        self._mkdirs_exist_ok(data_dir)
        self.data_dir = data_dir

        processed = wdir + '_processed/'
        self._mkdirs_exist_ok(processed)
        self.proc_dir = processed

        # whenever the process exits (errors/complete) write log
        atexit.register(self.write_log)

    @staticmethod
    def _read_config(config_path):
        """
        Private function. Execute a configuration file as Python code and return the names it defines.

        .. Warning:: The file content is passed to ``exec``; only use a trusted configuration file.

        :param config_path: str. Path to the configuration file.
        :return: dict. Names assigned by the configuration file, mapped to their values.
        """
        with open(config_path) as f:
            source = f.read()
        # Execute into an explicit namespace. A bare exec() inside a function cannot create local variables in
        # Python 3, so names assigned by the config file would otherwise be invisible (NameError).
        namespace = {}
        exec(compile(source, config_path, 'exec'), namespace)
        return namespace

    def _mkdirs_exist_ok(self, dpath):
        """
        Private function. Create directory (and missing parents) if it does not exist.

        Equivalent to ``os.makedirs(dpath, exist_ok=True)`` in Python >=3.2.

        :param dpath: str. Absolute path to directory.
        """
        if isdir(dpath):
            return
        try:
            makedirs(dpath)
        except OSError:
            # The directory may have been created between the check and makedirs; only re-raise real failures.
            if not isdir(dpath):
                raise

    def _get_projname(self, f):
        """
        Private function. Use map_fname2dir (from file_path.config) to key filename to project name.

        :param f: str. Filename. Only the part left of the first "_" is matched against the map keys.
        :return: str. Project name of the first key contained in that prefix, or 'UnknownProject'.
        """
        fname2dir = self.map_fname2dir
        prefix = f.split('_')[0]
        for key in fname2dir:
            if key in prefix:
                return fname2dir[key]
        return 'UnknownProject'

    def _run_copy(self, args, opt_key):
        """
        Private function. Run the platform copy command and return its captured output as text.

        :param args: list of str. Positional arguments for the copy program (source, destination, patterns).
        :param opt_key: str. Key in self.copy naming the option string appended after args.
        :return: list of str. [stdout, stderr] of the finished command.
        :raises NotImplementedError: if no copy command is configured for this platform.
        """
        cp = self.copy
        if cp is None:
            raise NotImplementedError('No file copy command is configured for platform %r' % platform)
        cmd = [cp['cmd']] + list(args) + cp[opt_key].split()
        # An argument list lets subprocess quote paths containing spaces (a single string did not).
        proc = subprocess.Popen(cmd, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
        # Python 3 returns bytes; decode so write_log can write the output to a text-mode file.
        return [s if isinstance(s, str) else s.decode('utf-8', 'replace') for s in proc.communicate()]

    def set_log_header(self):
        """
        Create header for log file. Appends the header lines to the list self.logs.
        """
        self.logs.extend(['Process CSV output from HOBOWARE for time zone, timestep, and units\n',
                          '=====================================================================\n',
                          'MET_hobo module\n',
                          'Date: %s\n' % self.start_date])

    def copy_to_wdir(self):
        """
        Mirror the source directory into the local working directory (_data) with the platform copy command.
        Command output is appended to the log.
        """
        # Copy files from server to local machine (or to processing folder)
        self.logs.extend(self._run_copy([self.src_dir, self.data_dir], 'opt_mirror_all'))

    def index_files(self):
        """
        Identify files in the working data directory, including sub-directories.

        Sorts absolute file paths into the .hobo, .csv, .log and 'unk_ext' lists of self.files. The site of a file
        is its name up to the first "_" (or first "."), e.g. RS05 from path/RS05_*.zip. The sorted unique sites
        are stored in self.files['sites'].
        """
        fdata = self.data_dir
        index_files = self.files
        sites = []
        # Work queue of full paths; contents of sub-directories are appended as they are found.
        pending = [fdata + name for name in listdir(fdata)]
        idx = 0
        while idx < len(pending):
            path = pending[idx]
            idx += 1
            if isdir(path):
                pending.extend(path + self.sep + name for name in listdir(path))
                continue
            sites.append(basename(path).split('_')[0].split('.')[0])
            fp = abspath(path)
            if path.endswith('.csv'):
                index_files['.csv'].append(fp)
            elif path.endswith('.hobo'):
                index_files['.hobo'].append(fp)
            elif path.endswith('.log'):
                index_files['.log'].append(fp)
            else:
                index_files['unk_ext'].append(fp)
        index_files['sites'] = unique(sites)
        self.files = index_files

    def qaqc_csv(self):
        """
        Attempt to QAQC all csv files for timezone, timestep sync, and units.

        For the list of .csv files generated by :meth:`index_files`, call
        :meth:`hobo_qaqc.HOBOdata.reformat_HOBO_csv` and write the output into _processed under the same file name.

        :return: list. Strings of filenames processed, each with \\\\n at end.
        :return: int. Number of csv files.
        :return: int. Number of files processed.
        """
        proc_dir = self.proc_dir
        fcsv = self.files['.csv']
        fproc = []
        for f in fcsv:
            hobo_qaqc.HOBOdata().reformat_HOBO_csv(f, proc_dir + basename(f), tstep=self.time_step)
            fproc.append(f + '\n')
        return fproc, len(fcsv), len(fproc)

    def zip_hobo_files(self):
        """
        Collect all files with .hobo extension and write them to zip files in the temp directory _processed.

        Naming convention is <site>_<start date>.zip, where site is any filename prefix to the left of "_".
        Uses the list of .hobo files generated by :meth:`index_files`.

        :return: list. Each filename followed by its zip filename, each string with a \\\\n at the end.
        :return: int. Count of hobo files.
        :return: int. Count of hobo files written to a zip file.
        """
        proc_dir = self.proc_dir
        date = self.start_date
        fhobo = self.files['.hobo']
        zproc = []
        zipped = 0
        for f in fhobo:
            fname = basename(f)
            site = fname.split('_')[0].split('.')[0]
            fzip = proc_dir + site + '_' + date + '.zip'
            # Append mode creates the archive on first use and adds later files from the same site to it.
            with zp.ZipFile(fzip, 'a') as zhobo:
                zhobo.write(f, fname, compress_type=zp.ZIP_DEFLATED)
            zipped += 1
            zproc.append(f + '\n')
            zproc.append(fzip + '\n')
        return zproc, len(fhobo), zipped

    def copy_to_final_dir(self, file_list, subdir, loc):
        """
        Move files from a temporary working directory to <final storage>/<subdir> with the platform copy command.
        Files are selected by wildcard on each entry's basename. **Example:** RS12*

        :param file_list: Sequence of str to select files from.
            `Example: ['RS12','RS04'] moves files 'RS12*' and 'RS04*'`
        :param subdir: str. Destination subdirectory within final storage directory. Files are moved to here.
        :param loc: str. Directory where files are currently located.
        :return: List of strings of each entry of file_list, each with a \\n at the end.
        """
        if len(file_list) == 0:
            return []
        # Per bitbucket issue #10 the per-project layout (fin_dir/prj/site/subdir) was replaced by a flat
        # fin_dir/subdir so file movement is more manually controlled.
        # https://bitbucket.org/hjandrews/met_hobo/issues/10/task-request-simplify-file-management
        storage = self.final_dir + self.sep + subdir
        self._mkdirs_exist_ok(storage)
        logs = []
        fproc = []
        for s in file_list:
            # Cut files from local machine (or processing folder) to final storage (server)
            logs.extend(self._run_copy([loc, storage, basename(s) + '*'], 'opt_cut_files'))
            fproc.append(s + '\n')
        self.logs.extend(logs)
        return fproc

    def del_temp_folders(self):
        """
        Wipe the temporary processing folders (_data and _processed) in the working directory.

        _data is always deleted. If any files are still in _processed (i.e. not moved to final storage),
        deletion of that directory is aborted and a warning is logged.

        .. Warning:: This uses destructive methods (shutil.rmtree) which erase all contents of the target
            directory and any sub-directories within.
        """
        output = self.proc_dir
        self.logs.extend(['\n\nDeleting Temporary DIR from Working DIR\n************************************\n',
                          '%s\n%s\n' % (self.proc_dir, self.data_dir)])
        rmtree(self.data_dir)
        if not listdir(output):
            # All processed files output from MET_hobo.hobo_qaqc should be transferred to final storage.
            rmtree(output)
        else:
            self.logs.append('!!!WARNING!!! %s is not empty!\nABORT DIRECTORY CLEAN\n' % output)

    def del_files_frm_srcdir(self):
        """
        Wipe all files and sub-folders from src_dir (dir_source_files in file_path.config).

        If the source directory and final directory are the same, nothing is deleted and a warning is logged.
        Entries that are neither regular files nor directories are left in place.

        .. Warning:: This uses destructive methods (os.remove, shutil.rmtree) which erase all contents of the
            target directory and any sub-directories within.

        :return: List of strings of each entry name wiped from the source directory, each with a \\n at the end.
        """
        sdir = self.src_dir
        if sdir == self.final_dir:
            self.logs.append('WARNING!!!: Source direcotry is same as final directory. Directory clean aborted\n')
            return []
        f_wipe = []
        for name in listdir(sdir):
            path = sdir + self.sep + name
            if isfile(path):
                remove(path)
            elif isdir(path):
                rmtree(path)
            else:
                continue
            f_wipe.append(name + '\n')
        return f_wipe

    def write_log(self):
        """
        Append the accumulated log to <final storage directory>/logs/hobo_qaqc_<start date>.log.
        The log is kept as a list of strings until this function is called.
        """
        log_dir = self.final_dir + '/logs'
        self._mkdirs_exist_ok(log_dir)
        flog = log_dir + '/hobo_qaqc_' + self.start_date + '.log'
        with open(flog, 'a') as f:
            f.writelines(self.logs)

    def manage(self):
        """
        Execute file management.

        #. Copy files to working directory (_data).
        #. Create list of .csv, .hobo, and .log files in working directory.
        #. Attempt to perform QAQC on all .csv files and transfer to _processed.
        #. Create a .zip file for all .hobo files from each site. Disabled per bitbucket `issue #10`_ .
        #. Copy all files with .csv, .log, and unknown extension to final storage.
        #. Delete temporary folders in working directory.
        #. Wipe original source directory. This directory contains files where QAQC was not performed. Disabled per
           bitbucket `issue #10`_ .
        #. Write log file (registered with atexit in ``__init__``).

        .. _`issue #10`: https://bitbucket.org/hjandrews/met_hobo/issues/10/task-request-simplify-file-management
        """
        def now():
            return datetime.now().strftime('%H:%M:%S')

        def log_chg(proc, start, end, dir_frm, dir_to, tot_cnt, chg_cnt, f_list):
            # Log section: header with directories and counts, one line per file, then a footer.
            header = ('\n\n----------------------------\nStart {0} - {1}\n----------------------------'
                      '\n---------Read from {2}'
                      '\n---------Ouput to {3}'
                      '\n--------- {4} files total'
                      '\n--------- {5} files processed'
                      '\n').format(proc, start, dir_frm, dir_to, tot_cnt, chg_cnt)
            tail = '\n----------------------------\nEnd %s- %s\n----------------------------\n' % (proc, end)
            return [header] + list(f_list) + [tail]

        # Copy files to a working directory and index file types and study sites
        self.set_log_header()
        self.copy_to_wdir()
        self.index_files()

        # Run QAQC and log results
        start = now()
        c_proc, c_count, cproc_count = self.qaqc_csv()
        end = now()
        self.logs.extend(log_chg('csv reformat', start, end, self.data_dir, self.proc_dir,
                                 c_count, cproc_count, c_proc))

        # Archiving .hobo files (zip_hobo_files) is disabled per bitbucket issue #10; the .hobo files are
        # instead moved unchanged together with the unrecognized files.
        self.files['unk_ext'].extend(self.files['.hobo'])

        # Copy files to final storage location
        self.logs.append('\n\n Start copy DATA TO FINAL STORAGE\n***********************************************\n\n')
        self.copy_to_final_dir(self.files['sites'], '_bulk_exp_clean', self.proc_dir)

        # Store any files that are not recognized as data files
        self.logs.append('\n\n Start copy UNRECOGNIZED FILE .EXT\n***********************************************\n\n')
        if self.files['unk_ext']:
            self.logs.extend(self.copy_to_final_dir(self.files['unk_ext'], 'UNK_FILE', self.data_dir))
        else:
            self.logs.append('NONE')
        if self.files['.log']:
            self.logs.extend(self.copy_to_final_dir(self.files['.log'], 'logs', self.data_dir))

        # Clean directories. Wiping the source directory (del_files_frm_srcdir) is disabled per bitbucket issue #10.
        self.del_temp_folders()
if __name__ == '__main__':
    # Load file_path.config, then run the full copy / QAQC / store / clean pipeline.
    handler = FileHandling()
    handler.manage()