
    @                         d dl Z d dlmZ d dlmZmZ d dlmZmZ d dl	m
Z
 d dlmZmZ d dlmZ d dlmZ d d	lmZ d d
lmZ d dlmZ d dlmZmZmZmZmZmZmZ d dl m!Z!  G d deeeef         Z"y)    N)deepcopy)OptionalList)GoogleAPICallErrorFailedPrecondition)wait_ignore_errors)
ConnectionConnectionFactory)ConnectionReinitializer)FlowControlBatcher)is_reset_signal)RetryingConnection)
Subscriber)SubscribeRequestSubscribeResponseFlowControlRequestSequencedMessageInitialSubscribeRequestSeekRequestCursor)SubscriberResetHandlerc                   l   e Zd ZU eed<   eed<   eeef   ed<   e	ed<   e
ed<   ee   ed<   ded<   eej                     ed	<   eej                     ed
<   dededeeef   de	fdZd Zd Zd ZdefdZd Zd Zd Zd ZdefdZdeeef   fdZdeej>                  j@                     fdZ!de"fd Z#y!)"SubscriberImpl_base_initial_token_flush_seconds_connection_reset_handler_outstanding_flow_control_last_received_offsetz-asyncio.Queue[List[SequencedMessage.meta.pb]]_message_queue	_receiver_flusherbase_initialtoken_flush_secondsfactoryreset_handlerc                     || _         || _        t        ||       | _        || _        t               | _        d| _        d | _        t        j                         | _        d | _        d | _        y )NF)r   r   r   r   r   r   r   _reinitializingr   asyncioQueuer    r!   r"   )selfr#   r$   r%   r&   s        Hlib/third_party/google/cloud/pubsublite/internal/wire/subscriber_impl.py__init__zSubscriberImpl.__init__B   sb     *$7!-gt<+);)=&$%)"%mmo    c                 V   K   | j                   j                          d {    | S 7 wN)r   
__aenter__r+   s    r,   r1   zSubscriberImpl.__aenter__T   s(     ))+++ 	,s   )')c                     | j                   J | j                  J t        j                  | j	                               | _         t        j                  | j                               | _        y r0   )r!   r"   r)   ensure_future_receive_loop_flush_loopr2   s    r,   _start_looperszSubscriberImpl._start_loopersX   sX    ~~%%%}}$$$ ..t/A/A/CD--d.>.>.@Ar.   c                 >  K   | j                   r>| j                   j                          t        | j                          d {    d | _         | j                  r?| j                  j                          t        | j                         d {    d | _        y y 7 W7 wr0   )r!   cancelr   r"   r2   s    r,   _stop_looperszSubscriberImpl._stop_loopers^   sv     >>NN!!#$T^^444!DN==MM  "$T]]333 DM  5 4s$   ?BBA	BBBBresponsec           	      ^   d|vr%| j                   j                  t        d             y t        |j                  j                  j
                        }| j                  j                  |       |D ]  }| j                  w|j                  j                  | j                  k  rT| j                   j                  t        dj                  |j                  j                  | j                                      y |j                  j                  | _         | j                  j                  |       y )Nmessagesz@Received an invalid subsequent response on the subscribe stream.zfReceived an invalid out of order message from the server. Message is {}, previous last received is {}.)r   failr   listr=   _pbr   on_messagesr   cursoroffsetformatr    
