
    6                         d Z ddlmZ ddlmZ ddlmZ ddlZddlmZ ddlm	Z	 ddl
mZ d	Zd
ZdZdZ G d d      Z G d de	j"                        Z G d d      Zy)zImplements logic for tracking task dependencies in task_graph_executor.

See go/parallel-processing-in-gcloud-storage for more information.
    )absolute_import)division)unicode_literalsN)List)errors)log   zTask Graph:z   - Task ID: {}
z^    - Task: {}
    - Dependency Count: {}
    - Dependent Task IDs: {}
    - Is Submitted: {}
c                       e Zd ZdZd Zd Zy)TaskWrappera  Embeds a Task instance in a dependency graph.

  Attributes:
    id (Hashable): A unique identifier for this task wrapper.
    task (googlecloudsdk.command_lib.storage.tasks.task.Task): An instance of a
      task class.
    dependency_count (int): The number of unexecuted dependencies this task has,
      i.e. this node's in-degree in a graph where an edge from A to B indicates
      that A must be executed before B.
    dependent_task_ids (Optional[Iterable[Hashable]]): The id of the tasks that
      require this task to be completed for their own completion. This value
      should be None if no tasks depend on this one.
    is_submitted (bool): True if this task has been submitted for execution.
  c                 J    || _         || _        d| _        || _        d| _        y )Nr   F)idtaskdependency_countdependent_task_idsis_submitted)selftask_idr   r   s       :lib/googlecloudsdk/command_lib/storage/tasks/task_graph.py__init__zTaskWrapper.__init__:   s(    DGDID0DD    c                    t         j                  | j                        t        j                  | j                  j
                  j                  | j                  rt        | j                        nd| j                  | j                        z   S )z3Returns a string representation of the TaskWrapper.r   )
TASK_WRAPPER_IDformatr   TASK_DETAILSr   	__class____name__r   lenr   )r   s    r   __str__zTaskWrapper.__str__A   so     	tww'II((&& ''(,-##	
	
	r   N)r   
__module____qualname____doc__r   r    r   r   r   r   *   s    r   r   c                       e Zd ZdZy)InvalidDependencyErrorzRaised on attempts to create an invalid dependency.

  Invalid dependencies are self-dependencies and those that involve nodes that
  do not exist.
  N)r   r   r    r!   r"   r   r   r$   r$   O   s    r   r$   c                   B    e Zd ZdZd Zd
dZd Zd Zd Zde	e
   fd	Zy)	TaskGrapha7  Tracks dependencies between Task instances.

  See googlecloudsdk.command_lib.storage.tasks.task.Task for the definition of
  the Task class.

  The public methods in this class are thread safe.

  Attributes:
    is_empty (threading.Event): is_empty.is_set() is True when the graph has no
      tasks in it.
  c                     t        j                         | _        | j                  j                          t        j                         | _        i | _        t        j                  |      | _        y)aS  Initializes a TaskGraph instance.

    Args:
      top_level_task_limit (int): A top-level task is a task that no other tasks
        depend on for completion (i.e. dependent_task_ids is None). Adding
        top-level tasks with TaskGraph.add will block until there are fewer than
        this number of top-level tasks in the graph.
    N)		threadingEventis_emptysetRLock_lock_task_wrappers_in_graph	Semaphore_top_level_task_semaphore)r   top_level_task_limits     r   r   zTaskGraph.__init__d   sR     OO%DMMM "DJ $&D 
 &/%8%89M%ND"r   Nc                 t   |du }|r| j                   j                          | j                  5  |j                  |j                  }nt	        |      }|| j
                  v r|j                  Nt        j                  j                  dj                  |j                  j                  |j                               nBt        j                  j                  dj                  |j                  j                               |r| j                   j                          	 ddd       yt        |||      }|xs g D ]%  }	 | j
                  |   xj                  dz  c_        ' || j
                  |j                  <   | j"                  j%                          ddd       |S # t        $ r t         w xY w# 1 sw Y   S xY w)a	  Adds a task to the graph.

    Args:
      task (googlecloudsdk.command_lib.storage.tasks.task.Task): The task to be
        added.
      dependent_task_ids (Optional[List[Hashable]]): TaskWrapper.id attributes
        for tasks already in the graph that require the task being added to
        complete before being executed. This argument should be None for
        top-level tasks, which no other tasks depend on.

    Returns:
      A TaskWrapper instance for the task passed into this function, or None if
      task.parallel_processing_key was the same as another task's
      parallel_processing_key.

    Raises:
      InvalidDependencyError if any id in dependent_task_ids is not in the
      graph, or if a the add operation would have created a self-dependency.
    NzcSkipping {} for {}. This can occur if a cp command results in multiple writes to the same resource.zoSkipping {}. This is probably because due to a bug that caused it to be submitted for execution more than once.   )r0   acquirer-   parallel_processing_keyr   r.   r   statusPrintr   r   r   releaser   r   KeyErrorr$   r*   clear)r   r   r   is_top_level_task
