
    O                         d dl mZ d dlZd dlZd dlZd dlZd dlZd dlZd dlmZm	Z	m
Z
mZmZmZmZ d dlZd dlmZ d dlmZ d dlmZ d dlmZ d dlmZ d d	lmZ d d
lmZ d dlmZ d dlmZ d dl m!Z! d dl"m#Z$ d dl"mZ% d dl&m'Z( e$jR                  Z)ejT                  rd dl+m,Z, d dlm-Z- d dl.m/Z/ d dl0m1Z2  ejf                  e4      Z5e%jl                  jo                         Z8eejr                  ejt                  f   Z; G d de(jx                        Z=y)    )absolute_importN)AnyDictOptionalSequenceTupleTypeUnion)gapic_v1)AnonymousCredentials)service_account)types)
exceptions)futures)thread)ordered_sequencer)unordered_sequencer)FlowController)gapic_version)client)	pubsub_v1)_batch)OptionalRetry)pubsubc                       e Zd ZdZ	 	 d'deej                  ef   deej                  ef   de	f fdZ
e	 d(dedeej                  ef   de	dd fd       ZeZedefd	       Ze fd
       ZdededefdZdededdfdZd) fdZdej.                  j0                  ej.                  j0                  fdedededddddeeef   ddfdZd*dZd*dZd*dZd*dZd*dZd*dZ 	 d+ded d!deddfd"Z!d#e"ddfd$Z#	 d+ded%ededdfd&Z$ xZ%S ),Clienta  A publisher client for Google Cloud Pub/Sub.

    This creates an object that is capable of publishing messages.
    Generally, you can instantiate this client with no arguments, and you
    get sensible defaults.

    Args:
        batch_settings:
            The settings for batch publishing.
        publisher_options:
            The options for the publisher client. Note that enabling message ordering
            will override the publish retry timeout to be infinite.
        kwargs:
            Any additional arguments provided are sent as keyword arguments to the
            underlying
            :class:`~google.cloud.pubsub_v1.gapic.publisher_client.PublisherClient`.
            Generally you should not need to set additional keyword
            arguments. Regional endpoints can be set via ``client_options`` that
            takes a single key-value pair that defines the endpoint.

    Example:

    .. code-block:: python

        from google.cloud import pubsub_v1

        publisher_client = pubsub_v1.PublisherClient(
            # Optional
            batch_settings = pubsub_v1.types.BatchSettings(
                max_bytes=1024,  # One kilobyte
                max_latency=1,   # One second
            ),

            # Optional
            publisher_options = pubsub_v1.types.PublisherOptions(
                enable_message_ordering=False,
                flow_control=pubsub_v1.types.PublishFlowControl(
                    message_limit=2000,
                    limit_exceeded_behavior=pubsub_v1.types.LimitExceededBehavior.BLOCK,
                ),
            ),

            # Optional
            client_options = {
                "api_endpoint": REGIONAL_ENDPOINT
            }
        )
    batch_settingspublisher_optionskwargsc                    t        |      t        j                  u st        |      dk(  sJ d       t        |      t        j                  u st        |      dk(  sJ d       t
        j                  j                  d      r1dt
        j                  j                  d      i|d<   t               |d<   t        j                  | | _	        | j                  d   | _
        t        | 0  d	i | | j                  j                  | _        t         j"                  | _        t        j                  | | _        | j$                  j)                         | _        i | _        d| _        d | _        t3        | j                  j4                        | _        y )
Nr   zBbatch_settings must be of type BatchSettings or an empty sequence.zHpublisher_options must be of type PublisherOptions or an empty sequence.PUBSUB_EMULATOR_HOSTapi_endpointclient_optionscredentialsF )typer   BatchSettingslenPublisherOptionsosenvirongetr   r   _enable_message_orderingsuper__init__
_transport_host_targetr   Batch_batch_classr   	make_lock_batch_lock_sequencers_is_stopped_commit_threadr   flow_control_flow_controller)selfr   r   r   	__class__s       :lib/third_party/google/cloud/pubsub_v1/publisher/client.pyr/   zClient.__init__n   sk     E$7$773~;NRS;S	PO	PS "#u'='==$%*	V V	V+ ::>>01

