flytekit.common package

Subpackages

Submodules

flytekit.common.component_nodes module

class flytekit.common.component_nodes.SdkTaskNode(sdk_task)[source]

Bases: flytekit.models.core.workflow.TaskNode

classmethod promote_from_model(base_model, tasks)[source]

Takes the idl wrapper for a TaskNode and returns the hydrated Flytekit object for it by fetching it from the engine.

Parameters
Return type

SdkTaskNode

property reference_id

A globally unique identifier for the task.

Return type

flytekit.models.core.identifier.Identifier

property sdk_task
Return type

flytekit.common.tasks.task.SdkTask

class flytekit.common.component_nodes.SdkWorkflowNode(sdk_workflow=None, sdk_launch_plan=None)[source]

Bases: flytekit.models.core.workflow.WorkflowNode

property launchplan_ref

[Optional] A globally unique identifier for the launch plan. Should map to Admin.

Return type

flytekit.models.core.identifier.Identifier

classmethod promote_from_model(base_model, sub_workflows, tasks)[source]
Parameters
  • base_model (flytekit.models.core.workflow.WorkflowNode) –

  • sub_workflows (dict[flytekit.models.core.identifier.Identifier, flytekit.models.core.workflow.WorkflowTemplate]) –

  • tasks (dict[flytekit.models.core.identifier.Identifier, flytekit.models.task.TaskTemplate]) –

Return type

SdkWorkflowNode

property sdk_launch_plan
Return type

flytekit.common.launch_plan.SdkLaunchPlan

property sdk_workflow
Return type

flytekit.common.workflow.SdkWorkflow

property sub_workflow_ref

[Optional] Reference to a subworkflow, which should be defined within the compiler context.

Return type

flytekit.models.core.identifier.Identifier

flytekit.common.constants module

class flytekit.common.constants.CloudProvider[source]

Bases: object

AWS = 'aws'
GCP = 'gcp'
class flytekit.common.constants.SdkTaskType[source]

Bases: object

BATCH_HIVE_TASK = 'batch_hive'
CONTAINER_ARRAY_TASK = 'container_array'
DYNAMIC_TASK = 'dynamic-task'
HIVE_JOB = 'hive'
PRESTO_TASK = 'presto'
PYTHON_TASK = 'python-task'
PYTORCH_TASK = 'pytorch'
RAW_CONTAINER_TASK = 'raw-container'
SAGEMAKER_CUSTOM_TRAINING_JOB_TASK = 'sagemaker_custom_training_job_task'
SAGEMAKER_HYPERPARAMETER_TUNING_JOB_TASK = 'sagemaker_hyperparameter_tuning_job_task'
SAGEMAKER_TRAINING_JOB_TASK = 'sagemaker_training_job_task'
SENSOR_TASK = 'sensor-task'
SIDECAR_TASK = 'sidecar'
SPARK_TASK = 'spark'

flytekit.common.interface module

class flytekit.common.interface.BindingData(scalar=None, collection=None, promise=None, map=None)[source]

Bases: flytekit.models.literals.BindingData

classmethod from_python_std(literal_type, t_value, upstream_nodes=None)[source]
Parameters
Return type

BindingData

classmethod promote_from_model(model)[source]
Parameters

model (flytekit.models.literals.BindingData) –

Return type

BindingData

class flytekit.common.interface.TypedInterface(inputs, outputs)[source]

Bases: flytekit.models.interface.TypedInterface

create_bindings_for_inputs(map_of_bindings)[source]
Parameters

map_of_bindings (dict[Text, T]) – Values can be scalar primitives, node output references, lists, etc.

Return type

(list[flytekit.models.literals.Binding], list[flytekit.common.nodes.SdkNode])

Raises

flytekit.common.exceptions.user.FlyteAssertion

classmethod promote_from_model(model)[source]
Parameters

model (flytekit.models.interface.TypedInterface) –

Return type

TypedInterface

flytekit.common.launch_plan module

class flytekit.common.launch_plan.SdkLaunchPlan(*args, **kwargs)[source]

Bases: flytekit.models.launch_plan.LaunchPlanSpec, flytekit.common.mixins.launchable.LaunchableEntity

property auth_role
Return type

flytekit.models.common.AuthRole

property entity_type_text
Return type

Text

execute_with_literals(project, domain, literal_inputs, name=None, notification_overrides=None, label_overrides=None, annotation_overrides=None)[source]

Deprecated.

classmethod fetch(project, domain, name, version=None)[source]

