"""Ways to operate on and ask questions of nxt layers.
"""
# Built-in
import os
import json
import copy
import logging
import time
import sys
from collections import OrderedDict
# Internal
if sys.version_info[0] == 2:
import nxt_io
import nxt_path
import nxt_node
from constants import GRAPH_VERSION
from runtime import Console
else:
from . import nxt_io
from . import nxt_path
from . import nxt_node
from .constants import GRAPH_VERSION
from .runtime import Console
from . import UNTITLED
logger = logging.getLogger(__name__)
class SAVE_KEY(object):
VERSION = 'version'
ALIAS = 'alias'
MUTE = 'mute'
SOLO = 'solo'
REFERENCES = 'references'
COMP_ORVERRIDES = 'comp_overrides'
COLOR = 'color'
META_DATA = 'meta_data'
NODES = 'nodes'
CWD = 'cwd'
FILEPATH = 'filepath'
REAL_PATH = 'real_path'
ATTRS = 'attrs'
NAME = 'name'
class META_DATA_KEY(object):
POSITIONS = 'positions'
COLLAPSE = 'collapse'
COLORS = 'colors'
ALIASES = 'aliases'
class LAYERS(object):
COMP = '<display>'
TARGET = '<target>'
TOP = '<top>'
class AUTHORING:
CREATE = 'create'
REFERENCE = 'reference'
ABOVE = 0
BELOW = 1
class LayerReturnTypes(object):
Node = 'Node' # Return a node or list of nodes
Path = 'Path' # Return a node path or list of node paths
NodeTable = 'NodeTable' # Return a list of lists [[NodePath, Node]]
NameDict = 'NameDict' # Return a dict where k is node NAME and v is node
PathDict = 'PathDict' # Return a dict where k is node PATH and v is node
Boolean = 'Boolean' # Return if there is at least one child
[docs]class SpecLayer(object):
"""Layer object representing layer data from disk.
"""
RETURNS = LayerReturnTypes
[docs] @classmethod
def load_from_filepath(cls, filepath):
"""Given a filepath, return a SpecLayer loaded from that path.
:param filepath: path to file to load
:type filepath: str
:return: Loaded SpecLayer
:rtype: SpecLayer
"""
layer_data = nxt_io.load_file_data(filepath)
return cls.load_from_layer_data(layer_data)
@classmethod
def load_from_layer_data(cls, layer_data):
return cls(layer_data)
def __init__(self, layer_data=None):
"""Create a SpecLayer, optionally from layer data.
:param layer_data: layer data dictionary, defaults to None
:type layer_data: dict, optional
"""
if layer_data is None:
layer_data = {}
self._cached_children = {}
self._cached_implied_children = {}
self._name = UNTITLED
self._layer_idx = 0
self.mute = False
self.solo = False
self.filepath = layer_data.get(SAVE_KEY.FILEPATH)
self.real_path = layer_data.get(SAVE_KEY.REAL_PATH)
self.cwd = layer_data.get(SAVE_KEY.CWD)
if self.filepath:
file_name = os.path.basename(self.filepath)
else:
file_name = UNTITLED
self.alias = layer_data.get(SAVE_KEY.ALIAS, file_name)
self.comp_overrides = layer_data.get(SAVE_KEY.COMP_ORVERRIDES, {})
self.sub_layer_paths = []
self.sub_layers = []
for layer_path in layer_data.get(SAVE_KEY.REFERENCES, []):
if not layer_path:
continue
self.sub_layer_paths += [layer_path]
self.sub_layers += [{SAVE_KEY.FILEPATH: layer_path}]
self.parent_layer = layer_data.get('parent_layer', None)
meta_data = layer_data.get(SAVE_KEY.META_DATA, {})
self.positions = meta_data.get(META_DATA_KEY.POSITIONS, {})
self.collapse = meta_data.get(META_DATA_KEY.COLLAPSE, {})
self.aliases = meta_data.get(META_DATA_KEY.ALIASES, {})
self.colors = meta_data.get(META_DATA_KEY.COLORS, {})
self.color = layer_data.get(SAVE_KEY.COLOR, None)
self.spec_list = []
self._nodes_path_as_key = {}
self._nodes_node_as_key = {}
self._construct_node_specs(layer_data)
self.refresh()
def get_cwd(self):
path = getattr(self, SAVE_KEY.CWD)
if path:
return path
path = self.real_path
if path:
return os.path.dirname(path)
return os.getcwd()
def _construct_node_specs(self, layer_data):
self.spec_list = []
self._nodes_path_as_key = {}
self._nodes_node_as_key = {}
nodes = order_nodes_dict(layer_data.get(SAVE_KEY.NODES, {}))
for node_path, node_data in nodes.items():
parent_path = nxt_path.get_parent_path(node_path)
root_name = nxt_path.node_name_from_node_path(node_path)
node_data['name'] = root_name
node_data[nxt_node.INTERNAL_ATTRS.SOURCE_LAYER] = self.real_path
root_spec_node = nxt_node.create_spec_node(node_data, self,
parent_path=parent_path)
root_parent_path = getattr(root_spec_node,
nxt_node.INTERNAL_ATTRS.PARENT_PATH)
root_node_path = nxt_path.join_node_paths(root_parent_path,
root_name)
self.spec_list += [root_spec_node]
self._nodes_path_as_key[root_node_path] = root_spec_node
self._nodes_node_as_key[root_spec_node] = root_node_path
self.clear_node_child_cache(root_node_path)
def refresh(self):
"""Re-populates the node table, and sets nodes' source layer.
"""
self._node_table = []
for path, node in self._nodes_path_as_key.items():
for attr in get_node_local_attr_names(path, [self]):
source_attr = attr + nxt_node.META_ATTRS.SOURCE
setattr(node, source_attr, (self.real_path, path))
node_ns = nxt_path.str_path_to_node_namespace(path)
self._node_table += [[node_ns, node]]
self.clear_node_child_cache(path)
self.sort_node_table()
def sort_node_table(self):
self._node_table = sort_multidimensional_list(self._node_table,
sort_by_idx=0)
def layer_idx(self):
return self._layer_idx
def lookup(self, node_path):
return self._nodes_path_as_key.get(node_path)
def node_exists(self, node_path):
return bool(self.lookup(node_path))
def get_exec_in(self, node_path):
node = self.lookup(node_path)
parent_path = getattr(node, nxt_node.INTERNAL_ATTRS.PARENT_PATH)
if node and parent_path == nxt_path.WORLD:
try:
return getattr(node, nxt_node.INTERNAL_ATTRS.EXECUTE_IN)
except AttributeError:
pass
return None
def get_node_path(self, node):
return self._nodes_node_as_key.get(node)
def ancestors(self, node_path, return_type=LayerReturnTypes.Node,
include_implied=False):
if include_implied and return_type != LayerReturnTypes.Path:
raise TypeError('When including implied nodes, {} is an '
'unsupported return type'.format(return_type))
if include_implied:
return nxt_path.all_ancestor_paths(node_path)
ancestor_nodes = []
ancestor_paths = []
node_table = []
name_dict = {}
if sys.version_info[0] == 2:
node_path_is_str = isinstance(node_path, basestring)
else:
node_path_is_str = isinstance(node_path, str)
if not node_path_is_str:
node_path = self.get_node_path(node_path)
node = self.lookup(node_path)
parent_path = getattr(node, nxt_node.INTERNAL_ATTRS.PARENT_PATH)
if node:
while parent_path != nxt_path.WORLD:
_node = self.lookup(parent_path)
if _node is None:
parent_path = nxt_path.get_parent_path(parent_path)
continue
if return_type == LayerReturnTypes.Boolean:
return True
elif return_type == LayerReturnTypes.Node:
ancestor_nodes += [_node]
elif return_type == LayerReturnTypes.Path:
ancestor_paths += [parent_path]
elif return_type == LayerReturnTypes.NodeTable:
node_table += [[parent_path, _node]]
elif return_type == LayerReturnTypes.NameDict:
key = getattr(node, nxt_node.INTERNAL_ATTRS.NAME)
name_dict[key] = _node
node = _node
ppath_attr = nxt_node.INTERNAL_ATTRS.PARENT_PATH
parent_path = getattr(node, ppath_attr)
if return_type == LayerReturnTypes.Node:
return ancestor_nodes
elif return_type == LayerReturnTypes.Path:
return ancestor_paths
elif return_type == LayerReturnTypes.NodeTable:
return node_table
elif return_type == LayerReturnTypes.NameDict:
return name_dict
elif return_type == LayerReturnTypes.Boolean:
return False
def children(self, node_path=nxt_path.WORLD,
return_type=LayerReturnTypes.Node, ordered=False,
include_implied=False):
if include_implied and return_type != LayerReturnTypes.Path:
raise ValueError('When including implied, can only return path. '
'Nothing else exists for implicit nodes.')
children_nodes = []
children_paths = []
node_table = []
name_dict = {}
if not node_path:
if return_type in (LayerReturnTypes.Node, LayerReturnTypes.Path,
LayerReturnTypes.NodeTable):
return []
elif return_type == LayerReturnTypes.NameDict:
return {}
elif return_type == LayerReturnTypes.Boolean:
return False
else:
logger.error('Invalid return type provided')
return None
child_order = []
implied_children = []
# Look up real children cache
children_cache = self._cached_children.get(node_path)
if node_path == nxt_path.WORLD:
children_cache = None
if children_cache is not None:
children_nodes = children_cache[LayerReturnTypes.Node][:]
children_paths = children_cache[LayerReturnTypes.Path][:]
node_table = children_cache[LayerReturnTypes.NodeTable][:]
name_dict = copy.copy(children_cache[LayerReturnTypes.NameDict])
cache_real = False
else:
self._cached_children[node_path] = {LayerReturnTypes.Node:
children_nodes,
LayerReturnTypes.Path:
children_paths,
LayerReturnTypes.NodeTable:
node_table,
LayerReturnTypes.NameDict:
name_dict}
cache_real = True
# Lookup implied cache
cache_implied = False
if include_implied:
implied_c_cache = self._cached_implied_children.get(node_path)
if node_path == nxt_path.WORLD:
implied_c_cache = None
if implied_c_cache is not None:
implied_children = implied_c_cache[LayerReturnTypes.Path][:]
if return_type == LayerReturnTypes.Boolean and not cache_real:
return bool(implied_children + children_paths)
else:
path_implied = {LayerReturnTypes.Path: implied_children}
self._cached_implied_children[node_path] = path_implied
cache_implied = True
re_cache = cache_implied or cache_real
if re_cache:
for path, node in self._nodes_path_as_key.items():
if cache_real:
parent_path = getattr(node,
nxt_node.INTERNAL_ATTRS.PARENT_PATH)
if parent_path == node_path:
children_nodes += [node]
children_paths += [path]
node_table += [[path, node]]
key = getattr(node, nxt_node.INTERNAL_ATTRS.NAME)
name_dict[key] = node
if not include_implied or not cache_implied:
continue
if nxt_path.is_ancestor(path, node_path):
trim_depth = nxt_path.get_path_depth(node_path) + 1
trimmed = nxt_path.trim_to_depth(path, trim_depth)
if trimmed not in implied_children:
implied_children += [trimmed]
if cache_real:
k = LayerReturnTypes.Path
self._cached_children[node_path][k] = children_paths[:]
if include_implied:
for imp in implied_children:
if imp not in children_paths:
children_paths += [imp]
if return_type == LayerReturnTypes.Boolean:
return bool(children_paths)
if ordered:
node = self.lookup(node_path)
if include_implied and not node:
child_order = []
else:
co_attr = nxt_node.INTERNAL_ATTRS.CHILD_ORDER
child_order = getattr(node, co_attr)
if child_order:
ordered_child_nodes = []
ordered_child_paths = []
ordered_node_table = []
for child_name in child_order:
# return type NODE
for n in children_nodes:
c_name = getattr(n, nxt_node.INTERNAL_ATTRS.NAME)
if c_name == child_name:
ordered_child_nodes += [n]
# return type PATH
for p in children_paths:
if nxt_path.node_name_from_node_path(p) == child_name:
ordered_child_paths += [p]
# return type TABLE
for item in node_table:
p, n = item
if nxt_path.node_name_from_node_path(p) == child_name:
ordered_node_table += [item]
# return type NODE
for n in children_nodes:
if n not in ordered_child_nodes:
ordered_child_nodes += [n]
children_nodes = ordered_child_nodes
# return type PATH
for p in children_paths:
if p not in ordered_child_paths:
ordered_child_paths += [p]
children_paths = ordered_child_paths
# return type TABLE
for item in node_table:
p, n = item
if p not in ordered_node_table:
ordered_node_table += [item]
node_table = ordered_node_table
if return_type == LayerReturnTypes.Node:
return children_nodes
elif return_type == LayerReturnTypes.Path:
return children_paths
elif return_type == LayerReturnTypes.NodeTable:
return node_table
elif return_type == LayerReturnTypes.NameDict:
return name_dict
elif return_type == LayerReturnTypes.Boolean:
return False
else:
logger.error('Invalid return type provided')
return None
def descendants(self, node_path=nxt_path.WORLD,
return_type=LayerReturnTypes.Path, ordered=False,
include_implied=False):
if return_type not in (LayerReturnTypes.Node, LayerReturnTypes.Path,
LayerReturnTypes.NodeTable,):
raise TypeError('Unsupported return type {}'.format(return_type))
if node_path == nxt_path.WORLD:
if ordered:
raise ValueError('Cannot get ordered descendants of the world')
return self._world_descendants(return_type,
include_implied=include_implied)
if ordered:
if return_type is not LayerReturnTypes.Path:
raise ValueError('Ordered descedants can only return paths.')
return self._ordered_descendants(node_path,
include_implied=include_implied)
descendants = self.children(node_path, return_type,
include_implied=include_implied)
more_descendants = descendants
while more_descendants:
_temp = []
if return_type == LayerReturnTypes.Path:
for d_pth in more_descendants:
_temp += self.children(d_pth, return_type,
include_implied=include_implied)
elif return_type == LayerReturnTypes.NodeTable:
for d_pth, d in more_descendants:
_temp += self.children(d_pth, return_type,
include_implied=include_implied)
descendants += _temp
more_descendants = _temp
return descendants
def _ordered_descendants(self, node_path, include_implied=False):
desc = []
for child_path in self.children(node_path, ordered=True,
return_type=LayerReturnTypes.Path,
include_implied=include_implied):
desc += [child_path]
desc += self._ordered_descendants(child_path,
include_implied=include_implied)
return desc
def _world_descendants(self, return_type, include_implied=False):
if include_implied and return_type != LayerReturnTypes.Path:
raise ValueError('When including implied, can only return path. '
'Nothing else exists for implicit nodes.')
if return_type == LayerReturnTypes.Path:
paths = list(self._nodes_path_as_key.keys())
if nxt_path.WORLD in paths:
paths.remove(nxt_path.WORLD)
if not include_implied:
return paths
implied_paths = set()
for real_path in paths:
ancest_paths = nxt_path.all_ancestor_paths(real_path)
implied_paths = implied_paths.union(ancest_paths)
if nxt_path.WORLD in implied_paths:
implied_paths.remove(nxt_path.WORLD)
return implied_paths.union(paths)
if return_type == LayerReturnTypes.Node:
nodes = list(self._nodes_node_as_key.keys())
world_node = self._nodes_path_as_key.get(nxt_path.WORLD)
if world_node:
nodes.remove(world_node)
return nodes
if return_type == LayerReturnTypes.NodeTable:
node_table = []
for path, node in self._nodes_path_as_key.items():
if path is nxt_path.WORLD:
continue
node_table += [[path, node]]
return node_table
raise TypeError('Unsupported return type {}'.format(return_type))
def clear_node_child_cache(self, node_path):
try:
self._cached_children.pop(node_path)
except KeyError:
pass
try:
self._cached_implied_children.pop(node_path)
except KeyError:
pass
def get_exec_order(self, start_path):
if start_path == nxt_path.WORLD:
return []
start_root_path = nxt_path.get_root_path(start_path)
exec_order = []
start_found = False
root_exec_order = self.get_root_exec_order(start_root_path)
for root_path in root_exec_order:
if not start_found and root_path == start_path:
start_found = True
node = self.lookup(root_path)
enabled = nxt_node.get_node_enabled(node)
if enabled is None:
enabled = True
if not enabled:
continue
if start_found:
exec_order += [root_path]
path_rt_type = LayerReturnTypes.Path
disabled_desc = []
for desc_path in self.descendants(node_path=root_path,
ordered=True,
return_type=path_rt_type):
if not start_found and desc_path == start_path:
start_found = True
desc_node = self.lookup(desc_path)
enabled = nxt_node.get_node_enabled(desc_node)
if enabled is None:
enabled = True
if enabled:
for potential_ancestor in disabled_desc:
if nxt_path.is_ancestor(desc_path, potential_ancestor):
enabled = False
break
if not enabled:
disabled_desc += [desc_path]
continue
if start_found:
exec_order += [desc_path]
return exec_order
def get_root_exec_order(self, start_root_path):
exec_order = [start_root_path]
prev_root_path = start_root_path
while True:
for root in self.children():
try:
exec_in = getattr(root, nxt_node.INTERNAL_ATTRS.EXECUTE_IN)
match = exec_in == prev_root_path
except AttributeError:
continue
if not match:
continue
prev_root_path = self.get_node_path(root)
exec_order += [prev_root_path]
break # for loop
else:
break # while loop
return exec_order
def get_muted(self, local=False):
muted = getattr(self, SAVE_KEY.MUTE, False)
if not local:
ref_data = get_comped_layer_overs(self, SAVE_KEY.COMP_ORVERRIDES)
r_dat = ref_data.get(self.filepath, {})
muted = r_dat.get(SAVE_KEY.MUTE, muted)
return muted
def set_muted(self, state):
self.mute = state
def set_mute_over(self, layer_path, state):
ref_data = get_comped_layer_overs(self, SAVE_KEY.COMP_ORVERRIDES)
r_dat = ref_data.get(layer_path, {})
r_dat[SAVE_KEY.MUTE] = state
ref_data[layer_path] = r_dat
self.comp_overrides = ref_data
def get_soloed(self, local=False):
soloed = getattr(self, SAVE_KEY.SOLO, False)
if not local:
ref_data = get_comped_layer_overs(self, SAVE_KEY.COMP_ORVERRIDES)
r_dat = ref_data.get(self.filepath, {})
soloed = r_dat.get(SAVE_KEY.SOLO, soloed)
return soloed
def set_soloed(self, state):
self.solo = state
def set_solo_over(self, layer_path, state):
ref_data = get_comped_layer_overs(self, SAVE_KEY.COMP_ORVERRIDES)
r_dat = ref_data.get(layer_path, {})
r_dat[SAVE_KEY.SOLO] = state
ref_data[layer_path] = r_dat
self.comp_overrides = ref_data
[docs] def add_reference(self, layer_path=None, layer=None, insert_idx=None):
"""Given a layer path, a layer object, or both, add a reference to
specified layer on this layer, optionally at a specified insert index.
:param layer_path: Path to use as reference path, defaults to None, \
if None is given, defaults to given `layer.real_path`
:type layer_path: str, optional
:param layer: layer to add as reference layer in this layer
:type layer: SpecLayer
:param insert_idx: index to insert given layer at, defaults to None, \
if None is given, defaults to lowest reference layer.
:type insert_idx: int, optional
:return: index given layer was inserted at.
:rtype: int
:raises ValueError: If not provided with a layer path or layer.
"""
if not (layer_path or layer):
raise ValueError("Must specify layer path or layer to reference.")
if insert_idx is None:
insert_idx = len(self.sub_layer_paths)
if layer_path is None:
layer_path = layer.real_path
self.sub_layer_paths.insert(insert_idx, layer_path)
sub_layer_data = {SAVE_KEY.FILEPATH: layer_path}
if layer is not None:
sub_layer_data['layer'] = layer
self.sub_layers.insert(insert_idx, sub_layer_data)
def get_references(self):
refs = []
for ref in self.sub_layers:
refs += [ref[SAVE_KEY.FILEPATH]]
return refs
def get_alias(self, local=False, fallback_to_local=True):
"""Get the layer's alias (nice name). By default the local alias is
retuned, if local is False the parent layer's opinion of the alias is
returned.
:param local: If True the local opinion is returned, if False the
strongest override opinion is returned.
:type local: bool
:param fallback_to_local: if True and local is False and there is no
override opinion the local alias is returned. If false and no
override opinion is found None is returned.
:return: string of layer alias or None
"""
alias = getattr(self, SAVE_KEY.ALIAS, None)
if not local:
aliases = get_comped_layer_overs(self, META_DATA_KEY.ALIASES)
if fallback_to_local:
fallback = alias
else:
fallback = None
alias = aliases.get(self.filepath, fallback)
return alias
def set_alias(self, alias):
self.alias = alias
def set_alias_over(self, alias):
"""Set a alias override on the root parent of a layer.
:param alias: new alias string
"""
layer_path = self.filepath
root = get_root_parent(self)
if not root:
logger.error('Can not set alias override on top layer!')
if alias is None:
if layer_path in list(root.aliases.keys()):
root.aliases.pop(layer_path)
else:
root.aliases[layer_path] = alias
def get_color(self, local=True, fallback_to_local=True):
"""Get the layer's color. By default the local color is
retuned, if local is False the parent layer's opinion of the alias is
returned.
:param local: bool
:param fallback_to_local: if True and local is False and there is no
override opinion the local color is returned. If false and no
override opinion is found None is returned.
:return: string of layer alias
"""
color = getattr(self, SAVE_KEY.COLOR, None)
if not local:
colors = get_comped_layer_overs(self, META_DATA_KEY.COLORS)
if fallback_to_local:
fallback = color
else:
fallback = None
color = colors.get(self.filepath, fallback)
return color
def set_color_over(self, color):
"""Set a color override on the root parent of a layer.
:param color: hex color
"""
layer_path = self.filepath
root = get_root_parent(self)
if not root:
logger.error('Can not set colors override on top layer!')
if color is None:
if layer_path in list(root.colors.keys()):
root.colors.pop(layer_path)
else:
root.colors[layer_path] = color
[docs] def save(self, filepath=None):
"""Save this layer, optionally to new, given filepath.
:param filepath: file path to save to, defaults to None
:type filepath: str, optional
"""
filepath = filepath or self.real_path
# Update graph name
graph_name = os.path.splitext(os.path.basename(filepath))[0]
if self.get_alias(local=True) == UNTITLED:
self.set_alias(graph_name)
save_data = self.get_save_data()
try:
json.dumps(save_data, indent=4, sort_keys=False)
except TypeError:
logger.error("Failed to save file!")
# TODO: Should raise here, but its out of scope for what I'm
# working on right now.
return
logger.info("Save Data Generated")
nxt_io.save_file_data(save_data=save_data, filepath=filepath)
if filepath != self.real_path:
self.real_path = filepath
self.propegate_real_path()
return save_data
def get_meta_data(self):
positions = OrderedDict(sorted(self.positions.items(),
key=lambda x: x[0]))
collapsed = OrderedDict(sorted(self.collapse.items(),
key=lambda x: x[0]))
aliases = OrderedDict(sorted(self.aliases.items(), key=lambda x: x[0]))
colors = OrderedDict(sorted(self.colors.items(), key=lambda x: x[0]))
meta_data = OrderedDict()
if aliases:
meta_data[META_DATA_KEY.ALIASES] = aliases
if colors:
meta_data[META_DATA_KEY.COLORS] = colors
if positions:
meta_data[META_DATA_KEY.POSITIONS] = positions
if collapsed:
meta_data[META_DATA_KEY.COLLAPSE] = collapsed
return meta_data
def get_comp_overrides(self):
return OrderedDict(sorted(self.comp_overrides.items(),
key=lambda x: x[0]))
def get_save_data(self):
"""Returns save data for this layer.
"""
save_dict = {
SAVE_KEY.VERSION: GRAPH_VERSION.VERSION_STR,
SAVE_KEY.ALIAS: self.get_alias(local=True),
SAVE_KEY.MUTE: self.get_muted(local=True),
SAVE_KEY.SOLO: self.get_soloed(local=True),
SAVE_KEY.REFERENCES: self.get_references(),
SAVE_KEY.COMP_ORVERRIDES: self.get_comp_overrides(),
SAVE_KEY.COLOR: self.color,
SAVE_KEY.META_DATA: self.get_meta_data(),
SAVE_KEY.NODES: self.get_nodes_save_data()}
rm = []
for k, v in save_dict.items():
if v in ((), [], {}):
rm += [k]
for k in rm:
save_dict.pop(k)
return self.order_save_dict(save_dict)
def order_save_dict(self, save_dict):
"""Given a save dictionary, returns an ordered dictionary where the
save keys are ordered correctly. Leftover keys are put at the end.
:param data: dictionary to order
:type data: dict
:return: ordered version of given dictionary
:rtype: OrderedDcit
"""
# order keys
keys_order = (SAVE_KEY.VERSION, SAVE_KEY.ALIAS, SAVE_KEY.COLOR,
SAVE_KEY.MUTE, SAVE_KEY.SOLO, SAVE_KEY.REFERENCES,
SAVE_KEY.COMP_ORVERRIDES, SAVE_KEY.META_DATA,
SAVE_KEY.NODES, SAVE_KEY.REAL_PATH)
result = OrderedDict()
data_keys = list(save_dict.keys())
for key in keys_order:
if key in data_keys:
result[key] = save_dict[key]
data_keys.remove(key)
# leftovers
for key in data_keys:
result[key] = save_dict[key]
return result
def get_nodes_save_data(self):
"""Gets an ordered dict of all of the spec nodes
:return: OrderedDict
"""
nodes_dict = OrderedDict()
for spec_node in self.spec_list:
node_data = nxt_node.get_node_as_dict(spec_node)
node_path = self.get_node_path(spec_node)
nodes_dict[node_path] = node_data
return order_nodes_dict(nodes_dict)
def propegate_real_path(self):
"""Update nodes source layer to current real path
"""
for node in self.descendants(return_type=LayerReturnTypes.Node):
setattr(node, nxt_node.INTERNAL_ATTRS.SOURCE_LAYER, self.real_path)
class CompLayer(SpecLayer):
@classmethod
def load_from_layer_data(cls, layer_data):
raise NotImplementedError
def __init__(self):
super(CompLayer, self).__init__({})
self._name = 'CompositeLayer'
self._layer_range = (0, 0)
self._dirty_map = {}
self._sublayer_node_tables = []
self.runtime = False
self.running = False
self._console = Console(_globals={}, _locals={}, node_path='STAGE')
self.cache_layer = CacheLayer()
def layer_idx(self):
return self._layer_range[0]
def get_node_dirties(self, node_path):
"""Get all nodes that depend on the given node path. The given node
path will be the 0th item in the return list.
:param node_path: String of node path
:return: List of string node paths
"""
concerns = self._dirty_map.get(node_path, [])[:]
new_concerns = [node_path] + concerns
while new_concerns:
_tmp = []
for concern in new_concerns:
found = self._dirty_map.get(concern, [])
for item in found:
if item not in concerns + _tmp:
_tmp += [item]
new_concerns = _tmp
concerns += _tmp
return concerns
class CacheLayer(SpecLayer):
def __init__(self):
super(CacheLayer, self).__init__({})
self.nodes = {}
self.node_time_map = {}
self.node_times = []
def was_during_node_exec(self, time_float):
"""Checks if given time float is within any executed node's exec
timeframe.
:param time_float: float of time
:return: bool
"""
for enter_time, exit_time in reversed(self.node_times):
if time_float >= enter_time and exit_time is None:
return True
if enter_time <= time_float <= exit_time:
return True
return False
def set_node_enter_time(self, node_path, t=0.):
if not t:
t = time.time()
node_runs = self.node_time_map.get(node_path, [])
time_entry = [[t, None]]
node_runs += time_entry
self.node_times += time_entry
self.node_time_map[node_path] = node_runs
def set_node_exit_time(self, node_path, t=0.):
if not t:
t = time.time()
try:
node_runs = self.node_time_map[node_path]
except IndexError:
raise IndexError('No start time set for {}'.format(node_path))
for run_time in node_runs:
if run_time[1] is None:
run_time.pop(1)
run_time += [t]
return
raise ValueError('No exec tuples for {} have a missing end '
'time!'.format(node_path))
def get_node_run_time(self, node_path, idx=-1):
try:
enter_time, exit_time = self.node_time_map[node_path][idx]
except (KeyError, IndexError):
raise ValueError('No time for {} at index {}'.format(node_path,
idx))
return exit_time - enter_time
def lookup(self, node_path):
return self.nodes.get(node_path)
def add_node(self, node_path, node):
cached_code = getattr(node, nxt_node.INTERNAL_ATTRS.CACHED_CODE, '\n')
spl_code = cached_code.split('\n')
setattr(node, nxt_node.INTERNAL_ATTRS.COMPUTE, spl_code)
self.nodes[node_path] = node
def save(self, filepath):
save_data = self.get_save_data()
nxt_io.save_file_data(save_data=save_data, filepath=filepath)
return save_data
def get_save_data(self):
save_data = {
'version': GRAPH_VERSION.VERSION_STR,
'nodes': {}
}
for path, node in self.nodes.items():
save_data['nodes'][path] = nxt_node.get_node_as_dict(node)
return save_data
@classmethod
def load_from_layer_data(cls, layer_data):
new_layer = cls()
result_nodes = {}
name_attr = nxt_node.INTERNAL_ATTRS.NAME
name_key = nxt_node.INTERNAL_ATTRS.as_save_key(name_attr)
for node_path, node_data in layer_data.get('nodes', {}).items():
parent_path = nxt_path.get_parent_path(node_path)
node_name = nxt_path.node_name_from_node_path(node_path)
node_data[name_key] = node_name
spec_node = nxt_node.create_spec_node(node_data, new_layer,
parent_path)
code_lines = getattr(spec_node, nxt_node.INTERNAL_ATTRS.COMPUTE)
code = '\n'.join(code_lines)
setattr(spec_node, nxt_node.INTERNAL_ATTRS.CACHED_CODE, code)
result_nodes[node_path] = spec_node
new_layer.nodes = result_nodes
return new_layer
[docs]def get_active_layers(layers):
"""Given a list of layers, return those that should contribute to comp.
If there are soloed layers, only the soloed layers that are not muted
will be returned. If there are not soloed layers, only the non-muted
layers will be returned.
:param layers: list of layers to filter
:type layers: list
:return: list of layers that should contribute to a comp.
:rtype: list
"""
soloed_layers = get_soloed_layers(layers)
potential_layers = soloed_layers or layers
result_layers = []
for layer in potential_layers:
if layer.get_muted():
continue
result_layers += [layer]
return result_layers
[docs]def get_soloed_layers(layers):
"""Given a list of layers, return only those that are soloed
:param layers: list of layers to filter
:type layers: list
:return: list of layers that are soloed.
:rtype: list
"""
return [layer for layer in layers if layer.get_soloed()]
[docs]def get_muted_layers(layers):
"""Given a list of layers, return only those that are muted
:param layers: list of layers to filter
:type layers: list
:return: list of layers that are muted.
:rtype: list
"""
return [layer for layer in layers if layer.get_muted()]
[docs]def get_node_local_attr_names(node_path, layers):
"""Get all attribute names that are local to the given node path within
the given layers.
NOTE that layers are filtered to active layers.
:param node_path: node path to find local attributes of
:type node_path: str
:param layers: list of layers to look for attributes of given node within
:type layers: list
:return: list of local attributes found in given layers
:rtype: list
"""
local_attrs = []
# TODO should we filter for active layers or expect callers to?
active_layers = get_active_layers(layers)
for layer in active_layers:
node = layer.lookup(node_path)
if not node:
continue
for attr in node.__dict__.keys():
if attr in local_attrs:
continue
if attr in nxt_node.INTERNAL_ATTRS.PROTECTED:
continue
if attr.endswith(nxt_node.META_ATTRS._suffix):
continue
local_attrs += [attr]
return local_attrs
def get_comped_layer_overs(layer, over_dict_attr):
over_dict = {}
over_dict.update(getattr(layer, over_dict_attr, {}))
parent = layer.parent_layer
while parent:
over_dict.update(getattr(parent, over_dict_attr, {}))
parent = parent.parent_layer
return over_dict
def get_root_parent(layer):
parent = layer.parent_layer
root = None
while parent:
root = parent
parent = parent.parent_layer
return root
[docs]def order_nodes_dict(node_dict):
"""Given a dictionary mapping node paths to node data, return an ordered
dict where the keys are sorted.
:param node_dict: dictionary mapping node paths to node data
:type node_dict: dict
:return: ordered dctionary with keys sosrted
:rtype: OrderedDict
"""
unsorted_paths = list(node_dict.keys())
sorted_paths = sorted(unsorted_paths)
ordered_nodes = OrderedDict()
for path in sorted_paths:
ordered_nodes[path] = node_dict[path]
return ordered_nodes
def sort_multidimensional_list(multi_list, sort_by_idx):
"""Takes a multi-dimensional list and sorts it by the length of a sub-list
item. The item who's length we sort by is
determined by the idx parameter.
If your input is:
```
sort_multidimensional_list(
[
[[1,2,3], ['a', 'b']],
[[4,5,6], ['a']]
], idx = 1)
```
Your output will be:
```
[
[[1,2,3], ['a']],
[[4,5,6], ['a', 'b']]
]
```
:param multi_list: multi-dimensional list
:param sort_by_idx: Index of sub list item to sort by
:return: list
"""
list_len = len(multi_list)
i = 0
while i <= list_len:
ii = 0
while ii < (list_len - i - 1):
item_len = len(multi_list[ii][sort_by_idx])
next_item_len = len(multi_list[ii + 1][sort_by_idx])
if item_len > next_item_len:
_temp = multi_list[ii]
multi_list[ii] = multi_list[ii + 1]
multi_list[ii + 1] = _temp
ii += 1
i += 1
return multi_list