
    #                         d Z ddlmZ ddlmZ ddlmZ ddlZddlmZ ddlm	Z	 ddl
mZ dd	l
mZ  G d
 dej                        Zy)zBase class for Flink Job.    )absolute_import)division)unicode_literalsN)encoding)arg_parsers)base)utilc                   @    e Zd ZdZed        Zed        Zed        Zy)	FlinkBasez.Submit a Java or Scala Flink job to a cluster.c                    | j                  dd       | j                  dt        j                         dg d       | j                  dt        j                  d	
       | j                  dt        j
                         dd       | j                  dt        j                         | j                  dt        j
                         dd       y)z@Parses command-line arguments specific to submitting Flink jobs.z--savepointzHCFS URI of the savepoint that is used to refer to the state of the previously stopped job. The new job will resume previous state from there.)helpz--jarsJARzLComma-separated list of jar files to provide to the task manager classpaths.)typemetavardefaultr   job_argszThe job arguments to pass.)nargsr   z--propertieszPROPERTY=VALUEzList of key=value pairs to configure Flink. For a list of available properties, see: https://nightlies.apache.org/flink/flink-docs-master/docs/deployment/config/.)r   r   r   z--properties-filez--driver-log-levelszPACKAGE=LEVELzoList of package to log4j log level pairs to configure driver logging. For example: root=FATAL,com.example=INFO.N)add_argumentr   ArgListargparse	REMAINDERArgDictjob_utilPROPERTIES_FILE_HELP_TEXT)parsers    5lib/googlecloudsdk/command_lib/dataproc/jobs/flink.pyArgszFlinkBase.Args"   s           "'  	   )  
   " \  	 ("D"D     "A      c                 4    | j                   | j                  dS )z=Returns a dict of files by their type (main_jar, jars, etc.).main_jarjarsr    )argss    r   GetFilesByTypezFlinkBase.GetFilesByTypeS   s     tyy99r   c                 J   | j                  |j                  xs g |j                  |d   |d   ||j                        }t	        j
                  |j                  |j                        }|r1t        j                  || j                   j                  d      |_        ||_        y)z/Populates the flinkJob member of the given job.r!   r"   )r#   	mainClassmainJarFileUrijarFileUrisloggingConfigsavepointUriT)
sort_itemsN)FlinkJobr   
main_class	savepointr   BuildJobProperties
propertiesproperties_filer   DictToAdditionalPropertyMessagePropertiesValueflinkJob)messagesjobfiles_by_typelogging_configr#   	flink_jobjob_propertiess          r   ConfigureJobzFlinkBase.ConfigureJobX   s     !!]] b//$Z0!&)$^^ " I 00--N %EE
(++;;i CLr   N)__name__
__module____qualname____doc__staticmethodr   r$   r;    r   r   r   r      s@    6. .` : :  r   r   )r?   
__future__r   r   r   r   apitools.base.pyr   googlecloudsdk.callioper   (googlecloudsdk.command_lib.dataproc.jobsr   job_baser	   r   JobBaser   rA   r   r   <module>rH      s6       &  '  % / E EO   Or   