我的Windows控制台上每秒会连续输出10个JSON块,它们的格式如下。
...
..
{
"acl_chk_fail": 0,
"audits_received": 0,
"blocks_ack": 337,
"connect_events": 2,
"connected_clients": 1,
"disconnect_events": 1,
"dup_ack": 78226,
"dup_ack_fail": 0,
"dup_drop": 0,
"hdr_chk_fail": 1,
"historical_messages_dropped": 0,
"messages_blocked": 34127,
"messages_received": 112353,
"messages_verified": 112353,
"new_ack": 32522,
"new_ack_fail": 1504,
"pub_block": 337,
"pub_msg": 34026,
"sig_chk_fail": 0
}
{
"acl_chk_fail": 0,
"audits_received": 0,
"blocks_ack": 690,
"connect_events": 74,
"connected_clients": 2,
"disconnect_events": 86,
"dup_ack": 2549130,
"dup_ack_fail": 2217,
"dup_drop": 0,
"hdr_chk_fail": 72,
"historical_messages_dropped": 0,
"messages_blocked": 77526,
"messages_received": 2629872,
"messages_verified": 2628902,
"new_ack": 71157,
"new_ack_fail": 6369,
"pub_block": 690,
"pub_msg": 77526,
"sig_chk_fail": 0
}
..
..
我需要将控制台上显示的每个JSON块插入到本地MySQL数据库中。我按如下方式创建了MySQL数据库:
-- Archive database for the JSON statistics stream.
-- IF NOT EXISTS keeps the script re-runnable, matching the table statement below.
CREATE DATABASE IF NOT EXISTS r_q_arch;
USE r_q_arch;
-- One row per JSON block; every counter column is named after its JSON key.
CREATE TABLE IF NOT EXISTS json_data(
    record_id INTEGER PRIMARY KEY NOT NULL auto_increment,
    -- Fractional-seconds precision of 3: a plain TIMESTAMP stores whole seconds
    -- only, which cannot hold the millisecond value shown in the example.
    recorded_at TIMESTAMP(3) DEFAULT CURRENT_TIMESTAMP(3) , -- Eg: 2019-12-16 16:18:18.910
    acl_chk_fail INTEGER,
    audits_received INTEGER,
    blocks_ack INTEGER,
    connect_events INTEGER,
    connected_clients INTEGER,
    disconnect_events INTEGER,
    dup_ack INTEGER,
    dup_ack_fail INTEGER,
    dup_drop INTEGER,
    hdr_chk_fail INTEGER,
    historical_messages_dropped INTEGER,
    messages_blocked INTEGER,
    messages_received INTEGER,
    messages_verified INTEGER,
    new_ack INTEGER,
    new_ack_fail INTEGER,
    pub_block INTEGER,
    pub_msg INTEGER,
    sig_chk_fail INTEGER
);
-- DDL statements are committed implicitly in MySQL; this COMMIT is a harmless no-op.
COMMIT;
然后我使用Python脚本手动插入了一条记录,以检查MySQL连接是否正常工作。结果表明连接正常。
from datetime import datetime
currentDT = datetime.now()
# path for module 'MySQLdb': C:\Python37\Lib\site-packages
import sys
# Raw string: keeps the backslashes of the Windows path literal instead of
# relying on unrecognized escape sequences (\P, \L, \s), which Python warns about.
sys.path.append(r'C:\Python37\Lib\site-packages')
import MySQLdb

