
    >                        d Z ddlmZ ddlmZ ddlmZ ddlmZ ddlZddlZddlZddl	Z	ddl
Z
ddlZddlmZ ddlmZ dd	lmZ 	 ddlZd
ZdZdZdZdZdadada ej:                  dddg      Z	  e	j>                  d      Z  G d de#      Z$ G d de#      Z%d Z&d Z'ddZ(d Z)efdZ*y# e$ rZdZY dZ[kdZ[ww xY w# e!e"f$ r e	Z Y Hw xY w)z:Utility classes and methods for the parallelism framework.    )absolute_import)print_function)division)unicode_literalsN)	constants)system_util)queueTF<      )zThere were noztasks to do MultiprocessingIsAvailableResultis_availablestack_traceforkc                   @    e Zd ZdZd
dZd Zd Zd
dZd Zd Z	dd	Z
y)
AtomicDictzThread-safe (and optionally process-safe) dictionary protected by a lock.

  If a multiprocessing.Manager is supplied on init, the dictionary is
  both process and thread safe. Otherwise, it is only thread-safe.
  Nc                     |r+|j                         | _        |j                         | _        yt        j                          | _        i | _        y)zInitializes the dict.

    Args:
      manager: (multiprocessing.Manager or None) Manager instance (required for
          cross-process safety), or none if cross-process safety is not needed.
    N)Locklockdict	threading)selfmanagers     9platform/gsutil/gslib/utils/parallelism_framework_util.py__init__zAtomicDict.__init__U   s6     ,,.di,,.di.."didi    c                 d    | j                   5  | j                  |   cd d d        S # 1 sw Y   y xY wNr   r   r   keys     r   __getitem__zAtomicDict.__getitem__c   s    	YYs^ 
