Presto Tasks

Presto

Presto is a query engine, similar to Hive, that is compatible with a lot of different data stores using a pluggable connector model. At a high level, Presto tends to run a lot faster and more efficiently than Hive. To read more about Presto, refer to Presto official documentation

Flyte Presto Task

The Presto task in Flyte allows users to query Presto. Similar to the Hive task, the Presto task does some work behind the scenes in order to execute a user’s query. To run a single Presto task from a user’s point of view, we actually need to send 5 different requests to Presto. Together these requests (i.e. queries) take care of retrieving the data, saving it to an external table, and performing cleanup.

If a user wanted to run a Presto query like:

SELECT *
FROM foo
WHERE bar = 123

Then Flyte actually runs 5 requests that get executed which look something like:

CREATE TABLE hive.flyte_temporary_tables."nwdwxc7fjnksj9rtzdvbm894pjlvdgrm_temp" AS
SELECT *
FROM foo
WHERE bar = 123
CREATE TABLE hive.flyte_temporary_tables."nwdwxc7fjnksj9rtzdvbm894pjlvdgrm_external" (LIKE hive.flyte_temporary_tables."nwdwxc7fjnksj9rtzdvbm894pjlvdgrm_temp")
WITH (format = 'PARQUET', external_location = 's3://my-s3-bucket/ef/iktp762nhe-p-task-0/')
INSERT INTO hive.flyte_temporary_tables."nwdwxc7fjnksj9rtzdvbm894pjlvdgrm_external"
SELECT *
FROM hive.flyte_temporary_tables."nwdwxc7fjnksj9rtzdvbm894pjlvdgrm_temp"
DROP TABLE hive.flyte_temporary_tables."nwdwxc7fjnksj9rtzdvbm894pjlvdgrm_temp"
DROP TABLE hive.flyte_temporary_tables."nwdwxc7fjnksj9rtzdvbm894pjlvdgrm_external"

The reason why 5 separate queries are needed is that Presto does not support running grouped statements like Hive does. And so, we need to separate them and execute them one by one. Because of this, users don’t need to explicitly save their query results into a separate table as this is handled automatically by Flyte.

Presto task parameters

The following are various configurations that can be set for a Presto task

  • dict[Text,flytekit.common.types.base_sdk_types.FlyteSdkType] task_inputs: Optional inputs to the Presto task

  • Text statement: The templated Presto statement to execute.

  • flytekit.common.types.schema.Schema output_schema: Schema representation of the resulting data that was queried from Presto

  • Text routing_group: The routing group that a Presto query should be sent to for the given environment

  • Text catalog: The Presto catalog for the given query

  • Text schema: The Presto schema for the given query

Usage

The following is a simple example of a Presto Flyte task. Note that unlike other tasks, there is no annotation that denotes this as a Presto task. Instead, you use the SdkPrestoTask class directly.

Simple Presto task example
from __future__ import absolute_import

from flytekit.sdk.tasks import inputs
from flytekit.sdk.types import Types
from flytekit.sdk.workflow import workflow_class, Input, Output
from flytekit.common.tasks.presto_task import SdkPrestoTask

schema = Types.Schema([("a", Types.String), ("b", Types.Integer)])

presto_task = SdkPrestoTask(
    task_inputs=inputs(ds=Types.String, rg=Types.String),
    statement="SELECT * FROM hive.foo.bar WHERE ds = '{{ .Inputs.ds}}' LIMIT 10",
    output_schema=schema,
    routing_group="{{ .Inputs.rg }}",
    # catalog="hive",
    # schema="foo",
)


@workflow_class()
class PrestoWorkflow(object):
    ds = Input(Types.String, required=True, help="Test string with no default")

    p_task = presto_task(ds=ds, rg='etl')

    output_a = Output(p_task.outputs.results, sdk_type=schema)

This is another example usage of the Presto task, where each task is generated dynamically:

Dynamic Presto task example
from __future__ import absolute_import

from flytekit.sdk.tasks import inputs, outputs, dynamic_task
from flytekit.sdk.types import Types
from flytekit.sdk.workflow import workflow_class, Input, Output
from flytekit.common.tasks.presto_task import SdkPrestoTask

schema = Types.Schema([("session_id", Types.String), ("num_rides_completed", Types.Integer)])

statement_template = """
    SELECT
      session_id, num_rides_completed
    FROM
      hive.city.fact_airport_sessions
    WHERE
      ds = '{{ .Inputs.ds}}'
    LIMIT 10
"""

presto_task = SdkPrestoTask(
    task_inputs=inputs(ds=Types.String, rg=Types.String),
    statement=statement_template,
    output_schema=schema,
    routing_group="{{ .Inputs.rg }}",
    # catalog="hive",
    # schema="city",
)


@outputs(presto_results=[schema])
@dynamic_task
def multiple_presto_queries(wf_params, presto_results):
    temp = []
    for ds in ('2020-02-20', '2020-02-21', '2020-02-22'):
        x = presto_task(ds=ds, rg='etl')
        temp.append(x.outputs.results)

    presto_results.set(temp)


@workflow_class()
class PrestoWorkflow(object):
    ds = Input(Types.String, required=True, help="Test string with no default")

    p_task = presto_task(ds=ds, rg='etl')
    presto_dynamic = multiple_presto_queries()

    output_a = Output(p_task.outputs.results, sdk_type=schema)
    output_m = Output(presto_dynamic.outputs.presto_results, sdk_type=[schema])