Asterisk + AMI + Python

Honestly, I have long thought whether to publish this material. For those who know how to work with AMI Asterisk, there is nothing interesting here. For those who are just starting to do something, they are unlikely to figure it out in my code (although I tried to write clearly). Vanguyu comments like: "Why use Habr for your notes?". On the other hand, the script given under the cut can be someone's starting point. The script does nothing except that it sends to the console all the events from AMI and is able to filter them. For example, I show all calls in the console that fall into any of the contexts “zadarma-in” or “sibseti_in”. If interested, please under the cat:

There was a real-time task to look at which trunk the call came from, which buttons in ivr the user clicked, who answered the call, etc. I have long wanted to try to work with AMI from Python, before that I had a little experience from Bash and then to organize a call back.

Having rummaged with various ready-made libraries, it quickly came to the realization that none of them suits me. As a result, his “bicycle” was invented in the form of a script, which gives all the information from AMI to json. Uses standard Python libraries. Plus the fact that in this form it is easy to receive and parse any events and not to lose binding to a specific call.

The first script prints only those events that fall into any of the contexts "zadarma-in" or "sibseti_in".

Script number 1
import telnetlib import json import re ## HOST = "192.168.10.10" PORT = "5038" user = "zabbix" password = "password" ## tn = telnetlib.Telnet(HOST,PORT) tn.write("Action: login".encode('ascii') + b"\n") username = "Username: " + user tn.write(username.encode('ascii') + b"\n") passWord = "Secret: " + password string_NOW = '' string_out = '' cd = 0 tn.write(passWord.encode('ascii') + b"\n\n") def telnet_for_string(string): global string_out string_out_def = '' for mes in string: try: if string[mes]['Context'] == 'zadarma-in' or string[mes]['Context'] == 'sibseti_in' or string[mes]['Context'] == 'IVR': Uniqueid = string[mes]['Uniqueid'] CallerIDNum = string[mes]['CallerIDNum'] Exten = string[mes]['Exten'] CallerIDName = string[mes]['CallerIDName'] try: Digit = string[mes]['Digit'] except KeyError: Digit = '' # if Exten == 's' or Exten == 'h': # Exten = '' # Context = string[mes]['Context'] string_out_def = json.dumps({'Uniqueid': Uniqueid, 'CallerIDNum':CallerIDNum, 'CallerIDName':CallerIDName, 'Exten':Exten, 'Digit':Digit}) # print(string_out_def) except UnboundLocalError: 1+1 except KeyError: 1+1 # print(string_out_def) if string_out_def: if string_out_def != string_out: print(string_out_def) string_out = string_out_def while True: string = '' event_string = '' elements_string = '' c = 0 read_some = tn.read_some() #    AMI string = read_some.decode('utf8', 'replace').replace('\r\n', '#') #        # # print(string) #       if not string.endswith('##'): string_NOW = string_NOW + string # print('1 --->',string_NOW) #   ,      #  ,          $, #       #, -      if string.endswith('##'): string_NOW = string_NOW + string string_NOW = string_NOW.replace('##', '$') #      $ string_NOW = string_NOW.replace('\n', '#') #    # string_NOW = string_NOW.replace('\r', '#') #    # string_NOW = string_NOW.replace('"', '') #   string_NOW = string_NOW.replace('\\', '') #    # print('string_NOW -->',string_NOW) # print() #      ..         events = re.findall(r'[AZ][\w]+:\s[^$]+', string_NOW) for event in events: c+=1 # print('event ---> ',event) event_elements = re.findall(r'[AZ][\w]+:\s[^#]+', event) #       for element in event_elements: element = '\"' + element.replace(': ', '\": "') + '\", '#    # print('element', element) elements_string = elements_string + element #   ,   # event_string = event_string + '\"' + elements_string.split(':')[1].split(',')[0].replace('"','') + '\": ' + '{' + elements_string + '}' # print(elements_string) # print(str(elements_string.split(':')[1].split(',')[0])) #      json: event_string = event_string + '\"' + str(c) + '\": ' + '{' + elements_string + '}' event_string = event_string.replace('}{', '},{') #     event_string = event_string.replace(', }', '}, ') # event_string = '{' + event_string + '}' event_string = event_string.replace('}, }', '}}') #     json,       json,      # json,         . try: parsed_string = json.loads(event_string) except json.decoder.JSONDecodeError: print('#############################################', '\n\n\n') print(event_string, '\n\n\n') print(string_NOW, '\n\n\n') print('#############################################', '\n\n\n') # print(event_string) # print(parsed_string['1']) #      "telnet_for_string",       . telnet_for_string(parsed_string) string_NOW = '' #   


And the second script, which writes all events to the console, looking at both scripts, it becomes clear what needs to be changed in order to achieve the desired result. If it is not completely clear, then parsit needs json "string [mes]" in the function "def telnet_for_string (string)":

Script number 2
 import telnetlib import time import json import re ## HOST = "192.168.10.10" PORT = "5038" user = "zabbix" password = "password" ## tn = telnetlib.Telnet(HOST,PORT) tn.write("Action: login".encode('ascii') + b"\n") username = "Username: " + user tn.write(username.encode('ascii') + b"\n") passWord = "Secret: " + password string_NOW = '' string_out = '' tn.write(passWord.encode('ascii') + b"\n\n") def telnet_for_string(string): for mes in string: print(string[mes]) while True: # time.sleep(0.1) string = '' event_string = '' elements_string = '' c = 0 read_some = tn.read_some() string = read_some.decode('utf8', 'replace').replace('\r\n', '#') # print(string) if not string.endswith('##'): string_NOW = string_NOW + string if string.endswith('##'): string_NOW = string_NOW + string string_NOW = string_NOW.replace('##', '$') string_NOW = string_NOW.replace('\n', '#') string_NOW = string_NOW.replace('\r', '#') string_NOW = string_NOW.replace('"', '') string_NOW = string_NOW.replace('\\', '') events = re.findall(r'[AZ][\w]+:\s[^$]+', string_NOW) for event in events: c+=1 event_elements = re.findall(r'[AZ][\w]+:\s[^#]+', event) for element in event_elements: element = '\"' + element.replace(': ', '\": "') + '\", ' elements_string = elements_string + element event_string = event_string + '\"' + str(c) + '\": ' + '{' + elements_string + '}' event_string = event_string.replace('}{', '},{') event_string = event_string.replace(', }', '}, ') event_string = '{' + event_string + '}' event_string = event_string.replace('}, }', '}}') try: parsed_string = json.loads(event_string) except json.decoder.JSONDecodeError: print('#############################################', '\n\n\n') print(event_string, '\n\n\n') print(string_NOW, '\n\n\n') print('#############################################', '\n\n\n') telnet_for_string(parsed_string) string_NOW = '' 

Source: https://habr.com/ru/post/415535/


All Articles