/E F(F#$ %9$:F=! "'!7!79J!K(,(>(>q(A% 	"6",,"LL#11>B  ,,668AC :> !/t/E/E/R/R S    filenamereturnc                 \    t         j                  j                  |      }||d<    | |fi |S )a  Creates an instance of this client using the provided credentials
        file.

        Args:
            filename:
                The path to the service account private key JSON file.
            batch_settings:
                The settings for batch publishing.
            kwargs:
                Additional arguments to pass to the constructor.

        Returns:
            A Publisher instance that is the constructed client.
        r$   )r   Credentialsfrom_service_account_file)clsr@   r   r   r$   s        r>   rD   z Client.from_service_account_file   s4    * &11KKHU +}>,V,,r?   c                     | j                   S )zeReturn the target (where the API is).

        Returns:
            The location of the API.
        )r2   r<   s    r>   targetzClient.target   s     ||r?   c                 R    d}t        j                  |t               t               S )a  The underlying gapic API client.

        .. versionchanged:: 2.10.0
            Instead of a GAPIC ``PublisherClient`` client instance, this property is a
            proxy object to it with the same interface.

        .. deprecated:: 2.10.0
            Use the GAPIC methods and properties on the client instance directly
            instead of through the :attr:`api` attribute.
        zThe "api" property only exists for backward compatibility, access its attributes directly thorugh the client instance (e.g. "client.foo" instead of "client.api.foo").)category)warningswarnDeprecationWarningr.   )r<   msgr=   s     r>   apiz
Client.api   s%    , 	
 	c$67wr?   topicordering_keyc                     ||f}| j                   j                  |      }|B|dk(  rt        j                  | |      }nt	        j
                  | ||      }|| j                   |<   |S )zdGet an existing sequencer or create a new one given the (topic,
        ordering_key) pair.
         )r7   r,   r   UnorderedSequencerr   OrderedSequencerr<   rP   rQ   sequencer_key	sequencers        r>   _get_or_create_sequencerzClient._get_or_create_sequencer   ss     -$$((7	r!/BB4O	->>%	 /8D]+r?   Nc                 0   | j                   5  | j                  rt        d      | j                  st	        d      ||f}| j
                  j                  |      }|t        j                  d       n|j                          ddd       y# 1 sw Y   yxY w)a*  Resume publish on an ordering key that has had unrecoverable errors.

        Args:
            topic: The topic to publish messages to.
            ordering_key: A string that identifies related messages for which
                publish order should be respected.

        Raises:
            RuntimeError:
                If called after publisher has been stopped by a `stop()` method
                call.
            ValueError:
                If the topic/ordering key combination has not been seen before
                by this client.
        z-Cannot resume publish on a stopped publisher.zICannot resume publish on a topic/ordering key if ordering is not enabled.NzCError: The topic/ordering key combination has not been seen before.)
r6   r8   RuntimeErrorr-   
ValueErrorr7   r,   _LOGGERdebugunpauserV   s        r>   resume_publishzClient.resume_publish   s      "#RSS00 & 
 #L1M((,,];I (
 !!#% s   A6BBc                 "    t        |   |i |S )z#Call the GAPIC public API directly.)r.   publish)r<   argsr   r=   s      r>   _gapic_publishzClient._gapic_publish	  s    w///r?   rS   dataretryr   timeoutztypes.OptionalTimeoutattrsz"pubsub_v1.publisher.futures.Futurec                 6    t        |t              st        d       j                  s|dk7  rt	        d      t        j
                  |      j                         D ]E  \  }}t        |t              rt        |t              r|j                  d      ||<   <t        d       t        |||      }	t        j                  j                  |	      	  j                  j                          fd}|t(        j*                  j,                  u r j.                  j0                  }|t(        j*                  j,                  u r j.                  j2                  } j4                  5   j6                  rt9        d	       j                  rn|t(        j*                  j,                  u rA j:                  }|j<                  |j>                     j@                  }|jC                  d
      }n|jC                  d
      } jE                  ||      }|j?                  ||      }|jG                  |        jI                          |cddd       S # t        j                   $ r1}
t#        j$                         }|j'                  |
       |cY d}
