How to insert a JSON data stream from the console into a MySQL database

Question (votes: -1, answers: 1)

A stream of JSON blocks is printed continuously to my Windows console, 10 times per second. The blocks follow this pattern:

...
..
{
 "acl_chk_fail": 0,
 "audits_received": 0,
 "blocks_ack": 337,
 "connect_events": 2,
 "connected_clients": 1,
 "disconnect_events": 1,
 "dup_ack": 78226,
 "dup_ack_fail": 0,
 "dup_drop": 0,
 "hdr_chk_fail": 1,
 "historical_messages_dropped": 0,
 "messages_blocked": 34127,
 "messages_received": 112353,
 "messages_verified": 112353,
 "new_ack": 32522, 
 "new_ack_fail": 1504,
 "pub_block": 337,
 "pub_msg": 34026,
 "sig_chk_fail": 0
}
{
 "acl_chk_fail": 0,
 "audits_received": 0,
 "blocks_ack": 690,
 "connect_events": 74,
 "connected_clients": 2,
 "disconnect_events": 86,
 "dup_ack": 2549130,
 "dup_ack_fail": 2217,
 "dup_drop": 0,
 "hdr_chk_fail": 72,
 "historical_messages_dropped": 0,
 "messages_blocked": 77526,
 "messages_received": 2629872,
 "messages_verified": 2628902,
 "new_ack": 71157,
 "new_ack_fail": 6369,
 "pub_block": 690,
 "pub_msg": 77526,
 "sig_chk_fail": 0
}
..
..
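
If only the printed text is available (for example, captured from the console or piped into another process), the back-to-back blocks have to be split before they can be stored. The following is a minimal sketch of one way to do that with the standard library's `json.JSONDecoder.raw_decode`. The helper name `iter_json_blocks` and the shortened sample string are assumptions made for illustration, and a live stream would additionally need buffering of incomplete blocks.

```python
import json


def iter_json_blocks(text):
    """Yield each top-level JSON object found in a string of concatenated blocks."""
    decoder = json.JSONDecoder()
    pos = 0
    while pos < len(text):
        # raw_decode does not skip leading whitespace, so step over the
        # newlines and spaces that separate one block from the next.
        while pos < len(text) and text[pos].isspace():
            pos += 1
        if pos >= len(text):
            break
        obj, pos = decoder.raw_decode(text, pos)
        yield obj


# Shortened sample in the same shape as the console output (assumed data).
sample = '{\n "blocks_ack": 337,\n "pub_msg": 34026\n}\n{\n "blocks_ack": 690,\n "pub_msg": 77526\n}\n'
blocks = list(iter_json_blocks(sample))
print(blocks)
```

Each yielded item is an ordinary dict, so it can be handed directly to whatever code builds the INSERT statement.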

I need to insert each JSON block shown on the console into a local MySQL database. I created the MySQL database as follows:

CREATE DATABASE r_q_arch;
USE r_q_arch;

CREATE TABLE IF NOT EXISTS json_data(
    record_id INTEGER PRIMARY KEY NOT NULL auto_increment,
    recorded_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP() , --  Eg: 2019-12-16 16:18:18.910
    acl_chk_fail INTEGER,
    audits_received INTEGER,
    blocks_ack INTEGER,
    connect_events INTEGER,
    connected_clients INTEGER,
    disconnect_events INTEGER,
    dup_ack INTEGER,
    dup_ack_fail INTEGER,
    dup_drop INTEGER,
    hdr_chk_fail INTEGER,
    historical_messages_dropped INTEGER,
    messages_blocked INTEGER,
    messages_received INTEGER,
    messages_verified INTEGER,
    new_ack INTEGER,
    new_ack_fail INTEGER,
    pub_block INTEGER,
    pub_msg INTEGER,
    sig_chk_fail INTEGER
);

COMMIT;

I then used a Python script to insert one record by hand, to check that my MySQL connection works. It does.

from datetime import datetime
currentDT = datetime.now()

# path for module 'MySQLdb': C:\Python37\Lib\site-packages
import sys
sys.path.append(r'C:\Python37\Lib\site-packages')  # raw string so the backslashes are not treated as escapes
import MySQLdb

conn = MySQLdb.connect(host='localhost', user='root', passwd='', db='r_q_arch')
c = conn.cursor()

sql_stmt = """INSERT INTO json_data VALUES (%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s) """

values = (1, '2019-12-16 16:14:12.234543',
0,
0,
690,
74,
2,
86,
2549130,
2217,
0,
72,
0,
77526,
2629872,
2628902,
1157,
6369,
690,
77526,
0)
c.execute(sql_stmt,values) #https://stackoverflow.com/questions/41028774/python-mysql-mysql-exceptions-programmingerror-1064-you-have-an-error-in-y

conn.commit()
c.execute("""SELECT * FROM json_data;""")
rows = c.fetchall()
for eachRow in rows:
    print(eachRow)
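
The test above supplies `record_id` and `recorded_at` by hand. For the streamed blocks it is probably more convenient to name only the 19 counter columns and let MySQL fill in the auto-increment key and the default timestamp. Below is a minimal sketch of building such a statement from one parsed block; `COLUMNS` and `build_insert` are hypothetical names introduced here, and the record is made-up test data.

```python
# Counter columns of json_data, in table order (record_id and recorded_at
# are left out so that MySQL applies their defaults).
COLUMNS = (
    "acl_chk_fail", "audits_received", "blocks_ack", "connect_events",
    "connected_clients", "disconnect_events", "dup_ack", "dup_ack_fail",
    "dup_drop", "hdr_chk_fail", "historical_messages_dropped",
    "messages_blocked", "messages_received", "messages_verified",
    "new_ack", "new_ack_fail", "pub_block", "pub_msg", "sig_chk_fail",
)


def build_insert(record):
    """Return (sql, values) for one parsed JSON block given as a dict."""
    placeholders = ", ".join(["%s"] * len(COLUMNS))
    sql = "INSERT INTO json_data ({}) VALUES ({})".format(
        ", ".join(COLUMNS), placeholders
    )
    # Pull the values out in column order; a missing key raises KeyError.
    values = tuple(record[name] for name in COLUMNS)
    return sql, values


# Made-up record for illustration: all counters zero except two.
record = {name: 0 for name in COLUMNS}
record.update(blocks_ack=690, pub_msg=77526)
sql, values = build_insert(record)
print(sql)
print(values)
```

With the cursor and connection from the script above, the result would be used as `c.execute(sql, values)` followed by `conn.commit()`.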

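The closest resource I have found covers inserting streaming JSON data from the Twitter API (link: https://towardsdatascience.com/streaming-twitter-data-into-a-mysql-database-d62a02b050d6). My JSON stream, however, does not come from an API; it is produced on the console by the Python script below. How can I go on to insert the streaming JSON data shown on the console into my local MySQL database? The keys and values of the JSON data have the form shown in the first snippet.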

import json
import zmq

topic = b'transaction-1m'

ctxt = zmq.Context.instance()
stats_sink = ctxt.socket(zmq.SUB)
stats_sink.subscribe(topic)
stats_sink.connect('tcp://my_ip_address:port')

try:
  while True:
    tpc, msg = stats_sink.recv_multipart()
    msg = json.loads(str(msg, encoding='ascii'))
    print(json.dumps(msg, indent=True, sort_keys=True))
finally:
  stats_sink.close()
  ctxt.destroy()
python mysql json console streaming
1 answer
Votes: 1

Inside the while loop of the last snippet, I wrote:

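    # jsondump is the dict parsed from the message (msg in the question's loop).
    # The other 18 fields are read the same way, one variable per key.
    sig_chk_fail = jsondump["sig_chk_fail"]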

    sql_query= '''INSERT INTO json_data (acl_chk_fail,audits_received,blocks_ack,connect_events,connected_clients,disconnect_events, 
    dup_ack, dup_ack_fail,dup_drop,hdr_chk_fail, historical_messages_dropped,messages_blocked,messages_received,messages_verified, new_ack, 
    new_ack_fail,pub_block,pub_msg,sig_chk_fail ) VALUES (%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s);'''

    values= [acl_chk_fail,audits_received,blocks_ack,connect_events,connected_clients,disconnect_events, dup_ack, dup_ack_fail,dup_drop,
    hdr_chk_fail, historical_messages_dropped,messages_blocked,messages_received,messages_verified, new_ack, new_ack_fail,pub_block,pub_msg,sig_chk_fail]

    c.execute(sql_query, values)
    conn.commit()
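
For reference, here is a sketch of how the subscriber loop and the insert might fit together without one variable per field. It is not the code actually used above: `FIELDS`, `store_message` and `run` are hypothetical names, the endpoint is whatever address the publisher uses, and it assumes the MySQLdb cursor accepts a mapping with `%(name)s` placeholders, which it does for mapping arguments.

```python
import json

# The 19 counters that appear in every block, in table order.
FIELDS = (
    "acl_chk_fail", "audits_received", "blocks_ack", "connect_events",
    "connected_clients", "disconnect_events", "dup_ack", "dup_ack_fail",
    "dup_drop", "hdr_chk_fail", "historical_messages_dropped",
    "messages_blocked", "messages_received", "messages_verified",
    "new_ack", "new_ack_fail", "pub_block", "pub_msg", "sig_chk_fail",
)

# Named placeholders, so the parsed dict can be passed as the parameters.
INSERT_SQL = "INSERT INTO json_data ({cols}) VALUES ({params})".format(
    cols=", ".join(FIELDS),
    params=", ".join("%({})s".format(f) for f in FIELDS),
)


def store_message(cursor, raw):
    """Decode one message payload (bytes) and insert it as one row."""
    record = json.loads(raw.decode("ascii"))
    cursor.execute(INSERT_SQL, {f: record[f] for f in FIELDS})
    return record


def run(endpoint, topic=b"transaction-1m"):
    """Subscribe to the topic and insert every block; runs until interrupted."""
    # Imported here so store_message can be tried without these packages.
    import zmq
    import MySQLdb

    conn = MySQLdb.connect(host="localhost", user="root", passwd="", db="r_q_arch")
    cursor = conn.cursor()
    ctxt = zmq.Context.instance()
    sink = ctxt.socket(zmq.SUB)
    sink.subscribe(topic)
    sink.connect(endpoint)
    try:
        while True:
            _topic, payload = sink.recv_multipart()
            store_message(cursor, payload)
            conn.commit()  # one commit per block, as in the answer above
    finally:
        sink.close()
        ctxt.destroy()
        conn.close()
```

Calling `run('tcp://my_ip_address:port')` with the real address starts the loop. At 10 blocks per second a commit per row should be manageable, though committing every few rows is an option if it turns out to be slow.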