如何使用 pymongo 将多个带字段映射的数据集加载到 MongoDB?我的程序可以运行,但没有加载任何数据。我分别单独处理 parquet 和 csv 都能成功,但用当前这种方法就不行。如果我犯了某种错误或遗漏了什么,请指出来。
'''
import csv
import os
import pyarrow.parquet as pq
import pymongo
# MongoDB connection settings
MONGO_URI = "mongodb://localhost:27017/"
DB_NAME = "market_data"
datasets = [
{'path': r'D:\TradingData\ticker data\finished data - Copy\archive',
'headers_map': {'open': 'open', 'high': 'high', 'low': 'low', 'close': 'close', 'volume': 'volume',
'unix_timestamp': 'unix_timestamp'}},
{'path': r'D:\TradingData\ticker data\finished data - Copy\archive (24)',
'headers_map': {'unix_timestamp': 'unix_timestamp', 'open': 'open', 'close': 'close', 'high': 'high', 'low': 'low',
'volume': 'volume'}},
{'path': r'D:\TradingData\ticker data\finished data - Copy\archive (26)\New folder',
'headers_map': {'close': 'close', 'high': 'high', 'low': 'low', 'open': 'open', 'volume': 'volume', 'sma5': 'sma5',
'sma10': 'sma10', 'sma15': 'sma15', 'sma20': 'sma20', 'ema5': 'ema5', 'ema10': 'ema10',
'ema15': 'ema15', 'ema20': 'ema20', 'upperband': 'upperband', 'middleband': 'middleband',
'lowerband': 'lowerband', 'HT_TRENDLINE': 'HT_TRENDLINE', 'KAMA10': 'KAMA10', 'KAMA20': 'KAMA20',
'KAMA30': 'KAMA30', 'SAR': 'SAR', 'TRIMA5': 'TRIMA5', 'TRIMA10': 'TRIMA10', 'TRIMA20': 'TRIMA20',
'ADX5': 'ADX5', 'ADX10': 'ADX10', 'ADX20': 'ADX20', 'APO': 'APO', 'CCI5': 'CCI5', 'CCI10': 'CCI10',
'CCI15': 'CCI15', 'macd510': 'macd510', 'macd520': 'macd520', 'macd1020': 'macd1020',
'macd1520': 'macd1520', 'macd1226': 'macd1226', 'MFI': 'MFI', 'MOM10': 'MOM10', 'MOM15': 'MOM15',
'MOM20': 'MOM20', 'ROC5': 'ROC5', 'ROC10': 'ROC10', 'ROC20': 'ROC20', 'PPO': 'PPO',
'RSI14': 'RSI14', 'RSI8': 'RSI8', 'slowk': 'slowk', 'slowd': 'slowd', 'fastk': 'fastk',
'fastd': 'fastd', 'fastksr': 'fastksr', 'fastdsr': 'fastdsr', 'ULTOSC': 'ULTOSC', 'WILLR': 'WILLR',
'ATR': 'ATR', 'Trange': 'Trange', 'TYPPRICE': 'TYPPRICE', 'HT_DCPERIOD': 'HT_DCPERIOD',
'BETA': 'BETA', 'unix_timestamp': 'unix_timestamp'}},
{'path': r'D:\TradingData\ticker data\finished data - Copy\binance',
'headers_map': {'unix_timestamp': 'unix_timestamp', 'Symbol': 'Symbol', 'Open': 'Open', 'High': 'High',
'Low': 'Low', 'Close': 'Close', 'volume': 'volume', 'volume standard': 'volume standard',
'tradecount': 'tradecount'}},
{'path': r'D:\TradingData\ticker data\finished data - Copy\binance trade data',
'headers_map': {'unix_timestamp': 'unix_timestamp', 'Cryptocurrency Pair': 'Cryptocurrency Pair',
'Open Price': 'Open Price', 'High Price': 'High Price', 'Low Price': 'Low Price',
'Close Price': 'Close Price', 'Trading Volume': 'Trading Volume',
'Weighted Average Price': 'Weighted Average Price', 'Number of Trades': 'Number of Trades'}},
{'path': r'D:\TradingData\ticker data\finished data - Copy\bitfinex',
'headers_map': {'unix_timestamp': 'unix_timestamp', 'symbol': 'symbol', 'open': 'open', 'high': 'high',
'low': 'low', 'close': 'close', 'volume': 'volume', 'volume standard': 'volume standard'}},
{'path': r'D:\TradingData\ticker data\finished data - Copy\cleaned_data\day and hour',
'headers_map': {'unix_timestamp': 'unix_timestamp', 'symbol': 'symbol', 'open': 'open', 'close': 'close', 'high': 'high', 'low': 'low',
'volume': 'volume'}},
{'path': r'D:\TradingData\ticker data\finished data - Copy\New folder (2)',
'headers_map': {'price': 'price', 'open': 'open', 'high': 'high', 'low': 'low', 'volume': 'volume',
'change %': 'change %', 'unix_timestamp': 'unix_timestamp'}},
{'path': r'D:\TradingData\ticker data\finished data - Copy\P New_folder (2)',
'headers_map': {'ADX10': 'ADX10', 'ADX20': 'ADX20', 'ADX5': 'ADX5', 'APO': 'APO', 'ATR': 'ATR', 'BETA': 'BETA',
'CCI10': 'CCI10', 'CCI15': 'CCI15', 'CCI5': 'CCI5', 'close': 'close', 'ema10': 'ema10',
'ema15': 'ema15', 'ema20': 'ema20', 'ema5': 'ema5', 'fastd': 'fastd', 'fastdsr': 'fastdsr',
'fastk': 'fastk', 'fastksr': 'fastksr', 'high': 'high', 'HT_DCPERIOD': 'HT_DCPERIOD',
'HT_TRENDLINE': 'HT_TRENDLINE', 'KAMA10': 'KAMA10', 'KAMA20': 'KAMA20', 'KAMA30': 'KAMA30',
'low': 'low', 'lowerband': 'lowerband', 'macd1020': 'macd1020', 'macd1226': 'macd1226',
'macd1520': 'macd1520', 'macd510': 'macd510', 'macd520': 'macd520', 'MFI': 'MFI',
'middleband': 'middleband', 'MOM10': 'MOM10', 'MOM15': 'MOM15', 'MOM20': 'MOM20', 'open': 'open',
'PPO': 'PPO', 'ROC10': 'ROC10', 'ROC20': 'ROC20', 'ROC5': 'ROC5', 'RSI14': 'RSI14', 'RSI8': 'RSI8',
'SAR': 'SAR', 'slowd': 'slowd', 'slowk': 'slowk', 'sma10': 'sma10', 'sma15': 'sma15',
'sma20': 'sma20', 'sma5': 'sma5', 'Trange': 'Trange', 'TRIMA10': 'TRIMA10', 'TRIMA20': 'TRIMA20',
'TRIMA5': 'TRIMA5', 'TYPPRICE': 'TYPPRICE', 'ULTOSC': 'ULTOSC', 'unix_timestamp': 'unix_timestamp',
'upperband': 'upperband', 'volume': 'volume', 'WILLR': 'WILLR'}},
{'path': r'D:\TradingData\ticker data\finished data - Copy\P timeseries-1d-stocks\data',
'headers_map': {'adj_close': 'adj_close', 'close': 'close', 'high': 'high', 'low': 'low', 'open': 'open',
'symbol': 'symbol', 'unix_timestamp': 'unix_timestamp', 'volume': 'volume'}},
{'path': r'D:\TradingData\ticker data\finished data - Copy\P timeseries-1m-stocks\data',
'headers_map': {'close': 'close', 'high': 'high', 'low': 'low', 'open': 'open', 'symbol': 'symbol',
'unix_timestamp': 'unix_timestamp', 'volume': 'volume'}}]
final_columns = [
'open', 'high', 'low', 'close', 'volume', 'unix_timestamp', 'Symbol',
'volume standard', 'tradecount', 'Cryptocurrency Pair', 'Weighted Average Price',
'Number of Trades', 'price', 'change %', 'ADX10', 'ADX20', 'ADX5', 'APO', 'ATR',
'BETA', 'CCI10', 'CCI15', 'CCI5', 'ema10', 'ema15', 'ema20', 'ema5', 'fastd',
'fastdsr', 'fastk', 'fastksr', 'HT_DCPERIOD', 'HT_TRENDLINE', 'KAMA10',
'KAMA20', 'KAMA30', 'lowerband', 'macd1020', 'macd1226', 'macd1520', 'macd510',
'macd520', 'MFI', 'middleband', 'MOM10', 'MOM15', 'MOM20', 'PPO', 'ROC10',
'ROC20', 'ROC5', 'RSI14', 'RSI8', 'SAR', 'slowd', 'slowk', 'sma10', 'sma15',
'sma20', 'sma5', 'Trange', 'TRIMA10', 'TRIMA20', 'TRIMA5', 'TYPPRICE', 'ULTOSC',
'upperband', 'WILLR', 'adj_close'
]
client = pymongo.MongoClient(MONGO_URI)
db = client[DB_NAME]
for dataset in datasets:
collection_name = dataset["path"].split("\\")[-1]
collection = db[collection_name]
if dataset["path"].endswith(".csv"):
files = [f for f in os.listdir(dataset["path"]) if f.endswith(".csv")]
for f in files:
with open(os.path.join(dataset["path"], f), 'r') as file:
reader = csv.DictReader(file)
data = [{dataset["headers_map"].get(k, k): v for k, v in row.items()} for row in reader]
collection.insert_many(data)
elif dataset["path"].endswith(".parquet"):
files = [f for f in os.listdir(dataset["path"]) if f.endswith(".parquet")]
for f in files:
table = pq.read_table(os.path.join(dataset["path"], f))
df = table.to_pandas()
data = df.to_dict(orient="records")
collection.insert_many(data)
for col in final_columns:
collection.update_many({}, [{"$set": {col: f"${col}"}}])
print("Data loaded to MongoDB")
'''
import csv
import os
import pyarrow.parquet as pq
import pymongo
# MongoDB connection settings
MONGO_URI = "mongodb://localhost:27017/"
DB_NAME = "market_data"
datasets = [
{'path': r'D:\TradingData\ticker data\finished data - Copy\archive',
'headers_map': {'open': 'open', 'high': 'high', 'low': 'low', 'close': 'close', 'volume': 'volume',
'unix_timestamp': 'unix_timestamp'}},
# Add other datasets here
]
final_columns = [
'open', 'high', 'low', 'close', 'volume', 'unix_timestamp'
# Add other final columns here
]
client = pymongo.MongoClient(MONGO_URI)
db = client[DB_NAME]
for dataset in datasets:
collection_name = dataset["path"].split("\\")[-1]
collection = db[collection_name]
if dataset["path"].endswith(".csv"):
files = [f for f in os.listdir(dataset["path"]) if f.endswith(".csv")]
for f in files:
with open(os.path.join(dataset["path"], f), 'r') as file:
reader = csv.DictReader(file)
data = [{dataset["headers_map"].get(k, k): v for k, v in row.items()} for row in reader]
collection.insert_many(data)
elif dataset["path"].endswith(".parquet"):
files = [f for f in os.listdir(dataset["path"]) if f.endswith(".parquet")]
for f in files:
table = pq.read_table(os.path.join(dataset["path"], f))
df = table.to_pandas()
data = df.to_dict(orient="records")
collection.insert_many(data)
# Move the update operation here, inside the loop
for col in final_columns:
collection.update_many({}, [{"$set": {col: f"${col}"}}])
print("Data loaded to MongoDB")
试试上面这个修改,看看是否有效:关键问题在于原代码用目录路径去匹配 ".csv"/".parquet" 后缀,条件永远不成立,所以什么都没有插入。