> ## Documentation Index
> Fetch the complete documentation index at: https://cubed3-feat-druid-driver-streaming.mintlify.site/llms.txt
> Use this file to discover all available pages before exploring further.

# Integration with Apache Airflow

> Pair Airflow (including Astro) with Cube using `airflow-provider-cube` operators for loads, pre-aggregations, and orchestration triggers.

[Apache Airflow][airflow] is a popular open-source workflow scheduler commonly
used for data orchestration. [Astro][astro] is a fully managed service for
Airflow by [Astronomer][astro].

This guide demonstrates how to setup Cube and Airflow to work together so that
Airflow can push changes from upstream data sources to Cube via the
[Orchestration API][ref-orchestration-api].

## Tasks

In Airflow, pipelines are represented by directed acyclic graphs (DAGs), Python
function decorated with a `@dag` decorator. DAGs include calls to tasks,
implemented as instances of the `Operator` class. Operators can perform various
tasks: poll for some precondition, perform extract-load-transform (ETL), or
trigger external systems like Cube.

Integration between Cube and Airflow is enabled by the
[`airflow-provider-cube`][github-airflow-provider-cube] package that provides
the following operators.

### CubeQueryOperator

`CubeQueryOperator` is used to query Cube via the
[`/v1/load`][ref-load-endpoint] endpoint of the [REST (JSON) API][ref-rest-api].

It supports the following options:

| Option         | Type     | Default        | Description                              |
| -------------- | -------- | -------------- | ---------------------------------------- |
| `cube_conn_id` | `string` | `cube_default` | Airflow connection name.                 |
| `headers`      | `dict`   |                | HTTP headers to be added to the request. |
| `query`        | `dict`   |                | Cube query object.                       |
| `timeout`      | `int`    | 30             | Response wait timeout in seconds.        |
| `wait`         | `int`    | 10             | Interval between API calls in seconds.   |

### CubeBuildOperator

`CubeBuildOperator` is used to trigger pre-aggregation builds and check their
status via the [`/v1/pre-aggregations/jobs`][ref-ref-jobs-endpoint] endpoint of
the [Orchestration API][ref-orchestration-api].

It supports the following options:

| Option         | Type     | Default        | Description                                                    |
| -------------- | -------- | -------------- | -------------------------------------------------------------- |
| `cube_conn_id` | `string` | `cube_default` | Airflow connection name.                                       |
| `headers`      | `dict`   |                | HTTP headers to be added to the request.                       |
| `selector`     | `dict`   |                | [`/v1/pre-aggregations/jobs`][ref-ref-jobs-endpoint] selector. |
| `complete`     | `bool`   | `False`        | Whether a task should wait for builds to complete or not.      |
| `wait`         | `int`    | 10             | Interval between API calls in seconds.                         |

## Installation

Install [Astro CLI installed][astro-cli].

Create a new directory and [initialize][astro-cli-dev-init] a new Astro project:

```bash theme={null}
mkdir cube-astro
cd cube-astro
astro dev init
```

Add the integration package to `requirements.txt`:

```bash theme={null}
echo "airflow-provider-cube" >> ./requirements.txt
```

## Configuration

### Connection

Create an Airflow connection via the web console or by adding the following
contents to the `airflow_settings.yaml` file:

```yaml theme={null}
airflow:
  connections:
    - conn_id: cube_default
      conn_type: generic
      conn_host: https://awesome-ecom.gcp-us-central1.cubecloudapp.dev
      conn_schema:
      conn_login:
      conn_password: SECRET
      conn_port:
      conn_extra:
        security_context: {}
```

Let's break the options down:

* By default, Cube operators use `cube_default` as an Airflow connection name.
* The connection shoud be of the `generic` type.
* `conn_host` should be set to the URL of your Cube deployment.
* `conn_password` should be set to the value of the [`CUBEJS_API_SECRET`](/reference/configuration/environment-variables#cubejs_api_secret)
  environment variable.
* `conn_extra` should contain a security context (as `security_context`) that
  will be sent with API requests.

### DAGs

Create a new DAG named `cube_query.py` in the `dags` subdirectory with the
following contents. As you can see, the `CubeQueryOperator` accepts a Cube query
via the `query` option.

```python theme={null}
from typing import Any
from pendulum import datetime
from airflow.decorators import dag, task
from cube_provider.operators.cube import CubeQueryOperator

@dag(
  start_date=datetime(2023, 6, 1),
  schedule='*/1 * * * *',
  max_active_runs=1,
  concurrency=1,
  default_args={"retries": 1, "cube_conn_id": "cube_default"},
  tags=["cube"],
)
def cube_query_workflow():
  query_op = CubeQueryOperator(
    task_id="query_op",
    query={
      "measures": ["Orders.count"],
      "dimensions": ["Orders.status"]
    }
  )

  @task()
  def print_op(data: Any):
    print(f"Result: {data}")

  print_op(query_op.output)

cube_query_workflow()
```

Create a new DAG named `cube_build.py` in the `dags` subdirectory with the
following contents. As you can see, the `CubeBuildOperator` accepts a
pre-aggregation selector via the `selector` option.

```python theme={null}
from typing import Any
from pendulum import datetime
from airflow.decorators import dag, task
from cube_provider.operators.cube import CubeBuildOperator

@dag(
  start_date=datetime(2023, 6, 1),
  schedule='*/1 * * * *',
  max_active_runs=1,
  concurrency=1,
  default_args={"retries": 1, "cube_conn_id": "cube_default"},
  tags=["cube"],
)
def cube_build_workflow():
  build_op = CubeBuildOperator(
      task_id="build_op",
      selector={
        "contexts": [
          {"securityContext": {}}
        ],
        "timezones": ["UTC"]
      },
      complete=True,
      wait=10,
  )

  @task()
  def print_op(data: Any):
    print(f"Result: {data}")

  print_op(build_op.output)

cube_build_workflow()
```

Pay attention to the `complete` option. When it's set to `True`, the operator
will wait for pre-aggregation builds to complete before allowing downstream
tasks to run.

## Running workflows

Now, you can run these DAGs:

```bash theme={null}
astro run cube_query_workflow
astro run cube_build_workflow
```

Alternatively, you can run Airflow and navigate to the web console at
[`localhost:8080`](http://localhost:8080) (use `admin`/`admin` to authenticate):

```bash theme={null}
astro dev start
```

[airflow]: https://airflow.apache.org

[astro]: https://www.astronomer.io

[astro-cli]: https://docs.astronomer.io/astro/cli/overview

[astro-cli-dev-init]: https://docs.astronomer.io/astro/cli/astro-dev-init

[github-airflow-provider-cube]: https://github.com/cube-js/airflow-provider-cube/

[ref-load-endpoint]: /reference/core-data-apis/rest-api/reference#v1load

[ref-ref-jobs-endpoint]: /reference/core-data-apis/rest-api/reference#base_path/v1/pre-aggregations/jobs

[ref-rest-api]: /reference/core-data-apis/rest-api

[ref-orchestration-api]: /reference/orchestration-api
