Skip to content

Commit

Permalink
Issue #4: find the timestamp field dynamically
Browse files Browse the repository at this point in the history
  • Loading branch information
idrissneumann committed Dec 14, 2023
1 parent 67f56ba commit 4e7a873
Show file tree
Hide file tree
Showing 3 changed files with 102 additions and 50 deletions.
86 changes: 72 additions & 14 deletions pkg/quickwit/quickwit.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,6 @@ import (
"bytes"
"context"
"encoding/json"
"errors"
"fmt"
"io"
"net/http"
Expand All @@ -26,6 +25,69 @@ type QuickwitDatasource struct {
dsInfo es.DatasourceInfo
}

type QuickwitMapping struct {
IndexConfig struct {
DocMapping struct {
TimestampField string `json:"timestamp_field"`
FieldMappings []struct {
Name string `json:"name"`
InputFormats []string `json:"input_formats"`
} `json:"field_mappings"`
} `json:"doc_mapping"`
} `json:"index_config"`
}

func getTimestampFieldInfos(index string, qwUrl string, cli *http.Client) (string, string, error) {
mappingEndpointUrl := qwUrl + "/indexes/" + index
qwlog.Info("Calling quickwit endpoint: " + mappingEndpointUrl)
r, err := cli.Get(mappingEndpointUrl)
if err != nil {
errMsg := fmt.Sprintf("Error when calling url = %s: err = %s", mappingEndpointUrl, err.Error())
qwlog.Error(errMsg)
return "", "", err
}

statusCode := r.StatusCode
if statusCode < 200 || statusCode >= 400 {
errMsg := fmt.Sprintf("Error when calling url = %s: statusCode = %d", mappingEndpointUrl, statusCode)
qwlog.Error(errMsg)
return "", "", fmt.Errorf(errMsg)
}

defer r.Body.Close()
body, err := io.ReadAll(r.Body)
if err != nil {
errMsg := fmt.Sprintf("Error when calling url = %s: err = %s", mappingEndpointUrl, err.Error())
qwlog.Error(errMsg)
return "", "", err
}

var payload QuickwitMapping
err = json.Unmarshal(body, &payload)
if err != nil {
errMsg := fmt.Sprintf("Unmarshalling body error: %s", string(body))
qwlog.Error(errMsg)
return "", "", fmt.Errorf(errMsg)
}

timestampFieldName := payload.IndexConfig.DocMapping.TimestampField
timestampFieldFormat := "undef"
for _, field := range payload.IndexConfig.DocMapping.FieldMappings {
if field.Name == timestampFieldName && len(field.InputFormats) > 0 {
timestampFieldFormat = field.InputFormats[0]
break
}
}

if timestampFieldFormat == "undef" {
errMsg := fmt.Sprintf("No format found for field: %s", string(timestampFieldName))
return timestampFieldName, "", fmt.Errorf(errMsg)
}

qwlog.Info(fmt.Sprintf("Found timestampFieldName = %s, timestampFieldFormat = %s", timestampFieldName, timestampFieldFormat))
return timestampFieldName, timestampFieldFormat, nil
}

// Creates a Quickwit datasource.
func NewQuickwitDatasource(settings backend.DataSourceInstanceSettings) (instancemgmt.Instance, error) {
qwlog.Debug("Initializing new data source instance")
Expand All @@ -50,19 +112,8 @@ func NewQuickwitDatasource(settings backend.DataSourceInstanceSettings) (instanc
return nil, err
}

timeField, ok := jsonData["timeField"].(string)
if !ok {
return nil, errors.New("timeField cannot be cast to string")
}

if timeField == "" {
return nil, errors.New("a time field name is required")
}

timeOutputFormat, ok := jsonData["timeOutputFormat"].(string)
if !ok {
return nil, errors.New("timeOutputFormat cannot be cast to string")
}
timeField, toOk := jsonData["timeField"].(string)
timeOutputFormat, tofOk := jsonData["timeOutputFormat"].(string)

logLevelField, ok := jsonData["logLevelField"].(string)
if !ok {
Expand Down Expand Up @@ -96,6 +147,13 @@ func NewQuickwitDatasource(settings backend.DataSourceInstanceSettings) (instanc
maxConcurrentShardRequests = 256
}

if !toOk || !tofOk {
timeField, timeOutputFormat, err = getTimestampFieldInfos(index, settings.URL, httpCli)
if nil != err {
return nil, err
}
}

configuredFields := es.ConfiguredFields{
TimeField: timeField,
TimeOutputFormat: timeOutputFormat,
Expand Down
18 changes: 0 additions & 18 deletions src/configuration/ConfigEditor.tsx
Original file line number Diff line number Diff line change
Expand Up @@ -73,24 +73,6 @@ export const QuickwitDetails = ({ value, onChange }: DetailsProps) => {
width={40}
/>
</InlineField>
<InlineField label="Timestamp field" labelWidth={26} tooltip="Timestamp field of your index. Required.">
<Input
id="quickwit_index_timestamp_field"
value={value.jsonData.timeField}
onChange={(event) => onChange({ ...value, jsonData: {...value.jsonData, timeField: event.currentTarget.value}})}
placeholder="timestamp"
width={40}
/>
</InlineField>
<InlineField label="Timestamp output format" labelWidth={26} tooltip="If you don't remember the output format, check the datasource and the error message will give you a hint.">
<Input
id="quickwit_index_timestamp_field_output_format"
value={value.jsonData.timeOutputFormat}
onChange={(event) => onChange({ ...value, jsonData: {...value.jsonData, timeOutputFormat: event.currentTarget.value}})}
placeholder="unix_timestamp_millisecs"
width={40}
/>
</InlineField>
<InlineField label="Message field name" labelWidth={26} tooltip="Field used to display a log line in the Explore view">
<Input
id="quickwit_log_message_field"
Expand Down
48 changes: 30 additions & 18 deletions src/datasource.ts
Original file line number Diff line number Diff line change
Expand Up @@ -79,13 +79,31 @@ export class QuickwitDataSource
super(instanceSettings);
const settingsData = instanceSettings.jsonData || ({} as QuickwitOptions);
this.index = settingsData.index || '';
this.timeField = settingsData.timeField || '';
this.timeOutputFormat = settingsData.timeOutputFormat || '';
this.logMessageField = settingsData.logMessageField || '';
this.logLevelField = settingsData.logLevelField || '';
this.timeField = ''
this.timeOutputFormat = ''
this.queryBuilder = new ElasticQueryBuilder({
timeField: this.timeField,
});
from(this.getResource('indexes/' + this.index)).pipe(
map((indexMetadata) => {
let fields = getAllFields(indexMetadata.index_config.doc_mapping.field_mappings);
let timestampFieldName = indexMetadata.index_config.doc_mapping.timestamp_field
let timestampField = fields.find((field) => field.json_path === timestampFieldName);
let timestampFormat = timestampField ? timestampField.field_mapping.output_format || '' : ''
let timestampFieldInfos = { 'field': timestampFieldName, 'format': timestampFormat }
console.log("timestampFieldInfos = " + JSON.stringify(timestampFieldInfos))
return timestampFieldInfos
})
).subscribe(result => {
this.timeField = result.field;
this.timeOutputFormat = result.format;
this.queryBuilder = new ElasticQueryBuilder({
timeField: this.timeField,
});
});

this.logMessageField = settingsData.logMessageField || '';
this.logLevelField = settingsData.logLevelField || '';
this.dataLinks = settingsData.dataLinks || [];
this.languageProvider = new ElasticsearchLanguageProvider(this);
}
Expand All @@ -111,12 +129,7 @@ export class QuickwitDataSource
message: 'Cannot save datasource, `index` is required',
};
}
if (this.timeField === '' ) {
return {
status: 'error',
message: 'Cannot save datasource, `timeField` is required',
};
}

return lastValueFrom(
from(this.getResource('indexes/' + this.index)).pipe(
mergeMap((indexMetadata) => {
Expand Down Expand Up @@ -147,21 +160,19 @@ export class QuickwitDataSource
if (this.timeField === '') {
return `Time field must not be empty`;
}
if (indexMetadata.index_config.doc_mapping.timestamp_field !== this.timeField) {
return `No timestamp field named '${this.timeField}' found`;
}

let fields = getAllFields(indexMetadata.index_config.doc_mapping.field_mappings);
let timestampField = fields.find((field) => field.json_path === this.timeField);

// Should never happen.
if (timestampField === undefined) {
return `No field named '${this.timeField}' found in the doc mapping. This should never happen.`;
}
if (timestampField.field_mapping.output_format !== this.timeOutputFormat) {
return `Timestamp output format is declared as '${timestampField.field_mapping.output_format}' in the doc mapping, not '${this.timeOutputFormat}'.`;
}

let timeOutputFormat = timestampField.field_mapping.output_format || 'unknown';
const supportedTimestampOutputFormats = ['unix_timestamp_secs', 'unix_timestamp_millis', 'unix_timestamp_micros', 'unix_timestamp_nanos', 'iso8601', 'rfc3339'];
if (!supportedTimestampOutputFormats.includes(this.timeOutputFormat)) {
return `Timestamp output format '${this.timeOutputFormat} is not yet supported.`;
if (!supportedTimestampOutputFormats.includes(timeOutputFormat)) {
return `Timestamp output format '${timeOutputFormat} is not yet supported.`;
}
return;
}
Expand Down Expand Up @@ -310,6 +321,7 @@ export class QuickwitDataSource
ignore_unavailable: true,
index: this.index,
});

let esQuery = JSON.stringify(this.queryBuilder.getTermsQuery(queryDef));
esQuery = esQuery.replace(/\$timeFrom/g, range.from.valueOf().toString());
esQuery = esQuery.replace(/\$timeTo/g, range.to.valueOf().toString());
Expand Down

0 comments on commit 4e7a873

Please sign in to comment.