diff --git a/google/cloud/bigquery/client.py b/google/cloud/bigquery/client.py index a53819cdec..f703f6660e 100644 --- a/google/cloud/bigquery/client.py +++ b/google/cloud/bigquery/client.py @@ -210,6 +210,9 @@ class Client(ClientWithProject): default_query_job_config (Optional[google.cloud.bigquery.job.QueryJobConfig]): Default ``QueryJobConfig``. Will be merged into job configs passed into the ``query`` method. + default_load_job_config (Optional[google.cloud.bigquery.job.LoadJobConfig]): + Default ``LoadJobConfig``. + Will be merged into job configs passed into the ``load_table_*`` methods. client_info (Optional[google.api_core.client_info.ClientInfo]): The client info used to send a user-agent string along with API requests. If ``None``, then default info will be used. Generally, @@ -235,6 +238,7 @@ def __init__( _http=None, location=None, default_query_job_config=None, + default_load_job_config=None, client_info=None, client_options=None, ) -> None: @@ -260,6 +264,7 @@ def __init__( self._connection = Connection(self, **kw_args) self._location = location self._default_query_job_config = copy.deepcopy(default_query_job_config) + self._default_load_job_config = copy.deepcopy(default_load_job_config) @property def location(self): @@ -277,6 +282,17 @@ def default_query_job_config(self): def default_query_job_config(self, value: QueryJobConfig): self._default_query_job_config = copy.deepcopy(value) + @property + def default_load_job_config(self): + """Default ``LoadJobConfig``. + Will be merged into job configs passed into the ``load_table_*`` methods. + """ + return self._default_load_job_config + + @default_load_job_config.setter + def default_load_job_config(self, value: LoadJobConfig): + self._default_load_job_config = copy.deepcopy(value) + def close(self): """Close the underlying transport objects, releasing system resources. @@ -2348,9 +2364,19 @@ def load_table_from_uri( destination = _table_arg_to_table_ref(destination, default_project=self.project) + # Make a copy so that the job config isn't modified in-place. if job_config: - job_config = copy.deepcopy(job_config) _verify_job_config_type(job_config, google.cloud.bigquery.job.LoadJobConfig) + job_config = copy.deepcopy(job_config) + else: + job_config = job.LoadJobConfig() + + # Merge this job config with a default job config + if self._default_load_job_config: + _verify_job_config_type( + self._default_load_job_config, google.cloud.bigquery.job.LoadJobConfig + ) + job_config = job_config._fill_from_default(self._default_load_job_config) load_job = job.LoadJob(job_ref, source_uris, destination, self, job_config) load_job._begin(retry=retry, timeout=timeout) @@ -2437,9 +2463,21 @@ def load_table_from_file( destination = _table_arg_to_table_ref(destination, default_project=self.project) job_ref = job._JobReference(job_id, project=project, location=location) + + # Make a copy so that the job config isn't modified in-place. if job_config: - job_config = copy.deepcopy(job_config) _verify_job_config_type(job_config, google.cloud.bigquery.job.LoadJobConfig) + job_config = copy.deepcopy(job_config) + else: + job_config = job.LoadJobConfig() + + # Merge this job config with a default job config + if self._default_load_job_config: + _verify_job_config_type( + self._default_load_job_config, google.cloud.bigquery.job.LoadJobConfig + ) + job_config = job_config._fill_from_default(self._default_load_job_config) + load_job = job.LoadJob(job_ref, None, destination, self, job_config) job_resource = load_job.to_api_repr() @@ -2569,13 +2607,10 @@ def load_table_from_dataframe( """ job_id = _make_job_id(job_id, job_id_prefix) + # Make a copy so that the job config isn't modified in-place. if job_config: _verify_job_config_type(job_config, google.cloud.bigquery.job.LoadJobConfig) - # Make a copy so that the job config isn't modified in-place. - job_config_properties = copy.deepcopy(job_config._properties) - job_config = job.LoadJobConfig() - job_config._properties = job_config_properties - + job_config = copy.deepcopy(job_config) else: job_config = job.LoadJobConfig() diff --git a/tests/unit/job/test_base.py b/tests/unit/job/test_base.py index 3ff96e8746..22e6c89d26 100644 --- a/tests/unit/job/test_base.py +++ b/tests/unit/job/test_base.py @@ -1104,7 +1104,7 @@ def test_ctor_with_unknown_property_raises_error(self): config = self._make_one() config.wrong_name = None - def test_fill_from_default(self): + def test_fill_query_job_config_from_default(self): from google.cloud.bigquery import QueryJobConfig job_config = QueryJobConfig() @@ -1120,6 +1120,22 @@ def test_fill_from_default(self): self.assertTrue(final_job_config.use_query_cache) self.assertEqual(final_job_config.maximum_bytes_billed, 1000) + def test_fill_load_job_from_default(self): + from google.cloud.bigquery import LoadJobConfig + + job_config = LoadJobConfig() + job_config.create_session = True + job_config.encoding = "UTF-8" + + default_job_config = LoadJobConfig() + default_job_config.ignore_unknown_values = True + default_job_config.encoding = "ISO-8859-1" + + final_job_config = job_config._fill_from_default(default_job_config) + self.assertTrue(final_job_config.create_session) + self.assertTrue(final_job_config.ignore_unknown_values) + self.assertEqual(final_job_config.encoding, "UTF-8") + def test_fill_from_default_conflict(self): from google.cloud.bigquery import QueryJobConfig diff --git a/tests/unit/test_client.py b/tests/unit/test_client.py index f52eb825a3..1cbc2f3d3f 100644 --- a/tests/unit/test_client.py +++ b/tests/unit/test_client.py @@ -239,6 +239,31 @@ def test_ctor_w_query_job_config(self): self.assertIsInstance(client._default_query_job_config, QueryJobConfig) self.assertTrue(client._default_query_job_config.dry_run) + def test_ctor_w_load_job_config(self): + from google.cloud.bigquery._http import Connection + from google.cloud.bigquery import LoadJobConfig + + creds = _make_credentials() + http = object() + location = "us-central" + job_config = LoadJobConfig() + job_config.create_session = True + + client = self._make_one( + project=self.PROJECT, + credentials=creds, + _http=http, + location=location, + default_load_job_config=job_config, + ) + self.assertIsInstance(client._connection, Connection) + self.assertIs(client._connection.credentials, creds) + self.assertIs(client._connection.http, http) + self.assertEqual(client.location, location) + + self.assertIsInstance(client._default_load_job_config, LoadJobConfig) + self.assertTrue(client._default_load_job_config.create_session) + def test__call_api_applying_custom_retry_on_timeout(self): from concurrent.futures import TimeoutError from google.cloud.bigquery.retry import DEFAULT_RETRY @@ -426,6 +451,19 @@ def test_default_query_job_config(self): client.default_query_job_config = job_config self.assertIsInstance(client.default_query_job_config, QueryJobConfig) + def test_default_load_job_config(self): + from google.cloud.bigquery import LoadJobConfig + + creds = _make_credentials() + http = object() + client = self._make_one(project=self.PROJECT, credentials=creds, _http=http) + self.assertIsNone(client.default_load_job_config) + + job_config = LoadJobConfig() + job_config.create_session = True + client.default_load_job_config = job_config + self.assertIsInstance(client.default_load_job_config, LoadJobConfig) + def test_get_service_account_email(self): path = "/projects/%s/serviceAccount" % (self.PROJECT,) creds = _make_credentials() @@ -3282,6 +3320,146 @@ def test_load_table_from_uri_w_invalid_job_config(self): self.assertIn("Expected an instance of LoadJobConfig", exc.exception.args[0]) + def test_load_table_from_uri_w_explicit_job_config(self): + from google.cloud.bigquery.job import LoadJobConfig + + JOB = "job_name" + DESTINATION = "destination_table" + SOURCE_URI = "http://example.com/source.csv" + RESOURCE = { + "jobReference": {"jobId": JOB, "projectId": self.PROJECT}, + "configuration": { + "load": { + "sourceUris": [SOURCE_URI], + "destinationTable": { + "projectId": self.PROJECT, + "datasetId": self.DS_ID, + "tableId": DESTINATION, + }, + "createSession": True, + "encoding": "UTF-8", + } + }, + } + + creds = _make_credentials() + http = object() + + client = self._make_one(project=self.PROJECT, credentials=creds, _http=http) + conn = client._connection = make_connection(RESOURCE) + destination = DatasetReference(self.PROJECT, self.DS_ID).table(DESTINATION) + + job_config = LoadJobConfig() + job_config.create_session = True + job_config.encoding = "UTF-8" + client.load_table_from_uri( + SOURCE_URI, destination, job_id=JOB, job_config=job_config + ) + + # Check that load_table_from_uri actually starts the job. + conn.api_request.assert_called_once_with( + method="POST", + path="/projects/%s/jobs" % self.PROJECT, + data=RESOURCE, + timeout=DEFAULT_TIMEOUT, + ) + + def test_load_table_from_uri_w_explicit_job_config_override(self): + from google.cloud.bigquery.job import LoadJobConfig + + JOB = "job_name" + DESTINATION = "destination_table" + SOURCE_URI = "http://example.com/source.csv" + RESOURCE = { + "jobReference": {"jobId": JOB, "projectId": self.PROJECT}, + "configuration": { + "load": { + "sourceUris": [SOURCE_URI], + "destinationTable": { + "projectId": self.PROJECT, + "datasetId": self.DS_ID, + "tableId": DESTINATION, + }, + "createSession": False, + "encoding": "ISO-8859-1", + } + }, + } + + creds = _make_credentials() + http = object() + default_job_config = LoadJobConfig() + default_job_config.create_session = True + default_job_config.encoding = "ISO-8859-1" + + client = self._make_one( + project=self.PROJECT, + credentials=creds, + _http=http, + default_load_job_config=default_job_config, + ) + conn = client._connection = make_connection(RESOURCE) + destination = DatasetReference(self.PROJECT, self.DS_ID).table(DESTINATION) + + job_config = LoadJobConfig() + job_config.create_session = False + client.load_table_from_uri( + SOURCE_URI, destination, job_id=JOB, job_config=job_config + ) + + # Check that load_table_from_uri actually starts the job. + conn.api_request.assert_called_once_with( + method="POST", + path="/projects/%s/jobs" % self.PROJECT, + data=RESOURCE, + timeout=DEFAULT_TIMEOUT, + ) + + def test_load_table_from_uri_w_default_load_config(self): + from google.cloud.bigquery.job import LoadJobConfig + + JOB = "job_name" + DESTINATION = "destination_table" + SOURCE_URI = "http://example.com/source.csv" + RESOURCE = { + "jobReference": {"jobId": JOB, "projectId": self.PROJECT}, + "configuration": { + "load": { + "sourceUris": [SOURCE_URI], + "destinationTable": { + "projectId": self.PROJECT, + "datasetId": self.DS_ID, + "tableId": DESTINATION, + }, + "encoding": "ISO-8859-1", + } + }, + } + + creds = _make_credentials() + http = object() + default_job_config = LoadJobConfig() + default_job_config.encoding = "ISO-8859-1" + + client = self._make_one( + project=self.PROJECT, + credentials=creds, + _http=http, + default_load_job_config=default_job_config, + ) + conn = client._connection = make_connection(RESOURCE) + destination = DatasetReference(self.PROJECT, self.DS_ID).table(DESTINATION) + + client.load_table_from_uri(SOURCE_URI, destination, job_id=JOB) + + # Check that load_table_from_uri actually starts the job. + conn.api_request.assert_called_once_with( + method="POST", + path="/projects/%s/jobs" % self.PROJECT, + data=RESOURCE, + timeout=DEFAULT_TIMEOUT, + ) + @staticmethod def _mock_requests_response(status_code, headers, content=b""): return mock.Mock( @@ -6940,6 +7118,118 @@ def test_load_table_from_file_w_invalid_job_config(self): err_msg = str(exc.value) assert "Expected an instance of LoadJobConfig" in err_msg + def test_load_table_from_file_w_explicit_job_config(self): + from google.cloud.bigquery.client import _DEFAULT_NUM_RETRIES + + client = self._make_client() + file_obj = self._make_file_obj() + + job_config = self._make_config() + job_config.create_session = True + job_config.encoding = "UTF-8" + do_upload_patch = self._make_do_upload_patch( + client, "_do_resumable_upload", self.EXPECTED_CONFIGURATION + ) + with do_upload_patch as do_upload: + client.load_table_from_file( + file_obj, + self.TABLE_REF, + job_id="job_id", + project=self.PROJECT, + location=self.LOCATION, + job_config=job_config, + ) + + expected_resource = copy.deepcopy(self.EXPECTED_CONFIGURATION) + expected_resource["jobReference"]["location"] = self.LOCATION + expected_resource["jobReference"]["projectId"] = self.PROJECT + expected_resource["configuration"]["load"]["createSession"] = True + expected_resource["configuration"]["load"]["encoding"] = "UTF-8" + do_upload.assert_called_once_with( + file_obj, + expected_resource, + _DEFAULT_NUM_RETRIES, + DEFAULT_TIMEOUT, + project=self.PROJECT, + ) + + def test_load_table_from_file_w_explicit_job_config_override(self): + from google.cloud.bigquery.client import _DEFAULT_NUM_RETRIES + from google.cloud.bigquery.job import LoadJobConfig + + client = self._make_client() + file_obj = self._make_file_obj() + + default_job_config = LoadJobConfig() + default_job_config.create_session = True + default_job_config.encoding = "ISO-8859-1" + client.default_load_job_config = default_job_config + + job_config = self._make_config() + job_config.create_session = False + do_upload_patch = self._make_do_upload_patch( + client, "_do_resumable_upload", self.EXPECTED_CONFIGURATION + ) + with do_upload_patch as do_upload: + client.load_table_from_file( + file_obj, + self.TABLE_REF, + job_id="job_id", + project=self.PROJECT, + location=self.LOCATION, + job_config=job_config, + ) + + expected_resource = copy.deepcopy(self.EXPECTED_CONFIGURATION) + expected_resource["jobReference"]["location"] = self.LOCATION + expected_resource["jobReference"]["projectId"] = self.PROJECT + expected_resource["configuration"]["load"]["createSession"] = False + expected_resource["configuration"]["load"]["encoding"] = "ISO-8859-1" + do_upload.assert_called_once_with( + file_obj, + expected_resource, + _DEFAULT_NUM_RETRIES, + DEFAULT_TIMEOUT, + project=self.PROJECT, + ) + + def test_load_table_from_file_w_default_load_config(self): + from google.cloud.bigquery.client import _DEFAULT_NUM_RETRIES + from google.cloud.bigquery.job import LoadJobConfig + + client = self._make_client() + file_obj = self._make_file_obj() + + default_job_config = LoadJobConfig() + default_job_config.encoding = "ISO-8859-1" + client.default_load_job_config = default_job_config + + job_config = self._make_config() + do_upload_patch = self._make_do_upload_patch( + client, "_do_resumable_upload", self.EXPECTED_CONFIGURATION + ) + with do_upload_patch as do_upload: + client.load_table_from_file( + file_obj, + self.TABLE_REF, + job_id="job_id", + project=self.PROJECT, + location=self.LOCATION, + job_config=job_config, + ) + + expected_resource = copy.deepcopy(self.EXPECTED_CONFIGURATION) + expected_resource["jobReference"]["location"] = self.LOCATION + expected_resource["jobReference"]["projectId"] = self.PROJECT + expected_resource["configuration"]["load"]["encoding"] = "ISO-8859-1" + do_upload.assert_called_once_with( + file_obj, + expected_resource, + _DEFAULT_NUM_RETRIES, + DEFAULT_TIMEOUT, + project=self.PROJECT, + ) + @unittest.skipIf(pandas is None, "Requires `pandas`") @unittest.skipIf(pyarrow is None, "Requires `pyarrow`") def test_load_table_from_dataframe(self):