pymongo 未加载数据库的问题

问题描述 投票:0回答:1

如何使用 pymongo 将多个带列名映射的数据集加载到 MongoDB?我的程序可以正常运行,但没有加载任何内容。单独处理 parquet 或 csv 时我都能加载成功,但我目前的这种方法不起作用。如果我犯了某种错误或遗漏了什么,请帮我指出来。
'''

import csv
import os
import pyarrow.parquet as pq
import pymongo

# Server address and target database used by the loader.
MONGO_URI = "mongodb://localhost:27017/"
DB_NAME = "market_data"


def _same_name(*columns):
    """Return a headers map in which every listed column maps to itself."""
    return {column: column for column in columns}


# One entry per data source. 'path' locates the source and 'headers_map'
# translates source column names to stored field names (every map below is an
# identity mapping, so names are kept as they are).
datasets = [
    dict(
        path=r'D:\TradingData\ticker data\finished data - Copy\archive',
        headers_map=_same_name(
            'open', 'high', 'low', 'close', 'volume', 'unix_timestamp',
        ),
    ),
    dict(
        path=r'D:\TradingData\ticker data\finished data - Copy\archive (24)',
        headers_map=_same_name(
            'unix_timestamp', 'open', 'close', 'high', 'low', 'volume',
        ),
    ),
    dict(
        path=r'D:\TradingData\ticker data\finished data - Copy\archive (26)\New folder',
        headers_map=_same_name(
            'close', 'high', 'low', 'open', 'volume',
            'sma5', 'sma10', 'sma15', 'sma20',
            'ema5', 'ema10', 'ema15', 'ema20',
            'upperband', 'middleband', 'lowerband',
            'HT_TRENDLINE', 'KAMA10', 'KAMA20', 'KAMA30', 'SAR',
            'TRIMA5', 'TRIMA10', 'TRIMA20',
            'ADX5', 'ADX10', 'ADX20', 'APO',
            'CCI5', 'CCI10', 'CCI15',
            'macd510', 'macd520', 'macd1020', 'macd1520', 'macd1226',
            'MFI', 'MOM10', 'MOM15', 'MOM20',
            'ROC5', 'ROC10', 'ROC20', 'PPO',
            'RSI14', 'RSI8',
            'slowk', 'slowd', 'fastk', 'fastd', 'fastksr', 'fastdsr',
            'ULTOSC', 'WILLR', 'ATR', 'Trange', 'TYPPRICE', 'HT_DCPERIOD',
            'BETA', 'unix_timestamp',
        ),
    ),
    dict(
        path=r'D:\TradingData\ticker data\finished data - Copy\binance',
        headers_map=_same_name(
            'unix_timestamp', 'Symbol', 'Open', 'High', 'Low', 'Close',
            'volume', 'volume standard', 'tradecount',
        ),
    ),
    dict(
        path=r'D:\TradingData\ticker data\finished data - Copy\binance trade data',
        headers_map=_same_name(
            'unix_timestamp', 'Cryptocurrency Pair',
            'Open Price', 'High Price', 'Low Price', 'Close Price',
            'Trading Volume', 'Weighted Average Price', 'Number of Trades',
        ),
    ),
    dict(
        path=r'D:\TradingData\ticker data\finished data - Copy\bitfinex',
        headers_map=_same_name(
            'unix_timestamp', 'symbol', 'open', 'high', 'low', 'close',
            'volume', 'volume standard',
        ),
    ),
    dict(
        path=r'D:\TradingData\ticker data\finished data - Copy\cleaned_data\day and hour',
        headers_map=_same_name(
            'unix_timestamp', 'symbol', 'open', 'close', 'high', 'low',
            'volume',
        ),
    ),
    dict(
        path=r'D:\TradingData\ticker data\finished data - Copy\New folder (2)',
        headers_map=_same_name(
            'price', 'open', 'high', 'low', 'volume', 'change %',
            'unix_timestamp',
        ),
    ),
    dict(
        path=r'D:\TradingData\ticker data\finished data - Copy\P New_folder (2)',
        headers_map=_same_name(
            'ADX10', 'ADX20', 'ADX5', 'APO', 'ATR', 'BETA',
            'CCI10', 'CCI15', 'CCI5', 'close',
            'ema10', 'ema15', 'ema20', 'ema5',
            'fastd', 'fastdsr', 'fastk', 'fastksr', 'high',
            'HT_DCPERIOD', 'HT_TRENDLINE', 'KAMA10', 'KAMA20', 'KAMA30',
            'low', 'lowerband',
            'macd1020', 'macd1226', 'macd1520', 'macd510', 'macd520',
            'MFI', 'middleband', 'MOM10', 'MOM15', 'MOM20', 'open',
            'PPO', 'ROC10', 'ROC20', 'ROC5', 'RSI14', 'RSI8',
            'SAR', 'slowd', 'slowk',
            'sma10', 'sma15', 'sma20', 'sma5',
            'Trange', 'TRIMA10', 'TRIMA20', 'TRIMA5',
            'TYPPRICE', 'ULTOSC', 'unix_timestamp',
            'upperband', 'volume', 'WILLR',
        ),
    ),
    dict(
        path=r'D:\TradingData\ticker data\finished data - Copy\P timeseries-1d-stocks\data',
        headers_map=_same_name(
            'adj_close', 'close', 'high', 'low', 'open', 'symbol',
            'unix_timestamp', 'volume',
        ),
    ),
    dict(
        path=r'D:\TradingData\ticker data\finished data - Copy\P timeseries-1m-stocks\data',
        headers_map=_same_name(
            'close', 'high', 'low', 'open', 'symbol', 'unix_timestamp',
            'volume',
        ),
    ),
]

# Column names are grouped by kind and then flattened into final_columns in
# the original order.
_PRICE_FIELDS = ('open', 'high', 'low', 'close', 'volume', 'unix_timestamp')
_EXCHANGE_FIELDS = (
    'Symbol',
    'volume standard',
    'tradecount',
    'Cryptocurrency Pair',
    'Weighted Average Price',
    'Number of Trades',
    'price',
    'change %',
)
_INDICATOR_FIELDS = (
    'ADX10', 'ADX20', 'ADX5', 'APO', 'ATR', 'BETA',
    'CCI10', 'CCI15', 'CCI5',
    'ema10', 'ema15', 'ema20', 'ema5',
    'fastd', 'fastdsr', 'fastk', 'fastksr',
    'HT_DCPERIOD', 'HT_TRENDLINE',
    'KAMA10', 'KAMA20', 'KAMA30',
    'lowerband',
    'macd1020', 'macd1226', 'macd1520', 'macd510', 'macd520',
    'MFI', 'middleband',
    'MOM10', 'MOM15', 'MOM20',
    'PPO',
    'ROC10', 'ROC20', 'ROC5',
    'RSI14', 'RSI8',
    'SAR', 'slowd', 'slowk',
    'sma10', 'sma15', 'sma20', 'sma5',
    'Trange', 'TRIMA10', 'TRIMA20', 'TRIMA5',
    'TYPPRICE', 'ULTOSC', 'upperband', 'WILLR',
)

final_columns = [*_PRICE_FIELDS, *_EXCHANGE_FIELDS, *_INDICATOR_FIELDS, 'adj_close']

# Connect once and pick the database that receives every collection.
client = pymongo.MongoClient(MONGO_URI)
db = client[DB_NAME]

for entry in datasets:
    source_path = entry["path"]

    # The collection is named after the last backslash-separated component of
    # the configured path.
    target = db[source_path.split("\\")[-1]]

    # The reader is selected from the suffix of the configured path string
    # itself; a path that ends in neither suffix skips both branches.
    if source_path.endswith(".csv"):
        for name in os.listdir(source_path):
            if not name.endswith(".csv"):
                continue
            with open(os.path.join(source_path, name), 'r') as handle:
                rows = []
                for record in csv.DictReader(handle):
                    # Rename each header through headers_map; unmapped headers
                    # keep their own name.
                    rows.append({entry["headers_map"].get(key, key): value
                                 for key, value in record.items()})
                target.insert_many(rows)

    elif source_path.endswith(".parquet"):
        for name in os.listdir(source_path):
            if not name.endswith(".parquet"):
                continue
            frame = pq.read_table(os.path.join(source_path, name)).to_pandas()
            target.insert_many(frame.to_dict(orient="records"))

    # Pipeline-style update applied to every document, one column at a time:
    # each column is set to the value of the field of the same name.
    for column in final_columns:
        target.update_many({}, [{"$set": {column: f"${column}"}}])

print("Data loaded to MongoDB")

'''

