
                             d dl mZmZmZ d dlmZmZmZmZm	Z	m
Z
 d dlmZ d dlmZ d dlmZ d dlmZmZ d dlmZ d dlmZ d d	lmZ eegef   Z G d
 de      Z G d dee      Zy)    )FutureQueueensure_future)Callable
NamedTupleDictListSetOptional)Message)ReassignmentHandler)AsyncSingleSubscriber)wait_ignore_cancelledwait_ignore_errors)Assigner)PermanentFailable)	Partitionc                   "    e Zd ZU eed<   eed<   y)_RunningSubscriber
subscriberpollerN)__name__
__module____qualname__r   __annotations__r        Tlib/third_party/google/cloud/pubsublite/cloudpubsub/internal/assigning_subscriber.pyr   r   #   s    %%Nr   r   c                        e Zd ZU eg ef   ed<   eed<   eed<   ee	e
f   ed<   ee   ed<   ed   ed<   eed<   d	eg ef   d
edef fdZdee   fdZdefdZde	fdZde
fdZd Zd Zd Z xZS )AssigningSingleSubscriber_assigner_factory_subscriber_factory_reassignment_handler_subscribers	_assignerzQueue[List[Message]]_batches_assign_pollerassigner_factorysubscriber_factoryreassignment_handlerc                 v    t         |           || _        || _        || _        d| _        i | _        d| _        y)z
        Accepts a factory for an Assigner instead of an Assigner because GRPC asyncio uses the current thread's event
        loop.
        N)super__init__r!   r"   r#   r%   r$   r&   )selfr(   r)   r*   	__class__s       r   r-   z"AssigningSingleSubscriber.__init__4   s>     	!1#5 %9"r   returnc                 p   K   | j                  | j                  j                                d {   S 7 wN)await_unless_failedr&   getr.   s    r   readzAssigningSingleSubscriber.readF   s*     --dmm.?.?.ABBBBs   -646r   c                    K   |j                          d {   }| j                  j                  |       d {    y 7 (7 wr2   )r6   r&   put)r.   r   batchs      r   _subscribe_actionz+AssigningSingleSubscriber._subscribe_actionI   s6      oo''mm&&& (&s   A?"AAAA	partitionc                     K    j                  |      j                          d {    t         j                   fd            }t	        |       j
                  |<   y 7 <w)Nc                  &    j                         S r2   )r:   )new_subscriberr.   s   r   <lambda>z=AssigningSingleSubscriber._start_subscriber.<locals>.<lambda>Q   s    D$:$:>$Jr   )r"   
__aenter__r   
run_pollerr   r$   )r.   r;   r   r>   s   `  @r   _start_subscriberz+AssigningSingleSubscriber._start_subscriberM   s^     11)<'')))OOJK
 (:.&'Q)$	 	*s   %A(A&=A(runningc                    K   |j                   j                          t        |j                          d {    |j                  j	                  d d d        d {    y 7 *7 wr2   )r   cancelr   r   	__aexit__)r.   rC   s     r   _stop_subscriberz*AssigningSingleSubscriber._stop_subscriberU   sO     #GNN333  **4t<<< 	4<s!   3A$A $A$A"A$"A$c                   K   | j                   j                          d {   }t        | j                  j	                               }||z
  }||z
  }|D ]  }| j                  |       d {     |D ]7  }| j                  |   }| j                  |= | j                  |       d {    9 | j                  j                  ||      }|| d {    y y 7 7 n7 47 wr2   )	r%   get_assignmentsetr$   keysrB   rG   r#   handle_reassignment)r.   
assignmentold_assignmentadded_partitionsremoved_partitionsr;   r   maybe_awaitables           r   _assign_actionz(AssigningSingleSubscriber._assign_actionZ   s     +/>>+H+H+J%J
),T->->-C-C-E)F%6+j8)I((333 *+I**95J!!),''
333 , 44HHJ
 &!!! ' &K
 4 4
 "sF   C CA
C +C,;C 'C()C CC C C C c                    K   t               | _        | j                         | _        | j                  j	                          d {    t        | j                  | j                              | _        | S 7 /wr2   )	r   r&   r!   r%   r@   r   rA   rR   r'   r5   s    r   r@   z$AssigningSingleSubscriber.__aenter__k   s[     //1nn'')))+DOOD<O<O,PQ 	*s   AA6A40A6c                 d  K   | j                   j                          t        | j                          d {    t        | j                  j	                  |||             d {    | j
                  j                         D ]$  }t        | j                  |             d {    & y 7 t7 H7 wr2   )r'   rE   r   r%   rF   r$   valuesrG   )r.   exc_type	exc_value	tracebackrC   s        r   rF   z#AssigningSingleSubscriber.__aexit__r   s     ""$ !4!4555 NN$$Xy)D
 	
 	
 ((//1G$T%:%:7%CDDD 2 	6	
 Es3   3B0B*-B0#B,$>B0"B.#B0,B0.B0)r   r   r   r   r   r   PartitionSubscriberFactoryr   r   r   r   r   r   r-   r	   r   r6   r   r:   rB   rG   rR   r@   rF   __classcell__)r/   s   @r   r    r    (   s    H--33..y"4455 !!-.."2x<0 7 2	$CDM C'2G 'R R=.@ =
""r   r    N)asyncior   r   r   typingr   r   r   r	   r
   r   )google.cloud.pubsub_v1.subscriber.messager   8google.cloud.pubsublite.cloudpubsub.reassignment_handlerr   >google.cloud.pubsublite.cloudpubsub.internal.single_subscriberr   6google.cloud.pubsublite.internal.wait_ignore_cancelledr   r   .google.cloud.pubsublite.internal.wire.assignerr   8google.cloud.pubsublite.internal.wire.permanent_failabler   google.cloud.pubsublite.typesr   rY   r   r    r   r   r   <module>rd      s`    1 0 B B = X D V 3%yk3H&HI  
R 57H Rr   