
    F                         d dl Z d dlmZmZ d dlZd dlmZ d dlmZ d dl	m
Z
mZ d dlmZmZ d dlmZ d dlmZ d d	lmZ d d
lmZmZmZmZ  ej4                  e      Z ed      ZdZ G d deeeef         Z y)    N)OptionalSet)wait_ignore_errors)Assigner)RetryingConnectionConnectionFactory)FailedPreconditionGoogleAPICallError)ConnectionReinitializer)
Connection)	Partition)PartitionAssignmentRequestPartitionAssignment!InitialPartitionAssignmentRequestPartitionAssignmentAckg      LAi  c                       e Zd ZU eed<   eeef   ed<   eed<   e	e
j                     ed<   ded<   dedeeef   fd	Zd
 Zd Zd Zd Zd ZdefdZdeeef   fdZdee   fdZy)AssignerImpl_initial_connection_outstanding_assignment	_receiverzasyncio.Queue[Set[Partition]]_new_assignmentinitialfactoryc                     || _         t        ||       | _        d| _        d | _        t        j                  d      | _        y )NF   )maxsize)r   r   r   r   r   asyncioQueuer   )selfr   r   s      Flib/third_party/google/cloud/pubsublite/internal/wire/assigner_impl.py__init__zAssignerImpl.__init__=   s:    
  -gt<',$&}}Q7    c                 V   K   | j                   j                          d {    | S 7 wN)r   
__aenter__r    s    r!   r&   zAssignerImpl.__aenter__H   s(     ))+++ 	,s   )')c                 p    | j                   J t        j                  | j                               | _         y r%   )r   r   ensure_future_receive_loopr'   s    r!   _start_receiverzAssignerImpl._start_receiverL   s-    ~~%%% ..t/A/A/CDr#   c                    K   | j                   r?| j                   j                          t        | j                          d {    d | _         y y 7 wr%   )r   cancelr   r'   s    r!   _stop_receiverzAssignerImpl._stop_receiverP   s>     >>NN!!#$T^^444!DN 4s   ?AAAc                   K   	 | j                   j                          d {   }| j                  s| j                  j	                         s%| j                   j                  t        d             y d| _        t               }|j                  D ]  }|j                  t        |              t        j                  d| d       | j                  j                  |       7 w)NTzHReceived a duplicate assignment on the stream while one was outstanding.z)Received new assignment with partitions: .)r   readr   r   emptyfailr	   set
partitionsaddr   _LOGGERinfo
put_nowait)r    responser5   	partitions       r!   r*   zAssignerImpl._receive_loopV   s     !--2244H++43G3G3M3M3O  %%&b
 +/D(J%00	y34 1LLDZLPQRS  ++J7 4s   C$C"CC$c                    K   | j                          d {    | j                  j                  |||       d {    y 7 *7 wr%   )r.   r   	__aexit__)r    exc_typeexc_valexc_tbs       r!   r=   zAssignerImpl.__aexit__g   s?     !!###((7FCCC 	$Cs   AA$AAAAerrorc                    K   | j                          d {    d| _        | j                  j                         s6| j                  j	                          | j                  j                         s5y y 7 \w)NF)r.   r   r   r2   
get_nowait)r    rA   s     r!   stop_processingzAssignerImpl.stop_processingk   s]     !!###',$&&,,.  ++- &&,,. 	$s   A5A3AA51A5
connectionc                    K   |j                  t        | j                               d {    | j                          y 7 w)N)r   )writer   r   r+   )r    rE   s     r!   reinitializezAssignerImpl.reinitializeq   s8      9$--PQQQ 	Rs   )AAAreturnc                 v  K   | j                   r=	 | j                  j                  t        t	                            d {    d| _         | j                  j                  | j                  j                                d {   S 7 F# t
        $ r"}t        j                  d|        Y d }~cd }~ww xY w7 4w)N)ackFz5Assignment ack attempt failed due to stream failure: )r   r   rG   r   r   r
   r7   debugawait_unless_failedr   get)r    es     r!   get_assignmentzAssignerImpl.get_assignmentx   s     ''	&&,,.3I3KL   05, %%99$:N:N:R:R:TUUU & KA3O 
 VsL   B90B	  BB	 6B9B7B9B	 		B4B/*B9/B44B9N)__name__
__module____qualname__r   __annotations__r   r   r   boolr   r   Futurer   r"   r&   r+   r.   r*   r=   r
   rD   r   rH   r   r   rP    r#   r!   r   r   0   s     0/#$>@S$STT!!'' 54	82	8 ##=?R#RS	8E"8"D.+= .9;NNOVc)n Vr#   r   )!r   typingr   r   logging6google.cloud.pubsublite.internal.wait_ignore_cancelledr   .google.cloud.pubsublite.internal.wire.assignerr   9google.cloud.pubsublite.internal.wire.retrying_connectionr   r   google.api_core.exceptionsr	   r
   >google.cloud.pubsublite.internal.wire.connection_reinitializerr   0google.cloud.pubsublite.internal.wire.connectionr   'google.cloud.pubsublite.types.partitionr    google.cloud.pubsublite_v1.typesr   r   r   r   	getLoggerrQ   r7   int
_MAX_BYTES_MAX_MESSAGESr   rW   r#   r!   <module>rf      s        U C N H =  '

H
% "#
 TV%&@BU&UVTVr#   