
    mY                         d Z ddlmZ ddlmZ ddlmZ ddlmZ ddlmZ ddl	m
Z
 ddlmZ dd	lmZ dd
lmZ ddlmZ dZd Z G d d      Zy)z)Cloud Datastream connection profiles API.    )absolute_import)division)unicode_literals)
exceptions)util)base)labels_util)	resources)yaml)
console_iov1c                 x    t         j                  j                  | j                  d      }|j	                         S )Nz%datastream.projects.locations.streams)
collection)r
   REGISTRYParseRelativeNamenameSelfLink)resourcestreams     0lib/googlecloudsdk/api_lib/datastream/streams.pyGetStreamURIr       s6    //mm8 0 :& 
	    c                       e Zd ZdZddZd Zd Zd Zd Zd Z	d	 Z
d
 Zd Zd Zd Zd Zd Zd Zd Zd Zd Zd ZddZddZy)StreamsClientz&Client for streams service in the API.Nc                     |xs t        j                         | _        |xs t        j                         | _        | j                  j
                  | _        t        j                         | _        y N)	r   GetClientInstance_clientGetMessagesModule	_messagesprojects_locations_streams_serviceGetResourceParser_resource_parser)selfclientmessagess      r   __init__zStreamsClient.__init__*   sN    5T335DL9!7!7!9DNLL;;DM 224Dr   c                    |j                   rE| j                  j                  t        j                  | j                  |j                   |            S |j
                  rE| j                  j                  t        j                  | j                  |j
                  |            S |j                  rD| j                  j                  t        j                  | j                  |j                              S |j                  rD| j                  j                  t        j                  | j                  |j                              S |j                  rD| j                  j                  t        j                  | j                  |j                              S |j                  rD| j                  j                  t        j                  | j                  |j                              S | j                  j                         S )zEGets BackfillAllStrategy message based on Stream objects source type.)oracleExcludedObjects)mysqlExcludedObjects)postgresqlExcludedObjects)sqlServerExcludedObjects)salesforceExcludedObjects)mongodbExcludedObjects)oracle_excluded_objectsr    BackfillAllStrategyr   ParseOracleRdbmsFilemysql_excluded_objectsParseMysqlRdbmsFilepostgresql_excluded_objectsParsePostgresqlRdbmsFilesqlserver_excluded_objectsParseSqlServerRdbmsFilesalesforce_excluded_objectsParseSalesforceOrgFilemongodb_excluded_objectsParseMongodbFile)r%   release_trackargss      r   _GetBackfillAllStrategyz%StreamsClient._GetBackfillAllStrategy0   s   ##^^// $ 9 9nnd::M!K 0 L L 
	$	$^^//#77nnd99= J 0 K K 
	)	)^^//$($A$Annd>>%@ 0 A A 
	(	(^^//#'#?#?nnd==$ 0  
 
	)	)^^//$($?$?nnd>>% 0  
 
	&	&^^//!%!6!6nnd;;" 0  
 >>--//r   c                     |t         j                  j                  k(  r| j                  ||      S t	        j
                  |d| j                  j                        S )AParses a oracle_sorce_config into the OracleSourceConfig message.OracleSourceConfig)r   ReleaseTrackBETA_ParseOracleSourceConfigBetar   ParseMessageAndValidateSchemar    rB   )r%   oracle_source_config_filer=   s      r   _ParseOracleSourceConfigz&StreamsClient._ParseOracleSourceConfigR   sV    )).....
