用于Mysql插入的Java多线程

问题描述 投票:0回答:1

我已经用谷歌搜索了一段时间,但仍然无法让它正常工作。我的思路是编写一个多线程程序:在一个线程中收集数据,并在第一个线程继续收集数据的同时,由第二个线程执行 MySQL 的批量插入。目标是在执行数百万次插入时尽可能少占用 RAM。在数据收集过程中,我使用一个 MYSQLInsertThread 对象,并按如下方式插入数据:

// Declared type of each bound value; addBatch dispatches on these names.
String[] types = {"Long", "Long"};
// NOTE(review): both entries target parameter index 1 - confirm whether {1, 2} was intended.
int[] pos = {1, 1};
// Long literals are required: a "Long" entry is unboxed with a (long) cast in addBatch,
// which throws ClassCastException when the element is an Integer such as 123.
Object[] values = {123L, 456L};

nodeTagsInsertThread.addBatch(types, pos, values);

第一次调用是有效的:如果我将批量大小设置为 100,数据库中会有 104 个条目,但这就是我的类生成的全部条目了(它本应导入 500 万个条目)!

/**
 * Buffers rows for one parameterized statement and executes full batches on a
 * background thread while the caller keeps adding rows.
 *
 * <p>Two prepared statements for the same query are used alternately: rows are
 * added to the active one, and when {@code maxBatchAmount} rows have been added
 * the active statement is handed to a worker thread and the other statement
 * becomes active. At most one worker runs at a time; handing off the next
 * batch blocks until the previous worker has finished.
 *
 * <p>Call {@link #executeAllBatches()} once after the last {@link #addBatch}
 * to write the rows of the final, partially filled batch.
 */
public class MYSQLInsertThread  implements Runnable
{
    private Thread t;
    private String threadName = "MYSQL InsertThread";
    private String query;

    private PreparedStatement pstmt;
    private PreparedStatement pstmt2;

    private long batchCount;
    private long maxBatchCount;

    private boolean pstmt1Active = true;
    // Written by the worker thread, so it must be volatile to be visible elsewhere.
    private volatile boolean isRunning = false;
    // Statement handed to the worker thread by executeBatch(); read by run().
    private volatile PreparedStatement pendingStatement;

    /**
     * Prepares both statement buffers for {@code query}.
     *
     * @param name           name given to the worker threads
     * @param query          parameterized SQL statement to batch
     * @param conn           connection used to prepare the statements
     * @param maxBatchAmount number of rows after which a batch is executed; must be positive
     * @throws SQLException             if a statement cannot be prepared
     * @throws IllegalArgumentException if {@code maxBatchAmount} is not positive
     */
    public MYSQLInsertThread(String name, String query, Connection conn, int maxBatchAmount) throws SQLException
    {
        if(maxBatchAmount <= 0)
        {
            // A zero batch size would otherwise fail later with a division by zero.
            throw new IllegalArgumentException("maxBatchAmount must be positive, got " + maxBatchAmount);
        }
        threadName = name;
        this.pstmt = conn.prepareStatement(query);
        this.pstmt2 = conn.prepareStatement(query);
        this.query = query;
        this.maxBatchCount = maxBatchAmount;
        System.out.println("Creating Thread: " + name);
    }

    /**
     * Binds one row to the active statement, adds it to that statement's batch
     * and triggers {@link #executeBatch()} every {@code maxBatchAmount} rows.
     *
     * @param types     type name per value ("String", "Long", "Integer", "Double" or the lower-case
     *                  primitive spellings); entries with any other name are skipped
     * @param positions JDBC parameter index per value
     * @param values    values to bind; numeric entries may be any {@link Number}
     * @throws SQLException if binding or adding the row fails
     */
    public synchronized void addBatch(String[] types, int[] positions, Object[] values) throws SQLException
    {
        PreparedStatement active = activeStatement();
        if(active == null)
        {
            System.err.println("[MYSQLInsertThread]Error PreparedStatment is NULL, Parameter could not be added");
            return;
        }

        bindParameters(active, types, positions, values);
        active.addBatch();
        batchCount++;

        if(batchCount % maxBatchCount == 0)
        {
            System.out.println("["+ this.threadName +"]Adding " + batchCount + " Entrys to DB" );
            this.executeBatch();
        }
    }

    /**
     * Hands the active statement to a new worker thread and makes the other
     * statement active. Blocks until a previously started worker has finished.
     */
    public synchronized void executeBatch()
    {
        PreparedStatement filled = activeStatement();
        if(filled == null)
        {
            System.err.println("[" + this.threadName + "]PSTMT is NULL");
            return;
        }

        awaitWorker();
        // Swap here, under the lock, so that no row can be added to the
        // statement the worker is about to execute.
        pstmt1Active = !pstmt1Active;
        pendingStatement = filled;
        this.start();
        System.out.println("Execution Started Successfully");
    }

