
    E/                         d dl mZ d dlZd dlZd dlmZmZmZ d dlZd dl	m
Z
 d dlmZ  ej                  e      Zee
j                      Z G d d      Z G d d	e      Zy)
    )OrderedDictN)DictOptionalType)types)
exceptionsc                   *    e Zd ZdZdededefdZd Zy)_QuantityReservationz2A (partial) reservation of quantifiable resources.bytes_reservedbytes_neededhas_slotc                 .    || _         || _        || _        y )Nr   r   r   )selfr   r   r   s       Clib/third_party/google/cloud/pubsub_v1/publisher/flow_controller.py__init__z_QuantityReservation.__init__"   s    ,(     c                     t        |       j                   d| j                   d| j                   d| j                   dS )Nz(bytes_reserved=z, bytes_needed=z, has_slot=))type__name__r   r   r   )r   s    r   __repr__z_QuantityReservation.__repr__'   sN    Dz""# $"112 3 --. /a)	
r   N)r   
__module____qualname____doc__intboolr   r    r   r   r
   r
      s$    <!s !# ! !

r   r
   c                       e Zd ZdZdej
                  fdZdeddfdZdeddfdZ	dd	Z
defd
ZdedefdZ	 ddee   dee   defdZy)FlowControllerzA class used to control the flow of messages passing through it.

    Args:
        settings: Desired flow control configuration.
    settingsc                     || _         d| _        d| _        t               | _        d| _        d| _        t        j                         | _	        t        j                  | j                        | _        y )Nr   )lock)	_settings_message_count_total_bytesr   _waiting_reserved_bytes_reserved_slots	threadingLock_operational_lock	Condition_has_capacity)r   r!   s     r   r   zFlowController.__init__7   sd    !  
 GRm   "+!1 '00d6L6LMr   messagereturnNc                 H   | j                   j                  t        j                  j                  k(  ry| j
                  5  | j                  |      sL| xj                  dz  c_        | xj                  |j                  j                         z  c_        	 ddd       y| j                   j                  t        j                  j                  k(  rk| j                  | j                  dz   | j                  |j                  j                         z         }dj                  |      }t        j                  |      | j                   j                  t        j                  j                   k(  sJ |j                  j                         | j                   j"                  kD  s| j                   j$                  dk  rQ| j                  d|j                  j                               }dj                  |      }t        j                  |      t'        j(                         }| j                  |      r|| j*                  vr5t-        d|j                  j                         d      }|| j*                  |<   t.        j1                  d	j                  | j                                      | j2                  j5                          t.        j1                  d
j                  | j                                      | j                  |      r| xj                  dz  c_        | xj                  |j                  j                         z  c_        | xj6                  | j*                  |   j8                  z  c_        | xj:                  dz  c_        | j*                  |= ddd       y# 1 sw Y   yxY w)a  Add a message to flow control.

        Adding a message updates the internal load statistics, and an action is
        taken if these limits are exceeded (depending on the flow control settings).

        Args:
            message:
                The message entering the flow control.

        Raises:
            :exception:`~pubsub_v1.publisher.exceptions.FlowControlLimitError`:
                Raised when the desired action is
                :attr:`~google.cloud.pubsub_v1.types.LimitExceededBehavior.ERROR` and
                the message would exceed flow control limits, or when the desired action
                is :attr:`~google.cloud.pubsub_v1.types.LimitExceededBehavior.BLOCK` and
                the message would block forever against the flow control limits.
        N   )message_counttotal_bytesz+Flow control limits would be exceeded - {}.zLTotal flow control limits too low for the message, would block forever - {}.r   Fr   z>Blocking until there is enough free capacity in the flow - {}.z7Woke up from waiting on free capacity in the flow - {}.)r$   limit_exceeded_behaviorr   LimitExceededBehaviorIGNOREr,   _would_overflowr%   r&   _pbByteSizeERROR
_load_infoformatr   FlowControlLimitErrorBLOCK
byte_limitmessage_limitr*   current_threadr'   r
   _LOGGERdebugr.   waitr(   r   r)   )r   r/   	load_info	error_msgrB   reservations         r   addzFlowController.addN   s   $ >>11U5P5P5W5WW##''0##q(#!!W[[%9%9%;;!	 $# 66..445 !OO"&"5"5"9 $ 1 1GKK4H4H4J J , 	 JPP	 !66yAA 66..4455 $$&)B)BB>>//!3 OO"#1E1E1G , 	006y0A  !66yAA&557N&&w/!6"6'(%,[[%9%9%;!&#K
 5@DMM.1 &!23
 ""'') &!23! &&w/, 1$!5!5!77  DMM.$A$P$PP   A% n-O $##s   ANI!N?BNN!c                    | j                   j                  t        j                  j                  k(  ry| j
                  5  | xj                  dz  c_        | xj                  |j                  j                         z  c_        | j                  dk  s| j                  dk  rRt        j                  dt        d       t        d| j                        | _        t        d| j                        | _        | j                          | j                         r/t         j#                  d       | j$                  j'                          ddd       y# 1 sw Y   yxY w)zRelease a mesage from flow control.

        Args:
            message:
                The message entering the flow control.
        Nr2   r   z=Releasing a message that was never added or already released.   )category