#]  --!)) r   c                    t        j                  |d      }	 t        j                  |      }|j                  d      }|r|n|}|j                  t        j                  d|      i       }t        j                  | j                  ||      }	|j                  t        j                  d|      i       }
t        j                  | j                  |
|      }| j                  j                  |	|      }|j                  d	      r|j                  d	      |_        |S # t        j                  $ r)}t        j                  dj                  |            d}~ww xY w)
rA   FbinaryCannot parse YAML:[{0}]Noracle_source_configinclude_objectsexclude_objectsincludeObjectsexcludeObjectsmax_concurrent_cdc_tasks)r   ReadFromFileOrStdinr   loadYAMLParseErrords_exceptions
ParseErrorformatgetr   GetRDBMSV1alpha1ToV1FieldName*ParseOracleSchemasListToOracleRdbmsMessager    rB   maxConcurrentCdcTasks)r%   rG   r=   dataoracle_source_config_head_dataeoracle_sorce_config_data_objectrM   include_objects_rawinclude_objects_dataexclude_objects_rawexclude_objects_dataoracle_source_config_msgs                r   rE   z*StreamsClient._ParseOracleSourceConfigBeta_   sm    ))!%1DJ'+yy$ 'E&H&H'#
 + 	(+  /22**+<mL
  JJ+]< /22**+<mL
  JJ+]<  $~~@@++  A  
  :;7K7O7O
$8&4 $#C  J$$%>%E%Ea%HIIJ   D E,$EEc                     |t         j                  j                  k(  r| j                  ||      S t	        j
                  |d| j                  j                        S )z?Parses a mysql_sorce_config into the MysqlSourceConfig message.MysqlSourceConfig)r   rC   rD   _ParseMysqlSourceConfigBetar   rF   r    ri   )r%   mysql_source_config_filer=   s      r   _ParseMysqlSourceConfigz%StreamsClient._ParseMysqlSourceConfig   sV    ))...--
"M  -- (( r   c                    t        j                  |d      }	 t        j                  |      }|j                  d      }|r|n|}|j                  t        j                  d|      i       }t        j                  | j                  ||      }	|j                  t        j                  d|      i       }
t        j                  | j                  |
|      }| j                  j                  |	|      }|j                  d	      r|j                  d	      |_        |S # t        j                  $ r)}t        j                  dj                  |            d}~ww xY w)
zDParses an old mysql_sorce_config into the MysqlSourceConfig message.FrJ   rL   Nmysql_source_configrN   rO   rP   rS   )r   rT   r   rU   rV   rW   rX   rY   rZ   r   r[   (ParseMysqlSchemasListToMysqlRdbmsMessager    ri   r]   )r%   rk   r=   r^   mysql_sorce_config_head_datar`   mysql_sorce_config_data_objectrn   rb   rc   rd   re   mysql_sourec_config_msgs                r   rj   z)StreamsClient._ParseMysqlSourceConfigBeta   sl    )) 0DJ%)YYt_" &B%E%E&"
 * 	')  .11**+<mL
  HH+]< .11**+<mL
  HH+]< #nn>>++ ? 
 9:6I6M6M
$7&3 #"C  J$$%>%E%Ea%HIIJrg   c                 X    t        j                  |d| j                  j                        S )zIParses a postgresql_sorce_config into the PostgresqlSourceConfig message.PostgresqlSourceConfig)r   rF   r    rt   )r%   postgresql_source_config_files     r   _ParsePostgresqlSourceConfigz*StreamsClient._ParsePostgresqlSourceConfig   *     --% -- r   c                 X    t        j                  |d| j                  j                        S )zGParses a sqlserver_sorce_config into the SqlServerSourceConfig message.SqlServerSourceConfig)r   rF   r    ry   )r%   sqlserver_source_config_files     r   _ParseSqlServerSourceConfigz)StreamsClient._ParseSqlServerSourceConfig   s*     --$,, r   c                 X    t        j                  |d| j                  j                        S )zIParses a salesforce_sorce_config into the SalesforceSourceConfig message.SalesforceSourceConfig)r   rF   r    r}   )r%   salesforce_source_config_files     r   _ParseSalesforceSourceConfigz*StreamsClient._ParseSalesforceSourceConfig   rw   r   c                 X    t        j                  |d| j                  j                        S )zDParses a mongodb_source_config into the MongodbSourceConfig message.MongodbSourceConfig)r   rF   r    r   )r%   mongodb_source_config_files     r   _ParseMongodbSourceConfigz'StreamsClient._ParseMongodbSourceConfig   s*     --"** r   c                     |t         j                  j                  k(  r| j                  |      S t	        j
                  |d| j                  j                        S )zDParses a GcsDestinationConfig into the GcsDestinationConfig message.GcsDestinationConfig)r   rC   rD   _ParseGcsDestinationConfigBetar   rF   r    r   )r%   gcs_destination_config_filer=   s      r   _ParseGcsDestinationConfigz(StreamsClient._ParseGcsDestinationConfig   sP    
 ))...001LMM--#++ r   c                    t        j                  |d      }	 t        j                  |      }|j                  d      }|r|n|}|j                  dd      }|j                  di       }|j                  d	i       }	| j                  j                  |||	
      }
d|v r!| j                  j                         |
_        |
S d|v rS|j                  d      }| j                  j                  |j                  d      |j                  d            |
_        |
S t        j                  d      # t        j                  $ r)}t        j                  dj                  |            d}~ww xY w)zFParses a gcs_destination_config into the GcsDestinationConfig message.FrJ   rL   Ngcs_destination_configpath file_rotation_mbfile_rotation_interval)r   fileRotationMbfileRotationIntervalavro_file_formatjson_file_formatcompressionschema_file_format)r   schemaFileFormatz'Cannot parse YAML: missing file format.)r   rT   r   rU   rV   rW   rX   rY   rZ   r    r   AvroFileFormatavroFileFormatJsonFileFormatjsonFileFormat)r%   r   r^    gcs_destination_head_config_datar`   "gcs_destination_config_data_objectgcs_destination_config_datar   r   r   gcs_dest_config_msgjson_file_format_datas               r   r   z,StreamsClient._ParseGcsDestinationConfigBeta   s   ))#E3DJ)-4& *J)M)M *&
 . 	+-   '**626D2667I2N8<< "&..=="23 > 5 88+/>>+H+H+J(  