This function uses the engine loader to create a hydrated launch plan from Admin.

Parameters
  • project (Text) –

  • domain (Text) –

  • name (Text) –

  • version (Text) – [Optional] If not set, the SDK will fetch the active launch plan for the given project, domain, and name.

Return type

SdkLaunchPlan

property id
Return type

flytekit.common.core.identifier.Identifier

property interface
Return type

flytekit.common.interface.TypedInterface

property is_scheduled
Return type

bool

launch_with_literals(project, domain, literal_inputs, name=None, notification_overrides=None, label_overrides=None, annotation_overrides=None)[source]

Executes the launch plan and returns the execution identifier. This version of execution is meant for when you already have a LiteralMap of inputs.

Parameters
Return type

flytekit.common.workflow_execution.SdkWorkflowExecution

classmethod promote_from_model(model)[source]
Parameters

model (flytekit.models.launch_plan.LaunchPlanSpec) –

Return type

SdkLaunchPlan

property raw_output_data_config
Return type

flytekit.models.common.RawOutputDataConfig

property resource_type

Integer from the _identifier.ResourceType enum.

Return type

int

update(state)[source]
Parameters

state (int) – Enum value from flytekit.models.launch_plan.LaunchPlanState

validate()[source]
class flytekit.common.launch_plan.SdkRunnableLaunchPlan(sdk_workflow, default_inputs=None, fixed_inputs=None, role=None, schedule=None, notifications=None, labels=None, annotations=None, auth_role=None, raw_output_data_config=None)[source]

Bases: flytekit.common.mixins.hash.HashOnReferenceMixin, flytekit.common.launch_plan.SdkLaunchPlan, flytekit.common.mixins.registerable.RegisterableEntity

classmethod fetch(project, domain, name, version=None)[source]

This function uses the engine loader to create a hydrated launch plan from Admin.

Parameters
  • project (Text) –

  • domain (Text) –

  • name (Text) –

  • version (Text) –

Return type

SdkRunnableLaunchPlan

classmethod from_flyte_idl(_)[source]
Parameters

pb2 (flyteidl.admin.launch_plan_pb2.LaunchPlanSpec) –

Return type

LaunchPlanSpec

property interface
Return type

flytekit.common.interface.TypedInterface

classmethod promote_from_model(model)[source]
Parameters

model (flytekit.models.launch_plan.LaunchPlanSpec) –

Return type

SdkRunnableLaunchPlan

register(project, domain, name, version)[source]
Parameters
  • project (Text) –

  • domain (Text) –

  • name (Text) –

  • version (Text) –

serialize()[source]

Unlike the SdkWorkflow serialize call, nothing special needs to be done here.

Return type

flyteidl.admin.launch_plan_pb2.LaunchPlanSpec

property upstream_entities

Tasks, workflows, and launch plans that need to be registered in advance of this workflow.

Return type

set[_registerable.RegisterableEntity]

property workflow_id
Return type

flytekit.common.core.identifier.Identifier

flytekit.common.nodes module

class flytekit.common.nodes.OutputParameterMapper(type_map, node)[source]

Bases: flytekit.common.nodes.ParameterMapper

This subclass of ParameterMapper is used to represent outputs for a given node.

class flytekit.common.nodes.ParameterMapper(type_map, node)[source]

Bases: sortedcontainers.sorteddict.SortedDict

This abstract class provides functionality to reference specific inputs and outputs for a task instance. This allows for syntax such as:

my_task_instance.inputs.my_input

And is especially useful for linking tasks together via outputs -> inputs in workflow definitions:

my_second_task_instance(input=my_task_instance.outputs.my_output)

Attributes:

Dynamically discovered. Only the keys for inputs/outputs can be referenced.

Example:

@inputs(a=Types.Integer)
@outputs(b=Types.String)
@python_task(version='1')
def my_task(wf_params, a, b):
    pass

input_link = my_task.inputs.a # Success!
output_link = my_task.outputs.b # Success!

input_link = my_task.inputs.c # Attribute not found exception!
output_link = my_task.outputs.d # Attribute not found exception!
class flytekit.common.nodes.SdkNode(id, upstream_nodes, bindings, metadata, sdk_task=None, sdk_workflow=None, sdk_launch_plan=None, sdk_branch=None)[source]

Bases: flytekit.common.mixins.hash.HashOnReferenceMixin, flytekit.models.core.workflow.Node

assign_id_and_return(id)[source]
Parameters