identifiertask_wrapperr   s          r   addzTaskGraph.add~   s   ( +d2
$$,,.			%	%	111
X
	t33	3''3
**

66<f..))4+G+G7IJ
 **

HHN..))I+,
 

(
(
0
0
2) 
, !T3EFl'-2-'	'

&
&w
/
@
@A
E
@ . 7Cd""<??3
mm? 
@   	'&
&	'7 
@ s*   C,F-"F-9"F5F-F**F--F7c                    | j                   5  |j                  rg cddd       S |j                  s|gcddd       S | j                  |j                  = |j
                  K| j                  j                          | j                  s| j                  j                          g cddd       S g }|j
                  D ]:  }| j                  |   }|xj                  dz  c_        || j                  |      z  }< |cddd       S # 1 sw Y   yxY w)aI  Recursively removes a task and its parents from the graph if possible.

    Tasks can be removed only if they have been submitted for execution and have
    no dependencies. Removing a task can affect dependent tasks in one of two
    ways, if the removal left the dependent tasks with no dependencies:
     - If the dependent task has already been submitted, it can also be removed.
     - If the dependent task has not already been submitted, it can be
       submitted for execution.

    This method removes all tasks that removing task_wrapper allows, and returns
    all tasks that can be submitted after removing task_wrapper.

    Args:
      task_wrapper (TaskWrapper): The task_wrapper instance to remove.

    Returns:
      An Iterable[TaskWrapper] that yields tasks that are submittable after
      completing task_wrapper.
    Nr3   )r-   r   r   r.   r   r   r0   r8   r*   r+   complete)r   r=   submittable_tasksr   dependent_task_wrappers        r   r@   zTaskGraph.complete   s    ( 
		&	&  
 && ~ 
 
&
&|
7		(	(	0&&..0++
--


+ 
4 !44'!%!=!=g!F//14/ 	T]]+ABB 5 C 
s   DDA$D+ADD
c                 h   | j                   5  |g|j                  [|j                  O|j                  D ]@  }| j                  |   }|j                  j
                  j                  |j                         B ||j                  s| j                  |      cddd       S |g}t        |j                        D ]K  }|D cg c]  }|j                   }}g }|D ])  }	| j                  |	|      }||j                  |       + M |s| j                  |       |cddd       S c c}w # 1 sw Y   yxY w)a  Updates the graph based on the output of an executed task.

    If some googlecloudsdk.command_lib.storage.task.Task instance `a` returns
    the following iterables of tasks: [[b, c], [d, e]], we need to update the
    graph as follows to ensure they are executed appropriately.

           /-- d <-\--/- b
      a <-/         \/
          \         /\
           \-- e <-/--\- c

    After making these updates, `b` and `c` are ready for submission. If a task
    does not return any new tasks, then it will be removed from the graph,
    potentially freeing up tasks that depend on it for execution.

    See go/parallel-processing-in-gcloud-storage#heading=h.y4o7a9hcs89r for a
    more thorough description of the updates this method performs.

    Args:
      executed_task_wrapper (task_graph.TaskWrapper): Contains information about
        how a completed task fits into a dependency graph.
      task_output (Optional[task.Output]): Additional tasks and
        messages returned by the task in executed_task_wrapper.

    Returns:
      An Iterable[task_graph.TaskWrapper] containing tasks that are ready to be
      executed after performing graph updates.
    N)r   )r-   messagesr   r.   r   received_messagesextendadditional_task_iteratorsr@   reversedr   r>   append)