s   &/c                 d    | j                   5  || j                  |<   d d d        y # 1 sw Y   y xY wr   r   )r   r    values      r   __setitem__zAtomicDict.__setitem__g   s     	diin 
s   &/c                 ~    | j                   5  | j                  j                  ||      cd d d        S # 1 sw Y   y xY wr   r   r   get)r   r    default_values      r   r'   zAtomicDict.getl   s%    	YY]]3. 
s   3<c                 `    | j                   5  | j                  |= d d d        y # 1 sw Y   y xY wr   r   r   s     r   deletezAtomicDict.deletep   s    	
))C. 
s   $-c                 z    | j                   5  | j                  j                         cd d d        S # 1 sw Y   y xY wr   )r   r   valuesr   s    r   r,   zAtomicDict.valuest   s#    	YY 
s   1:c                     | j                   5  | j                  j                  ||      |z   }|| j                  |<   |cddd       S # 1 sw Y   yxY w)a  Atomically updates the stored value associated with the given key.

    Performs the atomic equivalent of
    dict[key] = dict.get(key, default_value) + inc.

    Args:
      key: lookup key for the value of the first operand of the "+" operation.
      inc: Second operand of the "+" operation.
      default_value: Default value if there is no existing value for the key.

    Returns:
      Incremented value.
    Nr&   )r   r    incr(   vals        r   	IncrementzAtomicDict.Incrementx   s>     
IIMM#}-3cdiin 
s   0AAr   r   )__name__
__module____qualname____doc__r   r!   r$   r'   r*   r,   r1    r   r   r   r   N   s*    
/ r   r   c                   0    e Zd ZdZd ZddZd Zd Zd Zy)	ProcessAndThreadSafeInta  This class implements a process and thread-safe integer.

  It is backed either by a multiprocessing Value of type 'i' or an internal
  threading lock.  This simplifies the calling pattern for
  global variables that could be a Multiprocessing.Value or an integer.
  Without this class, callers need to write code like this:

  global variable_name
  if isinstance(variable_name, int):
    return variable_name
  else:
    return variable_name.value
  c                     || _         | j                   rt        j                  dd      | _        y t	        j
                         | _        d| _        y )Nir   )multiprocessing_is_availablemultiprocessing_contextValuer#   r   r   r   )r   r<   s     r   r   z ProcessAndThreadSafeInt.__init__   s<    (DD%((*00a8dj.."didjr   c                     | j                   r|| j                  _        y | j                  5  || _        d d d        y # 1 sw Y   y xY wr   r<   r#   r   )r   reset_values     r   ResetzProcessAndThreadSafeInt.Reset   s/    (($djj99 
 99s	   <Ac                     | j                   r | j                  xj                  dz  c_        y | j                  5  | xj                  dz  c_        d d d        y # 1 sw Y   y xY wN   r@   r-   s    r   r1   z!ProcessAndThreadSafeInt.Increment   >    ((
jj!99

a
 99   AA!c                     | j                   r | j                  xj                  dz  c_        y | j                  5  | xj                  dz  c_        d d d        y # 1 sw Y   y xY wrD   r@   r-   s    r   	Decrementz!ProcessAndThreadSafeInt.Decrement   rF   rG   c                     | j                   r| j                  j                  S | j                  5  | j                  cd d d        S # 1 sw Y   y xY wr   r@   r-   s    r   GetValuez ProcessAndThreadSafeInt.GetValue   s3    ((ZZ99zz 99s   AANr2   )	r3   r4   r5   r6   r   rB   r1   rI   rK   r7   r   r   r9   r9      s     !r   r9   c                 v   	 t        j                  |       \  }}||kD  r	 t        j                  | ||f       |S ||k  r	 t        j                  | ||f       |S |S # t         j                  t        f$ r Y yw xY w# t         j                  t        f$ r Y \w xY w# t         j                  t        f$ r |cY S w xY w)a  Sets a new soft limit for the maximum number of open files.

  The soft limit is used for this process (and its children), but the
  hard limit is set by the system and cannot be exceeded.

  We will first try to set the soft limit to the hard limit's value; if that
  fails, we will try to set the soft limit to the fallback_value iff this would
  increase the soft limit.

  Args:
    resource_name: Name of the resource to increase the soft limit for.
    fallback_value: Fallback value to be used if we couldn't set the
                    soft value to the hard value (e.g., if the hard value
                    is "unlimited").

  Returns:
    Current soft limit for the resource (after any changes we were able to
    make), or -1 if the resource doesn't exist.
  )resource	getrlimiterror
ValueError	setrlimit)resource_namefallback_value
soft_limit
hard_limits       r   _IncreaseSoftLimitForResourcerW      s    ,'11-@Z *Z(@A . (DE / ..*	%  NNJ' 
 NNJ'  s4   A A;  B A87A8;BBB87B8c                     t         j                  ryt         j                  ry	 t        dd      5 } | j	                         j                         D ]H  }d|v s|j                  d      d   j                  d      }d	|j                         v |fc cd
d
d
       S  	 d
d
d
       y# 1 sw Y   y
xY w# t        $ rU}|j                  t        j                  k(  r2t        j                  d|j                  t        |      fz         Y d
}~y d
}~wt        $ r7}t        j                  dj!                  t        |                   Y d
}~yd
}~ww xY w)a  Determines if the OS doesn't support multiprocessing.

  There are two cases we currently know about:
    - Multiple processes are not supported on Windows.
    - If an error is encountered while using multiple processes on Alpine Linux
      gsutil hangs. For this case it's possible we could do more work to find
      the root cause but after a fruitless initial attempt we decided instead
      to fall back on multi-threading w/o multiprocesing.

  Returns:
    (bool indicator if multiprocessing should be prohibited, OS name)
  )TWindows)FmacOSz/etc/os-releaserzNAME==rE   "zalpine linuxN)FUnknownzeUnable to open /etc/os-release to determine whether OS supports multiprocessing: errno=%d, message=%szYSomething went wrong while trying to determine multiprocessing capabilities.
Message: {0})r   
IS_WINDOWSIS_OSXopenread
splitlinessplitstriplowerIOErrorerrnoENOENTloggingdebugstr	Exceptionformat)flineos_nameeexcs        r   ShouldProhibitMultiprocessingrt      s      		% &&(%%'$d?JJsOA&,,S1' GMMO3W=
= 
&	% (
   
&	%	% 
 ww%,,mm EWWc!f%& '  	 MM @@F#hA ! 	sY   B0 &B$8B$	B0 B$B0 $B-)B0 -B0 0	E9A
D	D		E-EEc                    t         A| r*| j                  t               | j                  t               t        t         t              S t               \  }}|r%d|z  }| r| j                  |       t        dd      S d}d}d}	 	 t        j                  dd       t        j                         a
d
}t        rf	 t        |t        t        j                  t         j"                              }	 t        |t        t        j&                  t         j"                              }|t         j"                  k  r|d|z  z  }t)        d|z        	 |a |a|at        t         t              S #  |d	z  } xY w# t$        $ r Y w xY w# t$        $ r Y jw xY w#  t+        j,                         }d}| "| j                  |       | j                  |       Y xY w)aq  Checks if multiprocessing is available, and if so performs initialization.

  There are some environments in which there is no way to use multiprocessing
  logic that's built into Python (e.g., if /dev/shm is not available, then
  we can't create semaphores). This simply tries out a few things that will be
  needed to make sure the environment can support the pieces of the
  multiprocessing module that we need.

  See gslib.command.InitializeMultiprocessingVariables for
  an explanation of why this is necessary.

  Args:
    logger: (logging.Logger) Logger to use for debug output.

  Returns:
    (MultiprocessingIsAvailableResult) A namedtuple with the following attrs:
      - multiprocessing_is_available: True iff the multiprocessing module is
            available for use.
      - stack_trace: The stack trace generated by the call we tried that
            failed.
  N)r   r   z
Multiple processes are not supported on %s. Operations requesting
parallelism will be executed with multiple threads in a single process only.
FTz
You have requested multiple processes for an operation, but the
required functionality of Python's multiprocessing module is not available.
Operations requesting parallelism will be executed with multiple threads in a
single process only.
r;   r   zI
Please ensure that you have write access to both /dev/shm and /run/shm.
rM   a  
Your max number of open files, %s, is too low to allow safe multiprocessing.
On Linux you can fix this by adding something like "ulimit -n 10000" to your
~/.bashrc or equivalent file and opening a new terminal.

On macOS, you may also need to run a command like this once (in addition to the
above instructions), which might require a restart of your system to take
effect:
  launchctl limit maxfiles 10000

Alternatively, edit /etc/launchd.conf with something like:
  limit maxfiles 10000 10000

z)Max number of open files, %s, is too low.)$_cached_multiprocessing_is_availablerk   )_cached_multiprocessing_check_stack_tracewarn,_cached_multiprocessing_is_available_messager   rt   r=   r>   Managertop_level_manager_HAS_RESOURCE_MODULEmaxrW   rN   RLIMIT_NOFILEr   MIN_ACCEPTABLE_OPEN_FILES_LIMITAttributeErrorRLIMIT_OFILErm   	traceback
format_exc)loggershould_prohibit_multiprocessingrq   messager   r<   limits          r   $CheckMultiprocessingAvailableAndInitr      s   4 *5ll<=kk>?+9=? ? .K-L*!7$ G kk'+8<> > +!%'=##C+ 0779
 E)&&99;<)%%99;< y888   g AEIJJ 90 *F&.9+18.	)7;
= =}  g &    &&&(K#( ll;kk's`   E! F
 72E, *2E; )F
 !E))F
 ,	E85F
 7E88F
 ;	FF
 FF
 
