This repository has been archived by the owner on Sep 18, 2024. It is now read-only.
forked from HHS/simpler-grants-gov
[Issue HHS#2092] Populate the search index from the opportunity tables (#47)

Fixes HHS#2092

Set up a script to populate the search index by loading opportunities from the DB, converting them to JSON, loading them into a new index, and then aliasing that index.

Several utilities were created to simplify working with the OpenSearch client (a wrapper for setting up configuration / patterns). Iterating over the opportunities and doing something with them is a common pattern in several of our scripts, so nothing is really different there. The meaningful implementation is how we handle creating and aliasing the index.

In OpenSearch you can give any index an alias (including putting multiple indexes behind the same alias). The approach is pretty simple:

* Create an index
* Load opportunities into the index
* Atomically swap the index backing the `opportunity-index-alias`
* Delete the old indexes if they exist

This approach means that our search endpoint just needs to query the alias, and we can keep making new indexes and swapping them out behind the scenes. Because we could remake the index every few minutes, if we ever need to re-configure things like the number of shards, or any other index-creation configuration, we just update that in this script and wait for it to run again.

I ran this locally after loading `83250` records, and it took about 61s. You can run this locally yourself by doing:

```sh
make init
make db-seed-local
poetry run flask load-search-data load-opportunity-data
```

If you'd like to see the data, you can test it out on http://localhost:5601/app/dev_tools#/console - here is an example query that filters by the word `research` across a few fields and filters to just forecasted/posted opportunities.
```json
GET opportunity-index-alias/_search
{
  "size": 25,
  "from": 0,
  "query": {
    "bool": {
      "must": [
        {
          "simple_query_string": {
            "query": "research",
            "default_operator": "AND",
            "fields": [
              "agency.keyword^16",
              "opportunity_title^2",
              "opportunity_number^12",
              "summary.summary_description",
              "opportunity_assistance_listings.assistance_listing_number^10",
              "opportunity_assistance_listings.program_title^4"
            ]
          }
        }
      ],
      "filter": [
        {
          "terms": {
            "opportunity_status": ["forecasted", "posted"]
          }
        }
      ]
    }
  }
}
```
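The "atomically swap" step maps onto OpenSearch's `_aliases` endpoint, which applies a list of alias actions as a single atomic operation, so readers of the alias never see a missing or half-built index. A minimal sketch of building that request body (the helper and index names here are illustrative, not the repo's actual `swap_alias_index` wrapper, which isn't shown in this diff):

```python
def build_alias_swap_actions(alias_name: str, new_index: str, old_indexes: list[str]) -> dict:
    """Build the request body for an atomic alias swap.

    All actions in one `_aliases` call are applied atomically: the old
    indexes are detached and the new one attached in a single step.
    """
    actions: list[dict] = [
        {"remove": {"index": old, "alias": alias_name}} for old in old_indexes
    ]
    actions.append({"add": {"index": new_index, "alias": alias_name}})
    return {"actions": actions}


# Hypothetical usage with the opensearch-py client:
#   client.indices.update_aliases(body=build_alias_swap_actions(...))
print(build_alias_swap_actions(
    "opportunity-index-alias",
    "opportunity-index-2024-05-01_12-00-00",
    ["opportunity-index-2024-04-30_12-00-00"],
))
```

Deleting the prior indexes is a separate call afterwards; the swap itself only rewires the alias.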
Showing 14 changed files with 455 additions and 80 deletions.
```diff
@@ -1,4 +1,4 @@
-from src.adapters.search.opensearch_client import SearchClient, get_opensearch_client
+from src.adapters.search.opensearch_client import SearchClient
 from src.adapters.search.opensearch_config import get_opensearch_config

-__all__ = ["SearchClient", "get_opensearch_client", "get_opensearch_config"]
+__all__ = ["SearchClient", "get_opensearch_config"]
```
Empty file.
New file (+2 lines):

```python
# import all files so they get initialized and attached to the blueprint
from . import load_search_data  # noqa: F401
```
New file (+112 lines):

```python
import logging
from enum import StrEnum
from typing import Iterator, Sequence

from pydantic import Field
from pydantic_settings import SettingsConfigDict
from sqlalchemy import select
from sqlalchemy.orm import noload, selectinload

import src.adapters.db as db
import src.adapters.search as search
from src.api.opportunities_v0_1.opportunity_schemas import OpportunityV01Schema
from src.db.models.opportunity_models import CurrentOpportunitySummary, Opportunity
from src.task.task import Task
from src.util.datetime_util import get_now_us_eastern_datetime
from src.util.env_config import PydanticBaseEnvConfig

logger = logging.getLogger(__name__)


class LoadOpportunitiesToIndexConfig(PydanticBaseEnvConfig):
    model_config = SettingsConfigDict(env_prefix="LOAD_OPP_SEARCH_")

    shard_count: int = Field(default=1)  # LOAD_OPP_SEARCH_SHARD_COUNT
    replica_count: int = Field(default=1)  # LOAD_OPP_SEARCH_REPLICA_COUNT

    # TODO - these might make sense to come from some sort of opportunity-search-index-config?
    # look into this a bit more when we setup the search endpoint itself.
    alias_name: str = Field(default="opportunity-index-alias")  # LOAD_OPP_SEARCH_ALIAS_NAME
    index_prefix: str = Field(default="opportunity-index")  # LOAD_OPP_INDEX_PREFIX


class LoadOpportunitiesToIndex(Task):
    class Metrics(StrEnum):
        RECORDS_LOADED = "records_loaded"

    def __init__(
        self,
        db_session: db.Session,
        search_client: search.SearchClient,
        config: LoadOpportunitiesToIndexConfig | None = None,
    ) -> None:
        super().__init__(db_session)

        self.search_client = search_client

        if config is None:
            config = LoadOpportunitiesToIndexConfig()
        self.config = config

        current_timestamp = get_now_us_eastern_datetime().strftime("%Y-%m-%d_%H-%M-%S")
        self.index_name = f"{self.config.index_prefix}-{current_timestamp}"
        self.set_metrics({"index_name": self.index_name})

    def run_task(self) -> None:
        # create the index
        self.search_client.create_index(
            self.index_name,
            shard_count=self.config.shard_count,
            replica_count=self.config.replica_count,
        )

        # load the records
        for opp_batch in self.fetch_opportunities():
            self.load_records(opp_batch)

        # handle aliasing of endpoints
        self.search_client.swap_alias_index(
            self.index_name, self.config.alias_name, delete_prior_indexes=True
        )

    def fetch_opportunities(self) -> Iterator[Sequence[Opportunity]]:
        """
        Fetch the opportunities in batches. The iterator returned
        will give you each individual batch to be processed.

        Fetches all opportunities where:
            * is_draft = False
            * current_opportunity_summary is not None
        """
        return (
            self.db_session.execute(
                select(Opportunity)
                .join(CurrentOpportunitySummary)
                .where(
                    Opportunity.is_draft.is_(False),
                    CurrentOpportunitySummary.opportunity_status.isnot(None),
                )
                .options(selectinload("*"), noload(Opportunity.all_opportunity_summaries))
                .execution_options(yield_per=5000)
            )
            .scalars()
            .partitions()
        )

    def load_records(self, records: Sequence[Opportunity]) -> None:
        logger.info("Loading batch of opportunities...")
        schema = OpportunityV01Schema()
        json_records = []

        for record in records:
            logger.info(
                "Preparing opportunity for upload to search index",
                extra={
                    "opportunity_id": record.opportunity_id,
                    "opportunity_status": record.opportunity_status,
                },
            )
            json_records.append(schema.dump(record))
            self.increment(self.Metrics.RECORDS_LOADED)

        self.search_client.bulk_upsert(self.index_name, json_records, "opportunity_id")
```
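The `bulk_upsert` wrapper itself isn't part of this diff. Assuming it wraps OpenSearch's `_bulk` API, the payload it sends could look like the following sketch: each document becomes an `index` action line keyed by the `opportunity_id` field, followed by the document body (the helper name and exact shape are illustrative assumptions):

```python
import json


def build_bulk_payload(index_name: str, records: list[dict], id_field: str) -> str:
    """Build an NDJSON body for the OpenSearch _bulk API.

    Each record contributes two lines: an `index` action carrying the
    document _id, then the document itself. `index` actions behave as
    upserts - a document with the same _id is replaced in place.
    """
    lines = []
    for record in records:
        lines.append(json.dumps({"index": {"_index": index_name, "_id": record[id_field]}}))
        lines.append(json.dumps(record))
    # _bulk requires a trailing newline after the last line
    return "\n".join(lines) + "\n"
```

Note that `_bulk` returns HTTP 200 even when individual operations fail, so a production wrapper would presumably also inspect the per-item results in the response.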
New file (+15 lines):

```python
import src.adapters.db as db
import src.adapters.search as search
from src.adapters.db import flask_db
from src.search.backend.load_opportunities_to_index import LoadOpportunitiesToIndex
from src.search.backend.load_search_data_blueprint import load_search_data_blueprint


@load_search_data_blueprint.cli.command(
    "load-opportunity-data", help="Load opportunity data from our database to the search index"
)
@flask_db.with_db_session()
def load_opportunity_data(db_session: db.Session) -> None:
    search_client = search.SearchClient()

    LoadOpportunitiesToIndex(db_session, search_client).run()
```
New file (+5 lines):

```python
from apiflask import APIBlueprint

load_search_data_blueprint = APIBlueprint(
    "load-search-data", __name__, enable_openapi=False, cli_group="load-search-data"
)
```
This file was deleted.