我已经用谷歌搜索了一段时间,但还是无法让它工作。我的方法是编写一个多线程程序:在一个线程上收集数据,然后在第一个线程继续收集数据的同时,在第二个线程中执行MySQL的批量查询。目标是使用尽可能少的RAM来完成数百万条插入。在数据收集过程中,我使用一个MYSQLInsertThread对象,并按如下方式插入数据:
// One row is described by three parallel arrays: a type label, the JDBC
// placeholder index (1-based) and the value for each parameter.
String[] types = {"Long", "Long"};
// NOTE(review): both entries target placeholder 1, so the second value replaces
// the first one - confirm against the query; two placeholders would need {1, 2}.
int[] pos = {1, 1};
// long literals so the boxed elements really are Long: a plain 123 boxes to
// Integer, and casting such an Object with (long) throws ClassCastException.
Object[] values = {123L, 456L};
nodeTagsInsertThread.addBatch(types, pos, values);
第一次调用有效:如果我将批量计数设置为100,我的数据库中有104个条目,但这些是我的类生成的仅有的条目(它本应导入500万个条目)!
/**
 * Collects rows for one parameterised statement and writes them to the database
 * in batches on a background thread.
 *
 * <p>Two prepared statements are created from the same SQL and used alternately:
 * while a worker thread executes the batch held by one of them,
 * {@link #addBatch} keeps filling the other. At most one batch is in flight; a
 * producer that fills the second statement before the first one has been written
 * blocks until the worker is done.
 *
 * <p>{@link #addBatch}, {@link #executeBatch}, {@link #start} and
 * {@link #awaitCompletion} synchronize on this object. Rows added after the last
 * full batch are only written when {@link #executeBatch} is called explicitly;
 * call {@link #awaitCompletion} afterwards before closing the connection.
 */
public class MYSQLInsertThread implements Runnable
{
    private final String threadName;
    private final String query;
    private final PreparedStatement pstmt;
    private final PreparedStatement pstmt2;
    private final long maxBatchAmount;

    // Guarded by the monitor of this object.
    private long batchCount;
    private boolean pstmt1Active = true;
    private Thread t;

    // Statement handed to the worker. Written before the worker is started and
    // cleared by the worker, so it is never touched by both sides at once.
    private volatile PreparedStatement pending;

    /**
     * Prepares both statements and prints a creation message.
     *
     * @param name           name given to the worker threads
     * @param query          SQL text with {@code ?} placeholders
     * @param conn           connection both statements are prepared on
     * @param maxBatchAmount number of rows after which a batch is written
     * @throws SQLException             if a statement cannot be prepared
     * @throws IllegalArgumentException if {@code maxBatchAmount} is below 1
     */
    public MYSQLInsertThread(String name, String query, Connection conn, int maxBatchAmount) throws SQLException
    {
        if (maxBatchAmount < 1)
        {
            throw new IllegalArgumentException("maxBatchAmount must be at least 1 but was " + maxBatchAmount);
        }
        this.threadName = name;
        this.pstmt = conn.prepareStatement(query);
        this.pstmt2 = conn.prepareStatement(query);
        this.query = query;
        this.maxBatchAmount = maxBatchAmount;
        System.out.println("Creating Thread: " + name);
    }

