Source code for nxt.nxt_io

"""Methods for file input and output of nxt file data.
"""
# Builtin
import json
import logging
import os
import time
import sys
# Python 2/3 compatibility
if sys.version_info[0] == 2:
    import cPickle as pickle
    # Internal
    import nxt_path
    import clean_json
    import legacy
    from constants import FILE_FORMAT, GRAPH_VERSION
    from nxt_layer import SAVE_KEY
else:
    import pickle
    # Internal
    from . import nxt_path
    from . import clean_json
    from . import legacy
    from .constants import FILE_FORMAT, GRAPH_VERSION
    from .nxt_layer import SAVE_KEY

import gc
import tempfile

nxt_folder = os.path.dirname(os.path.abspath(__file__))
BUILTIN_GRAPHS_DIR = os.path.join(nxt_folder, 'builtin')
BUILTIN_GRAPHS_ENV_VAR = 'NXT_BUILTINS'
os.environ[BUILTIN_GRAPHS_ENV_VAR] = BUILTIN_GRAPHS_DIR
logger = logging.getLogger(__name__)

plugin_expanders = []


[docs]def register_reference_path_expander(expander): """Register a method to expand reference paths. An expander is a callable \ that takes exactly 1 argument, a filepath. :param expander: A python callable that takes a single argument, and \ returns an expanded filepath. :type expander: callable """ global plugin_expanders msg = 'Registered reference path expander from ' + str(expander.__module__) logger.info(msg) plugin_expanders += [expander]
[docs]def load_file_data(filepath): """Given a file path this function determines if its a known nxt save format and attempts to open it. If the file is out of date it is passed to the legacy converter for conversion. :param filepath: string of save file filepath :type filepath: str :return: dict of file data """ real_path = nxt_path.full_file_expand(filepath) for expander in plugin_expanders: found_path = expander(filepath) if not os.path.isfile(found_path): continue real_path = found_path break _, ext = os.path.splitext(real_path) if ext not in FILE_FORMAT._ALL: raise IOError('Unknown filetype "{}"'.format(ext)) file_type = FILE_FORMAT.ASCII if file_type == FILE_FORMAT.BINARY: with open(real_path, 'rb') as file_object: file_data = pickle.load(file_object) else: with open(real_path, 'r') as file_object: file_data = json.load(file_object, object_hook=clean_json._byteify) file_version = file_data.get(SAVE_KEY.VERSION) if not file_version: file_version = GRAPH_VERSION.VERSION_STR logger.warning('Assuming file version `{}` if trying to open a ' 'file from before 1.0.0 please add ' 'the following line to to top of your save file: ' '"version": "0.45.0"'.format(GRAPH_VERSION.VERSION_STR)) if not file_version.count('.'): raise IOError('Invalid version number format! Please ensure your save ' 'file has the following line: "version": "1.x" where ' 'the x is replaced with an int.') # Strip off patch number as we don't support save converters for hot fixes major_version, minor_version = [int(n) for n in file_version.split('.')[0:2]] file_too_new = (major_version > GRAPH_VERSION.MAJOR or (major_version == GRAPH_VERSION.MAJOR and minor_version > GRAPH_VERSION.MINOR)) if file_too_new: raise IOError('You are attempting to open a file saved with a newer ' 'version of nxt ({}). ' 'Please update nxt!'.format(file_version)) if major_version == 0 or minor_version < GRAPH_VERSION.MINOR: # Calls a legacy check if the full version number doesn't match the # current version start = time.time() template = (file_data.get(SAVE_KEY.VERSION), GRAPH_VERSION.VERSION_STR, str(round(time.time() - start, 3))) file_data = legacy.FileConverter.get_converted_data(file_data) logger.info("File converted from '{}' to '{}' in {}".format(*template)) file_data[SAVE_KEY.FILEPATH] = filepath file_data[SAVE_KEY.REAL_PATH] = real_path name_from_file = os.path.splitext(os.path.basename(filepath))[0] file_data[SAVE_KEY.NAME] = file_data.get(SAVE_KEY.NAME, name_from_file) return file_data
[docs]def save_file_data(save_data, filepath=None, file_format=FILE_FORMAT.ASCII): """Saves the given file data to the given file path in the given file format. If no filepath is provided a temp file is generated. The default file format is ASCII. :param save_data: dict of save data :param filepath: string of desired output file path or None :param file_format: FILE_FORMAT constant :return: string of output file path. """ start = time.time() ext = file_format _, filepath_ext = os.path.splitext(filepath) if filepath_ext not in FILE_FORMAT._ALL: raise TypeError('Unknown file extension "{}"'.format(filepath_ext)) elif filepath_ext != ext: ext = filepath_ext filepath = filepath or tempfile.mkstemp(prefix='nxt_tmp_', suffix=ext)[1] filepath = filepath.replace(os.sep, '/') if not filepath.endswith(ext): filepath += ext if file_format == FILE_FORMAT.BINARY: gc.disable() with open(filepath, 'wb') as out_file: pickle.dump(save_data, out_file, protocol=-1) gc.enable() else: with open(filepath, 'w') as out_file: json.dump(save_data, out_file, indent=4, sort_keys=False, separators=(',', ': ')) logger.info('Successfully saved "' + filepath + '"') update_time = str(int(round((time.time() - start) * 1000))) logger.debug("Saved in: " + update_time + "ms") return filepath
def generate_temp_file(suffix='.nxt'): """Safely generates a temp file and returns a Windows safe path :param suffix: Optional suffix for the file, default is `.nxt` :return: String filepath """ _, cache_filepath = tempfile.mkstemp(suffix=suffix) return cache_filepath.replace(os.sep, '/') def generate_temp_dir(prefix='nxt_tmp_'): """Safely generates a temp dir and returns a Windows safe path :param prefix: Optional prefix to the dir name, default is `nxt_tmp_` :return: String dir path """ return tempfile.mkdtemp(prefix=prefix)