
    5                     ^   d Z ddlmZ ddlmZ ddlmZ ddlZddlZddlZddlZddl	Z	ddl
mZ ddlZddlmZ ddlmZ dd	lmZ dZd
Z G d de      Z G d de      Z ej,                  ej.                         G d de             Z ej,                  ej.                         G d de             Z G d de      Z G d de      Z G d de      Z G d de      Z G d de      Z G d de      Z  G d d e      Z! G d! d"e      Z" G d# d$ejF                        Z$ G d% d&e      Z%d' Z&y)(a.  Parallel execution pools based on multithreading.

This module provides 2 types of pools:
- NullPool: executes work synchronously, in the current process
- ThreadPool: executes work across multiple threads

It also contains a convenience method GetPool to get the appropriate pool for
the given number of threads.

The general usage is as follows:

>>> def identity(value): return value
>>> with parallel.GetPool(num_threads) as pool:
...   future = pool.ApplyAsync(identity, (42,))
...   assert future.Get() == 42
...   assert pool.Apply(f, (42,)) == 42
...   map_future = pool.MapAsync(identity, [1, 2, 3])
...   assert map_future.Get() == [1, 2, 3]
...   assert pool.Map(identity, [1, 2, 3]) == [1, 2, 3]

Errors are raised at the time of the Get() call on the future (which is implicit
for Apply() and Map()).
    )absolute_import)division)unicode_literalsN)
exceptions)map)queue)rangeg{Gz?c                       e Zd ZdZy)UnsupportedPlatformExceptionzHException indicating that a pool was created on an unsupported platform.N)__name__
__module____qualname____doc__     (lib/googlecloudsdk/core/util/parallel.pyr   r   A   s    Pr   r   c                   "     e Zd ZdZ fdZ xZS )InvalidStateExceptionzGException to indicate that a parallel pool was put in an invalid state.c                 ,    t         t        |   |       y N)superr   __init__)selfmsg	__class__s     r   r   zInvalidStateException.__init__H   s    	
/4r   )r   r   r   r   r   __classcell__r   s   @r   r   r   E   s    O5 5r   r   c                       e Zd ZdZej
                  d        Zej
                  d        Zd Zd Z	d Z
d Zej
                  d        Zd	 Zd
 Zy)BasePoolzBase class for parallel pools.

  Provides a limited subset of the multiprocessing.Pool API.

  Can be used as a context manager:

  >>> with pool:
  ...  assert pool.Map(str, [1, 2, 3]) == ['1', '2', '3']
  c                     t         )zFInitialize non-trivial infrastructure (e.g. processes/threads/queues).NotImplementedErrorr   s    r   StartzBasePool.StartX   
     r   c                     t         )z%Clean up anything started in Start().r!   r#   s    r   JoinzBasePool.Join]   r%   r   c                 B    | j                  ||      j                         S )zFApplies func to each element in iterable and return a list of results.)MapAsyncGetr   funciterables      r   MapzBasePool.Mapb   s    ==x(,,..r   c           	      `    t        |D cg c]  }| j                  ||f       c}      S c c}w )z=Applies func to each element in iterable and return a future.)_MultiFuture
ApplyAsync)r   r,   r-   args       r   r)   zBasePool.MapAsyncf   s,    (K(3v6(KLLKs   +c                 B    | j                  ||      j                         S )a  Applies func to each element in iterable and return a generator.

    The generator yields the result immediately after the task is done. So
    result for faster task will be yielded earlier than for slower task.

    Args:
      func: a function object
      iterable: an iterable object and each element is the arguments to func

    Returns:
      A generator to produce the results.
    )r)   GetResultsEagerFetchr+   s      r   MapEagerFetchzBasePool.MapEagerFetchj   s     ==x(==??r   c                 B    | j                  ||      j                         S )z,Applies func to args and returns the result.)r1   r*   r   r,   argss      r   ApplyzBasePool.Applyy   s    ??4&**,,r   c                     t         )z'Apply func to args and return a future.r!   r7   s      r   r1   zBasePool.ApplyAsync}   r%   r   c                 &    | j                          | S r   )r$   r#   s    r   	__enter__zBasePool.__enter__   s    JJLKr   c                 $    | j                          y r   )r'   )r   exc_typeexc_valexc_tbs       r   __exit__zBasePool.__exit__   s    IIKr   N)r   r   r   r   abcabstractmethodr$   r'   r.   r)   r5   r9   r1   r<   rA   r   r   r   r   r   L   su        /M@-  r   r   c                   ^    e Zd ZdZd Zej                  d        Zej                  d        Zy)