    /**
     * Binds one row to the statement currently being filled and queues it. Every
     * {@code maxBatchAmount}-th row triggers {@link #executeBatch()}.
     *
     * @param types     type label per parameter ("String", "Long", "Integer",
     *                  "Double" or their lower-case/primitive spellings); any
     *                  other label is bound with {@code setObject}
     * @param positions placeholder index per parameter, passed to the JDBC setters
     * @param values    value per parameter; numeric labels accept any {@link Number}
     * @throws SQLException             if binding or queuing the row fails
     * @throws IllegalArgumentException if the arrays are shorter than {@code types}
     */
    public synchronized void addBatch(String[] types, int[] positions, Object[] values) throws SQLException
    {
        PreparedStatement active = pstmt1Active ? pstmt : pstmt2;
        if (active == null)
        {
            System.err.println("[MYSQLInsertThread]Error PreparedStatment is NULL, Parameter could not be added");
            return;
        }
        if (positions.length < types.length || values.length < types.length)
        {
            throw new IllegalArgumentException("positions and values need one entry per type");
        }
        for (int i = 0; i < types.length; i++)
        {
            bind(active, types[i], positions[i], values[i]);
        }
        active.addBatch();
        batchCount++;
        if (batchCount % maxBatchAmount == 0)
        {
            System.out.println("["+ this.threadName +"]Adding " + batchCount + " Entrys to DB" );
            this.executeBatch();
        }
    }

    /**
     * Hands the statement currently being filled to a new worker thread and
     * switches filling to the other statement. Blocks first until a previously
     * started batch has finished, so only one batch is executed at a time.
     */
    public synchronized void executeBatch()
    {
        PreparedStatement filled = pstmt1Active ? pstmt : pstmt2;
        if (filled == null)
        {
            System.err.println("[" + this.threadName + "]PSTMT is NULL");
            return;
        }
        awaitCompletion();
        // Swap here, on the producer side, so no further row can reach the
        // statement the worker is about to execute.
        pending = filled;
        pstmt1Active = !pstmt1Active;
        start();
        System.out.println("Execution Started Successfully");
    }

    /**
     * Blocks until the worker thread started last, if any, has terminated. If the
     * calling thread is interrupted while waiting, the wait continues and the
     * interrupt status is restored before returning.
     */
    public synchronized void awaitCompletion()
    {
        Thread worker = t;
        if (worker == null || worker == Thread.currentThread() || !worker.isAlive())
        {
            return;
        }
        System.out.println("Waiting for previous Batch Execution to finish");
        boolean interrupted = false;
        while (worker.isAlive())
        {
            try
            {
                worker.join();
            }
            catch (InterruptedException e)
            {
                interrupted = true;
            }
        }
        if (interrupted)
        {
            Thread.currentThread().interrupt();
        }
    }

    /**
     * Executes the batch of the statement handed over by {@link #executeBatch()}.
     * Does nothing when no statement is pending. A failed batch is reported on
     * standard error.
     */
    @Override
    public void run()
    {
        PreparedStatement toExecute = pending;
        pending = null;
        if (toExecute == null)
        {
            return;
        }
        try
        {
            toExecute.executeBatch();
        }
        catch (SQLException e)
        {
            e.printStackTrace();
        }
    }

    /**
     * Starts a worker thread for the pending batch. A {@link Thread} can be
     * started only once, so a fresh one is created for every batch; nothing
     * happens while the previous worker is still alive.
     */
    public synchronized void start()
    {
        if (t != null && t.isAlive())
        {
            return;
        }
        t = new Thread(this, threadName);
        t.start();
    }

