
                             d dl Z d dl mZ d dlZd dlZd dlmZ d dlmZ d dlm	Z	 d dl
mZmZ d dlmZ d dlmZmZmZmZ d d	lmZ d d
lmZ dZdZ G d deeef   e      Zy)    N)Future)	Cancelled)adapt_error)is_retryable)wait_ignore_errorswait_ignore_cancelled)ConnectionReinitializer)
ConnectionRequestResponseConnectionFactory)WorkItem)PermanentFailableg{Gz?
   c                   ,    e Zd ZU dZeeef   ed<   eeef   ed<   e	j                  ed<   e	j                  ed<   ded<   ded	<   d
eeef   deeef   f fdZd Zd ZdeddfdZdefdZd Zdeeef   fdZedeeef   deeef   fd       Z xZS )RetryingConnectionz_A connection which performs retries on an underlying stream when experiencing retryable errors._connection_factory_reinitializer_initialized_once
_loop_taskz&asyncio.Queue[WorkItem[Request, None]]_write_queuezasyncio.Queue[Response]_read_queueconnection_factoryreinitializerc                     t         |           || _        || _        t	        j
                         | _        t	        j                  d      | _        t	        j                  d      | _	        y )N   maxsize)
super__init__r   r   asyncioEventr   Queuer   r   )selfr   r   	__class__s      Llib/third_party/google/cloud/pubsublite/internal/wire/retrying_connection.pyr    zRetryingConnection.__init__7   sP    
 	#5 +!(#MM!4"==3    c                    K   t        j                  | j                               | _        | j	                  | j
                  j                                d {    | S 7 wN)r!   ensure_future	_run_loopr   await_unless_failedr   waitr$   s    r&   
__aenter__zRetryingConnection.__aenter__C   sM     !//0@A&&t'='='B'B'DEEE 	Fs   AA AA c                    K   | j                  t        d             | j                  j                          t	        | j                         d {    y 7 w)NzConnection shutting down.)failr   r   cancelr   )r$   exc_typeexc_valexc_tbs       r&   	__aexit__zRetryingConnection.__aexit__H   s:     		)789  111s   AAAArequestreturnNc                    K   t        |      }| j                  | j                  j                  |             d {    | j                  |j                         d {   S 7 '7 wr)   )r   r,   r   putresponse_future)r$   r7   items      r&   writezRetryingConnection.writeM   sY      &&t'8'8'<'<T'BCCC--d.B.BCCC 	DCs!   9A'A#"A'A%A'%A'c                 p   K   | j                  | j                  j                                d {   S 7 wr)   )r,   r   getr.   s    r&   readzRetryingConnection.readR   s,     --d.>.>.B.B.DEEEEs   -646c                   K   	 d}| j                         s	 | j                  j                         }| d{   4 d{   }| j                  j	                  |       d{    | j
                  j                          d}| j                  |       d{    ddd      d{    | j                         syy7 7 7 `7 -7 # 1 d{  7  sw Y   /xY w# t        $ r}| j                         rY d}~yt        |      }t        j                  dt        j                                t        |      s| j                  |       Y d}~y	 | j                  j!                  |       d{  7   n4# t        $ r(}| j                  t        |             Y d}~Y d}~yd}~ww xY w| j"                  j%                         s`| j"                  j'                         j(                  }|j+                         s|j-                  |       | j"                  j%                         s`t/        j0                  d      | _        t/        j0                  d      | _        t5        t/        j6                  t9        t:        t<        d|z  z                     d{  7   |dz  }Y d}~d}~ww xY w# t        $ rL}t        j                   dt        j                                | j                  t        |             Y d}~yd}~ww xY ww)z[
        Processes actions on this connection and handles retries until cancelled.
        r   Nz Saw a stream failure. Cause: 
%sr   r      z4Saw a stream failure which was unhandled. Cause: 
%s)errorr   newr   reinitializer   set_loop_connection	Exceptionr   loggingdebug	traceback
format_excr   r1   stop_processingr   empty
get_nowaitr;   	cancelledset_exceptionr!   r#   r   r   sleepmin_MAX_BACKOFF_SECS_MIN_BACKOFF_SECS)r$   bad_retriesconn_fut
connectione
stop_errorr;   s          r&   r+   zRetryingConnection._run_loopU   sc    1	&Kjjl(%#77;;=H&.:"11>>&   ..224&'"33&   !/ jjl !/
 !/ ! %zz|#AAMM;Y=Q=Q=S (?		!"11AA!DDD$ 		+j"9: #//557*.*;*;*F*F*H*X*X.88:+99!< #//557 (/}}Q'?D$(/a(@D%/ 1 1Q^ D    1$K;%<  	&MMG$$& IIk!n%%	&sH  LJ- C B?C C C C	"C#4C	CC	C 'C(C ,J- =L?C C C	C	C 	CCCC 
J*(J%8J- <L=AJ%J- LE70E31E76J%7	F( F#J%J- "L#F((A<J%%A/J%J
J%J- %J**J- -	L6AK=8L=LLrX   c                   K   t        j                  |j                               }t        j                  | j                  j	                               }	 	 t        j
                  ||gt         j                         d {   \  }}||v rO| j                  || d {          d {    t        j                  | j                  j	                               }||v rN| j                  j                  | d {          d {    t        j                  |j                               }7 7 7 7 :7 0# |j                          |j                          t        |       d {  7   t        |       d {  7   w xY ww)N)return_when)r!   r*   r@   r   r?   r-   FIRST_COMPLETED_handle_writer   r:   r2   r   )r$   rX   	read_task
write_taskdone_s         r&   rG   z#RetryingConnection._loop_connection   sA    (/(=(=joo>O(P	8?8M8M!!#9

	1 '+9P9P! a %,,Zz9IJJJ!(!6!6t7H7H7L7L7N!OJ$**..Y??? ' 5 5joo6G HI  :JJ 0?? $Y///$Z000s   AF+D7 ?D- D7 D/
D7 (D1)AD7 8D39D7 D5)D7 /D7 1D7 3D7 5D7 7/F&E)'F:E=;FFto_writec                    K   	 | j                  |j                         d {    |j                  j                  d        y 7  # t        $ r-}t        |      }|j                  j                  |       |d }~ww xY wwr)   )r=   r7   r;   
set_resultrH   r   rQ   )rX   rc   rY   s      r&   r^   z RetryingConnection._handle_write   so     	""8#3#3444$$//5 5 	AA$$2215G	s8   A>A AA A>A 	A;(A66A;;A>)__name__
__module____qualname____doc__r   r   r   __annotations__r	   r!   r"   r   r    r/   r6   r=   r@   r+   r
   rG   staticmethodr   r^   __classcell__)r%   s   @r&   r   r   +   s    i*7H+<==+GX,=>>}}$::**
4-gx.?@
4 /w/@A
4
2
D7 Dt D
FH F5&n1GX<M1N 1, 	w01	=EgxFW=X	 	r'   r   )r!   r   rI   rK   google.api_core.exceptionsr   8google.cloud.pubsublite.internal.wire.permanent_failabler   -google.cloud.pubsublite.internal.status_codesr   6google.cloud.pubsublite.internal.wait_ignore_cancelledr   r   >google.cloud.pubsublite.internal.wire.connection_reinitializerr	   0google.cloud.pubsublite.internal.wire.connectionr
   r   r   r   /google.cloud.pubsublite.internal.wire.work_itemr   r   rU   rT   r    r'   r&   <module>ru      s`        0 P F  E V  AGX$568I Ar'   