r   executed_task_wrappertask_outputr   rB   parent_tasks_for_next_layertask_iteratorr=   r   r   s
             r   update_from_executed_taskz#TaskGraph.update_from_executed_task   s;   : 


!"".#66B,??G#'#?#?#H
 
 
%
%
7
7
>
>""$ @
 
	K$I$I }}23 
 &;$;! $K$I$IJ-0K
0KLOO0K 	 
 ')#!D$;MN,%'..|< " K )+,(C 
(
) 
s*   BD(D(>D#D(0)D(#D((D1c                 d   t         d| j                  j                          dt        | j                         g}| j                  rIt               }|j                  | j                  | j                  j                         t        |             n|j                  d       dj                  |      S )z1Returns a string representation of the TaskGraph.z
 - Empty: z - Task Wrappers: zNo tasks in the graph to print.
)TASK_GRAPH_HEADERr*   is_setr   r.   r+   rF   _print_task_wrapper_recursivevaluesINITIAL_INDENT_LEVELrI   join)r   outputprinted_taskss      r   r   zTaskGraph.__str__/  s     	
T]]))+,-
S!=!=>?@F
 ##emmm

,
,**113" mm5699Vr   returnc              #   8  K   |D ]  }|j                   |vs|j                  |j                          t        |       |j                  sG|j                  D cg c]  }| j                  |    }}| j                  ||dz   |      E d{     yc c}w 7 w)a  Recursively yields task wrappers and their dependencies.

    Example:
      Suppose we have task wrappers representing tasks with dependencies:

      task_wrapper1 = TaskWrapper(id='task1',
      dependent_task_ids=['task2', 'task3']),
      task_wrapper2 = TaskWrapper(id='task2', dependent_task_ids=['task4'])
      task_wrapper3 = TaskWrapper(id='task3', dependent_task_ids=[])
      task_wrapper4 = TaskWrapper(id='task4', dependent_task_ids=[])

      task_wrappers = [task_wrapper1, task_wrapper2,
                       task_wrapper3, task_wrapper4]

      Calling _print_task_wrapper_recursive(task_wrappers, 0, set())
      would produce:

      ['task1',
        '  task2',
        '    task4',
        '  task3']

      This shows the tasks and their dependencies formatted with appropriate
      indentation levels.

    Args:
      task_wrappers (list): List of task wrappers to print.
      indent_level (int): Current level of indentation for formatting.
      printed_tasks (set): Set of task IDs that have already been printed.


    Yields:
      List of formatted strings representing the task wrappers
      and their dependencies.
    r	   N)r   r>   strr   r.   rS   )r   task_wrappersindent_levelrX   r=   r   dependent_task_wrapperss          r   rS   z'TaskGraph._print_task_wrapper_recursiveC  s     N &		-,//*,** *<<%<' **73< " % 77%|a'7H H H &
%Hs(   B4BBB0BBB)N)r   r   r    r!   r   r>   r@   rN   r   r   r[   rS   r"   r   r   r&   r&   W   s7    
O48t5n>)@(1HCy1Hr   r&   )r!   
__future__r   r   r   r(   typingr   "googlecloudsdk.command_lib.storager   googlecloudsdk.corer   rU   rQ   r   r   r   Errorr$   r&   r"   r   r   <module>rd      sg   
 '  '   5 #  ! & " "JV\\ ]H ]Hr   