    // Binds a single value using the JDBC setter selected by the type label.
    private static void bind(PreparedStatement statement, String type, int position, Object value) throws SQLException
    {
        switch (type)
        {
            case "string":
            case "String":
                statement.setString(position, (String) value);
                break;
            case "long":
            case "Long":
                if (value == null)
                {
                    statement.setNull(position, java.sql.Types.BIGINT);
                }
                else
                {
                    // Number instead of a (long) cast: boxed Integers are accepted too.
                    statement.setLong(position, ((Number) value).longValue());
                }
                break;
            case "int":
            case "Integer":
                if (value == null)
                {
                    statement.setNull(position, java.sql.Types.INTEGER);
                }
                else
                {
                    statement.setInt(position, ((Number) value).intValue());
                }
                break;
            case "double":
            case "Double":
                if (value == null)
                {
                    statement.setNull(position, java.sql.Types.DOUBLE);
                }
                else
                {
                    statement.setDouble(position, ((Number) value).doubleValue());
                }
                break;
            default:
                // Skipping the parameter would leave it unset or holding the
                // previous row's value, so let the driver map the object.
                statement.setObject(position, value);
                break;
        }
    }
}
我后来通过YouTube视频找到了一个解决方案 :D
我创建了一个新的类,用来持有我的PreparedStatement。当某个Statement正在执行时,会设置一个标志。如果第二个Statement想要立即执行批处理,但前一个Statement仍在运行,收集线程就会等待写入线程的通知。写入线程执行完语句后会发出通知!这是我的代码:
/**
 * Holds two prepared statements for the same SQL and uses them alternately so
 * that one batch can be written to the database while the next one is collected.
 *
 * <p>Rows are added with {@link #addParameters}. Every {@code maxBatchAmount}-th
 * row hands the filled statement to a new thread and switches filling to the
 * other statement. At most one batch is executed at a time: a caller that wants
 * to start another execution waits on this object's monitor until the running
 * one has finished. The database call itself runs outside the monitor, which is
 * what allows {@link #addParameters} to proceed in the meantime.
 *
 * <p>Call {@link #executeAllBatches()} at the end to write the remaining rows.
 */
public class ThreadPreparedStatement
{
    private final String query;
    private final String threadName;
    private final PreparedStatement pstmt1;
    private final PreparedStatement pstmt2;
    private final long maxBatchAmount;

    // Guarded by the monitor of this object.
    private boolean pstmt1Active;
    private boolean isRunning;
    private long batchCount;

    /**
     * Prepares both statements.
     *
     * @param threadName     label used in log output
     * @param query          SQL text with {@code ?} placeholders
     * @param conn           connection both statements are prepared on
     * @param maxBatchAmount number of rows after which a batch is written
     * @throws SQLException             if a statement cannot be prepared
     * @throws IllegalArgumentException if {@code maxBatchAmount} is below 1
     */
    public ThreadPreparedStatement(String threadName, String query, Connection conn, long maxBatchAmount) throws SQLException
    {
        if (maxBatchAmount < 1)
        {
            throw new IllegalArgumentException("maxBatchAmount must be at least 1 but was " + maxBatchAmount);
        }
        this.threadName = threadName;
        this.pstmt1 = conn.prepareStatement(query);
        this.pstmt2 = conn.prepareStatement(query);
        this.maxBatchAmount = maxBatchAmount;
        this.query = query;
        this.batchCount = 0;
    }

    /**
     * Binds one row to the statement currently being filled and queues it. Every
     * {@code maxBatchAmount}-th row starts a thread that executes the filled
     * statement; if an earlier execution is still running, this call blocks until
     * it has finished.
     *
     * @param types     type label per parameter ("String", "Long", "Integer",
     *                  "Double" or their lower-case/primitive spellings); any
     *                  other label is bound with {@code setObject}
     * @param positions placeholder index per parameter, passed to the JDBC setters
     * @param values    value per parameter; numeric labels accept any {@link Number}
     * @throws SQLException             if binding or queuing the row fails
     * @throws IllegalArgumentException if the arrays are shorter than {@code types}
     */
    public synchronized void addParameters(String[] types, int[] positions, Object[] values) throws SQLException
    {
        PreparedStatement active = activeStatement();
        if (active == null)
        {
            System.err.print("Error Prepared Statements are both NULL");
            return;
        }
        if (positions.length < types.length || values.length < types.length)
        {
            throw new IllegalArgumentException("positions and values need one entry per type");
        }
        for (int i = 0; i < types.length; i++)
        {
            bind(active, types[i], positions[i], values[i]);
        }
        active.addBatch();
        batchCount++;
        if (batchCount % maxBatchAmount == 0)
        {
            // Claim and swap on the producer side, before the worker exists, so
            // later rows cannot end up in the statement that is being executed.
            awaitIdle();
            final PreparedStatement filled = activeStatement();
            markRunning(filled);
            Thread worker = new Thread(new Runnable()
            {
                @Override
                public void run()
                {
                    try
                    {
                        runClaimed(filled);
                    }
                    catch (SQLException e)
                    {
                        e.printStackTrace();
                    }
                }
            });
            worker.start();
        }
    }