put_nowait)r+   r;   r=   messages       r,   _handle_responsezSubscriberImpl._handle_responseh   s   X%!!"V
 &&**
 	&&228<G**6NN))T-G-GG  %%& A  H  H#NN1143M3M )0)>)>D&   	&&x0r.   c                 x   K   	 | j                   j                          d {   }| j                  |       47 wr0   )r   readrG   )r+   r;   s     r,   r5   zSubscriberImpl._receive_loop   s6     !--2244H!!(+ 4s   :8:c                    K   | j                   j                         }|y 	 | j                  j                  t	        |             d {    y 7 # t
        $ r Y y w xY ww)Nflow_control)r   release_pending_requestr   writer   r   )r+   reqs     r,   _try_send_tokenszSubscriberImpl._try_send_tokens   sX     ,,DDF;	""(()9s)KLLL! 		s:   A (A 	A
A A A 	AA AA c                    K   	 t        j                  | j                         d {    | j                          d {    @7 7 wr0   )r)   sleepr   rP   r2   s    r,   r6   zSubscriberImpl._flush_loop   s?     -- 9 9:::''))) :)s   $AAAAAAc                    K   | j                          d {    | j                  j                  |||       d {    y 7 *7 wr0   )r:   r   	__aexit__)r+   exc_typeexc_valexc_tbs       r,   rT   zSubscriberImpl.__aexit__   s?       """((7FCCC 	#Cs   AA$AAAAerrorc                   K   | j                          d {    t        |      r| j                  j                         sv| j                  j	                         }t        d |D              }| j                  j                  t        t        |      |             | j                  j                         sv| j                  j                          d {    d | _        y y 7 7 w)Nc              3   4   K   | ]  }|j                     y wr0   )
size_bytes).0rF   s     r,   	<genexpr>z1SubscriberImpl.stop_processing.<locals>.<genexpr>   s     #Le7G$6$6es   )allowed_messagesallowed_bytes)r:   r   r    empty
get_nowaitsumr   addr   lenr   handle_resetr   )r+   rX   batchr_   s       r,   stop_processingzSubscriberImpl.stop_processing   s       """5!))//18<8K8K8V8V8X ##Le#L L..22&),U&3 ))//1 %%22444)-D& " 	# 5s(   C%C!BC%6C%C#C%#C%
connectionc                 d  K   t        | j                        }| j                  )t        t	        | j                  dz               |_        n)t        t        j                  j                        |_        |j                  t        |             d {    |j                          d {   }d|vr%| j                  j                  t        d             y | j                  j                         }|#|j                  t        |             d {    | j!                          y 7 7 7 w)	N   )rC   )rB   )named_target)initialrl   z=Received an invalid initial response on the subscribe stream.rK   )r   r   r   r   r   initial_locationNamedTargetCOMMITTED_CURSORrN   r   rI   r   r>   r   r   request_for_restartr7   )r+   rh   rl   r;   tokenss        r,   reinitializezSubscriberImpl.reinitialize   s
     4--.%%1'2T%?%?!%CD(G$ (3(44EE(G$ /@AAA#**H$!!"S
 //CCE""#3#HIII 	B* Js7   BD0D*D0,D,-A'D0D.D0,D0.D0returnc                    K   | j                   j                  | j                  j                                d {   S 7 wr0   )r   await_unless_failedr    getr2   s    r,   rI   zSubscriberImpl.read   s2     %%99$:M:M:Q:Q:STTTTs   7A >A requestc                 :    | j                   j                  |       y r0   )r   rc   )r+   rw   s     r,   
allow_flowzSubscriberImpl.allow_flow   s    &&**73r.   N)$__name__
__module____qualname__r   __annotations__floatr   r   r   r   r   r   intr)   Futurer
   r-   r1   r7   r:   rG   r5   rP   r6   rT   r   rg   r	   rr   r   r   metapbrI   r   ry    r.   r,   r   r   1   s    +*#$46G$GHH**11#C=(CC''w~~&&- # ##35F#FG	
 .$B!1): 1<,
*
D.+= ."$%57H%HI4UD!1!6!6!9!9: U4"4 4r.   r   )#r)   copyr   typingr   r   google.api_core.exceptionsr   r   6google.cloud.pubsublite.internal.wait_ignore_cancelledr   0google.cloud.pubsublite.internal.wire.connectionr	   r
   >google.cloud.pubsublite.internal.wire.connection_reinitializerr   :google.cloud.pubsublite.internal.wire.flow_control_batcherr   2google.cloud.pubsublite.internal.wire.reset_signalr   9google.cloud.pubsublite.internal.wire.retrying_connectionr   0google.cloud.pubsublite.internal.wire.subscriberr   google.cloud.pubsublite_v1r   r   r   r   r   r   r   >google.cloud.pubsublite.internal.wire.subscriber_reset_handlerr   r   r   r.   r,   <module>r      sc      ! M U O X G  
\4'(8:K(KL\4r.   