如何使用 DBD::Pg 和 Parallel::ForkManager 在多线程 Perl 中创建多个并行数据库连接?

问题描述 投票:0回答:2

已于 2021 年 7 月 21 日编辑如下

我有一个脚本,在其中连接到 PostgreSQL DB 并使用

Parallel::ForkManager
生成多个线程。

我创建一个数据库句柄,然后准备一个

SELECT
SQL 语句和一个
INSERT
SQL 语句。 我希望能够从
Parallel::ForkManager
生成的每个线程运行 SQL 语句,但它失败了,因为我无法在线程之间共享数据库句柄。

我需要采用下面的脚本(如果我生成多个线程,则该脚本会失败,但如果我只生成一个线程,则该脚本会起作用),并对其进行更改,以便每个线程都可以分别从数据库读取/写入数据库。

我知道我可以

clone
数据库句柄,但我也知道还有更多。

如何拥有并行数据库句柄/SQL 语句?

对于本文的长度,我深表歉意,但我需要提供尽可能完整的示例。

示例:

use 5.24.0;
use strict;
use warnings;
use Parallel::ForkManager;
use SQL::Abstract;
use DBI ':sql_types';
use DBD::Pg qw/:pg_types/;

#@codes = ("A");                 # testing single thread
@codes = ("A","B","F","M","S"); # testing multi-thread

################################################################
# connect to db
################################################################
my $dsn    = 'DBI:Pg:dbname=DB_NAME';
my $userid = "DB_USERNAME";
my $sesame = "DB_PASSWORD";
my $dbh    = DBI->connect($dsn, $userid, $sesame, {
    AutoCommit => 1,
    RaiseError => 1,
    PrintError => 1
}) or die "Connection failed!\n" . $DBI::errstr;

################################################################
# test db connection
################################################################
my $me = $dbh->{Driver}{Name};
my $sversion = $dbh->{pg_server_version};
print "DBI is version $DBI::VERSION, "
    . "I am $me, "
    . "version of DBD::Pg is $DBD::Pg::VERSION, "
    . "server is $sversion\n";
print "Name: $dbh->{Name}\n";

my %columns;   # hash for persistent mapping of column-values
my @columns;   # deterministic - $values[$columns{$column}];
set_columns(); # define column<->value mapping for db table

my $placeholders = join(", ", map { '?' } @columns);

################################################################
# Build the SELECT SQL statement
################################################################
my $sql_select_statement =
    qq(SELECT count(*) FROM mytable WHERE id = ?;);
# prepare the SELECT statement handle
my $sth_select_statement = $dbh->prepare_cached($sql_select_statement);

################################################################
# Build the INSERT SQL statement
################################################################
my $sql_insert_statement =
      "INSERT INTO mytable ("
    . join(", ", @columns)       # column names
    . ") VALUES ($placeholders)";
# prepare the INSERT statement handle
my $sth_insert_statement = $dbh->prepare_cached($sql_insert_statement);

################################################################
# create Parallel::ForkManager object for @codes
################################################################
my $optimization = Parallel::ForkManager->new(scalar @codes);

$optimization->run_on_start(sub{
    my ($pid,$ident) = @_;
    print "Starting $ident under process id $pid\n";
});

$optimization->run_on_finish(sub{
    my ($pid,
        $exit_code,
        $ident,
        $exit_signal,
        $core_dump,
        $data_structure_reference) = @_;
});

my $thread_count = 0;

OPTIMIZATION:
for my $code (@codes) {
    $thread_count++;
    print "Thread $thread_count running for $code\n";

    # fork optimization threads - per code
    $optimization->start($code) and next OPTIMIZATION;

    if ($code =~ m/A/i) {
        sub_a("A");
    } elsif ($code =~ m/B/i) {
        sub_b("B");
    } elsif ($code =~ m/F/i) {
        sub_f("F");
    } elsif ($code =~ m/M/i) {
        sub_m("M");
    } elsif ($code =~ m/S/i) {
        sub_s("S");
    }
    print "\$optimization->finish on child $code\n";
    $optimization->finish(0);
}
print "\$optimization->wait_all_children() is waiting...\n";
$optimization->wait_all_children();

