Skip to content

Commit 7ab05d5

Browse files
authored
feat: Add PipelineJob.submit to create PipelineJob without monitoring its completion. (#798)
1 parent 45401c0 commit 7ab05d5

File tree

4 files changed

+120
-7
lines changed

4 files changed

+120
-7
lines changed

README.rst

Lines changed: 33 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -358,14 +358,12 @@ To delete an endpoint:
358358
Pipelines
359359
---------
360360

361-
To create a Vertex Pipeline run:
361+
To create a Vertex Pipeline run and monitor until completion:
362362

363363
.. code-block:: Python
364364
365365
# Instantiate PipelineJob object
366366
pl = PipelineJob(
367-
# Display name is required but seemingly not used
368-
# see https://github.com/googleapis/python-aiplatform/blob/9dcf6fb0bc8144d819938a97edf4339fe6f2e1e6/google/cloud/aiplatform/pipeline_jobs.py#L260
369367
display_name="My first pipeline",
370368
371369
# Whether or not to enable caching
@@ -384,7 +382,7 @@ To create a Vertex Pipeline run:
384382
pipeline_root=pipeline_root,
385383
)
386384
387-
# Execute pipeline in Vertex
385+
# Execute pipeline in Vertex and monitor until completion
388386
pl.run(
389387
# Email address of service account to use for the pipeline run
390388
# You must have iam.serviceAccounts.actAs permission on the service account to use it
@@ -395,6 +393,37 @@ To create a Vertex Pipeline run:
395393
sync=True
396394
)
397395
396+
To create a Vertex Pipeline run without monitoring it until completion, use `submit` instead of `run`:
397+
398+
.. code-block:: Python
399+
400+
# Instantiate PipelineJob object
401+
pl = PipelineJob(
402+
display_name="My first pipeline",
403+
404+
# Whether or not to enable caching
405+
# True = always cache pipeline step result
406+
# False = never cache pipeline step result
407+
# None = defer to cache option for each pipeline component in the pipeline definition
408+
enable_caching=False,
409+
410+
# Local or GCS path to a compiled pipeline definition
411+
template_path="pipeline.json",
412+
413+
# Dictionary containing input parameters for your pipeline
414+
parameter_values=parameter_values,
415+
416+
# GCS path to act as the pipeline root
417+
pipeline_root=pipeline_root,
418+
)
419+
420+
# Submit the Pipeline to Vertex
421+
pl.submit(
422+
# Email address of service account to use for the pipeline run
423+
# You must have iam.serviceAccounts.actAs permission on the service account to use it
424+
service_account=service_account,
425+
)
426+
398427
399428
Explainable AI: Get Metadata
400429
----------------------------

google/cloud/aiplatform/base.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -671,7 +671,7 @@ def wrapper(*args, **kwargs):
671671
# if sync then wait for any Futures to complete and execute
672672
if sync:
673673
if self:
674-
self.wait()
674+
VertexAiResourceNounWithFutureManager.wait(self)
675675
return method(*args, **kwargs)
676676

677677
# callbacks to call within the Future (in same Thread)

google/cloud/aiplatform/pipeline_jobs.py

Lines changed: 27 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -232,7 +232,7 @@ def run(
232232
network: Optional[str] = None,
233233
sync: Optional[bool] = True,
234234
) -> None:
235-
"""Run this configured PipelineJob.
235+
"""Run this configured PipelineJob and monitor the job until completion.
236236
237237
Args:
238238
service_account (str):
@@ -247,6 +247,26 @@ def run(
247247
sync (bool):
248248
Optional. Whether to execute this method synchronously. If False, this method will unblock and it will be executed in a concurrent Future.
249249
"""
250+
self.submit(service_account=service_account, network=network)
251+
252+
self._block_until_complete()
253+
254+
def submit(
255+
self, service_account: Optional[str] = None, network: Optional[str] = None,
256+
) -> None:
257+
"""Submit this configured PipelineJob for execution without monitoring its completion.
258+
259+
Args:
260+
service_account (str):
261+
Optional. Specifies the service account for workload run-as account.
262+
Users submitting jobs must have act-as permission on this run-as account.
263+
network (str):
264+
Optional. The full name of the Compute Engine network to which the job
265+
should be peered. For example, projects/12345/global/networks/myVPC.
266+
267+
Private services access must already be configured for the network.
268+
If left unspecified, the job is not peered with any network.
269+
"""
250270
if service_account:
251271
self._gca_resource.service_account = service_account
252272

@@ -267,7 +287,12 @@ def run(
267287

268288
_LOGGER.info("View Pipeline Job:\n%s" % self._dashboard_uri())
269289

270-
self._block_until_complete()
290+
def wait(self):
291+
"""Wait for this PipelineJob to complete."""
292+
if self._latest_future is None:
293+
self._block_until_complete()
294+
else:
295+
super().wait()
271296

272297
@property
273298
def pipeline_spec(self):

tests/unit/aiplatform/test_pipeline_jobs.py

Lines changed: 59 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -275,6 +275,65 @@ def test_run_call_pipeline_service_pipeline_job_create(
275275
gca_pipeline_state_v1beta1.PipelineState.PIPELINE_STATE_SUCCEEDED
276276
)
277277

278+
@pytest.mark.usefixtures("mock_load_pipeline_job_json")
279+
def test_submit_call_pipeline_service_pipeline_job_create(
280+
self, mock_pipeline_service_create, mock_pipeline_service_get
281+
):
282+
aiplatform.init(
283+
project=_TEST_PROJECT,
284+
staging_bucket=_TEST_GCS_BUCKET_NAME,
285+
location=_TEST_LOCATION,
286+
credentials=_TEST_CREDENTIALS,
287+
)
288+
289+
job = pipeline_jobs.PipelineJob(
290+
display_name=_TEST_PIPELINE_JOB_DISPLAY_NAME,
291+
template_path=_TEST_TEMPLATE_PATH,
292+
job_id=_TEST_PIPELINE_JOB_ID,
293+
parameter_values=_TEST_PIPELINE_PARAMETER_VALUES,
294+
enable_caching=True,
295+
)
296+
297+
job.submit(service_account=_TEST_SERVICE_ACCOUNT, network=_TEST_NETWORK)
298+
299+
expected_runtime_config_dict = {
300+
"gcs_output_directory": _TEST_GCS_BUCKET_NAME,
301+
"parameters": {"name_param": {"stringValue": "hello"}},
302+
}
303+
runtime_config = gca_pipeline_job_v1beta1.PipelineJob.RuntimeConfig()._pb
304+
json_format.ParseDict(expected_runtime_config_dict, runtime_config)
305+
306+
# Construct expected request
307+
expected_gapic_pipeline_job = gca_pipeline_job_v1beta1.PipelineJob(
308+
display_name=_TEST_PIPELINE_JOB_DISPLAY_NAME,
309+
pipeline_spec={
310+
"components": {},
311+
"pipelineInfo": _TEST_PIPELINE_JOB["pipelineSpec"]["pipelineInfo"],
312+
"root": _TEST_PIPELINE_JOB["pipelineSpec"]["root"],
313+
},
314+
runtime_config=runtime_config,
315+
service_account=_TEST_SERVICE_ACCOUNT,
316+
network=_TEST_NETWORK,
317+
)
318+
319+
mock_pipeline_service_create.assert_called_once_with(
320+
parent=_TEST_PARENT,
321+
pipeline_job=expected_gapic_pipeline_job,
322+
pipeline_job_id=_TEST_PIPELINE_JOB_ID,
323+
)
324+
325+
assert not mock_pipeline_service_get.called
326+
327+
job.wait()
328+
329+
mock_pipeline_service_get.assert_called_with(
330+
name=_TEST_PIPELINE_JOB_NAME, retry=base._DEFAULT_RETRY
331+
)
332+
333+
assert job._gca_resource == make_pipeline_job(
334+
gca_pipeline_state_v1beta1.PipelineState.PIPELINE_STATE_SUCCEEDED
335+
)
336+
278337
@pytest.mark.usefixtures("mock_load_pipeline_spec_json")
279338
@pytest.mark.parametrize("sync", [True, False])
280339
def test_run_call_pipeline_service_pipeline_spec_create(

0 commit comments

Comments
 (0)