From 3cd69e973ffd4fbd7129b0227e2526c94e4443ac Mon Sep 17 00:00:00 2001 From: Alvaro Viebrantz Date: Mon, 22 Apr 2024 15:14:50 -0400 Subject: [PATCH 01/25] fix: nack pending writes on reconnect --- src/managedwriter/stream_connection.ts | 48 +++++++----- system-test/managed_writer_client_test.ts | 92 ++++++++++++++++++++--- 2 files changed, 111 insertions(+), 29 deletions(-) diff --git a/src/managedwriter/stream_connection.ts b/src/managedwriter/stream_connection.ts index 8dcd421a..85ad42e7 100644 --- a/src/managedwriter/stream_connection.ts +++ b/src/managedwriter/stream_connection.ts @@ -87,6 +87,7 @@ export class StreamConnection extends EventEmitter { this._connection.on('error', this.handleError); this._connection.on('close', () => { this.trace('connection closed'); + this.close(); }); this._connection.on('pause', () => { this.trace('connection paused'); @@ -107,20 +108,19 @@ export class StreamConnection extends EventEmitter { private handleError = (err: gax.GoogleError) => { this.trace('on error', err, JSON.stringify(err)); if (this.shouldReconnect(err)) { + err.message = 'reconnect triggered due to: ' + err.message; + this.ackAllPendingWrites(err); this.reconnect(); return; } - let nextPendingWrite = this.getNextPendingWrite(); if (this.isPermanentError(err)) { this.trace('found permanent error', err); - while (nextPendingWrite) { - this.ackNextPendingWrite(err); - nextPendingWrite = this.getNextPendingWrite(); - } + this.ackAllPendingWrites(err); this.emit('error', err); return; } - if (this.isRequestError(err) && nextPendingWrite) { + let nextPendingWrite = this.getNextPendingWrite(); + if (nextPendingWrite) { this.trace( 'found request error with pending write', err, @@ -144,6 +144,13 @@ export class StreamConnection extends EventEmitter { return !!err.code && reconnectionErrorCodes.includes(err.code); } + private isConnectionClosed() { + if (this._connection) { + return this._connection.destroyed || this._connection.closed; + } + return true; + } + private isPermanentError(err: gax.GoogleError): boolean { if (err.code === gax.Status.INVALID_ARGUMENT) { const storageErrors = parseStorageErrors(err); @@ -160,10 +167,6 @@ export class StreamConnection extends EventEmitter { return false; } - private isRequestError(err: gax.GoogleError): boolean { - return err.code === gax.Status.INVALID_ARGUMENT; - } - private resolveCallOptions( streamId: string, options?: gax.CallOptions @@ -245,6 +248,19 @@ export class StreamConnection extends EventEmitter { return null; } + private ackAllPendingWrites( + err: Error | null, + result?: + | protos.google.cloud.bigquery.storage.v1.IAppendRowsResponse + | undefined + ) { + let nextPendingWrite = this.getNextPendingWrite(); + while (nextPendingWrite) { + this.ackNextPendingWrite(err, result); + nextPendingWrite = this.getNextPendingWrite(); + } + } + private ackNextPendingWrite( err: Error | null, result?: @@ -280,16 +296,12 @@ export class StreamConnection extends EventEmitter { private send(pw: PendingWrite) { const request = pw.getRequest(); - if (!this._connection) { - pw._markDone(new Error('connection closed')); - return; - } - if (this._connection.destroyed || this._connection.closed) { + if (this.isConnectionClosed()) { this.reconnect(); } this.trace('sending pending write', pw); try { - this._connection.write(request, err => { + this._connection?.write(request, err => { this.trace('wrote pending write', err, this._pendingWrites.length); if (err) { pw._markDone(err); //TODO: add retries @@ -306,11 +318,11 @@ export class StreamConnection 
extends EventEmitter { * Check if connection is open and ready to send requests. */ isOpen(): boolean { - return !!this._connection; + return !this.isConnectionClosed(); } /** - * Reconnect and re send inflight requests. + * Re open appendRows BiDi gRPC connection. */ reconnect() { this.trace('reconnect called'); diff --git a/system-test/managed_writer_client_test.ts b/system-test/managed_writer_client_test.ts index d205d547..300a8db8 100644 --- a/system-test/managed_writer_client_test.ts +++ b/system-test/managed_writer_client_test.ts @@ -51,6 +51,11 @@ const generateUuid = () => `${GCLOUD_TESTS_PREFIX}_${uuid.v4()}`.replace(/-/gi, '_'); const datasetId = generateUuid(); +const sleep = (ms: number) => + new Promise(resolve => { + setTimeout(resolve, ms); + }); + const root = protobuf.Root.fromJSON(customerRecordProtoJson); if (!root) { throw Error('Proto must not be undefined'); @@ -1172,7 +1177,7 @@ describe('managedwriter.WriterClient', () => { } }); - xit('reconnect on idle connection', async () => { + it('reconnect on idle connection', async () => { bqWriteClient.initialize(); const client = new WriterClient(); client.setClient(bqWriteClient); @@ -1207,15 +1212,17 @@ describe('managedwriter.WriterClient', () => { let pw = writer.appendRows([row1, row2], 0); await pw.getResult(); - const sleep = (ms: number) => - new Promise(resolve => { - setTimeout(resolve, ms); - }); - const minutes = 10; - for (let i = 0; i <= minutes; i++) { - console.log('sleeping for a minute: ', minutes - i, 'to go'); - await sleep(60 * 1000); - } + // Simulate server sending ABORT error as the conn was idle + const conn = connection['_connection'] as gax.CancellableStream; // private method + const gerr = new gax.GoogleError( + 'Closing the stream because it has been inactive for 600 seconds' + ); + gerr.code = gax.Status.ABORTED; + conn.emit('error', gerr); + // simulate server closing conn. 
+ await sleep(100); + conn.destroy(); + await sleep(100); const row3 = { customer_name: 'Test', @@ -1234,7 +1241,70 @@ describe('managedwriter.WriterClient', () => { } finally { client.close(); } - }).timeout(20 * 60 * 1000); + }).timeout(20 * 1000); + + it('should mark any pending writes with error if connection was closed', async () => { + bqWriteClient.initialize(); + const client = new WriterClient(); + client.setClient(bqWriteClient); + + const row1 = { + customer_name: 'Ada Lovelace', + row_num: 1, + }; + + try { + const connection = await client.createStreamConnection({ + streamType: managedwriter.PendingStream, + destinationTable: parent, + }); + + const writer = new JSONWriter({ + connection, + protoDescriptor, + }); + + const pw1 = writer.appendRows([row1], 0); + await pw1.getResult(); + + // Try to append a new row + const row2 = { + customer_name: 'Test', + row_num: 2, + customer_email: 'test@example.com', + }; + + let foundError: gax.GoogleError | null = null; + const pw2 = writer.appendRows([row2], 1); + pw2.getResult().catch(err => { + foundError = err as gax.GoogleError; + }); + + // Simulate server sending RESOURCE_EXHAUSTED error on a write + const conn = connection['_connection'] as gax.CancellableStream; // private method + // swallow ack for the last appendRow call, so we can simulate it failing + conn.removeAllListeners('data'); + await new Promise(resolve => conn.once('data', resolve)); + conn.addListener('data', connection['handleData']); + + const gerr = new gax.GoogleError('memory limit exceeded'); + gerr.code = gax.Status.RESOURCE_EXHAUSTED; + conn.emit('error', gerr); + // simulate server closing conn. + await sleep(100); + conn.destroy(); + await sleep(100); + + // should throw error of reconnection + assert.notEqual(foundError, null); + assert.equal(foundError!.message.includes('reconnect'), true); + + connection.close(); + writer.close(); + } finally { + client.close(); + } + }); }); describe('close', () => { From 7ba612c851fa9e85c546e18d25e3a30c8ce01a10 Mon Sep 17 00:00:00 2001 From: Alvaro Viebrantz Date: Mon, 22 Apr 2024 15:29:03 -0400 Subject: [PATCH 02/25] fix: lint issues --- src/managedwriter/stream_connection.ts | 7 ++++--- system-test/managed_writer_client_test.ts | 4 ++-- 2 files changed, 6 insertions(+), 5 deletions(-) diff --git a/src/managedwriter/stream_connection.ts b/src/managedwriter/stream_connection.ts index 85ad42e7..c806ed09 100644 --- a/src/managedwriter/stream_connection.ts +++ b/src/managedwriter/stream_connection.ts @@ -119,7 +119,7 @@ export class StreamConnection extends EventEmitter { this.emit('error', err); return; } - let nextPendingWrite = this.getNextPendingWrite(); + const nextPendingWrite = this.getNextPendingWrite(); if (nextPendingWrite) { this.trace( 'found request error with pending write', @@ -180,8 +180,9 @@ export class StreamConnection extends EventEmitter { } // This header is required so that the BigQuery Storage API // knows which region to route the request to. - callOptions.otherArgs.headers['x-goog-request-params'] = - `write_stream=${streamId}`; + callOptions.otherArgs.headers[ + 'x-goog-request-params' + ] = `write_stream=${streamId}`; return callOptions; } diff --git a/system-test/managed_writer_client_test.ts b/system-test/managed_writer_client_test.ts index 300a8db8..0e8f93a3 100644 --- a/system-test/managed_writer_client_test.ts +++ b/system-test/managed_writer_client_test.ts @@ -13,7 +13,7 @@ // limitations under the License. 
import * as assert from 'assert'; -import {describe, it, xit} from 'mocha'; +import {describe, it} from 'mocha'; import * as uuid from 'uuid'; import * as gax from 'google-gax'; import * as sinon from 'sinon'; @@ -1294,7 +1294,7 @@ describe('managedwriter.WriterClient', () => { await sleep(100); conn.destroy(); await sleep(100); - + // should throw error of reconnection assert.notEqual(foundError, null); assert.equal(foundError!.message.includes('reconnect'), true); From 70f6972d7d5d52e02ff0c3b18e8313ee22f6b4be Mon Sep 17 00:00:00 2001 From: Alvaro Viebrantz Date: Mon, 22 Apr 2024 15:31:34 -0400 Subject: [PATCH 03/25] fix: lint issues --- src/managedwriter/stream_connection.ts | 5 ++--- 1 file changed, 2 insertions(+), 3 deletions(-) diff --git a/src/managedwriter/stream_connection.ts b/src/managedwriter/stream_connection.ts index c806ed09..cea2318f 100644 --- a/src/managedwriter/stream_connection.ts +++ b/src/managedwriter/stream_connection.ts @@ -180,9 +180,8 @@ export class StreamConnection extends EventEmitter { } // This header is required so that the BigQuery Storage API // knows which region to route the request to. - callOptions.otherArgs.headers[ - 'x-goog-request-params' - ] = `write_stream=${streamId}`; + callOptions.otherArgs.headers['x-goog-request-params'] = + `write_stream=${streamId}`; return callOptions; } From a751d77cb49069828de24fd96dbecd15f8f91125 Mon Sep 17 00:00:00 2001 From: Owl Bot Date: Mon, 22 Apr 2024 19:32:40 +0000 Subject: [PATCH 04/25] =?UTF-8?q?=F0=9F=A6=89=20Updates=20from=20OwlBot=20?= =?UTF-8?q?post-processor?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit See https://github.com/googleapis/repo-automation-bots/blob/main/packages/owl-bot/README.md --- src/managedwriter/stream_connection.ts | 5 ++--- 1 file changed, 2 insertions(+), 3 deletions(-) diff --git a/src/managedwriter/stream_connection.ts b/src/managedwriter/stream_connection.ts index c806ed09..cea2318f 100644 --- a/src/managedwriter/stream_connection.ts +++ b/src/managedwriter/stream_connection.ts @@ -180,9 +180,8 @@ export class StreamConnection extends EventEmitter { } // This header is required so that the BigQuery Storage API // knows which region to route the request to. 
- callOptions.otherArgs.headers[ - 'x-goog-request-params' - ] = `write_stream=${streamId}`; + callOptions.otherArgs.headers['x-goog-request-params'] = + `write_stream=${streamId}`; return callOptions; } From 4cfe0854248066e4f9dcd9b232438ece639c9b96 Mon Sep 17 00:00:00 2001 From: Alvaro Viebrantz Date: Tue, 23 Apr 2024 15:01:30 -0400 Subject: [PATCH 05/25] feat: enable write retries on failed pending writes --- src/managedwriter/pending_write.ts | 6 + src/managedwriter/stream_connection.ts | 29 ++++- src/managedwriter/writer_client.ts | 18 +++ system-test/managed_writer_client_test.ts | 149 ++++++++++++++++++++++ 4 files changed, 198 insertions(+), 4 deletions(-) diff --git a/src/managedwriter/pending_write.ts b/src/managedwriter/pending_write.ts index 2e5a7483..82e9e362 100644 --- a/src/managedwriter/pending_write.ts +++ b/src/managedwriter/pending_write.ts @@ -28,18 +28,24 @@ type AppendRowRequest = export class PendingWrite { private request: AppendRowRequest; private response?: AppendRowsResponse; + private retryAttempts: number; private promise: Promise; private resolveFunc?: (response: AppendRowsResponse) => void; private rejectFunc?: (reason?: protos.google.rpc.IStatus) => void; constructor(request: AppendRowRequest) { this.request = request; + this.retryAttempts = 0; this.promise = new Promise((resolve, reject) => { this.resolveFunc = resolve; this.rejectFunc = reject; }); } + _increaseRetryAttempts(): number { + return this.retryAttempts++; + } + _markDone(err: Error | null, response?: AppendRowsResponse) { if (err) { this.rejectFunc && this.rejectFunc(err); diff --git a/src/managedwriter/stream_connection.ts b/src/managedwriter/stream_connection.ts index cea2318f..2e9b26db 100644 --- a/src/managedwriter/stream_connection.ts +++ b/src/managedwriter/stream_connection.ts @@ -108,9 +108,14 @@ export class StreamConnection extends EventEmitter { private handleError = (err: gax.GoogleError) => { this.trace('on error', err, JSON.stringify(err)); if (this.shouldReconnect(err)) { - err.message = 'reconnect triggered due to: ' + err.message; - this.ackAllPendingWrites(err); + const retrySettings = this._writeClient['_retrySettings']; this.reconnect(); + if (retrySettings.enableWriteRetries) { + this.resendAllPendingWrites(); + } else { + err.message = 'reconnect triggered due to: ' + err.message; + this.ackAllPendingWrites(err); + } return; } if (this.isPermanentError(err)) { @@ -180,8 +185,9 @@ export class StreamConnection extends EventEmitter { } // This header is required so that the BigQuery Storage API // knows which region to route the request to. 
- callOptions.otherArgs.headers['x-goog-request-params'] = - `write_stream=${streamId}`; + callOptions.otherArgs.headers[ + 'x-goog-request-params' + ] = `write_stream=${streamId}`; return callOptions; } @@ -248,6 +254,14 @@ export class StreamConnection extends EventEmitter { return null; } + private resendAllPendingWrites() { + let pw = this._pendingWrites.pop(); + while (pw) { + this.send(pw); + pw = this._pendingWrites.pop(); + } + } + private ackAllPendingWrites( err: Error | null, result?: @@ -300,6 +314,13 @@ export class StreamConnection extends EventEmitter { this.reconnect(); } this.trace('sending pending write', pw); + const retrySettings = this._writeClient['_retrySettings']; + const tries = pw._increaseRetryAttempts(); + if (tries > retrySettings.maxRetryAttempts) { + pw._markDone( + new Error(`pending write max retries reached: ${tries} attempts`) + ); + } try { this._connection?.write(request, err => { this.trace('wrote pending write', err, this._pendingWrites.length); diff --git a/src/managedwriter/writer_client.ts b/src/managedwriter/writer_client.ts index 55721ab6..55291575 100644 --- a/src/managedwriter/writer_client.ts +++ b/src/managedwriter/writer_client.ts @@ -23,6 +23,10 @@ import {StreamConnection} from './stream_connection'; type StreamConnections = { connectionList: StreamConnection[]; }; +type RetrySettings = { + enableWriteRetries: boolean; + maxRetryAttempts: number; +}; type CreateWriteStreamRequest = protos.google.cloud.bigquery.storage.v1.ICreateWriteStreamRequest; type BatchCommitWriteStreamsRequest = @@ -55,6 +59,7 @@ export class WriterClient { private _client: BigQueryWriteClient; private _connections: StreamConnections; private _open: boolean; + private _retrySettings: RetrySettings; constructor(opts?: ClientOptions) { const baseOptions = { @@ -69,6 +74,10 @@ export class WriterClient { connectionList: [], }; this._open = false; + this._retrySettings = { + enableWriteRetries: false, + maxRetryAttempts: 4, + }; } /** @@ -102,6 +111,15 @@ export class WriterClient { return this._open; } + // Enables StreamConnections to automatically retry failed appends. + // + // Enabling retries is best suited for cases where users want to achieve at-least-once + // append semantics. Use of automatic retries may complicate patterns where the user + // is designing for exactly-once append semantics. + enableWriteRetries(enable: boolean) { + this._retrySettings.enableWriteRetries = enable; + } + /** * Creates a write stream to the given table. 
* Additionally, every table has a special stream named DefaultStream diff --git a/system-test/managed_writer_client_test.ts b/system-test/managed_writer_client_test.ts index 0e8f93a3..0dd65b09 100644 --- a/system-test/managed_writer_client_test.ts +++ b/system-test/managed_writer_client_test.ts @@ -862,6 +862,155 @@ describe('managedwriter.WriterClient', () => { } }); + describe('Flaky Scenarios', () => { + let flakyDatasetId: string; + const flakyRegion = 'us-east7'; + + let rowNum = 0; + const generateRows = (num: number) => { + const rows = []; + for (let i = 0; i < num; i++) { + rows.push({ + customer_name: generateUuid(), + row_num: rowNum++, + }); + } + return rows; + }; + + beforeEach(() => { + rowNum = 0; + }); + + before(async () => { + flakyDatasetId = generateUuid(); + console.log('Flaky dataset id:', flakyDatasetId); + await bigquery.createDataset(flakyDatasetId, { + location: flakyRegion, + }); + }); + + after(async () => { + console.log('Trying to delete flaky dataset id:', flakyDatasetId); + await bigquery + .dataset(flakyDatasetId) + .delete({force: true}) + .catch(console.warn); + }); + + it('should manage to send data in scenario where every 10 request drops the connection', async () => { + bqWriteClient.initialize(); + const client = new WriterClient(); + client.enableWriteRetries(true); + client.setClient(bqWriteClient); + + try { + const flakyTableId = generateUuid() + '_reconnect_on_close'; + const [table] = await bigquery + .dataset(flakyDatasetId) + .createTable(flakyTableId, { + schema, + location: flakyRegion, + }); + projectId = table.metadata.tableReference.projectId; + parent = `projects/${projectId}/datasets/${flakyDatasetId}/tables/${flakyTableId}`; + + const connection = await client.createStreamConnection({ + streamType: managedwriter.PendingStream, + destinationTable: parent, + }); + + connection.onConnectionError(err => { + console.log('flaky test error:', err); + }); + + const writer = new JSONWriter({ + connection, + protoDescriptor, + }); + + const iterations = new Array(50).fill(1); + let offset = 0; + for (const _ of iterations) { + const rows = generateRows(10); + const pw = writer.appendRows(rows, offset); + try { + await pw.getResult(); + } catch (err) { + console.error('found error trying to send rows'); + } + offset += 10; + } + + const res = await connection.finalize(); + connection.close(); + assert.equal(res?.rowCount, 500); + + writer.close(); + } finally { + client.close(); + } + }).timeout(2 * 60 * 1000); + + it('should manage to send data in scenario where opening the connection can fail more frequently', async () => { + bqWriteClient.initialize(); + const client = new WriterClient(); + client.enableWriteRetries(true); + client.setClient(bqWriteClient); + + try { + const flakyTableId = generateUuid() + '_initial_connect_failure'; + const [table] = await bigquery + .dataset(flakyDatasetId) + .createTable(flakyTableId, { + schema, + location: flakyRegion, + }); + projectId = table.metadata.tableReference.projectId; + parent = `projects/${projectId}/datasets/${flakyDatasetId}/tables/${flakyTableId}`; + + const connection = await client.createStreamConnection({ + streamType: managedwriter.PendingStream, + destinationTable: parent, + }); + client['_retrySettings'].maxRetryAttempts = 100; // aggresive retries + + connection.onConnectionError(err => { + console.log('flaky conn error:', err); + }); + + const writer = new JSONWriter({ + connection, + protoDescriptor, + }); + + const iterations = new Array(50).fill(1); + let offset = 0; + for 
(const _ of iterations) { + const rows = generateRows(10); + const pw = writer.appendRows(rows, offset); + try { + const res = await pw.getResult(); + assert.equal(res.error, null); + } catch (err) { + console.error('found error trying to send rows', err); + throw err; + } + offset += 10; + connection.close(); // Close connection on every append to trigger reconnection + } + + const res = await connection.finalize(); + connection.close(); + assert.equal(res?.rowCount, 500); + + writer.close(); + } finally { + client.close(); + } + }).timeout(2 * 60 * 1000); + }); + describe('Error Scenarios', () => { it('send request with mismatched proto descriptor', async () => { bqWriteClient.initialize(); From 474e2377b22a2da33167097a6e1f6409ec081774 Mon Sep 17 00:00:00 2001 From: Alvaro Viebrantz Date: Tue, 23 Apr 2024 15:06:29 -0400 Subject: [PATCH 06/25] fix: lint issue --- src/managedwriter/stream_connection.ts | 5 ++--- 1 file changed, 2 insertions(+), 3 deletions(-) diff --git a/src/managedwriter/stream_connection.ts b/src/managedwriter/stream_connection.ts index 2e9b26db..3ef1cc2d 100644 --- a/src/managedwriter/stream_connection.ts +++ b/src/managedwriter/stream_connection.ts @@ -185,9 +185,8 @@ export class StreamConnection extends EventEmitter { } // This header is required so that the BigQuery Storage API // knows which region to route the request to. - callOptions.otherArgs.headers[ - 'x-goog-request-params' - ] = `write_stream=${streamId}`; + callOptions.otherArgs.headers['x-goog-request-params'] = + `write_stream=${streamId}`; return callOptions; } From 50f2ae74a89f9587b5fafa26f46187f5d5111677 Mon Sep 17 00:00:00 2001 From: Alvaro Viebrantz Date: Wed, 24 Apr 2024 13:01:48 -0400 Subject: [PATCH 07/25] fix: only reconnect on server-side close event --- src/managedwriter/stream_connection.ts | 40 ++++---- system-test/managed_writer_client_test.ts | 117 ++++++++++++---------- 2 files changed, 86 insertions(+), 71 deletions(-) diff --git a/src/managedwriter/stream_connection.ts b/src/managedwriter/stream_connection.ts index 3ef1cc2d..2557056e 100644 --- a/src/managedwriter/stream_connection.ts +++ b/src/managedwriter/stream_connection.ts @@ -87,7 +87,16 @@ export class StreamConnection extends EventEmitter { this._connection.on('error', this.handleError); this._connection.on('close', () => { this.trace('connection closed'); - this.close(); + this.reconnect(); + const retrySettings = this._writeClient['_retrySettings']; + if (retrySettings.enableWriteRetries) { + this.resendAllPendingWrites(); + } else { + const err = new Error( + 'aborted due to failed connection, please retry the request' + ); + this.ackAllPendingWrites(err); + } }); this._connection.on('pause', () => { this.trace('connection paused'); @@ -107,17 +116,6 @@ export class StreamConnection extends EventEmitter { private handleError = (err: gax.GoogleError) => { this.trace('on error', err, JSON.stringify(err)); - if (this.shouldReconnect(err)) { - const retrySettings = this._writeClient['_retrySettings']; - this.reconnect(); - if (retrySettings.enableWriteRetries) { - this.resendAllPendingWrites(); - } else { - err.message = 'reconnect triggered due to: ' + err.message; - this.ackAllPendingWrites(err); - } - return; - } if (this.isPermanentError(err)) { this.trace('found permanent error', err); this.ackAllPendingWrites(err); @@ -131,20 +129,26 @@ export class StreamConnection extends EventEmitter { err, nextPendingWrite ); - this.ackNextPendingWrite(err); + if (!this.isRetryableError(err)) { + this.ackNextPendingWrite(err); + 
return; + } + const retrySettings = this._writeClient['_retrySettings']; + if (!retrySettings.enableWriteRetries) { + this.ackNextPendingWrite(err); + } return; } this.emit('error', err); }; - private shouldReconnect(err: gax.GoogleError): boolean { + private isRetryableError(err: gax.GoogleError): boolean { const reconnectionErrorCodes = [ - gax.Status.UNAVAILABLE, - gax.Status.RESOURCE_EXHAUSTED, gax.Status.ABORTED, + gax.Status.UNAVAILABLE, gax.Status.CANCELLED, - gax.Status.DEADLINE_EXCEEDED, gax.Status.INTERNAL, + gax.Status.DEADLINE_EXCEEDED, ]; return !!err.code && reconnectionErrorCodes.includes(err.code); } @@ -248,7 +252,7 @@ export class StreamConnection extends EventEmitter { private getNextPendingWrite(): PendingWrite | null { if (this._pendingWrites.length > 0) { - return this._pendingWrites[0]; + return this._pendingWrites[this._pendingWrites.length - 1]; } return null; } diff --git a/system-test/managed_writer_client_test.ts b/system-test/managed_writer_client_test.ts index 0dd65b09..2b332bf5 100644 --- a/system-test/managed_writer_client_test.ts +++ b/system-test/managed_writer_client_test.ts @@ -24,6 +24,7 @@ import * as protobuf from 'protobufjs'; import {ClientOptions} from 'google-gax'; import * as customerRecordProtoJson from '../samples/customer_record.json'; import {JSONEncoder} from '../src/managedwriter/encoder'; +import {PendingWrite} from '../src/managedwriter/pending_write'; const sandbox = sinon.createSandbox(); afterEach(() => sandbox.restore()); @@ -884,7 +885,6 @@ describe('managedwriter.WriterClient', () => { before(async () => { flakyDatasetId = generateUuid(); - console.log('Flaky dataset id:', flakyDatasetId); await bigquery.createDataset(flakyDatasetId, { location: flakyRegion, }); @@ -898,7 +898,7 @@ describe('managedwriter.WriterClient', () => { .catch(console.warn); }); - it('should manage to send data in scenario where every 10 request drops the connection', async () => { + it('should manage to send data in sequence scenario where every 10 request drops the connection', async () => { bqWriteClient.initialize(); const client = new WriterClient(); client.enableWriteRetries(true); @@ -952,6 +952,61 @@ describe('managedwriter.WriterClient', () => { } }).timeout(2 * 60 * 1000); + it('should manage to send data in parallel in scenario where every 10 request drops the connection', async () => { + bqWriteClient.initialize(); + const client = new WriterClient(); + client.enableWriteRetries(true); + client.setClient(bqWriteClient); + + try { + const flakyTableId = generateUuid() + '_reconnect_on_close'; + const [table] = await bigquery + .dataset(flakyDatasetId) + .createTable(flakyTableId, { + schema, + location: flakyRegion, + }); + projectId = table.metadata.tableReference.projectId; + parent = `projects/${projectId}/datasets/${flakyDatasetId}/tables/${flakyTableId}`; + + const connection = await client.createStreamConnection({ + streamId: managedwriter.DefaultStream, + destinationTable: parent, + }); + client['_retrySettings'].maxRetryAttempts = 10; + + connection.onConnectionError(err => { + console.log('flaky test error:', err); + }); + + const writer = new JSONWriter({ + connection, + protoDescriptor, + }); + + const pendingWrites: PendingWrite[] = []; + const iterations = new Array(50).fill(1); + for (const _ of iterations) { + const rows = generateRows(10); + const pw = writer.appendRows(rows); + pendingWrites.push(pw); + } + + await Promise.all(pendingWrites.map(pw => pw.getResult())); + + const [rows] = await bigquery.query( + `SELECT * FROM 
\`${projectId}.${flakyDatasetId}.${flakyTableId}\` order by row_num` + ); + + // allow some level of duplication. On testing we saw around 0-2 dup appendRequest + assert.strictEqual(rows.length >= 500 && rows.length <= 520, true); + connection.close(); + writer.close(); + } finally { + client.close(); + } + }).timeout(2 * 60 * 1000); + it('should manage to send data in scenario where opening the connection can fail more frequently', async () => { bqWriteClient.initialize(); const client = new WriterClient(); @@ -1237,7 +1292,7 @@ describe('managedwriter.WriterClient', () => { } }); - it('should trigger reconnection given some specific errors', async () => { + it('should trigger reconnection when connection closes from the server', async () => { bqWriteClient.initialize(); const client = new WriterClient(); client.setClient(bqWriteClient); @@ -1274,51 +1329,9 @@ describe('managedwriter.WriterClient', () => { ); await pw.getResult(); - const reconnectErrorCases: gax.GoogleError[] = [ - { - code: gax.Status.ABORTED, - msg: 'Closing the stream because it has been inactive', - }, - { - code: gax.Status.RESOURCE_EXHAUSTED, - msg: 'read econnreset', - }, - { - code: gax.Status.ABORTED, - msg: 'service is currently unavailable', - }, - { - code: gax.Status.RESOURCE_EXHAUSTED, - msg: 'bandwidth exhausted', - }, - { - code: gax.Status.RESOURCE_EXHAUSTED, - msg: 'memory limit exceeded', - }, - { - code: gax.Status.CANCELLED, - msg: 'any', - }, - { - code: gax.Status.DEADLINE_EXCEEDED, - msg: 'a msg', - }, - { - code: gax.Status.INTERNAL, - msg: 'received RST_STREAM with code', - }, - ].map(err => { - const gerr = new gax.GoogleError(err.msg); - gerr.code = err.code; - return gerr; - }); - for (const gerr of reconnectErrorCases) { - const conn = connection['_connection'] as gax.CancellableStream; // private method - conn.emit('error', gerr); - assert.equal(reconnectedCalled, true); - - reconnectedCalled = false; // reset flag - } + const conn = connection['_connection'] as gax.CancellableStream; // private method + conn.emit('close'); + assert.equal(reconnectedCalled, true); writer.close(); } finally { @@ -1429,24 +1442,22 @@ describe('managedwriter.WriterClient', () => { foundError = err as gax.GoogleError; }); - // Simulate server sending RESOURCE_EXHAUSTED error on a write + // Simulate server sending ABORTED error on a write const conn = connection['_connection'] as gax.CancellableStream; // private method // swallow ack for the last appendRow call, so we can simulate it failing conn.removeAllListeners('data'); await new Promise(resolve => conn.once('data', resolve)); conn.addListener('data', connection['handleData']); - const gerr = new gax.GoogleError('memory limit exceeded'); - gerr.code = gax.Status.RESOURCE_EXHAUSTED; - conn.emit('error', gerr); // simulate server closing conn. 
+ conn.emit('close'); await sleep(100); conn.destroy(); await sleep(100); // should throw error of reconnection assert.notEqual(foundError, null); - assert.equal(foundError!.message.includes('reconnect'), true); + assert.equal(foundError!.message.includes('retry'), true); connection.close(); writer.close(); From 63ac8093be944010f493a405ab6773495e7e88ac Mon Sep 17 00:00:00 2001 From: Alvaro Viebrantz Date: Wed, 24 Apr 2024 15:56:14 -0400 Subject: [PATCH 08/25] fix: only reconnect on close if there are pending writes --- src/managedwriter/stream_connection.ts | 31 +++++++++++++---------- system-test/managed_writer_client_test.ts | 10 ++++++-- 2 files changed, 25 insertions(+), 16 deletions(-) diff --git a/src/managedwriter/stream_connection.ts b/src/managedwriter/stream_connection.ts index 2557056e..60b959a7 100644 --- a/src/managedwriter/stream_connection.ts +++ b/src/managedwriter/stream_connection.ts @@ -87,15 +87,17 @@ export class StreamConnection extends EventEmitter { this._connection.on('error', this.handleError); this._connection.on('close', () => { this.trace('connection closed'); - this.reconnect(); - const retrySettings = this._writeClient['_retrySettings']; - if (retrySettings.enableWriteRetries) { - this.resendAllPendingWrites(); - } else { - const err = new Error( - 'aborted due to failed connection, please retry the request' - ); - this.ackAllPendingWrites(err); + if (this.hasPendingWrites()) { + this.reconnect(); + const retrySettings = this._writeClient['_retrySettings']; + if (retrySettings.enableWriteRetries) { + this.resendAllPendingWrites(); + } else { + const err = new Error( + 'aborted due to failed connection, please retry the request' + ); + this.ackAllPendingWrites(err); + } } }); this._connection.on('pause', () => { @@ -196,8 +198,7 @@ export class StreamConnection extends EventEmitter { private handleData = (response: AppendRowsResponse) => { this.trace('data arrived', response); - const pw = this.getNextPendingWrite(); - if (!pw) { + if (!this.hasPendingWrites()) { this.trace('data arrived with no pending write available', response); return; } @@ -250,6 +251,10 @@ export class StreamConnection extends EventEmitter { return this._streamId; }; + private hasPendingWrites(): boolean { + return this._pendingWrites.length > 0; + } + private getNextPendingWrite(): PendingWrite | null { if (this._pendingWrites.length > 0) { return this._pendingWrites[this._pendingWrites.length - 1]; @@ -271,10 +276,8 @@ export class StreamConnection extends EventEmitter { | protos.google.cloud.bigquery.storage.v1.IAppendRowsResponse | undefined ) { - let nextPendingWrite = this.getNextPendingWrite(); - while (nextPendingWrite) { + while (this.hasPendingWrites()) { this.ackNextPendingWrite(err, result); - nextPendingWrite = this.getNextPendingWrite(); } } diff --git a/system-test/managed_writer_client_test.ts b/system-test/managed_writer_client_test.ts index 2b332bf5..f2cd5f23 100644 --- a/system-test/managed_writer_client_test.ts +++ b/system-test/managed_writer_client_test.ts @@ -891,7 +891,6 @@ describe('managedwriter.WriterClient', () => { }); after(async () => { - console.log('Trying to delete flaky dataset id:', flakyDatasetId); await bigquery .dataset(flakyDatasetId) .delete({force: true}) @@ -1292,7 +1291,7 @@ describe('managedwriter.WriterClient', () => { } }); - it('should trigger reconnection when connection closes from the server', async () => { + it('should trigger reconnection when connection closes and there are pending writes', async () => { bqWriteClient.initialize(); 
const client = new WriterClient(); client.setClient(bqWriteClient); @@ -1331,6 +1330,13 @@ describe('managedwriter.WriterClient', () => { const conn = connection['_connection'] as gax.CancellableStream; // private method conn.emit('close'); + + assert.equal(reconnectedCalled, false); + + // add a fake pending write + connection['_pendingWrites'].push(new PendingWrite({})); + conn.emit('close'); + assert.equal(reconnectedCalled, true); writer.close(); From b60db536017cf3c0de93d9a9ef2b65baf302e142 Mon Sep 17 00:00:00 2001 From: Alvaro Viebrantz Date: Thu, 25 Apr 2024 12:03:45 -0400 Subject: [PATCH 09/25] feat: resend on retryable error --- src/managedwriter/stream_connection.ts | 21 +- system-test/managed_writer_client_test.ts | 352 +++++++++++++--------- 2 files changed, 227 insertions(+), 146 deletions(-) diff --git a/src/managedwriter/stream_connection.ts b/src/managedwriter/stream_connection.ts index 60b959a7..dcf7aea8 100644 --- a/src/managedwriter/stream_connection.ts +++ b/src/managedwriter/stream_connection.ts @@ -131,12 +131,12 @@ export class StreamConnection extends EventEmitter { err, nextPendingWrite ); - if (!this.isRetryableError(err)) { - this.ackNextPendingWrite(err); - return; - } const retrySettings = this._writeClient['_retrySettings']; - if (!retrySettings.enableWriteRetries) { + if (this.isRetryableError(err) && retrySettings.enableWriteRetries) { + process.nextTick(() => { + this.resendNextPendingWrite(); + }); + } else { this.ackNextPendingWrite(err); } return; @@ -263,10 +263,15 @@ export class StreamConnection extends EventEmitter { } private resendAllPendingWrites() { - let pw = this._pendingWrites.pop(); - while (pw) { + while (this.hasPendingWrites()) { + this.resendNextPendingWrite(); + } + } + + private resendNextPendingWrite() { + const pw = this._pendingWrites.pop(); + if (pw) { this.send(pw); - pw = this._pendingWrites.pop(); } } diff --git a/system-test/managed_writer_client_test.ts b/system-test/managed_writer_client_test.ts index f2cd5f23..0c7bf1a2 100644 --- a/system-test/managed_writer_client_test.ts +++ b/system-test/managed_writer_client_test.ts @@ -42,6 +42,8 @@ type DescriptorProto = protos.google.protobuf.IDescriptorProto; type IInt64Value = protos.google.protobuf.IInt64Value; type AppendRowsResponse = protos.google.cloud.bigquery.storage.v1.IAppendRowsResponse; +type AppendRowRequest = + protos.google.cloud.bigquery.storage.v1.IAppendRowsRequest; const FieldDescriptorProtoType = protos.google.protobuf.FieldDescriptorProto.Type; @@ -897,172 +899,246 @@ describe('managedwriter.WriterClient', () => { .catch(console.warn); }); - it('should manage to send data in sequence scenario where every 10 request drops the connection', async () => { - bqWriteClient.initialize(); - const client = new WriterClient(); - client.enableWriteRetries(true); - client.setClient(bqWriteClient); + describe('should manage to send data in sequence scenario', () => { + it('every 10 request drops the connection', async () => { + bqWriteClient.initialize(); + const client = new WriterClient(); + client.enableWriteRetries(true); + client.setClient(bqWriteClient); - try { - const flakyTableId = generateUuid() + '_reconnect_on_close'; - const [table] = await bigquery - .dataset(flakyDatasetId) - .createTable(flakyTableId, { - schema, - location: flakyRegion, + try { + const flakyTableId = generateUuid() + '_reconnect_on_close'; + const [table] = await bigquery + .dataset(flakyDatasetId) + .createTable(flakyTableId, { + schema, + location: flakyRegion, + }); + projectId = 
table.metadata.tableReference.projectId; + parent = `projects/${projectId}/datasets/${flakyDatasetId}/tables/${flakyTableId}`; + + const connection = await client.createStreamConnection({ + streamType: managedwriter.PendingStream, + destinationTable: parent, }); - projectId = table.metadata.tableReference.projectId; - parent = `projects/${projectId}/datasets/${flakyDatasetId}/tables/${flakyTableId}`; - - const connection = await client.createStreamConnection({ - streamType: managedwriter.PendingStream, - destinationTable: parent, - }); - connection.onConnectionError(err => { - console.log('flaky test error:', err); - }); + connection.onConnectionError(err => { + console.log('flaky test error:', err); + }); - const writer = new JSONWriter({ - connection, - protoDescriptor, - }); + const writer = new JSONWriter({ + connection, + protoDescriptor, + }); - const iterations = new Array(50).fill(1); - let offset = 0; - for (const _ of iterations) { - const rows = generateRows(10); - const pw = writer.appendRows(rows, offset); - try { - await pw.getResult(); - } catch (err) { - console.error('found error trying to send rows'); + const iterations = new Array(50).fill(1); + let offset = 0; + for (const _ of iterations) { + const rows = generateRows(10); + const pw = writer.appendRows(rows, offset); + try { + await pw.getResult(); + } catch (err) { + console.error('found error trying to send rows'); + } + offset += 10; } - offset += 10; - } - const res = await connection.finalize(); - connection.close(); - assert.equal(res?.rowCount, 500); + const res = await connection.finalize(); + connection.close(); + assert.equal(res?.rowCount, 500); - writer.close(); - } finally { - client.close(); - } - }).timeout(2 * 60 * 1000); + writer.close(); + } finally { + client.close(); + } + }).timeout(2 * 60 * 1000); - it('should manage to send data in parallel in scenario where every 10 request drops the connection', async () => { - bqWriteClient.initialize(); - const client = new WriterClient(); - client.enableWriteRetries(true); - client.setClient(bqWriteClient); + it('opening the connection can fail more frequently', async () => { + bqWriteClient.initialize(); + const client = new WriterClient(); + client.enableWriteRetries(true); + client.setClient(bqWriteClient); - try { - const flakyTableId = generateUuid() + '_reconnect_on_close'; - const [table] = await bigquery - .dataset(flakyDatasetId) - .createTable(flakyTableId, { - schema, - location: flakyRegion, + try { + const flakyTableId = generateUuid() + '_initial_connect_failure'; + const [table] = await bigquery + .dataset(flakyDatasetId) + .createTable(flakyTableId, { + schema, + location: flakyRegion, + }); + projectId = table.metadata.tableReference.projectId; + parent = `projects/${projectId}/datasets/${flakyDatasetId}/tables/${flakyTableId}`; + + const connection = await client.createStreamConnection({ + streamType: managedwriter.PendingStream, + destinationTable: parent, }); - projectId = table.metadata.tableReference.projectId; - parent = `projects/${projectId}/datasets/${flakyDatasetId}/tables/${flakyTableId}`; + client['_retrySettings'].maxRetryAttempts = 100; // aggresive retries - const connection = await client.createStreamConnection({ - streamId: managedwriter.DefaultStream, - destinationTable: parent, - }); - client['_retrySettings'].maxRetryAttempts = 10; + connection.onConnectionError(err => { + console.log('flaky conn error:', err); + }); - connection.onConnectionError(err => { - console.log('flaky test error:', err); - }); + const writer = 
new JSONWriter({ + connection, + protoDescriptor, + }); - const writer = new JSONWriter({ - connection, - protoDescriptor, - }); + const iterations = new Array(50).fill(1); + let offset = 0; + for (const _ of iterations) { + const rows = generateRows(10); + const pw = writer.appendRows(rows, offset); + try { + const res = await pw.getResult(); + assert.equal(res.error, null); + } catch (err) { + console.error('found error trying to send rows', err); + throw err; + } + offset += 10; + connection.close(); // Close connection on every append to trigger reconnection + } - const pendingWrites: PendingWrite[] = []; - const iterations = new Array(50).fill(1); - for (const _ of iterations) { - const rows = generateRows(10); - const pw = writer.appendRows(rows); - pendingWrites.push(pw); - } + const res = await connection.finalize(); + connection.close(); + assert.equal(res?.rowCount, 500); - await Promise.all(pendingWrites.map(pw => pw.getResult())); + writer.close(); + } finally { + client.close(); + } + }).timeout(2 * 60 * 1000); + }); - const [rows] = await bigquery.query( - `SELECT * FROM \`${projectId}.${flakyDatasetId}.${flakyTableId}\` order by row_num` - ); + describe('should manage to send data in parallel', () => { + it('every 10 request drops the connection', async () => { + bqWriteClient.initialize(); + const client = new WriterClient(); + client.enableWriteRetries(true); + client.setClient(bqWriteClient); - // allow some level of duplication. On testing we saw around 0-2 dup appendRequest - assert.strictEqual(rows.length >= 500 && rows.length <= 520, true); - connection.close(); - writer.close(); - } finally { - client.close(); - } - }).timeout(2 * 60 * 1000); + try { + const flakyTableId = generateUuid() + '_reconnect_on_close'; + const [table] = await bigquery + .dataset(flakyDatasetId) + .createTable(flakyTableId, { + schema, + location: flakyRegion, + }); + projectId = table.metadata.tableReference.projectId; + parent = `projects/${projectId}/datasets/${flakyDatasetId}/tables/${flakyTableId}`; + + const connection = await client.createStreamConnection({ + streamType: managedwriter.PendingStream, + destinationTable: parent, + }); + client['_retrySettings'].maxRetryAttempts = 10; - it('should manage to send data in scenario where opening the connection can fail more frequently', async () => { - bqWriteClient.initialize(); - const client = new WriterClient(); - client.enableWriteRetries(true); - client.setClient(bqWriteClient); + connection.onConnectionError(err => { + console.log('flaky test error:', err); + }); - try { - const flakyTableId = generateUuid() + '_initial_connect_failure'; - const [table] = await bigquery - .dataset(flakyDatasetId) - .createTable(flakyTableId, { - schema, - location: flakyRegion, + const writer = new JSONWriter({ + connection, + protoDescriptor, }); - projectId = table.metadata.tableReference.projectId; - parent = `projects/${projectId}/datasets/${flakyDatasetId}/tables/${flakyTableId}`; - const connection = await client.createStreamConnection({ - streamType: managedwriter.PendingStream, - destinationTable: parent, - }); - client['_retrySettings'].maxRetryAttempts = 100; // aggresive retries + const pendingWrites: PendingWrite[] = []; + const iterations = new Array(50).fill(1); + let offset = 10; + for (const _ of iterations) { + const rows = generateRows(10); + const pw = writer.appendRows(rows); + pendingWrites.push(pw); + offset += 10; + } - connection.onConnectionError(err => { - console.log('flaky conn error:', err); - }); + await 
Promise.all(pendingWrites.map(pw => pw.getResult())); - const writer = new JSONWriter({ - connection, - protoDescriptor, - }); + const res = await connection.finalize(); + connection.close(); + assert.equal(res?.rowCount, 500); - const iterations = new Array(50).fill(1); - let offset = 0; - for (const _ of iterations) { - const rows = generateRows(10); - const pw = writer.appendRows(rows, offset); - try { - const res = await pw.getResult(); - assert.equal(res.error, null); - } catch (err) { - console.error('found error trying to send rows', err); - throw err; - } - offset += 10; - connection.close(); // Close connection on every append to trigger reconnection + writer.close(); + } finally { + client.close(); } + }).timeout(2 * 60 * 1000); - const res = await connection.finalize(); - connection.close(); - assert.equal(res?.rowCount, 500); + it('every 10 request there is a quota error', async () => { + bqWriteClient.initialize(); + const client = new WriterClient(); + client.enableWriteRetries(true); + client.setClient(bqWriteClient); - writer.close(); - } finally { - client.close(); - } - }).timeout(2 * 60 * 1000); + try { + const connection = await client.createStreamConnection({ + streamType: managedwriter.PendingStream, + destinationTable: parent, + }); + + let numCalls = 0; + const conn = connection['_connection'] as gax.CancellableStream; + sandbox + .stub(conn, 'write') + .callsFake( + ( + chunk: any, + cb?: ((error: Error | null | undefined) => void) | undefined + ): boolean => { + const req = chunk as AppendRowRequest; + cb && cb(null); + numCalls++; + if (numCalls % 10 === 0) { + const quotaErr = new gax.GoogleError('quota error'); + quotaErr.code = gax.Status.UNAVAILABLE; + conn.emit('error', quotaErr); + } else { + const res: AppendRowsResponse = { + writeStream: req.writeStream, + appendResult: { + offset: req.offset, + }, + }; + conn?.emit('data', res); + } + return false; + } + ); + + connection.onConnectionError(err => { + console.log('flaky test error:', err); + }); + + const writer = new JSONWriter({ + connection, + protoDescriptor, + }); + + const pendingWrites: PendingWrite[] = []; + const iterations = new Array(50).fill(1); + let offset = 10; + for (const _ of iterations) { + const rows = generateRows(10); + const pw = writer.appendRows(rows); + pendingWrites.push(pw); + offset += 10; + } + + await Promise.all(pendingWrites.map(pw => pw.getResult())); + + connection.close(); + assert.equal(numCalls, 56); + + writer.close(); + } finally { + client.close(); + } + }).timeout(2 * 60 * 1000); + }); }); describe('Error Scenarios', () => { From 254d96be09f089c1b1f05a950eb91a95c05ec612 Mon Sep 17 00:00:00 2001 From: Alvaro Viebrantz Date: Thu, 25 Apr 2024 12:53:00 -0400 Subject: [PATCH 10/25] fix: do not emit error if is retryable and no listerners are set up --- src/managedwriter/stream_connection.ts | 3 +++ system-test/managed_writer_client_test.ts | 4 ---- 2 files changed, 3 insertions(+), 4 deletions(-) diff --git a/src/managedwriter/stream_connection.ts b/src/managedwriter/stream_connection.ts index dcf7aea8..f1f89d98 100644 --- a/src/managedwriter/stream_connection.ts +++ b/src/managedwriter/stream_connection.ts @@ -141,6 +141,9 @@ export class StreamConnection extends EventEmitter { } return; } + if (this.isRetryableError(err) && this.listenerCount('error') === 0) { + return; + } this.emit('error', err); }; diff --git a/system-test/managed_writer_client_test.ts b/system-test/managed_writer_client_test.ts index 0c7bf1a2..5b5eb690 100644 --- 
a/system-test/managed_writer_client_test.ts +++ b/system-test/managed_writer_client_test.ts @@ -1444,10 +1444,6 @@ describe('managedwriter.WriterClient', () => { destinationTable: parent, }); - connection.onConnectionError(err => { - console.log('idle conn err', err); - }); - const writer = new JSONWriter({ connection, protoDescriptor, From fb9f85fc48deb5a35809ca89c9e7b54cf484592e Mon Sep 17 00:00:00 2001 From: Alvaro Viebrantz Date: Thu, 25 Apr 2024 15:08:36 -0400 Subject: [PATCH 11/25] fix: let permanent errors to nack individual pending writes --- src/managedwriter/stream_connection.ts | 37 +++++------------------ src/managedwriter/writer_client.ts | 24 ++++++++++++--- system-test/managed_writer_client_test.ts | 20 ++---------- 3 files changed, 28 insertions(+), 53 deletions(-) diff --git a/src/managedwriter/stream_connection.ts b/src/managedwriter/stream_connection.ts index f1f89d98..041bdde4 100644 --- a/src/managedwriter/stream_connection.ts +++ b/src/managedwriter/stream_connection.ts @@ -118,12 +118,6 @@ export class StreamConnection extends EventEmitter { private handleError = (err: gax.GoogleError) => { this.trace('on error', err, JSON.stringify(err)); - if (this.isPermanentError(err)) { - this.trace('found permanent error', err); - this.ackAllPendingWrites(err); - this.emit('error', err); - return; - } const nextPendingWrite = this.getNextPendingWrite(); if (nextPendingWrite) { this.trace( @@ -139,7 +133,6 @@ export class StreamConnection extends EventEmitter { } else { this.ackNextPendingWrite(err); } - return; } if (this.isRetryableError(err) && this.listenerCount('error') === 0) { return; @@ -148,14 +141,14 @@ export class StreamConnection extends EventEmitter { }; private isRetryableError(err: gax.GoogleError): boolean { - const reconnectionErrorCodes = [ + const errorCodes = [ gax.Status.ABORTED, gax.Status.UNAVAILABLE, gax.Status.CANCELLED, gax.Status.INTERNAL, gax.Status.DEADLINE_EXCEEDED, ]; - return !!err.code && reconnectionErrorCodes.includes(err.code); + return !!err.code && errorCodes.includes(err.code); } private isConnectionClosed() { @@ -165,22 +158,6 @@ export class StreamConnection extends EventEmitter { return true; } - private isPermanentError(err: gax.GoogleError): boolean { - if (err.code === gax.Status.INVALID_ARGUMENT) { - const storageErrors = parseStorageErrors(err); - for (const storageError of storageErrors) { - if ( - storageError.errorMessage?.includes( - 'Schema mismatch due to extra fields in user schema' - ) - ) { - return true; - } - } - } - return false; - } - private resolveCallOptions( streamId: string, options?: gax.CallOptions @@ -323,11 +300,6 @@ export class StreamConnection extends EventEmitter { } private send(pw: PendingWrite) { - const request = pw.getRequest(); - if (this.isConnectionClosed()) { - this.reconnect(); - } - this.trace('sending pending write', pw); const retrySettings = this._writeClient['_retrySettings']; const tries = pw._increaseRetryAttempts(); if (tries > retrySettings.maxRetryAttempts) { @@ -335,7 +307,12 @@ export class StreamConnection extends EventEmitter { new Error(`pending write max retries reached: ${tries} attempts`) ); } + if (this.isConnectionClosed()) { + this.reconnect(); + } + this.trace('sending pending write', pw); try { + const request = pw.getRequest(); this._connection?.write(request, err => { this.trace('wrote pending write', err, this._pendingWrites.length); if (err) { diff --git a/src/managedwriter/writer_client.ts b/src/managedwriter/writer_client.ts index 55291575..d27f3e41 100644 --- 
a/src/managedwriter/writer_client.ts
+++ b/src/managedwriter/writer_client.ts
@@ -111,15 +111,29 @@ export class WriterClient {
return this._open;
}
- // Enables StreamConnections to automatically retry failed appends.
- //
- // Enabling retries is best suited for cases where users want to achieve at-least-once
- // append semantics. Use of automatic retries may complicate patterns where the user
- // is designing for exactly-once append semantics.
+ /**
+ * Enables StreamConnections to automatically retry failed appends.
+ *
+ * Enabling retries is best suited for cases where users want to achieve at-least-once
+ * append semantics. Use of automatic retries may complicate patterns where the user
+ * is designing for exactly-once append semantics.
+ */
enableWriteRetries(enable: boolean) {
this._retrySettings.enableWriteRetries = enable;
}
+ /**
+ * Change max retry attempts on child StreamConnections.
+ *
+ * The default value is to retry 4 times.
+ *
+ * Only valid right now when write retries are enabled.
+ * @see enableWriteRetries.
+ */
+ setMaxRetryAttempts(retryAttempts: number) {
+ this._retrySettings.maxRetryAttempts = retryAttempts;
+ }
+
/**
* Creates a write stream to the given table.
* Additionally, every table has a special stream named DefaultStream
diff --git a/system-test/managed_writer_client_test.ts b/system-test/managed_writer_client_test.ts
index 5b5eb690..9bb131f2 100644
--- a/system-test/managed_writer_client_test.ts
+++ b/system-test/managed_writer_client_test.ts
@@ -922,10 +922,6 @@ describe('managedwriter.WriterClient', () => {
destinationTable: parent,
});
- connection.onConnectionError(err => {
- console.log('flaky test error:', err);
- });
-
const writer = new JSONWriter({
connection,
protoDescriptor,
@@ -958,6 +954,7 @@ describe('managedwriter.WriterClient', () => {
bqWriteClient.initialize();
const client = new WriterClient();
client.enableWriteRetries(true);
+ client.setMaxRetryAttempts(100); // aggressive retries
client.setClient(bqWriteClient);
try {
@@ -975,11 +972,6 @@ describe('managedwriter.WriterClient', () => {
streamType: managedwriter.PendingStream,
destinationTable: parent,
});
- client['_retrySettings'].maxRetryAttempts = 100; // aggresive retries
-
- connection.onConnectionError(err => {
- console.log('flaky conn error:', err);
- });
const writer = new JSONWriter({
connection,
protoDescriptor,
@@ -1018,6 +1010,7 @@ describe('managedwriter.WriterClient', () => {
bqWriteClient.initialize();
const client = new WriterClient();
client.enableWriteRetries(true);
+ client.setMaxRetryAttempts(10);
client.setClient(bqWriteClient);
try {
@@ -1035,11 +1028,6 @@ describe('managedwriter.WriterClient', () => {
streamType: managedwriter.PendingStream,
destinationTable: parent,
});
- client['_retrySettings'].maxRetryAttempts = 10;
-
- connection.onConnectionError(err => {
- console.log('flaky test error:', err);
- });
const writer = new JSONWriter({
connection,
protoDescriptor,
@@ -1109,10 +1097,6 @@ describe('managedwriter.WriterClient', () => {
}
);
- connection.onConnectionError(err => {
- console.log('flaky test error:', err);
- });
-
const writer = new JSONWriter({
connection,
protoDescriptor,

From 94241d768260d272f94566e05dffd856df1b76e1 Mon Sep 17 00:00:00 2001
From: Alvaro Viebrantz
Date: Thu, 25 Apr 2024 17:00:13 -0400
Subject: [PATCH 12/25] fix: remove unused import

---
 src/managedwriter/stream_connection.ts | 1 -
 1 file changed, 1 deletion(-)

diff --git a/src/managedwriter/stream_connection.ts b/src/managedwriter/stream_connection.ts
index 041bdde4..9422911d 100644
---
a/src/managedwriter/stream_connection.ts +++ b/src/managedwriter/stream_connection.ts @@ -19,7 +19,6 @@ import * as protos from '../../protos/protos'; import {WriterClient} from './writer_client'; import {PendingWrite} from './pending_write'; import {logger} from './logger'; -import {parseStorageErrors} from './error'; type TableSchema = protos.google.cloud.bigquery.storage.v1.ITableSchema; type IInt64Value = protos.google.protobuf.IInt64Value; From b51ea185a8033d1e15a8d421196e4dfa7a3a187a Mon Sep 17 00:00:00 2001 From: Alvaro Viebrantz Date: Thu, 25 Apr 2024 22:24:27 -0400 Subject: [PATCH 13/25] fix: grpc conn.write sequence is not stable --- src/managedwriter/stream_connection.ts | 27 +++++++++++------------ src/managedwriter/writer.ts | 2 +- system-test/managed_writer_client_test.ts | 8 +++---- 3 files changed, 18 insertions(+), 19 deletions(-) diff --git a/src/managedwriter/stream_connection.ts b/src/managedwriter/stream_connection.ts index 9422911d..00d90d41 100644 --- a/src/managedwriter/stream_connection.ts +++ b/src/managedwriter/stream_connection.ts @@ -126,9 +126,10 @@ export class StreamConnection extends EventEmitter { ); const retrySettings = this._writeClient['_retrySettings']; if (this.isRetryableError(err) && retrySettings.enableWriteRetries) { - process.nextTick(() => { - this.resendNextPendingWrite(); - }); + if (!this.isConnectionClosed()) { + const pw = this._pendingWrites.pop()!; + this.send(pw); + } } else { this.ackNextPendingWrite(err); } @@ -176,7 +177,7 @@ export class StreamConnection extends EventEmitter { } private handleData = (response: AppendRowsResponse) => { - this.trace('data arrived', response); + this.trace('data arrived', response, this._pendingWrites.length); if (!this.hasPendingWrites()) { this.trace('data arrived with no pending write available', response); return; @@ -242,15 +243,12 @@ export class StreamConnection extends EventEmitter { } private resendAllPendingWrites() { - while (this.hasPendingWrites()) { - this.resendNextPendingWrite(); - } - } - - private resendNextPendingWrite() { - const pw = this._pendingWrites.pop(); - if (pw) { - this.send(pw); + const pendingWrites = [...this._pendingWrites]; // copy array; + let pw = pendingWrites.pop(); + while (pw) { + this._pendingWrites.pop(); // remove from real queue + this.send(pw); // .send immediately adds to the queue + pw = pendingWrites.pop(); } } @@ -273,6 +271,7 @@ export class StreamConnection extends EventEmitter { ) { const pw = this._pendingWrites.pop(); if (pw) { + this.trace('ack pending write:', pw, err, result); pw._markDone(err, result); } } @@ -312,13 +311,13 @@ export class StreamConnection extends EventEmitter { this.trace('sending pending write', pw); try { const request = pw.getRequest(); + this._pendingWrites.unshift(pw); this._connection?.write(request, err => { this.trace('wrote pending write', err, this._pendingWrites.length); if (err) { pw._markDone(err); //TODO: add retries return; } - this._pendingWrites.unshift(pw); }); } catch (err) { pw._markDone(err as Error); diff --git a/src/managedwriter/writer.ts b/src/managedwriter/writer.ts index 04939e07..b6e0db31 100644 --- a/src/managedwriter/writer.ts +++ b/src/managedwriter/writer.ts @@ -160,7 +160,7 @@ export class Writer { offsetValue?: IInt64Value['value'] ): PendingWrite { let offset: AppendRowRequest['offset']; - if (offsetValue) { + if (offsetValue || offsetValue === 0) { offset = { value: offsetValue, }; diff --git a/system-test/managed_writer_client_test.ts b/system-test/managed_writer_client_test.ts index 
9bb131f2..ebed0616 100644 --- a/system-test/managed_writer_client_test.ts +++ b/system-test/managed_writer_client_test.ts @@ -1036,10 +1036,10 @@ describe('managedwriter.WriterClient', () => { const pendingWrites: PendingWrite[] = []; const iterations = new Array(50).fill(1); - let offset = 10; + let offset = 0; for (const _ of iterations) { const rows = generateRows(10); - const pw = writer.appendRows(rows); + const pw = writer.appendRows(rows, offset); pendingWrites.push(pw); offset += 10; } @@ -1104,10 +1104,10 @@ describe('managedwriter.WriterClient', () => { const pendingWrites: PendingWrite[] = []; const iterations = new Array(50).fill(1); - let offset = 10; + let offset = 0; for (const _ of iterations) { const rows = generateRows(10); - const pw = writer.appendRows(rows); + const pw = writer.appendRows(rows, offset); pendingWrites.push(pw); offset += 10; } From 4e7bcf32ee34a8f01ba4e7ebb09d904d9af20a9d Mon Sep 17 00:00:00 2001 From: Alvaro Viebrantz Date: Fri, 26 Apr 2024 11:59:08 -0400 Subject: [PATCH 14/25] feat: handle in stream response error and retry RESOURCE_EXAUSTED --- src/managedwriter/stream_connection.ts | 32 +++++++++++++----- system-test/managed_writer_client_test.ts | 40 +++++++++++++++-------- 2 files changed, 49 insertions(+), 23 deletions(-) diff --git a/src/managedwriter/stream_connection.ts b/src/managedwriter/stream_connection.ts index 00d90d41..28bcc445 100644 --- a/src/managedwriter/stream_connection.ts +++ b/src/managedwriter/stream_connection.ts @@ -124,15 +124,7 @@ export class StreamConnection extends EventEmitter { err, nextPendingWrite ); - const retrySettings = this._writeClient['_retrySettings']; - if (this.isRetryableError(err) && retrySettings.enableWriteRetries) { - if (!this.isConnectionClosed()) { - const pw = this._pendingWrites.pop()!; - this.send(pw); - } - } else { - this.ackNextPendingWrite(err); - } + this.handleRetry(err); } if (this.isRetryableError(err) && this.listenerCount('error') === 0) { return; @@ -140,6 +132,18 @@ export class StreamConnection extends EventEmitter { this.emit('error', err); }; + private handleRetry(err: gax.GoogleError) { + const retrySettings = this._writeClient['_retrySettings']; + if (this.isRetryableError(err) && retrySettings.enableWriteRetries) { + if (!this.isConnectionClosed()) { + const pw = this._pendingWrites.pop()!; + this.send(pw); + } + } else { + this.ackNextPendingWrite(err); + } + } + private isRetryableError(err: gax.GoogleError): boolean { const errorCodes = [ gax.Status.ABORTED, @@ -147,6 +151,7 @@ export class StreamConnection extends EventEmitter { gax.Status.CANCELLED, gax.Status.INTERNAL, gax.Status.DEADLINE_EXCEEDED, + gax.Status.RESOURCE_EXHAUSTED, ]; return !!err.code && errorCodes.includes(err.code); } @@ -185,6 +190,15 @@ export class StreamConnection extends EventEmitter { if (response.updatedSchema) { this.emit('schemaUpdated', response.updatedSchema); } + const rerr = response.error; + if (rerr) { + const gerr = new gax.GoogleError(rerr.message!); + gerr.code = rerr.code!; + if (this.isRetryableError(gerr)) { + this.handleRetry(gerr); + return; + } + } this.ackNextPendingWrite(null, response); }; diff --git a/system-test/managed_writer_client_test.ts b/system-test/managed_writer_client_test.ts index ebed0616..7d7b828b 100644 --- a/system-test/managed_writer_client_test.ts +++ b/system-test/managed_writer_client_test.ts @@ -1069,6 +1069,7 @@ describe('managedwriter.WriterClient', () => { }); let numCalls = 0; + let numSucess = 0; const conn = connection['_connection'] as 
gax.CancellableStream; sandbox .stub(conn, 'write') @@ -1080,10 +1081,18 @@ describe('managedwriter.WriterClient', () => { const req = chunk as AppendRowRequest; cb && cb(null); numCalls++; + if (!req.writeStream){ + return false; + } if (numCalls % 10 === 0) { - const quotaErr = new gax.GoogleError('quota error'); - quotaErr.code = gax.Status.UNAVAILABLE; - conn.emit('error', quotaErr); + const res: AppendRowsResponse = { + writeStream: req.writeStream, + error: { + code: gax.Status.RESOURCE_EXHAUSTED, + message: 'quota error', + }, + }; + conn?.emit('data', res); } else { const res: AppendRowsResponse = { writeStream: req.writeStream, @@ -1092,6 +1101,7 @@ describe('managedwriter.WriterClient', () => { }, }; conn?.emit('data', res); + numSucess++; } return false; } @@ -1115,7 +1125,7 @@ describe('managedwriter.WriterClient', () => { await Promise.all(pendingWrites.map(pw => pw.getResult())); connection.close(); - assert.equal(numCalls, 56); + assert.equal(numSucess, 50); writer.close(); } finally { @@ -1600,16 +1610,18 @@ describe('managedwriter.WriterClient', () => { ); for (const dataset of datasets) { - const [metadata] = await dataset.getMetadata(); - const creationTime = Number(metadata.creationTime); - - if (isResourceStale(creationTime)) { - try { - await dataset.delete({force: true}); - } catch (e) { - console.log(`dataset(${dataset.id}).delete() failed`); - console.log(e); - } + try { + console.log('getting data for dataset', dataset.id); + const [metadata] = await dataset.getMetadata(); + const creationTime = Number(metadata.creationTime); + + //if (isResourceStale(creationTime)) { + console.log('deleting dataset', dataset.id); + await dataset.delete({force: true}); + //} + } catch (e) { + console.log(`dataset(${dataset.id}).delete() failed`); + console.log(e); } } } From 17c0e71c69fe07256595f0546cede7620d0e0963 Mon Sep 17 00:00:00 2001 From: Alvaro Viebrantz Date: Fri, 26 Apr 2024 12:07:06 -0400 Subject: [PATCH 15/25] fix: lint issue --- system-test/managed_writer_client_test.ts | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/system-test/managed_writer_client_test.ts b/system-test/managed_writer_client_test.ts index 7d7b828b..3a6dc249 100644 --- a/system-test/managed_writer_client_test.ts +++ b/system-test/managed_writer_client_test.ts @@ -1081,7 +1081,7 @@ describe('managedwriter.WriterClient', () => { const req = chunk as AppendRowRequest; cb && cb(null); numCalls++; - if (!req.writeStream){ + if (!req.writeStream) { return false; } if (numCalls % 10 === 0) { From 841d174f1e33319c5b564309c69cdcb70bc8e14c Mon Sep 17 00:00:00 2001 From: Alvaro Viebrantz Date: Fri, 26 Apr 2024 12:35:33 -0400 Subject: [PATCH 16/25] fix: remove in stream handling and RE as retryable error --- src/managedwriter/stream_connection.ts | 10 ---------- system-test/managed_writer_client_test.ts | 16 +++++++++------- 2 files changed, 9 insertions(+), 17 deletions(-) diff --git a/src/managedwriter/stream_connection.ts b/src/managedwriter/stream_connection.ts index 28bcc445..0adf4371 100644 --- a/src/managedwriter/stream_connection.ts +++ b/src/managedwriter/stream_connection.ts @@ -151,7 +151,6 @@ export class StreamConnection extends EventEmitter { gax.Status.CANCELLED, gax.Status.INTERNAL, gax.Status.DEADLINE_EXCEEDED, - gax.Status.RESOURCE_EXHAUSTED, ]; return !!err.code && errorCodes.includes(err.code); } @@ -190,15 +189,6 @@ export class StreamConnection extends EventEmitter { if (response.updatedSchema) { this.emit('schemaUpdated', response.updatedSchema); } - const rerr = 
response.error; - if (rerr) { - const gerr = new gax.GoogleError(rerr.message!); - gerr.code = rerr.code!; - if (this.isRetryableError(gerr)) { - this.handleRetry(gerr); - return; - } - } this.ackNextPendingWrite(null, response); }; diff --git a/system-test/managed_writer_client_test.ts b/system-test/managed_writer_client_test.ts index 3a6dc249..3e3d862e 100644 --- a/system-test/managed_writer_client_test.ts +++ b/system-test/managed_writer_client_test.ts @@ -13,7 +13,7 @@ // limitations under the License. import * as assert from 'assert'; -import {describe, it} from 'mocha'; +import {describe, it, xit} from 'mocha'; import * as uuid from 'uuid'; import * as gax from 'google-gax'; import * as sinon from 'sinon'; @@ -929,6 +929,7 @@ describe('managedwriter.WriterClient', () => { const iterations = new Array(50).fill(1); let offset = 0; + // eslint-disable-next-line @typescript-eslint/no-unused-vars for (const _ of iterations) { const rows = generateRows(10); const pw = writer.appendRows(rows, offset); @@ -980,6 +981,7 @@ describe('managedwriter.WriterClient', () => { const iterations = new Array(50).fill(1); let offset = 0; + // eslint-disable-next-line @typescript-eslint/no-unused-vars for (const _ of iterations) { const rows = generateRows(10); const pw = writer.appendRows(rows, offset); @@ -1037,6 +1039,7 @@ describe('managedwriter.WriterClient', () => { const pendingWrites: PendingWrite[] = []; const iterations = new Array(50).fill(1); let offset = 0; + // eslint-disable-next-line @typescript-eslint/no-unused-vars for (const _ of iterations) { const rows = generateRows(10); const pw = writer.appendRows(rows, offset); @@ -1056,7 +1059,7 @@ describe('managedwriter.WriterClient', () => { } }).timeout(2 * 60 * 1000); - it('every 10 request there is a quota error', async () => { + xit('every 10 request there is a RESOURCE_EXAUSTED quota error', async () => { bqWriteClient.initialize(); const client = new WriterClient(); client.enableWriteRetries(true); @@ -1115,6 +1118,7 @@ describe('managedwriter.WriterClient', () => { const pendingWrites: PendingWrite[] = []; const iterations = new Array(50).fill(1); let offset = 0; + // eslint-disable-next-line @typescript-eslint/no-unused-vars for (const _ of iterations) { const rows = generateRows(10); const pw = writer.appendRows(rows, offset); @@ -1611,14 +1615,12 @@ describe('managedwriter.WriterClient', () => { for (const dataset of datasets) { try { - console.log('getting data for dataset', dataset.id); const [metadata] = await dataset.getMetadata(); const creationTime = Number(metadata.creationTime); - //if (isResourceStale(creationTime)) { - console.log('deleting dataset', dataset.id); - await dataset.delete({force: true}); - //} + if (isResourceStale(creationTime)) { + await dataset.delete({force: true}); + } } catch (e) { console.log(`dataset(${dataset.id}).delete() failed`); console.log(e); From 762f5d6192aceb139e72831055d02f7bcb0c731e Mon Sep 17 00:00:00 2001 From: Alvaro Viebrantz Date: Fri, 26 Apr 2024 12:45:39 -0400 Subject: [PATCH 17/25] fix: rollback changes to deleteDatasets method --- system-test/managed_writer_client_test.ts | 15 +++++++-------- 1 file changed, 7 insertions(+), 8 deletions(-) diff --git a/system-test/managed_writer_client_test.ts b/system-test/managed_writer_client_test.ts index 3e3d862e..f9c43aca 100644 --- a/system-test/managed_writer_client_test.ts +++ b/system-test/managed_writer_client_test.ts @@ -1614,16 +1614,15 @@ describe('managedwriter.WriterClient', () => { ); for (const dataset of datasets) { - try { - 
const [metadata] = await dataset.getMetadata(); - const creationTime = Number(metadata.creationTime); - - if (isResourceStale(creationTime)) { + const [metadata] = await dataset.getMetadata(); + const creationTime = Number(metadata.creationTime); + if (isResourceStale(creationTime)) { + try { await dataset.delete({force: true}); + } catch (e) { + console.log(`dataset(${dataset.id}).delete() failed`); + console.log(e); } - } catch (e) { - console.log(`dataset(${dataset.id}).delete() failed`); - console.log(e); } } } From ac7af3289269ec5b835f02afe46dc56c1f3836ca Mon Sep 17 00:00:00 2001 From: Alvaro Viebrantz Date: Fri, 26 Apr 2024 16:06:29 -0400 Subject: [PATCH 18/25] fix: reconnect only on retryable errors and handle in stream errors --- src/managedwriter/stream_connection.ts | 26 +++++++++++++++++++---- system-test/managed_writer_client_test.ts | 14 ++++++++---- 2 files changed, 32 insertions(+), 8 deletions(-) diff --git a/src/managedwriter/stream_connection.ts b/src/managedwriter/stream_connection.ts index 0adf4371..28f4efd1 100644 --- a/src/managedwriter/stream_connection.ts +++ b/src/managedwriter/stream_connection.ts @@ -55,6 +55,7 @@ export class StreamConnection extends EventEmitter { private _streamId: string; private _writeClient: WriterClient; private _connection?: gax.CancellableStream | null; + private _lastConnectionError?: gax.GoogleError | null; private _callOptions?: gax.CallOptions; private _pendingWrites: PendingWrite[]; @@ -75,6 +76,7 @@ export class StreamConnection extends EventEmitter { if (this.isOpen()) { this.close(); } + this._lastConnectionError = null; const callOptions = this.resolveCallOptions( this._streamId, this._callOptions @@ -85,11 +87,14 @@ export class StreamConnection extends EventEmitter { this._connection.on('data', this.handleData); this._connection.on('error', this.handleError); this._connection.on('close', () => { - this.trace('connection closed'); + this.trace('connection closed', this._lastConnectionError); if (this.hasPendingWrites()) { - this.reconnect(); const retrySettings = this._writeClient['_retrySettings']; - if (retrySettings.enableWriteRetries) { + if ( + this.isRetryableError(this._lastConnectionError) && + retrySettings.enableWriteRetries + ) { + this.reconnect(); this.resendAllPendingWrites(); } else { const err = new Error( @@ -117,6 +122,7 @@ export class StreamConnection extends EventEmitter { private handleError = (err: gax.GoogleError) => { this.trace('on error', err, JSON.stringify(err)); + this._lastConnectionError = err; const nextPendingWrite = this.getNextPendingWrite(); if (nextPendingWrite) { this.trace( @@ -144,7 +150,10 @@ export class StreamConnection extends EventEmitter { } } - private isRetryableError(err: gax.GoogleError): boolean { + private isRetryableError(err?: gax.GoogleError | null): boolean { + if (!err) { + return false; + } const errorCodes = [ gax.Status.ABORTED, gax.Status.UNAVAILABLE, @@ -189,6 +198,15 @@ export class StreamConnection extends EventEmitter { if (response.updatedSchema) { this.emit('schemaUpdated', response.updatedSchema); } + const responseErr = response.error; + if (responseErr) { + const gerr = new gax.GoogleError(responseErr.message!); + gerr.code = responseErr.code!; + if (this.isRetryableError(gerr)) { + this.handleRetry(gerr); + return; + } + } this.ackNextPendingWrite(null, response); }; diff --git a/system-test/managed_writer_client_test.ts b/system-test/managed_writer_client_test.ts index f9c43aca..3956cb43 100644 --- a/system-test/managed_writer_client_test.ts +++ 
b/system-test/managed_writer_client_test.ts @@ -13,7 +13,7 @@ // limitations under the License. import * as assert from 'assert'; -import {describe, it, xit} from 'mocha'; +import {describe, it} from 'mocha'; import * as uuid from 'uuid'; import * as gax from 'google-gax'; import * as sinon from 'sinon'; @@ -1059,7 +1059,7 @@ describe('managedwriter.WriterClient', () => { } }).timeout(2 * 60 * 1000); - xit('every 10 request there is a RESOURCE_EXAUSTED quota error', async () => { + it('every 10 request there is a in stream INTERNAL error', async () => { bqWriteClient.initialize(); const client = new WriterClient(); client.enableWriteRetries(true); @@ -1091,8 +1091,8 @@ describe('managedwriter.WriterClient', () => { const res: AppendRowsResponse = { writeStream: req.writeStream, error: { - code: gax.Status.RESOURCE_EXHAUSTED, - message: 'quota error', + code: gax.Status.INTERNAL, + message: 'internal error', }, }; conn?.emit('data', res); @@ -1368,6 +1368,7 @@ describe('managedwriter.WriterClient', () => { it('should trigger reconnection when connection closes and there are pending writes', async () => { bqWriteClient.initialize(); const client = new WriterClient(); + client.enableWriteRetries(true); client.setClient(bqWriteClient); const connection = await client.createStreamConnection({ @@ -1403,12 +1404,17 @@ describe('managedwriter.WriterClient', () => { await pw.getResult(); const conn = connection['_connection'] as gax.CancellableStream; // private method + + const gerr = new gax.GoogleError('aborted'); + gerr.code = gax.Status.ABORTED; + conn.emit('error', gerr); conn.emit('close'); assert.equal(reconnectedCalled, false); // add a fake pending write connection['_pendingWrites'].push(new PendingWrite({})); + conn.emit('error', gerr); conn.emit('close'); assert.equal(reconnectedCalled, true); From 2f2e6008bc8512333bbfc61f054609b1814adca6 Mon Sep 17 00:00:00 2001 From: Alvaro Viebrantz Date: Mon, 29 Apr 2024 11:50:24 -0400 Subject: [PATCH 19/25] feat: log number of pending writes on reconnect --- src/managedwriter/stream_connection.ts | 6 +++++- 1 file changed, 5 insertions(+), 1 deletion(-) diff --git a/src/managedwriter/stream_connection.ts b/src/managedwriter/stream_connection.ts index 28f4efd1..d5e2b1c9 100644 --- a/src/managedwriter/stream_connection.ts +++ b/src/managedwriter/stream_connection.ts @@ -357,7 +357,11 @@ export class StreamConnection extends EventEmitter { * Re open appendRows BiDi gRPC connection. 
*/ reconnect() { - this.trace('reconnect called'); + this.trace( + 'reconnect called with', + this._pendingWrites.length, + 'pending writes' + ); this.close(); this.open(); } From f35bc488341f4dbff9a37ad44af877a1409e2bb4 Mon Sep 17 00:00:00 2001 From: Alvaro Viebrantz Date: Mon, 29 Apr 2024 12:17:52 -0400 Subject: [PATCH 20/25] fix: reconnect trace msg --- src/managedwriter/stream_connection.ts | 4 +--- 1 file changed, 1 insertion(+), 3 deletions(-) diff --git a/src/managedwriter/stream_connection.ts b/src/managedwriter/stream_connection.ts index d5e2b1c9..8aea97e5 100644 --- a/src/managedwriter/stream_connection.ts +++ b/src/managedwriter/stream_connection.ts @@ -358,9 +358,7 @@ export class StreamConnection extends EventEmitter { */ reconnect() { this.trace( - 'reconnect called with', - this._pendingWrites.length, - 'pending writes' + `reconnect called with ${this._pendingWrites.length} pending writes` ); this.close(); this.open(); From 62ff5dda612bc93271d22b80ff61778adcae830a Mon Sep 17 00:00:00 2001 From: Alvaro Viebrantz Date: Tue, 30 Apr 2024 13:29:03 -0400 Subject: [PATCH 21/25] fix: don't close conn on flush --- src/managedwriter/stream_connection.ts | 1 - 1 file changed, 1 deletion(-) diff --git a/src/managedwriter/stream_connection.ts b/src/managedwriter/stream_connection.ts index 8aea97e5..26be8657 100644 --- a/src/managedwriter/stream_connection.ts +++ b/src/managedwriter/stream_connection.ts @@ -393,7 +393,6 @@ export class StreamConnection extends EventEmitter { async flushRows(request?: { offset?: IInt64Value['value']; }): Promise { - this.close(); if (this.isDefaultStream()) { return null; } From a921b9fe153b99e44c31c3cce5c827fb7917587c Mon Sep 17 00:00:00 2001 From: Alvaro Viebrantz Date: Tue, 30 Apr 2024 13:58:49 -0400 Subject: [PATCH 22/25] fix: rename var/properties for clarity --- src/managedwriter/pending_write.ts | 8 ++++---- src/managedwriter/stream_connection.ts | 8 ++++---- 2 files changed, 8 insertions(+), 8 deletions(-) diff --git a/src/managedwriter/pending_write.ts b/src/managedwriter/pending_write.ts index 82e9e362..210a2bfb 100644 --- a/src/managedwriter/pending_write.ts +++ b/src/managedwriter/pending_write.ts @@ -28,22 +28,22 @@ type AppendRowRequest = export class PendingWrite { private request: AppendRowRequest; private response?: AppendRowsResponse; - private retryAttempts: number; + private attempts: number; private promise: Promise; private resolveFunc?: (response: AppendRowsResponse) => void; private rejectFunc?: (reason?: protos.google.rpc.IStatus) => void; constructor(request: AppendRowRequest) { this.request = request; - this.retryAttempts = 0; + this.attempts = 0; this.promise = new Promise((resolve, reject) => { this.resolveFunc = resolve; this.rejectFunc = reject; }); } - _increaseRetryAttempts(): number { - return this.retryAttempts++; + _increaseAttempts(): number { + return this.attempts++; } _markDone(err: Error | null, response?: AppendRowsResponse) { diff --git a/src/managedwriter/stream_connection.ts b/src/managedwriter/stream_connection.ts index 26be8657..4b493b57 100644 --- a/src/managedwriter/stream_connection.ts +++ b/src/managedwriter/stream_connection.ts @@ -265,12 +265,12 @@ export class StreamConnection extends EventEmitter { } private resendAllPendingWrites() { - const pendingWrites = [...this._pendingWrites]; // copy array; - let pw = pendingWrites.pop(); + const pendingWritesToRetry = [...this._pendingWrites]; // copy array; + let pw = pendingWritesToRetry.pop(); while (pw) { this._pendingWrites.pop(); // remove from real 
queue
      this.send(pw); // .send immediately adds to the queue
-      pw = pendingWrites.pop();
+      pw = pendingWritesToRetry.pop();
     }
   }

From 1e4e851f1a807a5cfd8e6f8f493e11b78d85cb54 Mon Sep 17 00:00:00 2001
From: Alvaro Viebrantz
Date: Thu, 2 May 2024 09:37:54 -0400
Subject: [PATCH 23/25] fix: address review comments

---
 src/managedwriter/pending_write.ts        | 17 +++++++++++++++
 src/managedwriter/stream_connection.ts    | 17 +++++++++--------
 src/managedwriter/writer.ts               |  2 +-
 src/managedwriter/writer_client.ts        |  7 ++++++-
 system-test/managed_writer_client_test.ts |  2 +-
 5 files changed, 34 insertions(+), 11 deletions(-)

diff --git a/src/managedwriter/pending_write.ts b/src/managedwriter/pending_write.ts
index 210a2bfb..699e133d 100644
--- a/src/managedwriter/pending_write.ts
+++ b/src/managedwriter/pending_write.ts
@@ -42,10 +42,27 @@ export class PendingWrite {
     });
   }
 
+  /**
+   * Increase number of attempts and return current value.
+   *
+   * @private
+   * @internal
+   * @returns {number} current number of attempts
+   */
   _increaseAttempts(): number {
     return this.attempts++;
   }
 
+  /**
+   * Resolve the pending write with an error or an AppendRowsResponse.
+   * This resolves the promise accessed via getResult().
+   *
+   * @see getResult
+   *
+   * @private
+   * @internal
+   * @returns {void}
+   */
   _markDone(err: Error | null, response?: AppendRowsResponse) {
     if (err) {
       this.rejectFunc && this.rejectFunc(err);
diff --git a/src/managedwriter/stream_connection.ts b/src/managedwriter/stream_connection.ts
index 4b493b57..b0708309 100644
--- a/src/managedwriter/stream_connection.ts
+++ b/src/managedwriter/stream_connection.ts
@@ -89,17 +89,18 @@ export class StreamConnection extends EventEmitter {
     this._connection.on('close', () => {
       this.trace('connection closed', this._lastConnectionError);
       if (this.hasPendingWrites()) {
-        const retrySettings = this._writeClient['_retrySettings'];
+        const retrySettings = this._writeClient._retrySettings;
         if (
-          this.isRetryableError(this._lastConnectionError) &&
-          retrySettings.enableWriteRetries
+          retrySettings.enableWriteRetries &&
+          this.isRetryableError(this._lastConnectionError)
         ) {
           this.reconnect();
           this.resendAllPendingWrites();
         } else {
-          const err = new Error(
+          const err = new gax.GoogleError(
             'aborted due to failed connection, please retry the request'
           );
+          err.code = gax.Status.ABORTED;
           this.ackAllPendingWrites(err);
         }
       }
@@ -132,15 +133,15 @@ export class StreamConnection extends EventEmitter {
       );
       this.handleRetry(err);
     }
-    if (this.isRetryableError(err) && this.listenerCount('error') === 0) {
+    if (this.listenerCount('error') === 0 && this.isRetryableError(err)) {
       return;
     }
     this.emit('error', err);
   };
 
   private handleRetry(err: gax.GoogleError) {
-    const retrySettings = this._writeClient['_retrySettings'];
-    if (this.isRetryableError(err) && retrySettings.enableWriteRetries) {
+    const retrySettings = this._writeClient._retrySettings;
+    if (retrySettings.enableWriteRetries && this.isRetryableError(err)) {
       if (!this.isConnectionClosed()) {
         const pw = this._pendingWrites.pop()!;
         this.send(pw);
@@ -320,7 +321,7 @@ export class StreamConnection extends EventEmitter {
   }
 
   private send(pw: PendingWrite) {
- const retrySettings = this._writeClient['_retrySettings']; + const retrySettings = this._writeClient._retrySettings; const tries = pw._increaseAttempts(); if (tries > retrySettings.maxRetryAttempts) { pw._markDone( diff --git a/src/managedwriter/writer.ts b/src/managedwriter/writer.ts index b6e0db31..3902eb09 100644 --- a/src/managedwriter/writer.ts +++ b/src/managedwriter/writer.ts @@ -160,7 +160,7 @@ export class Writer { offsetValue?: IInt64Value['value'] ): PendingWrite { let offset: AppendRowRequest['offset']; - if (offsetValue || offsetValue === 0) { + if (offsetValue !== undefined && offsetValue !== null) { offset = { value: offsetValue, }; diff --git a/src/managedwriter/writer_client.ts b/src/managedwriter/writer_client.ts index d27f3e41..9b05dfaf 100644 --- a/src/managedwriter/writer_client.ts +++ b/src/managedwriter/writer_client.ts @@ -59,7 +59,12 @@ export class WriterClient { private _client: BigQueryWriteClient; private _connections: StreamConnections; private _open: boolean; - private _retrySettings: RetrySettings; + /** + * Retry settings, only internal for now. + * @private + * @internal + */ + _retrySettings: RetrySettings; constructor(opts?: ClientOptions) { const baseOptions = { diff --git a/system-test/managed_writer_client_test.ts b/system-test/managed_writer_client_test.ts index 3956cb43..e66e9693 100644 --- a/system-test/managed_writer_client_test.ts +++ b/system-test/managed_writer_client_test.ts @@ -1078,7 +1078,7 @@ describe('managedwriter.WriterClient', () => { .stub(conn, 'write') .callsFake( ( - chunk: any, + chunk: unknown, cb?: ((error: Error | null | undefined) => void) | undefined ): boolean => { const req = chunk as AppendRowRequest; From 4c01180c8ab7fae7480568a14cb6e52530755e6c Mon Sep 17 00:00:00 2001 From: Alvaro Viebrantz Date: Thu, 2 May 2024 11:49:53 -0400 Subject: [PATCH 24/25] fix: return after nack pending write due to max retries --- src/managedwriter/stream_connection.ts | 1 + 1 file changed, 1 insertion(+) diff --git a/src/managedwriter/stream_connection.ts b/src/managedwriter/stream_connection.ts index b0708309..82f988d3 100644 --- a/src/managedwriter/stream_connection.ts +++ b/src/managedwriter/stream_connection.ts @@ -327,6 +327,7 @@ export class StreamConnection extends EventEmitter { pw._markDone( new Error(`pending write max retries reached: ${tries} attempts`) ); + return; } if (this.isConnectionClosed()) { this.reconnect(); From de07922b36485932d1cf00464baae620ec4e2e24 Mon Sep 17 00:00:00 2001 From: Alvaro Viebrantz Date: Thu, 2 May 2024 12:03:55 -0400 Subject: [PATCH 25/25] fix: change connect error msg and code --- src/managedwriter/stream_connection.ts | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/src/managedwriter/stream_connection.ts b/src/managedwriter/stream_connection.ts index 82f988d3..56e05e10 100644 --- a/src/managedwriter/stream_connection.ts +++ b/src/managedwriter/stream_connection.ts @@ -98,9 +98,9 @@ export class StreamConnection extends EventEmitter { this.resendAllPendingWrites(); } else { const err = new gax.GoogleError( - 'aborted due to failed connection, please retry the request' + 'Connection failure, please retry the request' ); - err.code = gax.Status.ABORTED; + err.code = gax.Status.UNAVAILABLE; this.ackAllPendingWrites(err); } }
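
Usage note: the retry behavior introduced across these patches is opt-in and is configured entirely on WriterClient. The sketch below is illustrative rather than part of the series; the import path, table path, and row payload are assumptions, while the calls themselves (enableWriteRetries, setMaxRetryAttempts, createStreamConnection, JSONWriter, appendRows, getResult) mirror the APIs exercised in the system tests above.

// retry-usage-sketch.ts — minimal sketch, assuming the managedwriter entry point
import {managedwriter} from '@google-cloud/bigquery-storage'; // assumed import path
const {WriterClient, JSONWriter} = managedwriter;

type JSONWriterOptions = ConstructorParameters<typeof JSONWriter>[0];

export async function appendWithRetries(
  destinationTable: string, // e.g. 'projects/p/datasets/d/tables/t' (assumed)
  protoDescriptor: JSONWriterOptions['protoDescriptor']
) {
  const client = new WriterClient();

  // Opt in to the retry path added in this series: appends that fail with a
  // retryable code (ABORTED, UNAVAILABLE, CANCELLED, INTERNAL, DEADLINE_EXCEEDED)
  // are re-sent instead of being nacked, up to the configured attempt limit.
  client.enableWriteRetries(true);
  client.setMaxRetryAttempts(10);

  try {
    const connection = await client.createStreamConnection({
      streamType: managedwriter.PendingStream,
      destinationTable,
    });
    const writer = new JSONWriter({connection, protoDescriptor});

    // Explicit offsets keep retried appends idempotent on a pending stream.
    const pw = writer.appendRows([{customer_name: 'Ada', row_num: 1}], 0); // assumed row shape
    await pw.getResult();

    writer.close();
  } finally {
    client.close();
  }
}

With retries enabled, a connection that drops on a retryable error re-sends its in-flight writes after reconnecting, which is why the example pins explicit offsets: re-sent appends stay idempotent on a pending stream.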