Added make_cluster documentation
nfx committed Sep 12, 2024
1 parent 8575752 commit 51e19e0
Showing 3 changed files with 48 additions and 61 deletions.
36 changes: 12 additions & 24 deletions README.md
@@ -294,32 +294,20 @@ See also [`ws`](#ws-fixture).
[[back to top](#python-testing-for-databricks)]

### `make_cluster` fixture
-Fixture to manage Databricks clusters.
-
-This fixture provides a function to manage Databricks clusters using the provided workspace (ws).
-Clusters can be created with specified configurations, and they will be permanently deleted after the test is complete.
-
-Parameters:
------------
-ws : WorkspaceClient
-    A Databricks WorkspaceClient instance.
-make_random : function
-    The make_random fixture to generate unique names.
-
-Returns:
---------
-function:
-    A function to manage Databricks clusters.
-
-Usage Example:
---------------
-To manage Databricks clusters using the make_cluster fixture:
-
-.. code-block:: python
-
-    def test_cluster_management(make_cluster):
-        cluster_info = make_cluster(cluster_name="my-cluster", single_node=True)
-        assert cluster_info is not None
+Creates a Databricks cluster, waits for it to start, and cleans it up after the test.
+Returns a function to create clusters. You can get the `cluster_id` attribute from the returned object.
+
+Keyword Arguments:
+* `single_node` (bool, optional): Whether to create a single-node cluster. Defaults to False.
+* `cluster_name` (str, optional): The name of the cluster. If not provided, a random name will be generated.
+* `spark_version` (str, optional): The Spark version of the cluster. If not provided, the latest version will be used.
+* `autotermination_minutes` (int, optional): The number of minutes before the cluster is automatically terminated. Defaults to 10.
+
+Usage:
+```python
+def test_cluster(make_cluster):
+    logger.info(f"created {make_cluster(single_node=True)}")
+```

See also [`ws`](#ws-fixture), [`make_random`](#make_random-fixture).
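
The keyword arguments above compose in a single call. A minimal sketch (the cluster name and timeout are illustrative values, not defaults; `cluster_id` is readable on the returned object, as noted above):

```python
def test_named_single_node_cluster(make_cluster):
    # Illustrative values: by default the fixture generates a random name
    # and uses a 10-minute auto-termination window.
    cluster = make_cluster(
        cluster_name="dummy-docs-example",
        single_node=True,
        autotermination_minutes=15,
    )
    assert cluster.cluster_id is not None
```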

69 changes: 32 additions & 37 deletions src/databricks/labs/pytester/fixtures/compute.py
@@ -2,11 +2,12 @@
 from collections.abc import Generator

 from pytest import fixture
-from databricks.sdk.service.compute import CreatePolicyResponse
+from databricks.sdk.service._internal import Wait
+from databricks.sdk.service.compute import CreatePolicyResponse, ClusterDetails
 from databricks.sdk import WorkspaceClient
 from databricks.sdk.service import jobs, compute

-from databricks.labs.pytester.fixtures.baseline import factory, get_purge_suffix
+from databricks.labs.pytester.fixtures.baseline import factory, get_purge_suffix, get_test_purge_time


 @fixture
@@ -42,34 +43,22 @@ def create(*, name: str | None = None, **kwargs) -> CreatePolicyResponse:


 @fixture
-def make_cluster(ws: WorkspaceClient, make_random):
+def make_cluster(ws, make_random) -> Generator[ClusterDetails, None, None]:
     """
-    Fixture to manage Databricks clusters.
-    This fixture provides a function to manage Databricks clusters using the provided workspace (ws).
-    Clusters can be created with specified configurations, and they will be permanently deleted after the test is complete.
-    Parameters:
-    -----------
-    ws : WorkspaceClient
-        A Databricks WorkspaceClient instance.
-    make_random : function
-        The make_random fixture to generate unique names.
-    Returns:
-    --------
-    function:
-        A function to manage Databricks clusters.
-    Usage Example:
-    --------------
-    To manage Databricks clusters using the make_cluster fixture:
-    .. code-block:: python
-    def test_cluster_management(make_cluster):
-        cluster_info = make_cluster(cluster_name="my-cluster", single_node=True)
-        assert cluster_info is not None
+    Creates a Databricks cluster, waits for it to start, and cleans it up after the test.
+    Returns a function to create clusters. You can get the `cluster_id` attribute from the returned object.
+
+    Keyword Arguments:
+    * `single_node` (bool, optional): Whether to create a single-node cluster. Defaults to False.
+    * `cluster_name` (str, optional): The name of the cluster. If not provided, a random name will be generated.
+    * `spark_version` (str, optional): The Spark version of the cluster. If not provided, the latest version will be used.
+    * `autotermination_minutes` (int, optional): The number of minutes before the cluster is automatically terminated. Defaults to 10.
+
+    Usage:
+    ```python
+    def test_cluster(make_cluster):
+        logger.info(f"created {make_cluster(single_node=True)}")
+    ```
     """

     def create(
@@ -79,29 +68,35 @@ def create(
         spark_version: str | None = None,
         autotermination_minutes=10,
         **kwargs,
-    ):
+    ) -> Wait[ClusterDetails]:
         if cluster_name is None:
-            cluster_name = f"sdk-{make_random(4)}"
+            cluster_name = f"dummy-{make_random(4)}"
         if spark_version is None:
             spark_version = ws.clusters.select_spark_version(latest=True)
         if single_node:
             kwargs["num_workers"] = 0
-            kwargs["spark_conf"] = {"spark.databricks.cluster.profile": "singleNode", "spark.master": "local[*]"}
+            if "spark_conf" in kwargs:
+                kwargs["spark_conf"] = kwargs["spark_conf"] | {
+                    "spark.databricks.cluster.profile": "singleNode",
+                    "spark.master": "local[*]",
+                }
+            else:
+                kwargs["spark_conf"] = {"spark.databricks.cluster.profile": "singleNode", "spark.master": "local[*]"}
             kwargs["custom_tags"] = {"ResourceClass": "SingleNode"}
-        elif "instance_pool_id" not in kwargs:
-            kwargs["node_type_id"] = ws.clusters.select_node_type(local_disk=True)
-
+        if "instance_pool_id" not in kwargs:
+            kwargs["node_type_id"] = ws.clusters.select_node_type(local_disk=True, min_memory_gb=16)
+        if "custom_tags" not in kwargs:
+            kwargs["custom_tags"] = {"RemoveAfter": get_test_purge_time()}
+        else:
+            kwargs["custom_tags"]["RemoveAfter"] = get_test_purge_time()
         return ws.clusters.create(
            cluster_name=cluster_name,
            spark_version=spark_version,
            autotermination_minutes=autotermination_minutes,
            **kwargs,
         )

-    def cleanup_cluster(cluster_info):
-        ws.clusters.permanent_delete(cluster_info.cluster_id)
-
-    yield from factory("cluster", create, cleanup_cluster)
+    yield from factory("cluster", create, lambda item: ws.clusters.permanent_delete(item.cluster_id))


 @fixture
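
Note that `create` now returns the SDK's `Wait[ClusterDetails]` rather than a plain response. A sketch of what that enables in a test, under two assumptions about the SDK's `Wait` wrapper: attribute access is delegated to the underlying create response, and `.result()` blocks until the cluster is running:

```python
def test_cluster_wait_semantics(make_cluster):
    cluster = make_cluster(single_node=True)
    # Assumption: Wait delegates attribute access to the create response,
    # so cluster_id is available before the cluster finishes starting.
    assert cluster.cluster_id is not None
    # Assumption: .result() blocks until the cluster is running and
    # returns the final ClusterDetails.
    details = cluster.result()
    assert details.cluster_id == cluster.cluster_id
```

The `RemoveAfter` tag derived from `get_test_purge_time()` presumably lets a scheduled cleanup job purge clusters that survive aborted test runs.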
4 changes: 4 additions & 0 deletions tests/integration/fixtures/test_compute.py
@@ -5,3 +5,7 @@

 def test_cluster_policy(make_cluster_policy):
     logger.info(f"created {make_cluster_policy()}")
+
+
+def test_cluster(make_cluster):
+    logger.info(f"created {make_cluster(single_node=True)}")

