如何从 Phemex Public API 获取历史烛台数据或 K 线?

问题描述 投票:0回答:1

我需要能够在指定时间(1m、3m、5m、1H 等)提取烛台的历史烛台数据(例如开盘价、收盘价、最高价、最低价和成交量)(时间戳)来自 Phemex

其他交易所,例如 Binance 或 FTX,似乎为此提供了 REST Websocket API,但我似乎找不到适用于 Phemex 的 API。介意帮我解决这个问题吗?非常感谢。

我已采取的步骤,但尚未找到解决方案:

  1. 前往https://phemex.com/user-guides/api-overview
  2. 去了https://github.com/phemex/phemex-api-docs/blob/master/Public-Contract-API-en.md
  3. “市场数据 API 列表”中列出的项目似乎都无法完成该任务
algorithmic-trading trading cryptocurrency candlestick-chart candlesticks
1个回答
0
投票

此代码将获取蜡烛并将其保存到 csv 文件中。希望这有帮助:)

exchange = ccxt.phemex({
    'options': { 'defaultType': 'swap' },
    'enableRateLimit': True
})
# Load the markets
markets = exchange.load_markets()

curent_time = int(time.time()*1000)
one_min = 60000     

def get_all_candels(symbol,start_time,stop_time):
    counter = 0
    candel_counter = 0
    data_set = []
    t = 0
    while t < stop_time:
        if data_set == []:
            block = exchange.fetch_ohlcv(symbol,'1m',start_time)
            for candle in block:
                if candle == []:
                    break
                data_set.append(candle)
            last_time_in_block = block[-1][0]
            counter += 1
            candel_counter += len(block)
            print(f'{counter} - {block[0]} - {candel_counter} - {last_time_in_block}')

        if data_set != []:
            t = last_time_in_block + one_min
            block = exchange.fetch_ohlcv(symbol,'1m',t)
            if block == []:
                break
            for candle in block:
                if candle == []:
                    break
                data_set.append(candle)
            last_time_in_block = block[-1][0]
            candel_counter += len(block)
            counter += 1
            print(f'{counter} - {block[0]} - {candel_counter} - {last_time_in_block}')

            time.sleep(1)
    return data_set
    
data_set = get_all_candels('BTCUSD',1574726400000,curent_time)

print(np.shape(data_set))

with open('raw.csv', 'w', newline='') as csv_file:
    column_names = ['time', 'open', 'high', 'low', 'close', 'volume']

    csv_writer = csv.DictWriter(csv_file,fieldnames=column_names)

    csv_writer.writeheader()

    for candel in data_set:
        csv_writer.writerow({
            'time':candel[0],
            'open':candel[1],
            'high':candel[2],
            'low':candel[3],
            'close':candel[4],
            'volume':candel[5]
        })
© www.soinside.com 2019 - 2024. All rights reserved.