BaseFuturezAA future object containing a value that may not be available yet.c                 >    | j                         j                         S r   	GetResult
GetOrRaiser#   s    r   r*   zBaseFuture.Get   s    >>&&((r   c                     t         r   r!   r#   s    r   rH   zBaseFuture.GetResult       
r   c                     t         r   r!   r#   s    r   DonezBaseFuture.Done   rK   r   N)	r   r   r   r   r*   rB   rC   rH   rM   r   r   r   rE   rE      s>    I)    r   rE   c                   *    e Zd ZdZddZd Zd Zd Zy)_Resulta  Value holder for a result (a value, if successful, or an error).

  Only one of {value, error, exc_info} can be set.

  Both error and exc_info exist due to issues with pickling. exc_info is better,
  because we can re-raise it and preserve the original stacktrace, but it can't
  be pickled. error gets re-raised from GetOrRaise().

  Attributes:
    result: one-tuple of any object (optional), the result of the function. It's
      a one-tuple to distinguish a result of None from no result.
    error: Exception (optional), an exception that was thrown by the function
    exc_info: exc_info (optional) for the exception that occurred
  Nc                     t        t        t        |||g            dkD  rt        d      |s|s|st        d      || _        || _        || _        y )N   z:_Result may only have one of [value, error, exc_info] set.z6_Result must have one of [value, error, exc_info] set.)sumr   bool
ValueErrorvalueerrorexc_info)r   rU   rV   rW   s       r   r   z_Result.__init__   sW    
3teUH-./!3   UhOPPDJDJDMr   c                     | j                   r| j                   d   S | j                  r| j                  t        j                  | j                  d   | j                  d          y )Nr   rQ      )tb)rU   rV   r   reraiserW   r#   s    r   rI   z_Result.GetOrRaise   sJ    zzZZ]	JJq)dmmA.>?r   c                    | j                   rt        | j                   d         }n| }	 t        j                  |       |S # t        j                  $ r}t        |      cY d}~S d}~wt
        $ rL}t        t        j                  dj                  |t        j                  |                        cY d}~S d}~ww xY w)zReturn a pickleable version of this _Result.

    Traceback objects can't be pickled, so we just pass through the exc_value.
    Also, some values and exceptions can't be pickled.

    Returns:
      _Result: a pickleable version of this result.
    rQ   rV   Nz!Couldn't pickle result [{0}]: {1})	rW   rO   pickledumpsPicklingError	Exceptionformatsix	text_type)r   pickleable_resulterrs      r   ToPickleableResultz_Result.ToPickleableResult   s     }}!a(897ll$%    3 76//
-
4
4s!356 7 77s0   A B=A%B=%B=1AB82B=8B=c                 d    dj                  | j                  | j                  | j                        S )Nz+_Result(value={0}, error={1}, exc_info={2}))rb   rU   rV   rW   r#   s    r   __str__z_Result.__str__   s(    8??