# Connectivity check: insert one hand-written row into json_data, then print
# every row of the table.
conn = MySQLdb.connect(host='localhost', user='root', passwd='', db='r_q_arch')
try:
    c = conn.cursor()
    # No column list, so one %s placeholder is needed per table column, in table order.
    sql_stmt = """INSERT INTO json_data VALUES (%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s) """
    # NOTE: record_id is hard-coded to 1, so running this script a second time
    # against the same table fails with a duplicate primary key.
    values = (
        1,                             # record_id
        '2019-12-16 16:14:12.234543',  # recorded_at
        0,        # acl_chk_fail
        0,        # audits_received
        690,      # blocks_ack
        74,       # connect_events
        2,        # connected_clients
        86,       # disconnect_events
        2549130,  # dup_ack
        2217,     # dup_ack_fail
        0,        # dup_drop
        72,       # hdr_chk_fail
        0,        # historical_messages_dropped
        77526,    # messages_blocked
        2629872,  # messages_received
        2628902,  # messages_verified
        1157,     # new_ack
        6369,     # new_ack_fail
        690,      # pub_block
        77526,    # pub_msg
        0,        # sig_chk_fail
    )
    c.execute(sql_stmt, values)  # https://stackoverflow.com/questions/41028774/python-mysql-mysql-exceptions-programmingerror-1064-you-have-an-error-in-y
    conn.commit()
    c.execute("""SELECT * FROM json_data;""")
    rows = c.fetchall()
    for eachRow in rows:
        print(eachRow)
finally:
    # Release the server connection even if the insert or the query fails.
    conn.close()
我找到的最接近的资源是关于从Twitter API插入流式JSON数据的(资源链接:https://towardsdatascience.com/streaming-twitter-data-into-a-mysql-database-d62a02b050d6)。但是,我的流式JSON数据并非来自API,而是由以下Python脚本在控制台上生成的。如何才能将控制台上显示的流式JSON数据插入本地MySQL数据库?JSON数据的键和值的形式如第一个代码段所示。
import json
import zmq

# Subscribe to one topic on a ZeroMQ SUB socket and pretty-print every JSON
# message that arrives, until the loop is interrupted.
topic = b'transaction-1m'
ctxt = zmq.Context.instance()
stats_sink = ctxt.socket(zmq.SUB)
stats_sink.subscribe(topic)
# Placeholder endpoint: substitute the publisher's real address and port.
stats_sink.connect('tcp://my_ip_address:port')
try:
    while True:
        # Every multipart message is unpacked into exactly two frames.
        tpc, msg = stats_sink.recv_multipart()
        # The second frame is decoded as ASCII text and parsed as JSON.
        msg = json.loads(str(msg, encoding='ascii'))
        print(json.dumps(msg, indent=True, sort_keys=True))
finally:
    # Release the socket and the context even when the loop exits with an error.
    stats_sink.close()
    ctxt.destroy()
在最后一个代码段的while循环中,我写了以下代码:
# Fragment meant to run inside the receive loop: store one parsed JSON block
# as a row of json_data.
# NOTE(review): `jsondump`, `c` and `conn` are not defined in this fragment --
# presumably the parsed message dict, a MySQLdb cursor and its connection; confirm.
sig_chk_fail = jsondump["sig_chk_fail"]
# Parameterized INSERT with an explicit column list; record_id and recorded_at
# are not listed, so they are left to the table's own defaults.
sql_query= '''INSERT INTO json_data (acl_chk_fail,audits_received,blocks_ack,connect_events,connected_clients,disconnect_events,
dup_ack, dup_ack_fail,dup_drop,hdr_chk_fail, historical_messages_dropped,messages_blocked,messages_received,messages_verified, new_ack,
new_ack_fail,pub_block,pub_msg,sig_chk_fail ) VALUES (%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s);'''
# Values are listed in the same order as the columns above.
# NOTE(review): only sig_chk_fail is assigned in the visible code; the other
# names must be bound before this line or it raises NameError -- verify.
values= [acl_chk_fail,audits_received,blocks_ack,connect_events,connected_clients,disconnect_events, dup_ack, dup_ack_fail,dup_drop,
hdr_chk_fail, historical_messages_dropped,messages_blocked,messages_received,messages_verified, new_ack, new_ack_fail,pub_block,pub_msg,sig_chk_fail]
c.execute(sql_query, values)
# Commit right after the insert so the row is persisted immediately.
conn.commit()