-
Notifications
You must be signed in to change notification settings - Fork 1
/
Copy pathdynamo_db.py
163 lines (136 loc) · 5.67 KB
/
dynamo_db.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
import calendar
import random
import string
import time
from decimal import Decimal

import boto3

from product import Product
class DynamoDBProduct(Product):
    """
    Save product data into DynamoDB.

    Positional constructor arguments are passed straight through to
    :class:`Product`.

    :ivar table_name: name of the table, set to :code:`"Products"`
    :vartype table_name: str
    :ivar client: AWS client, created for the ``dynamodb`` service
    :vartype client: boto3.client

    .. todo::
        should we have a table class instead?
    """

    def __init__(self, *args):
        super().__init__(*args)
        self.table_name = "Products"
        self.client = boto3.client(service_name='dynamodb')

    def putItem(self):
        """Write this product to the table and print the service response.

        The item payload is whatever ``self.getItem()`` returns; that method
        is not defined in this class (presumably inherited from
        :class:`Product` -- confirm there).

        :return: ``True`` on success, ``False`` if anything raised (the
            exception is printed, not re-raised)
        :rtype: bool
        """
        try:
            response = self.client.put_item(
                TableName=self.table_name,
                Item=self.getItem(),
                ReturnConsumedCapacity='TOTAL',
            )
            print(response)
        except Exception as e:
            print(e)
            return False
        return True

    def createTable(self):
        """Create the table in on-demand (``PAY_PER_REQUEST``) billing mode.

        Key attribute definitions and the key schema are taken from
        ``Product.getKeyAttributeDefs()`` and ``Product.getKeySchema()``.

        :return: ``True`` if the request succeeded, ``False`` if anything
            raised (the exception is printed, not re-raised)
        :rtype: bool
        """
        try:
            response = self.client.create_table(
                AttributeDefinitions=Product.getKeyAttributeDefs(),
                TableName=self.table_name,
                KeySchema=Product.getKeySchema(),
                BillingMode='PAY_PER_REQUEST',
            )
            # The response is already a dict; no copy is needed to read it.
            print("Successfully created table " + response['TableDescription']['TableName'])
        except Exception as e:
            print(e)
            return False
        return True

    def deleteTable(self):
        """Request deletion of the table.

        Best effort: a failure (for example, the table does not exist yet)
        is printed and reported through the return value instead of raised.

        :return: ``True`` if the delete request was accepted, ``False`` if it
            raised
        :rtype: bool
        """
        try:
            self.client.delete_table(TableName=self.table_name)
        except Exception as e:
            print(e)
            return False
        return True

    def ensureTableDeleted(self, poll_interval=2, timeout=None):
        """Block until ``describe_table`` reports that the table is gone.

        Only the client's ``ResourceNotFoundException`` counts as "deleted".
        Any other error (credentials, network, throttling) propagates to the
        caller instead of being mistaken for a successful deletion.

        :param poll_interval: seconds to sleep between polls
        :type poll_interval: float
        :param timeout: maximum seconds to wait; ``None`` waits indefinitely
        :type timeout: float or None
        :raises TimeoutError: if ``timeout`` elapses while the table still exists
        """
        table_gone = self.client.exceptions.ResourceNotFoundException
        deadline = None if timeout is None else time.monotonic() + timeout
        while True:
            try:
                self.client.describe_table(TableName=self.table_name)
            except table_gone:
                return
            if deadline is not None and time.monotonic() >= deadline:
                raise TimeoutError(
                    "Table " + self.table_name + " still exists after " + str(timeout) + " seconds")
            time.sleep(poll_interval)

    def ensureTableActive(self, poll_interval=1, timeout=None):
        """Block until the table's ``TableStatus`` is ``ACTIVE``.

        Errors from ``describe_table`` (including the table not existing)
        are not caught here and propagate to the caller.

        :param poll_interval: seconds to sleep between polls
        :type poll_interval: float
        :param timeout: maximum seconds to wait; ``None`` waits indefinitely
        :type timeout: float or None
        :raises TimeoutError: if ``timeout`` elapses before the table is active
        """
        deadline = None if timeout is None else time.monotonic() + timeout
        while True:
            description = self.client.describe_table(TableName=self.table_name)
            if description['Table']['TableStatus'] == 'ACTIVE':
                return
            if deadline is not None and time.monotonic() >= deadline:
                raise TimeoutError(
                    "Table " + self.table_name + " not ACTIVE after " + str(timeout) + " seconds")
            time.sleep(poll_interval)