################################################################
sub sub_a {
################################################################
    my $code = shift; # Code
    my @values;
    
    # generate values specific to A
    $varset{field1} = 'a_f1'; # for illustrative purposes
    $varset{field2} = 'a_f2';
    $varset{field3} = 'a_f3';
    $varset{field4} = 'a_f4';
    
    foreach my $key (keys %varset) {
        my $column = $key; # the column name as key
        my $value  = $varset{$key}; # the column value (field variable)
        $values[$columns{$column}] = $value; # add to list of column values
    }
    $values[0] = $code;
    write_to_db($code, @values);
}

################################################################
sub sub_b {
################################################################
    my $code = shift; # Code
    my @values;
    
    # generate values specific to B
    $varset{field1} = 'b_f1'; # for illustrative purposes
    $varset{field2} = 'b_f2';
    $varset{field3} = 'b_f3';
    $varset{field4} = 'b_f4';
    
    foreach my $key (keys %varset) {
        my $column = $key; # the column name as key
        my $value  = $varset{$key}; # the column value (field variable)
        $values[$columns{$column}] = $value; # add to list of column values
    }
    $values[0] = $code;
    write_to_db($code, @values);
}

################################################################
sub sub_f {
################################################################
    my $code = shift; # Code
    my @values;
    
    # generate values specific to F
    $varset{field1} = 'f_f1'; # for illustrative purposes
    $varset{field2} = 'f_f2';
    $varset{field3} = 'f_f3';
    $varset{field4} = 'f_f4';
    
    foreach my $key (keys %varset) {
        my $column = $key; # the column name as key
        my $value  = $varset{$key}; # the column value (field variable)
        $values[$columns{$column}] = $value; # add to list of column values
    }
    $values[0] = $code;
    write_to_db($code, @values);
}

################################################################
sub sub_m {
################################################################
    my $code = shift; # Code
    my @values;
    
    # generate values specific to M
    $varset{field1} = 'm_f1'; # for illustrative purposes
    $varset{field2} = 'm_f2';
    $varset{field3} = 'm_f3';
    $varset{field4} = 'm_f4';
    
    foreach my $key (keys %varset) {
        my $column = $key; # the column name as key
        my $value  = $varset{$key}; # the column value (field variable)
        $values[$columns{$column}] = $value; # add to list of column values
    }
    $values[0] = $code;
    write_to_db($code, @values);
}

################################################################
sub sub_s {
################################################################
    my $code = shift; # Code
    my @values;
    
    # generate values specific to S
    $varset{field1} = 's_f1'; # for illustrative purposes
    $varset{field2} = 's_f2';
    $varset{field3} = 's_f3';
    $varset{field4} = 's_f4';
    
    foreach my $key (keys %varset) {
        my $column = $key; # the column name as key
        my $value  = $varset{$key}; # the column value (field variable)
        $values[$columns{$column}] = $value; # add to list of column values
    }
    $values[0] = $code;
    write_to_db($code, @values);
}

################################################################
sub write_to_db {
################################################################
    my ($code_name, @values) = @_;
    
    my $rv_code = $sth_select_statement->execute($code_name);
    if($rv_code < 0) {
       print $DBI::errstr;
    }
    my @row = $sth_select_statement->fetchrow_array();
    # if the SELECT found no existing records for this strategy, then INSERT it
    unless ($row[0] > 0) {
        # INSERT settings into 'mytable'
        $sth_insert_statement->execute(@values);
    }
}

################################################################
sub set_columns {
################################################################
    $columns{code}   = 0;
    $columns{field1} = 1;
    $columns{field2} = 2;
    $columns{field3} = 3;
    $columns{field4} = 4;
    
    $columns[0] = 'code';
    $columns[1] = 'field1';
    $columns[2] = 'field2';
    $columns[3] = 'field3';
    $columns[4] = 'field4';
}

编辑2021-07-21:

我添加了

clone_dbh
create_dbh
子例程来尝试不同的方法。都不起作用。我仍然遇到同样的错误 -
can't share DBH between threads
。除了放弃子例程并为
Parallel::ForkManager
生成的每个线程编写重复的代码之外,我不知道还能做什么,我真的不想这样做。

