From ae3d633256cc24bce0a2f6f303e41b77030adc35 Mon Sep 17 00:00:00 2001 From: Idriss Neumann Date: Fri, 8 Dec 2023 14:53:13 +0100 Subject: [PATCH 1/7] Issue #4: find the timestamp field dynamically --- pkg/quickwit/quickwit.go | 87 +++++++++++++++++++++++++----- src/configuration/ConfigEditor.tsx | 18 ------- src/datasource.ts | 48 ++++++++++------- 3 files changed, 103 insertions(+), 50 deletions(-) diff --git a/pkg/quickwit/quickwit.go b/pkg/quickwit/quickwit.go index 0aa9f1b..36ba450 100644 --- a/pkg/quickwit/quickwit.go +++ b/pkg/quickwit/quickwit.go @@ -4,7 +4,6 @@ import ( "bytes" "context" "encoding/json" - "errors" "fmt" "io" "net/http" @@ -26,6 +25,70 @@ type QuickwitDatasource struct { dsInfo es.DatasourceInfo } +type QuickwitMapping struct { + IndexConfig struct { + DocMapping struct { + TimestampField string `json:"timestamp_field"` + FieldMappings []struct { + Name string `json:"name"` + InputFormats []string `json:"input_formats"` + } `json:"field_mappings"` + } `json:"doc_mapping"` + } `json:"index_config"` +} + +func getTimestampFieldInfos(index string, qwUrl string, cli *http.Client) (string, string, error) { + mappingEndpointUrl := qwUrl + "/indexes/" + index + qwlog.Info("Calling quickwit endpoint: " + mappingEndpointUrl) + r, err := cli.Get(mappingEndpointUrl) + if err != nil { + errMsg := fmt.Sprintf("Error when calling url = %s: err = %s", mappingEndpointUrl, err.Error()) + qwlog.Error(errMsg) + return "", "", err + } + + statusCode := r.StatusCode + if statusCode < 200 || statusCode >= 400 { + errMsg := fmt.Sprintf("Error when calling url = %s: statusCode = %d", mappingEndpointUrl, statusCode) + qwlog.Error(errMsg) + return "", "", fmt.Errorf(errMsg) + } + + defer r.Body.Close() + body, err := io.ReadAll(r.Body) + if err != nil { + errMsg := fmt.Sprintf("Error when calling url = %s: err = %s", mappingEndpointUrl, err.Error()) + qwlog.Error(errMsg) + return "", "", err + } + + var payload QuickwitMapping + err = json.Unmarshal(body, &payload) + if err != nil { + errMsg := fmt.Sprintf("Unmarshalling body error: err = %s, body = %s", err.Error(), (body)) + qwlog.Error(errMsg) + return "", "", fmt.Errorf(errMsg) + } + + timestampFieldName := payload.IndexConfig.DocMapping.TimestampField + timestampFieldFormat := "undef" + for _, field := range payload.IndexConfig.DocMapping.FieldMappings { + if field.Name == timestampFieldName && len(field.InputFormats) > 0 { + timestampFieldFormat = field.InputFormats[0] + break + } + } + + if timestampFieldFormat == "undef" { + errMsg := fmt.Sprintf("No format found for field: %s", string(timestampFieldName)) + qwlog.Error(errMsg) + return timestampFieldName, "", fmt.Errorf(errMsg) + } + + qwlog.Info(fmt.Sprintf("Found timestampFieldName = %s, timestampFieldFormat = %s", timestampFieldName, timestampFieldFormat)) + return timestampFieldName, timestampFieldFormat, nil +} + // Creates a Quickwit datasource. func NewQuickwitDatasource(settings backend.DataSourceInstanceSettings) (instancemgmt.Instance, error) { qwlog.Debug("Initializing new data source instance") @@ -50,19 +113,8 @@ func NewQuickwitDatasource(settings backend.DataSourceInstanceSettings) (instanc return nil, err } - timeField, ok := jsonData["timeField"].(string) - if !ok { - return nil, errors.New("timeField cannot be cast to string") - } - - if timeField == "" { - return nil, errors.New("a time field name is required") - } - - timeOutputFormat, ok := jsonData["timeOutputFormat"].(string) - if !ok { - return nil, errors.New("timeOutputFormat cannot be cast to string") - } + timeField, toOk := jsonData["timeField"].(string) + timeOutputFormat, tofOk := jsonData["timeOutputFormat"].(string) logLevelField, ok := jsonData["logLevelField"].(string) if !ok { @@ -96,6 +148,13 @@ func NewQuickwitDatasource(settings backend.DataSourceInstanceSettings) (instanc maxConcurrentShardRequests = 256 } + if !toOk || !tofOk { + timeField, timeOutputFormat, err = getTimestampFieldInfos(index, settings.URL, httpCli) + if nil != err { + return nil, err + } + } + configuredFields := es.ConfiguredFields{ TimeField: timeField, TimeOutputFormat: timeOutputFormat, diff --git a/src/configuration/ConfigEditor.tsx b/src/configuration/ConfigEditor.tsx index 18e7dba..f756d3c 100644 --- a/src/configuration/ConfigEditor.tsx +++ b/src/configuration/ConfigEditor.tsx @@ -73,24 +73,6 @@ export const QuickwitDetails = ({ value, onChange }: DetailsProps) => { width={40} /> - - onChange({ ...value, jsonData: {...value.jsonData, timeField: event.currentTarget.value}})} - placeholder="timestamp" - width={40} - /> - - - onChange({ ...value, jsonData: {...value.jsonData, timeOutputFormat: event.currentTarget.value}})} - placeholder="unix_timestamp_millisecs" - width={40} - /> - { + let fields = getAllFields(indexMetadata.index_config.doc_mapping.field_mappings); + let timestampFieldName = indexMetadata.index_config.doc_mapping.timestamp_field + let timestampField = fields.find((field) => field.json_path === timestampFieldName); + let timestampFormat = timestampField ? timestampField.field_mapping.output_format || '' : '' + let timestampFieldInfos = { 'field': timestampFieldName, 'format': timestampFormat } + console.log("timestampFieldInfos = " + JSON.stringify(timestampFieldInfos)) + return timestampFieldInfos + }) + ).subscribe(result => { + this.timeField = result.field; + this.timeOutputFormat = result.format; + this.queryBuilder = new ElasticQueryBuilder({ + timeField: this.timeField, + }); + }); + + this.logMessageField = settingsData.logMessageField || ''; + this.logLevelField = settingsData.logLevelField || ''; this.dataLinks = settingsData.dataLinks || []; this.languageProvider = new ElasticsearchLanguageProvider(this); } @@ -111,12 +129,7 @@ export class QuickwitDataSource message: 'Cannot save datasource, `index` is required', }; } - if (this.timeField === '' ) { - return { - status: 'error', - message: 'Cannot save datasource, `timeField` is required', - }; - } + return lastValueFrom( from(this.getResource('indexes/' + this.index)).pipe( mergeMap((indexMetadata) => { @@ -147,21 +160,19 @@ export class QuickwitDataSource if (this.timeField === '') { return `Time field must not be empty`; } - if (indexMetadata.index_config.doc_mapping.timestamp_field !== this.timeField) { - return `No timestamp field named '${this.timeField}' found`; - } + let fields = getAllFields(indexMetadata.index_config.doc_mapping.field_mappings); let timestampField = fields.find((field) => field.json_path === this.timeField); + // Should never happen. if (timestampField === undefined) { return `No field named '${this.timeField}' found in the doc mapping. This should never happen.`; } - if (timestampField.field_mapping.output_format !== this.timeOutputFormat) { - return `Timestamp output format is declared as '${timestampField.field_mapping.output_format}' in the doc mapping, not '${this.timeOutputFormat}'.`; - } + + let timeOutputFormat = timestampField.field_mapping.output_format || 'unknown'; const supportedTimestampOutputFormats = ['unix_timestamp_secs', 'unix_timestamp_millis', 'unix_timestamp_micros', 'unix_timestamp_nanos', 'iso8601', 'rfc3339']; - if (!supportedTimestampOutputFormats.includes(this.timeOutputFormat)) { - return `Timestamp output format '${this.timeOutputFormat} is not yet supported.`; + if (!supportedTimestampOutputFormats.includes(timeOutputFormat)) { + return `Timestamp output format '${timeOutputFormat} is not yet supported.`; } return; } @@ -310,6 +321,7 @@ export class QuickwitDataSource ignore_unavailable: true, index: this.index, }); + let esQuery = JSON.stringify(this.queryBuilder.getTermsQuery(queryDef)); esQuery = esQuery.replace(/\$timeFrom/g, range.from.valueOf().toString()); esQuery = esQuery.replace(/\$timeTo/g, range.to.valueOf().toString()); From 022d331db027d8ab5d31fe1275478a1f2b7528b5 Mon Sep 17 00:00:00 2001 From: Idriss Neumann Date: Sat, 16 Dec 2023 12:22:34 +0100 Subject: [PATCH 2/7] Issue #4: error handling --- pkg/quickwit/quickwit.go | 29 ++++++++++++++++++++++++----- src/datasource.ts | 24 +++++++++++++++++++++--- src/utils.ts | 14 ++++++++++++++ 3 files changed, 59 insertions(+), 8 deletions(-) diff --git a/pkg/quickwit/quickwit.go b/pkg/quickwit/quickwit.go index 36ba450..7393069 100644 --- a/pkg/quickwit/quickwit.go +++ b/pkg/quickwit/quickwit.go @@ -4,6 +4,7 @@ import ( "bytes" "context" "encoding/json" + "errors" "fmt" "io" "net/http" @@ -37,6 +38,23 @@ type QuickwitMapping struct { } `json:"index_config"` } +type QuickwitCreationErrorPayload struct { + Message string `json:"message"` + StatusCode int `json:"status"` +} + +func newErrorCreationPayload(statusCode int, message string) error { + var payload QuickwitCreationErrorPayload + payload.Message = message + payload.StatusCode = statusCode + json, err := json.Marshal(payload) + if nil != err { + return err + } + + return errors.New(string(json)) +} + func getTimestampFieldInfos(index string, qwUrl string, cli *http.Client) (string, string, error) { mappingEndpointUrl := qwUrl + "/indexes/" + index qwlog.Info("Calling quickwit endpoint: " + mappingEndpointUrl) @@ -48,10 +66,11 @@ func getTimestampFieldInfos(index string, qwUrl string, cli *http.Client) (strin } statusCode := r.StatusCode + if statusCode < 200 || statusCode >= 400 { - errMsg := fmt.Sprintf("Error when calling url = %s: statusCode = %d", mappingEndpointUrl, statusCode) + errMsg := fmt.Sprintf("Error when calling url = %s", mappingEndpointUrl) qwlog.Error(errMsg) - return "", "", fmt.Errorf(errMsg) + return "", "", newErrorCreationPayload(statusCode, errMsg) } defer r.Body.Close() @@ -59,7 +78,7 @@ func getTimestampFieldInfos(index string, qwUrl string, cli *http.Client) (strin if err != nil { errMsg := fmt.Sprintf("Error when calling url = %s: err = %s", mappingEndpointUrl, err.Error()) qwlog.Error(errMsg) - return "", "", err + return "", "", newErrorCreationPayload(statusCode, errMsg) } var payload QuickwitMapping @@ -67,7 +86,7 @@ func getTimestampFieldInfos(index string, qwUrl string, cli *http.Client) (strin if err != nil { errMsg := fmt.Sprintf("Unmarshalling body error: err = %s, body = %s", err.Error(), (body)) qwlog.Error(errMsg) - return "", "", fmt.Errorf(errMsg) + return "", "", newErrorCreationPayload(statusCode, errMsg) } timestampFieldName := payload.IndexConfig.DocMapping.TimestampField @@ -82,7 +101,7 @@ func getTimestampFieldInfos(index string, qwUrl string, cli *http.Client) (strin if timestampFieldFormat == "undef" { errMsg := fmt.Sprintf("No format found for field: %s", string(timestampFieldName)) qwlog.Error(errMsg) - return timestampFieldName, "", fmt.Errorf(errMsg) + return timestampFieldName, "", newErrorCreationPayload(statusCode, errMsg) } qwlog.Info(fmt.Sprintf("Found timestampFieldName = %s, timestampFieldFormat = %s", timestampFieldName, timestampFieldFormat)) diff --git a/src/datasource.ts b/src/datasource.ts index f7547dc..dc09b12 100644 --- a/src/datasource.ts +++ b/src/datasource.ts @@ -51,6 +51,7 @@ import { bucketAggregationConfig } from 'components/QueryEditor/BucketAggregatio import { isBucketAggregationWithField } from 'components/QueryEditor/BucketAggregationsEditor/aggregations'; import ElasticsearchLanguageProvider from 'LanguageProvider'; import { ReactNode } from 'react'; +import { extractJsonPayload } from 'utils'; export const REF_ID_STARTER_LOG_VOLUME = 'log-volume-'; @@ -91,8 +92,18 @@ export class QuickwitDataSource let timestampField = fields.find((field) => field.json_path === timestampFieldName); let timestampFormat = timestampField ? timestampField.field_mapping.output_format || '' : '' let timestampFieldInfos = { 'field': timestampFieldName, 'format': timestampFormat } - console.log("timestampFieldInfos = " + JSON.stringify(timestampFieldInfos)) return timestampFieldInfos + }), + catchError((err) => { + if (!err.data || !err.data.error) { + let err_source = extractJsonPayload(err.data.error) + if(!err_source) { + throw err + } + } + + // the error will be handle in the testDatasource function + return of({'field': '', 'format': ''}) }) ).subscribe(result => { this.timeField = result.field; @@ -143,7 +154,14 @@ export class QuickwitDataSource return of({ status: 'success', message: `Index OK. Time field name OK` }); }), catchError((err) => { - if (err.status === 404) { + if (err.data && err.data.error) { + let err_source = extractJsonPayload(err.data.error) + if (err_source) { + err = err_source + } + } + + if (err.status && err.status === 404) { return of({ status: 'error', message: 'Index does not exists.' }); } else if (err.message) { return of({ status: 'error', message: err.message }); @@ -377,7 +395,7 @@ export class QuickwitDataSource return _map(filteredFields, (field) => { return { text: field.json_path, - value: typeMap[field.field_mapping.type], + value: typeMap[field.field_mapping.type] }; }); }) diff --git a/src/utils.ts b/src/utils.ts index 5b8d940..a6747c3 100644 --- a/src/utils.ts +++ b/src/utils.ts @@ -13,6 +13,20 @@ export const describeMetric = (metric: MetricAggregation) => { return `${metricAggregationConfig[metric.type].label} ${metric.field}`; }; +export const extractJsonPayload = (msg: string) => { + const match = msg.match(/{.*}/); + + if (!match) { + return null; + } + + try { + return JSON.parse(match[0]); + } catch (error) { + return null; + } +} + /** * Utility function to clean up aggregations settings objects. * It removes nullish values and empty strings, array and objects From 6780309b8f0dc19bb789c63d5d9ae47c4db18b77 Mon Sep 17 00:00:00 2001 From: Idriss Neumann Date: Mon, 18 Dec 2023 17:05:58 +0100 Subject: [PATCH 3/7] Issue #4: use output_format instead input format and make it optional --- pkg/quickwit/quickwit.go | 9 +++++---- 1 file changed, 5 insertions(+), 4 deletions(-) diff --git a/pkg/quickwit/quickwit.go b/pkg/quickwit/quickwit.go index 7393069..5dfd7dd 100644 --- a/pkg/quickwit/quickwit.go +++ b/pkg/quickwit/quickwit.go @@ -31,8 +31,9 @@ type QuickwitMapping struct { DocMapping struct { TimestampField string `json:"timestamp_field"` FieldMappings []struct { - Name string `json:"name"` - InputFormats []string `json:"input_formats"` + Name string `json:"name"` + Type string `json:"type"` + OutputFormat *string `json:"output_format,omitempty"` } `json:"field_mappings"` } `json:"doc_mapping"` } `json:"index_config"` @@ -92,8 +93,8 @@ func getTimestampFieldInfos(index string, qwUrl string, cli *http.Client) (strin timestampFieldName := payload.IndexConfig.DocMapping.TimestampField timestampFieldFormat := "undef" for _, field := range payload.IndexConfig.DocMapping.FieldMappings { - if field.Name == timestampFieldName && len(field.InputFormats) > 0 { - timestampFieldFormat = field.InputFormats[0] + if field.Type == "datetime" && field.Name == timestampFieldName && nil != field.OutputFormat { + timestampFieldFormat = *field.OutputFormat break } } From bc6373057ecca0d073d752a598a20652fdc8f76d Mon Sep 17 00:00:00 2001 From: Idriss Neumann Date: Fri, 22 Dec 2023 15:02:56 +0100 Subject: [PATCH 4/7] Issue #4: recursivity with fieldmappings with an object type --- pkg/quickwit/quickwit.go | 53 ++++++++++++++++++++++++++++------------ 1 file changed, 37 insertions(+), 16 deletions(-) diff --git a/pkg/quickwit/quickwit.go b/pkg/quickwit/quickwit.go index 5dfd7dd..e570bb5 100644 --- a/pkg/quickwit/quickwit.go +++ b/pkg/quickwit/quickwit.go @@ -26,15 +26,18 @@ type QuickwitDatasource struct { dsInfo es.DatasourceInfo } +type FieldMappings struct { + Name string `json:"name"` + Type string `json:"type"` + OutputFormat *string `json:"output_format,omitempty"` + FieldMappings []FieldMappings `json:"field_mappings,omitempty"` +} + type QuickwitMapping struct { IndexConfig struct { DocMapping struct { - TimestampField string `json:"timestamp_field"` - FieldMappings []struct { - Name string `json:"name"` - Type string `json:"type"` - OutputFormat *string `json:"output_format,omitempty"` - } `json:"field_mappings"` + TimestampField string `json:"timestamp_field"` + FieldMappings []FieldMappings `json:"field_mappings"` } `json:"doc_mapping"` } `json:"index_config"` } @@ -56,6 +59,30 @@ func newErrorCreationPayload(statusCode int, message string) error { return errors.New(string(json)) } +func findTimeStampFormat(timestampFieldName string, parentName *string, fieldMappings []FieldMappings) *string { + if nil == fieldMappings { + return nil + } + + for _, field := range fieldMappings { + fieldName := field.Name + if nil != parentName { + fieldName = fmt.Sprintf("%s.%s", *parentName, fieldName) + } + + if field.Type == "datetime" && fieldName == timestampFieldName && nil != field.OutputFormat { + return field.OutputFormat + } else if field.Type == "object" && nil != field.FieldMappings { + format := findTimeStampFormat(timestampFieldName, &field.Name, field.FieldMappings) + if nil != format { + return format + } + } + } + + return nil +} + func getTimestampFieldInfos(index string, qwUrl string, cli *http.Client) (string, string, error) { mappingEndpointUrl := qwUrl + "/indexes/" + index qwlog.Info("Calling quickwit endpoint: " + mappingEndpointUrl) @@ -91,22 +118,16 @@ func getTimestampFieldInfos(index string, qwUrl string, cli *http.Client) (strin } timestampFieldName := payload.IndexConfig.DocMapping.TimestampField - timestampFieldFormat := "undef" - for _, field := range payload.IndexConfig.DocMapping.FieldMappings { - if field.Type == "datetime" && field.Name == timestampFieldName && nil != field.OutputFormat { - timestampFieldFormat = *field.OutputFormat - break - } - } + timestampFieldFormat := findTimeStampFormat(timestampFieldName, nil, payload.IndexConfig.DocMapping.FieldMappings) - if timestampFieldFormat == "undef" { + if nil == timestampFieldFormat { errMsg := fmt.Sprintf("No format found for field: %s", string(timestampFieldName)) qwlog.Error(errMsg) return timestampFieldName, "", newErrorCreationPayload(statusCode, errMsg) } - qwlog.Info(fmt.Sprintf("Found timestampFieldName = %s, timestampFieldFormat = %s", timestampFieldName, timestampFieldFormat)) - return timestampFieldName, timestampFieldFormat, nil + qwlog.Info(fmt.Sprintf("Found timestampFieldName = %s, timestampFieldFormat = %s", timestampFieldName, *timestampFieldFormat)) + return timestampFieldName, *timestampFieldFormat, nil } // Creates a Quickwit datasource. From 34a70c7ff052a8c14abc6508d35e0c10a36473d8 Mon Sep 17 00:00:00 2001 From: Idriss Neumann Date: Thu, 28 Dec 2023 10:10:23 +0100 Subject: [PATCH 5/7] Issue #4: add frontend unit tests --- src/utils.test.ts | 20 ++++++++++++++++++++ 1 file changed, 20 insertions(+) create mode 100644 src/utils.test.ts diff --git a/src/utils.test.ts b/src/utils.test.ts new file mode 100644 index 0000000..1bcec2c --- /dev/null +++ b/src/utils.test.ts @@ -0,0 +1,20 @@ +import { extractJsonPayload } from "utils"; + +describe('Test utils.extractJsonPayload', () => { + it('Extract valid JSON', () => { + const result = extractJsonPayload('Hey {"foo": "bar"}') + expect(result).toEqual({ + foo: "bar" + }); + }); + + it('Extract non valid JSON', () => { + const result = extractJsonPayload('Hey {"foo": invalid}') + expect(result).toEqual(null); + }); + + it('Extract multiple valid JSONs (not supported)', () => { + const result = extractJsonPayload('Hey {"foo": "bar"} {"foo2": "bar2"}') + expect(result).toEqual(null); + }); +}); From b66cce1ddc40dae05d76a85aebefd949f028d4bf Mon Sep 17 00:00:00 2001 From: Idriss Neumann Date: Thu, 28 Dec 2023 12:25:33 +0100 Subject: [PATCH 6/7] Issue #20: create a new module to make it more easily testable --- pkg/quickwit/quickwit.go | 100 +--------------------------- pkg/quickwit/timestamp_infos.go | 111 ++++++++++++++++++++++++++++++++ 2 files changed, 112 insertions(+), 99 deletions(-) create mode 100644 pkg/quickwit/timestamp_infos.go diff --git a/pkg/quickwit/quickwit.go b/pkg/quickwit/quickwit.go index 9deef8f..4937372 100644 --- a/pkg/quickwit/quickwit.go +++ b/pkg/quickwit/quickwit.go @@ -4,7 +4,6 @@ import ( "bytes" "context" "encoding/json" - "errors" "fmt" "io" "net/http" @@ -33,103 +32,6 @@ type FieldMappings struct { FieldMappings []FieldMappings `json:"field_mappings,omitempty"` } -type QuickwitMapping struct { - IndexConfig struct { - DocMapping struct { - TimestampField string `json:"timestamp_field"` - FieldMappings []FieldMappings `json:"field_mappings"` - } `json:"doc_mapping"` - } `json:"index_config"` -} - -type QuickwitCreationErrorPayload struct { - Message string `json:"message"` - StatusCode int `json:"status"` -} - -func newErrorCreationPayload(statusCode int, message string) error { - var payload QuickwitCreationErrorPayload - payload.Message = message - payload.StatusCode = statusCode - json, err := json.Marshal(payload) - if nil != err { - return err - } - - return errors.New(string(json)) -} - -func findTimeStampFormat(timestampFieldName string, parentName *string, fieldMappings []FieldMappings) *string { - if nil == fieldMappings { - return nil - } - - for _, field := range fieldMappings { - fieldName := field.Name - if nil != parentName { - fieldName = fmt.Sprintf("%s.%s", *parentName, fieldName) - } - - if field.Type == "datetime" && fieldName == timestampFieldName && nil != field.OutputFormat { - return field.OutputFormat - } else if field.Type == "object" && nil != field.FieldMappings { - format := findTimeStampFormat(timestampFieldName, &field.Name, field.FieldMappings) - if nil != format { - return format - } - } - } - - return nil -} - -func getTimestampFieldInfos(index string, qwUrl string, cli *http.Client) (string, string, error) { - mappingEndpointUrl := qwUrl + "/indexes/" + index - qwlog.Info("Calling quickwit endpoint: " + mappingEndpointUrl) - r, err := cli.Get(mappingEndpointUrl) - if err != nil { - errMsg := fmt.Sprintf("Error when calling url = %s: err = %s", mappingEndpointUrl, err.Error()) - qwlog.Error(errMsg) - return "", "", err - } - - statusCode := r.StatusCode - - if statusCode < 200 || statusCode >= 400 { - errMsg := fmt.Sprintf("Error when calling url = %s", mappingEndpointUrl) - qwlog.Error(errMsg) - return "", "", newErrorCreationPayload(statusCode, errMsg) - } - - defer r.Body.Close() - body, err := io.ReadAll(r.Body) - if err != nil { - errMsg := fmt.Sprintf("Error when calling url = %s: err = %s", mappingEndpointUrl, err.Error()) - qwlog.Error(errMsg) - return "", "", newErrorCreationPayload(statusCode, errMsg) - } - - var payload QuickwitMapping - err = json.Unmarshal(body, &payload) - if err != nil { - errMsg := fmt.Sprintf("Unmarshalling body error: err = %s, body = %s", err.Error(), (body)) - qwlog.Error(errMsg) - return "", "", newErrorCreationPayload(statusCode, errMsg) - } - - timestampFieldName := payload.IndexConfig.DocMapping.TimestampField - timestampFieldFormat := findTimeStampFormat(timestampFieldName, nil, payload.IndexConfig.DocMapping.FieldMappings) - - if nil == timestampFieldFormat { - errMsg := fmt.Sprintf("No format found for field: %s", string(timestampFieldName)) - qwlog.Error(errMsg) - return timestampFieldName, "", newErrorCreationPayload(statusCode, errMsg) - } - - qwlog.Info(fmt.Sprintf("Found timestampFieldName = %s, timestampFieldFormat = %s", timestampFieldName, *timestampFieldFormat)) - return timestampFieldName, *timestampFieldFormat, nil -} - // Creates a Quickwit datasource. func NewQuickwitDatasource(settings backend.DataSourceInstanceSettings) (instancemgmt.Instance, error) { qwlog.Debug("Initializing new data source instance") @@ -190,7 +92,7 @@ func NewQuickwitDatasource(settings backend.DataSourceInstanceSettings) (instanc } if !toOk || !tofOk { - timeField, timeOutputFormat, err = getTimestampFieldInfos(index, settings.URL, httpCli) + timeField, timeOutputFormat, err = GetTimestampFieldInfos(index, settings.URL, httpCli) if nil != err { return nil, err } diff --git a/pkg/quickwit/timestamp_infos.go b/pkg/quickwit/timestamp_infos.go new file mode 100644 index 0000000..5a49bc5 --- /dev/null +++ b/pkg/quickwit/timestamp_infos.go @@ -0,0 +1,111 @@ +package quickwit + +import ( + "encoding/json" + "errors" + "fmt" + "io" + "net/http" +) + +type QuickwitMapping struct { + IndexConfig struct { + DocMapping struct { + TimestampField string `json:"timestamp_field"` + FieldMappings []FieldMappings `json:"field_mappings"` + } `json:"doc_mapping"` + } `json:"index_config"` +} + +type QuickwitCreationErrorPayload struct { + Message string `json:"message"` + StatusCode int `json:"status"` +} + +func NewErrorCreationPayload(statusCode int, message string) error { + var payload QuickwitCreationErrorPayload + payload.Message = message + payload.StatusCode = statusCode + json, err := json.Marshal(payload) + if nil != err { + return err + } + + return errors.New(string(json)) +} + +func FindTimeStampFormat(timestampFieldName string, parentName *string, fieldMappings []FieldMappings) *string { + if nil == fieldMappings { + return nil + } + + for _, field := range fieldMappings { + fieldName := field.Name + if nil != parentName { + fieldName = fmt.Sprintf("%s.%s", *parentName, fieldName) + } + + if field.Type == "datetime" && fieldName == timestampFieldName && nil != field.OutputFormat { + return field.OutputFormat + } else if field.Type == "object" && nil != field.FieldMappings { + format := FindTimeStampFormat(timestampFieldName, &field.Name, field.FieldMappings) + if nil != format { + return format + } + } + } + + return nil +} + +func DecodeTimestampFieldInfos(statusCode int, body []byte) (string, string, error) { + var payload QuickwitMapping + err := json.Unmarshal(body, &payload) + + if err != nil { + errMsg := fmt.Sprintf("Unmarshalling body error: err = %s, body = %s", err.Error(), (body)) + qwlog.Error(errMsg) + return "", "", NewErrorCreationPayload(statusCode, errMsg) + } + + timestampFieldName := payload.IndexConfig.DocMapping.TimestampField + timestampFieldFormat := FindTimeStampFormat(timestampFieldName, nil, payload.IndexConfig.DocMapping.FieldMappings) + + if nil == timestampFieldFormat { + errMsg := fmt.Sprintf("No format found for field: %s", string(timestampFieldName)) + qwlog.Error(errMsg) + return timestampFieldName, "", NewErrorCreationPayload(statusCode, errMsg) + } + + qwlog.Info(fmt.Sprintf("Found timestampFieldName = %s, timestampFieldFormat = %s", timestampFieldName, *timestampFieldFormat)) + return timestampFieldName, *timestampFieldFormat, nil +} + +func GetTimestampFieldInfos(index string, qwUrl string, cli *http.Client) (string, string, error) { + mappingEndpointUrl := qwUrl + "/indexes/" + index + qwlog.Info("Calling quickwit endpoint: " + mappingEndpointUrl) + r, err := cli.Get(mappingEndpointUrl) + if err != nil { + errMsg := fmt.Sprintf("Error when calling url = %s: err = %s", mappingEndpointUrl, err.Error()) + qwlog.Error(errMsg) + return "", "", err + } + + statusCode := r.StatusCode + + if statusCode < 200 || statusCode >= 400 { + errMsg := fmt.Sprintf("Error when calling url = %s", mappingEndpointUrl) + qwlog.Error(errMsg) + return "", "", NewErrorCreationPayload(statusCode, errMsg) + } + + defer r.Body.Close() + body, err := io.ReadAll(r.Body) + if err != nil { + errMsg := fmt.Sprintf("Error when calling url = %s: err = %s", mappingEndpointUrl, err.Error()) + qwlog.Error(errMsg) + return "", "", NewErrorCreationPayload(statusCode, errMsg) + } + + return DecodeTimestampFieldInfos(statusCode, body) +} From 1d645408bc4413a47374313814d7a2db826fec3e Mon Sep 17 00:00:00 2001 From: Idriss Neumann Date: Thu, 28 Dec 2023 12:59:11 +0100 Subject: [PATCH 7/7] Issue #20: add backend unit tests --- pkg/quickwit/timestamp_infos_test.go | 313 +++++++++++++++++++++++++++ 1 file changed, 313 insertions(+) create mode 100644 pkg/quickwit/timestamp_infos_test.go diff --git a/pkg/quickwit/timestamp_infos_test.go b/pkg/quickwit/timestamp_infos_test.go new file mode 100644 index 0000000..55f5916 --- /dev/null +++ b/pkg/quickwit/timestamp_infos_test.go @@ -0,0 +1,313 @@ +package quickwit + +import ( + "testing" + + "github.com/stretchr/testify/require" +) + +func TestDecodeTimestampFieldInfos(t *testing.T) { + t.Run("Test decode timestam field infos", func(t *testing.T) { + t.Run("Test decode simple fields", func(t *testing.T) { + // Given + query := []byte(` + { + "version": "0.6", + "index_uid": "myindex:01HG7ZZK3ZD7XF6BKQCZJHSJ5W", + "index_config": { + "version": "0.6", + "index_id": "myindex", + "index_uri": "s3://quickwit-indexes/myindex", + "doc_mapping": { + "field_mappings": [ + { + "name": "foo", + "type": "text", + "fast": false, + "fieldnorms": false, + "indexed": true, + "record": "basic", + "stored": true, + "tokenizer": "default" + }, + { + "name": "timestamp", + "type": "datetime", + "fast": true, + "fast_precision": "seconds", + "indexed": true, + "input_formats": [ + "rfc3339", + "unix_timestamp" + ], + "output_format": "rfc3339", + "stored": true + } + ], + "tag_fields": [], + "store_source": true, + "index_field_presence": false, + "timestamp_field": "timestamp", + "mode": "dynamic", + "dynamic_mapping": {}, + "partition_key": "foo", + "max_num_partitions": 1, + "tokenizers": [] + }, + "indexing_settings": {}, + "search_settings": { + "default_search_fields": [ + "foo" + ] + }, + "retention": null + }, + "checkpoint": {}, + "create_timestamp": 1701075471, + "sources": [] + } + `) + + // When + timestampFieldName, timestampFieldFormat, err := DecodeTimestampFieldInfos(200, query) + + // Then + require.NoError(t, err) + require.Equal(t, timestampFieldName, "timestamp") + require.Equal(t, timestampFieldFormat, "rfc3339") + }) + + t.Run("Test decode nested fields", func(t *testing.T) { + // Given + query := []byte(` + { + "version": "0.6", + "index_uid": "myindex:01HG7ZZK3ZD7XF6BKQCZJHSJ5W", + "index_config": { + "version": "0.6", + "index_id": "myindex", + "index_uri": "s3://quickwit-indexes/myindex", + "doc_mapping": { + "field_mappings": [ + { + "name": "foo", + "type": "text", + "fast": false, + "fieldnorms": false, + "indexed": true, + "record": "basic", + "stored": true, + "tokenizer": "default" + }, + { + "name": "sub", + "type": "object", + "field_mappings": [ + { + "fast": true, + "fast_precision": "seconds", + "indexed": true, + "input_formats": [ + "rfc3339", + "unix_timestamp" + ], + "name": "timestamp", + "output_format": "rfc3339", + "stored": true, + "type": "datetime" + } + ] + } + ], + "tag_fields": [], + "store_source": true, + "index_field_presence": false, + "timestamp_field": "sub.timestamp", + "mode": "dynamic", + "dynamic_mapping": {}, + "partition_key": "foo", + "max_num_partitions": 1, + "tokenizers": [] + }, + "indexing_settings": {}, + "search_settings": { + "default_search_fields": [ + "foo" + ] + }, + "retention": null + }, + "checkpoint": {}, + "create_timestamp": 1701075471, + "sources": [] + } + `) + + // When + timestampFieldName, timestampFieldFormat, err := DecodeTimestampFieldInfos(200, query) + + // Then + require.NoError(t, err) + require.Equal(t, timestampFieldName, "sub.timestamp") + require.Equal(t, timestampFieldFormat, "rfc3339") + }) + + t.Run("The timestamp field is not at the expected path", func(t *testing.T) { + // Given + query := []byte(` + { + "version": "0.6", + "index_uid": "myindex:01HG7ZZK3ZD7XF6BKQCZJHSJ5W", + "index_config": { + "version": "0.6", + "index_id": "myindex", + "index_uri": "s3://quickwit-indexes/myindex", + "doc_mapping": { + "field_mappings": [ + { + "name": "foo", + "type": "text", + "fast": false, + "fieldnorms": false, + "indexed": true, + "record": "basic", + "stored": true, + "tokenizer": "default" + }, + { + "name": "sub", + "type": "object", + "field_mappings": [ + { + "fast": true, + "fast_precision": "seconds", + "indexed": true, + "input_formats": [ + "rfc3339", + "unix_timestamp" + ], + "name": "timestamp", + "output_format": "rfc3339", + "stored": true, + "type": "datetime" + } + ] + } + ], + "tag_fields": [], + "store_source": true, + "index_field_presence": false, + "timestamp_field": "timestamp", + "mode": "dynamic", + "dynamic_mapping": {}, + "partition_key": "foo", + "max_num_partitions": 1, + "tokenizers": [] + }, + "indexing_settings": {}, + "search_settings": { + "default_search_fields": [ + "foo" + ] + }, + "retention": null + }, + "checkpoint": {}, + "create_timestamp": 1701075471, + "sources": [] + } + `) + + // When + _, _, err := DecodeTimestampFieldInfos(200, query) + + // Then + require.Error(t, err) + }) + + t.Run("The timestamp field has not the right type", func(t *testing.T) { + // Given + query := []byte(` + { + "version": "0.6", + "index_uid": "myindex:01HG7ZZK3ZD7XF6BKQCZJHSJ5W", + "index_config": { + "version": "0.6", + "index_id": "myindex", + "index_uri": "s3://quickwit-indexes/myindex", + "doc_mapping": { + "field_mappings": [ + { + "name": "foo", + "type": "text", + "fast": false, + "fieldnorms": false, + "indexed": true, + "record": "basic", + "stored": true, + "tokenizer": "default" + }, + { + "name": "sub", + "type": "object", + "field_mappings": [ + { + "fast": true, + "fast_precision": "seconds", + "indexed": true, + "input_formats": [ + "rfc3339", + "unix_timestamp" + ], + "name": "timestamp", + "output_format": "rfc3339", + "stored": true, + "type": "whatever" + } + ] + } + ], + "tag_fields": [], + "store_source": true, + "index_field_presence": false, + "timestamp_field": "sub.timestamp", + "mode": "dynamic", + "dynamic_mapping": {}, + "partition_key": "foo", + "max_num_partitions": 1, + "tokenizers": [] + }, + "indexing_settings": {}, + "search_settings": { + "default_search_fields": [ + "foo" + ] + }, + "retention": null + }, + "checkpoint": {}, + "create_timestamp": 1701075471, + "sources": [] + } + `) + + // When + _, _, err := DecodeTimestampFieldInfos(200, query) + + // Then + require.Error(t, err) + }) + }) +} + +func TestNewErrorCreationPayload(t *testing.T) { + t.Run("Test marshall creation payload error", func(t *testing.T) { + // When + err := NewErrorCreationPayload(400, "No valid format") + + // Then + require.Error(t, err) + require.ErrorContains(t, err, "\"message\":\"No valid format\"") + require.ErrorContains(t, err, "\"status\":400") + }) +}