def dynamoDbTutorial():
    """Run the end-to-end DynamoDB walkthrough.

    Builds one product with a random 10-character ASIN, the current unix
    timestamp and a random price, then: deletes the table, waits for the
    deletion, creates the table, waits for it to become active and finally
    inserts the product. Stops with a printed message as soon as table
    creation or the insert reports failure.
    """
    print("Starting tutorial")

    # Random sample data; timestamp and price are passed along as strings.
    alphabet = string.ascii_uppercase + string.ascii_lowercase + string.digits
    characters = [random.choice(alphabet) for _ in range(10)]
    asin = ''.join(characters)
    now_utc = time.gmtime()
    timestamp = str(calendar.timegm(now_utc))
    price = str(round(random.uniform(0, 100), 2))

    product = DynamoDBProduct(asin, timestamp, price)

    # Start from a clean slate.
    product.deleteTable()
    product.ensureTableDeleted()

    if product.createTable():
        product.ensureTableActive()
        if not product.putItem():
            print("Error while inserting an item into the table, please consult the log")
    else:
        print("Error while creating table, please consult the log")
def _printIfPriceIncreased(client, table_name, asin):
    """Print the newest record of *asin* if it is pricier than the one before it."""
    result = client.query(
        TableName=table_name,
        Select='ALL_ATTRIBUTES',
        Limit=2,
        # Descending sort-key order; assumes the sort key orders records by
        # time so the first item is the newest -- confirm against
        # Product.getKeySchema().
        ScanIndexForward=False,
        ExpressionAttributeValues={':v1': {'S': asin}},
        KeyConditionExpression='ASIN = :v1')
    records = result['Items']
    if len(records) < 2:
        # A single record has nothing to be compared with.
        return
    latest, previous = records[0], records[1]
    # Number attributes arrive as strings; comparing the raw strings is
    # lexicographic ('9.5' > '10.5'), so convert before comparing.
    if Decimal(latest['Price']['N']) > Decimal(previous['Price']['N']):
        print(latest['ASIN']['S'], latest['Price']['N'], latest['timeStamp']['N'])


def queryTutorial(client=None, table_name='Products'):
    """Query challenge: print ASIN info for products whose price increased.

    Scans the table for every ASIN (following pagination), then queries the
    two most recent records of each distinct ASIN and prints the latest
    record's ASIN, price and timestamp when its price is higher than the
    previous one.

    :param client: DynamoDB client to use; a new boto3 ``dynamodb`` client is
        created when omitted
    :param table_name: table to read, :code:`"Products"` by default
    :type table_name: str

    .. todo::
        a full table scan is used to discover the ASINs; need to figure out
        a more efficient way to do this
    """
    if client is None:
        # Created once and reused for the scan and every per-ASIN query.
        client = boto3.client(service_name='dynamodb')

    seen_asins = set()
    scan_kwargs = {'TableName': table_name, 'ProjectionExpression': 'ASIN'}
    while True:
        page = client.scan(**scan_kwargs)
        for item in page['Items']:
            asin = item['ASIN']['S']
            if asin in seen_asins:
                continue
            seen_asins.add(asin)
            _printIfPriceIncreased(client, table_name, asin)

        # A scan response is a single page; keep going while the service
        # reports a key to resume from.
        resume_key = page.get('LastEvaluatedKey')
        if not resume_key:
            break
        scan_kwargs['ExclusiveStartKey'] = resume_key
if __name__ == "__main__":
    # Only the query walkthrough runs by default. Uncomment the call below to
    # first rebuild the table and insert one random sample product.
    # dynamoDbTutorial()
    queryTutorial()