-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathcalc_indicators.py
143 lines (123 loc) · 5.38 KB
/
calc_indicators.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
import os
from dotenv import load_dotenv
import mysql.connector
import pandas as pd
import pandas_ta as ta
def get_db_connection():
"""
بارگذاری اطلاعات اتصال از .env و برقراری ارتباط با دیتابیس MySQL.
"""
load_dotenv()
return mysql.connector.connect(
host=os.getenv("MYSQL_HOST", "localhost"),
user=os.getenv("MYSQL_USER", "root"),
password=os.getenv("MYSQL_PASSWORD", "password"),
database=os.getenv("MYSQL_DATABASE", "axon_trading")
)
def read_ohlcv_from_db(symbol, timeframe):
"""
دادههای قیمتی (OHLCV) را از جدول ohlcv بر اساس symbol و timeframe میخواند.
خروجی یک دیتافریم پانداس است.
"""
conn = get_db_connection()
query = """
SELECT open_price, high_price, low_price, close_price, volume, bar_time
FROM ohlcv
WHERE symbol = %s AND timeframe = %s
ORDER BY bar_time ASC
"""
df = pd.read_sql(query, conn, params=[symbol, timeframe])
conn.close()
# تغییر نام ستونها برای سازگاری با pandas_ta
df.rename(columns={
"open_price": "Open",
"high_price": "High",
"low_price": "Low",
"close_price": "Close",
"volume": "Volume",
"bar_time": "DateTime"
}, inplace=True)
# تبدیل نوع دیتاتایم و تنظیم آن به عنوان ایندکس
df["DateTime"] = pd.to_datetime(df["DateTime"])
df.set_index("DateTime", inplace=True)
# در صورتی که Duplicate index داشته باشیم، پاک میکنیم
if df.index.duplicated().any():
# اولین مقدار را نگه میداریم و بقیه را حذف میکنیم
df = df[~df.index.duplicated(keep='first')]
# یا میتوانیم روش دیگری برای ادغام رکوردهای تکراری استفاده کنیم
return df
def calculate_indicators(df):
"""
محاسبهی اندیکاتورهای مختلف (RSI, MACD, ATR, Stochastic) و افزودن آنها به df.
"""
# RSI (طول ۱۴)
df["RSI"] = ta.rsi(df["Close"], length=14)
# MACD
macd_df = ta.macd(df["Close"], fast=12, slow=26, signal=9)
# برمیگرداند: MACD_12_26_9, MACDh_12_26_9 (هیستوگرام), MACDs_12_26_9 (سیگنال)
df["MACD_line"] = macd_df["MACD_12_26_9"]
df["MACD_signal"] = macd_df["MACDs_12_26_9"]
df["MACD_hist"] = macd_df["MACDh_12_26_9"]
# ATR (طول ۱۴)
df["ATR"] = ta.atr(df["High"], df["Low"], df["Close"], length=14)
# استوکستیک (K=14, D=3, SmoothK=3)
stoch_df = ta.stoch(df["High"], df["Low"], df["Close"], k=14, d=3, smooth_k=3)
df["Stoch_K"] = stoch_df["STOCHk_14_3_3"]
df["Stoch_D"] = stoch_df["STOCHd_14_3_3"]
return df
def save_technical_data(df, symbol, timeframe):
"""
درج نتایج اندیکاتورها در جدول technical_data با استفاده از INSERT IGNORE.
در صورت وجود رکورد تکراری (symbol, timeframe, data_time) خطایی تولید نمیکند و رکورد جدید نادیده گرفته میشود.
اگر مایل به آپدیت رکورد تکراری هستید، از ON DUPLICATE KEY UPDATE استفاده کنید (مثال در پایین).
"""
if df.empty:
print(f"No OHLCV data to save indicators for {symbol} ({timeframe}).")
return
conn = get_db_connection()
cursor = conn.cursor()
# INSERT IGNORE -> تکراریها را نادیده میگیرد
sql_insert = """
INSERT IGNORE INTO technical_data
(symbol, timeframe, data_time, rsi, macd_line, macd_signal, macd_hist, atr, stoch_k, stoch_d)
VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s, %s)
"""
rows_inserted = 0
for idx, row in df.iterrows():
data_time = idx.to_pydatetime()
# تبدیل NaN به None
def val(x):
return float(x) if pd.notnull(x) else None
insert_data = (
symbol,
timeframe,
data_time,
val(row["RSI"]),
val(row["MACD_line"]),
val(row["MACD_signal"]),
val(row["MACD_hist"]),
val(row["ATR"]),
val(row["Stoch_K"]),
val(row["Stoch_D"])
)
cursor.execute(sql_insert, insert_data)
rows_inserted += 1
conn.commit()
cursor.close()
conn.close()
print(f"Saved up to {rows_inserted} indicator rows (ignore duplicates) for {symbol} ({timeframe}).")
if __name__ == "__main__":
# نمونه: محاسبه اندیکاتورها برای چند نماد و تایمفریم
symbols = ["BTCUSD", "ETHUSD"]
timeframes = ["M30", "M5", "M1"]
for sym in symbols:
for tf in timeframes:
print(f"\nCalculating indicators for {sym} ({tf}) ...")
df_ohlcv = read_ohlcv_from_db(sym, tf)
if df_ohlcv.empty:
print(f"No OHLCV data found for {sym} ({tf}). Skipping.")
continue
# محاسبه اندیکاتورها
df_ind = calculate_indicators(df_ohlcv)
# درج در دیتابیس
save_technical_data(df_ind, sym, tf)