:	:9==
+/>>+H+H+//>0445IJ ,I ,L(  $$
35 59  J$$%>%E%Ea%HIIJs   D* *E&=$E!!E&c                 X    t        j                  |d| j                  j                        S )zNParses a BigQueryDestinationConfig into the BigQueryDestinationConfig message.BigQueryDestinationConfig)r   rF   r    r   )r%   config_files     r   _ParseBigqueryDestinationConfigz-StreamsClient._ParseBigqueryDestinationConfig  s(    --#00 r   c                    t        j                  || j                  j                  j                        }| j                  j                  |||j
                        }|t        j                  j                  k(  r%|j                  j                  j                         }n$|j                  j                  j                         }| j                  j                         }|j                         |_        |j                   r"| j#                  |j                   |      |_        n|j&                  r"| j)                  |j&                  |      |_        n|j,                  r!| j/                  |j,                        |_        n|j2                  r!| j5                  |j2                        |_        nY|j8                  r!| j;                  |j8                        |_        n,|j>                  r | jA                  |j>                        |_!        ||_"        |t        j                  j                  k(  r%|j                  jF                  j                         }n$|j                  jH                  j                         }| j                  jK                         }	|j                         |	_&        |jN                  r"| jQ                  |jN                  |      |	_)        n,|jT                  r | jW                  |jT                        |	_,        |	|_-        |j\                  r!| j                  j_                         |_0        |S |jb                  r| je                  ||      }
|
|_3        |S )zReturns a stream object.)r   labelsdisplayName)4r	   ParseCreateArgsr    StreamLabelsValuedisplay_namer   rC   rD   CONCEPTSsource_nameParsesourceSourceConfigRelativeNamesourceConnectionProfilerM   rH   oracleSourceConfigrn   rl   mysqlSourceConfigpostgresql_source_configrv   postgresqlSourceConfigsqlserver_source_configr{   sqlServerSourceConfigsalesforce_source_configr   salesforceSourceConfigmongodb_source_configr   mongodbSourceConfigsourceConfigdestination_namedestinationDestinationConfigdestinationConnectionProfiler   r   gcsDestinationConfigbigquery_destination_configr   bigqueryDestinationConfigdestinationConfigbackfill_noneBackfillNoneStrategybackfillNonebackfill_allr?   backfillAll)r%   	stream_idr=   r>   r   
stream_objsource_connection_profile_refstream_source_config"destination_connection_profile_refstream_destination_configbackfill_all_strategys              r   
_GetStreamzStreamsClient._GetStream"  s   ((dnn##//1F&&v43D3D ' FJ ))...&*mm&?&?&E&E&G#&*mm&:&:&@&@&B#>>668%224 0  040M0M