示例2:

use 5.24.0;
use strict;
use warnings;
use Parallel::ForkManager;
use SQL::Abstract;
use DBI ':sql_types';
use DBD::Pg qw/:pg_types/;

#@codes = ("A");                 # testing single thread
my @codes = ("A","B","F","M","S"); # testing multi-thread
my %varset;

################################################################
# connect to db
################################################################
my $dsn    = 'DBI:Pg:dbname=$ENV{DB_NAME}';
my $userid = $ENV{DBI_USER};
my $sesame = $ENV{DBI_PASS};

my %dbh;   # hash for storing dbh handles

my $dbh    = DBI->connect($dsn, $userid, $sesame, {
    AutoCommit => 1,
    RaiseError => 1,
    PrintError => 1
}) or die "Connection failed!\n" . $DBI::errstr;

################################################################
# test db connection
################################################################
my $me = $dbh->{Driver}{Name};
my $sversion = $dbh->{pg_server_version};
print "DBI is version $DBI::VERSION, "
    . "I am $me, "
    . "version of DBD::Pg is $DBD::Pg::VERSION, "
    . "server is $sversion\n";
print "Name: $dbh->{Name}\n";

################################################################
# prepare array and hash for matching db columns
################################################################
my %columns;   # hash for persistent mapping of column-values
my @columns;   # deterministic - $values[$columns{$column}];
set_columns(); # define column<->value mapping for db table

my $placeholders = join(", ", map { '?' } @columns);

################################################################
# Build the SELECT SQL statement
################################################################
my $sql_select_statement =
    qq(SELECT count(*) FROM mytable WHERE id = ?;);

################################################################
# Build the INSERT SQL statement
################################################################
my $sql_insert_statement =
      "INSERT INTO mytable ("
    . join(", ", @columns)       # column names
    . ") VALUES ($placeholders)";

################################################################################################
# create clones of database handle and SQL statements for threads
################################################################################################
my %sth_select_code;
my %sth_insert_code;

#for my $code (@codes) {
#    $dbh{$code} = $dbh->clone();
#    # prepare the SELECT statement handle
#    $sth_select_code{$code} = $dbh{$code}->prepare_cached($sql_select_statement);
#    # prepare the INSERT statement handle
#    $sth_insert_code{$code} = $dbh{$code}->prepare_cached($sql_insert_statement);
#}

################################################################
# create Parallel::ForkManager object for @codes
################################################################
my $optimization = Parallel::ForkManager->new(scalar @codes);

$optimization->run_on_start(sub{
    my ($pid,$ident) = @_;
    print "Starting $ident under process id $pid\n";
});

$optimization->run_on_finish(sub{
    my ($pid,
        $exit_code,
        $ident,
        $exit_signal,
        $core_dump,
        $data_structure_reference) = @_;
});

my $thread_count = 0;

OPTIMIZATION:
for my $code (@codes) {
    $thread_count++;
    print "Thread $thread_count running for $code\n";

    # fork optimization threads - per code
    $optimization->start($code) and next OPTIMIZATION;

    if ($code =~ m/A/i) {
        sub_a("A");
    } elsif ($code =~ m/B/i) {
        sub_b("B");
    } elsif ($code =~ m/F/i) {
        sub_f("F");
    } elsif ($code =~ m/M/i) {
        sub_m("M");
    } elsif ($code =~ m/S/i) {
        sub_s("S");
    }
    print "\$optimization->finish on child $code\n";
    $optimization->finish(0);
}
print "\$optimization->wait_all_children() is waiting...\n";
$optimization->wait_all_children();

################################################################################################
# disconnect from database
################################################################################################
for my $code (@codes) {
    $sth_select_code{$code}->finish();
    $sth_insert_code{$code}->finish();
    $dbh{$code}->disconnect;
}
$dbh->disconnect;

################################################################################################
# end
################################################################################################
exit;

