
                             d dl Zd dlZd dlmZ d dlmZmZ d dlm	Z	 d dl
mZ d dlmZ d dlmZ d dlmZmZ d d	lmZ d d
lmZ  G d dee      Zy)    N)ThreadPoolExecutor)ContextManagerOptional)GoogleAPICallError)partial)wait_ignore_errors)ManagedEventLoop)StreamingPullManagerCloseCallback)AsyncSingleSubscriber)MessageCallbackc                       e Zd ZU eed<   eed<   eed<   eed<   ej                  j                  ed<   ej                  ed<   ee   ed<   ee   ed<   eed	<   d
ededefdZdefdZd ZdefdZd Zd Zd Zy)SubscriberImpl_underlying	_callback_unowned_executor_event_loop_poller_future_close_lock_failure_close_callback_closed
underlyingcallbackunowned_executorc                     || _         || _        || _        t        d      | _        t        j                         | _        d | _        d | _	        d| _
        y )NSubscriberLoopThreadF)r   r   r   r	   r   	threadingLockr   r   r   r   )selfr   r   r   s       Olib/third_party/google/cloud/pubsublite/cloudpubsub/internal/subscriber_impl.py__init__zSubscriberImpl.__init__3   sN     &!!1+,BC$>>+#    close_callbackc                 p    | j                   5  | j                  J || _        ddd       y# 1 sw Y   yxY w)z
        A close callback must be set exactly once by the StreamingPullFuture managing this subscriber.

        This two-phase init model is made necessary by the requirements of StreamingPullFuture.
        N)r   r   )r    r$   s     r!   add_close_callbackz!SubscriberImpl.add_close_callbackB   s3     ''///#1D  s   ,5c                     | j                   5  | j                  r
	 d d d        y d| _        d d d        | j                  d d d        y # 1 sw Y   xY w)NT)r   r   __exit__r    s    r!   closezSubscriberImpl.closeL   sF    ||   DL  	dD$'	 s   AAAerrorc                 2    || _         | j                          y N)r   r*   )r    r+   s     r!   _failzSubscriberImpl._failS   s    

r#   c                 .  K   	 	 | j                   j                          d {   }| j                  j                  | j                  |       I7 +# t
        $ r9}| j                  j                  t        | j                  |             Y d }~y d }~ww xY wwr-   )	r   readr   mapr   r   submitr   r.   )r    batches      r!   _pollerzSubscriberImpl._pollerW   s{     	B"..3355&&**4>>5A 5! 	B""))'$**a*@AA	Bs2   BA A,A 	B/BBBBc                 4   | j                   J | j                  j                          | j                  j                  | j                  j                               j                          | j                  j                  | j                               | _        | S r-   )	r   r   	__enter__r2   r   
__aenter__resultr5   r   r)   s    r!   r7   zSubscriberImpl.__enter___   sv    ##///""$ 0 0 ; ; =>EEG"..55dllnEr#   c           	         | j                   j                          	 | j                   j                          | j                  j	                  t        | j                  j                  |||                  j                          | j                  j                  |||       | j                  J | j                  | | j                         y #  Y xY wr-   )r   cancelr9   r   r2   r   r   	__aexit__r(   r   r   )r    exc_type	exc_value	tracebacks       r!   r(   zSubscriberImpl.__exit__f   s    ""$	&&( 	  **8Y	J	
 &(!!(IyA##///T4==1	s   C CN)__name__
__module____qualname__r   __annotations__r   r   r	   
concurrentfuturesFuturer   r   r   r   r   boolr"   r&   r*   r.   r5   r7   r(    r#   r!   r   r   &   s    &&))!!&&---)**m,,M) " -	2 2(- B2r#   r   )concurrent.futuresrD   r   concurrent.futures.threadr   typingr   r   google.api_core.exceptionsr   	functoolsr   6google.cloud.pubsublite.internal.wait_ignore_cancelledr   ?google.cloud.pubsublite.cloudpubsub.internal.managed_event_loopr	   Cgoogle.cloud.pubsublite.cloudpubsub.internal.streaming_pull_managerr
   r   >google.cloud.pubsublite.cloudpubsub.internal.single_subscriberr   ?google.cloud.pubsublite.cloudpubsub.subscriber_client_interfacer   r   rH   r#   r!   <module>rS      sA      8 + 9  U
M2^%9 M2r#   