Asterisk AMI 呼叫由 python 发起,一次用于一组联系人

问题描述 投票:0回答:1

我的目标: 一次从数据库生成对一组联系人的呼叫(假设我在一组中有 1000 个联系人)。例如,我有 30 个并发通道,我希望代码应该同时发起对 30 个号码的呼叫,如果某个通道空闲,则再次调用将生成等于空闲通道数量的呼叫,并且该过程继续。

我在下面的代码中接受了什么: 它会生成一个接一个的呼叫,当一个呼叫被应答或掉线时,会生成下一个呼叫,但我不希望这样。

非常感谢专家的帮助。

import time
import asterisk.manager
import pymysql

# Assuming you have pymysql library installed, you can install it with: pip install pymysql

DB_HOST = '127.0.0.1'
DB_USER = 'root'
DB_PASSWORD = 'XXXX'
DB_NAME = 'abcd'

AMI_USERNAME = 'admin'
AMI_PASSWORD = 'XYZ'
AMI_HOST = '127.0.0.1'
AMI_PORT = 5038

TOTAL_CHANNELS_LIMIT = 30  # Set your total channel limit here

def connect_to_db():
    connection = pymysql.connect(host=DB_HOST, user=DB_USER, password=DB_PASSWORD, db=DB_NAME, cursorclass=pymysql.cursors.DictCursor)
    return connection

def connect_to_ami():
    manager = asterisk.manager.Manager()
    manager.connect(AMI_HOST, AMI_PORT)
    manager.login(AMI_USERNAME, AMI_PASSWORD)
    return manager

def check_active_channels(manager):
    response = manager.command('core show channels concise')
    active_channels = [line.split()[0] for line in response.data.split('\n') if line.strip()]
    return active_channels

def get_active_campaigns(connection):
    with connection.cursor() as cursor:
        cursor.execute("SELECT name as campaign_name, call_group, fixed_channels FROM vb_schedule_play WHERE status = 'active'")
        result = cursor.fetchall()
        active_campaigns = [{'name': row['campaign_name'], 'fixed_channels': row['fixed_channels'], 'call_group': row['call_group']} for row in result]
        return active_campaigns

def get_contacts_for_campaign(connection, campaign_name):
    with connection.cursor() as cursor:
        sql = f"SELECT phone as contact_number FROM contact_list WHERE group_name = 'sohub_group'  AND (status IS NULL OR status = '') LIMIT 1"
        cursor.execute(sql)
        result = cursor.fetchone()
        if result:
            return result['contact_number']
        else:
            return None

def update_contact_status(connection, contact_number, campaign_name, status):
    with connection.cursor() as cursor:
        sql = f"UPDATE contact_list SET status = '{status}' WHERE phone = '{contact_number}'"
        cursor.execute(sql)
        connection.commit()

def initiate_calls(manager, campaign_name, fixed_channels, connection, total_channels_limit=TOTAL_CHANNELS_LIMIT):
    if fixed_channels is None:
        fixed_channels = 0

    fixed_channels = int(fixed_channels) if fixed_channels is not None else 0

    if fixed_channels > 0:
        # Use fixed_channels for the specified campaign
        channels_to_use = min(fixed_channels, total_channels_limit)
    else:
        # Dynamic channel allocation for campaigns with fixed_channels equal to 0
        available_channels = check_active_channels(manager)
        channels_to_use = min(len(available_channels), total_channels_limit)

    total_channels_used = 0  # Initialize the total channels used

    for _ in range(channels_to_use):
        if total_channels_used >= total_channels_limit:
            print(f"Reached the total channel limit ({total_channels_limit}). Stopping further calls.")
            break

        contact_number = get_contacts_for_campaign(connection, campaign_name)
        if contact_number:
            full_channel = f'PJSIP/{contact_number}'
            manager.originate(
                channel=full_channel,
                exten='s',
                context='ami-contact-center',
                priority=1,
                caller_id='0123456789',
                variables={
                    'CallerID': '0123456789',
                    'CALLERID(all)': '0123456789',
                    'play_type': 'tts',
                    'play': 'hello this is a test call',
                },
                timeout=30000
            )
            update_contact_status(connection, contact_number, campaign_name, 'calling')
            total_channels_used += 1  # Increment the total channels used
            time.sleep(1)  # Add a delay to avoid overwhelming the system
        else:
            print(f"No more contacts available for campaign {campaign_name}")

def main():
    db_connection = connect_to_db()
    ami_manager = connect_to_ami()

    active_campaigns = get_active_campaigns(db_connection)

    for campaign in active_campaigns:
        campaign_name = campaign['name']
        fixed_channels = campaign['fixed_channels']

        initiate_calls(ami_manager, campaign_name, fixed_channels, db_connection, total_channels_limit=TOTAL_CHANNELS_LIMIT)

    ami_manager.logoff()
    db_connection.close()

if __name__ == '__main__':
    main()

python python-3.x asterisk asteriskami
1个回答
0
投票

有很多方法可以做到。

  1. 建立并发连接。

  2. 使用通话文件

  3. 使用异步原始版本。

     Action: Originate
     Parameters:
    
     Channel: Channel on which to originate the call (The same as you specify in the Dial application command)
     Context: Context to use on connect (must use Exten & Priority with it)
     Exten: Extension to use on connect (must use Context & Priority with it)
     Priority: Priority to use on connect (must use Context & Exten with it)
     Timeout: Timeout (in milliseconds) for the originating connection to happen(defaults to 30000 milliseconds)
     CallerID: CallerID to use for the call
     Variable: Channels variables to set (max 32). Variables will be set for both channels (local and connected).
     Account: Account code for the call
     Application: Application to use on connect (use Data for parameters)
     Data : Data if Application parameter is used
     **Async**: For the origination to be asynchronous (allows multiple calls to be generated without waiting for a response)
     ActionID: The request identifier. It allows you to identify the response to this request. You may use a number or a string. Useful when you make several simultaneous requests.
    

    事件顺序:首先通道响铃。然后,当答案出现时,扩展

最可靠的方法是通话文件。

© www.soinside.com 2019 - 2024. All rights reserved.