################################################################
sub sub_a {
################################################################
    my $code = shift; # Code
    my @values;
#    clone_dbh($code);
    create_dbh($code);
#    my ($dbh, $sel_sth, $ins_sth) = create_dbh($code);
    
    # generate values specific to A
    $varset{field2} = 'a_f1'; # for illustrative purposes
    $varset{field3} = 'a_f2';
    $varset{field4} = 'a_f3';
    $varset{field5} = 'a_f4';
    
    foreach my $key (keys %varset) {
        my $column = $key; # the column name as key
        my $value  = $varset{$key}; # the column value (field variable)
        $values[$columns{$column}] = $value; # add to list of column values
    }
    $values[0] = $code;
    write_to_db($code, @values);
}

################################################################
sub sub_b {
################################################################
    my $code = shift; # Code
    my @values;
#    clone_dbh($code);
    create_dbh($code);
#    my ($dbh, $sel_sth, $ins_sth) = create_dbh($code);
    
    # generate values specific to B
    $varset{field2} = 'b_f1'; # for illustrative purposes
    $varset{field3} = 'b_f2';
    $varset{field4} = 'b_f3';
    $varset{field5} = 'b_f4';
    
    foreach my $key (keys %varset) {
        my $column = $key; # the column name as key
        my $value  = $varset{$key}; # the column value (field variable)
        $values[$columns{$column}] = $value; # add to list of column values
    }
    $values[0] = $code;
    write_to_db($code, @values);
}

################################################################
sub sub_f {
################################################################
    my $code = shift; # Code
    my @values;
#    clone_dbh($code);
    create_dbh($code);
#    my ($dbh, $sel_sth, $ins_sth) = create_dbh($code);
    
    # generate values specific to F
    $varset{field2} = 'f_f1'; # for illustrative purposes
    $varset{field3} = 'f_f2';
    $varset{field4} = 'f_f3';
    $varset{field5} = 'f_f4';
    
    foreach my $key (keys %varset) {
        my $column = $key; # the column name as key
        my $value  = $varset{$key}; # the column value (field variable)
        $values[$columns{$column}] = $value; # add to list of column values
    }
    $values[0] = $code;
    write_to_db($code, @values);
}

################################################################
sub sub_m {
################################################################
    my $code = shift; # Code
    my @values;
#    clone_dbh($code);
    create_dbh($code);
#    my ($dbh, $sel_sth, $ins_sth) = create_dbh($code);
    
    # generate values specific to M
    $varset{field2} = 'm_f1'; # for illustrative purposes
    $varset{field3} = 'm_f2';
    $varset{field4} = 'm_f3';
    $varset{field5} = 'm_f4';
    
    foreach my $key (keys %varset) {
        my $column = $key; # the column name as key
        my $value  = $varset{$key}; # the column value (field variable)
        $values[$columns{$column}] = $value; # add to list of column values
    }
    $values[0] = $code;
    write_to_db($code, @values);
}

################################################################
sub sub_s {
################################################################
    my $code = shift; # Code
    my @values;
#    clone_dbh($code);
    create_dbh($code);
#    my ($dbh, $sel_sth, $ins_sth) = create_dbh($code);
    
    # generate values specific to S
    $varset{field2} = 's_f1'; # for illustrative purposes
    $varset{field3} = 's_f2';
    $varset{field4} = 's_f3';
    $varset{field5} = 's_f4';
    
    foreach my $key (keys %varset) {
        my $column = $key; # the column name as key
        my $value  = $varset{$key}; # the column value (field variable)
        $values[$columns{$column}] = $value; # add to list of column values
    }
    $values[0] = $code;
    write_to_db($code, @values);
}

################################################################################
sub create_dbh {
################################################################################
    my $code = shift;
    
    $dbh{$code} = DBI->connect($dsn, $userid, $sesame, {
        AutoCommit => 1,
        RaiseError => 1,
        PrintError => 1
    }) or die "Connection failed!\n" . $DBI::errstr;
    
    # did it work? are we there yet?
    my $me = $dbh{$code}->{Driver}{Name};
    my $sversion = $dbh{$code}->{pg_server_version};
    print "DBI is version $DBI::VERSION, "
        . "I am $me, "
        . "version of DBD::Pg is $DBD::Pg::VERSION, "
        . "server is $sversion\n";
    print "Name: $dbh->{Name}\n";
    # prepare the SELECT statement handle
    $sth_select_code{$code} = $dbh{$code}->prepare_cached($sql_select_statement);
    # prepare the INSERT statement handle
    $sth_insert_code{$code} = $dbh{$code}->prepare_cached($sql_insert_statement);
}

