
                             d dl Z d dlmZmZ d dlZd dlmZ d dlmZ d dl	m
Z
mZ d dlmZmZ d dlmZ d dlmZ d d	lmZ d d
lmZmZmZ  ej2                  e      Z G d deeeef         Zy)    N)OptionalList)wait_ignore_errors)	Committer)RetryingConnectionConnectionFactory)FailedPreconditionGoogleAPICallError)ConnectionReinitializer)
Connection)Cursor)StreamingCommitCursorRequestStreamingCommitCursorResponseInitialCommitCursorRequestc                   H   e Zd ZU eed<   eed<   eeef   ed<   e	e
   ed<   ee
   ed<   e	ej                     ed<   e	ej                     ed<   ej                  ed<   d	ed
edeeef   fdZd Zd Zd ZdefdZd Zd Zd Zd Zd Zde
ddfdZdefdZdeeef   fdZy)CommitterImpl_initial_flush_seconds_connection_next_to_commit_outstanding_commits	_receiver_flusher_emptyinitialflush_secondsfactoryc                     || _         || _        t        ||       | _        d | _        g | _        d | _        d | _        t        j                         | _
        | j                  j                          y N)r   r   r   r   r   r   r   r   asyncioEventr   set)selfr   r   r   s       Glib/third_party/google/cloud/pubsublite/internal/wire/committer_impl.py__init__zCommitterImpl.__init__=   s[      +-gt<#$&!mmo    c                 V   K   | j                   j                          d {    | S 7 wr   )r   
__aenter__r#   s    r$   r(   zCommitterImpl.__aenter__O   s(     ))+++ 	,s   )')c                     | j                   J | j                  J t        j                  | j	                               | _         t        j                  | j                               | _        y r   )r   r   r    ensure_future_receive_loop_flush_loopr)   s    r$   _start_looperszCommitterImpl._start_loopersS   sX    ~~%%%}}$$$ ..t/A/A/CD--d.>.>.@Ar&   c                 >  K   | j                   r>| j                   j                          t        | j                          d {    d | _         | j                  r?| j                  j                          t        | j                         d {    d | _        y y 7 W7 wr   )r   cancelr   r   r)   s    r$   _stop_looperszCommitterImpl._stop_loopersY   sv     >>NN!!#$T^^444!DN==MM  "$T]]333 DM  5 4s$   ?BBA	BBBBresponsec                    d|vr$| j                   j                  t        d             |j                  j                  t        | j                        kD  r$| j                   j                  t        d             t        |j                  j                        D ]  }| j                  j                  d        t        | j                        dk(  r| j                  j                          y y )Ncommitz=Received an invalid subsequent response on the commit stream.zEReceived a commit response on the stream with no outstanding commits.r   )r   failr	   r4   acknowledged_commitslenr   rangepopr   r"   )r#   r2   _s      r$   _handle_responsezCommitterImpl._handle_responsec   s    8#!!"S
 ??//#d6O6O2PP!!"[
 x;;<A%%))!, =t(()Q.KKOO /r&   c                 x   K   	 | j                   j                          d {   }| j                  |       47 wr   )r   readr;   )r#   r2   s     r$   r,   zCommitterImpl._receive_loopu   s6     !--2244H!!(+ 4s   :8:c                    K   	 t        j                  | j                         d {    | j                          d {    @7 7 wr   )r    sleepr   _flushr)   s    r$   r-   zCommitterImpl._flush_loopz   s<     -- 3 3444++- 4s   $AAAAAAc                    K   | j                          d {    | j                  j                         s| j                          d {    | j                  j	                  |||       d {    y 7 \7 ,7 	wr   )r1   r   errorr@   	__aexit__)r#   exc_typeexc_valexc_tbs       r$   rC   zCommitterImpl.__aexit__   sd       """%%'++-((7FCCC 	#Cs3   A9A31A9A5	$A9-A7.A95A97A9c                   K   | j                   y t               }| j                   |j                  _        | j                  j                  | j                          d | _         | j                  j                          	 | j                  j                  |       d {    y 7 # t        $ r"}t        j                  d|        Y d }~y d }~ww xY ww)NzFailed commit on stream: )r   r   r4   cursorr   appendr   clearr   writer
   _LOGGERdebug)r#   reqes      r$   r@   zCommitterImpl._flush   s     '*, 00

!!(()=)=>#	;""((---! 	;MM5aS9::	;sB   A9C<B" B B" C B" "	C+CCCCc                    K   | j                          d {    | j                  j                  | j                  j	                                d {    y 7 @7 wr   )r@   r   await_unless_failedr   waitr)   s    r$   wait_until_emptyzCommitterImpl.wait_until_empty   sE     kkm224;;3C3C3EFFF 	Fs!   AA:AAAArH   returnNc                 z    | j                   j                         r| j                   j                         || _        y r   )r   rB   r   )r#   rH   s     r$   r4   zCommitterImpl.commit   s1    !!#""((**%r&   rB   c                 @   K   | j                          d {    y 7 wr   )r1   )r#   rB   s     r$   stop_processingzCommitterImpl.stop_processing   s       """s   
connectionc                 t  K   |j                  t        | j                               d {    |j                          d {   }d|vr$| j                  j                  t        d             | j                   | j                  r| j                  d   | _        g | _        | j                          y 7 7 rw)N)r   r   z;Received an invalid initial response on the publish stream.)
rK   r   r   r=   r   r5   r	   r   r   r.   )r#   rX   r2   s      r$   reinitializezCommitterImpl.reinitialize   s      ;DMMRSSS#**H$!!"Q
 '(('+'@'@'D$$&! 	T*s"   )B8B4B8B6A1B86B8)__name__
__module____qualname__r   __annotations__floatr   r   r   r   r   r   r    Futurer!   r   r%   r(   r.   r1   r;   r,   r-   rC   r@   rS   r4   r
   rW   r   r[    r&   r$   r   r   *   s    )(#$&CC  f%%v,&''w~~&&MM+  #(*GG
	$B!)F $,
 
D;G&V & &
#+= #(*GG
r&   r   )r    typingr   r   logging6google.cloud.pubsublite.internal.wait_ignore_cancelledr   /google.cloud.pubsublite.internal.wire.committerr   9google.cloud.pubsublite.internal.wire.retrying_connectionr   r   google.api_core.exceptionsr	   r
   >google.cloud.pubsublite.internal.wire.connection_reinitializerr   0google.cloud.pubsublite.internal.wire.connectionr   google.cloud.pubsublite_v1r    google.cloud.pubsublite_v1.typesr   r   r   	getLoggerr\   rL   r   rb   r&   r$   <module>rn      sm     !  U E N H -  '

H
%F$&CCFr&   