How can I convert a complex indicator like this one from Pine Script to Python?

Question (votes: 0, answers: 1)

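Details of my question:

  1. I am trying to build an automated trading bot (for personal use).

  2. I found this indicator on TradingView to be very accurate. Unfortunately it is an indicator, not a strategy, so I cannot use the pre-built interface to automate trades. Capissimo calls it kNN Machine Learning on TradingView, if you want to look it up. (Code below.)

  3. I decided to use my Python knowledge to build a bot myself. (Code for that is below as well.)

  4. How do I convert this logic to Python? I am also unsure how to run the code so that it trades continuously on Binance.

  5. I have written the logic and plotted its output on a sample chart using the Binance API, but it does not seem to be very accurate. (Code below as well.)

  6. I wrote it in Visual Studio Code.

Pine Script kNN indicator code: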

// This source code is subject to the terms of the Mozilla Public License 2.0 at https://mozilla.org/MPL/2.0/
// © capissimo

//@version=5
indicator('Machine Learning: kNN-based Strategy', 'ML-kNN', true, max_labels_count=300, format=format.price, precision=2, timeframe="", timeframe_gaps=true)

// kNN-based Strategy (FX and Crypto)
// Description: 
// This strategy uses a classic machine learning algorithm - k Nearest Neighbours (kNN) - 
// to let you find a prediction for the next (tomorrow's, next month's, etc.) market move. 
// Being an unsupervised machine learning algorithm, kNN is one of the most simple learning algorithms. 

// To do a prediction of the next market move, the kNN algorithm uses the historic data, 
// collected in 3 arrays - feature1, feature2 and directions, - and finds the k-nearest 
// neighbours of the current indicator(s) values. 

// The two dimensional kNN algorithm just has a look on what has happened in the past when 
// the two indicators had a similar level. It then looks at the k nearest neighbours, 
// sees their state and thus classifies the current point.

// The kNN algorithm offers a framework to test all kinds of indicators easily to see if they 
// have got any *predictive value*. One can easily add cog, wpr and others.
// Note: TradingView's playback feature helps to see this strategy in action.
// Warning: Signals ARE repainting.

// Style tags: Trend Following, Trend Analysis
// Asset class: Equities, Futures, ETFs, Currencies and Commodities
// Dataset: FX Minutes/Hours+++/Days

//-- Preset Dates

int startdate = timestamp('01 Jan 2000 00:00:00 GMT+10')
int stopdate  = timestamp('31 Dec 2025 23:45:00 GMT+10')

//-- Inputs

StartDate  = input.time  (startdate, 'Start Date')
StopDate   = input.time  (stopdate,  'Stop Date')
Indicator  = input.string('All',     'Indicator',   ['RSI','ROC','CCI','Volume','All'])
ShortWinow = input.int   (14,        'Short Period [1..n]', 1)
LongWindow = input.int   (28,        'Long Period [2..n]',  2)
BaseK      = input.int   (252,       'Base No. of Neighbours (K) [5..n]', 5)
Filter     = input.bool  (false,     'Volatility Filter')
Bars       = input.int   (300,       'Bar Threshold [2..5000]', 2, 5000)

//-- Constants

var int BUY   = 1
var int SELL  =-1
var int CLEAR = 0

var int k     = math.floor(math.sqrt(BaseK))  // k Value for kNN algo

//-- Variable

// Training data, normalized to the range of [0,...,100]
var array<float> feature1   = array.new_float(0)  // [0,...,100]
var array<float> feature2   = array.new_float(0)  //    ...
var array<int>   directions = array.new_int(0)    // [-1; +1]

// Result data
var array<int>   predictions = array.new_int(0)
var float        prediction  = 0.0
var array<int>   bars        = array.new<int>(1, 0) // array used as a container for inter-bar variables

// Signals
var int          signal      = CLEAR

//-- Functions

minimax(float x, int p, float min, float max) => 
    float hi = ta.highest(x, p), float lo = ta.lowest(x, p)
    (max - min) * (x - lo)/(hi - lo) + min

cAqua(int g) => g>9?#0080FFff:g>8?#0080FFe5:g>7?#0080FFcc:g>6?#0080FFb2:g>5?#0080FF99:g>4?#0080FF7f:g>3?#0080FF66:g>2?#0080FF4c:g>1?#0080FF33:#00C0FF19
cPink(int g) => g>9?#FF0080ff:g>8?#FF0080e5:g>7?#FF0080cc:g>6?#FF0080b2:g>5?#FF008099:g>4?#FF00807f:g>3?#FF008066:g>2?#FF00804c:g>1?#FF008033:#FF008019

inside_window(float start, float stop) =>  
    time >= start and time <= stop ? true : false

//-- Logic

bool window = inside_window(StartDate, StopDate)

// 3 pairs of predictor indicators, long and short each
float rs = ta.rsi(close,   LongWindow),        float rf = ta.rsi(close,   ShortWinow)
float cs = ta.cci(close,   LongWindow),        float cf = ta.cci(close,   ShortWinow)
float os = ta.roc(close,   LongWindow),        float of = ta.roc(close,   ShortWinow)
float vs = minimax(volume, LongWindow, 0, 99), float vf = minimax(volume, ShortWinow, 0, 99)

// TOADD or TOTRYOUT:
//    ta.cmo(close, LongWindow), ta.cmo(close, ShortWinow)
//    ta.mfi(close, LongWindow), ta.mfi(close, ShortWinow)
//    ta.mom(close, LongWindow), ta.mom(close, ShortWinow)

float f1 = switch Indicator
    'RSI'    => rs 
    'CCI'    => cs 
    'ROC'    => os 
    'Volume' => vs 
    => math.avg(rs, cs, os, vs)

float f2 = switch Indicator
    'RSI'    => rf 
    'CCI'    => cf
    'ROC'    => of
    'Volume' => vf 
    => math.avg(rf, cf, of, vf)

// Classification data, what happens on the next bar
int class_label = int(math.sign(close[1] - close[0])) // eq. close[1]<close[0] ? SELL: close[1]>close[0] ? BUY : CLEAR

// Use particular training period
if window
    // Store everything in arrays. Features represent a square 100 x 100 matrix,
    // whose row-column intersections represent class labels, showing historic directions
    array.push(feature1, f1)
    array.push(feature2, f2)
    array.push(directions, class_label)

// Uncomment the following statement (if barstate.islast) and tab everything below
// between BOBlock and EOBlock marks to see just the recent several signals gradually 
// showing up, rather than all the preceding signals

//if barstate.islast   

//==BOBlock 

// Core logic of the algorithm
int   size    = array.size(directions)
float maxdist = -999.0
// Loop through the training arrays, getting distances and corresponding directions.
for i=0 to size-1
    // Calculate the euclidean distance of current point to all historic points,
    // here the metric used might as well be a manhattan distance or any other.
    float d = math.sqrt(math.pow(f1 - array.get(feature1, i), 2) + math.pow(f2 - array.get(feature2, i), 2))
    
    if d > maxdist
        maxdist := d
        if array.size(predictions) >= k
            array.shift(predictions)
        array.push(predictions, array.get(directions, i))
        
//==EOBlock 

// Note: in this setup there's no need for distances array (i.e. array.push(distances, d)),
//       but the drawback is that a sudden max value may shadow all the subsequent values.
// One of the ways to bypass this is to:
// 1) store d in distances array,
// 2) calculate newdirs = bubbleSort(distances, directions), and then 
// 3) take a slice with array.slice(newdirs) from the end
        
// Get the overall prediction of k nearest neighbours
prediction := array.sum(predictions)   

bool filter = Filter ? ta.atr(10) > ta.atr(40) : true // filter out by volatility or ex. ta.atr(1) > ta.atr(10)...

// Now that we got a prediction for the next market move, we need to make use of this prediction and 
// trade it. The returns then will show if everything works as predicted.
// Over here is a simple long/short interpretation of the prediction, 
// but of course one could also use the quality of the prediction (+5 or +1) in some sort of way,
// ex. for position sizing.

bool long  = prediction > 0 and filter
bool short = prediction < 0 and filter
bool clear = not(long and short)

if array.get(bars, 0)==Bars    // stop by trade duration
    signal := CLEAR
    array.set(bars, 0, 0)
else
    array.set(bars, 0, array.get(bars, 0) + 1)

signal := long ? BUY : short ? SELL : clear ? CLEAR : nz(signal[1])

int  changed         = ta.change(signal)
bool startLongTrade  = changed and signal==BUY 
bool startShortTrade = changed and signal==SELL 
// bool endLongTrade    = changed and signal==SELL
// bool endShortTrade   = changed and signal==BUY  
bool clear_condition = changed and signal==CLEAR //or (changed and signal==SELL) or (changed and signal==BUY)

float maxpos = ta.highest(high, 10)
float minpos = ta.lowest (low,  10)

//-- Visuals

plotshape(startLongTrade  ? minpos : na, 'Buy',      shape.labelup,   location.belowbar, cAqua(int(prediction*5)),  size=size.small)  // color intensity correction
plotshape(startShortTrade ? maxpos : na, 'Sell',     shape.labeldown, location.abovebar, cPink(int(-prediction*5)), size=size.small)
// plot(endLongTrade         ? ohlc4  : na, 'StopBuy',  cAqua(6), 3, plot.style_cross)
// plot(endShortTrade        ? ohlc4  : na, 'StopSell', cPink(6), 3, plot.style_cross)
plot(clear_condition      ? close  : na, 'ClearPos', color.yellow, 4, plot.style_cross)


//-- Notification

// if changed and signal==BUY 
//     alert('Buy Alert', alert.freq_once_per_bar)  // alert.freq_once_per_bar_close
// if changed and signal==SELL 
//     alert('Sell Alert', alert.freq_once_per_bar)

alertcondition(startLongTrade,  'Buy',  'Go long!')
alertcondition(startShortTrade, 'Sell', 'Go short!') 
//alertcondition(startLongTrade or startShortTrade, 'Alert', 'Deal Time!')

//-------------------- Backtesting (TODO)

// show_cumtr = input.bool (false, 'Show Trade Return?')
// lot_size   = input.float(100.0, 'Lot Size', [0.1,0.2,0.3,0.5,1,2,3,5,10,20,30,50,100,1000,2000,3000,5000,10000]) 

// var start_lt     = 0.  
// var long_trades  = 0.
// var start_st     = 0.  
// var short_trades = 0.

// if startLongTrade 
//     start_lt := ohlc4 
// if endLongTrade
//     long_trades := (open - start_lt) * lot_size  
// if startShortTrade 
//     start_st := ohlc4 
// if endShortTrade
//     short_trades := (start_st - open) * lot_size 
    
// cumreturn = ta.cum(long_trades) + ta.cum(short_trades) 
    
// var label lbl = na
// if show_cumtr //and barstate.islast  
//     lbl := label.new(bar_index+10, close, 'CumReturn: ' + str.tostring(cumreturn, '#.#'), xloc.bar_index, yloc.price, 
//                      color.new(color.blue, 100), label.style_label_left, color.black, size.small, text.align_left)
//     label.delete(lbl[1])
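
As a starting point for the conversion, the block between the BOBlock and EOBlock marks can be sketched in Python for a single bar as follows. The function name and the optional `predictions` argument are hypothetical; the argument only stands in for the Pine `var` array, which persists from bar to bar. Note that the loop as written records a label every time a new maximum distance is seen, so it is not a textbook nearest-neighbour search, which is the drawback the script's own comments mention.

```python
import math


def pine_style_prediction(f1, f2, feature1, feature2, directions, k, predictions=None):
    """Mirror the Pine loop for one bar (hypothetical helper, not from the script).

    A label is stored whenever the distance sets a new running maximum,
    and the list is capped at k entries by dropping the oldest one.
    """
    predictions = [] if predictions is None else predictions
    maxdist = -999.0  # re-initialised on every bar, as in the Pine script
    for x1, x2, label in zip(feature1, feature2, directions):
        # Euclidean distance from the current point to a historic point
        d = math.sqrt((f1 - x1) ** 2 + (f2 - x2) ** 2)
        if d > maxdist:
            maxdist = d
            if len(predictions) >= k:
                predictions.pop(0)
            predictions.append(label)
    # The Pine script uses the sum of the stored labels as the prediction
    return sum(predictions), predictions
```

To follow the script across bars, pass the returned list back in as `predictions` on the next call.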

Here is the code where I tried to decode that logic and plot its output on a sample live chart.

import requests
import pandas as pd
import matplotlib.pyplot as plt

# Step 1: Define the necessary libraries and variables

symbol = 'BTCUSDT'  # Example trading pair
interval = '1d'    # Example time interval (daily)
limit = 1000       # Number of historical data points to retrieve

# Step 2: Make a request to the Binance API to fetch historical data

url = f"https://api.binance.com/api/v3/klines?symbol={symbol}&interval={interval}&limit={limit}"
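response = requests.get(url, timeout=10)
response.raise_for_status()  # Stop early on an HTTP error instead of parsing an error payload
data = response.json()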

# Step 3: Parse the API response into a pandas DataFrame and extract the necessary columns

df = pd.DataFrame(data, columns=['timestamp', 'open', 'high', 'low', 'close', 'volume', 'close_time', 'quote_asset_volume',
                                 'number_of_trades', 'taker_buy_base_asset_volume', 'taker_buy_quote_asset_volume', 'ignore'])
df['timestamp'] = pd.to_datetime(df['timestamp'], unit='ms')
df['close'] = pd.to_numeric(df['close'])

# Step 4: Apply the trading algorithm to generate entry and exit signals based on the historical data

# Simple moving-average crossover (note: this is not the kNN logic from the Pine script)
df['signal'] = 0  # Initialize the signal column

short_window = 14
long_window = 28

for i in range(long_window, len(df)):
    short_avg = df['close'].iloc[i - short_window:i].mean()
    long_avg = df['close'].iloc[i - long_window:i].mean()

    if short_avg > long_avg:
        df.loc[i, 'signal'] = 1  # Entry signal
    elif short_avg < long_avg:
        df.loc[i, 'signal'] = -1  # Exit signal

# Step 5: Create a line chart to visualize the historical data with entry and exit signals

plt.figure(figsize=(12, 6))
plt.plot(df['timestamp'], df['close'], color='blue', linewidth=1, label='Close')

# Plot entry and exit signals
entry_signals = df[df['signal'] == 1]
exit_signals = df[df['signal'] == -1]
plt.scatter(entry_signals['timestamp'], entry_signals['close'], color='green', marker='^', label='Entry')
plt.scatter(exit_signals['timestamp'], exit_signals['close'], color='red', marker='v', label='Exit')

plt.xlabel('Timestamp')
plt.ylabel('Close Price')
plt.title('Trading Algorithm Entries and Exits')
plt.legend()
plt.grid(True)

plt.show()

Here is the code for the sample trading bot I wrote.

import ccxt
import numpy as np
import talib
import matplotlib.pyplot as plt

# Set up Binance API credentials
api_key = 'your_api_key'
secret_key = 'your_secret_key'

# Create an instance of the Binance exchange class
exchange = ccxt.binance({
    'apiKey': api_key,
    'secret': secret_key,
    'enableRateLimit': True,
})

# Define variables and parameters
symbol = 'BTC/USDT'  # Replace with the desired trading pair
timeframe = '1h'  # Replace with the desired timeframe
start_date = '2021-01-01'  # Replace with the desired start date

short_window = 14  # Short period for indicators
long_window = 28  # Long period for indicators
base_k = 252  # Base number of neighbors (K) for kNN algorithm
filter_volatility = False  # Enable or disable volatility filter
bar_threshold = 300  # Bar threshold for trade duration

# Fetch historical OHLCV data from the Binance API
ohlcvs = exchange.fetch_ohlcv(symbol, timeframe, exchange.parse8601(start_date))
timestamps = np.array([ohlc[0] for ohlc in ohlcvs])
opens = np.array([ohlc[1] for ohlc in ohlcvs])
highs = np.array([ohlc[2] for ohlc in ohlcvs])
lows = np.array([ohlc[3] for ohlc in ohlcvs])
closes = np.array([ohlc[4] for ohlc in ohlcvs])
volumes = np.array([ohlc[5] for ohlc in ohlcvs])

# Calculate technical indicators
rsi = talib.RSI(closes, timeperiod=long_window)
cci = talib.CCI(highs, lows, closes, timeperiod=long_window)
roc = talib.ROC(closes, timeperiod=long_window)
volume_scaled = (volumes - np.min(volumes)) / (np.max(volumes) - np.min(volumes)) * 100

# kNN-based algorithm
def kNN_algorithm(feature1, feature2, directions, f1, f2, k):
    distances = np.sqrt(np.power(f1 - feature1, 2) + np.power(f2 - feature2, 2))
    indices = np.argsort(distances)
    k_nearest = directions[indices[:k]]
    prediction = np.sign(np.sum(k_nearest))
    return prediction

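# Direction of each bar relative to the previous one (+1 up, -1 down, 0 flat).
# This must be an array: the original scalar could not be sliced with [:i].
directions = np.sign(np.diff(closes, prepend=closes[0]))
k = int(np.sqrt(base_k))

predictions = []
for i in range(len(timestamps)):
    if i >= base_k:
        f1 = np.mean(rsi[i - base_k + 1:i + 1])
        f2 = np.mean(cci[i - base_k + 1:i + 1])
        predictions.append(kNN_algorithm(rsi[:i], cci[:i], directions[:i], f1, f2, k))
    else:
        predictions.append(0)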

# Plot the OHLCV chart and overlay buy and sell signals
plt.figure(figsize=(12, 8))
plt.title(f'{symbol} {timeframe} Chart')
plt.plot(timestamps, closes, label='Close')

buys = np.where(np.array(predictions) > 0)[0]
sells = np.where(np.array(predictions) < 0)[0]

plt.scatter(timestamps[buys], closes[buys], color='green', marker='^', label='Buy')
plt.scatter(timestamps[sells], closes[sells], color='red', marker='v', label='Sell')

plt.xlabel('Timestamp')
plt.ylabel('Price')
plt.legend()
plt.grid(True)
plt.show()
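
One possible source of the mismatch: the Pine script calls `ta.cci(close, ...)`, so its CCI is computed from the close series alone, whereas `talib.CCI(highs, lows, closes, ...)` works from the typical price of high, low and close. Below is a minimal pandas sketch of close-based CCI and ROC. It assumes Pine computes CCI as `(src - sma) / (0.015 * mean absolute deviation)` and ROC as `100 * (src - src[length]) / src[length]`; the function names are hypothetical.

```python
import numpy as np
import pandas as pd


def pine_roc(src: pd.Series, length: int) -> pd.Series:
    """Rate of change in percent against the value `length` bars ago."""
    prev = src.shift(length)
    return 100.0 * (src - prev) / prev


def pine_cci(src: pd.Series, length: int) -> pd.Series:
    """CCI over a single source series (assumed Pine-style formula)."""
    sma = src.rolling(length).mean()
    # Mean absolute deviation of each window around that window's own mean
    mad = src.rolling(length).apply(lambda w: np.abs(w - w.mean()).mean(), raw=True)
    return (src - sma) / (0.015 * mad)
```

Both functions return NaN until enough bars are available, so those rows need to be skipped before they are used as kNN features.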

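I would really appreciate any help with this. I am self-taught and did a lot of back-and-forth on the internet, including with ChatGPT, to get this far, but I found no concrete solution. Thank you very much for reading this far (if you did)!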

python python-3.x pine-script pine-script-v5
1 Answer
0 votes

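I was looking for exactly the same thing as you. I wrote my own Python version of the code; it runs, but the results do not match Capissimo's. So I am open to improving it further!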

from pandas import DataFrame
from pandas_ta import rsi, cci, roc
from datetime import datetime
from numpy import average, sign
from math import floor, pow, sqrt

# df = read_csv("src/indicators/dummy_data.csv")

start_date = datetime(2000, 1, 1)
end_date = datetime(2025, 12, 31)

def minimax(x, period, min=0, max=99):
    # MinMax Normalization

    period = int(period)
    min = int(min)
    max = int(max)
    high = x.rolling(window=period).max()
    low = x.rolling(window=period).min()

    return (max - min) * (x - low) / (high - low) + min

def knn_capissimo(df: DataFrame, param1="all", param2=14, param3=28, param4=252, param5=300) -> DataFrame:
    # Machine Learning kNN based Strategy

    close = df['close']
    high = df['high']
    low = df['low']
    volume = df['volume']

    param1 = str(param1)
    short_period = int(param2)
    long_period = int(param3)
    base_neighbours = int(param4)

    k = floor(base_neighbours ** 0.5)
    rs = rsi(close, long_period)
    cs = cci(high, low, close, long_period)
    os = roc(close, long_period)
    vs = minimax(volume, long_period, 0, 99)
    if param1 == "all":
        f1 = average([rs, cs, os, vs], axis=0)
    elif param1 == "rsi":
        f1 = rs
    elif param1 == "cci":
        f1 = cs
    elif param1 == "roc":
        f1 = os
    elif param1 == "minimax":
        f1 = vs

    rf = rsi(close, short_period)
    cf = cci(high, low, close, short_period)
    of = roc(close, short_period)
    vf = minimax(volume, short_period, 0, 99)
    if param1 == "all":
        f2 = average([rf, cf, of, vf], axis=0)
    elif param1 == "rsi":
        f2 = rf
    elif param1 == "cci":
        f2 = cf
    elif param1 == "roc":
        f2 = of
    elif param1 == "minimax":
        f2 = vf

    # Pine: math.sign(close[1] - close[0]), where close[1] is the previous bar (no look-ahead)
    class_label = sign(close.shift(1) - close)

    feature1 = f1
    feature2 = f2
    directions = class_label

    size = len(directions)

    df['knn_capissimo_Order'] = None
    for i in range(size):

        predictions = []
        # Pine re-declares maxdist on every bar, so reset it for each i
        maxdist = -999.0

        for j in range(i):

            d = sqrt(pow(feature1[i] - feature1[j], 2) + pow(feature2[i] - feature2[j], 2))

            if d > maxdist:
                # Track the running maximum, as the Pine script does
                maxdist = d

                if len(predictions) >= k:
                    predictions.pop(0)

                predictions.append(directions[j])

        # Remove nan from predictions
        predictions = [x for x in predictions if str(x) != 'nan']

        prediction = sum(predictions)

        df.loc[i, 'knn_capissimo_Order'] = "buy" if prediction > 0 else "sell" if prediction < 0 else None

    df['knn_capissimo_Signal'] = (df['knn_capissimo_Order'].shift(1) != df['knn_capissimo_Order']) & (~df['knn_capissimo_Order'].isna())

    return df
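
The comments in the Pine script suggest an alternative to the running-maximum loop: store all distances, sort them, and take a slice of k labels. A minimal NumPy sketch of that idea for a single bar is below; it takes the k closest points, which is my reading of that comment. The function name is hypothetical, and skipping NaN rows is my own assumption about how to handle the indicator warm-up period.

```python
import numpy as np


def knn_vote(f1, f2, feature1, feature2, directions, k):
    """Sum the labels of the k historic points closest to (f1, f2)."""
    feature1 = np.asarray(feature1, dtype=float)
    feature2 = np.asarray(feature2, dtype=float)
    directions = np.asarray(directions, dtype=float)

    # Drop rows where a feature or a label is missing (assumption, see above)
    valid = ~(np.isnan(feature1) | np.isnan(feature2) | np.isnan(directions))
    if np.isnan(f1) or np.isnan(f2) or not valid.any():
        return 0.0

    # Euclidean distance from the current point to every valid historic point
    dist = np.hypot(f1 - feature1[valid], f2 - feature2[valid])
    nearest = np.argsort(dist, kind="stable")[:k]
    return float(directions[valid][nearest].sum())
```

A positive result would correspond to "buy" and a negative one to "sell", in the same way the sum of `predictions` is used above.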

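I did not understand what the Bars variable is for, so I left it out. I tried to find a better definition of the kNN algorithm as applied to trading indicators, but all I could find was this question.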
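
For reference, the part of the Pine script that uses Bars can be sketched in Python as below (the function name and the tuple layout are hypothetical). In the script as quoted, the counter sets the signal to CLEAR once it reaches Bars, but the next statement reassigns the signal from `long`, `short` and `clear`. Because `long` and `short` can never both be true, `clear = not(long and short)` is always true, so the reassignment always wins and the counter does not appear to change the output.

```python
BUY, SELL, CLEAR = 1, -1, 0


def signals_from_predictions(predictions, bar_threshold=300, filter_ok=None):
    """Return (signal, start_long, start_short) per bar, following the Pine logic."""
    out = []
    counter = 0
    previous = None  # no previous signal on the first bar
    for i, p in enumerate(predictions):
        ok = True if filter_ok is None else bool(filter_ok[i])
        is_long = p > 0 and ok
        is_short = p < 0 and ok

        # "Stop by trade duration": reset after bar_threshold bars...
        if counter == bar_threshold:
            signal = CLEAR
            counter = 0
        else:
            counter += 1

        # ...but this assignment runs on every bar and overwrites the reset
        signal = BUY if is_long else SELL if is_short else CLEAR

        changed = previous is not None and signal != previous
        out.append((signal, changed and signal == BUY, changed and signal == SELL))
        previous = signal
    return out
```

Running it with different `bar_threshold` values gives identical output, which is consistent with leaving Bars out of a port.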

I hope this helps. If you find a better solution, please post it here!

Thanks!