################################################################
sub clone_dbh {
################################################################
    my $code = shift;
    
    $dbh{$code} = $dbh->clone();
    # prepare the SELECT statement handle
    $sth_select_code{$code} = $dbh{$code}->prepare_cached($sql_select_statement);
    # prepare the INSERT statement handle
    $sth_insert_code{$code} = $dbh{$code}->prepare_cached($sql_insert_statement);
}

################################################################
sub write_to_db {
################################################################
    my ($code, @values) = shift @_;
    
    my $rv_code = $sth_select_code{$code}->execute($code);
    if($rv_code < 0) {
       print $DBI::errstr;
    }
    my @row = $sth_select_code{$code}->fetchrow_array();
    # if the SELECT found no existing records for this strategy, then INSERT it
    unless ($row[0] > 0) {
        # INSERT settings into 'mytable'
        $sth_insert_code{$code}->execute(@values);
    }
}

################################################################
sub set_columns {
################################################################
    $columns{field1} = 0;
    $columns{field2} = 1;
    $columns{field3} = 2;
    $columns{field4} = 3;
    $columns{field5} = 4;
    
    $columns[0] = 'field1';
    $columns[1] = 'field2';
    $columns[2] = 'field3';
    $columns[3] = 'field4';
    $columns[4] = 'field5';
}

我是否必须从调用

dbh
线程中显式地将
statement
create_dbh
句柄传递给
sub_x
子进程?

我是否必须将

handles
handle_refs
create_dbh
返回到调用
sub_x
线程?

我不知道如何解决这个问题,但这似乎是词法范围或对象/内存访问问题。

还有更多想法吗?

database postgresql multithreading perl clone
2个回答
2
投票

我不太确定你想要什么,但也许这些指示会有所帮助:

  • 准备好的语句位于创建它们的数据库会话的本地。因此,如果您希望在所有数据库会话中使用它们,则必须在每个数据库会话中准备它们。

  • 您始终可以并行运行语句,只要每个语句都在其自己的数据库会话中运行。

  • 您还可以对数据库内的单个查询进行并行化,以便多个数据库进程一起处理它。如果 PostgreSQL 配置正确并且优化器认为查询会受益,这种情况会自动发生。有关详细信息,请参阅文档


0
投票

我使用下面的代码让它工作:

use 5.24.0;
use strict;
use warnings;
use Parallel::ForkManager;
use SQL::Abstract;
use DBI ':sql_types';
use DBD::Pg qw/:pg_types/;

#my @codes = ("A");                 # testing single thread
my @codes = ("A","B","F","M","S"); # testing multi-thread
my %varset;

################################################################
# get db connection info
################################################################
my $dsn    = 'DBI:Pg:dbname=mt4_test';
my $userid = $ENV{DBI_USER};
my $sesame = $ENV{DBI_PASS};
my %dbh; # hash for storing dbh handles

################################################################
# prepare array and hash for matching db columns
################################################################
my %columns;   # hash for persistent mapping of column-values
my @columns;   # deterministic - $values[$columns{$column}];
set_columns(); # define column<->value mapping for db table

my $placeholders = join(", ", map { '?' } @columns);

################################################################
# Build the SELECT SQL statement
################################################################
my $sql_select_statement =
    qq(SELECT count(*) FROM mytable WHERE field1 = ?;);

################################################################
# Build the INSERT SQL statement
################################################################
my $sql_insert_statement =
      "INSERT INTO mytable ("
    . join(", ", @columns)        # column names
    . ") VALUES ($placeholders)";

################################################################
# hash for storing SQL statement handles for threads
################################################################
my %sth_select_code;
my %sth_insert_code;

################################################################
# create Parallel::ForkManager object for @codes
################################################################
my $optimization = Parallel::ForkManager->new(scalar @codes);

