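Details of my question:
I am trying to build an automated trading bot (for personal use).
I found an indicator on TradingView that seems very accurate. Unfortunately it is an indicator rather than a strategy, so I cannot use the built-in interface to automate trades. It is called kNN Machine Learning, by Capissimo on TradingView, if you want to look it up. (Code below.)
I decided to use my Python knowledge to build the bot myself. (Code also below.)
How do I convert this logic to Python? I am also not sure how to run the code so that it trades continuously on Binance.
I have written the logic and plotted its output on a sample chart using the Binance API, but it does not seem very accurate. (Code also below.)
I wrote it in Visual Studio Code.
Pine Script kNN indicator code: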
// This source code is subject to the terms of the Mozilla Public License 2.0 at https://mozilla.org/MPL/2.0/
// © capissimo
//@version=5
indicator('Machine Learning: kNN-based Strategy', 'ML-kNN', true, max_labels_count=300, format=format.price, precision=2, timeframe="", timeframe_gaps=true)
// kNN-based Strategy (FX and Crypto)
// Description:
// This strategy uses a classic machine learning algorithm - k Nearest Neighbours (kNN) -
// to let you find a prediction for the next (tomorrow's, next month's, etc.) market move.
// Being an unsupervised machine learning algorithm, kNN is one of the most simple learning algorithms.
// To do a prediction of the next market move, the kNN algorithm uses the historic data,
// collected in 3 arrays - feature1, feature2 and directions, - and finds the k-nearest
// neighbours of the current indicator(s) values.
// The two dimensional kNN algorithm just has a look on what has happened in the past when
// the two indicators had a similar level. It then looks at the k nearest neighbours,
// sees their state and thus classifies the current point.
// The kNN algorithm offers a framework to test all kinds of indicators easily to see if they
// have got any *predictive value*. One can easily add cog, wpr and others.
// Note: TradingViews's playback feature helps to see this strategy in action.
// Warning: Signals ARE repainting.
// Style tags: Trend Following, Trend Analysis
// Asset class: Equities, Futures, ETFs, Currencies and Commodities
// Dataset: FX Minutes/Hours+++/Days
//-- Preset Dates
int startdate = timestamp('01 Jan 2000 00:00:00 GMT+10')
int stopdate = timestamp('31 Dec 2025 23:45:00 GMT+10')
//-- Inputs
StartDate = input.time (startdate, 'Start Date')
StopDate = input.time (stopdate, 'Stop Date')
Indicator = input.string('All', 'Indicator', ['RSI','ROC','CCI','Volume','All'])
ShortWinow = input.int (14, 'Short Period [1..n]', 1)
LongWindow = input.int (28, 'Long Period [2..n]', 2)
BaseK = input.int (252, 'Base No. of Neighbours (K) [5..n]', 5)
Filter = input.bool (false, 'Volatility Filter')
Bars = input.int (300, 'Bar Threshold [2..5000]', 2, 5000)
//-- Constants
var int BUY = 1
var int SELL =-1
var int CLEAR = 0
var int k = math.floor(math.sqrt(BaseK)) // k Value for kNN algo
//-- Variable
// Training data, normalized to the range of [0,...,100]
var array<float> feature1 = array.new_float(0) // [0,...,100]
var array<float> feature2 = array.new_float(0) // ...
var array<int> directions = array.new_int(0) // [-1; +1]
// Result data
var array<int> predictions = array.new_int(0)
var float prediction = 0.0
var array<int> bars = array.new<int>(1, 0) // array used as a container for inter-bar variables
// Signals
var int signal = CLEAR
//-- Functions
minimax(float x, int p, float min, float max) =>
    float hi = ta.highest(x, p), float lo = ta.lowest(x, p)
    (max - min) * (x - lo)/(hi - lo) + min
cAqua(int g) => g>9?#0080FFff:g>8?#0080FFe5:g>7?#0080FFcc:g>6?#0080FFb2:g>5?#0080FF99:g>4?#0080FF7f:g>3?#0080FF66:g>2?#0080FF4c:g>1?#0080FF33:#00C0FF19
cPink(int g) => g>9?#FF0080ff:g>8?#FF0080e5:g>7?#FF0080cc:g>6?#FF0080b2:g>5?#FF008099:g>4?#FF00807f:g>3?#FF008066:g>2?#FF00804c:g>1?#FF008033:#FF008019
inside_window(float start, float stop) =>
    time >= start and time <= stop ? true : false
//-- Logic
bool window = inside_window(StartDate, StopDate)
// 3 pairs of predictor indicators, long and short each
float rs = ta.rsi(close, LongWindow), float rf = ta.rsi(close, ShortWinow)
float cs = ta.cci(close, LongWindow), float cf = ta.cci(close, ShortWinow)
float os = ta.roc(close, LongWindow), float of = ta.roc(close, ShortWinow)
float vs = minimax(volume, LongWindow, 0, 99), float vf = minimax(volume, ShortWinow, 0, 99)
// TOADD or TOTRYOUT:
// ta.cmo(close, LongWindow), ta.cmo(close, ShortWinow)
// ta.mfi(close, LongWindow), ta.mfi(close, ShortWinow)
// ta.mom(close, LongWindow), ta.mom(close, ShortWinow)
float f1 = switch Indicator
    'RSI' => rs
    'CCI' => cs
    'ROC' => os
    'Volume' => vs
    => math.avg(rs, cs, os, vs)
float f2 = switch Indicator
    'RSI' => rf
    'CCI' => cf
    'ROC' => of
    'Volume' => vf
    => math.avg(rf, cf, of, vf)
// Classification data, what happens on the next bar
int class_label = int(math.sign(close[1] - close[0])) // eq. close[1]<close[0] ? SELL: close[1]>close[0] ? BUY : CLEAR
// Use particular training period
if window
    // Store everything in arrays. Features represent a square 100 x 100 matrix,
    // whose row-colum intersections represent class labels, showing historic directions
    array.push(feature1, f1)
    array.push(feature2, f2)
    array.push(directions, class_label)
// Ucomment the followng statement (if barstate.islast) and tab everything below
// between BOBlock and EOBlock marks to see just the recent several signals gradually
// showing up, rather than all the preceding signals
//if barstate.islast
//==BOBlock
// Core logic of the algorithm
int size = array.size(directions)
float maxdist = -999.0
// Loop through the training arrays, getting distances and corresponding directions.
for i=0 to size-1
    // Calculate the euclidean distance of current point to all historic points,
    // here the metric used might as well be a manhattan distance or any other.
    float d = math.sqrt(math.pow(f1 - array.get(feature1, i), 2) + math.pow(f2 - array.get(feature2, i), 2))
    if d > maxdist
        maxdist := d
        if array.size(predictions) >= k
            array.shift(predictions)
        array.push(predictions, array.get(directions, i))
//==EOBlock
// Note: in this setup there's no need for distances array (i.e. array.push(distances, d)),
// but the drawback is that a sudden max value may shadow all the subsequent values.
// One of the ways to bypass this is to:
// 1) store d in distances array,
// 2) calculate newdirs = bubbleSort(distances, directions), and then
// 3) take a slice with array.slice(newdirs) from the end
// Get the overall prediction of k nearest neighbours
prediction := array.sum(predictions)
bool filter = Filter ? ta.atr(10) > ta.atr(40) : true // filter out by volatility or ex. ta.atr(1) > ta.atr(10)...
// Now that we got a prediction for the next market move, we need to make use of this prediction and
// trade it. The returns then will show if everything works as predicted.
// Over here is a simple long/short interpretation of the prediction,
// but of course one could also use the quality of the prediction (+5 or +1) in some sort of way,
// ex. for position sizing.
bool long = prediction > 0 and filter
bool short = prediction < 0 and filter
bool clear = not(long and short)
if array.get(bars, 0)==Bars // stop by trade duration
    signal := CLEAR
    array.set(bars, 0, 0)
else
    array.set(bars, 0, array.get(bars, 0) + 1)
signal := long ? BUY : short ? SELL : clear ? CLEAR : nz(signal[1])
int changed = ta.change(signal)
bool startLongTrade = changed and signal==BUY
bool startShortTrade = changed and signal==SELL
// bool endLongTrade = changed and signal==SELL
// bool endShortTrade = changed and signal==BUY
bool clear_condition = changed and signal==CLEAR //or (changed and signal==SELL) or (changed and signal==BUY)
float maxpos = ta.highest(high, 10)
float minpos = ta.lowest (low, 10)
//-- Visuals
plotshape(startLongTrade ? minpos : na, 'Buy', shape.labelup, location.belowbar, cAqua(int(prediction*5)), size=size.small) // color intensity correction
plotshape(startShortTrade ? maxpos : na, 'Sell', shape.labeldown, location.abovebar, cPink(int(-prediction*5)), size=size.small)
// plot(endLongTrade ? ohlc4 : na, 'StopBuy', cAqua(6), 3, plot.style_cross)
// plot(endShortTrade ? ohlc4 : na, 'StopSell', cPink(6), 3, plot.style_cross)
plot(clear_condition ? close : na, 'ClearPos', color.yellow, 4, plot.style_cross)
//-- Notification
// if changed and signal==BUY
// alert('Buy Alert', alert.freq_once_per_bar) // alert.freq_once_per_bar_close
// if changed and signal==SELL
// alert('Sell Alert', alert.freq_once_per_bar)
alertcondition(startLongTrade, 'Buy', 'Go long!')
alertcondition(startShortTrade, 'Sell', 'Go short!')
//alertcondition(startLongTrade or startShortTrade, 'Alert', 'Deal Time!')
//-------------------- Backtesting (TODO)
// show_cumtr = input.bool (false, 'Show Trade Return?')
// lot_size = input.float(100.0, 'Lot Size', [0.1,0.2,0.3,0.5,1,2,3,5,10,20,30,50,100,1000,2000,3000,5000,10000])
// var start_lt = 0.
// var long_trades = 0.
// var start_st = 0.
// var short_trades = 0.
// if startLongTrade
// start_lt := ohlc4
// if endLongTrade
// long_trades := (open - start_lt) * lot_size
// if startShortTrade
// start_st := ohlc4
// if endShortTrade
// short_trades := (start_st - open) * lot_size
// cumreturn = ta.cum(long_trades) + ta.cum(short_trades)
// var label lbl = na
// if show_cumtr //and barstate.islast
// lbl := label.new(bar_index+10, close, 'CumReturn: ' + str.tostring(cumreturn, '#.#'), xloc.bar_index, yloc.price,
// color.new(color.blue, 100), label.style_label_left, color.black, size.small, text.align_left)
// label.delete(lbl[1])
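For anyone porting this, here is how I read the core loop between the BOBlock and EOBlock marks, written as plain Python. It assumes the usual indentation, where `maxdist := d`, the shift and the push all sit inside `if d > maxdist`. The name `pine_vote` is made up for illustration and this is a sketch of one reading, not Capissimo's code. Note that `predictions` is declared with `var` in the script, so it persists between bars, while `maxdist` restarts at -999 on every bar.

```python
from math import sqrt

def pine_vote(f1, f2, feature1, feature2, directions, predictions, k):
    """One bar of the loop: update `predictions` in place and return their sum.

    feature1, feature2 and directions are the history lists (the script
    appends the current bar before looping). `predictions` is the rolling
    list that survives from bar to bar.
    """
    maxdist = -999.0  # restarted on every bar, as in the script
    for x1, x2, direction in zip(feature1, feature2, directions):
        d = sqrt((f1 - x1) ** 2 + (f2 - x2) ** 2)
        if d > maxdist:  # in Python a NaN distance compares False and is skipped
            maxdist = d
            if len(predictions) >= k:
                predictions.pop(0)  # drop the oldest vote
            predictions.append(direction)
    return sum(predictions)

# Toy history: four stored points, query at (50, 50), keep k = 2 votes
preds = []
total = pine_vote(50.0, 50.0,
                  [50.0, 60.0, 90.0, 55.0],
                  [50.0, 60.0, 90.0, 55.0],
                  [1, -1, 1, -1], preds, k=2)
```

Read this way, a point is only kept when it is farther away than everything seen so far in the loop, which is not the same as picking the k smallest distances. The script's own note (store `d` in a distances array, sort, then slice) describes that alternative.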
The code where I tried to decode the logic and plot its output on a sample live chart:
import requests
import pandas as pd
import matplotlib.pyplot as plt
# Step 1: Define the necessary libraries and variables
symbol = 'BTCUSDT' # Example trading pair
interval = '1d' # Example time interval (daily)
limit = 1000 # Number of historical data points to retrieve
# Step 2: Make a request to the Binance API to fetch historical data
url = f"https://api.binance.com/api/v3/klines?symbol={symbol}&interval={interval}&limit={limit}"
response = requests.get(url)
data = response.json()
# Step 3: Parse the API response into a pandas DataFrame and extract the necessary columns
df = pd.DataFrame(data, columns=['timestamp', 'open', 'high', 'low', 'close', 'volume', 'close_time', 'quote_asset_volume',
'number_of_trades', 'taker_buy_base_asset_volume', 'taker_buy_quote_asset_volume', 'ignore'])
df['timestamp'] = pd.to_datetime(df['timestamp'], unit='ms')
df['close'] = pd.to_numeric(df['close'])
# Step 4: Apply the trading algorithm to generate entry and exit signals based on the historical data
# Example trading algorithm based on provided code
df['signal'] = 0 # Initialize the signal column
short_window = 14
long_window = 28
for i in range(long_window, len(df)):
    short_avg = df['close'].iloc[i - short_window:i].mean()
    long_avg = df['close'].iloc[i - long_window:i].mean()
    if short_avg > long_avg:
        df.loc[i, 'signal'] = 1 # Entry signal
    elif short_avg < long_avg:
        df.loc[i, 'signal'] = -1 # Exit signal
# Step 5: Create a line chart to visualize the historical data with entry and exit signals
plt.figure(figsize=(12, 6))
plt.plot(df['timestamp'], df['close'], color='blue', linewidth=1, label='Close')
# Plot entry and exit signals
entry_signals = df[df['signal'] == 1]
exit_signals = df[df['signal'] == -1]
plt.scatter(entry_signals['timestamp'], entry_signals['close'], color='green', marker='^', label='Entry')
plt.scatter(exit_signals['timestamp'], exit_signals['close'], color='red', marker='v', label='Exit')
plt.xlabel('Timestamp')
plt.ylabel('Close Price')
plt.title('Trading Algorithm Entries and Exits')
plt.legend()
plt.grid(True)
plt.show()
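The loop above compares two moving averages of the close, so it is a moving-average crossover rather than the indicator's kNN logic, which may be part of why the chart does not match. As a first step toward the same inputs, here is a minimal pandas-only sketch of the long and short RSI features. It assumes Wilder smoothing approximated with `ewm(alpha=1/period, adjust=False)`, so the warm-up values can differ slightly from TradingView's `ta.rsi`. The name `rsi_wilder` and the toy series are illustrative only.

```python
import pandas as pd

def rsi_wilder(close: pd.Series, period: int) -> pd.Series:
    """RSI in [0, 100] from exponentially smoothed gains and losses."""
    delta = close.diff()
    gain = delta.clip(lower=0.0)      # up-moves, zero otherwise
    loss = (-delta).clip(lower=0.0)   # down-moves as positive numbers
    avg_gain = gain.ewm(alpha=1.0 / period, adjust=False, min_periods=period).mean()
    avg_loss = loss.ewm(alpha=1.0 / period, adjust=False, min_periods=period).mean()
    rs = avg_gain / avg_loss          # inf when there were no losses
    return 100.0 - 100.0 / (1.0 + rs)

# Toy data: a steadily rising and a steadily falling close
rising = pd.Series([float(x) for x in range(1, 61)])
falling = rising[::-1].reset_index(drop=True)

f1 = rsi_wilder(rising, 28)   # long-window feature
f2 = rsi_wilder(rising, 14)   # short-window feature
f2_falling = rsi_wilder(falling, 14)
```

With real data, `close` would be `df['close']` from the DataFrame built above, and `f1`, `f2` would feed the distance calculation.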
The code for the sample trading bot I wrote:
import ccxt
import numpy as np
import talib
import matplotlib.pyplot as plt
# Set up Binance API credentials
api_key = 'your_api_key'
secret_key = 'your_secret_key'
# Create an instance of the Binance exchange class
exchange = ccxt.binance({
'apiKey': api_key,
'secret': secret_key,
'enableRateLimit': True,
})
# Define variables and parameters
symbol = 'BTC/USDT' # Replace with the desired trading pair
timeframe = '1h' # Replace with the desired timeframe
start_date = '2021-01-01' # Replace with the desired start date
short_window = 14 # Short period for indicators
long_window = 28 # Long period for indicators
base_k = 252 # Base number of neighbors (K) for kNN algorithm
filter_volatility = False # Enable or disable volatility filter
bar_threshold = 300 # Bar threshold for trade duration
# Fetch historical OHLCV data from the Binance API
ohlcvs = exchange.fetch_ohlcv(symbol, timeframe, exchange.parse8601(start_date))
timestamps = np.array([ohlc[0] for ohlc in ohlcvs])
opens = np.array([ohlc[1] for ohlc in ohlcvs])
highs = np.array([ohlc[2] for ohlc in ohlcvs])
lows = np.array([ohlc[3] for ohlc in ohlcvs])
closes = np.array([ohlc[4] for ohlc in ohlcvs])
volumes = np.array([ohlc[5] for ohlc in ohlcvs])
# Calculate technical indicators
rsi = talib.RSI(closes, timeperiod=long_window)
cci = talib.CCI(highs, lows, closes, timeperiod=long_window)
roc = talib.ROC(closes, timeperiod=long_window)
volume_scaled = (volumes - np.min(volumes)) / (np.max(volumes) - np.min(volumes)) * 100
# kNN-based algorithm
def kNN_algorithm(feature1, feature2, directions, f1, f2, k):
    # Euclidean distance from the current point (f1, f2) to every historic point
    distances = np.sqrt(np.power(f1 - feature1, 2) + np.power(f2 - feature2, 2))
    indices = np.argsort(distances)  # NaN distances (indicator warm-up) sort last
    k_nearest = directions[indices[:k]]
    prediction = np.sign(np.sum(k_nearest))
    return prediction
# Direction of each bar relative to the previous one (0 for the first bar).
# This must be an array: the original scalar `direction` could not be sliced.
directions = np.sign(np.diff(closes, prepend=closes[0]))
predictions = []
for i in range(len(timestamps)):
    if i >= base_k:
        f1 = np.mean(rsi[i - base_k + 1:i + 1])
        f2 = np.mean(cci[i - base_k + 1:i + 1])
        predictions.append(kNN_algorithm(rsi[:i], cci[:i], directions[:i], f1, f2, int(np.sqrt(base_k))))
    else:
        predictions.append(0)
# Plot the OHLCV chart and overlay buy and sell signals
plt.figure(figsize=(12, 8))
plt.title(f'{symbol} {timeframe} Chart')
plt.plot(timestamps, closes, label='Close')
buys = np.where(np.array(predictions) > 0)[0]
sells = np.where(np.array(predictions) < 0)[0]
plt.scatter(timestamps[buys], closes[buys], color='green', marker='^', label='Buy')
plt.scatter(timestamps[sells], closes[sells], color='red', marker='v', label='Sell')
plt.xlabel('Timestamp')
plt.ylabel('Price')
plt.legend()
plt.grid(True)
plt.show()
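On running continuously: the script above fetches once, plots, and exits. One common pattern is to poll on a timer, recompute the signal from the latest candles, and only send an order when the signal changes. The sketch below keeps the exchange out of the loop by passing in three callables, all hypothetical names (`fetch_closes`, `compute_signal`, `place_order`). With ccxt they would presumably wrap `exchange.fetch_ohlcv` and an order call such as `exchange.create_order`. It is a skeleton under those assumptions, with no error handling, position sizing or risk checks.

```python
import time
from typing import Callable, Optional, Sequence

def run_bot(
    fetch_closes: Callable[[], Sequence[float]],
    compute_signal: Callable[[Sequence[float]], int],
    place_order: Callable[[str], None],
    poll_seconds: float = 60.0,
    max_iterations: Optional[int] = None,
    sleep: Callable[[float], None] = time.sleep,
) -> None:
    """Poll for data and act only when the signal (+1, -1 or 0) changes."""
    last_signal = 0
    iteration = 0
    while max_iterations is None or iteration < max_iterations:
        closes = fetch_closes()
        signal = compute_signal(closes)
        if signal != last_signal:
            if signal > 0:
                place_order("buy")
            elif signal < 0:
                place_order("sell")
            last_signal = signal
        iteration += 1
        sleep(poll_seconds)

# Dry run with stand-ins instead of a real exchange
feed = iter([[1, 2], [1, 2, 3], [1, 2, 3, 2], [1, 2, 3, 2, 1]])
orders = []
run_bot(
    fetch_closes=lambda: next(feed),
    compute_signal=lambda c: 1 if c[-1] > c[-2] else -1,
    place_order=orders.append,
    max_iterations=4,
    sleep=lambda seconds: None,  # skip waiting in the dry run
)
```

Testing with stand-ins like this before wiring in API keys makes it easier to check that an order is sent once per signal change rather than on every poll.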
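I would really appreciate any help with this. I am self-taught and have gone back and forth a lot on the internet, including with ChatGPT, to get this far, but I have not found a concrete solution. Thank you very much for reading this far (if you did)!
I was looking for exactly the same thing as you. I wrote my own version in Python; it runs, but the results are not the same as Capissimo's. So it is open to further improvement!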
from pandas import DataFrame
from pandas_ta import rsi, cci, roc
from datetime import datetime
from numpy import average, sign
from math import floor, pow, sqrt
# df = read_csv("src/indicators/dummy_data.csv")
start_date = datetime(2000, 1, 1)
end_date = datetime(2025, 12, 31)
def minimax(x, period, min=0, max=99):
    # MinMax Normalization
    period = int(period)
    min = int(min)
    max = int(max)
    high = x.rolling(window=period).max()
    low = x.rolling(window=period).min()
    return (max - min) * (x - low) / (high - low) + min
def knn_capissimo(df: DataFrame, param1="all", param2=14, param3=28, param4=252, param5=300) -> DataFrame:
    # Machine Learning kNN based Strategy
    close = df['close']
    high = df['high']
    low = df['low']
    volume = df['volume']
    param1 = str(param1)
    short_period = int(param2)
    long_period = int(param3)
    base_neighbours = int(param4)
    k = floor(base_neighbours ** 0.5)
    # Long-window indicators (feature 1)
    rs = rsi(close, long_period)
    cs = cci(high, low, close, long_period)
    os = roc(close, long_period)
    vs = minimax(volume, long_period, 0, 99)
    if param1 == "all":
        f1 = average([rs, cs, os, vs], axis=0)
    elif param1 == "rsi":
        f1 = rs
    elif param1 == "cci":
        f1 = cs
    elif param1 == "roc":
        f1 = os
    elif param1 == "minimax":
        f1 = vs
    # Short-window indicators (feature 2)
    rf = rsi(close, short_period)
    cf = cci(high, low, close, short_period)
    of = roc(close, short_period)
    vf = minimax(volume, short_period, 0, 99)
    if param1 == "all":
        f2 = average([rf, cf, of, vf], axis=0)
    elif param1 == "rsi":
        f2 = rf
    elif param1 == "cci":
        f2 = cf
    elif param1 == "roc":
        f2 = of
    elif param1 == "minimax":
        f2 = vf
    class_label = sign(close.shift(-1) - close)
    feature1 = f1
    feature2 = f2
    directions = class_label
    size = len(directions)
    df['knn_capissimo_Order'] = None
    for i in range(size):
        predictions = []
        # The Pine script restarts maxdist on every bar and raises it inside the
        # loop; without the update below, every earlier point passes the test.
        maxdist = -99999
        for j in range(i):
            d = sqrt(pow(feature1[i] - feature1[j], 2) + pow(feature2[i] - feature2[j], 2))
            if d > maxdist:
                maxdist = d
                if len(predictions) >= k:
                    predictions.pop(0)
                predictions.append(directions[j])
        # Remove nan from predictions
        predictions = [x for x in predictions if str(x) != 'nan']
        prediction = sum(predictions)
        df.loc[i, 'knn_capissimo_Order'] = "buy" if prediction > 0 else "sell" if prediction < 0 else None
    df['knn_capissimo_Signal'] = (df['knn_capissimo_Order'].shift(1) != df['knn_capissimo_Order']) & (~df['knn_capissimo_Order'].isna())
    return df
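Two places where this port seems to differ from the Pine script, which might explain part of the mismatch: the script labels each bar with `math.sign(close[1] - close[0])` (previous close minus current close), and it calls `ta.cci(close, ...)` on the close alone rather than on high/low/close. Below is a minimal sketch of both, assuming TradingView's documented CCI formula (source minus its SMA, divided by 0.015 times the mean absolute deviation over the window). `pine_class_label` and `cci_on_source` are illustrative names, not library functions.

```python
import numpy as np
import pandas as pd

def pine_class_label(close: pd.Series) -> pd.Series:
    """sign(close[1] - close[0]): previous close minus current close."""
    return np.sign(close.shift(1) - close)

def cci_on_source(src: pd.Series, period: int) -> pd.Series:
    """CCI computed on a single source series instead of typical price."""
    sma = src.rolling(period).mean()
    # Mean absolute deviation of each window around its own mean
    mad = src.rolling(period).apply(lambda w: np.abs(w - w.mean()).mean(), raw=True)
    return (src - sma) / (0.015 * mad)

close = pd.Series([10.0, 11.0, 10.5, 10.5, 12.0])
labels = pine_class_label(close)  # first value is NaN (no previous bar)
cci3 = cci_on_source(close, 3)
```

Note the sign convention: with this label a rising close gives -1, which matches the inline comment in the script (`close[1]<close[0] ? SELL`).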
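I did not understand what the `Bars` variable is for, so I left it out. I tried to find a better definition of the kNN algorithm as applied to trading indicators, but all I could find was this question.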
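From what the script shows, `Bars` ("Bar Threshold") is compared against a counter kept in the one-element `bars` array: the counter goes up by one on each bar, and when it equals `Bars` the signal is set to `CLEAR` and the counter restarts, which the inline comment calls "stop by trade duration". The paste does not preserve indentation, so it is unclear whether the next line (`signal := long ? BUY : ...`) then overwrites that reset. The sketch below is one reading in Python with a flag for both cases. `signals_with_bar_limit` and `reset_wins` are made-up names, and the volatility filter is left out.

```python
BUY, SELL, CLEAR = 1, -1, 0

def signals_with_bar_limit(predictions, bars_threshold, reset_wins=True):
    """Turn per-bar kNN vote sums into BUY / SELL / CLEAR signals.

    reset_wins=True:  on the bar where the counter hits the threshold,
                      the signal stays CLEAR.
    reset_wins=False: the vote is applied on every bar, so the reset
                      has no visible effect on the signal.
    """
    signals = []
    counter = 0
    for prediction in predictions:
        vote = BUY if prediction > 0 else SELL if prediction < 0 else CLEAR
        if counter == bars_threshold:
            counter = 0
            signal = CLEAR if reset_wins else vote
        else:
            counter += 1
            signal = vote
        signals.append(signal)
    return signals

votes = [3, 2, 5, 1, -4, -2]
limited = signals_with_bar_limit(votes, bars_threshold=2)
unlimited = signals_with_bar_limit(votes, bars_threshold=2, reset_wins=False)
```

With the default of 300 and either reading, the threshold only comes into play once every few hundred bars, so leaving it out probably changes few signals.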
I hope this helps, and if you find a better solution, please post it here!
Thanks!