~
S d}
~
ww xY w# 1 sw Y   yxY w)aN  Publish a single message.

        .. note::
            Messages in Pub/Sub are blobs of bytes. They are *binary* data,
            not text. You must send data as a bytestring
            (``bytes`` in Python 3; ``str`` in Python 2), and this library
            will raise an exception if you send a text string.

            The reason that this is so important (and why we do not try to
            coerce for you) is because Pub/Sub is also platform independent
            and there is no way to know how to decode messages properly on
            the other side; therefore, encoding and decoding is a required
            exercise for the developer.

        Add the given message to this object; this will cause it to be
        published once the batch either has enough messages or a sufficient
        period of time has elapsed.
        This method may block if LimitExceededBehavior.BLOCK is used in the
        flow control settings.

        Example:
            >>> from google.cloud import pubsub_v1
            >>> client = pubsub_v1.PublisherClient()
            >>> topic = client.topic_path('[PROJECT]', '[TOPIC]')
            >>> data = b'The rain in Wales falls mainly on the snails.'
            >>> response = client.publish(topic, data, username='guido')

        Args:
            topic: The topic to publish messages to.
            data: A bytestring representing the message body. This
                must be a bytestring.
            ordering_key: A string that identifies related messages for which
                publish order should be respected. Message ordering must be
                enabled for this client to use this feature.
            retry:
                Designation of what errors, if any, should be retried. If `ordering_key`
                is specified, the total retry deadline will be changed to "infinity".
                If given, it overides any retry passed into the client through
                the ``publisher_options`` argument.
            timeout:
                The timeout for the RPC request. Can be used to override any timeout
                passed in through ``publisher_options`` when instantiating the client.

            attrs: A dictionary of attributes to be
                sent as metadata. (These may be text strings or byte strings.)

        Returns:
            A :class:`~google.cloud.pubsub_v1.publisher.futures.Future`
            instance that conforms to Python Standard library's
            :class:`~concurrent.futures.Future` interface (but not an
            instance of that class).

        Raises:
            RuntimeError:
                If called after publisher has been stopped by a `stop()` method
                call.

            pubsub_v1.publisher.exceptions.MessageTooLargeError: If publishing
                the ``message`` would exceed the max size limit on the backend.
        z=Data being published to Pub/Sub must be sent as a bytestring.rS   zSCannot publish a message with an ordering key when message ordering is not enabled.zutf-8zGAll attributes being published to Pub/Sub must be sent as text strings.)re   rQ   