$optimization->run_on_start(sub{
    my ($pid,$ident) = @_;
    print "Starting $ident under process id $pid\n";
});

$optimization->run_on_finish(sub{
    my ($pid,
        $exit_code,
        $ident,
        $exit_signal,
        $core_dump,
        $data_structure_reference) = @_;
});

my $thread_count = 0;

OPTIMIZATION:
for my $code (@codes) {
    $thread_count++;
    print "Thread $thread_count running for $code\n";

    # fork optimization threads - per code
    if (scalar @codes > 1) {
        $optimization->start($code) and next OPTIMIZATION;
    } else {
        $optimization->start($code);
    }
    
    launch_sub($code);
    
    print "\$optimization->finish on child $code\n";
    $optimization->finish(0);
}
print "\$optimization->wait_all_children() is waiting...\n";
$optimization->wait_all_children();

################################################################
# THE END
################################################################
exit;
################################################################



################################################################
sub launch_sub {
################################################################
    my $code = shift;
    if ($code =~ m/A/i) {
        sub_a("A");
    } elsif ($code =~ m/B/i) {
        sub_b("B");
    } elsif ($code =~ m/F/i) {
        sub_f("F");
    } elsif ($code =~ m/M/i) {
        sub_m("M");
    } elsif ($code =~ m/S/i) {
        sub_s("S");
    }
}
################################################################
sub sub_a {
################################################################
    my $code = shift; # Code
    my @values;
    my ($dbh, $sel_sth, $ins_sth) = create_dbh(\$dbh{$code},\$sth_select_code{$code},\$sth_insert_code{$code});
    
    # generate values specific to A
    $varset{field2} = 'a_f1'; # for illustrative purposes
    $varset{field3} = 'a_f2';
    $varset{field4} = 'a_f3';
    $varset{field5} = 'a_f4';
    
    foreach my $key (keys %varset) {
        my $column = $key; # the column name as key
        my $value  = $varset{$key}; # the column value (field variable)
        $values[$columns{$column}] = $value; # add to list of column values
    }
    $values[0] = $code;
    write_to_db($code, @values);
    disconnect_dbh(\$dbh{$code},\$sth_select_code{$code},\$sth_insert_code{$code});
}

################################################################
sub sub_b {
################################################################
    my $code = shift; # Code
    my @values;
    my ($dbh, $sel_sth, $ins_sth) = create_dbh(\$dbh{$code},\$sth_select_code{$code},\$sth_insert_code{$code});
    
    # generate values specific to B
    $varset{field2} = 'b_f1'; # for illustrative purposes
    $varset{field3} = 'b_f2';
    $varset{field4} = 'b_f3';
    $varset{field5} = 'b_f4';
    
    foreach my $key (keys %varset) {
        my $column = $key; # the column name as key
        my $value  = $varset{$key}; # the column value (field variable)
        $values[$columns{$column}] = $value; # add to list of column values
    }
    $values[0] = $code;
    write_to_db($code, @values);
    disconnect_dbh(\$dbh{$code},\$sth_select_code{$code},\$sth_insert_code{$code});
}

################################################################
sub sub_f {
################################################################
    my $code = shift; # Code
    my @values;
    my ($dbh, $sel_sth, $ins_sth) = create_dbh(\$dbh{$code},\$sth_select_code{$code},\$sth_insert_code{$code});
    
    # generate values specific to F
    $varset{field2} = 'f_f1'; # for illustrative purposes
    $varset{field3} = 'f_f2';
    $varset{field4} = 'f_f3';
    $varset{field5} = 'f_f4';
    
    foreach my $key (keys %varset) {
        my $column = $key; # the column name as key
        my $value  = $varset{$key}; # the column value (field variable)
        $values[$columns{$column}] = $value; # add to list of column values
    }
    $values[0] = $code;
    write_to_db($code, @values);
    disconnect_dbh(\$dbh{$code},\$sth_select_code{$code},\$sth_insert_code{$code});
}