id (Text) –

Return type

None

property executable_sdk_object
property outputs
Return type

dict[Text, flytekit.common.promise.NodeOutput]

classmethod promote_from_model(model, sub_workflows, tasks)[source]
Parameters
  • model (flytekit.models.core.workflow.Node) –

  • sub_workflows (dict[flytekit.models.core.identifier.Identifier, flytekit.models.core.workflow.WorkflowTemplate]) –

  • tasks (dict[flytekit.models.core.identifier.Identifier, flytekit.models.task.TaskTemplate]) – If specified, these task templates will be passed to the SdkTaskNode promote_from_model call, and used instead of fetching from Admin.

Return type

SdkNode

property upstream_node_ids
Return type

list[Text]

property upstream_nodes
Return type

list[SdkNode]

with_overrides(*args, **kwargs)[source]
class flytekit.common.nodes.SdkNodeExecution(*args, **kwargs)[source]

Bases: flytekit.models.node_execution.NodeExecution, flytekit.common.mixins.artifact.ExecutionArtifact

property error

If execution is in progress, raise an exception. Otherwise, return None if no error was present upon reaching completion.

Return type

flytekit.models.core.execution.ExecutionError or None

property executions

Returns a list of generic execution artifacts.

Return type

list[flytekit.common.mixins.artifact.ExecutionArtifact]

property inputs

Returns the inputs to the execution in the standard Python format as dictated by the type engine.

Return type

dict[Text, T]

property is_complete

Indicates whether or not the execution is complete.

Return type

bool

property outputs

Returns the outputs to the execution in the standard Python format as dictated by the type engine. If the execution ended in error or the execution is in progress, an exception will be raised.

Return type

dict[Text, T]

classmethod promote_from_model(base_model)[source]
Parameters

base_model (_node_execution_models.NodeExecution) –

Return type

SdkNodeExecution

sync()[source]

Syncs the state of this object with that held by the platform.

Return type

None

property task_executions

Returns the underlying task executions in order of try attempt.

Return type

list[flytekit.common.tasks.executions.SdkTaskExecution]

property workflow_executions

Returns the underlying workflow executions in order of try attempt.

Return type

list[flytekit.common.workflow_execution.SdkWorkflowExecution]

flytekit.common.notifications module

class flytekit.common.notifications.Email(phases, recipients_email)[source]

Bases: flytekit.common.notifications.Notification

classmethod promote_from_model(base_model)[source]
Parameters

base_model (flytekit.models.common.Notification) –

Return type

Notification

class flytekit.common.notifications.Notification(phases, email=None, pager_duty=None, slack=None)[source]

Bases: flytekit.models.common.Notification

VALID_PHASES = {4, 6, 7, 8}
classmethod from_flyte_idl(p)[source]
Parameters

p (flyteidl.admin.common_pb2.Notification) –

Return type

Notification

class flytekit.common.notifications.PagerDuty(phases, recipients_email)[source]

Bases: flytekit.common.notifications.Notification

classmethod promote_from_model(base_model)[source]
Parameters

base_model (flytekit.models.common.Notification) –

Return type

Notification

class flytekit.common.notifications.Slack(phases, recipients_email)[source]

Bases: flytekit.common.notifications.Notification

classmethod promote_from_model(base_model)[source]
Parameters

base_model (flytekit.models.common.Notification) –

Return type

Notification

flytekit.common.promise module

class flytekit.common.promise.Input(name, sdk_type, help=None, **kwargs)[source]

Bases: flytekit.models.interface.Parameter

property help
Return type

Text

property name
Return type

Text

property promise
Return type

flytekit.models.types.OutputReference

classmethod promote_from_model(model)[source]
Parameters

model (flytekit.models.interface.Parameter) –

Return type

Parameter

rename_and_return_reference(new_name)[source]
property sdk_default
Return type

T

property sdk_required
Return type

bool

property sdk_type
Return type

flytekit.common.types.base_sdk_types.FlyteSdkType

class flytekit.common.promise.NodeOutput(sdk_node, sdk_type, var)[source]

Bases: flytekit.models.types.OutputReference

property node_id

Override the underlying node_id property to refer to SdkNode.

Return type

Text

classmethod promote_from_model(model)[source]
Parameters

model (flytekit.models.types.OutputReference) –

Return type

NodeOutput

property sdk_node
Return type

flytekit.common.nodes.SdkNode

property sdk_type
Return type

flytekit.common.types.base_sdk_types.FlyteSdkType