#
#]14-		!	!/3/K/K

"
"M03,		&	&

+
+D,I,I
J 1 
	%	%

*
*4+G+G
H 0 
	&	&

+
+D,I,I
J 1 
	#	#151O1O

$
$2. 3J ))...+/==+I+I+O+O ,( ,0==+D+D+J+J+L( $ @ @ B*779 :""

)
)))=  4
 
	)	)

.
...0  9 $=J  $ C C Ej
 	 
		"::=$O4jr   c                 p    | j                   j                  |      }| j                  j                  |      S )N)r   )r    ,DatastreamProjectsLocationsStreamsGetRequestr"   Get)r%   r   get_reqs      r   _GetExistingStreamz StreamsClient._GetExistingStreamh  s6    nnII J G ==W%%r   c                 L   t        j                  |      }t        j                  |      }| j                  j                  j
                  }t        j                  |||j                        j                  ||j                        }|j                  r|j                  |_	        yy)zUpdates labels of the stream.)	additionssubtractionsclearN)r	   GetUpdateLabelsDictFromArgsGetRemoveLabelsListFromArgsr    r   r   Diffclear_labelsApplyr   needs_update)r%   r   r>   
add_labelsremove_labels
value_typeupdate_results          r   _UpdateLabelszStreamsClient._UpdateLabelsn  s    88>J;;DAM&&22J$$" eJ&	 
 !!#**fm "r   c                     |D cg c]  }|j                  |      r||z    }}|D cg c]  }|j                  |      r| }}|j                  |       |S c c}w c c}w )z?Returns an updated list of field masks with necessary prefixes.)
startswithextend)r%   update_fieldsprefix_to_checkprefix_to_addfieldtemp_fieldsxs          r    _UpdateListWithFieldNamePrefixesz.StreamsClient._UpdateListWithFieldNamePrefixes{  s    
 #"EO, 	"   ! ao)F=   %
s   AAAc                 t
   g }|j                   xs d}|j                  d      }|t        j                  j                  k(  rt        j                  |      }|j                  |       |j                  d      r|j                  |_
        |t        j                  j                  k(  r'|j                  j                  j                         }d}n&|j                  j                  j                         }d}|j                  |      rE|j                         |j                   _        ||v r"|j%                  |       |j'                  d       |j                  d      r@| j)                  |j*                  |      |j                   _        | j/                  |dd      }n<|j                  d	      r?| j1                  |j2                  |      |j                   _        | j/                  |d	d      }n|j                  d
      r>| j7                  |j8                        |j                   _        | j/                  |d
d      }n|j                  d      r>| j=                  |j>                        |j                   _         | j/                  |dd      }nN|j                  d      r=| jC                  |jD                        |j                   _#        | j/                  |dd      }|t        j                  j                  k(  r'|j                  jH                  j                         }	d}
n&|j                  jJ                  j                         }	d}
|j                  |
      rE|	j                         |jL                  _'        |
|v r"|j%                  |
       |j'                  d       |j                  d      r?| jQ                  |jR                  |      |jL                  _*        | j/                  |dd      }nN|j                  d      r=| jW                  |jX                        |jL                  _-        | j/                  |dd      }|j                  d      r2| j\                  j_                         |_0        	 |jc                  d       n<|j                  d      r+| jg                  ||      }||_4        	 |jc                  d       |j                  d      rB| j\                  jj                  jm                  |jn                  jq                               |_7        | js                  ||       ||fS # td        $ r Y tw xY w# td        $ r Y w xY w)zReturns updated stream.r   ,r   r   r   z'source_config.source_connection_profilerM   zsource_config.rn   r   r   r   r   r   z1destination_config.destination_connection_profiler   zdestination_config.r   r   r   r   r   state):update_masksplitr   rC   rD   r   UpdateV1alpha1ToV1MaskFieldsr   IsSpecifiedr   r   r   r   r   r   r   r   r   removeappendrH   rM   r   r   rl   rn   r   rv   r   r   r{   r   r   r   r   r   r   r   r   r   r   r   r   r   r   r   r    r   r   resetAttributeErrorr?   r   r   StateValueValuesEnumr   upperr   )r%   r   r=   r>   r   user_update_maskuser_update_mask_listr   source_field_namer   destination_field_namer   s               r   _GetUpdatedStreamzStreamsClient._GetUpdatedStream  s    M''-2,2237))..."??
