Source code for stockpyl.gsm_tree

# ===============================================================================
# stockpyl - gsm_tree Module
# -------------------------------------------------------------------------------
# Author: Larry Snyder
# License: GPLv3
# ===============================================================================

"""
.. include:: ../../globals.inc

Overview 
--------

The |mod_gsm_tree| module implements Graves and Willems's (2000, 2003) dynamic programming (DP)
algorithm for multi-echelon inventory systems with tree structures. 

.. note:: |node_stage|

The primary data object is the |class_network|, which contains all of the data
for the GSM instance.

.. note:: |fosct_notation|

.. seealso::

	For an overview of multi-echelon inventory optimization in |sp|,
	see the :ref:`tutorial page for multi-echelon inventory optimization<tutorial_meio_page>`.


References
----------
S. C. Graves and S. P. Willems. Optimizing strategic safety stock placement in supply chains. 
*Manufacturing and Service Operations Management*, 2(1):68-83, 2000.

S. C. Graves and S. P. Willems. Erratum: Optimizing strategic safety stock placement in supply chains. 
*Manufacturing and Service Operations Management*, 5(2):176-177, 2003.


API Reference
-------------
"""

import networkx as nx
import copy

from stockpyl.gsm_helpers import *
from stockpyl.helpers import *
from stockpyl.supply_chain_network import SupplyChainNetwork
from stockpyl.supply_chain_node import SupplyChainNode



### OPTIMIZATION ###