stacklevelz2Notifying threads waiting to add messages to flow.)r$   r5   r   r6   r7   r,   r%   r&   r9   r:   warningswarnRuntimeWarningmax_distribute_available_capacity_ready_to_unblockrC   rD   r.   
notify_all)r   r/   s     r   releasezFlowController.release   s     >>11U5P5P5W5WW##1$!5!5!77""Q&$*;*;a*?S+ 
 '*!T-@-@&A#$'4+<+<$=!//1 %%'RS""--/' $##s   DE

Ec                    | j                   j                  | j                  z
  | j                  z
  }| j                   j                  | j
                  z
  | j                  z
  }| j                  j                         D ]  }|dk  r|dk  r y|dkD  r-|j                  s!d|_	        | xj                  dz  c_        |dz  }|dk  rG|j                  |j                  z
  }|dk  rCdj                  |j                  |j                        }t        j                  |t               d}t!        ||      }|xj                  |z  c_        | xj                  |z  c_        ||z  } y)zDistribute available capacity among the waiting threads in FIFO order.

        The method assumes that the caller has obtained ``_operational_lock``.
        r   Tr2   z Too many bytes reserved: {} / {})rL   N)r$   rA   r%   r)   r@   r&   r(   r'   valuesr   r   r   r=   rN   rO   rP   min)r   available_slotsavailable_bytesrH   bytes_still_neededmsgcan_gives          r   rR   z-FlowController._distribute_available_capacity   sR    NN((4+>+>>AUAUU 	 NN%%(9(99D<P<PP 	  ==//1K!#1(< ";+?+?'+$$$)$1$ !#!,!9!9K<V<V!V!A%8??..0H0H cN;%&"-?H&&(2&  H, x'O5 2r   c                     | j                   rSt        t        | j                   j                                     }|j                  |j
                  k\  xr |j                  S y)zDetermine if any of the threads waiting to add a message can proceed.

        The method assumes that the caller has obtained ``_operational_lock``.
        F)r'   nextiterrW   r   r   r   )r   first_reservations     r   rS   z FlowController._ready_to_unblock   sW    
 == !%T$--*>*>*@%A B!004E4R4RR /%..
 r   c                    | j                   j                  t        j                               }|r&|j                  |j
                  k\  }|j                  }nd}d}| j                  | j                  z   |j                  j                         z   }|| j                  j                  kD  xr | }| xr3 | j                  | j                  z   dz   | j                  j                  kD  }|xs |S )zDetermine if accepting a message would exceed flow control limits.

        The method assumes that the caller has obtained ``_operational_lock``.

        Args:
            message: The message entering the flow control.
        Fr2   )r'   getr*   rB   r   r   r   r&   r(   r9   r:   r$   r@   r%   r)   rA   )r   r/   rH   enough_reservedr   bytes_takensize_overflowmsg_count_overflows           r   r8   zFlowController._would_overflow  s     mm''	(@(@(BC)88K<T<TTO"++H#OH''$*>*>>AUAUAWW#dnn&?&??WDW!)\ 
  4#7#77!;nn**+ 	
 2 22r   r3   r4   c                     || j                   }|| j                  }d| d| j                  j                   d| j                   d| d| j                  j
                   d| j                   dS )a  Return the current flow control load information.

        The caller can optionally adjust some of the values to fit its reporting
        needs.

        The method assumes that the caller has obtained ``_operational_lock``.

        Args:
            message_count:
                The value to override the current message count with.
            total_bytes:
                The value to override the current total bytes with.
        z
messages: z / z (reserved: z
), bytes: r   )r%   r&   r$   rA   r)   r@   r(   )r   r3   r4   s      r   r<   zFlowController._load_info  s        //M++K s4>>+G+G*H I../ 0!]#dnn&?&?%@ A../q2	
r   )r0   N)NN)r   r   r   r   r   PublishFlowControlr   MessageTyperI   rU   rR   r   rS   r8   r   r   strr<   r   r   r   r    r    0   s    N!9!9 N.\.; \.4 \.|0{ 0t 0>&(P4  3{ 3t 38 QU
%c]
@H
	
r   r    )collectionsr   loggingr*   typingr   r   r   rN   google.cloud.pubsub_v1r    google.cloud.pubsub_v1.publisherr   	getLoggerr   rC   PubsubMessagerj   r
   objectr    r   r   r   <module>rt      s]    $   ' '  ( 7 '

H
% 5&&'
 
"I
V I
r   