    /**
     * Waits for a running worker and then executes whatever is still batched
     * on both statements in the calling thread.
     *
     * @throws SQLException if executing a batch fails
     */
    public synchronized void executeAllBatches() throws SQLException
    {
        awaitWorker();
        if(pstmt != null)
        {
            pstmt.executeBatch();
        }
        if(pstmt2 != null)
        {
            pstmt2.executeBatch();
        }
    }

    /**
     * Worker body: executes the statement most recently handed off by
     * {@link #executeBatch()}. Does nothing if no statement was handed off.
     */
    @Override
    public void run()
    {
        PreparedStatement toExecute = pendingStatement;
        if(toExecute == null)
        {
            return;
        }

        isRunning = true;
        try
        {
            toExecute.executeBatch();
        }
        catch(Exception e)
        {
            e.printStackTrace();
        }
        finally
        {
            isRunning = false;
        }
    }

    /**
     * Starts a worker thread running {@link #run()} unless one is already registered.
     */
    public synchronized void start()
    {
        if(t == null)
        {
            t = new Thread(this, threadName);
            t.start();
        }
    }

    /** Returns the statement that currently receives new rows. */
    private PreparedStatement activeStatement()
    {
        return pstmt1Active ? pstmt : pstmt2;
    }

    /** Binds every value to its parameter index according to its declared type name. */
    private static void bindParameters(PreparedStatement target, String[] types, int[] positions, Object[] values) throws SQLException
    {
        for(int i = 0; i < types.length; i++)
        {
            switch(types[i])
            {
                case "string":
                case "String":
                    target.setString(positions[i], (String)values[i]);
                    break;
                case "long":
                case "Long":
                    // Go through Number so that e.g. an Integer value does not fail the cast.
                    target.setLong(positions[i], ((Number)values[i]).longValue());
                    break;
                case "int":
                case "Integer":
                    target.setInt(positions[i], ((Number)values[i]).intValue());
                    break;
                case "double":
                case "Double":
                    target.setDouble(positions[i], ((Number)values[i]).doubleValue());
                    break;
                default:
                    break;
            }
        }
    }

    /**
     * Joins the registered worker thread, if any, and clears it so that
     * {@link #start()} creates a fresh thread for the next batch.
     */
    private void awaitWorker()
    {
        Thread worker = t;
        if(worker == null)
        {
            return;
        }
        if(worker.isAlive())
        {
            System.out.println("Waiting for previous Batch Execution to finish");
        }

        // Keep waiting even when interrupted: continuing early would let new
        // rows be added to a statement that is still being executed.
        boolean interrupted = false;
        while(true)
        {
            try
            {
                worker.join();
                break;
            }
            catch(InterruptedException e)
            {
                interrupted = true;
            }
        }
        if(interrupted)
        {
            Thread.currentThread().interrupt();
        }
        t = null;
    }

}
java mysql multithreading
1个回答
0
投票

我现在通过youtube视频搜索了一些解决方案:D

我创建了一个新的类,用来持有我的 PreparedStatement。当一个 Statement 正在执行时,会相应地设置一个标志。如果第二个 Statement 想要立即执行批处理,而前一个 Statement 仍在运行,则收集线程会等待写入过程的通知。语句执行完成后,写入过程会发出通知!这是我的代码:

/**
 * Holds two prepared statements for the same query and alternates between
 * them so that one can be executed on a background thread while rows are
 * added to the other.
 *
 * <p>Only one batch execution runs at a time. A thread that wants to start the
 * next execution waits on this object's monitor until the running execution
 * has finished and signalled completion.
 */
public class ThreadPreparedStatement 
{
    private String query;
    private String threadName;

    private PreparedStatement pstmt1;
    private PreparedStatement pstmt2;

    // Both flags are only read and written while holding this object's monitor.
    private boolean pstmt1Active;
    private boolean isRunning;

    private long maxBatchAmount;
    private long batchCount;

    /**
     * Prepares both statement buffers for {@code query}.
     *
     * @param threadName     label used in log output
     * @param query          parameterized SQL statement to batch
     * @param conn           connection used to prepare the statements
     * @param maxBatchAmount number of rows after which a batch is executed; must be positive
     * @throws SQLException             if a statement cannot be prepared
     * @throws IllegalArgumentException if {@code maxBatchAmount} is not positive
     */
    public ThreadPreparedStatement(String threadName, String query, Connection conn, long maxBatchAmount) throws SQLException
    {
        if(maxBatchAmount <= 0)
        {
            // A zero batch size would otherwise fail later with a division by zero.
            throw new IllegalArgumentException("maxBatchAmount must be positive, got " + maxBatchAmount);
        }
        this.threadName = threadName;

        this.pstmt1 = conn.prepareStatement(query);
        this.pstmt2 = conn.prepareStatement(query);

        this.maxBatchAmount = maxBatchAmount;
        this.query = query;

        this.batchCount = 0;
    }

