This is an Amazon Redshift Streaming Ingestion from Kinesis Data Streams project for CDK development with Python.
The cdk.json file tells the CDK Toolkit how to execute your app.
This project is set up like a standard Python project. The initialization process also creates a virtualenv within this project, stored under the .venv directory. To create the virtualenv it assumes that there is a python3 (or python for Windows) executable in your path with access to the venv package. If for any reason the automatic creation of the virtualenv fails, you can create the virtualenv manually.
To manually create a virtualenv on MacOS and Linux:
$ python3 -m venv .venv
After the init process completes and the virtualenv is created, you can use the following step to activate your virtualenv.
$ source .venv/bin/activate
If you are on a Windows platform, you would activate the virtualenv like this:
% .venv\Scripts\activate.bat
Once the virtualenv is activated, you can install the required dependencies.
$ pip install -r requirements.txt
ℹ️ Before you deploy this project, you should create an AWS Secret for your Redshift Serverless Admin user. You can create an AWS Secret like this:
$ aws secretsmanager create-secret \
    --name "your_redshift_secret_name" \
    --description "(Optional) description of the secret" \
    --secret-string '{"admin_username": "admin", "admin_user_password": "password_of_at_least_8_characters"}'
At this point you can now synthesize the CloudFormation template for this code.
(.venv) $ export CDK_DEFAULT_ACCOUNT=$(aws sts get-caller-identity --query Account --output text)
(.venv) $ export CDK_DEFAULT_REGION=$(aws configure get region)
(.venv) $ cdk synth --all \
              -c vpc_name='your-existing-vpc-name' \
              -c aws_secret_name='your_redshift_secret_name'
Use the cdk deploy command to create the stack shown above.
(.venv) $ cdk deploy --all \
              -c vpc_name='your-existing-vpc-name' \
              -c aws_secret_name='your_redshift_secret_name'
To add additional dependencies, for example other CDK libraries, just add them to your setup.py file and rerun the pip install -r requirements.txt command.
Delete the CloudFormation stack by running the command below.
(.venv) $ cdk destroy --force --all \
              -c vpc_name='your-existing-vpc-name' \
              -c aws_secret_name='your_redshift_secret_name'
- cdk ls        list all stacks in the app
- cdk synth     emits the synthesized CloudFormation template
- cdk deploy    deploy this stack to your default AWS account/region
- cdk diff      compare deployed stack with current state
- cdk docs      open CDK documentation
Enjoy!
These steps show you how to configure the materialized view to ingest data.
- Connect to the Redshift query editor v2.
- Create an external schema to map the data from Kinesis to a Redshift object.

  CREATE EXTERNAL SCHEMA evdata
  FROM KINESIS
  IAM_ROLE 'arn:aws:iam::{AWS-ACCOUNT-ID}:role/RedshiftStreamingRole';
For information about how to configure the IAM role, see Getting started with streaming ingestion from Amazon Kinesis Data Streams.
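  Once the schema is created, you can optionally confirm that Redshift registered it by querying the SVV_EXTERNAL_SCHEMAS system view. This is a minimal sanity-check sketch; SVV_EXTERNAL_SCHEMAS is a standard Redshift system view, but check your version's documentation for its exact output columns.

  -- Confirm the external schema mapped to the Kinesis stream exists
  SELECT * FROM svv_external_schemas WHERE schemaname = 'evdata';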
- Create a materialized view to consume the stream data.

  Note that Kinesis stream names are case-sensitive and can contain both uppercase and lowercase letters. To use case-sensitive identifiers, you can set the configuration setting enable_case_sensitive_identifier to true at either the session or cluster level.

  -- To create and use case sensitive identifiers
  SET enable_case_sensitive_identifier TO true;

  -- To check if enable_case_sensitive_identifier is turned on
  SHOW enable_case_sensitive_identifier;
  The following example defines a materialized view with JSON source data. Create the materialized view so it's distributed on the UUID value from the stream and is sorted by the refresh_time value. The refresh_time is the start time of the materialized view refresh that loaded the record. The materialized view is set to auto refresh and will be refreshed as data keeps arriving in the stream.

  CREATE MATERIALIZED VIEW ev_station_data_extract DISTKEY(6) sortkey(1) AUTO REFRESH YES AS
    SELECT refresh_time,
      approximate_arrival_timestamp,
      partition_key,
      shard_id,
      sequence_number,
      json_extract_path_text(from_varbyte(kinesis_data, 'utf-8'), '_id', true)::character(36) as ID,
      json_extract_path_text(from_varbyte(kinesis_data, 'utf-8'), 'clusterID', true)::varchar(30) as clusterID,
      json_extract_path_text(from_varbyte(kinesis_data, 'utf-8'), 'connectionTime', true)::varchar(20) as connectionTime,
      json_extract_path_text(from_varbyte(kinesis_data, 'utf-8'), 'kWhDelivered', true)::DECIMAL(10,2) as kWhDelivered,
      json_extract_path_text(from_varbyte(kinesis_data, 'utf-8'), 'stationID', true)::INTEGER as stationID,
      json_extract_path_text(from_varbyte(kinesis_data, 'utf-8'), 'spaceID', true)::varchar(100) as spaceID,
      json_extract_path_text(from_varbyte(kinesis_data, 'utf-8'), 'timezone', true)::varchar(30) as timezone,
      json_extract_path_text(from_varbyte(kinesis_data, 'utf-8'), 'userID', true)::varchar(30) as userID
    FROM evdata."ev_stream_data"
    WHERE LENGTH(kinesis_data) < 65355;
  The code above filters records larger than 65355 bytes. This is because json_extract_path_text is limited to the varchar data type. The materialized view should be defined so that there aren't any type conversion errors.
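  If ingested records fail to parse or are skipped, Redshift logs them to the SYS_STREAM_SCAN_ERRORS system view. The following diagnostic query is a sketch; the view is part of current Redshift releases, but consult the documentation for its exact columns.

  -- Inspect recent streaming ingestion errors (e.g. oversized or malformed records)
  SELECT * FROM sys_stream_scan_errors LIMIT 10;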
- Refreshing materialized views for streaming ingestion
  The materialized view is auto-refreshed as long as there is new data on the KDS stream. You can also disable auto-refresh and run a manual refresh or schedule a manual refresh using the Redshift Console UI.

  To update the data in a materialized view, you can use the REFRESH MATERIALIZED VIEW statement at any time.

  REFRESH MATERIALIZED VIEW ev_station_data_extract;
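  To check whether auto refresh is keeping up with the stream, you can inspect the view's refresh history via the SVL_MV_REFRESH_STATUS system view. This is a sketch assuming the mv_name and starttime columns documented for current Redshift releases.

  -- Show the most recent refreshes of the materialized view
  SELECT *
    FROM svl_mv_refresh_status
   WHERE mv_name = 'ev_station_data_extract'
   ORDER BY starttime DESC
   LIMIT 10;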
- Query data in the materialized view.
SELECT * FROM ev_station_data_extract;
- Query the refreshed materialized view to get usage statistics.
  SELECT to_timestamp(connectionTime, 'YYYY-MM-DD HH24:MI:SS') as connectiontime
        ,SUM(kWhDelivered) AS Energy_Consumed
        ,count(distinct userID) AS #Users
    FROM ev_station_data_extract
   GROUP BY to_timestamp(connectionTime, 'YYYY-MM-DD HH24:MI:SS')
   ORDER BY 1 DESC;
- Amazon Redshift - Getting started with streaming ingestion from Amazon Kinesis Data Streams
- Amazon Redshift - Electric vehicle station-data streaming ingestion tutorial, using Kinesis
- Amazon Redshift Configuration Reference - enable_case_sensitive_identifier
- Real-time analytics with Amazon Redshift streaming ingestion (2022-04-27)
- Mimesis: Fake Data Generator - a high-performance fake data generator for Python that provides data for a variety of purposes in a variety of languages