flytekit.common.schedules module

class flytekit.common.schedules.CronSchedule(cron_expression, kickoff_time_input_arg=None)[source]

Bases: flytekit.common.schedules._ExtendedSchedule

classmethod promote_from_model(base_model)[source]
Parameters

base_model (flytekit.models.schedule.Schedule) –

Return type

CronSchedule

class flytekit.common.schedules.FixedRate(duration, kickoff_time_input_arg=None)[source]

Bases: flytekit.common.schedules._ExtendedSchedule

classmethod promote_from_model(base_model)[source]
Parameters

base_model (flytekit.models.schedule.Schedule) –

Return type

FixedRate

flytekit.common.sdk_bases module

class flytekit.common.sdk_bases.ExtendedSdkType[source]

Bases: flytekit.models.common.FlyteType

Abstract class that all SDK objects must inherit from. This provides the ability to promote a data model object into an actionable object.

from_flyte_idl(pb2_object)[source]
abstract promote_from_model(base_model)[source]
Parameters

base_model (flytekit.models.common.FlyteIdlEntity) –

Return type

ExtendedSdkType

flytekit.common.utils module

class flytekit.common.utils.AutoDeletingTempDir(working_dir_prefix=None, tmp_dir=None, cleanup=True)[source]

Bases: flytekit.common.utils.Directory

Creates a posix safe tempdir which is auto deleted once out of scope

force_cleanup()[source]
get_named_tempfile(name)[source]
class flytekit.common.utils.Directory(path)[source]

Bases: object

list_dir()[source]

The list of absolute filepaths for all immediate sub-paths.

Return type

list[Text]

property name
Return type

Text

class flytekit.common.utils.ExitStack(entered_stack=None)[source]

Bases: object

enter_context(context)[source]
pop_all()[source]
class flytekit.common.utils.PerformanceTimer(context_statement)[source]

Bases: object

flytekit.common.utils.fqdn(module, name, entity_type=None)[source]
Parameters
  • module (Text) –

  • name (Text) –

  • entity_type (int) – _identifier.ResourceType enum

Return type

Text

flytekit.common.utils.fqdn_safe(module, key, entity_type=None)[source]
Parameters
  • module (Text) –

  • key (Text) –

  • entity_type (int) – _identifier.ResourceType enum

Return type

Text

flytekit.common.utils.get_version_message()[source]
flytekit.common.utils.load_proto_from_file(pb2_type, path)[source]
flytekit.common.utils.write_proto_to_file(proto, path)[source]

flytekit.common.workflow module

class flytekit.common.workflow.Output(name, value, sdk_type=None, help=None)[source]

Bases: object

property binding_data
Return type

flytekit.models.literals.BindingData

property name
Return type

Text

rename_and_return_reference(new_name)[source]
property var
Return type

flytekit.models.interface.Variable

class flytekit.common.workflow.SdkWorkflow(inputs, outputs, nodes, id=None, metadata=None, metadata_defaults=None, interface=None, output_bindings=None, disable_default_launch_plan=False)[source]

Bases: flytekit.common.mixins.hash.HashOnReferenceMixin, flytekit.models.core.workflow.WorkflowTemplate, flytekit.common.mixins.registerable.RegisterableEntity

create_launch_plan(default_inputs=None, fixed_inputs=None, schedule=None, role=None, notifications=None, labels=None, annotations=None, assumable_iam_role=None, kubernetes_service_account=None, raw_output_data_prefix=None, cls=None)[source]

This method will create a launch plan object that can execute this workflow.

Parameters
  • default_inputs (dict[Text, flytekit.common.promise.Input]) –

  • fixed_inputs (dict[Text, T]) –

  • schedule (flytekit.models.schedule.Schedule) – A schedule on which to execute this launch plan.

  • role (Text) – Deprecated. Use assumable_iam_role instead.

  • notifications (list[flytekit.models.common.Notification]) – A list of notifications to enact by default for this launch plan.

  • labels (flytekit.models.common.Labels) –

  • annotations (flytekit.models.common.Annotations) –

  • cls – This parameter can be used by users to define an extension of a launch plan to instantiate. The class provided should be a subclass of flytekit.common.launch_plan.SdkLaunchPlan.

  • assumable_iam_role (Text) – The IAM role to execute the workflow with.

  • kubernetes_service_account (Text) – The kubernetes service account to execute the workflow with.

  • raw_output_data_prefix (Text) – Bucket for offloaded data.