! ./',,f ))...&*mm&?&?&E&E&G#'&*mm&:&:&@&@&B#")*
'
4
4
6 1	m	+./FG.//3/L/L

#
#]04f, ;;
/1ACm 
		/	0.2.J.J

"
"M/3f+;;
.0@Bm 
		4	5

+
+D,I,I
J 0 ;;
35EGm 
		3	4

*
*4+G+G
H / ;;
24Dm 
		4	5

+
+D,I,I
J 0 ;;
35Em
 ))...
--
(
(
.
.
0 )1,0MM,E,E,K,K,M(,./
,
9
9
; ;	=	034?	A 01

)
)))= 3
 ;;
13HJm			7	8

.
...0 8 ;;
68MOm ( NN??Af]# 
		.	)"::=$O0f^$  ^^**??::


 fl 	vt$=  +   	   	s$   %T "T+ 	T('T(+	T76T7c                    | j                  |||      }|j                  }|j                  }t        j                         }| j
                  j                  }	 |	||j                  ||||      }
| j                  j                  |
      S )a  Creates a stream.

    Args:
      parent_ref: a Resource reference to a parent datastream.projects.locations
        resource for this stream.
      stream_id: str, the name of the resource to create.
      release_track: Some arguments are added based on the command release
        track.
      args: argparse.Namespace, The arguments that this command was invoked
        with.

    Returns:
      Operation: the operation for creating the stream.
    )r   streamIdparent	requestIdvalidateOnlyforce)
r   validate_onlyr  r   GenerateRequestIdr    /DatastreamProjectsLocationsStreamsCreateRequestr   r"   Create)r%   
parent_refr   r=   r>   r   r  r  
request_idcreate_req_type
create_reqs              r   r
  zStreamsClient.Create  s~     __Yt<F&&MJJE'')JnnTTO "J ==
++r   c                 |   |j                   }|j                  }| j                  |      }| j                  |||      \  }}t	        j
                         }	| j                  j                  }
 |
||j                  |	||      }|j                  rdj                  |      |_        | j                  j                  |      S )ak  Updates a stream.

    Args:
      name: str, the reference of the stream to
          update.
      release_track: Some arguments are added based on the command release
        track.
      args: argparse.Namespace, The arguments that this command was
          invoked with.

    Returns:
      Operation: the operation for updating the connection profile.
    )r   r   r  r  r  r   )r  r  r   r   r   r  r    .DatastreamProjectsLocationsStreamsPatchRequestr   r   join
updateMaskr"   Patch)r%   r   r=   r>   r  r  current_streamupdated_streamr   r  update_req_type
update_reqs               r   UpdatezStreamsClient.Update$  s     &&MJJE,,T2N$($:$:t%-!NM '')JnnSSO   "J !hh}5j==z**r   )NNr   )__name__
__module____qualname____doc__r(   r?   rH   rE   rl   rj   rv   r{   r   r   r   r   r   r   r   r   r   r   r
  r   r   r   r   r   '   sq    .5 0D)$V)#V$LDL&+z!x,>"+r   r   N)r  
__future__r   r   r   !googlecloudsdk.api_lib.datastreamr   rW   r   googlecloudsdk.callioper   $googlecloudsdk.command_lib.util.argsr	   googlecloudsdk.corer
   r   googlecloudsdk.core.consoler   _DEFAULT_API_VERSIONr   r   r  r   r   <module>r%     s?    0 &  ' I 2 ( < ) $ 2 _+ _+r   