
    q                        d dl Z d dlmZ d dlmZ d dlmZ d dlmZ d dl	m
Z
 d dlmZmZ d dlmZmZmZ d	Zd
edefdZdedefdZde j                   defdZdede j                   fdZdedefdZdededefdZdedefdZdej6                  j8                  dej6                  j8                  fdZdedefdZdedefdZy)    N)InvalidArgument)	Timestamp)PubsubMessage)MessageTransformer)fast_serialize)	PartitionMessageMetadata)AttributeValuesSequencedMessagePubSubMessagezx-goog-pubsublite-event-timetsreturnc                 X    t        j                  | j                  | j                  g      S N)r   dumpsecondsnanos)r   s    Ilib/third_party/google/cloud/pubsublite/cloudpubsub/message_transforms.py"_encode_attribute_event_time_protor      s     

BHH566    attrc                     	 t               }t        j                  |       }|d   |_        |d   |_        |S # t
        $ r t        d      w xY w)Nr      z'Invalid value for event time attribute.)r   r   loadr   r   	Exceptionr   )r   r   loadeds      r   "_decode_attribute_event_time_protor   !   sW    I[$$T*AY
!9	 IGHHIs	   47 Adtc                     t               }|j                  | j                  t        j                  j
                               t        |      S r   )r   FromDatetime
astimezonedatetimetimezoneutcr   )r   r   s     r   encode_attribute_event_timer%   ,   s5    	BOOBMM("3"3"7"789-b11r   c                     t        |       j                         j                  t        j                  j
                        S )N)tzinfo)r   
ToDatetimereplacer"   r#   r$   )r   s    r   decode_attribute_event_timer*   2   s.    *40		))--	.r   valuesc                     t        | j                        dk(  st        d      | j                  d   }	 |j                  d      S # t        $ r t        d      w xY w)Nr   zFReceived an unparseable message with multiple values for an attribute.r   utf-8z:Received an unparseable message with a non-utf8 attribute.)lenr+   r   decodeUnicodeError)r+   values     r   _parse_attributesr2   :   sf    v}}"T
 	
 ==#E
||G$$ 
H
 	

s   A A	partitiontransformerc                 J     dt         f fd}t        j                  |      S )Nsourcec                     | j                   }j                  |       }|j                   }|j                  rt        d      t	        j
                  j                  |j                  j                        |_        |S )Nz8Message after transforming has the message_id field set.)	_pb	transform
message_idr   r	   _encode_partsr1   cursoroffset)r6   	source_pbmessage
message_pbr3   r4   s       r   add_id_to_messagez>add_id_to_cps_subscribe_transformer.<locals>.add_id_to_messageK   so    JJ	!,!6!6v!>[[
  !J  !0 = =OOY--44!

 r   )r   r   of_callable)r3   r4   rA   s   `` r   #add_id_to_cps_subscribe_transformerrC   H   s$    "2  ))*;<<r   r6   c                     | j                   }t        |j                        }|j                  j	                  |j                         t               }||_         |S r   )r8   _to_cps_publish_message_protor?   publish_timeCopyFromr   )r6   r>   out_pbouts       r   to_cps_subscribe_messagerJ   [   sH    

I*9+<+<=F
  !7!78
/CCGJr   c                    t         j                  j                         }	 | j                  j	                  d      |_        t        | j                  v rt        d      | j                  |_
        | j                  j                         D ]  \  }}t        |      |j                  |<    | j                  d      r&t        | j                        |j                  t        <   |S # t        $ r t        d      w xY w)Nr-   z4Received an unparseable message with a non-utf8 key.zLSpecial timestamp attribute exists in wire message. Unable to parse message.
event_time)r   metapbkeyr/   ordering_keyr0   r   PUBSUB_LITE_EVENT_TIME
attributesdataitemsr2   HasFieldr   rL   )r6   rI   rO   r+   s       r   rE   rE   d   s     




!CV!::,,W5 !2!22Z
 	
 {{CH((..0V/7s 1|$1S2
-. J  VTUUVs    C! !C6c                 N    t               }t        | j                        |_        |S r   )r   rE   r8   )r6   rI   s     r   to_cps_publish_messagerW   z   s    
/C+FJJ7CGJr   c                     | j                   }t               }|j                   }t        |j                  v r5|j                  j                  t        |j                  t                        |j                  |_        |j                  j                  d      |_
        |j                  j                         D ]F  \  }}|t        k7  s|j                  |   j                  j                  |j                  d             H |S )Nr-   )r8   r   rQ   rR   rL   rG   r   rS   rP   encoderO   rT   r+   append)r6   r>   rI   rH   rO   r1   s         r   from_cps_publish_messager[      s    

I
/CWWF!5!55"".$$%;<	

 ..FK''..w7FJ**002
U((c"))00g1FG 3 Jr   ) r"   google.api_core.exceptionsr   &cloudsdk.google.protobuf.timestamp_pb2r   google.pubsub_v1r   #google.cloud.pubsublite.cloudpubsubr    google.cloud.pubsublite.internalr   google.cloud.pubsublite.typesr   r	   google.cloud.pubsublite_v1r
   r   r   rQ   strr   r   r%   r*   r2   rC   rJ   rM   rN   rE   rW   r[    r   r   <module>re      s    6 < * B ; D W W7 79 7 7IS IY I2H$5$5 2# 2c h.?.? 
o 
# 
=='9==&%5 - !!,= ] ] } r   