    /**
     * Waits for a running execution to finish, then executes the batches of both
     * statements on the calling thread. A failure is reported on standard error
     * and does not prevent the other statement from being executed.
     */
    public synchronized void executeAllBatches()
    {
        awaitIdle();
        flush(pstmt1);
        flush(pstmt2);
    }

    /**
     * Executes the batch of the given statement on the calling thread, after
     * waiting for any running execution to finish. If the statement is the one
     * currently being filled, filling switches to the other statement first.
     *
     * @param _pstmt statement whose batch is executed
     * @throws SQLException if the batch execution fails
     */
    public void executeBatch(PreparedStatement _pstmt) throws SQLException
    {
        synchronized (this)
        {
            awaitIdle();
            markRunning(_pstmt);
        }
        runClaimed(_pstmt);
    }

    // Returns the statement rows are currently added to. Caller holds the monitor.
    private PreparedStatement activeStatement()
    {
        return pstmt1Active ? pstmt1 : pstmt2;
    }

    // Waits until no execution is running. Caller holds the monitor, which
    // wait() releases while blocked. The loop covers spurious wake-ups; an
    // interrupt does not abort the wait and is restored afterwards.
    private void awaitIdle()
    {
        boolean interrupted = false;
        while (isRunning)
        {
            try
            {
                wait();
            }
            catch (InterruptedException e)
            {
                interrupted = true;
            }
        }
        if (interrupted)
        {
            Thread.currentThread().interrupt();
        }
    }

    // Marks an execution as running and moves filling away from the target
    // statement if it is the active one. Caller holds the monitor.
    private void markRunning(PreparedStatement target)
    {
        System.out.println("["+ this.threadName +"]Adding " + batchCount + " Entrys to DB" );
        isRunning = true;
        if (target == activeStatement())
        {
            pstmt1Active = !pstmt1Active;
        }
    }

    // Executes a claimed statement outside the monitor and always releases the
    // claim, so a failed batch cannot leave later callers waiting forever.
    private void runClaimed(PreparedStatement claimed) throws SQLException
    {
        try
        {
            claimed.executeBatch();
        }
        finally
        {
            synchronized (this)
            {
                isRunning = false;
                notifyAll();
            }
        }
    }

    // Executes one statement's batch and reports a failure instead of hiding it.
    private void flush(PreparedStatement statement)
    {
        if (statement == null)
        {
            return;
        }
        try
        {
            statement.executeBatch();
        }
        catch (SQLException e)
        {
            e.printStackTrace();
        }
    }

    // Binds a single value using the JDBC setter selected by the type label.
    private static void bind(PreparedStatement statement, String type, int position, Object value) throws SQLException
    {
        switch (type)
        {
            case "string":
            case "String":
                statement.setString(position, (String) value);
                break;
            case "long":
            case "Long":
                if (value == null)
                {
                    statement.setNull(position, java.sql.Types.BIGINT);
                }
                else
                {
                    // Number instead of a (long) cast: boxed Integers are accepted too.
                    statement.setLong(position, ((Number) value).longValue());
                }
                break;
            case "int":
            case "Integer":
                if (value == null)
                {
                    statement.setNull(position, java.sql.Types.INTEGER);
                }
                else
                {
                    statement.setInt(position, ((Number) value).intValue());
                }
                break;
            case "double":
            case "Double":
                if (value == null)
                {
                    statement.setNull(position, java.sql.Types.DOUBLE);
                }
                else
                {
                    statement.setDouble(position, ((Number) value).doubleValue());
                }
                break;
            default:
                // Skipping the parameter would leave it unset or holding the
                // previous row's value, so let the driver map the object.
                statement.setObject(position, value);
                break;
        }
    }
}