<Gc                  z    t               j                  rt        j                         S t	        j                         S )a  Returns either a multiprocessing lock or a threading lock.

  Use Multiprocessing lock iff we have access to the parts of the
  multiprocessing module that are necessary to enable parallelism in operations.

  Returns:
    Multiprocessing or threading lock.
  )r   r   r{   r   r   r7   r   r   
CreateLockr     s,     *+88!!##>>r   c                 r    d}|s	 | j                  ||       d}|syy# t        j                  $ r Y w xY w)a7  Puts an item to the status queue.

  If the queue is full, this function will timeout periodically and repeat
  until success. This avoids deadlock during shutdown by never making a fully
  blocking call to the queue, since Python signal handlers cannot execute
  in between instructions of the Python interpreter (see
  https://docs.python.org/2/library/signal.html for details).

  Args:
    queue: Queue class (typically the global status queue)
    msg: message to post to the queue.
    timeout: (optional) amount of time to wait before repeating put request.
  F)timeoutTN)putQueueFull)r	   msgr   put_successs       r   PutToQueueWithTimeoutr     sC     +iiWi%k  :: 
s     66r   )+r6   
__future__r   r   r   r   collectionsrh   rj   multiprocessingr   r   gslib.utilsr   r   	six.movesr	   r   rN   r|   ImportErrorrr   SEEK_AHEAD_JOIN_TIMEOUTSTATUS_QUEUE_OP_TIMEOUTUI_THREAD_JOIN_TIMEOUTZERO_TASKS_TO_DO_ARGUMENTrv   rw   ry   
namedtupler   get_contextr=   r   rQ   objectr   r9   rW   rt   r   r   r   r7   r   r   <module>r      s   A & %  '       ! # $      <  (, $,0 )/3 , $:;#9#9&(G$I  
,7O77?
; ;|2f 2j/d*Zz=z$ /F S  H 	
# ,+,s*   B+ 0B? +B<0B77B<?	C
C