[docs]def optimize_committed_service_times(tree): """Optimize committed service times using the dynamic programming (DP) algorithm of Graves and Willems (2000, 2003). ``tree`` is the |class_network| containing the instance. The tree need not already have been pre-processed using :func:`preprocess_tree` or :func:`relabel_nodes`; this function will do so. Output parameters are expressed using the original labeling of tree, even if the nodes are relabeled internally. Demands are assumed to be normally distributed. Parameters ---------- tree : |class_network| The multi-echelon tree network. Current node labels are ignored and may be anything. Returns ------- opt_cst : dict Dict of optimal CSTs, with node indices as keys and CSTs as values. opt_cost : float Optimal expected cost of system. Raises ------ ValueError If any sink node (node with no successors) has no demand mean or standard devation provided. **Example** (Example 6.5): .. testsetup:: * from stockpyl.gsm_tree import * .. doctest:: >>> from stockpyl.instances import load_instance >>> tree = load_instance("example_6_5") >>> opt_cst, opt_cost = optimize_committed_service_times(tree) >>> opt_cst {1: 0, 3: 0, 2: 0, 4: 1} >>> opt_cost 8.277916867529369 References ---------- S. C. Graves and S. P. Willems. Optimizing strategic safety stock placement in supply chains. *Manufacturing and Service Operations Management*, 2(1):68-83, 2000. S. C. Graves and S. P. Willems. Erratum: Optimizing strategic safety stock placement in supply chains. *Manufacturing and Service Operations Management*, 5(2):176-177, 2003. """ # Validate parameters. for n in tree.sink_nodes: if n.demand_source.mean is None: raise ValueError(f'All sink nodes must have demand_source.mean (node {n.index} does not).') if n.demand_source.standard_deviation is None: raise ValueError(f'All sink nodes must have demand_source.standard_deviation (node {n.index} does not).') # Preprocess tree. tree = preprocess_tree(tree) # Relabel nodes. tree = relabel_nodes(tree) # Solve. opt_cst_relabeled, opt_cost = _cst_dp_tree(tree) # Prepare optimal solution in terms of original labels. opt_cst = {k.original_label: opt_cst_relabeled[k.index] for k in tree.nodes} return opt_cst, opt_cost
def _cst_dp_tree(tree): """Optimize committed service times on pre-processed tree. Optimization is performed using the dynamic programming (DP) algorithm of Graves and Willems (2000). ``tree`` is the |class_network| containing the instance. It must be pre-processed before calling this function. Assumes demand bound over tau periods is of the form :math:`z_\\alpha\\sigma\\sqrt{\\tau}`. Parameters ---------- tree : |class_network| The multi-echelon tree network. Current node labels are ignored and may be anything. Returns ------- opt_cst : dict Dict of optimal CSTs, with node indices as keys and CSTs as values. opt_cost : float Optimal expected cost of system. """ # Initialize dicts to store values of theta_in(.) and theta_out(.) functions # (called f(.) and g(.) in Graves and Willems). theta_in = {k_index: {} for k_index in tree.node_indices} theta_out = {k_index: {} for k_index in tree.node_indices} # Get min and max node indices (for convenience). min_k_index = int(np.min(tree.node_indices)) max_k_index = int(np.max(tree.node_indices)) # Initialize best_cst_adjacent. # best_cst_adjacent[k_index][S][i] = CST chosen for stage i when calculating # theta_out(S) or theta_in(SI) for stage k. best_cst_adjacent = {k.index: {S: {} for S in range(k.max_replenishment_time+1)} for k in tree.nodes} # Loop through stages. for k_index in range(min_k_index, max_k_index + 1): # Get shortcuts to some parameters (for convenience). k = tree.get_node_from_index(k_index) max_replen_time = k.max_replenishment_time proc_time = k.processing_time # Evaluate theta_out(k_index, S) if p(k_index) is downstream from k_index and # and k_index < final k_index, evaluate theta_in(k_index, SI) otherwise. if k_index < max_k_index and k.larger_adjacent_node_is_downstream: # p(k_index) is downstream from k_index -- evaluate theta_out(k_index, S). for S in range(max_replen_time+1): # Calculate theta_out. theta_out[k_index][S], temp_best_cst_adjacent = \ _calculate_theta_out(tree, k_index, S, theta_in, theta_out) # Copy temp_best_cst_adjacent to best_cst_adjacent. best_cst_adjacent[k_index][S] = {i: temp_best_cst_adjacent[i] for i in temp_best_cst_adjacent} # Set values of theta_out and best_cst_adjacent for # max_replenishment_time+1 to max_max_replenishment_time to # theta_out(max_replenishment_time). # Needed so that stages with larger max_replenishment_time don't # encounter undefined values of theta_out. for S in range(max_replen_time+1, tree.max_max_replenishment_time + 1): theta_out[k_index][S] = theta_out[k_index][max_replen_time] best_cst_adjacent[k_index][S] = best_cst_adjacent[k_index][max_replen_time] else: # p(k_index) is upstream from k_index -- evaluate theta_in(k_index, SI). for SI in range(max_replen_time - proc_time + 1): # Calculate theta_in. theta_in[k_index][SI], temp_best_cst_adjacent = \ _calculate_theta_in(tree, k_index, SI, theta_in, theta_out) # Copy temp_best_cst_adjacent to best_cst_adjacent. best_cst_adjacent[k_index][SI] = {i: temp_best_cst_adjacent[i] for i in temp_best_cst_adjacent} # Set values of theta_in and best_cst_adjacent for # max_replenishment_time+1 to max_max_replenishment_time to # theta_in(max_replenishment_time - processing_time). # Needed so that stages with larger max_replenishment_time don't # encounter undefined values of theta_in. for SI in range(max_replen_time - proc_time + 1, tree.max_max_replenishment_time + 1): theta_in[k_index][SI] = theta_in[k_index][max_replen_time - proc_time] best_cst_adjacent[k_index][SI] = \ best_cst_adjacent[k_index][max_replen_time - proc_time] # Determine best value of SI for final stage. max_k_node = tree.get_node_from_index(max_k_index) SI_dict = {SI: theta_in[max_k_index][SI] for SI in range(max_k_node.max_replenishment_time - max_k_node.processing_time + 1)} # smaller range of SI best_theta_in, best_SI = min_of_dict(SI_dict) # Initialize dict of optimal CSTs and optimal inbound CSTs. opt_cst = {} opt_in_cst = {} # Backtrack to find optimal CSTs: Loop backwards through stages k_index; # if p(k_index) is downstream from k_index, then set k_index's outbound CST to p(k_index)'s # optimal inbound CST (which we get from best_cst_adjacent[p(k_index)][CST(p(k_index))]); # if p(k_index) is upstream from k_index, then set k_index's inbound CST to p(k_index)'s optimal # outbound CST and set k_index's outbound CST to the optimal for that inbound # CST (from best_cst_adjacent[p(k_index)][CST(p(k_index))]). # For each stage, remember optimal outbound _and_ inbound CSTs. for k_index in range(max_k_index, min_k_index-1, -1): # Get node k. k = tree.get_node_from_index(k_index) # Get p(k_index), and determine whether p(k_index) and p(p(k_index)) are upstream or # downstream (for convenience). if k_index < max_k_index: pk = k.larger_adjacent_node pk_is_downstream = k.larger_adjacent_node_is_downstream if pk < max_k_index: ppk_is_downstream = tree.get_node_from_index(pk).larger_adjacent_node_is_downstream # Where is p(k_index)? if k_index == max_k_index: # This is final stage. opt_cst[k_index] = best_cst_adjacent[k_index][best_SI][k_index] opt_in_cst[k_index] = best_SI elif pk_is_downstream: # p(k_index) is downstream from k. Is p(p(k_index)) upstream or downstream from p(k_index)? if pk != max_k_index and ppk_is_downstream: # p(p(k_index)) is downstream from p(k_index) -- that means that optimal # CST values are stored in best_cst_adjacent[pk][opt_cst[pk]][.]. opt_cst[k_index] = best_cst_adjacent[pk][opt_cst[pk]][k_index] else: # p(p(k_index)) is upstream from p(k_index) (or it's the final node) -- # that means that optimal CST values are stored in # best_cst_adjacent[pk][opt_in_cst[ppk]][.]. opt_cst[k_index] = best_cst_adjacent[pk][opt_in_cst[pk]][k_index] opt_in_cst[k_index] = best_cst_adjacent[k_index][opt_cst[k_index]][k_index] else: # p(k_index) is upstream from k. Is p(p(k_index)) upstream or downstream from p(k_index)? if pk != max_k_index and ppk_is_downstream: # p(p(k_index)) is downstream from p(k_index) -- that means that optimal # inbound CST values are stored in # best_cst_adjacent[pk][opt_cst[pk]][.]. opt_in_cst[k_index] = best_cst_adjacent[pk][opt_cst[pk]][k_index] else: # p(p(k_index)) is upstream from p(k_index) (or it's the final node) -- # that means that optimal inbound CST values are stored in # best_cst_adjacent[pk][opt_in_cst[pk]][.]. opt_in_cst[k_index] = best_cst_adjacent[pk][opt_in_cst[pk]][k_index] opt_cst[k_index] = best_cst_adjacent[k_index][opt_in_cst[k_index]][k_index] # If outbound CST for k_index is greater than k_index's external outbound CST, # reset it. opt_cst[k_index] = min(opt_cst[k_index], k.external_outbound_cst) # Get optimal cost. opt_cost = best_theta_in return opt_cst, opt_cost def _calculate_theta_out(tree, k_index, S, theta_in_partial, theta_out_partial): """Calculate the function :math:`\\theta^o_k(S)` as described in Section 6.3.6.2 of |fosct| [function :math:`f_i(S)` in Section 5 of Graves and Willems (2003)]. Original function is modified in the following ways: 1. If :math:`S` is greater than the external outbound CST for stage :math:`k`, :math:`\\theta^o_k(S)` is calculated as though :math:`S` = external outbound CST. (If :math:`k` is a sink stage, :math:`\\theta^o_k(\\cdot)` will never be calculated [:math:`\\theta^i_k(\\cdot)` will be], but :math:`k` might have non-zero external outbound CST even if it is not a sink stage.) 2. The range of values of :math:`SI` for which :math:`c_k(S,SI)` is evaluated begins at :math:`\\max\\{\\text{external_inbound_cst}, S - T_k\\}`, not :math:`\\max\\{0, S - T_k\\}`. 3. The demand bound demand bound over :math:`\\tau` periods is assumed to be of the form :math:`z_\\alpha\\sigma\\sqrt{\\tau}`. 4. When calculating :math:`c_k(S, SI)`, upstream nodes are allowed to use outbound CSTs greater than :math:`SI`, and downstream nodes are allowed to use inbound CSTs greater than :math:`S`. In effect, this allows multiple inbound/outbound CSTs for a single node. Parameters ---------- tree : |class_network| The multi-echelon tree network. Tree must be pre-processed already. k_index : int Index of node. S : int Outbound committed service time. theta_in_partial : dict Dict of values of theta_in function that have been calculated so far (i.e., for ``i`` < ``k_index``). theta_out_partial : dict Dict of values of theta_out function that have been calculated so far (i.e., for ``i`` < ``k_index``). Returns ------- theta_out_k_S : float The value of :math:`\\theta^o_k(S)`. best_cst_adjacent : dict Dict indicating, for each adjacent stage :math:`i` with :math:`i \\le k`, the CST value that minimized :math:`\\theta^o_i(\\cdot)` for the optimal value of :math:`SI`. * If :math:`i` serves :math:`k`, then ``best_CST_adjacent[i]`` = the value of :math:`S_i` that minimizes :math:`\\theta^o_i(S_i)`. * If :math:`i` is served by :math:`k`, then ``best_CST_adjacent[i]`` = the value of :math:`SI_i` that minimizes :math:`\\theta^i_i(SI_i)`. * If :math:`i = k`, then ``best_CST_adjacent[i]`` = the best value of :math:`SI` chosen in minimization of :math:`\\theta^o_k(\\cdot)`. """ # Get node k_index, for convenience. k = tree.get_node_from_index(k_index) # Initialize min_c. min_c = float('inf') # Initialize output dict. best_cst_adjacent = {} # Initialize dict of c_k(S, SI) values (keys = SI values). c_SI = {} # Set local_S: If S > external_outbound_cst[k_index], must pretend # S = external_outbound_cst[k_index], otherwise stage thinks it can promise a # longer outbound CST than it really can. # Note: external outbound CST defaults to BIG_INT if not provided. local_S = min(S, k.external_outbound_cst) # Check whether S <= external outbound CST (otherwise this S is infeasible). # Note that external outbound CST defaults to BIG_INT if not provided. if S <= k.external_outbound_cst: # Loop through SI values between max(external inbound CST[k_index], # S - T_k) and M_k - T_k. # Note that external inbound CST defaults to 0 if not provided. lo_SI = max(k.external_inbound_cst, local_S - k.processing_time) hi_SI = k.max_replenishment_time - k.processing_time for SI in range(lo_SI, hi_SI+1): # Calculate c_k(S, SI). c_SI[SI], stage_cost, best_upstream_S, best_downstream_SI = \ _calculate_c(tree, k_index, local_S, SI, theta_in_partial, theta_out_partial) # Compare to min. if c_SI[SI] < min_c: # Remember min cost and value of SI that attained it. min_c = c_SI[SI] best_cst_adjacent[k_index] = SI # Remember values of other CSTs that attained min cost. for i in range(np.min(tree.node_indices), k_index): if i in k.predecessor_indices(): best_cst_adjacent[i] = best_upstream_S[i] elif i in k.successor_indices(): best_cst_adjacent[i] = best_downstream_SI[i] # Capture theta_out_k_S. theta_out_k_S = min_c return theta_out_k_S, best_cst_adjacent def _calculate_theta_in(tree, k_index, SI, theta_in_partial, theta_out_partial): """Calculate the function :math:`\\theta^i_k(SI)` as described in Section 6.3.6.2 of |fosct| [function :math:`g_i(SI)` in Section 5 of Graves and Willems (2003)]. Original function is modified in the following ways: 1. If :math:`SI` is less than the external inbound CST for stage :math:`k`, :math:`\\theta^i_k(SI)` is calculated as though :math:`SI` = external inbound CST. (If :math:`k` is a source stage, :math:`\\theta^i_k(\\cdot)` will never be calculated [:math:`\\theta^o_k(\\cdot)` will be], but :math:`k` might have non-zero external inbound CST even if it is not a source stage.) 2. The demand bound demand bound over :math:`\\tau` periods is assumed to be of the form :math:`z_\\alpha\\sigma\\sqrt{\\tau}`. 3. When calculating :math:`c_k(S, SI)`, upstream nodes are allowed to use outbound CSTs greater than :math:`SI`, and downstream nodes are allowed to use inbound CSTs greater than :math:`S`. In effect, this allows multiple inbound/outbound CSTs for a single node. Parameters ---------- tree : |class_network| The multi-echelon tree network. Tree must be pre-processed already. k_index : int Index of node. SI : int Inbound committed service time. theta_in_partial : dict Dict of values of theta_in function that have been calculated so far (i.e., for :math:`i < k`). theta_out_partial : dict Dict of values of theta_out function that have been calculated so far (i.e., for :math:`i < k`). Returns ------- theta_in_k_SI : float The value of :math:`\\theta^i_k(SI)`. best_cst_adjacent : dict Dict indicating, for each adjacent stage :math:`i` with :math:`i \\le k`, the CST value that minimized :math:`\\theta^i_i(\\cdot)` for the optimal value of S. * If :math:`i` serves :math:`k`, then ``best_CST_adjacent[i]`` = the value of :math:`S_i` that minimizes :math:`\\theta^o_i(S_i)`. * If :math:`i` is served by :math:`k`, then ``best_CST_adjacent[i]`` = the value of :math:`SI_i` that minimizes :math:`\\theta^i_i(SI_i)`. * If :math:`i = k`, then ``best_CST_adjacent[i]`` = the best value of :math:`S` chosen in minimization of :math:`theta^i(\\cdot)`. """ # Get node k_index, for convenience. k = tree.get_node_from_index(k_index) # Initialize min_c. min_c = float('inf') # Initialize output dict. best_cst_adjacent = {} # Initialize dict of c_k(S, SI) values (keys = S values). c_S = {} # Set local_SI: If SI < external_inbound_cst[k_index], must pretend # SI = external_inbound_cst[k_index], otherwise stage thinks it can get a # shorter inbound CST than it really can. # Note: external inbound CST defaults to 0 if not provided. local_SI = max(SI, k.external_inbound_cst) # Loop through S values between 0 and min(SI + T_k, external outbound CST[k_index]). # Note that external outbound CST defaults to BIG_INT if not provided. lo_S = 0 hi_S = min(local_SI + k.processing_time, k.external_outbound_cst) for S in range(lo_S, hi_S+1): # Calculate c_k(S, SI). c_S[S], stage_cost, best_upstream_S, best_downstream_SI = \ _calculate_c(tree, k_index, S, local_SI, theta_in_partial, theta_out_partial) # Compare to min. if c_S[S] < min_c: # Remember min cost and value of S that attained it. min_c = c_S[S] best_cst_adjacent[k_index] = S # Remember values of other CSTs that attained min cost. for i in range(np.min(tree.node_indices), k_index): if i in k.predecessor_indices(): best_cst_adjacent[i] = best_upstream_S[i] elif i in k.successor_indices(): best_cst_adjacent[i] = best_downstream_SI[i] # Capture theta_in_k_SI. theta_in_k_SI = min_c return theta_in_k_SI, best_cst_adjacent def _calculate_c(tree, k_index, S, SI, theta_in_partial, theta_out_partial): """Calculate :math:`c_k(S,SI)`, the expected holding cost for :math:`N_k` as function of inbound and outbound CSTs at node :math:`k`. Assumes demand bound over :math:`\\tau` periods is of the form :math:`z_\\alpha\\sigma\\sqrt{\\tau}`. Upstream nodes are allowed to use outbound CSTs greater than :math:`SI` and downstream nodes are allowed to use inbound CSTs greater than :math:`S`. In effect, this allows multiple inbound/outbound CSTs for a single :math:`k`. Parameters ---------- tree : |class_network| The multi-echelon tree network. Tree must be pre-processed already. k_index : int Index of node. S : int Outbound committed service time. SI : int Inbound committed service time. theta_in_partial : dict Dict of values of :math:`\\theta^i` function that have been calculated so far (i.e., for :math:`i < k`). theta_out_partial : dict Dict of values of :math:`\\theta^o` function that have been calculated so far (i.e., for :math:`i < k`). Returns ------- cost : float Value of :math:`c_k(S,SI)`. stage_cost : float Cost to hold inventory at stage :math:`k` (only) given CSTs of :math:`SI` and :math:`S`. best_upstream_S : dict Dict indicating, for each :math:`i` that is immediately upstream from :math:`k`, the best outbound CST for :math:`i` given :math:`k`'s CSTs of :math:`SI` and :math:`S`. best_downstream_SI : dict Dict indicating, for each :math:`i` that is immediately downstream from :math:`k`, the best inbound CST for :math:`i` given :math:`k`'s CSTs of :math:`SI` and :math:`S`. """ # Get node k, for convenience. k = tree.get_node_from_index(k_index) # Initialize output dicts. best_upstream_S = {} best_downstream_SI = {} # Calculate safety stock. safety_stock = k.demand_bound_constant * \ k.net_demand_standard_deviation * \ math.sqrt(SI + k.processing_time - S) # Set stage_cost equal to holding cost at node k. stage_cost = k.holding_cost * safety_stock # Initialize cost to holding cost at node k. cost = stage_cost # Add theta_out_partial(i) for nodes i that are immediately upstream from k_index # and have smaller index. (At this point, theta_out_partial(i) has already been # calculated for i < k_index.) for i in k.predecessor_indices(): if i < k_index: # Build dict of theta_out_partial(i, S2) values for S2 <= SI. theta_out_values = {S2: theta_out_partial[i][S2] for S2 in range(SI + 1)} # Find min value and argmin of theta_out_partial(i, S2) where S2 <= SI. min_theta_out, best_upstream_S[i] = min_of_dict(theta_out_values) # Add min value of theta_out_partial to cost. cost += min_theta_out # Add theta_in_partial(j) for nodes j that are immediately downstream from k_index # and have smaller index. (At this point, theta_in_partial(j) has already been # calculated for j < k_index.) for j in k.successor_indices(): if j < k_index: # Build dict of theta_in_partial(j, SI2) values for SI2 >= S. theta_in_values = {SI2: theta_in_partial[j][SI2] for SI2 in range(S, tree.max_max_replenishment_time + 1)} # Find min value and argmin of theta_in_partial(i, SI2) for SI2 >= S. min_theta_in, best_downstream_SI[j] = min_of_dict(theta_in_values) # Add min value of theta_in_partial to cost. cost += min_theta_in return cost, stage_cost, best_upstream_S, best_downstream_SI ### GRAPH MANIPULATION ###
[docs]def preprocess_tree(tree): """Preprocess the GSM tree. Returns an independent copy. If tree is already correctly labeled, does not relabel it. Fill node-level attributes: ``net_demand_mean``, ``net_demand_standard_deviation``, ``larger_adjacent_node``, ``max_replenishment_time``. Fill missing data for ``demand_bound_constant``, ``external_inbound_cst``, and ``external_outbound_cst attributes``. Fill ``max_max_replenishment_time`` network-level attribute. Parameters ---------- tree : |class_network| The multi-echelon tree network. Current node labels are ignored and may be anything. start_index : int, optional Integer to use as starting (smallest) node label. Returns ------- new_tree : |class_network| Pre-processed multi-echelon tree network. """ new_tree = copy.deepcopy(tree) # Fill external inbound and outbound CST parameters, if not provided. # Default value of external outbound CST = BIG_INT. # Default value of external inbound CST = 0. (Not strictly necessary, # but cleaner.) for k in new_tree.nodes: if k.external_inbound_cst is None: k.external_inbound_cst = 0 if k.external_outbound_cst is None: k.external_outbound_cst = BIG_INT # Fill demand bound constant parameters, if not provided. # Set equal to demand bound constant of sink node. If more than one sink node, # one is chosen arbitrarily. If no sink nodes have demand bound constant, # constant is set to 1. sinks_with_dbc = [k for k in new_tree.sink_nodes if k.demand_bound_constant is not None] for k in new_tree.nodes: if k.demand_bound_constant is None: if sinks_with_dbc == []: k.demand_bound_constant = 1 else: k.demand_bound_constant = \ sinks_with_dbc[0].demand_bound_constant # Calculate net demand parameters. net_demand_means, net_demand_standard_deviations = _net_demand(new_tree) for k in new_tree.nodes: k.net_demand_mean = net_demand_means[k.index] k.net_demand_standard_deviation = net_demand_standard_deviations[k.index] # Calculate max replenishment times. max_replenishment_times = _longest_paths(new_tree) for k in new_tree.nodes: k.max_replenishment_time = max_replenishment_times[k.index] # Calculate maximum value of max_replenishment_time. new_tree.max_max_replenishment_time = \ np.max([k.max_replenishment_time for k in new_tree.nodes]) return new_tree
[docs]def relabel_nodes(tree, start_index=0, force_relabel=False): """Perform the node-labeling algorithm described in Section 5 of Graves and Willems (2000). If tree is already correctly labeled, returns the original tree, unless ``force_relabel`` is ``True``, in which case performs the relabeling. Does not modify the input tree. Fills ``original_label``, ``larger_adjacent_node``, and ``larger_adjacent_node_is_downstream`` attributes of nodes in new tree, whether or not original tree was already correctly labeled. Parameters ---------- tree : |class_network| The multi-echelon tree network. Current node labels are ignored (unless the tree is already correctly labeled) and may be anything. start_index : int, optional Integer to use as starting (smallest) node label. force_relabel : bool, optional If ``True``, function will relabel nodes even if original tree is correctly labeled. Returns ------- relabeled_tree : |class_network| The relabeled tree network. """ # Check whether tree is already correctly labeled. is_correct = is_correctly_labeled(tree) # Do relabel? if is_correct and not force_relabel: relabeled_tree = copy.deepcopy(tree) new_labels = {k.index: k.index for k in tree.nodes} else: # Initialize all nodes to "unlabeled", and initialize list of new labels. labeled = {i.index: False for i in tree.nodes} new_labels = {} # Find nodes that are adjacent to at most 1 unlabeled node and label them. for k_index in range(start_index, start_index+len(tree.nodes)): # Find a node for labeling. for i in tree.nodes: # Make sure i is unlabeled. if not labeled[i.index]: # Count unlabeled nodes that are adjacent to node i. num_adj = len([j for j in i.neighbor_indices if not labeled[j]]) # If i is adjacent to at most 1 unlabeled node, label it. if num_adj <= 1: # Change i's label to k_index. new_labels[i.index] = k_index # Mark i as labeled. labeled[i.index] = True # Break out of 'for i' loop break # Relabel the nodes relabeled_tree = copy.deepcopy(tree) relabeled_tree.reindex_nodes(new_labels) # Fill attributes of relabeled tree. larger_adjacent, downstream = _find_larger_adjacent_nodes(relabeled_tree) for k in tree.nodes: relabeled_node = relabeled_tree.get_node_from_index(new_labels[k.index]) relabeled_node.original_label = k.index if new_labels[k.index] < np.max(list(new_labels.values())): relabeled_node.larger_adjacent_node = larger_adjacent[relabeled_node.index] relabeled_node.larger_adjacent_node_is_downstream = downstream[relabeled_node.index] else: # Largest-indexed node has no larger adjacent node. relabeled_node.larger_adjacent_node = None relabeled_node.larger_adjacent_node_is_downstream = None return relabeled_tree
[docs]def is_correctly_labeled(tree): """Determine whether tree is already correctly labeled. Tree is correctly labeled if all labels are consecutive integers and every stage (other than the highest-indexed one) has exactly one adjacent stage with a greater index. Parameters ---------- tree : |class_network| The multi-echelon tree network. Returns ------- ``True`` if tree is already correctly labeled. """ # Get indices. ind = tree.node_indices # Check whether labels are consecutive integers starting at min_index. min_index = np.min(ind) try: if set(ind) != set(range(min_index, min_index + len(ind))): is_correct = False else: # Check whether every node has exactly one adjacent node with # greater index. is_correct = True for k in tree.nodes: if k.index < np.max(ind): greater_indexed_neighbors = \ {i_ind for i_ind in k.predecessor_indices() if i_ind > k.index}.union( {i_ind for i_ind in k.successor_indices() if i_ind > k.index} ) if len(greater_indexed_neighbors) != 1: is_correct = False except: is_correct = False return is_correct
def _find_larger_adjacent_nodes(tree): """Find larger-indexed adjacent node, for each node in tree. After the nodes are relabeled by :func:`relabel_nodes`, each node (except the node with the largest index) is adjacent to exactly one node with a larger index. Node :math:`k`'s neighbor with larger index is denoted :math:`p(k)` in Graves and Willems (2000). This function finds :math:`p(k)` for all :math:`k` and also indicates whether :math:`p(k)` is upstream or downstream from :math:`k`. Parameters ---------- tree : |class_network| The multi-echelon tree network. Nodes are assumed to have been relabeled using :func:`relabel_nodes`. Returns ------- larger_adjacent: dict Dict containing index of each node's larger-indexed adjacent node, for all nodes except the largest-indexed node. Keys are node indices. downstream: dict Dict containing, for each node, ``True`` if the larger-indexed adjacent node is downstream from the node, ``False`` if it is upstream, for all nodes except the largest-indexed node. Keys are node indices. """ # Initialize dicts. larger_adjacent = {} downstream = {} # Loop through nodes. for k in tree.nodes: if k.index < np.max(tree.node_indices): # Get list of nodes that are adjacent to k and have a larger index, # but the list will only contain a single item; set larger_adjacent[k_index] to it. larger_adjacent_list = [i.index for i in k.neighbors if i.index > k.index] larger_adjacent[k.index] = larger_adjacent_list[0] # Set downstream flag. if larger_adjacent[k.index] in k.successor_indices(): downstream[k.index] = True else: downstream[k.index] = False return larger_adjacent, downstream def _longest_paths(tree): """Determine the length of the longest path from any source node to to each node :math:`k`. Arc lengths are determined by processing times. External inbound CSTs are considered source nodes, so having an external inbound CST at node :math:`i` of 5 is like having another node that serves node :math:`i` with a processing time of 5. Parameters ---------- tree : |class_network| The multi-echelon tree network. Nodes are assumed to have been relabeled using :func:`relabel_nodes`. Returns ------- longest_lengths : dict Dict of longest paths to each node, with keys equal to node indices. [:math:`M_k`]. """ # Build the tree as networkx DiGraph. Set the weight of each edge into a given node k to the processing # time of node k. If node k is a source node and/or it has an external inbound CST, # add a dummy node before node k and set weight of edge from dummy node to node k # equal to processing time of node k + external inbound CST for node k. temp_tree = tree.networkx_digraph() for k in tree.nodes: for p in k.predecessors(): temp_tree[p.index][k.index]['weight'] = k.processing_time if temp_tree.in_degree(k.index) == 0 or (k.external_inbound_cst or 0) > 0: temp_tree.add_edge('dummy_' + str(k.index), k.index, weight=k.processing_time + (k.external_inbound_cst or 0)) # Determine shortest path between every pair of nodes. # (Really there's only one path, but shortest path is the # most straightforward algorithm to use here.) path_lengths = dict(nx.shortest_path_length(temp_tree, weight='weight')) # Determine longest shortest path to each node k, among all # source nodes that are ancestors to k. longest_lengths = {} for k in tree.nodes: longest_lengths[k.index] = max([path_lengths[i][k.index] for i in nx.ancestors(temp_tree, k.index)], default=0) return longest_lengths def _net_demand(tree): """Calculate net demand mean and standard deviation for all nodes in tree. Net demand is the demand stream consisting of the external demand for the node plus all downstream demand. Parameters ---------- tree : |class_network| The multi-echelon tree network. Returns ------- net_means : dict Dict of net mean for each node. net_standard_deviations : dict Dict of net standard deviation for each node. """ # Initialize net_mean and net_variances using each node's external demand. net_means = {k.index: k.demand_source.mean or 0 for k in tree.nodes} net_variances = {k.index: (k.demand_source.standard_deviation or 0)**2 for k in tree.nodes} # Make temp copy of tree. temp_tree = copy.deepcopy(tree) # Loop through temp_tree. At each iteration, handle leaf nodes (nodes with # no _successors), adding their net_means and net_variances to those of their # _predecessors. Then remove the leaf nodes and iterate. while len(temp_tree.nodes) > 0: leaf_nodes = temp_tree.sink_nodes for k in leaf_nodes: for i in k.predecessor_indices(): net_means[i] += net_means[k.index] net_variances[i] += net_variances[k.index] temp_tree.remove_node(k) net_standard_deviations = {k.index: math.sqrt(net_variances[k.index]) for k in tree.nodes} return net_means, net_standard_deviations def _connected_subgraph_nodes(tree): """Determine nodes connected to :math:`k` in subgraph on :math:`\\{\\min_k,\\ldots,k\\}`, for each :math:`k`, where :math:`\\min_k` is smallest index in graph. [:math:`N_k`] "Connected" does not necessarily mean "adjacent." Parameters ---------- tree : |class_network| The multi-echelon tree network. Returns ------- connected_nodes : dict Dict of set of connected subgraph nodes for each node. """ # Intiailize output dict. connected_nodes = {} # Convert to networkx DiGraph so we can use subgraph(). networkx_tree = tree.networkx_digraph() # Loop through nodes. for k in networkx_tree: # Build subgraph on {min_k,...,k}. subgraph = networkx_tree.to_undirected().subgraph(range(np.min(networkx_tree.nodes), k+1)) # Build set of connected nodes. connected_nodes[k] = set(i for i in subgraph.nodes if nx.has_path(subgraph, i, k)) return connected_nodes
[docs]def gsm_to_ssm(tree, p=None): """Convert GSM tree to SSM tree: - Convert local to echelon holding costs. - Convert processing times to lead times. - Include stockout cost at demand nodes (if provided). Tree must be pre-processed before calling. Parameters ---------- tree : |class_network| The multi-echelon tree network. p : float or dict, optional Stockout cost to use at demand nodes, or dict indicating stockout cost for each demand node. If ``None``, copies ``stockout_cost`` field from tree for nodes that have it, and does not fill ``stockout_cost`` for nodes that do not. Returns ------- SSM_tree : |class_network| SSM representation of tree. """ # Build new graph. SSM_tree = SupplyChainNetwork() # Add nodes. for n in tree.nodes: upstream_h = np.sum([k.local_holding_cost for k in n.predecessors()]) SSM_tree.add_node(SupplyChainNode(n.index, name=n.name, network=SSM_tree, shipment_lead_time=n.processing_time+n.external_inbound_cst, echelon_holding_cost=n.local_holding_cost-upstream_h)) SSM_node = SSM_tree.get_node_from_index(n.index) SSM_node.demand_source = copy.deepcopy(n.demand_source) if p is not None: if n.demand_source is not None: if isinstance(p, dict): SSM_node.stockout_cost = p[SSM_node.index] else: SSM_node.stockout_cost = p else: if n.stockout_cost is not None: SSM_node.stockout_cost = n.stockout_cost # Add edges. edge_list = tree.edges SSM_tree.add_edges_from_list(edge_list) return SSM_tree