
                             d Z ddlmZ ddlmZ ddlmZ ddlmZ ddlZddlm	Z	 ddl
mZ dd	l
mZ dd
lmZ ddlmZ ddlmZ ddlmZ ddlmZ ddlmZ dZ G d dej0                        Zd Z G d de      Zy)z)Utilities for Pub/Sub Lite subscriptions.    )absolute_import)division)unicode_literals)futuresN)Optional)cloudpubsub)types)PubsubMessage)	lite_util)
exceptions)
gapic_util)log)queuel    c                       e Zd ZdZy)SubscribeOperationExceptionz2Error when something went wrong while subscribing.N)__name__
__module____qualname____doc__     7lib/googlecloudsdk/api_lib/pubsub/lite_subscriptions.pyr   r   &   s    :r   r   c                  R    t        j                  t        j                               S )N)credentials)r   SubscriberClientr   GetGapicCredentialsr   r   r   GetDefaultSubscriberClientr   *   s!    		%	%002
4 4r   c                   D    e Zd ZdZ	 d
dZd Zd Zd Zd Zde	e
   fd	Zy)r   z4GCloud wrapper client for a Pub/Sub Lite subscriber.Nc                 >   |xs
 t               | _        t        j                         | _        | j                  |      | _        |D ch c]  }t        j                  |       c}| _	        t        j                  |t              | _        || _        d | _        y c c}w )N)messages_outstandingbytes_outstanding)r   _clientr   Queue	_messages_SubscriptionResourceToPath_subscriptionr	   	Partition_partitionsFlowControlSettings
_MAX_INT64_flow_control_settings	_auto_ack_pull_future)selfsubscription_resource
partitionsmax_messagesauto_ackclient	partitions          r   __init__zSubscriberClient.__init__2   s     979DL[[]DN99:OPDDNOJy	2JOD"'";";)$#D DND Ps   Bc                     | j                   j                          | j                   j                  | j                  | j                  j
                  | j                  | j                        | _        | S )N)callback#per_partition_flow_control_settingsfixed_partitions)	r"   	__enter__	subscriber&   r$   putr+   r(   r-   )r.   s    r   r:   zSubscriberClient.__enter__C   s]    LL..##,0,G,G))	 / +D
 Kr   c                 8   t        j                  d       | j                  j                         s5	 | j                  j	                          | j                  j                          | j                  j                  |||       y # t        j                  $ r Y 3w xY w)N   )
timesleepr-   donecancelresultr   CancelledErrorr"   __exit__)r.   exc_type	exc_value	tracebacks       r   rE   zSubscriberClient.__exit__L   s|    JJqM!!# 	  "  " 	LL(Iy9 ## s   4B BBc                     t        j                  t        j                  |j                        t        j
                  |j                        |j                        S )N)projectlocationname)r	   SubscriptionPathr   ProjectIdToProjectNumber
projectsIdLocationToZoneOrRegionlocationsIdsubscriptionsId)r.   resources     r   r%   z,SubscriberClient._SubscriptionResourceToPathX   sF    !!2283F3FG11(2F2FG%%' 'r   c                     | j                   j                         ra| j                   j                         }|rt        dj	                  |            t        j                  d       t        j                  d      y )Nz.Subscribe operation failed with error: {error})errorzNThe streaming pull future completed unexpectedly without raising an exception.z-The subscribe stream terminated unexpectedly.)	r-   rA   	exceptionr   formatr   debugr   InternalError)r.   es     r   _RaiseIfFailedzSubscriberClient._RaiseIfFailed^   sw    



%
%
'a	
)<CC!CLN 	N	ii ( )$$
9; ;  r   returnc                     | j                          	 | j                  j                  d      }| j                  r|j	                          |S # t
        j                  $ r Y yw xY w)zPulls and optionally acks a message from the provided subscription.

    Returns:
      A PubsubMessage pulled from the subscription.
    r>   )timeoutN)r[   r$   getr,   ackr   Empty)r.   messages     r   PullzSubscriberClient.Pulli   sW     	""1"-g	n;; s   9A A"!A")N)r   r   r   r   r5   r:   rE   r%   r[   r   r
   rc   r   r   r   r   r   /   s4    < "
:'	;H]+ r   r   )r   
__future__r   r   r   
concurrentr   r?   typingr   google.cloud.pubsubliter   r	   google.pubsub_v1r
   !googlecloudsdk.command_lib.pubsubr   googlecloudsdk.corer   r   r   	six.movesr   r*   Errorr   r   objectr   r   r   r   <module>rn      s^     0 &  '    / ) * 7 * * # 
;*"2"2 ;4
Gv Gr   