DJJ/ /r   )NNN)r   r   r   r   r   rI   rg   ri   r   r   r   rO   rO      s    @0/r   rO   c                        e Zd Z fdZ xZS )
MultiErrorc           	      x    || _         d }t        t        |   ddj	                  t        ||            z          y )Nc                 r    dj                  t        |       j                  t        j                  |             S )Nz{}: {})rb   typer   rc   rd   )es    r   <lambda>z%MultiError.__init__.<locals>.<lambda>   s"    8??47#3#3S]]15EFr   zOne or more errors occurred:
z

)errorsr   rk   r   joinr   )r   rq   fnr   s      r   r   zMultiError.__init__   s8    DK	FB	*d$(CFO$	%&r   )r   r   r   r   r   r   s   @r   rk   rk      s    & &r   rk   c                   (    e Zd ZdZd Zd Zd Zd Zy)r0   zFuture object that combines other Future objects.

  Returns the results of each future when they are all ready.

  Attributes:
    futures: list of BaseFuture.
  c                     || _         y r   )futures)r   rv   s     r   r   z_MultiFuture.__init__   s	    DLr   c                    g }g }| j                   D ]"  }	 |j                  |j                                $ |rt	        t        |            S t	        |f      S # t        $ r}|j                  |       Y d }~hd }~ww xY w)Nr]   )rU   )rv   appendr*   ra   rO   rk   )r   resultsrq   futurerf   s        r   rH   z_MultiFuture.GetResult   st    GF,,vzz|$ 
 :f-..'$$	  cs   A	A>#A99A>c                 n    t        | j                  D cg c]  }|j                          c}      S c c}w r   )allrv   rM   )r   rz   s     r   rM   z_MultiFuture.Done   s)    T\\:\6\:;;:s   2c              #     K   | j                   }|r]g }|D ]7  }|j                         r	 |j                          '|j	                  |       9 |}t        j                  t               |r\yy# t        $ r}| Y d}~ld}~ww xY ww)zCollect the results of futures.

    Results are yielded immediately after the task is done. So
    result for faster task will be yielded earlier than for slower task.

    Yields:
      result which is done.
    N)rv   rM   r*   ra   rx   timesleep_POLL_INTERVAL)r   uncollected_futurenext_uncollected_futurerz   rf   s        r   r4   z!_MultiFuture.GetResultsEagerFetch   s~      
 "&&;;=**, "
(
(
0 ' 3
jj    Is3   &B	A/1B	-B	/	B8B<B	BB	N)r   r   r   r   r   rH   rM   r4   r   r   r   r0   r0      s    
%<!r   r0   c                   (    e Zd ZdZd Zd Zd Zd Zy)_TaskzAn individual work unit to be performed in parallel.

  Attributes:
    func: callable, a function to be called with the given arguments. Must be
      serializable.
    args: tuple, the arguments to pass to func. Must be serializable.
  c                      || _         || _        y r   )r,   r8   r7   s      r   r   z_Task.__init__  s    DIDIr   c                 X    t        | j                  j                  | j                  f      S r   )hashr,   r   r8   r#   s    r   __hash__z_Task.__hash__#  s     ##TYY/00r   c                     | j                   j                  |j                   j                  k(  xr | j                  |j                  k(  S r   )r,   r   r8   r   others     r   __eq__z_Task.__eq__&  s3    99!4!44Pejj9PPr   c                 &    | j                  |       S r   )r   r   s     r   __ne__z_Task.__ne__)  s    {{5!!!r   N)r   r   r   r   r   r   r   r   r   r   r   r   r     s    1Q"r   r   c                       e Zd Zd Zd Zd Zy)_NullFuturec                     || _         y r   result)r   r   s     r   r   z_NullFuture.__init__4  s	    DKr   c                     | j                   S r   r   r#   s    r   rH   z_NullFuture.GetResult7  s    ;;r   c                      y)NTr   r#   s    r   rM   z_NullFuture.Done:  s    r   N)r   r   r   r   rH   rM   r   r   r   r   r   2  s    r   r   c                   (    e Zd ZdZd Zd Zd Zd Zy)NullPoolz)Serial analog of parallel execution Pool.c                     d| _         y )NF)_startedr#   s    r   r   zNullPool.__init__A  s	    DMr   c                     | j                   st        d      	 t         || f      }t        |      S #  t        t        j                               }Y t        |      S xY w)N&NullPool must be Start()ed before use.rW   )r   r   rO   sysrW   r   )r   r,   r8   r   s       r   r1   zNullPool.ApplyAsyncD  s[    == ""JKK0d~&f v0/fvs	   3  Ac                 @    | j                   rt        d      d| _         y )NzCan only start NullPool once.Tr   r   r#   s    r   r$   zNullPool.StartP  s    }}!"ABBDMr   c                 2    | j                   st        d      y )Nr   r   r#   s    r   r'   zNullPool.JoinU  s    == ""JKK	 r   N)r   r   r   r   r   r1   r$   r'   r   r   r   r   r   >  s    1

