def merge_csv(file_path: comp.InputPath('Tarball'),
              output_csv: comp.OutputPath('CSV')):
    import glob
    import pandas as pd
    import tarfile

    tarfile.open(name=file_path, mode="r|gz").extractall('data')
    df = pd.concat(
        [pd.read_csv(csv_file, header=None)
         for csv_file in glob.glob('data/*.csv')])
    df.to_csv(output_csv, index=False, header=False)
create_step_merge_csv = kfp.components.create_component_from_func(
    func=merge_csv,
    output_component_file='component.yaml',  # This is optional. It saves the component spec for future use.
    base_image='python:3.7',
    packages_to_install=['pandas==1.1.4'])
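The pipeline below also uses web_downloader_op, which this section never defines. As a minimal sketch, a reusable component like that is typically loaded from its component.yaml specification with kfp.components.load_component_from_url; the URL below is a placeholder, not a real location:

import kfp

# Hypothetical: load the web downloader component from its published spec.
# Replace the placeholder with the URL of an actual component.yaml file.
web_downloader_op = kfp.components.load_component_from_url(
    '<url-of-the-web-downloader-component.yaml>')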
# Define a pipeline and create a task from a component:
def my_pipeline(url):
    web_downloader_task = web_downloader_op(url=url)
    merge_csv_task = create_step_merge_csv(file=web_downloader_task.outputs['data'])
    # The outputs of the merge_csv_task can be referenced using the
    # merge_csv_task.outputs dictionary:
    # merge_csv_task.outputs['output_csv']
#!/usr/bin/env python3
import argparse
from pathlib import Path
# Function doing the actual work (outputs the first N lines from a text file)
def do_work(input1_file, output1_file, param1):
    for x, line in enumerate(input1_file):
        if x >= param1:
            break
        _ = output1_file.write(line)

# Defining and parsing the command-line arguments
parser = argparse.ArgumentParser(description='My program description')
# Paths must be passed in, not hardcoded
parser.add_argument('--input1-path', type=str,
                    help='Path of the local file containing the Input 1 data.')
parser.add_argument('--output1-path', type=str,
                    help='Path of the local file where the Output 1 data should be written.')
parser.add_argument('--param1', type=int, default=100,
                    help='The number of lines to read from the input and write to the output.')
args = parser.parse_args()
# Creating the directory where the output file is created (the directory
# may or may not exist).
Path(args.output1_path).parent.mkdir(parents=True, exist_ok=True)
with open(args.input1_path, 'r') as input1_file:
    with open(args.output1_path, 'w') as output1_file:
        do_work(input1_file, output1_file, args.param1)
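Because do_work only needs file-like objects, you can sanity-check the copying logic locally before building a container image around the program. A minimal sketch (not part of the original program) using io.StringIO:

import io

# Hypothetical local check: copy the first 2 of 4 lines.
source = io.StringIO('a\nb\nc\nd\n')
destination = io.StringIO()
do_work(source, destination, 2)
assert destination.getvalue() == 'a\nb\n'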
implementation:
  container:
    # The strict name of a container image that you've pushed to a container registry.
    image: gcr.io/my-org/my-image@sha256:a172..752f
Define a command for your component's implementation. This field specifies the command-line arguments that are used to run your program in the container.
implementation:
  container:
    image: gcr.io/my-org/my-image@sha256:a172..752f
    # command is a list of strings (command-line arguments).
    # The YAML language has two syntaxes for lists and you can use either of them.
    # Here we use the "flow syntax" - comma-separated strings inside square brackets.
    command: [
      python3,
      # Path of the program inside the container
      /pipelines/component/src/program.py,
      --input1-path,
      {inputPath: Input 1},
      --param1,
      {inputValue: Parameter 1},
      --output1-path,
      {outputPath: Output 1},
    ]
create_step_get_lines = comp.load_component_from_text("""
name: Get Lines
description: Gets the specified number of lines from the input file.

inputs:
- {name: Input 1, type: Data, description: 'Data for input 1'}
- {name: Parameter 1, type: Integer, default: '100', description: 'Number of lines to copy'}

outputs:
- {name: Output 1, type: Data, description: 'Output 1 data.'}

implementation:
  container:
    image: gcr.io/my-org/my-image@sha256:a172..752f
    # command is a list of strings (command-line arguments).
    # The YAML language has two syntaxes for lists and you can use either of them.
    # Here we use the "flow syntax" - comma-separated strings inside square brackets.
    command: [
      python3,
      # Path of the program inside the container
      /pipelines/component/src/program.py,
      --input1-path,
      {inputPath: Input 1},
      --param1,
      {inputValue: Parameter 1},
      --output1-path,
      {outputPath: Output 1},
    ]""")
# create_step_get_lines is a "factory function" that accepts the arguments
# for the component's inputs and output paths and returns a pipeline step
# (ContainerOp instance).
#
# To inspect the create_step_get_lines function in a Jupyter Notebook, enter
# "create_step_get_lines(" in a cell and press Shift+Tab.
# You can also get help by entering `help(create_step_get_lines)`,
# `create_step_get_lines?`, or `create_step_get_lines??`.
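load_component_from_text is only one of the SDK's loaders; the same component spec can also be turned into a factory function from a file or a URL. A brief sketch, with a placeholder path and URL:

import kfp.components as comp

# Load a component spec saved to disk (e.g. via output_component_file).
create_step_get_lines = comp.load_component_from_file('component.yaml')

# Or load it from a URL, such as a raw file in a Git repository:
# create_step_get_lines = comp.load_component_from_url('<url-of-component.yaml>')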
# Create a simple component using only bash commands. The output of this component
# can be passed to a downstream component that accepts an input with the same type.
create_step_write_lines = comp.load_component_from_text("""
name: Write Lines
description: Writes text to a file.

inputs:
- {name: text, type: String}

outputs:
- {name: data, type: Data}

implementation:
  container:
    image: busybox
    command:
    - sh
    - -c
    - |
      mkdir -p "$(dirname "$1")"
      echo "$0" > "$1"
    args:
    - {inputValue: text}
    - {outputPath: data}
""")
# Define your pipeline
def my_pipeline():
    write_lines_step = create_step_write_lines(
        text='one\ntwo\nthree\nfour\nfive\nsix\nseven\neight\nnine\nten')

    get_lines_step = create_step_get_lines(
        # Input name "Input 1" is converted to pythonic parameter name "input_1"
        input_1=write_lines_step.outputs['data'],
        parameter_1='5',
    )
# If you run this command on a Jupyter notebook running on Kubeflow,
# you can exclude the host parameter.
# client = kfp.Client()
client = kfp.Client(host='<your-kubeflow-pipelines-host-name>')
# Compile, upload, and submit this pipeline for execution.
client.create_run_from_pipeline_func(my_pipeline, arguments={})
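If you would rather upload the pipeline through the Kubeflow Pipelines UI instead of submitting it directly from the client, you can compile it to a package first. A minimal sketch using the v1 SDK compiler; the package filename is arbitrary:

import kfp.compiler

# Compile the pipeline into a package that can be uploaded via the UI.
kfp.compiler.Compiler().compile(
    pipeline_func=my_pipeline,
    package_path='pipeline.yaml')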