@@ -1092,6 +1092,7 @@ def batch_serve_to_df(
10921092 feature_destination_fields : Optional [Dict [str , str ]] = None ,
10931093 request_metadata : Optional [Sequence [Tuple [str , str ]]] = (),
10941094 serve_request_timeout : Optional [float ] = None ,
1095+ bq_dataset_id : Optional [str ] = None ,
10951096 ) -> "pd.DataFrame" : # noqa: F821 - skip check for undefined name 'pd'
10961097 """Batch serves feature values to pandas DataFrame
10971098
@@ -1176,6 +1177,11 @@ def batch_serve_to_df(
11761177 serve_request_timeout (float):
11771178 Optional. The timeout for the serve request in seconds.
11781179
1180+ bq_dataset_id (str):
1181+ Optional. The full dataset ID for the BigQuery dataset to use
1182+ for temporarily staging data. If specified, caller must have
1183+ `bigquery.tables.create` permissions for Dataset.
1184+
11791185 Returns:
11801186 pd.DataFrame: The pandas DataFrame containing feature values from batch serving.
11811187
@@ -1210,34 +1216,43 @@ def batch_serve_to_df(
12101216
12111217 self .wait ()
12121218 featurestore_name_components = self ._parse_resource_name (self .resource_name )
1213- featurestore_id = featurestore_name_components ["featurestore" ]
1214-
1215- temp_bq_dataset_name = f"temp_{ featurestore_id } _{ uuid .uuid4 ()} " .replace (
1216- "-" , "_"
1217- )
12181219
1219- project_id = resource_manager_utils .get_project_id (
1220- project_number = featurestore_name_components ["project" ],
1221- credentials = self .credentials ,
1222- )
1223- temp_bq_dataset_id = f"{ project_id } .{ temp_bq_dataset_name } " [:1024 ]
1224- temp_bq_dataset = bigquery .Dataset (dataset_ref = temp_bq_dataset_id )
1225- temp_bq_dataset .location = self .location
1226- temp_bq_dataset = bigquery_client .create_dataset (temp_bq_dataset )
1220+ # if user didn't specify BigQuery dataset, create an ephemeral one
1221+ if bq_dataset_id is None :
1222+ temp_bq_full_dataset_id = self ._get_ephemeral_bq_full_dataset_id (
1223+ featurestore_name_components ["featurestore" ],
1224+ featurestore_name_components ["project" ],
1225+ )
1226+ temp_bq_dataset = self ._create_ephemeral_bq_dataset (
1227+ bigquery_client , temp_bq_full_dataset_id
1228+ )
1229+ temp_bq_batch_serve_table_name = "batch_serve"
1230+ temp_bq_read_instances_table_name = "read_instances"
1231+
1232+ # if user specified BigQuery dataset, create ephemeral tables
1233+ else :
1234+ temp_bq_full_dataset_id = bq_dataset_id
1235+ temp_bq_dataset = bigquery .Dataset (dataset_ref = temp_bq_full_dataset_id )
1236+ temp_bq_batch_serve_table_name = f"tmp_batch_serve_{ uuid .uuid4 ()} " .replace (
1237+ "-" , "_"
1238+ )
1239+ temp_bq_read_instances_table_name = (
1240+ f"tmp_read_instances_{ uuid .uuid4 ()} " .replace ("-" , "_" )
1241+ )
12271242
1228- temp_bq_batch_serve_table_name = "batch_serve"
1229- temp_bq_read_instances_table_name = "read_instances"
12301243 temp_bq_batch_serve_table_id = (
1231- f"{ temp_bq_dataset_id } .{ temp_bq_batch_serve_table_name } "
1244+ f"{ temp_bq_full_dataset_id } .{ temp_bq_batch_serve_table_name } "
12321245 )
1246+
12331247 temp_bq_read_instances_table_id = (
1234- f"{ temp_bq_dataset_id } .{ temp_bq_read_instances_table_name } "
1248+ f"{ temp_bq_full_dataset_id } .{ temp_bq_read_instances_table_name } "
12351249 )
12361250
12371251 try :
12381252
12391253 job = bigquery_client .load_table_from_dataframe (
1240- dataframe = read_instances_df , destination = temp_bq_read_instances_table_id
1254+ dataframe = read_instances_df ,
1255+ destination = temp_bq_read_instances_table_id ,
12411256 )
12421257 job .result ()
12431258
@@ -1259,7 +1274,7 @@ def batch_serve_to_df(
12591274 read_session = bigquery_storage .types .ReadSession (
12601275 table = "projects/{project}/datasets/{dataset}/tables/{table}" .format (
12611276 project = self .project ,
1262- dataset = temp_bq_dataset_name ,
1277+ dataset = temp_bq_dataset . dataset_id ,
12631278 table = temp_bq_batch_serve_table_name ,
12641279 ),
12651280 data_format = bigquery_storage .types .DataFormat .ARROW ,
@@ -1273,9 +1288,60 @@ def batch_serve_to_df(
12731288 frames .append (message .to_dataframe ())
12741289
12751290 finally :
1276- bigquery_client .delete_dataset (
1277- dataset = temp_bq_dataset .dataset_id ,
1278- delete_contents = True ,
1279- )
1291+ # clean up: if user didn't specify dataset, delete ephemeral dataset
1292+ if bq_dataset_id is None :
1293+ bigquery_client .delete_dataset (
1294+ dataset = temp_bq_dataset .dataset_id ,
1295+ delete_contents = True ,
1296+ )
1297+
1298+ # clean up: if user specified BigQuery dataset, delete ephemeral tables
1299+ else :
1300+ bigquery_client .delete_table (temp_bq_batch_serve_table_id )
1301+ bigquery_client .delete_table (temp_bq_read_instances_table_id )
12801302
12811303 return pd .concat (frames , ignore_index = True ) if frames else pd .DataFrame (frames )
1304+
def _get_ephemeral_bq_full_dataset_id(
    self, featurestore_id: str, project_number: str
) -> str:
    """Builds a unique full ID for an ephemeral BigQuery dataset used to
    temporarily stage data.

    Args:
        featurestore_id (str):
            Required. The ID of the featurestore the staging dataset is for.
        project_number (str):
            Required. Number of the project to retrieve the featurestore from.
    Returns:
        str - full BigQuery dataset ID
    """
    # Resolve the human-readable project ID from the project number.
    project_id = resource_manager_utils.get_project_id(
        project_number=project_number,
        credentials=self.credentials,
    )

    # Embed a UUID for uniqueness; swap '-' for '_' since BigQuery dataset
    # names only allow letters, digits, and underscores.
    dataset_name = f"temp_{featurestore_id}_{uuid.uuid4()}".replace("-", "_")

    # Cap at 1024 characters, BigQuery's dataset ID length limit.
    return f"{project_id}.{dataset_name}"[:1024]
1329+
def _create_ephemeral_bq_dataset(
    self, bigquery_client: bigquery.Client, dataset_id: str
) -> "bigquery.Dataset":
    """Creates an ephemeral BigQuery dataset used to temporarily stage data.

    Args:
        bigquery_client (bigquery.Client):
            Required. BigQuery client used to issue the creation request.
        dataset_id (str):
            Required. Identifier to use for the BigQuery dataset.
    Returns:
        bigquery.Dataset - new BigQuery dataset used to temporarily stage data
    """
    dataset = bigquery.Dataset(dataset_ref=dataset_id)
    # Co-locate the staging dataset with this resource's region.
    dataset.location = self.location
    return bigquery_client.create_dataset(dataset)
0 commit comments