From 0f3a34144f1cdcdd918092ece445a45c9f16ae18 Mon Sep 17 00:00:00 2001 From: chelsealin Date: Tue, 14 Mar 2023 22:57:43 +0000 Subject: [PATCH] feat: add default LoadJobConfig to Client --- google/cloud/bigquery/client.py | 121 ++++--- google/cloud/bigquery/job/base.py | 6 +- tests/system/test_client.py | 8 +- tests/unit/job/test_base.py | 29 +- tests/unit/test_client.py | 513 ++++++++++++++++++++++++++++++ 5 files changed, 621 insertions(+), 56 deletions(-) diff --git a/google/cloud/bigquery/client.py b/google/cloud/bigquery/client.py index a53819cde..d8fbfb69e 100644 --- a/google/cloud/bigquery/client.py +++ b/google/cloud/bigquery/client.py @@ -210,6 +210,9 @@ class Client(ClientWithProject): default_query_job_config (Optional[google.cloud.bigquery.job.QueryJobConfig]): Default ``QueryJobConfig``. Will be merged into job configs passed into the ``query`` method. + default_load_job_config (Optional[google.cloud.bigquery.job.LoadJobConfig]): + Default ``LoadJobConfig``. + Will be merged into job configs passed into the ``load_table_*`` methods. client_info (Optional[google.api_core.client_info.ClientInfo]): The client info used to send a user-agent string along with API requests. If ``None``, then default info will be used. Generally, @@ -235,6 +238,7 @@ def __init__( _http=None, location=None, default_query_job_config=None, + default_load_job_config=None, client_info=None, client_options=None, ) -> None: @@ -260,6 +264,7 @@ def __init__( self._connection = Connection(self, **kw_args) self._location = location self._default_query_job_config = copy.deepcopy(default_query_job_config) + self._default_load_job_config = copy.deepcopy(default_load_job_config) @property def location(self): @@ -277,6 +282,17 @@ def default_query_job_config(self): def default_query_job_config(self, value: QueryJobConfig): self._default_query_job_config = copy.deepcopy(value) + @property + def default_load_job_config(self): + """Default ``LoadJobConfig``. + Will be merged into job configs passed into the ``load_table_*`` methods. + """ + return self._default_load_job_config + + @default_load_job_config.setter + def default_load_job_config(self, value: LoadJobConfig): + self._default_load_job_config = copy.deepcopy(value) + def close(self): """Close the underlying transport objects, releasing system resources. @@ -2330,8 +2346,8 @@ def load_table_from_uri( Raises: TypeError: - If ``job_config`` is not an instance of :class:`~google.cloud.bigquery.job.LoadJobConfig` - class. + If ``job_config`` is not an instance of + :class:`~google.cloud.bigquery.job.LoadJobConfig` class. """ job_id = _make_job_id(job_id, job_id_prefix) @@ -2348,11 +2364,14 @@ def load_table_from_uri( destination = _table_arg_to_table_ref(destination, default_project=self.project) - if job_config: - job_config = copy.deepcopy(job_config) - _verify_job_config_type(job_config, google.cloud.bigquery.job.LoadJobConfig) + if job_config is not None: + _verify_job_config_type(job_config, LoadJobConfig) + else: + job_config = job.LoadJobConfig() - load_job = job.LoadJob(job_ref, source_uris, destination, self, job_config) + new_job_config = job_config._fill_from_default(self._default_load_job_config) + + load_job = job.LoadJob(job_ref, source_uris, destination, self, new_job_config) load_job._begin(retry=retry, timeout=timeout) return load_job @@ -2424,8 +2443,8 @@ def load_table_from_file( mode. TypeError: - If ``job_config`` is not an instance of :class:`~google.cloud.bigquery.job.LoadJobConfig` - class. 
+ If ``job_config`` is not an instance of + :class:`~google.cloud.bigquery.job.LoadJobConfig` class. """ job_id = _make_job_id(job_id, job_id_prefix) @@ -2437,10 +2456,15 @@ def load_table_from_file( destination = _table_arg_to_table_ref(destination, default_project=self.project) job_ref = job._JobReference(job_id, project=project, location=location) - if job_config: - job_config = copy.deepcopy(job_config) - _verify_job_config_type(job_config, google.cloud.bigquery.job.LoadJobConfig) - load_job = job.LoadJob(job_ref, None, destination, self, job_config) + + if job_config is not None: + _verify_job_config_type(job_config, LoadJobConfig) + else: + job_config = job.LoadJobConfig() + + new_job_config = job_config._fill_from_default(self._default_load_job_config) + + load_job = job.LoadJob(job_ref, None, destination, self, new_job_config) job_resource = load_job.to_api_repr() if rewind: @@ -2564,43 +2588,40 @@ def load_table_from_dataframe( If a usable parquet engine cannot be found. This method requires :mod:`pyarrow` to be installed. TypeError: - If ``job_config`` is not an instance of :class:`~google.cloud.bigquery.job.LoadJobConfig` - class. + If ``job_config`` is not an instance of + :class:`~google.cloud.bigquery.job.LoadJobConfig` class. """ job_id = _make_job_id(job_id, job_id_prefix) - if job_config: - _verify_job_config_type(job_config, google.cloud.bigquery.job.LoadJobConfig) - # Make a copy so that the job config isn't modified in-place. - job_config_properties = copy.deepcopy(job_config._properties) - job_config = job.LoadJobConfig() - job_config._properties = job_config_properties - + if job_config is not None: + _verify_job_config_type(job_config, LoadJobConfig) else: job_config = job.LoadJobConfig() + new_job_config = job_config._fill_from_default(self._default_load_job_config) + supported_formats = {job.SourceFormat.CSV, job.SourceFormat.PARQUET} - if job_config.source_format is None: + if new_job_config.source_format is None: # default value - job_config.source_format = job.SourceFormat.PARQUET + new_job_config.source_format = job.SourceFormat.PARQUET if ( - job_config.source_format == job.SourceFormat.PARQUET - and job_config.parquet_options is None + new_job_config.source_format == job.SourceFormat.PARQUET + and new_job_config.parquet_options is None ): parquet_options = ParquetOptions() # default value parquet_options.enable_list_inference = True - job_config.parquet_options = parquet_options + new_job_config.parquet_options = parquet_options - if job_config.source_format not in supported_formats: + if new_job_config.source_format not in supported_formats: raise ValueError( "Got unexpected source_format: '{}'. Currently, only PARQUET and CSV are supported".format( - job_config.source_format + new_job_config.source_format ) ) - if pyarrow is None and job_config.source_format == job.SourceFormat.PARQUET: + if pyarrow is None and new_job_config.source_format == job.SourceFormat.PARQUET: # pyarrow is now the only supported parquet engine. raise ValueError("This method requires pyarrow to be installed") @@ -2611,8 +2632,8 @@ def load_table_from_dataframe( # schema, and check if dataframe schema is compatible with it - except # for WRITE_TRUNCATE jobs, the existing schema does not matter then. 
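+        # From here on the merged copy (``new_job_config``) is used, so
+        # neither the caller's config nor the client default is mutated.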
if ( - not job_config.schema - and job_config.write_disposition != job.WriteDisposition.WRITE_TRUNCATE + not new_job_config.schema + and new_job_config.write_disposition != job.WriteDisposition.WRITE_TRUNCATE ): try: table = self.get_table(destination) @@ -2623,7 +2644,7 @@ def load_table_from_dataframe( name for name, _ in _pandas_helpers.list_columns_and_indexes(dataframe) ) - job_config.schema = [ + new_job_config.schema = [ # Field description and policy tags are not needed to # serialize a data frame. SchemaField( @@ -2637,11 +2658,11 @@ def load_table_from_dataframe( if field.name in columns_and_indexes ] - job_config.schema = _pandas_helpers.dataframe_to_bq_schema( - dataframe, job_config.schema + new_job_config.schema = _pandas_helpers.dataframe_to_bq_schema( + dataframe, new_job_config.schema ) - if not job_config.schema: + if not new_job_config.schema: # the schema could not be fully detected warnings.warn( "Schema could not be detected for all columns. Loading from a " @@ -2652,13 +2673,13 @@ def load_table_from_dataframe( ) tmpfd, tmppath = tempfile.mkstemp( - suffix="_job_{}.{}".format(job_id[:8], job_config.source_format.lower()) + suffix="_job_{}.{}".format(job_id[:8], new_job_config.source_format.lower()) ) os.close(tmpfd) try: - if job_config.source_format == job.SourceFormat.PARQUET: + if new_job_config.source_format == job.SourceFormat.PARQUET: if _PYARROW_VERSION in _PYARROW_BAD_VERSIONS: msg = ( "Loading dataframe data in PARQUET format with pyarrow " @@ -2669,13 +2690,13 @@ def load_table_from_dataframe( ) warnings.warn(msg, category=RuntimeWarning) - if job_config.schema: + if new_job_config.schema: if parquet_compression == "snappy": # adjust the default value parquet_compression = parquet_compression.upper() _pandas_helpers.dataframe_to_parquet( dataframe, - job_config.schema, + new_job_config.schema, tmppath, parquet_compression=parquet_compression, parquet_use_compliant_nested_type=True, @@ -2715,7 +2736,7 @@ def load_table_from_dataframe( job_id_prefix=job_id_prefix, location=location, project=project, - job_config=job_config, + job_config=new_job_config, timeout=timeout, ) @@ -2791,22 +2812,22 @@ def load_table_from_json( Raises: TypeError: - If ``job_config`` is not an instance of :class:`~google.cloud.bigquery.job.LoadJobConfig` - class. + If ``job_config`` is not an instance of + :class:`~google.cloud.bigquery.job.LoadJobConfig` class. """ job_id = _make_job_id(job_id, job_id_prefix) - if job_config: - _verify_job_config_type(job_config, google.cloud.bigquery.job.LoadJobConfig) - # Make a copy so that the job config isn't modified in-place. 
- job_config = copy.deepcopy(job_config) + if job_config is not None: + _verify_job_config_type(job_config, LoadJobConfig) else: job_config = job.LoadJobConfig() - job_config.source_format = job.SourceFormat.NEWLINE_DELIMITED_JSON + new_job_config = job_config._fill_from_default(self._default_load_job_config) + + new_job_config.source_format = job.SourceFormat.NEWLINE_DELIMITED_JSON - if job_config.schema is None: - job_config.autodetect = True + if new_job_config.schema is None: + new_job_config.autodetect = True if project is None: project = self.project @@ -2828,7 +2849,7 @@ def load_table_from_json( job_id_prefix=job_id_prefix, location=location, project=project, - job_config=job_config, + job_config=new_job_config, timeout=timeout, ) diff --git a/google/cloud/bigquery/job/base.py b/google/cloud/bigquery/job/base.py index 55e80b2eb..4073e0137 100644 --- a/google/cloud/bigquery/job/base.py +++ b/google/cloud/bigquery/job/base.py @@ -269,7 +269,7 @@ def to_api_repr(self) -> dict: """ return copy.deepcopy(self._properties) - def _fill_from_default(self, default_job_config): + def _fill_from_default(self, default_job_config=None): """Merge this job config with a default job config. The keys in this object take precedence over the keys in the default @@ -283,6 +283,10 @@ def _fill_from_default(self, default_job_config): Returns: google.cloud.bigquery.job._JobConfig: A new (merged) job config. """ + if not default_job_config: + new_job_config = copy.deepcopy(self) + return new_job_config + if self._job_type != default_job_config._job_type: raise TypeError( "attempted to merge two incompatible job types: " diff --git a/tests/system/test_client.py b/tests/system/test_client.py index a69bb92c5..1437328a8 100644 --- a/tests/system/test_client.py +++ b/tests/system/test_client.py @@ -2319,7 +2319,7 @@ def _table_exists(t): return False -def test_dbapi_create_view(dataset_id): +def test_dbapi_create_view(dataset_id: str): query = f""" CREATE VIEW {dataset_id}.dbapi_create_view @@ -2332,7 +2332,7 @@ def test_dbapi_create_view(dataset_id): assert Config.CURSOR.rowcount == 0, "expected 0 rows" -def test_parameterized_types_round_trip(dataset_id): +def test_parameterized_types_round_trip(dataset_id: str): client = Config.CLIENT table_id = f"{dataset_id}.test_parameterized_types_round_trip" fields = ( @@ -2358,7 +2358,7 @@ def test_parameterized_types_round_trip(dataset_id): assert tuple(s._key()[:2] for s in table2.schema) == fields -def test_table_snapshots(dataset_id): +def test_table_snapshots(dataset_id: str): from google.cloud.bigquery import CopyJobConfig from google.cloud.bigquery import OperationType @@ -2429,7 +2429,7 @@ def test_table_snapshots(dataset_id): assert rows == [(1, "one"), (2, "two")] -def test_table_clones(dataset_id): +def test_table_clones(dataset_id: str): from google.cloud.bigquery import CopyJobConfig from google.cloud.bigquery import OperationType diff --git a/tests/unit/job/test_base.py b/tests/unit/job/test_base.py index 3ff96e874..a9760aa9b 100644 --- a/tests/unit/job/test_base.py +++ b/tests/unit/job/test_base.py @@ -1104,7 +1104,7 @@ def test_ctor_with_unknown_property_raises_error(self): config = self._make_one() config.wrong_name = None - def test_fill_from_default(self): + def test_fill_query_job_config_from_default(self): from google.cloud.bigquery import QueryJobConfig job_config = QueryJobConfig() @@ -1120,6 +1120,22 @@ def test_fill_from_default(self): self.assertTrue(final_job_config.use_query_cache) self.assertEqual(final_job_config.maximum_bytes_billed, 
1000) + def test_fill_load_job_from_default(self): + from google.cloud.bigquery import LoadJobConfig + + job_config = LoadJobConfig() + job_config.create_session = True + job_config.encoding = "UTF-8" + + default_job_config = LoadJobConfig() + default_job_config.ignore_unknown_values = True + default_job_config.encoding = "ISO-8859-1" + + final_job_config = job_config._fill_from_default(default_job_config) + self.assertTrue(final_job_config.create_session) + self.assertTrue(final_job_config.ignore_unknown_values) + self.assertEqual(final_job_config.encoding, "UTF-8") + def test_fill_from_default_conflict(self): from google.cloud.bigquery import QueryJobConfig @@ -1132,6 +1148,17 @@ def test_fill_from_default_conflict(self): with self.assertRaises(TypeError): basic_job_config._fill_from_default(conflicting_job_config) + def test_fill_from_empty_default_conflict(self): + from google.cloud.bigquery import QueryJobConfig + + job_config = QueryJobConfig() + job_config.dry_run = True + job_config.maximum_bytes_billed = 1000 + + final_job_config = job_config._fill_from_default(default_job_config=None) + self.assertTrue(final_job_config.dry_run) + self.assertEqual(final_job_config.maximum_bytes_billed, 1000) + @mock.patch("google.cloud.bigquery._helpers._get_sub_prop") def test__get_sub_prop_wo_default(self, _get_sub_prop): job_config = self._make_one() diff --git a/tests/unit/test_client.py b/tests/unit/test_client.py index f52eb825a..c155e2bc6 100644 --- a/tests/unit/test_client.py +++ b/tests/unit/test_client.py @@ -239,6 +239,31 @@ def test_ctor_w_query_job_config(self): self.assertIsInstance(client._default_query_job_config, QueryJobConfig) self.assertTrue(client._default_query_job_config.dry_run) + def test_ctor_w_load_job_config(self): + from google.cloud.bigquery._http import Connection + from google.cloud.bigquery import LoadJobConfig + + creds = _make_credentials() + http = object() + location = "us-central" + job_config = LoadJobConfig() + job_config.create_session = True + + client = self._make_one( + project=self.PROJECT, + credentials=creds, + _http=http, + location=location, + default_load_job_config=job_config, + ) + self.assertIsInstance(client._connection, Connection) + self.assertIs(client._connection.credentials, creds) + self.assertIs(client._connection.http, http) + self.assertEqual(client.location, location) + + self.assertIsInstance(client._default_load_job_config, LoadJobConfig) + self.assertTrue(client._default_load_job_config.create_session) + def test__call_api_applying_custom_retry_on_timeout(self): from concurrent.futures import TimeoutError from google.cloud.bigquery.retry import DEFAULT_RETRY @@ -426,6 +451,19 @@ def test_default_query_job_config(self): client.default_query_job_config = job_config self.assertIsInstance(client.default_query_job_config, QueryJobConfig) + def test_default_load_job_config(self): + from google.cloud.bigquery import LoadJobConfig + + creds = _make_credentials() + http = object() + client = self._make_one(project=self.PROJECT, credentials=creds, _http=http) + self.assertIsNone(client.default_load_job_config) + + job_config = LoadJobConfig() + job_config.create_session = True + client.default_load_job_config = job_config + self.assertIsInstance(client.default_load_job_config, LoadJobConfig) + def test_get_service_account_email(self): path = "/projects/%s/serviceAccount" % (self.PROJECT,) creds = _make_credentials() @@ -3282,6 +3320,146 @@ def test_load_table_from_uri_w_invalid_job_config(self): self.assertIn("Expected an instance of 
LoadJobConfig", exc.exception.args[0]) + def test_load_table_from_uri_w_explicit_job_config(self): + from google.cloud.bigquery.job import LoadJobConfig + + JOB = "job_name" + DESTINATION = "destination_table" + SOURCE_URI = "http://example.com/source.csv" + RESOURCE = { + "jobReference": {"jobId": JOB, "projectId": self.PROJECT}, + "configuration": { + "load": { + "sourceUris": [SOURCE_URI], + "destinationTable": { + "projectId": self.PROJECT, + "datasetId": self.DS_ID, + "tableId": DESTINATION, + }, + "createSession": True, + "encoding": "UTF-8", + } + }, + } + + creds = _make_credentials() + http = object() + + client = self._make_one(project=self.PROJECT, credentials=creds, _http=http) + conn = client._connection = make_connection(RESOURCE) + destination = DatasetReference(self.PROJECT, self.DS_ID).table(DESTINATION) + + job_config = LoadJobConfig() + job_config.create_session = True + job_config.encoding = "UTF-8" + client.load_table_from_uri( + SOURCE_URI, destination, job_id=JOB, job_config=job_config + ) + + # Check that load_table_from_uri actually starts the job. + conn.api_request.assert_called_once_with( + method="POST", + path="/projects/%s/jobs" % self.PROJECT, + data=RESOURCE, + timeout=DEFAULT_TIMEOUT, + ) + + def test_load_table_from_uri_w_explicit_job_config_override(self): + from google.cloud.bigquery.job import LoadJobConfig + + JOB = "job_name" + DESTINATION = "destination_table" + SOURCE_URI = "http://example.com/source.csv" + RESOURCE = { + "jobReference": {"jobId": JOB, "projectId": self.PROJECT}, + "configuration": { + "load": { + "sourceUris": [SOURCE_URI], + "destinationTable": { + "projectId": self.PROJECT, + "datasetId": self.DS_ID, + "tableId": DESTINATION, + }, + "createSession": False, + "encoding": "ISO-8859-1", + } + }, + } + + creds = _make_credentials() + http = object() + default_job_config = LoadJobConfig() + default_job_config.create_session = True + default_job_config.encoding = "ISO-8859-1" + + client = self._make_one( + project=self.PROJECT, + credentials=creds, + _http=http, + default_load_job_config=default_job_config, + ) + conn = client._connection = make_connection(RESOURCE) + destination = DatasetReference(self.PROJECT, self.DS_ID).table(DESTINATION) + + job_config = LoadJobConfig() + job_config.create_session = False + client.load_table_from_uri( + SOURCE_URI, destination, job_id=JOB, job_config=job_config + ) + + # Check that load_table_from_uri actually starts the job. 
+ conn.api_request.assert_called_once_with( + method="POST", + path="/projects/%s/jobs" % self.PROJECT, + data=RESOURCE, + timeout=DEFAULT_TIMEOUT, + ) + + def test_load_table_from_uri_w_default_load_config(self): + from google.cloud.bigquery.job import LoadJobConfig + + JOB = "job_name" + DESTINATION = "destination_table" + SOURCE_URI = "http://example.com/source.csv" + RESOURCE = { + "jobReference": {"jobId": JOB, "projectId": self.PROJECT}, + "configuration": { + "load": { + "sourceUris": [SOURCE_URI], + "destinationTable": { + "projectId": self.PROJECT, + "datasetId": self.DS_ID, + "tableId": DESTINATION, + }, + "encoding": "ISO-8859-1", + } + }, + } + + creds = _make_credentials() + http = object() + default_job_config = LoadJobConfig() + default_job_config.encoding = "ISO-8859-1" + + client = self._make_one( + project=self.PROJECT, + credentials=creds, + _http=http, + default_load_job_config=default_job_config, + ) + conn = client._connection = make_connection(RESOURCE) + destination = DatasetReference(self.PROJECT, self.DS_ID).table(DESTINATION) + + client.load_table_from_uri(SOURCE_URI, destination, job_id=JOB) + + # Check that load_table_from_uri actually starts the job. + conn.api_request.assert_called_once_with( + method="POST", + path="/projects/%s/jobs" % self.PROJECT, + data=RESOURCE, + timeout=DEFAULT_TIMEOUT, + ) + @staticmethod def _mock_requests_response(status_code, headers, content=b""): return mock.Mock( @@ -6940,6 +7118,118 @@ def test_load_table_from_file_w_invalid_job_config(self): err_msg = str(exc.value) assert "Expected an instance of LoadJobConfig" in err_msg + def test_load_table_from_file_w_explicit_job_config(self): + from google.cloud.bigquery.client import _DEFAULT_NUM_RETRIES + + client = self._make_client() + file_obj = self._make_file_obj() + + job_config = self._make_config() + job_config.create_session = True + job_config.encoding = "UTF-8" + do_upload_patch = self._make_do_upload_patch( + client, "_do_resumable_upload", self.EXPECTED_CONFIGURATION + ) + with do_upload_patch as do_upload: + client.load_table_from_file( + file_obj, + self.TABLE_REF, + job_id="job_id", + project=self.PROJECT, + location=self.LOCATION, + job_config=job_config, + ) + + expected_resource = copy.deepcopy(self.EXPECTED_CONFIGURATION) + expected_resource["jobReference"]["location"] = self.LOCATION + expected_resource["jobReference"]["projectId"] = self.PROJECT + expected_resource["configuration"]["load"]["createSession"] = True + expected_resource["configuration"]["load"]["encoding"] = "UTF-8" + do_upload.assert_called_once_with( + file_obj, + expected_resource, + _DEFAULT_NUM_RETRIES, + DEFAULT_TIMEOUT, + project=self.PROJECT, + ) + + def test_load_table_from_file_w_explicit_job_config_override(self): + from google.cloud.bigquery.client import _DEFAULT_NUM_RETRIES + from google.cloud.bigquery.job import LoadJobConfig + + client = self._make_client() + file_obj = self._make_file_obj() + + default_job_config = LoadJobConfig() + default_job_config.create_session = True + default_job_config.encoding = "ISO-8859-1" + client.default_load_job_config = default_job_config + + job_config = self._make_config() + job_config.create_session = False + do_upload_patch = self._make_do_upload_patch( + client, "_do_resumable_upload", self.EXPECTED_CONFIGURATION + ) + with do_upload_patch as do_upload: + client.load_table_from_file( + file_obj, + self.TABLE_REF, + job_id="job_id", + project=self.PROJECT, + location=self.LOCATION, + job_config=job_config, + ) + + expected_resource = 
copy.deepcopy(self.EXPECTED_CONFIGURATION) + expected_resource["jobReference"]["location"] = self.LOCATION + expected_resource["jobReference"]["projectId"] = self.PROJECT + expected_resource["configuration"]["load"]["createSession"] = False + expected_resource["configuration"]["load"]["encoding"] = "ISO-8859-1" + do_upload.assert_called_once_with( + file_obj, + expected_resource, + _DEFAULT_NUM_RETRIES, + DEFAULT_TIMEOUT, + project=self.PROJECT, + ) + + def test_load_table_from_file_w_default_load_config(self): + from google.cloud.bigquery.client import _DEFAULT_NUM_RETRIES + from google.cloud.bigquery.job import LoadJobConfig + + client = self._make_client() + file_obj = self._make_file_obj() + + default_job_config = LoadJobConfig() + default_job_config.encoding = "ISO-8859-1" + client.default_load_job_config = default_job_config + + job_config = self._make_config() + do_upload_patch = self._make_do_upload_patch( + client, "_do_resumable_upload", self.EXPECTED_CONFIGURATION + ) + with do_upload_patch as do_upload: + client.load_table_from_file( + file_obj, + self.TABLE_REF, + job_id="job_id", + project=self.PROJECT, + location=self.LOCATION, + job_config=job_config, + ) + + expected_resource = copy.deepcopy(self.EXPECTED_CONFIGURATION) + expected_resource["jobReference"]["location"] = self.LOCATION + expected_resource["jobReference"]["projectId"] = self.PROJECT + expected_resource["configuration"]["load"]["encoding"] = "ISO-8859-1" + do_upload.assert_called_once_with( + file_obj, + expected_resource, + _DEFAULT_NUM_RETRIES, + DEFAULT_TIMEOUT, + project=self.PROJECT, + ) + @unittest.skipIf(pandas is None, "Requires `pandas`") @unittest.skipIf(pyarrow is None, "Requires `pyarrow`") def test_load_table_from_dataframe(self): @@ -7304,6 +7594,117 @@ def test_load_table_from_dataframe_w_list_inference_none(self): # the original config object should not have been modified assert job_config.to_api_repr() == original_config_copy.to_api_repr() + @unittest.skipIf(pandas is None, "Requires `pandas`") + @unittest.skipIf(pyarrow is None, "Requires `pyarrow`") + def test_load_table_from_dataframe_w_explicit_job_config_override(self): + from google.cloud.bigquery.client import _DEFAULT_NUM_RETRIES + from google.cloud.bigquery import job + from google.cloud.bigquery.schema import SchemaField + + client = self._make_client() + records = [{"id": 1, "age": 100}, {"id": 2, "age": 60}] + dataframe = pandas.DataFrame(records) + + client.default_load_job_config = job.LoadJobConfig( + encoding="ISO-8859-1", + write_disposition=job.WriteDisposition.WRITE_TRUNCATE, + source_format=job.SourceFormat.PARQUET, + ) + + job_config = job.LoadJobConfig( + write_disposition=job.WriteDisposition.WRITE_APPEND, + source_format=job.SourceFormat.PARQUET, + ) + original_config_copy = copy.deepcopy(job_config) + + get_table_patch = mock.patch( + "google.cloud.bigquery.client.Client.get_table", + autospec=True, + return_value=mock.Mock( + schema=[SchemaField("id", "INTEGER"), SchemaField("age", "INTEGER")] + ), + ) + load_patch = mock.patch( + "google.cloud.bigquery.client.Client.load_table_from_file", autospec=True + ) + with load_patch as load_table_from_file, get_table_patch: + client.load_table_from_dataframe( + dataframe, self.TABLE_REF, job_config=job_config, location=self.LOCATION + ) + + load_table_from_file.assert_called_once_with( + client, + mock.ANY, + self.TABLE_REF, + num_retries=_DEFAULT_NUM_RETRIES, + rewind=True, + size=mock.ANY, + job_id=mock.ANY, + job_id_prefix=None, + location=self.LOCATION, + project=None, + 
job_config=mock.ANY, + timeout=DEFAULT_TIMEOUT, + ) + + sent_config = load_table_from_file.mock_calls[0][2]["job_config"] + assert sent_config.write_disposition == job.WriteDisposition.WRITE_APPEND + assert sent_config.source_format == job.SourceFormat.PARQUET + assert sent_config.encoding == "ISO-8859-1" + + # the original config object should not have been modified + assert job_config.to_api_repr() == original_config_copy.to_api_repr() + + @unittest.skipIf(pandas is None, "Requires `pandas`") + @unittest.skipIf(pyarrow is None, "Requires `pyarrow`") + def test_load_table_from_dataframe_w_default_load_config(self): + from google.cloud.bigquery.client import _DEFAULT_NUM_RETRIES + from google.cloud.bigquery import job + from google.cloud.bigquery.schema import SchemaField + + client = self._make_client() + records = [{"id": 1, "age": 100}, {"id": 2, "age": 60}] + dataframe = pandas.DataFrame(records) + + client.default_load_job_config = job.LoadJobConfig( + write_disposition=job.WriteDisposition.WRITE_TRUNCATE, + source_format=job.SourceFormat.PARQUET, + ) + + get_table_patch = mock.patch( + "google.cloud.bigquery.client.Client.get_table", + autospec=True, + return_value=mock.Mock( + schema=[SchemaField("id", "INTEGER"), SchemaField("age", "INTEGER")] + ), + ) + load_patch = mock.patch( + "google.cloud.bigquery.client.Client.load_table_from_file", autospec=True + ) + with load_patch as load_table_from_file, get_table_patch: + client.load_table_from_dataframe( + dataframe, self.TABLE_REF, location=self.LOCATION + ) + + load_table_from_file.assert_called_once_with( + client, + mock.ANY, + self.TABLE_REF, + num_retries=_DEFAULT_NUM_RETRIES, + rewind=True, + size=mock.ANY, + job_id=mock.ANY, + job_id_prefix=None, + location=self.LOCATION, + project=None, + job_config=mock.ANY, + timeout=DEFAULT_TIMEOUT, + ) + + sent_config = load_table_from_file.mock_calls[0][2]["job_config"] + assert sent_config.write_disposition == job.WriteDisposition.WRITE_TRUNCATE + assert sent_config.source_format == job.SourceFormat.PARQUET + @unittest.skipIf(pandas is None, "Requires `pandas`") @unittest.skipIf(pyarrow is None, "Requires `pyarrow`") def test_load_table_from_dataframe_w_list_inference_false(self): @@ -8377,6 +8778,118 @@ def test_load_table_from_json_w_invalid_job_config(self): err_msg = str(exc.value) assert "Expected an instance of LoadJobConfig" in err_msg + def test_load_table_from_json_w_explicit_job_config_override(self): + from google.cloud.bigquery import job + from google.cloud.bigquery.client import _DEFAULT_NUM_RETRIES + from google.cloud.bigquery.schema import SchemaField + + client = self._make_client() + + json_rows = [ + {"name": "One", "age": 11, "birthday": "2008-09-10", "adult": False}, + {"name": "Two", "age": 22, "birthday": "1997-08-09", "adult": True}, + ] + + schema = [ + SchemaField("name", "STRING"), + SchemaField("age", "INTEGER"), + SchemaField("adult", "BOOLEAN"), + ] + client.default_load_job_config = job.LoadJobConfig( + schema=schema, encoding="ISO-8859-1" + ) + + override_schema = schema + override_schema[0] = SchemaField("username", "STRING") + job_config = job.LoadJobConfig(schema=override_schema) + original_config_copy = copy.deepcopy(job_config) + + load_patch = mock.patch( + "google.cloud.bigquery.client.Client.load_table_from_file", autospec=True + ) + + with load_patch as load_table_from_file: + client.load_table_from_json( + json_rows, + self.TABLE_REF, + job_config=job_config, + project="project-x", + location="EU", + ) + + 
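+        # The explicit schema (with the renamed "username" field) should win
+        # over the default config's schema; encoding falls back to the default.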
load_table_from_file.assert_called_once_with( + client, + mock.ANY, + self.TABLE_REF, + size=mock.ANY, + num_retries=_DEFAULT_NUM_RETRIES, + job_id=mock.ANY, + job_id_prefix=None, + location="EU", + project="project-x", + job_config=mock.ANY, + timeout=DEFAULT_TIMEOUT, + ) + + sent_config = load_table_from_file.mock_calls[0][2]["job_config"] + assert sent_config.source_format == job.SourceFormat.NEWLINE_DELIMITED_JSON + assert sent_config.schema == override_schema + assert sent_config.encoding == "ISO-8859-1" + assert not sent_config.autodetect + + # the original config object should not have been modified + assert job_config.to_api_repr() == original_config_copy.to_api_repr() + + def test_load_table_from_json_w_default_job_config(self): + from google.cloud.bigquery import job + from google.cloud.bigquery.client import _DEFAULT_NUM_RETRIES + from google.cloud.bigquery.schema import SchemaField + + client = self._make_client() + + json_rows = [ + {"name": "One", "age": 11, "birthday": "2008-09-10", "adult": False}, + {"name": "Two", "age": 22, "birthday": "1997-08-09", "adult": True}, + ] + + schema = [ + SchemaField("name", "STRING"), + SchemaField("age", "INTEGER"), + SchemaField("adult", "BOOLEAN"), + ] + client.default_load_job_config = job.LoadJobConfig(schema=schema) + + load_patch = mock.patch( + "google.cloud.bigquery.client.Client.load_table_from_file", autospec=True + ) + + with load_patch as load_table_from_file: + client.load_table_from_json( + json_rows, + self.TABLE_REF, + job_config=None, + project="project-x", + location="EU", + ) + + load_table_from_file.assert_called_once_with( + client, + mock.ANY, + self.TABLE_REF, + size=mock.ANY, + num_retries=_DEFAULT_NUM_RETRIES, + job_id=mock.ANY, + job_id_prefix=None, + location="EU", + project="project-x", + job_config=mock.ANY, + timeout=DEFAULT_TIMEOUT, + ) + + sent_config = load_table_from_file.mock_calls[0][2]["job_config"] + assert sent_config.source_format == job.SourceFormat.NEWLINE_DELIMITED_JSON + assert sent_config.schema == schema + def test_load_table_from_json_unicode_emoji_data_case(self): from google.cloud.bigquery.client import _DEFAULT_NUM_RETRIES
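
A minimal usage sketch of the merge behavior this patch adds (the bucket,
dataset, and table names below are hypothetical, not taken from the patch):

    from google.cloud import bigquery

    # Defaults applied to every ``load_table_*`` call made by this client.
    default_config = bigquery.LoadJobConfig(
        write_disposition=bigquery.WriteDisposition.WRITE_TRUNCATE,
    )
    client = bigquery.Client(default_load_job_config=default_config)

    # Per-call settings take precedence; unset fields fall back to the default.
    job_config = bigquery.LoadJobConfig(
        source_format=bigquery.SourceFormat.CSV,
    )
    load_job = client.load_table_from_uri(
        "gs://example-bucket/data.csv",            # hypothetical URI
        "example-project.example_dataset.table1",  # hypothetical table ID
        job_config=job_config,
    )
    load_job.result()  # runs with WRITE_TRUNCATE (default) + CSV (explicit)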