flytekit.sdk package

Submodules

flytekit.sdk.exceptions module

exception flytekit.sdk.exceptions.RecoverableException

Bases: flytekit.common.exceptions.user.FlyteRecoverableException

Raise an exception of this type if user code detects an error and would like to force a retry of the entire task. Any exception raised from user code other than RecoverableException will NOT be considered retryable and the task will fail without additional retries.
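
The retry rule above can be illustrated with a small local simulation. This is a sketch, not flytekit's implementation: RecoverableException below is a locally defined stand-in, and run_with_retries is a hypothetical helper that mimics a task configured with retries=N (assumed here to mean up to N retries, so N + 1 attempts in total).

```python
class RecoverableException(Exception):
    """Local stand-in for flytekit.sdk.exceptions.RecoverableException."""


def run_with_retries(task_fn, retries):
    """Hypothetical helper: run task_fn, retrying only on RecoverableException.

    Returns (result, attempts). Any other exception propagates immediately,
    mirroring the rule that other user errors are not retried.
    """
    attempts = 0
    while True:
        attempts += 1
        try:
            return task_fn(), attempts
        except RecoverableException:
            # Give up once all retries are used (retries=N allows N + 1 attempts).
            if attempts > retries:
                raise


calls = {"n": 0}


def flaky():
    # Fails twice with a recoverable error, then succeeds.
    calls["n"] += 1
    if calls["n"] < 3:
        raise RecoverableException("transient problem, please retry")
    return "done"


result, attempts = run_with_retries(flaky, retries=3)
```

Because a retry re-runs the whole task body, any side effects from the failed attempt happen again, which is why idempotent tasks are recommended.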

flytekit.sdk.tasks module

flytekit.sdk.tasks.dynamic_task(_task_function=None, cache_version='', retries=0, deprecated='', storage_request=None, cpu_request=None, gpu_request=None, memory_request=None, storage_limit=None, cpu_limit=None, gpu_limit=None, memory_limit=None, cache=False, timeout=None, allowed_failure_ratio=None, max_concurrency=None, environment=None, cls=None)

Decorator to create a custom dynamic task definition. Use dynamic tasks to split work into an arbitrary number of parallel sub-tasks or workflows.

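import random

from flytekit.sdk.tasks import dynamic_task, outputs, python_task
from flytekit.sdk.types import Types

@outputs(out=Types.Integer)
@python_task
def my_sub_task(wf_params, out):
    # Each sub-task emits a single random integer.
    out.set(random.randint(0, 100))

@outputs(out=[Types.Integer])
@dynamic_task
def my_task(wf_params, out):
    out_list = []
    # Fan out into 100 sub-tasks; their outputs are collected into a list.
    for i in range(100):
        out_list.append(my_sub_task().outputs.out)
    out.set(out_list)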

Note

All outputs of a dynamic task must be lists, because the individual outputs of the sub-tasks are appended into a list. Outputs cannot be aggregated within this task; to aggregate them, have a python_task take the outputs of this task as inputs and do the necessary work. If a sub-task does not contribute an output, it must either be yielded from the task with the yield keyword or returned from the task in a list. Otherwise, the sub-task will not be executed.
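
The division of labor in the note above (the dynamic task only collects sub-task outputs into a list, and a separate task aggregates them) can be mimicked locally with plain functions. This is a sketch assuming each sub-task behaves like an ordinary Python function; sub_task, fan_out, and aggregate are hypothetical names, and nothing here is scheduled by Flyte.

```python
import random


def sub_task(seed):
    """Stand-in for my_sub_task: produces one integer output."""
    rng = random.Random(seed)  # seeded so repeated runs give the same values
    return rng.randint(0, 100)


def fan_out(n):
    """Stand-in for the dynamic task: collects sub-task outputs into a list.

    No aggregation happens here; the list itself is the output.
    """
    return [sub_task(i) for i in range(n)]


def aggregate(values):
    """Stand-in for a downstream python_task that consumes the list."""
    return sum(values)


outputs = fan_out(100)
total = aggregate(outputs)
```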

Parameters
  • _task_function – this is the decorated method and shouldn’t be declared explicitly. The function must take a first argument, and then named arguments matching those defined in @inputs and @outputs. No keyword arguments are allowed.

  • cache_version (Text) –

    [optional] string representing logical version for discovery. This field should be updated whenever the underlying algorithm changes.

    Note

    This argument is required to be a non-empty string if cache is True.

  • retries (int) –

    [optional] integer determining number of times task can be retried on flytekit.sdk.exceptions.RecoverableException or transient platform failures. Defaults to 0.

    Note

    If retries > 0, the task must be able to recover from any remote state created within the user code. It is strongly recommended that tasks are written to be idempotent.

  • deprecated (Text) – [optional] string that should be provided if this task is deprecated. The string will be logged as a warning so it should contain information regarding how to update to a newer task.

  • storage_request (Text) –

    [optional] Kubernetes resource string for lower-bound of disk storage space for the task to run. Default is set by platform-level configuration.

    Note

    This is currently not supported by the platform.

  • cpu_request (Text) – [optional] Kubernetes resource string for lower-bound of cores for the task to execute. This can be set to a fractional portion of a CPU. Default is set by platform-level configuration.

  • gpu_request (Text) – [optional] Kubernetes resource string for lower-bound of desired GPUs. Default is set by platform-level configuration.

  • memory_request (Text) – [optional] Kubernetes resource string for lower-bound of physical memory necessary for the task to execute. Default is set by platform-level configuration.

  • storage_limit (Text) –

    [optional] Kubernetes resource string for upper-bound of disk storage space for the task to run. This amount is not guaranteed! If not specified, it is set equal to storage_request.

    Note

    This is currently not supported by the platform.

  • cpu_limit (Text) – [optional] Kubernetes resource string for upper-bound of cores for the task to execute. This can be set to a fractional portion of a CPU. This amount is not guaranteed! If not specified, it is set equal to cpu_request.

  • gpu_limit (Text) – [optional] Kubernetes resource string for upper-bound of desired GPUs. This amount is not guaranteed! If not specified, it is set equal to gpu_request.

  • memory_limit (Text) – [optional] Kubernetes resource string for upper-bound of physical memory necessary for the task to execute. This amount is not guaranteed! If not specified, it is set equal to memory_request.

  • cache (bool) – [optional] boolean describing if the outputs of this task should be cached and re-usable.

  • timeout (datetime.timedelta) – [optional] the maximum time the task is allowed to run before a retry is triggered (if retries are enabled). By default, tasks are allowed to run indefinitely. If a zero-length timedelta is passed (i.e. timedelta(seconds=0)), the task will not time out.

  • allowed_failure_ratio (float) – [optional] the fraction of sub-tasks that may fail before the parent dynamic task considers itself failed. The default is 0, so if any sub-task fails, the parent task is marked as failed. If specified, the value must be between 0 and 1 inclusive. If a non-zero value is specified, downstream tasks must be able to accept None values as outputs from individual sub-tasks, because the output of any sub-task that fails is set to None.

  • max_concurrency (int) – [optional] integer value describing the maximum number of tasks to run concurrently. This is a stand-in pending better concurrency controls for special use-cases. The existence of this parameter is not guaranteed between versions and therefore it is NOT recommended that it be used.

  • environment (dict[Text,Text]) – [optional] environment variables to set when executing this task.

  • cls – This can be used to override the task implementation with a user-defined extension. The class provided must be a subclass of flytekit.common.tasks.sdk_runnable.SdkRunnableTask. Generally, it should be a subclass of flytekit.common.tasks.sdk_dynamic.SdkDynamicTask. A user can use this parameter to inject bespoke logic into the base Flyte programming model.
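
The allowed_failure_ratio rule can be expressed as a small check. This is a hypothetical helper, not flytekit's code: it assumes failed sub-tasks are reported as None (as the parameter description states) and that the dynamic task fails once the fraction of failures exceeds the ratio; whether the platform compares with > or >= is an assumption here.

```python
def dynamic_task_succeeds(sub_task_outputs, allowed_failure_ratio=0.0):
    """Return True if the share of failed (None) sub-task outputs is acceptable."""
    if not 0.0 <= allowed_failure_ratio <= 1.0:
        raise ValueError("allowed_failure_ratio must be between 0 and 1 inclusive")
    if not sub_task_outputs:
        return True
    failures = sum(1 for value in sub_task_outputs if value is None)
    return failures / len(sub_task_outputs) <= allowed_failure_ratio


def sum_ignoring_failures(values):
    """Downstream consumer that tolerates None outputs from failed sub-tasks."""
    return sum(value for value in values if value is not None)
```

With the default ratio of 0, a single None makes the check fail, which matches the documented default behavior.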

Return type

flytekit.common.tasks.sdk_dynamic.SdkDynamicTask

flytekit.sdk.tasks.hive_task(_task_function=None, cache_version='', retries=0, deprecated='', storage_request=None, cpu_request=None, gpu_request=None, memory_request=None, storage_limit=None, cpu_limit=None, gpu_limit=None, memory_limit=None, cache=False, timeout=None, environment=None, cls=None)

Decorator to create a Hive task. The decorated function should return a list of Hive queries, which are run on a Hive cluster.

This is a two-step task:

  1. The generator step runs in the user container and outputs a list of queries.

  2. The list of queries produced in step 1 is then submitted to a Hive cluster, and the Flyte backend monitors the queries for completion.

Container properties (CPU, GPU, memory, etc.) set on this task are used only in step 1 above.
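
The two steps above can be modeled locally as follows. This is an illustrative sketch, not flytekit's implementation: generate_queries plays the role of the decorated function (step 1), and submit_and_wait is a hypothetical stand-in for the backend that simply hands each query to a caller-supplied execute function instead of talking to a Hive cluster.

```python
def generate_queries(user_id):
    """Step 1: runs in the user container and only builds query strings."""
    user_id = int(user_id)  # coerce to int so arbitrary text cannot be injected
    return [
        "SELECT * FROM users_table WHERE user_id={}".format(user_id),
        "INSERT INTO users_table VALUES ('user', 5, {})".format(user_id),
    ]


def submit_and_wait(queries, execute):
    """Step 2 stand-in: submit each query in order and record its status.

    `execute` is a hypothetical callable standing in for the Hive cluster;
    in Flyte, submission and monitoring are done by the backend.
    """
    statuses = []
    for query in queries:
        execute(query)
        statuses.append((query, "SUCCEEDED"))
    return statuses


executed = []
statuses = submit_and_wait(generate_queries(4), executed.append)
```

In this model only generate_queries would use the container resources configured on the task, consistent with the note that those settings apply to step 1 only.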

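from flytekit.sdk.tasks import hive_task, inputs
from flytekit.sdk.types import Types

@inputs(a=Types.Integer)
@hive_task(
    cache_version='1',
)
def test_hive(wf_params, a):
    # Return the queries; they are executed on the Hive cluster, not in this container.
    return [
        "SELECT * FROM users_table WHERE user_id=4",
        "INSERT INTO users_table VALUES ('user', 5, 4)",
    ]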

Parameters
  • _task_function – this is the decorated method and shouldn’t be declared explicitly. The function must take a first argument, and then named arguments matching those defined in @inputs and @outputs. No keyword arguments are allowed for wrapped task functions.

  • cache_version (Text) –

    [optional] string representing logical version for discovery. This field should be updated whenever the underlying algorithm changes.

    Note

    This argument is required to be a non-empty string if cache is True.

  • retries (int) –

    [optional] integer determining number of times task can be retried on flytekit.sdk.exceptions.RecoverableException or transient platform failures. Defaults to 0.

    Note

    If retries > 0, the task must be able to recover from any remote state created within the user code. It is strongly recommended that tasks are written to be idempotent.

  • deprecated (Text) – [optional] string that should be provided if this task is deprecated. The string will be logged as a warning so it should contain information regarding how to update to a newer task.

  • storage_request (Text) –

    [optional] Kubernetes resource string for lower-bound of disk storage space for the task to run. Default is set by platform-level configuration.

    Note

    This is currently not supported by the platform.

  • cpu_request (Text) – [optional] Kubernetes resource string for lower-bound of cores for the task to execute. This can be set to a fractional portion of a CPU. Default is set by platform-level configuration.

  • gpu_request (Text) – [optional] Kubernetes resource string for lower-bound of desired GPUs. Default is set by platform-level configuration.

  • memory_request (Text) – [optional] Kubernetes resource string for lower-bound of physical memory necessary for the task to execute. Default is set by platform-level configuration.

  • storage_limit (Text) –

    [optional] Kubernetes resource string for upper-bound of disk storage space for the task to run. This amount is not guaranteed! If not specified, it is set equal to storage_request.

    Note

    This is currently not supported by the platform.

  • cpu_limit (Text) – [optional] Kubernetes resource string for upper-bound of cores for the task to execute. This can be set to a fractional portion of a CPU. This amount is not guaranteed! If not specified, it is set equal to cpu_request.

  • gpu_limit (Text) – [optional] Kubernetes resource string for upper-bound of desired GPUs. This amount is not guaranteed! If not specified, it is set equal to gpu_request.

  • memory_limit (Text) – [optional] Kubernetes resource string for upper-bound of physical memory necessary for the task to execute. This amount is not guaranteed! If not specified, it is set equal to memory_request.

  • cache (bool) – [optional] boolean describing if the outputs of this task should be cached and re-usable.

  • timeout (datetime.timedelta) – [optional] the maximum time the task is allowed to run before a retry is triggered (if retries are enabled). By default, tasks are allowed to run indefinitely. If a zero-length timedelta is passed (i.e. timedelta(seconds=0)), the task will not time out.

  • environment (dict[Text,Text]) – [optional] environment variables to set when executing the query-generating container.

  • cls – This can be used to override the task implementation with a user-defined extension. The class provided should be a subclass of flytekit.common.tasks.sdk_runnable.SdkRunnableTask. Generally, it should be a subclass of flytekit.common.tasks.hive_task.SdkHiveTask. A user can use this to inject bespoke logic into the base Flyte programming model.
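
The timeout rules (no value means the task may run indefinitely, and a zero-length timedelta also disables the timeout) can be summarized in a small helper. effective_timeout_seconds is hypothetical and only restates the documented behavior.

```python
import datetime


def effective_timeout_seconds(timeout):
    """Return the timeout in seconds, or None if the task may run indefinitely."""
    if timeout is None or timeout == datetime.timedelta(0):
        return None
    return timeout.total_seconds()
```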

Return type

flytekit.common.tasks.hive_task.SdkHiveTask

flytekit.sdk.tasks.inputs(_task_template=None, **kwargs)

Decorator that provides input definitions to a decorated task definition.

Note

Certain tasks have special input behavior. See comments on each task decorator for more information.

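from flytekit.sdk.tasks import inputs, outputs, python_task
from flytekit.sdk.types import Types

@inputs(in1=Types.Integer, in2=[Types.String], in3=[[[Types.Integer]]])
@outputs(out1=Types.Integer, out2=Types.String)
@python_task
def my_task(wf_params, in1, in2, in3, out1, out2):
    # Every declared input and output must appear as a named argument.
    pass
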
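Parameters
  • _task_template – the task definition being decorated; it is supplied by the decorator and shouldn’t be declared explicitly.

  • kwargs – input names mapped to their types, e.g. in1=Types.Integer.

Return type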

flytekit.common.tasks.sdk_runnable.SdkRunnableTask
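
The requirement that a task function take a first argument (conventionally wf_params) followed by named arguments matching the declared inputs and outputs can be checked with the standard inspect module. check_task_signature is a hypothetical helper for catching mistakes early; flytekit performs its own validation, which may differ. Interpreting "no keyword arguments" as "no default values and no *args/**kwargs" is an assumption.

```python
import inspect


def check_task_signature(fn, inputs, outputs):
    """Raise ValueError if fn does not match the declared inputs and outputs.

    Assumed rules: one leading parameter (e.g. wf_params), then exactly the
    declared input and output names, with no defaults and no *args/**kwargs.
    """
    params = list(inspect.signature(fn).parameters.values())
    if not params:
        raise ValueError("task function must take a first argument such as wf_params")
    for param in params:
        if param.kind in (inspect.Parameter.VAR_POSITIONAL, inspect.Parameter.VAR_KEYWORD):
            raise ValueError("*args/**kwargs are not allowed: {}".format(param.name))
        if param.default is not inspect.Parameter.empty:
            raise ValueError("default values are not allowed: {}".format(param.name))
    declared = set(inputs) | set(outputs)
    actual = {param.name for param in params[1:]}
    if actual != declared:
        raise ValueError(
            "parameters {} do not match declared names {}".format(sorted(actual), sorted(declared))
        )
```

For instance, a function declared with inputs in1, in2, and in3 but missing in3 from its parameter list would be rejected.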

flytekit.sdk.tasks.outputs(_task_template=None, **kwargs)

Decorator that provides output definitions to a decorated task definition.

Note

Certain tasks have special output behavior. See comments on each task decorator for more information.

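from flytekit.sdk.tasks import outputs, python_task
from flytekit.sdk.types import Types

@outputs(out1=Types.Integer, out2=Types.String)
@python_task
def my_task(wf_params, out1, out2):
    # Outputs are written with set() rather than returned.
    out1.set(123)
    out2.set('hello world!')
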
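Parameters
  • _task_template – the task definition being decorated; it is supplied by the decorator and shouldn’t be declared explicitly.

  • kwargs – output names mapped to their types, e.g. out1=Types.Integer.

Return type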

flytekit.common.tasks.sdk_runnable.SdkRunnableTask
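
Outputs are written by calling set() on the output argument rather than by returning a value. To exercise a task body's logic without flytekit, a minimal stand-in can mimic that calling convention. OutputStub and my_task_body are hypothetical; the stub is not flytekit's output type and does no type checking.

```python
class OutputStub(object):
    """Hypothetical stand-in that records the value passed to set()."""

    def __init__(self):
        self.value = None
        self.was_set = False

    def set(self, value):
        self.value = value
        self.was_set = True


def my_task_body(wf_params, out1, out2):
    # Same body as the decorated example, called directly with stubs.
    out1.set(123)
    out2.set('hello world!')


out1, out2 = OutputStub(), OutputStub()
my_task_body(None, out1, out2)
```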

flytekit.sdk.tasks.python_task(_task_function=None, cache_version='', retries=0, deprecated='', storage_request=None, cpu_request=None, gpu_request=None, memory_request=None, storage_limit=None, cpu_limit=None, gpu_limit=None, memory_limit=None, cache=False, timeout=None, environment=None, cls=None)

Decorator to create a Python Task definition. This task will run as a single unit of work on the platform.

from flytekit.sdk.tasks import inputs, outputs, python_task
from flytekit.sdk.types import Types

@inputs(int_list=[Types.Integer])
@outputs(sum_of_list=Types.Integer)
@python_task
def my_task(wf_params, int_list, sum_of_list):
    sum_of_list.set(sum(int_list))

Parameters
  • _task_function – this is the decorated method and shouldn’t be declared explicitly. The function must take a first argument, and then named arguments matching those defined in @inputs and @outputs. No keyword arguments are allowed for wrapped task functions.

  • cache_version (Text) –

    [optional] string representing logical version for discovery. This field should be updated whenever the underlying algorithm changes.

    Note

    This argument is required to be a non-empty string if cache is True.

  • retries (int) –

    [optional] integer determining number of times task can be retried on flytekit.sdk.exceptions.RecoverableException or transient platform failures. Defaults to 0.

    Note

    If retries > 0, the task must be able to recover from any remote state created within the user code. It is strongly recommended that tasks are written to be idempotent.

  • deprecated (Text) – [optional] string that should be provided if this task is deprecated. The string will be logged as a warning so it should contain information regarding how to update to a newer task.

  • storage_request (Text) –

    [optional] Kubernetes resource string for lower-bound of disk storage space for the task to run. Default is set by platform-level configuration.

    Note

    This is currently not supported by the platform.

  • cpu_request (Text) –

    [optional] Kubernetes resource string for lower-bound of cores for the task to execute. This can be set to a fractional portion of a CPU. Default is set by platform-level configuration.

  • gpu_request (Text) –

    [optional] Kubernetes resource string for lower-bound of desired GPUs. Default is set by platform-level configuration.

  • memory_request (Text) –

    [optional] Kubernetes resource string for lower-bound of physical memory necessary for the task to execute. Default is set by platform-level configuration.

  • storage_limit (Text) –

    [optional] Kubernetes resource string for upper-bound of disk storage space for the task to run. This amount is not guaranteed! If not specified, it is set equal to storage_request.

    Note

    This is currently not supported by the platform.

  • cpu_limit (Text) – [optional] Kubernetes resource string for upper-bound of cores for the task to execute. This can be set to a fractional portion of a CPU. This amount is not guaranteed! If not specified, it is set equal to cpu_request.

  • gpu_limit (Text) – [optional] Kubernetes resource string for upper-bound of desired GPUs. This amount is not guaranteed! If not specified, it is set equal to gpu_request.

  • memory_limit (Text) – [optional] Kubernetes resource string for upper-bound of physical memory necessary for the task to execute. This amount is not guaranteed! If not specified, it is set equal to memory_request.

  • cache (bool) – [optional] boolean describing if the outputs of this task should be cached and re-usable.

  • timeout (datetime.timedelta) – [optional] the maximum time the task is allowed to run before a retry is triggered (if retries are enabled). By default, tasks are allowed to run indefinitely. If a zero-length timedelta is passed (i.e. timedelta(seconds=0)), the task will not time out.

  • environment (dict[Text,Text]) – [optional] environment variables to set when executing this task.

  • cls – This can be used to override the task implementation with a user-defined extension. The class provided must be a subclass of flytekit.common.tasks.sdk_runnable.SdkRunnableTask. A user can use this to inject bespoke logic into the base Flyte programming model.
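
The cache_version note (it must be non-empty when cache is True) and the advice to update it whenever the algorithm changes can be captured as a validation step plus a cache key. validate_cache_settings and cache_key are hypothetical; how Flyte actually derives cache keys is not described here, so the key format is an assumption.

```python
def validate_cache_settings(cache, cache_version):
    """Enforce the documented rule for cache and cache_version."""
    if cache and not cache_version:
        raise ValueError("cache_version must be a non-empty string when cache is True")


def cache_key(task_name, cache_version, inputs):
    """Illustrative key: changing cache_version yields a different key,
    so results produced by an older algorithm are not reused."""
    return (task_name, cache_version, tuple(sorted(inputs.items())))
```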

Return type

flytekit.common.tasks.sdk_runnable.SdkRunnableTask

flytekit.sdk.tasks.qubole_hive_task(_task_function=None, cache_version='', retries=0, deprecated='', storage_request=None, cpu_request=None, gpu_request=None, memory_request=None, storage_limit=None, cpu_limit=None, gpu_limit=None, memory_limit=None, cache=False, timeout=None, cluster_label=None, tags=None, environment=None, cls=None)

Decorator to create a Qubole Hive task. This Hive task runs on a Qubole cluster, so users can pass a Qubole cluster label and query tags. As with hive_task, the decorated function should return a list of Hive queries to run on a Hive cluster.

Like a Hive task, this is a two-step task; here, step 2 runs on a Qubole Hive cluster.

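from flytekit.sdk.tasks import inputs, qubole_hive_task
from flytekit.sdk.types import Types

@inputs(a=Types.Integer)
@qubole_hive_task(
    cache_version='1',
    cluster_label='cluster_label',
    tags=['tag1'],
)
def test_hive(wf_params, a):
    # Queries are submitted to the Qubole cluster identified by cluster_label.
    return [
        "SELECT * FROM users_table WHERE user_id=4",
        "INSERT INTO users_table VALUES ('user', 5, 4)",
    ]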

Parameters
  • _task_function – this is the decorated method and shouldn’t be declared explicitly. The function must take a first argument, and then named arguments matching those defined in @inputs and @outputs. No keyword arguments are allowed for wrapped task functions.

  • cache_version (Text) –

    [optional] string representing logical version for discovery. This field should be updated whenever the underlying algorithm changes.

    Note

    This argument is required to be a non-empty string if cache is True.

  • retries (int) –

    [optional] integer determining number of times task can be retried on flytekit.sdk.exceptions.RecoverableException or transient platform failures. Defaults to 0.

    Note

    If retries > 0, the task must be able to recover from any remote state created within the user code. It is strongly recommended that tasks are written to be idempotent.

  • deprecated (Text) – [optional] string that should be provided if this task is deprecated. The string will be logged as a warning so it should contain information regarding how to update to a newer task.

  • storage_request (Text) –

    [optional] Kubernetes resource string for lower-bound of disk storage space for the task to run. Default is set by platform-level configuration.

    Note

    This is currently not supported by the platform.

  • cpu_request (Text) – [optional] Kubernetes resource string for lower-bound of cores for the task to execute. This can be set to a fractional portion of a CPU. Default is set by platform-level configuration.

  • gpu_request (Text) – [optional] Kubernetes resource string for lower-bound of desired GPUs. Default is set by platform-level configuration.

  • memory_request (Text) – [optional] Kubernetes resource string for lower-bound of physical memory necessary for the task to execute. Default is set by platform-level configuration.

  • storage_limit (Text) –

    [optional] Kubernetes resource string for upper-bound of disk storage space for the task to run. This amount is not guaranteed! If not specified, it is set equal to storage_request.

    Note

    This is currently not supported by the platform.

  • cpu_limit (Text) – [optional] Kubernetes resource string for upper-bound of cores for the task to execute. This can be set to a fractional portion of a CPU. This amount is not guaranteed! If not specified, it is set equal to cpu_request.

  • gpu_limit (Text) – [optional] Kubernetes resource string for upper-bound of desired GPUs. This amount is not guaranteed! If not specified, it is set equal to gpu_request.

  • memory_limit (Text) – [optional] Kubernetes resource string for upper-bound of physical memory necessary for the task to execute. This amount is not guaranteed! If not specified, it is set equal to memory_request.

  • cache (bool) – [optional] boolean describing if the outputs of this task should be cached and re-usable.

  • timeout (datetime.timedelta) – [optional] the maximum time the task is allowed to run before a retry is triggered (if retries are enabled). By default, tasks are allowed to run indefinitely. If a zero-length timedelta is passed (i.e. timedelta(seconds=0)), the task will not time out.

  • cluster_label – [optional] the label of the Qubole cluster on which the queries are executed.

  • tags (list[Text]) – [optional] user-defined tags for the queries. These tags are passed to Qubole.

  • environment (dict[Text,Text]) – [optional] environment variables to set when executing the query-generating container.

  • cls – This can be used to override the task implementation with a user-defined extension. The class provided should be a subclass of flytekit.common.tasks.sdk_runnable.SdkRunnableTask. Generally, it should be a subclass of flytekit.common.tasks.hive_task.SdkHiveTask. A user can use this to inject bespoke logic into the base Flyte programming model.
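
The resource parameters take Kubernetes quantity strings such as "500m" (half a CPU) or "2Gi" (two gibibytes), and each limit falls back to its request when omitted. The sketch below parses a subset of the Kubernetes quantity syntax (plain numbers, the m suffix, and the k/M/G/T and Ki/Mi/Gi/Ti suffixes) and applies that fallback. parse_quantity and resolve_limit are hypothetical helpers, and the parser is deliberately incomplete (for example, exponent notation such as 1e3 is not handled).

```python
from decimal import Decimal

# Multipliers for a subset of Kubernetes quantity suffixes.
_SUFFIXES = {
    "m": Decimal("0.001"),
    "k": Decimal(10) ** 3, "M": Decimal(10) ** 6, "G": Decimal(10) ** 9, "T": Decimal(10) ** 12,
    "Ki": Decimal(2) ** 10, "Mi": Decimal(2) ** 20, "Gi": Decimal(2) ** 30, "Ti": Decimal(2) ** 40,
}


def parse_quantity(text):
    """Convert a Kubernetes-style quantity string to a Decimal."""
    # Try two-character suffixes first so "Mi" is not read as "M" followed by "i".
    for suffix in sorted(_SUFFIXES, key=len, reverse=True):
        if text.endswith(suffix):
            return Decimal(text[: -len(suffix)]) * _SUFFIXES[suffix]
    return Decimal(text)


def resolve_limit(request, limit):
    """Documented fallback: an unspecified limit is set equal to the request."""
    return request if limit is None else limit
```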

Return type

flytekit.common.tasks.hive_task.SdkHiveTask

flytekit.sdk.tasks.qubole_spark_task(*args, **kwargs)

Return type

flytekit.common.tasks.sdk_runnable.SdkRunnableTask

flytekit.sdk.tasks.sidecar_task(_task_function=None, cache_version='', retries=0, deprecated='', storage_request=None, cpu_request=None, gpu_request=None, memory_request=None, storage_limit=None, cpu_limit=None, gpu_limit=None, memory_limit=None, cache=False, timeout=None, environment=None, pod_spec=None, primary_container_name=None, cls=None)

Decorator to create a Sidecar Task definition. This task executes the primary task alongside the specified Kubernetes PodSpec. Custom attributes for the primary task container can be defined in the PodSpec by including a container whose name matches primary_container_name. These attributes are applied to the container brought up to execute the primary task definition.
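
The matching rule described above (attributes from the pod-spec container whose name equals primary_container_name are applied to the container that runs the task) can be modeled with plain dictionaries. This is an assumption-laden sketch: containers are dicts rather than generated_pb2.Container messages, resolve_pod_containers and the image name are hypothetical, and both the override policy and the behavior when no container matches are guesses, not a description of flytekit's internals.

```python
def resolve_pod_containers(pod_spec, primary_container_name, task_container):
    """Return the pod's container list with the primary container filled in.

    pod_spec: dict with a "containers" list of dicts (stand-in for a PodSpec).
    task_container: dict for the container that runs the task body.
    """
    containers = []
    found = False
    for container in pod_spec.get("containers", []):
        if container.get("name") == primary_container_name:
            # Assumption: attributes set in the pod spec override task defaults.
            merged = dict(task_container)
            merged.update(container)
            containers.append(merged)
            found = True
        else:
            containers.append(container)
    if not found:
        # Assumption: with no matching container, the task container is added as-is.
        containers.append(dict(task_container, name=primary_container_name))
    return containers


pod_spec = {
    "containers": [
        {"name": "primary", "volumeMounts": [{"name": "shared-data", "mountPath": "/data"}]},
        {"name": "secondary", "image": "alpine"},
    ]
}
task_container = {"name": "primary", "image": "my-task-image"}  # hypothetical image name
containers = resolve_pod_containers(pod_spec, "primary", task_container)
```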

import os
import time

from k8s.io.api.core.v1 import generated_pb2

from flytekit.sdk.tasks import sidecar_task

def generate_pod_spec_for_task():
    pod_spec = generated_pb2.PodSpec()

    # Secondary container: writes a message into the shared volume.
    secondary_container = generated_pb2.Container(
        name="secondary",
        image="alpine",
    )
    secondary_container.command.extend(["/bin/sh"])
    secondary_container.args.extend(["-c", "echo hi sidecar world > /data/message.txt"])
    shared_volume_mount = generated_pb2.VolumeMount(
        name="shared-data",
        mountPath="/data",
    )
    secondary_container.volumeMounts.extend([shared_volume_mount])

    # Primary container: its name must match primary_container_name below.
    primary_container = generated_pb2.Container(name="primary")
    primary_container.volumeMounts.extend([shared_volume_mount])

    # In-memory emptyDir volume shared by both containers.
    pod_spec.volumes.extend([generated_pb2.Volume(
        name="shared-data",
        volumeSource=generated_pb2.VolumeSource(
            emptyDir=generated_pb2.EmptyDirVolumeSource(
                medium="Memory",
            )
        )
    )])
    pod_spec.containers.extend([primary_container, secondary_container])
    return pod_spec

@sidecar_task(
    pod_spec=generate_pod_spec_for_task(),
    primary_container_name="primary",
)
def a_sidecar_task(wf_params):
    # Wait until the secondary container has written its message.
    while not os.path.isfile('/data/message.txt'):
        time.sleep(5)

Parameters
  • _task_function – this is the decorated method and shouldn’t be declared explicitly. The function must take a first argument, and then named arguments matching those defined in @inputs and @outputs. No keyword arguments are allowed for wrapped task functions.

  • cache_version (Text) –

    [optional] string representing logical version for discovery. This field should be updated whenever the underlying algorithm changes.

    Note

    This argument is required to be a non-empty string if cache is True.

  • retries (int) –

    [optional] integer determining number of times task can be retried on flytekit.sdk.exceptions.RecoverableException or transient platform failures. Defaults to 0.

    Note

    If retries > 0, the task must be able to recover from any remote state created within the user code. It is strongly recommended that tasks are written to be idempotent.

  • deprecated (Text) – [optional] string that should be provided if this task is deprecated. The string will be logged as a warning so it should contain information regarding how to update to a newer task.

  • storage_request (Text) –

    [optional] Kubernetes resource string for lower-bound of disk storage space for the task to run. Default is set by platform-level configuration.

    Note

    This is currently not supported by the platform.

  • cpu_request (Text) –

    [optional] Kubernetes resource string for lower-bound of cores for the task to execute. This can be set to a fractional portion of a CPU. Default is set by platform-level configuration.

  • gpu_request (Text) –

    [optional] Kubernetes resource string for lower-bound of desired GPUs. Default is set by platform-level configuration.

  • memory_request (Text) –

    [optional] Kubernetes resource string for lower-bound of physical memory necessary for the task to execute. Default is set by platform-level configuration.

  • storage_limit (Text) –

    [optional] Kubernetes resource string for upper-bound of disk storage space for the task to run. This amount is not guaranteed! If not specified, it is set equal to storage_request.

    Note

    This is currently not supported by the platform.

  • cpu_limit (Text) – [optional] Kubernetes resource string for upper-bound of cores for the task to execute. This can be set to a fractional portion of a CPU. This amount is not guaranteed! If not specified, it is set equal to cpu_request.

  • gpu_limit (Text) – [optional] Kubernetes resource string for upper-bound of desired GPUs. This amount is not guaranteed! If not specified, it is set equal to gpu_request.

  • memory_limit (Text) – [optional] Kubernetes resource string for upper-bound of physical memory necessary for the task to execute. This amount is not guaranteed! If not specified, it is set equal to memory_request.

  • cache (bool) – [optional] boolean describing if the outputs of this task should be cached and re-usable.

  • timeout (datetime.timedelta) – [optional] the maximum time the task is allowed to run before a retry is triggered (if retries are enabled). By default, tasks are allowed to run indefinitely. If a zero-length timedelta is passed (i.e. timedelta(seconds=0)), the task will not time out.

  • environment (dict[Text,Text]) – [optional] environment variables to set when executing this task.

  • pod_spec (k8s.io.api.core.v1.generated_pb2.PodSpec) – [optional] PodSpec to bring up alongside task execution.

  • primary_container_name (Text) – primary container to monitor for the duration of the task.

  • cls – This can be used to override the task implementation with a user-defined extension. The class provided must be a subclass of flytekit.common.tasks.sdk_runnable.SdkRunnableTask. A user can use this to inject bespoke logic into the base Flyte programming model.

Return type

flytekit.common.tasks.sdk_runnable.SdkRunnableTask

flytekit.sdk.tasks.spark_task(_task_function=None, cache_version='', retries=0, deprecated='', cache=False, timeout=None, spark_conf=None, hadoop_conf=None, environment=None, cls=None)

Decorator to create a spark task. This task will connect to a Spark cluster, configure the environment, and then execute the code within the _task_function as the Spark driver program.

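from flytekit.sdk.tasks import inputs, spark_task
from flytekit.sdk.types import Types

@inputs(a=Types.Integer)
@spark_task(
    spark_conf={
        'spark.executor.cores': '7',
        'spark.executor.instances': '31',
        'spark.executor.memory': '32G',
    },
)
def sparky(wf_params, spark_context, a):
    # The body runs as the Spark driver program.
    pass
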
Parameters
  • _task_function – this is the decorated method and shouldn’t be declared explicitly. The function must take a first argument, and then named arguments matching those defined in @inputs and @outputs. No keyword arguments are allowed for wrapped task functions.

  • cache_version (Text) –

    [optional] string representing logical version for discovery. This field should be updated whenever the underlying algorithm changes.

    Note

    This argument is required to be a non-empty string if cache is True.

  • retries (int) –

    [optional] integer determining number of times task can be retried on flytekit.sdk.exceptions.RecoverableException or transient platform failures. Defaults to 0.

    Note

    If retries > 0, the task must be able to recover from any remote state created within the user code. It is strongly recommended that tasks are written to be idempotent.

  • deprecated (Text) – [optional] string that should be provided if this task is deprecated. The string will be logged as a warning so it should contain information regarding how to update to a newer task.

  • cache (bool) – [optional] boolean describing if the outputs of this task should be cached and re-usable.

  • timeout (datetime.timedelta) – [optional] the maximum time the task is allowed to run before a retry is triggered (if retries are enabled). By default, tasks are allowed to run indefinitely. If a zero-length timedelta is passed (i.e. timedelta(seconds=0)), the task will not time out.

  • spark_conf (dict[Text,Text]) – [optional] key-value pairs of Spark configuration for the job.

  • hadoop_conf (dict[Text,Text]) – [optional] key-value pairs of Hadoop configuration for the job.

  • environment (dict[Text,Text]) – [optional] environment variables to set when executing this task.

  • cls – This can be used to override the task implementation with a user-defined extension. The class provided must be a subclass of flytekit.common.tasks.sdk_runnable.SdkRunnableTask. Generally, it should be a subclass of flytekit.common.tasks.spark_task.SdkSparkTask. A user can use this parameter to inject bespoke logic into the base Flyte programming model.
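
The spark_conf values in the decorator example request 31 executors with 7 cores and 32G of memory each. A quick calculation of the aggregate request, written as a hypothetical helper over the same dictionary, makes the footprint explicit. It only handles the G memory suffix used in the example and ignores driver resources and Spark's memory overhead.

```python
def executor_totals(spark_conf):
    """Return (total_cores, total_memory_gb) requested for executors."""
    instances = int(spark_conf["spark.executor.instances"])
    cores = int(spark_conf["spark.executor.cores"])
    memory = spark_conf["spark.executor.memory"]
    if not memory.upper().endswith("G"):
        raise ValueError("this sketch only handles memory given in G, e.g. '32G'")
    memory_gb = int(memory[:-1])
    return instances * cores, instances * memory_gb


example_conf = {
    'spark.executor.cores': '7',
    'spark.executor.instances': '31',
    'spark.executor.memory': '32G',
}
total_cores, total_memory_gb = executor_totals(example_conf)
```

For that configuration this gives 217 executor cores and 992G of executor memory in total.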

Return type

flytekit.common.tasks.sdk_runnable.SdkRunnableTask

flytekit.sdk.test_utils module

class flytekit.sdk.test_utils.LocalTestFileSystem

Bases: object

Context manager for creating a temporary local file system for unit testing and for fetching remote objects. This scratch space is cleaned up automatically as long as sys.exit() is not called from within the context. This context need only be used in user scripts and tests; all task executions are guaranteed to have the necessary managed disk context available.

Note

This is especially useful when dealing with remote blob-like objects (Blob, CSV, MultiPartBlob, MultiPartCSV, Schema) as they require backing on disk. Using this context manager creates that disk context to support the downloads.

Note

Blob-like objects can be downloaded to user-specified locations. See documentation for flytekit.sdk.types.Types for more information.

Note

When this context is entered, it overrides any contexts already entered, and all blobs are written to the most recently entered context. Upon exiting the context, all associated data is deleted. It is recommended to use only one LocalTestFileSystem() per test to avoid confusion.

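from flytekit.common.workflow_execution import SdkWorkflowExecution
from flytekit.sdk.test_utils import LocalTestFileSystem

with LocalTestFileSystem():
    # Fetch an existing execution by its (project, domain, name) identifier
    wf_handle = SdkWorkflowExecution.fetch('project', 'domain', 'name')
    # Node outputs are keyed by output name; the blob is backed by the local scratch disk
    with wf_handle.node_executions['my_node'].outputs['blob'] as reader:
        assert reader.read() == "hello!"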
flytekit.sdk.test_utils.flyte_test(wrapper=None, enabled=None, adapter=None)

Decorator for test functions. It prepares the necessary local scratch context before the test runs and cleans it up when the test completes.

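from flytekit.sdk.tasks import inputs, outputs, python_task
from flytekit.sdk.test_utils import flyte_test
from flytekit.sdk.types import Types

@inputs(input_blob=Types.Blob)
@outputs(response_blob=Types.Blob)
@python_task
def test_task(wf_params, input_blob, response_blob):
    # Create a new blob and write a reply chosen from the input text
    response = Types.Blob()
    with response as writer:
        with input_blob as reader:
            txt = reader.read()
            if txt == "Hi":
                writer.write("Hello, world!")
            elif txt == "Goodnight":
                writer.write("Goodnight, moon.")
            else:
                writer.write("Does not compute.")
    response_blob.set(response)

@flyte_test
def some_test():
    # The decorator provides local scratch space for the blobs created here
    blob = Types.Blob()
    with blob as writer:
        writer.write("Hi")

    # unit_test runs the task locally and returns its outputs keyed by name
    result = test_task.unit_test(input_blob=blob)

    with result['response_blob'] as reader:
        assert reader.read() == 'Hello, world!'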

flytekit.sdk.types module

class flytekit.sdk.types.Types[source]

Bases: object

class Blob(value)

Bases: flytekit.common.types.base_sdk_types.FlyteSdkValue

classmethod from_python_std(t_value)
Parameters

t_value (T) – It is up to each individual object as to whether or not this value can be cast.

Return type

FlyteSdkValue

Raises

flytekit.common.exceptions.user.FlyteTypeException

classmethod from_string(string_value)
Parameters

string_value (Text) –

Return type

Blob

classmethod is_castable_from(other)
Parameters

other (flytekit.common.types.base_literal_types.FlyteSdkType) –

Return type

bool

classmethod promote_from_model(literal_model)

Creates an object of this type from the model primitive defining it.

Parameters

literal_model (flytekit.models.literals.Literal) –

Return type

Blob

classmethod short_class_string()
Return type

Text

short_string()
Return type

Text

classmethod to_flyte_literal_type()
Return type

flytekit.models.types.LiteralType

to_python_std()
Return type

flytekit.common.types.impl.blobs.Blob

class Boolean(value)

Bases: flytekit.common.types.base_sdk_types.FlyteSdkValue

classmethod from_python_std(t_value)
Parameters

t_value (T) – It is up to each individual object as to whether or not this value can be cast.

Return type

FlyteSdkValue

Raises

flytekit.common.exceptions.user.FlyteTypeException

classmethod from_string(string_value)
Parameters

string_value (Text) –

Return type

Boolean

classmethod is_castable_from(other)
Parameters

other (flytekit.common.types.base_literal_types.FlyteSdkType) –

Return type

bool

classmethod promote_from_model(literal_model)

Creates an object of this type from the model primitive defining it.

Parameters

literal_model (flytekit.models.literals.Literal) –

Return type

bool

classmethod short_class_string()
Return type

Text

short_string()
Return type

Text

classmethod to_flyte_literal_type()
Return type

flytekit.models.types.LiteralType

to_python_std()
Return type

bool

class CSV(value)

Bases: flytekit.common.types.blobs.Blob

classmethod from_python_std(t_value)
Parameters

t_value (T) – It is up to each individual object as to whether or not this value can be cast.

Return type

FlyteSdkValue

Raises

flytekit.common.exceptions.user.FlyteTypeException

classmethod from_string(string_value)
Parameters

string_value (Text) –

Return type

CSV

classmethod is_castable_from(other)
Parameters

other (flytekit.common.types.base_literal_types.FlyteSdkType) –

Return type

bool

classmethod promote_from_model(literal_model)

Creates an object of this type from the model primitive defining it.

Parameters

literal_model (flytekit.models.literals.Literal) –

Return type

CSV

classmethod short_class_string()
Return type

Text

classmethod to_flyte_literal_type()
Return type

flytekit.models.types.LiteralType

class Datetime(value)

Bases: flytekit.common.types.base_sdk_types.FlyteSdkValue

classmethod from_python_std(t_value)
Parameters

t_value (T) – It is up to each individual object as to whether or not this value can be cast.

Return type

FlyteSdkValue

Raises

flytekit.common.exceptions.user.FlyteTypeException

classmethod from_string(string_value)
Parameters

string_value (Text) –

Return type

Datetime

classmethod is_castable_from(other)
Parameters

other (flytekit.common.types.base_literal_types.FlyteSdkType) –

Return type

bool

classmethod promote_from_model(literal_model)

Creates an object of this type from the model primitive defining it.

Parameters

literal_model (flytekit.models.literals.Literal) –

Return type

Datetime

classmethod short_class_string()
Return type

Text

short_string()
Return type

Text

classmethod to_flyte_literal_type()
Return type

flytekit.models.types.LiteralType

to_python_std()
Return type

datetime.datetime

class Float(value)

Bases: flytekit.common.types.base_sdk_types.FlyteSdkValue

classmethod from_python_std(t_value)
Parameters

t_value (T) – It is up to each individual object as to whether or not this value can be cast.

Return type

FlyteSdkValue

Raises

flytekit.common.exceptions.user.FlyteTypeException

classmethod from_string(string_value)
Parameters

string_value (Text) –

Return type

Float

classmethod is_castable_from(other)
Parameters

other (flytekit.common.types.base_literal_types.FlyteSdkType) –

Return type

bool

classmethod promote_from_model(literal_model)

Creates an object of this type from the model primitive defining it.

Parameters

literal_model (flytekit.models.literals.Literal) –

Return type

Float

classmethod short_class_string()
Return type

Text

short_string()
Return type

Text

classmethod to_flyte_literal_type()
Return type

flytekit.models.types.LiteralType

to_python_std()
Return type

float

class Generic(value)

Bases: flytekit.common.types.base_sdk_types.FlyteSdkValue

classmethod from_python_std(t_value)
Parameters

t_value (T) – It is up to each individual object as to whether or not this value can be cast.

Return type

FlyteSdkValue

Raises

flytekit.common.exceptions.user.FlyteTypeException

classmethod from_string(string_value)
Parameters

string_value (Text) – Should be a JSON formatted string

Return type

Generic

classmethod is_castable_from(other)
Parameters

other (flytekit.common.types.base_literal_types.FlyteSdkType) –

Return type

bool

long_string()
Return type

Text

classmethod promote_from_model(literal_model)

Creates an object of this type from the model primitive defining it.

Parameters

literal_model (flytekit.models.literals.Literal) –

Return type

Generic

classmethod short_class_string()
Return type

Text

short_string()
Return type

Text

classmethod to_flyte_literal_type()
Return type

flytekit.models.types.LiteralType

to_python_std()
Return type

dict[Text, T]
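Because from_string expects JSON and to_python_std returns a dict[Text, T], a command-line value for a Generic input presumably has to be a JSON object rather than a bare list or scalar. The helper below is a hypothetical sketch of that check using only the standard json module; it is not part of flytekit.

```python
import json
from typing import Any, Dict


def parse_generic_arg(raw: str) -> Dict[str, Any]:
    """Parse a JSON string into the dict form a Generic value holds.

    Hypothetical helper: assumes the top-level JSON value must be an object,
    since Generic.to_python_std is documented to return dict[Text, T].
    """
    value = json.loads(raw)  # raises ValueError (JSONDecodeError) on bad JSON
    if not isinstance(value, dict):
        raise ValueError(
            "expected a JSON object, got %s" % type(value).__name__)
    return value
```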

class Integer(value)

Bases: flytekit.common.types.base_sdk_types.FlyteSdkValue

classmethod from_python_std(t_value)
Parameters

t_value (T) – It is up to each individual object as to whether or not this value can be cast.

Return type

FlyteSdkValue

Raises

flytekit.common.exceptions.user.FlyteTypeException

classmethod from_string(string_value)
Parameters

string_value (Text) –

Return type

Integer

classmethod is_castable_from(other)
Parameters

other (flytekit.common.types.base_literal_types.FlyteSdkType) –

Return type

bool

classmethod promote_from_model(literal_model)

Creates an object of this type from the model primitive defining it.

Parameters

literal_model (flytekit.models.literals.Literal) –

Return type

Integer

classmethod short_class_string()
Return type

Text

short_string()
Return type

Text

classmethod to_flyte_literal_type()
Return type

flytekit.models.types.LiteralType

to_python_std()
Return type

int

static List(sdk_type)

Use this to specify a list of any type, including nested lists.

When used with an SDK-decorated method, expect this behavior from the default type engine:

As input:
  1. If set, a Python list populated with values matching the behavior of the list’s sub-type.

  2. If not set, a None value.

As output:
  1. A Python list containing values adhering to the list’s sub-type.

  2. Set None to null the output.

From command-line:

Specify a valid JSON list string. The sub-values will be checked against the sub-type of the list.
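The command-line rule above (a JSON list whose values are checked against the sub-type) can be sketched as a recursive check. This is a hypothetical illustration: parse_list_arg is not a flytekit function, Python's built-in int, float, and str stand in for the Flyte types, and depth plays the role of the nesting level (1 for [Types.Integer], 3 for [[[Types.Integer]]]).

```python
import json


def parse_list_arg(raw, elem_type, depth=1):
    """Parse a JSON list string and check every leaf against `elem_type`.

    Hypothetical helper: `depth` is how many list levels wrap the leaves.
    """
    value = json.loads(raw)
    _check(value, elem_type, depth)
    return value


def _check(value, elem_type, depth):
    if not isinstance(value, list):
        raise TypeError("expected a JSON list, got %r" % (value,))
    for item in value:
        if depth > 1:
            # Every sibling must itself be a list of the same shape.
            _check(item, elem_type, depth - 1)
        elif not isinstance(item, elem_type) or (
                isinstance(item, bool) and elem_type is not bool):
            # JSON true/false decode to bool, which Python treats as an int.
            raise TypeError("%r is not of type %s" % (item, elem_type.__name__))
```

Checking siblings recursively is what enforces consistent typing: "[[1], 2]" fails at depth 2 because 2 is not a list like its sibling.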

Note

Shorthand syntax is supported of the form: [Types.Integer] in addition to longhand syntax like Types.List(Types.Integer). Both forms are equivalent.

Note

Lists can be nested arbitrarily deep; however, the typing must be consistent across all sibling values in a nested list. The syntax [[[Types.Integer]]] creates a 3D list of integers.

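from flytekit.sdk.tasks import inputs, outputs, python_task
from flytekit.sdk.types import Types

@inputs(a=[Types.Integer])
@outputs(b=[Types.Integer])
@python_task
def square_each(wf_params, a, b):
    # a arrives as a plain Python list of ints; b must be set to a list of ints
    b.set(
        [x * x for x in a]
    )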
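Since the shorthand [[[Types.Integer]]] is equivalent to three nested Types.List(...) calls, it can be expanded mechanically by unwrapping one-element lists. A hypothetical sketch (not flytekit code), with strings standing in for Flyte types and ("List", sub_type) tuples standing in for Types.List(sub_type):

```python
def expand_shorthand(spec):
    """Rewrite shorthand list specs into explicit ("List", sub_type) tuples.

    Hypothetical helper: [X] -> ("List", X), [[X]] -> ("List", ("List", X)).
    Anything that is not a Python list is treated as a leaf type.
    """
    if isinstance(spec, list):
        # Assumption: shorthand always wraps exactly one sub-type.
        if len(spec) != 1:
            raise ValueError("shorthand lists take exactly one sub-type")
        return ("List", expand_shorthand(spec[0]))
    return spec
```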
class MultiPartBlob(value)

Bases: flytekit.common.types.base_sdk_types.FlyteSdkValue

classmethod from_python_std(t_value)
Parameters

t_value (T) – It is up to each individual object as to whether or not this value can be cast.

Return type

FlyteSdkValue

Raises

flytekit.common.exceptions.user.FlyteTypeException

classmethod from_string(string_value)
Parameters

string_value (Text) –

Return type

MultiPartBlob

classmethod is_castable_from(other)
Parameters

other (flytekit.common.types.base_literal_types.FlyteSdkType) –

Return type

bool

classmethod promote_from_model(literal_model)

Creates an object of this type from the model primitive defining it.

Parameters

literal_model (flytekit.models.literals.Literal) –

Return type

MultiPartBlob

classmethod short_class_string()
Return type

Text

short_string()
Return type

Text

classmethod to_flyte_literal_type()
Return type

flytekit.models.types.LiteralType

to_python_std()
Return type

flytekit.common.types.impl.blobs.MultiPartBlob

class MultiPartCSV(value)

Bases: flytekit.common.types.blobs.MultiPartBlob

classmethod from_python_std(t_value)
Parameters

t_value (T) – It is up to each individual object as to whether or not this value can be cast.

Return type

FlyteSdkValue

Raises

flytekit.common.exceptions.user.FlyteTypeException

classmethod from_string(string_value)
Parameters

string_value (Text) –

Return type

MultiPartCSV

classmethod is_castable_from(other)
Parameters

other (flytekit.common.types.base_literal_types.FlyteSdkType) –

Return type

bool

classmethod promote_from_model(literal_model)

Creates an object of this type from the model primitive defining it.

Parameters

literal_model (flytekit.models.literals.Literal) –

Return type

MultiPartCSV

classmethod short_class_string()
Return type

Text

classmethod to_flyte_literal_type()
Return type

flytekit.models.types.LiteralType

static Proto(pb_type)

Use this to specify a custom protobuf type.

Note

The protobuf Python library, along with the Python module that defines the message type, should be importable from the PYTHONPATH so the type engine can access the code needed to deserialize the protobuf.

When used with an SDK-decorated method, expect this behavior from the default type engine:

As input:
  1. If set, a Python protobuf object of the type specified in the definition.

  2. If not set, a None value.

As output:
  1. A Python protobuf object matching the type specified by the user.

  2. Set None to null the output.

From command-line:

A base-64 encoded string of the serialized protobuf.

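from flytekit.sdk.tasks import inputs, outputs, python_task
from flytekit.sdk.types import Types
from protos import my_protos_pb2

@inputs(a=Types.Proto(my_protos_pb2.Custom))
@outputs(b=Types.Proto(my_protos_pb2.Custom))
@python_task
def assert_and_create(wf_params, a, b):
    # a is deserialized into a my_protos_pb2.Custom message
    assert a.field1 == 1
    assert a.field2 == 'abc'
    b.set(
        my_protos_pb2.Custom(
            field1=100,
            field2='hello'
        )
    )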
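For the command-line form described above, the serialized message bytes are base-64 encoded. A minimal sketch using only the standard library, assuming the standard (not URL-safe) base-64 alphabet; the bytes would come from the message's SerializeToString() and be restored with ParseFromString(), while encode_proto_arg and decode_proto_arg are hypothetical names.

```python
import base64


def encode_proto_arg(serialized: bytes) -> str:
    """Turn serialized protobuf bytes into a command-line-safe string.

    Assumes the standard base-64 alphabet; `serialized` would normally come
    from message.SerializeToString().
    """
    return base64.b64encode(serialized).decode("ascii")


def decode_proto_arg(arg: str) -> bytes:
    """Recover the serialized bytes; feed them to message.ParseFromString()."""
    return base64.b64decode(arg, validate=True)
```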
static Schema(columns=None)

Use this to specify a Schema blob object, which is a chunked stream of Parquet dataframes.

When used with an SDK-decorated method, expect this behavior from the default type engine:

Cast behavior:
  1. A generic schema (specified as Types.Schema()) can receive input from any schema type regardless of column definitions.

  2. A schema can receive as input any schema object as long as the upstream schema has a superset of the column names defined and the types match for paired columns. Ordering does not matter.
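The two cast rules above can be expressed as a small predicate. This is a hypothetical sketch, not flytekit's type engine: columns are (name, type) pairs with Python types standing in for Types.Integer, Types.Float, and so on, and downstream=None models the generic Types.Schema().

```python
from typing import Optional, Sequence, Tuple

Column = Tuple[str, type]


def schema_castable(upstream: Sequence[Column],
                    downstream: Optional[Sequence[Column]]) -> bool:
    """Return True if a schema with `upstream` columns can feed `downstream`.

    Hypothetical helper following the documented cast rules.
    """
    if downstream is None:
        # A generic schema accepts any schema, whatever its columns.
        return True
    available = dict(upstream)  # column order is irrelevant
    # Every downstream column must exist upstream with the same type.
    return all(name in available and available[name] is col_type
               for name, col_type in downstream)
```

Turning the upstream columns into a dict is what makes ordering irrelevant, and extra upstream columns are simply ignored, matching the superset rule.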

As input:
  1. If set, a flytekit.common.types.impl.schema.Schema object will be received.

  2. If not set, a None value.

As output:
  1. A user may specify a path string pointing to a non-recursive directory of chunked dataframes.

  2. A user may construct a flytekit.common.types.impl.schema.Schema object (with the correct column definitions) and pass it as output.

  3. Output can be nulled with a None value.

From command-line:

Specify a path to the schema object. This path must be accessible from the container at execution time, either as a remote location the container can download from (such as S3) or as a local file.

import pandas

from flytekit.sdk.tasks import inputs, outputs, python_task
from flytekit.sdk.types import Types

@inputs(generic=Types.Schema(), typed=Types.Schema([('a', Types.Integer), ('b', Types.Float)]))
@outputs(b=Types.Schema([('a', Types.Integer), ('b', Types.Float)]))
@python_task
def concat_then_split(wf_params, generic, typed, b):
    with typed as reader:
        # Each chunk is loaded as a pandas.DataFrame object
        for df in reader.iter_chunks():
            # Operate on the dataframe here
            pass

    # Create at a random location specified in flytekit configuration
    out = Types.Schema([('a', Types.Integer), ('b', Types.Float)])()
    with out as writer:
        writer.write(
            pandas.DataFrame.from_dict(
                {
                    'a': [1, 2, 3],
                    'b': [5.0, 6.0, 7.0]
                }
            )
        )
    b.set(out)
class String(value)

Bases: flytekit.common.types.base_sdk_types.FlyteSdkValue

classmethod from_python_std(t_value)
Parameters

t_value (T) – It is up to each individual object as to whether or not this value can be cast.

Return type

FlyteSdkValue

Raises

flytekit.common.exceptions.user.FlyteTypeException

classmethod from_string(string_value)
Parameters

string_value (Text) –

Return type

String

classmethod is_castable_from(other)
Parameters

other (flytekit.common.types.base_literal_types.FlyteSdkType) –

Return type

bool

classmethod promote_from_model(literal_model)

Creates an object of this type from the model primitive defining it.

Parameters

literal_model (flytekit.models.literals.Literal) –

Return type

String

classmethod short_class_string()
Return type

Text

short_string()
Return type

Text

classmethod to_flyte_literal_type()
Return type

flytekit.models.types.LiteralType

to_python_std()
Return type

Text

verbose_string()
Return type

Text

class Timedelta(value)

Bases: flytekit.common.types.base_sdk_types.FlyteSdkValue

classmethod from_python_std(t_value)
Parameters

t_value (T) – It is up to each individual object as to whether or not this value can be cast.

Return type

FlyteSdkValue

Raises

flytekit.common.exceptions.user.FlyteTypeException

classmethod from_string(string_value)
Parameters

string_value (Text) – Uses https://github.com/wroberts/pytimeparse for parsing

Return type

Timedelta

classmethod is_castable_from(other)
Parameters

other (flytekit.common.types.base_literal_types.FlyteSdkType) –

Return type

bool

classmethod promote_from_model(literal_model)

Creates an object of this type from the model primitive defining it.

Parameters

literal_model (flytekit.models.literals.Literal) –

Return type

Timedelta

classmethod short_class_string()
Return type

Text

short_string()
Return type

Text

classmethod to_flyte_literal_type()
Return type

flytekit.models.types.LiteralType

to_python_std()
Return type

datetime.timedelta
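Timedelta.from_string delegates parsing to pytimeparse, which accepts many formats. To show the idea without that dependency, here is a deliberately narrow stdlib-only substitute that handles only compact forms such as 1h30m or 45s (units w, d, h, m, s). It is a hypothetical helper, not what flytekit calls, and it rejects many strings pytimeparse would accept.

```python
import re
from datetime import timedelta

# Seconds per unit for the compact suffixes this sketch understands.
_UNIT_SECONDS = {"w": 604800, "d": 86400, "h": 3600, "m": 60, "s": 1}
_TOKEN = re.compile(r"(\d+(?:\.\d+)?)([wdhms])")


def parse_compact_duration(text: str) -> timedelta:
    """Parse strings like '1h30m' or '45s' into a timedelta (hypothetical)."""
    text = text.replace(" ", "")
    # Anything left after removing valid number+unit tokens is unsupported.
    if not text or _TOKEN.sub("", text):
        raise ValueError("unsupported duration: %r" % text)
    seconds = sum(float(n) * _UNIT_SECONDS[u] for n, u in _TOKEN.findall(text))
    return timedelta(seconds=seconds)
```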

flytekit.sdk.workflow module

class flytekit.sdk.workflow.Input(sdk_type, help=None, **kwargs)

Bases: flytekit.common.promise.Input

This object should be used to specify inputs. It can be used in conjunction with flytekit.common.workflow.workflow() and flytekit.common.workflow.workflow_class()

class flytekit.sdk.workflow.Output(value, sdk_type=None, help=None)

Bases: flytekit.common.workflow.Output

This object should be used to specify outputs. It can be used in conjunction with flytekit.common.workflow.workflow() and flytekit.common.workflow.workflow_class()

flytekit.sdk.workflow.workflow(nodes, inputs=None, outputs=None, cls=None)

This function provides a user-friendly interface for authoring workflows.

from flytekit.sdk.types import Types
from flytekit.sdk.workflow import Input, Output, workflow

# my_task and my_other_task are placeholders for tasks defined elsewhere
input_a = Input(Types.Integer, default=100, help="Tell me something")
input_b = Input(Types.Float, required=True)

node1 = my_task(a=input_a)
node2 = my_other_task(b=input_b, c=node1.outputs.c)

MyWorkflow = workflow(
    inputs={
        'a': input_a,
        'b': input_b
    },
    outputs={
        'd': Output(node2.outputs.d, sdk_type=Types.Integer, help='This is an integer output')
    },
    nodes=[
        node1,
        node2
    ]
)
Parameters
  • nodes (list[flytekit.common.nodes.SdkNode]) – A list of nodes to put inside the workflow.

  • inputs (dict[Text,Input]) – [Optional] A dictionary of input descriptors for the workflow.

  • outputs (dict[Text,Output]) – [Optional] A dictionary of output descriptors for a workflow.

  • cls (T) – This is the class that will be instantiated from the inputs, outputs, and nodes. This will be used by users extending the base Flyte programming model. If set, it must be a subclass of flytekit.common.workflow.SdkWorkflow.

Return type

flytekit.common.workflow.SdkWorkflow

flytekit.sdk.workflow.workflow_class(_workflow_metaclass=None, cls=None)

This is a decorator for wrapping class definitions into workflows.

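from flytekit.sdk.types import Types
from flytekit.sdk.workflow import Input, Output, workflow_class

# my_task and my_other_task are placeholders for tasks defined elsewhere
@workflow_class
class MyWorkflow(object):
    a = Input(Types.Integer, default=100, help="Tell me something")
    b = Input(Types.Float, required=True)
    first_task = my_task(a=a)
    second_task = my_other_task(b=b, c=first_task.outputs.c)
    d = Output(second_task.outputs.d)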
Parameters
  • _workflow_metaclass (T) – Do NOT specify this parameter directly. This is the class that is being wrapped by this decorator.

  • cls – This is the class that will be instantiated from the inputs, outputs, and nodes. This will be used by users extending the base Flyte programming model. If set, it must be a subclass of flytekit.common.workflow.SdkWorkflow.

Return type

flytekit.common.workflow.SdkWorkflow

Module contents