################################################################
sub sub_m {
################################################################
    my $code = shift; # Code
    my @values;
    my ($dbh, $sel_sth, $ins_sth) = create_dbh(\$dbh{$code},\$sth_select_code{$code},\$sth_insert_code{$code});
    
    # generate values specific to M
    $varset{field2} = 'm_f1'; # for illustrative purposes
    $varset{field3} = 'm_f2';
    $varset{field4} = 'm_f3';
    $varset{field5} = 'm_f4';
    
    foreach my $key (keys %varset) {
        my $column = $key; # the column name as key
        my $value  = $varset{$key}; # the column value (field variable)
        $values[$columns{$column}] = $value; # add to list of column values
    }
    $values[0] = $code;
    write_to_db($code, @values);
    disconnect_dbh(\$dbh{$code},\$sth_select_code{$code},\$sth_insert_code{$code});
}

################################################################
sub sub_s {
################################################################
    my $code = shift; # Code
    my @values;
    my ($dbh, $sel_sth, $ins_sth) = create_dbh(\$dbh{$code},\$sth_select_code{$code},\$sth_insert_code{$code});
    
    # generate values specific to S
    $varset{field2} = 's_f1'; # for illustrative purposes
    $varset{field3} = 's_f2';
    $varset{field4} = 's_f3';
    $varset{field5} = 's_f4';
    
    foreach my $key (keys %varset) {
        my $column = $key; # the column name as key
        my $value  = $varset{$key}; # the column value (field variable)
        $values[$columns{$column}] = $value; # add to list of column values
    }
    $values[0] = $code;
    write_to_db($code, @values);
    disconnect_dbh(\$dbh{$code},\$sth_select_code{$code},\$sth_insert_code{$code});
}

################################################################
sub create_dbh {
################################################################
    my $dbh_ref = shift;
    my $sel_ref = shift;
    my $ins_ref = shift;
    
    ${$dbh_ref} = DBI->connect($dsn, $userid, $sesame, {
        AutoCommit => 1,
        RaiseError => 1,
        PrintError => 1
    }) or die "Connection failed!\n" . $DBI::errstr;
    
    # did it work? are we there yet?
    my $me = ${$dbh_ref}->{Driver}{Name};
    my $sversion = ${$dbh_ref}->{pg_server_version};
    print "DBI is version $DBI::VERSION, "
        . "I am $me, "
        . "version of DBD::Pg is $DBD::Pg::VERSION, "
        . "server is $sversion\n";
    print "Name: ${$dbh_ref}->{Name}\n";
    # prepare the SELECT statement handle
    ${$sel_ref} = ${$dbh_ref}->prepare_cached($sql_select_statement);
    # prepare the INSERT statement handle
    ${$ins_ref} = ${$dbh_ref}->prepare_cached($sql_insert_statement);
}

################################################################
sub write_to_db {
################################################################
    my ($code, @values) = @_;
    
    my $rv_code = $sth_select_code{$code}->execute($code);
    say "SQL SELECT for $code: rv_code = $rv_code";
    if($rv_code < 0) {
       print $DBI::errstr;
    }
    my @row = $sth_select_code{$code}->fetchrow_array();
    # if the SELECT found no existing records for this strategy, then INSERT it
    unless ($row[0] > 0) {
        # INSERT settings into 'mytable'
        $sth_insert_code{$code}->execute(@values);
        say "SQL INSERT for $code";
    }
}

################################################################
sub disconnect_dbh {
################################################################
    my $dbh_ref = shift;
    my $sel_ref = shift;
    my $ins_ref = shift;
    
    ${$sel_ref}->finish();
    ${$ins_ref}->finish();
    ${$dbh_ref}->disconnect;
    
    say "disconnected dbh_ref: $dbh_ref";
}

################################################################
sub set_columns {
################################################################
    $columns{field1} = 0;
    $columns{field2} = 1;
    $columns{field3} = 2;
    $columns{field4} = 3;
    $columns{field5} = 4;
    
    $columns[0] = 'field1';
    $columns[1] = 'field2';
    $columns[2] = 'field3';
    $columns[3] = 'field4';
    $columns[4] = 'field5';
}
© www.soinside.com 2019 - 2024. All rights reserved.