Return type

flytekit.common.launch_plan.SdkRunnableLaunchPlan

property entity_type_text
Return type

Text

classmethod fetch(project, domain, name, version=None)[source]

This function uses the engine loader to create a hydrated workflow from Admin.

Parameters
  • project (Text) –

  • domain (Text) –

  • name (Text) –

  • version (Text) –

Return type

SdkWorkflow

classmethod get_non_system_nodes(nodes)[source]
Parameters

nodes (list[flytekit.models.core.workflow.Node]) –

Return type

list[flytekit.models.core.workflow.Node]

get_sub_workflows()[source]

Recursive call that returns all subworkflows in the current workflow

Return type

list[SdkWorkflow]

property interface
Return type

flytekit.common.interface.TypedInterface

classmethod promote_from_model(base_model, sub_workflows=None, tasks=None)[source]
Parameters
  • base_model (flytekit.models.core.workflow.WorkflowTemplate) –

  • sub_workflows (dict[flytekit.models.core.identifier.Identifier, flytekit.models.core.workflow.WorkflowTemplate]) – Provide a list of WorkflowTemplate models (should be returned from Admin as part of the admin CompiledWorkflowClosure). Relevant sub-workflows should always be provided.

  • tasks (dict[flytekit.models.core.identifier.Identifier, flytekit.models.task.TaskTemplate]) – Same as above but for tasks. If tasks are not provided, relevant TaskTemplates will be fetched from Admin.

Return type

SdkWorkflow

register(project, domain, name, version)[source]
Parameters
  • project (Text) –

  • domain (Text) –

  • name (Text) –

  • version (Text) –

property resource_type

Integer from the _identifier.ResourceType enum.

Return type

int

serialize()[source]

Serializing a workflow should produce an object similar to what the registration step produces, in preparation for actual registration to Admin.

Return type

flyteidl.admin.workflow_pb2.WorkflowSpec

property should_create_default_launch_plan

Determines whether the registration flow should create a default launch plan for this workflow or not.

Return type

bool

property upstream_entities

Tasks, workflows, and launch plans that need to be registered in advance of this workflow.

Return type

set[T]

property user_inputs
Return type

list[flytekit.common.promise.Input]

validate()[source]
flytekit.common.workflow.build_sdk_workflow_from_metaclass(metaclass, on_failure=None, disable_default_launch_plan=False, cls=None)[source]
Parameters
  • metaclass (T) –

  • cls – This is the class that will be instantiated from the inputs, outputs, and nodes. This will be used by users extending the base Flyte programming model. If set, it must be a subclass of SdkWorkflow.

  • on_failure (flytekit.models.core.workflow.WorkflowMetadata.OnFailurePolicy) – [Optional] The execution policy when the workflow detects a failure.

  • disable_default_launch_plan (bool) – Determines whether to create a default launch plan for the workflow or not.

Return type

SdkWorkflow

flytekit.common.workflow_execution module

class flytekit.common.workflow_execution.SdkWorkflowExecution(*args, **kwargs)[source]

Bases: flytekit.models.execution.Execution, flytekit.common.mixins.artifact.ExecutionArtifact

property error

If execution is in progress, raise an exception. Otherwise, return None if no error was present upon reaching completion.

Return type

flytekit.models.core.execution.ExecutionError or None

classmethod fetch(project, domain, name)[source]
Parameters
  • project (Text) –

  • domain (Text) –

  • name (Text) –

Return type

SdkWorkflowExecution

get_node_executions(filters=None)[source]
Parameters

filters (list[flytekit.models.filters.Filter]) –

Return type

dict[Text, flytekit.common.nodes.SdkNodeExecution]

property inputs

Returns the inputs to the execution in the standard Python format as dictated by the type engine.

Return type

dict[Text, T]

property is_complete

Indicates whether or not the execution is complete.

Return type

bool

property node_executions
Return type

dict[Text, flytekit.common.nodes.SdkNodeExecution]

property outputs

Returns the outputs to the execution in the standard Python format as dictated by the type engine. If the execution ended in error or the execution is in progress, an exception will be raised.

Return type

dict[Text, T] or None

classmethod promote_from_model(base_model)[source]
Parameters

base_model (_execution_models.Execution) –

Return type

SdkWorkflowExecution

sync()[source]

Syncs the state of the underlying execution artifact with the state observed by the platform.

Return type

None

terminate(cause)[source]
Parameters

cause (Text) –

Module contents