Lr   r   c                   $    e Zd Zd Zd Zd Zd Zy)_ThreadFuturec                      || _         || _        y r   _task_results_map)r   taskresults_maps      r   r   z_ThreadFuture.__init__e  s    DJ#Dr   c                 >    | j                         j                         S )z6Return the value of the future, or raise an exception.rG   r#   s    r   r*   z_ThreadFuture.Geti  s    >>&&((r   c                     	 | j                   | j                  v r| j                  | j                      S t        j                  t               K)zGet the _Result of the future.)r   r   r~   r   r   r#   s    r   rH   z_ThreadFuture.GetResultm  s=    
	t((	(  ,,
jj  r   c                 2    | j                   | j                  v S )z8Return True if the task finished with or without errors.r   r#   s    r   rM   z_ThreadFuture.Donet  s    ::****r   N)r   r   r   r   r*   rH   rM   r   r   r   r   r   c  s    $)!+r   r   c                       e Zd Zd Zy)_ThreadTaskc                     || _         y r   )r   )r   r   s     r   r   z_ThreadTask.__init__{  s	    DIr   N)r   r   r   r   r   r   r   r   r   y  s    r   r   c                   $     e Zd Z fdZd Z xZS )_WorkerThreadc                 F    t         t        |           || _        || _        y r   )r   r   r   
work_queuer   )r   r   r   r   s      r   r   z_WorkerThread.__init__  s    	-') DO"Dr   c                 (   	 | j                   j                         }|t        u ry |j                  }	 t	         |j
                  |j                   f      }|| j                  |j                  <   m#  t	        t        j                               }Y <xY w)Nr   )
r   get_STOP_WORKINGr   rO   r,   r8   r   rW   r   )r   thread_taskr   r   s       r   runz_WorkerThread.run  s    
OO'')k		%d2)$))TYY/12 ,2d{''( 2#,,.1s   #A/ / B)r   r   r   r   r   r   r   s   @r   r   r     s    #

2r   r   c                   (    e Zd ZdZd Zd Zd Zd Zy)
ThreadPoolz%Thread-based parallel execution Pool.c                 `    || _         t        j                         | _        g | _        i | _        y r   )num_threadsr   Queue_task_queueworker_threadsr   )r   r   s     r   r   zThreadPool.__init__  s(    "D{{}DDDr   c                     | j                   rt        d      t        | j                        D ]M  }t	        | j
                  | j                        }| j                   j                  |       |j                          O y )Nz(ThreadPool must be started at most once.)	r   r   r	   r   r   r   r   rx   startr   _threads      r   r$   zThreadPool.Start  sa    !"LMM4##$T--t/@/@Af
  (lln %r   c                     | j                   st        d      t        ||      }t        || j                        }| j
                  j                  t        |             |S Nz(ThreadPool must be Start()ed before use.)r   r   r   r   r   r   putr   )r   r,   r8   r   r   s        r   r1   zThreadPool.ApplyAsync  sS    !"LMMtD4!2!23FT*+Mr   c                     | j                   st        d      | j                   D ]!  }| j                  j                  t               # | j                   D ]  }|j                           y r   )r   r   r   r   r   rr   r   s      r   r'   zThreadPool.Join  sV    !"LMM  
=) ! %%kkm &r   N)r   r   r   r   r   r$   r1   r'   r   r   r   r   r     s    -r   r   c                 6    | dk(  r
t               S t        |       S )a=  Returns a parallel execution pool for the given number of threads.

  Can return either:
  - NullPool: if num_threads is 1.
  - ThreadPool: if num_threads is greater than 1

  Args:
    num_threads: int, the number of threads to use.

  Returns:
    BasePool instance appropriate for the given type of parallelism.
  rQ   )r   r   )r   s    r   GetPoolr     s     A:k""r   )'r   
__future__r   r   r   rB   r^   r   	threadingr~   googlecloudsdk.corer   rc   	six.movesr   r   r	   r   r   ra   r   r   add_metaclassABCMetaobjectr   rE   rO   rk   r0   r   r   r   r   r   Threadr   r   r   r   r   r   <module>r      s;  0 '  ' 
  
   * 
    Q9 Q5I 5 3;;:v :  :z 3;;   </f </~& &0!: 0!f"F "8	* 	Lx LJ+J +,& 2I$$ 2(   P#r   