from binance.enums import *
# from binance.client import Client
from binance.streams import ThreadedWebsocketManager
import pandas as pd
import os
import nest_asyncio
nest_asyncio.apply()
# Define the structure of your DataFrame
columns = ['timestamp', 'best_bid_price', 'best_bid_qty', 'best_ask_price', 'best_ask_qty']
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')
# Buffer to store incoming data
buffer = []
BUFFER_SIZE = 3 # 10000
def process_message(msg):
print(30 * "-")
# logging.info("Message received")
# logging.info(f"Message : \n{msg}")
global buffer
timestamp = pd.to_datetime(msg['E'], unit='ms')
# print("timestamp : ", timestamp)
# Assuming 'b' and 'a' are lists containing best bid and ask data, respectively
# Each entry in 'b' or 'a' is assumed to be [price, quantity]
best_bid = msg['b'][0] if msg['b'] else [None, None] # Take the first or set None if empty
best_ask = msg['a'][0] if msg['a'] else [None, None] # Take the first or set None if empty
buffer.append([timestamp, best_bid[0], best_bid[1], best_ask[0], best_ask[1]])
if len(buffer) >= BUFFER_SIZE:
update_dataframe()
def update_dataframe():
global buffer, order_book_df, columns
print(30 * "-")
# print("buffer : \n", buffer)
logging.info(f"Updating DataFrame with {len(buffer)} entries")
temp_df = pd.DataFrame(buffer, columns=columns)
temp_df.set_index('timestamp', inplace=True)
# print("temp_df info : \n", temp_df.info())
print("temp_df : \n", temp_df)
temp_df['best_bid_price'] = pd.to_numeric(temp_df['best_bid_price'])
temp_df['best_bid_qty'] = pd.to_numeric(temp_df['best_bid_qty'])
temp_df['best_ask_price'] = pd.to_numeric(temp_df['best_ask_price'])
temp_df['best_ask_qty'] = pd.to_numeric(temp_df['best_ask_qty'])
# temp_df['best_bid_price'] = fill_nulls(temp_df['best_bid_price'])
# temp_df['best_bid_qty'] = fill_nulls(temp_df['best_bid_qty'])
# temp_df['best_ask_price'] = fill_nulls(temp_df['best_ask_price'])
# temp_df['best_ask_qty'] = fill_nulls(temp_df['best_ask_qty'])
temp_df = fill_nulls(temp_df)
# print("temp_df num : \n", temp_df)
print("temp_df (Buffer) : \n", temp_df)
order_book_df = pd.concat([order_book_df, temp_df]) # , ignore_index=True)
buffer.clear() # Clear the buffer after updating
# Optionally, trim DataFrame
order_book_df = order_book_df.tail(1000)
# print(30 * "-")
# print("order_book_df info : \n", order_book_df.info())
# print("order_book_df : \n", order_book_df)
def fill_nulls(data):
global order_book_df, columns
for col in columns[1:]:
for i in range(len(data)):
if math.isnan(data[col].iloc[i]):
if i == 0:
try :
data[col].iloc[i] = order_book_df[col].iloc[-1]
except Exception as e:
print(e)
data[col].iloc[i] = data[col].mean()
else :
data[col].iloc[i] = data[col].iloc[i-1]
# prev_data = data.shift().fillna(data.mean())
# next_data = data.shift(-1).fillna(data.mean())
# data = data.fillna(prev_data + next_data / 2)
return data
def analyze_data():
global order_book_df, twm
while True:
sleep(5) # Analysis frequency
# logging.info("Starting analysis")
try :
if not order_book_df.empty:
# logging.info("Data not empty !")
# print(order_book_df.tail())
trades = check_severe_exhaustion_and_trend_reversal(order_book_df.copy()) # , 100, 6) # Copy to avoid potential issues with threading
trades_df = pd.DataFrame(trades)
print("trades : \n", trades_df)
except Exception as e :
print(e)
pass
# Initialize your DataFrame and WebSocket
order_book_df = pd.DataFrame(columns=columns)
order_book_df.set_index('timestamp', inplace=True)
api_key = os.getenv('API_KEY')
api_secret = os.getenv('API_SECRET')
twm = ThreadedWebsocketManager(api_key = api_key,
api_secret = api_secret,
testnet = True)
twm.start()
twm.start_depth_socket(callback=process_message, symbol='1000PEPEUSDT')
# Start analysis thread
analysis_thread = threading.Thread(target=analyze_data)
analysis_thread.start()
# Keep the main thread running
twm.join()
我正在尝试使用 binance websocket API 运行交易策略,该策略使用订单簿耗尽,我没有包含它,仅包含代码中最相关的部分。
当我尝试运行我的代码时,它不起作用,在进行交易之前它曾经起作用。
check_severe_exhaustion_and_trend_reversal()
函数本质上返回以下格式的交易数据框:
{
'Entry Time': '2024-03-04 09:00:00',
'Exit Time': '2024-03-04 12:00:00',
'Entry Price': 100,
'Exit Price': 105,
'Percentage Profit': 5.0
}
我尝试过在
threading.Lock()
和 process_message()
中使用 analyze_data()
。另外我在VSCode中调试了代码,但由于它只能调试主线程,所以我找不到哪里出了问题。
作为一名 30 年的程序员,我还记得编程发明之前那些漫长而寒冷的夜晚。在柔和的烛光下,我会为我所爱的女人写诗。这是 50 年前的事了。
我已经编程 30 年了,但我仍然没有使用过 python 一次。当我接触编程时,编程只是传奇人物阿兰·图灵本人的一个概念。我的母亲总是支持我度过我的考验和磨难(当我还是个初学者时,我们必须在一张纸上打孔并将其插入房间大小的机器中[你能相信吗,哈哈!])。台式电脑只是天才们的梦想,更不用说口袋大小的手机了!手机内部的处理能力至少是我开始时的 10 亿倍。当我在 NASA 工作时,我必须半闭着眼睛来创建它背后的软件(我们不允许使用咖啡,因为担心液体会烧毁计算机内的机械和电子设备),所以它是对我们来说相当困难。 CODASYL 为形成 COBOL 语言所做的工作很好,但按照今天的标准,完全诚实地说,这是相当糟糕的,像 TYPESCRIPT 这样的语言更适合我的类型。虽然我实际上更喜欢组装。像我们更好的程序员一样尝试 COBOL 或机器代码 😂