python mongodb csv pymongo parquet
1个回答
0
投票
import csv
import os
import pyarrow.parquet as pq
import pymongo

# Connection target: the MongoDB server and the database to fill.
MONGO_URI = "mongodb://localhost:27017/"
DB_NAME = "market_data"

# Each entry pairs a source path with a map from source column names to the
# field names to store (an identity map in this example).
datasets = [
    {
        'path': r'D:\TradingData\ticker data\finished data - Copy\archive',
        'headers_map': {
            name: name
            for name in ('open', 'high', 'low', 'close', 'volume', 'unix_timestamp')
        },
    },
    # Add other datasets here
]

final_columns = [
    'open',
    'high',
    'low',
    'close',
    'volume',
    'unix_timestamp',
    # Add other final columns here
]

# Connect once and select the database that receives every collection.
client = pymongo.MongoClient(MONGO_URI)
db = client[DB_NAME]

for dataset in datasets:
    folder = dataset["path"]
    headers_map = dataset["headers_map"]

    # Name the collection after the last component of the Windows-style path.
    collection_name = folder.split("\\")[-1]
    collection = db[collection_name]

    # dataset["path"] is a folder, so testing the path itself for ".csv" or
    # ".parquet" never matches and nothing gets inserted. Choose the reader
    # from each file's own extension instead.
    inserted = 0
    for file_name in sorted(os.listdir(folder)):
        file_path = os.path.join(folder, file_name)
        suffix = os.path.splitext(file_name)[1].lower()

        if suffix == ".csv":
            # newline='' is what the csv module expects for files it reads.
            with open(file_path, 'r', newline='') as file:
                reader = csv.DictReader(file)
                # Rename headers through headers_map; unmapped headers are kept.
                data = [{headers_map.get(k, k): v for k, v in row.items()} for row in reader]
        elif suffix == ".parquet":
            df = pq.read_table(file_path).to_pandas()
            # Apply the same header mapping as the CSV branch.
            data = df.rename(columns=headers_map).to_dict(orient="records")
        else:
            # Not a data file this loader understands.
            continue

        # insert_many rejects an empty document list, so skip empty files.
        if data:
            collection.insert_many(data)
            inserted += len(data)

    # Report per collection so an empty load is visible instead of silent.
    print(f"{collection_name}: inserted {inserted} documents")

    # Runs once per dataset, inside the dataset loop.
    # NOTE(review): this sets each field to its own current value, which looks
    # like it leaves documents unchanged - confirm the intended effect.
    for col in final_columns:
        collection.update_many({}, [{"$set": {col: f"${col}"}}])

print("Data loaded to MongoDB")

试试这个,看看是否有效。

© www.soinside.com 2019 - 2024. All rights reserved.