    /**
     * Binds one row to the active statement and adds it to that statement's
     * batch. Every {@code maxBatchAmount} rows the active statement is handed
     * to a new background thread for execution and the other statement becomes
     * active; this blocks while a previous execution is still running.
     *
     * @param types     type name per value ("String", "Long", "Integer", "Double" or the lower-case
     *                  primitive spellings); entries with any other name are skipped
     * @param positions JDBC parameter index per value
     * @param values    values to bind; numeric entries may be any {@link Number}
     * @throws SQLException if binding or adding the row fails
     */
    public synchronized void addParameters(String[] types, int[] positions, Object[] values) throws SQLException
    {
        final PreparedStatement active = pstmt1Active ? pstmt1 : pstmt2;
        if(active == null)
        {
            System.err.print("Error Prepared Statements are both NULL");
            return;
        }

        bindParameters(active, types, positions, values);
        active.addBatch();
        batchCount++;

        if(batchCount % maxBatchAmount == 0)
        {
            // Claim the slot and swap buffers in this thread, before the worker
            // starts, so later rows can never land in the handed-off statement.
            claimExecutionSlot();
            Thread worker = new Thread(new Runnable()
            {
                @Override
                public void run()
                {
                    try
                    {
                        runClaimedBatch(active);
                    }
                    catch(Exception e)
                    {
                        e.printStackTrace();
                    }
                }
            });
            worker.start();
        }
    }

    /**
     * Waits for a running execution to finish, then executes whatever is still
     * batched on both statements in the calling thread. Failures are printed
     * to standard error.
     */
    public synchronized void executeAllBatches()
    {
        awaitIdle();
        isRunning = true;
        try
        {
            pstmt1.executeBatch();
            pstmt2.executeBatch();
        }
        catch(Exception e)
        {
            // Report instead of silently dropping a failed insert.
            e.printStackTrace();
        }
        finally
        {
            isRunning = false;
            notifyAll();
        }
    }

    /**
     * Waits for a running execution to finish, switches the active statement
     * and executes the batch of {@code _pstmt} in the calling thread. The
     * monitor is not held while the batch executes.
     *
     * @param _pstmt statement whose batch is executed
     * @throws SQLException if executing the batch fails
     */
    public void executeBatch(PreparedStatement _pstmt) throws SQLException
    {
        claimExecutionSlot();
        runClaimedBatch(_pstmt);
    }

    /**
     * Logs the hand-off, waits until no execution is running, switches the
     * active statement and marks an execution as running.
     */
    private synchronized void claimExecutionSlot()
    {
        System.out.println("["+ this.threadName +"]Adding " + batchCount + " Entrys to DB" );
        awaitIdle();
        pstmt1Active = !pstmt1Active;
        isRunning = true;
    }

    /**
     * Executes the batch of a statement whose slot was already claimed and
     * always releases the slot afterwards, even when execution fails.
     */
    private void runClaimedBatch(PreparedStatement claimed) throws SQLException
    {
        try
        {
            claimed.executeBatch();  //execute Query Batch
        }
        finally
        {
            synchronized(this)
            {
                isRunning = false;
                notifyAll();         //notify that Batch is executed
            }
        }
    }

    /**
     * Blocks until no execution is running. Must be called while holding this
     * object's monitor. The condition is re-checked after every wake-up, and
     * an interrupt is remembered and restored once the wait is over.
     */
    private void awaitIdle()
    {
        boolean interrupted = false;
        while(isRunning)
        {
            try
            {
                wait(); //wait for finish execution
            }
            catch(InterruptedException e)
            {
                interrupted = true;
            }
        }
        if(interrupted)
        {
            Thread.currentThread().interrupt();
        }
    }

    /** Binds every value to its parameter index according to its declared type name. */
    private static void bindParameters(PreparedStatement target, String[] types, int[] positions, Object[] values) throws SQLException
    {
        for(int i = 0; i < types.length; i++)
        {
            switch(types[i])
            {
                case "string":
                case "String":
                    target.setString(positions[i], (String)values[i]);
                    break;
                case "long":
                case "Long":
                    // Go through Number so that e.g. an Integer value does not fail the cast.
                    target.setLong(positions[i], ((Number)values[i]).longValue());
                    break;
                case "int":
                case "Integer":
                    target.setInt(positions[i], ((Number)values[i]).intValue());
                    break;
                case "double":
                case "Double":
                    target.setDouble(positions[i], ((Number)values[i]).doubleValue());
                    break;
                default:
                    break;
            }
        }
    }

}
© www.soinside.com 2019 - 2024. All rights reserved.