attributesNc                 <    j                   j                         y N)r;   release)futuremessager<   s    r>   on_publish_donez'Client.publish.<locals>.on_publish_done|  s    !!))'2r?   z&Cannot publish on a stopped publisher.g      A)rf   rg   )%
isinstancebytes	TypeErrorr-   r\   copyitemsstrdecode_raw_proto_pubbsub_messagegapic_typesPubsubMessagewrapr;   addr   FlowControlLimitErrorr   Futureset_exceptionr   methodDEFAULTr   rf   rg   r6   r8   r[   r0   _wrapped_methodsrb   _retrywith_deadlinerY   add_done_callback!_ensure_commit_timer_runs_no_lock)r<   rP   re   rQ   rf   rg   rh   kv
vanilla_pbexcrn   rp   	transport
base_retryrX   ro   s   `               @r>   rb   zClient.publish  sG   N $&O  ,,1C+  IIe$**,DAq!S!!U#88G,a+  - 0LU

 ++00<	!!%%g.	3 HOO+++**00Ehoo---,,44G"#KLL
 ,,HOO333 $I!*!;!;I<M<M!N!U!UJ&44W=E!//8E 55e\JI&&weW&MF$$_5 2243  // 	^^%F  %M	 s+   I $CJJ&JJJJc                 f    | j                   5  | j                          ddd       y# 1 sw Y   yxY w)zEnsure a cleanup/commit timer thread is running.

        If a cleanup/commit timer thread is already running, this does nothing.
        N)r6   r   rG   s    r>   $ensure_cleanup_and_commit_timer_runsz+Client.ensure_cleanup_and_commit_timer_runs  s%    
 224 s   '0c                     | j                   s4| j                  j                  t        d      k  r| j	                          yyy)zEnsure a commit timer thread is running, without taking
        _batch_lock.

        _batch_lock must be held before calling this method.
        infN)r9   r   max_latencyfloat_start_commit_threadrG   s    r>   r   z(Client._ensure_commit_timer_runs_no_lock  s9     ""t':':'F'Fu'U%%' (V"r?   c                     t        j                  d| j                  d      | _        | j                  j	                          y)z>Start a new thread to actually wait and commit the sequencers.zThread-PubSubBatchCommitterT)namerH   daemonN)	threadingThread_wait_and_commit_sequencersr9   startrG   s    r>   r   zClient._start_commit_thread  s:    
 (...33

 	!!#r?   c                    t        j                  | j                  j                         t        j                  d       | j                  5  | j                  r
	 ddd       y| j                          d| _	        ddd       y# 1 sw Y   yxY w)z;Wait up to the batching timeout, and commit all sequencers.zCommit thread is waking upN)
timesleepr   r   r]   r^   r6   r8   _commit_sequencersr9   rG   s    r>   r   z"Client._wait_and_commit_sequencers  sh     	

4&&22323  ##%"&D	 s   B"BBc                    | j                   j                         D cg c]  \  }}|j                         r| }}}|D ]  }| j                   |=  | j                   j                         D ]  }|j	                           yc c}}w )z1Clean up finished sequencers and commit the rest.N)r7   ru   is_finishedvaluescommit)r<   keyrX   finished_sequencer_keysrW   s        r>   r   zClient._commit_sequencers  s     #'"2"2"8"8":#
":Y$$& ": 	  #

 5M  / 5 ))002I 3#
s   Bc                     | j                   5  | j                  rt        d      d| _        | j                  j	                         D ]  }|j                           	 ddd       y# 1 sw Y   yxY w)a  Immediately publish all outstanding messages.

        Asynchronously sends all outstanding messages and
        prevents future calls to `publish()`. Method should
        be invoked prior to deleting this `Client()` object
        in order to ensure that no pending messages are lost.

        .. note::

            This method is non-blocking. Use `Future()` objects
            returned by `publish()` to make sure all publish
            requests completed, either in success or error.

        Raises:
            RuntimeError:
                If called after publisher has been stopped by a `stop()` method
                call.
        z(Cannot stop a publisher already stopped.TN)r6   r8   r[   r7   r   stop)r<   rX   s     r>   r   zClient.stop  sY    & "#MNN#D!--446	  7 s   AA%%A.batchz_batch.thread.Batchc                 J    | j                  ||      }|j                  |       y rl   )rY   
_set_batch)r<   rP   r   rQ   rX   s        r>   r   zClient._set_batch  s$     11%F	U#r?   batch_classc                     || _         y rl   )r4   )r<   r   s     r>   _set_batch_classzClient._set_batch_class  s
    'r?   rX   c                 *    ||f}|| j                   |<   y rl   )r7   )r<   rP   rX   rQ   rW   s        r>   _set_sequencerzClient._set_sequencer  s     -*3'r?   )r%   r%   )r%   )rA   zpubsub_types.PublishResponse)rA   N)rS   )&__name__
__module____qualname____doc__r
   r   r'   r   r)   r   r/   classmethodrv   rD   from_service_account_jsonpropertyrH   rO   SequencerTyperY   r`   rd   r   r   r   rr   rb   r   r   r   r   r   r   r   r	   r   r   __classcell__)r=   s   @r>   r   r   <   s   /f @BEG,Te118;<,T !!7!7!AB,T 	,T\  @B-- e118;<- 	-
 
- -0 !:    &c   ""$C "$s "$t "$H0 !)!8!8+3??+B+BQQ Q 	Q
 Q )Q ucz"Q 
.Qf5(
$
'!< MO$$!6$FI$	$(D (T (
 IK44%24BE4	4r?   r   )>
__future__r   rt   loggingr*   r   r   typingr   r   r   r   r   r	   r
   rK   google.api_corer   google.auth.credentialsr   google.oauth2r   google.cloud.pubsub_v1r    google.cloud.pubsub_v1.publisherr   r   'google.cloud.pubsub_v1.publisher._batchr   +google.cloud.pubsub_v1.publisher._sequencerr   r   0google.cloud.pubsub_v1.publisher.flow_controllerr   google.pubsub_v1r   package_versionry   #google.pubsub_v1.services.publisherr   publisher_client__version__TYPE_CHECKINGgoogle.cloudr   r   *google.pubsub_v1.services.publisher.clientr   google.pubsub_v1.typesr   pubsub_types	getLoggerr   r]   rz   pbrx   rU   rT   r   PublisherClientr   r%   r?   r>   <module>r      s    '   	    D D D  $ 8 ) ( 7 4 : I K K = 1 J))	&7H= '

H
% )6699; &&(;(N(NN
F4-- F4r?   