Skip to content

Commit

Permalink
Importing project
Browse files Browse the repository at this point in the history
  • Loading branch information
italorossi committed Feb 28, 2016
0 parents commit d609b6c
Show file tree
Hide file tree
Showing 11 changed files with 805 additions and 0 deletions.
Empty file added LICENSE
Empty file.
5 changes: 5 additions & 0 deletions Makefile
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
init:
pip install -r requirements.txt

test:
python -m unittest discover tests/
25 changes: 25 additions & 0 deletions README.rst
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
GreenSWITCH: FreeSWITCH Event Socket Protocol
=============================================

.. image:: https://img.shields.io/pypi/v/greenswitch.svg
:target: https://pypi.python.org/pypi/greenswitch

.. image:: https://img.shields.io/pypi/dm/greenswitch.svg
:target: https://pypi.python.org/pypi/greenswitch

Battle proven FreeSWITCH Event Socket Protocol client implementation with Gevent.

This is an implementation of FreeSWITCH Event Socket Protocol using Gevent
Greenlets. It is already in production and processing hundreds of calls per day.

.. code-block:: python
>>> import greenswitch
>>> fs = greenswitch.InboundESL(host='127.0.0.1', port=8021, password='ClueCon')
>>> fs.connect()
>>> r = fs.send('api list_users')
>>> print r.data
Currently only Inbound Socket is implemented, support for outbound socket should
be done soon.
28 changes: 28 additions & 0 deletions examples/log_reg_attempt.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
#!/usr/bin/env python
# -*- coding: utf-8 -*-


from __future__ import print_function

import logging
import time
import greenswitch


def on_sofia_register_failure(event):
message = 'Failed register attempt from {network-ip} to user {to-user} profile {profile-name}'
print(message.format(**event.headers))

fs = greenswitch.InboundESL(host='192.168.50.4', port=8021, password='ClueCon')
fs.connect()
fs.register_handle('sofia::register_failure', on_sofia_register_failure)
fs.send('EVENTS PLAIN ALL')

print('Connected to FreeSWITCH!')
while True:
try:
time.sleep(1)
except KeyboardInterrupt:
fs.stop()
break
print('ESL Disconnected.')
18 changes: 18 additions & 0 deletions greenswitch/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
# -*- coding: utf-8 -*-

"""
GreenSWITCH: FreeSWITCH Event Socket Protocol
---------------------------------------------
Complete documentation at https://github.com/italorossi/greenswitch
"""

import sys
# Avoiding threading KeyError when exiting
if 'threading' in sys.modules:
del sys.modules['threading']

from gevent import monkey; monkey.patch_all()

from .esl import InboundESL
190 changes: 190 additions & 0 deletions greenswitch/esl.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,190 @@
# Gevent imports
import gevent
from gevent.queue import Queue
import gevent.socket as socket
import logging
import pprint

import urllib


class NotConnectedError(Exception):
pass


class ESLEvent(object):
def __init__(self, data):
self.parse_data(data)

def parse_data(self, data):
headers = {}
data = urllib.unquote(data)
data = data.strip().split('\n')
last_key = None
value = ''
for line in data:
if ': ' in line:
key, value = line.split(': ', 1)
last_key = key
else:
key = last_key
value += '\n' + line
headers[key.strip()] = value.strip()
self.headers = headers


class InboundESL(object):
def __init__(self, host, port, password):
self.host = host
self.port = port
self.password = password
self.timeout = 5
self._run = True
self._EOL = '\n'
self._commands_sent = []
self._auth_request_event = gevent.event.Event()
self._receive_events_greenlet = None
self._process_events_greenlet = None
self.event_handlers = {}
self.connected = False

self._esl_event_queue = Queue()
self._process_esl_event_queue = True

def connect(self):
self.sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
self.sock.settimeout(self.timeout)
self.sock.connect((self.host, self.port))
self.connected = True
self._receive_events_greenlet = gevent.spawn(self.receive_events)
self._process_events_greenlet = gevent.spawn(self.process_events)
self._auth_request_event.wait()
self.authenticate()
self.sock.settimeout(None)

def receive_events(self):
buf = ''
while self._run:
try:
data = self.sock.recv(1)
except Exception:
self._run = False
self.connected = False
self.sock.close()
# logging.exception("Error reading from socket.")
break
if not data:
if self.connected:
logging.error("Error receiving data, is FreeSWITCH running?")
self.connected = False
break
if data == self._EOL and buf[-1] == self._EOL:
event = ESLEvent(buf)
buf = ''
self.handle_event(event)
continue
buf += data

@staticmethod
def _read_socket(sock, length):
"""Receive data from socket until the length is reached."""
data = sock.recv(length)
data_length = len(data)
while data_length < length:
logging.warn(
'Socket should read %s bytes, but actually read %s bytes. '
'Consider increasing "net.core.rmem_default".' %
(length, data_length)
)
data += sock.recv(length - data_length)
data_length = len(data)
return data

def handle_event(self, event):
if event.headers['Content-Type'] == 'auth/request':
self._auth_request_event.set()
elif event.headers['Content-Type'] == 'command/reply':
async_response = self._commands_sent.pop(0)
event.data = event.headers['Reply-Text']
async_response.set(event)
elif event.headers['Content-Type'] == 'api/response':
length = int(event.headers['Content-Length'])
data = self._read_socket(self.sock, length)
event.data = data
async_response = self._commands_sent.pop(0)
async_response.set(event)
elif event.headers['Content-Type'] == 'text/disconnect-notice':
self.connected = False
else:
length = int(event.headers['Content-Length'])
data = self._read_socket(self.sock, length)
event.parse_data(data)
self._esl_event_queue.put(event)

def process_events(self):
logging.debug('Event Processor Running')
while self._run:
if not self._process_esl_event_queue:
gevent.sleep(1)
continue

try:
event = self._esl_event_queue.get(timeout=1)
except gevent.queue.Empty:
continue

if event.headers.get('Event-Name') == 'CUSTOM':
handlers = self.event_handlers.get(event.headers.get('Event-Subclass'))
else:
handlers = self.event_handlers.get(event.headers.get('Event-Name'))
if not handlers:
continue

for handle in handlers:
try:
handle(event)
except:
logging.exception(
'ESL handler %s raised exception.' %
handle.__name__)
logging.error(pprint.pformat(event.headers))

def send(self, data):
if not self.connected:
raise NotConnectedError()
async_response = gevent.event.AsyncResult()
self._commands_sent.append(async_response)
raw_msg = data + self._EOL*2
self.sock.send(raw_msg)
response = async_response.get()
return response

def authenticate(self):
response = self.send('auth %s' % self.password)
if response.headers['Reply-Text'] != '+OK accepted':
raise ValueError('Invalid password.')

def register_handle(self, name, handler):
if name not in self.event_handlers:
self.event_handlers[name] = []
if handler in self.event_handlers[name]:
return
self.event_handlers[name].append(handler)

def unregister_handle(self, name, handler):
if name not in self.event_handlers:
raise ValueError('No handlers found for event: %s' % name)
self.event_handlers[name].remove(handler)
if not self.event_handlers[name]:
del self.event_handlers[name]

def stop(self):
if self.connected:
self.send('exit')
self._run = False
logging.info("Waiting for receive greenlet exit")
self._receive_events_greenlet.join()
logging.info("Waiting for event processing greenlet exit")
self._process_events_greenlet.join()
if self.connected:
self.sock.close()
1 change: 1 addition & 0 deletions requirements.txt
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
gevent
34 changes: 34 additions & 0 deletions setup.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@
#!/usr/bin/env python
# -*- coding: utf-8 -*-


from setuptools import setup, find_packages


with open('README.rst') as f:
readme = f.read()

with open('LICENSE') as f:
license = f.read()

with open('requirements.txt') as f:
requires = f.readlines()

setup(
name='greenswitch',
version='0.0.1',
description=u'Battle proven FreeSWITCH Event Socket Protocol client implementation with Gevent.',
long_description=readme,
author=u'Ítalo Rossi',
author_email=u'[email protected]',
url=u'https://github.com/italorossi/greenswitch',
license=license,
packages=find_packages(exclude=('tests', 'docs')),
classifiers=[
'Development Status :: 5 - Production/Stable',
'Intended Audience :: Developers',
'Programming Language :: Python',
'License :: OSI Approved :: MIT License'
],
install_requires=requires
)
41 changes: 41 additions & 0 deletions tests/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,41 @@
#!/usr/bin/env python
# -*- coding: utf-8 -*-


from gevent import monkey; monkey.patch_all()

import gevent
import os
import unittest

import fakeeslserver
from greenswitch import esl


class TestInboundESLBase(unittest.TestCase):

esl_class = esl.InboundESL

def setUp(self):
super(TestInboundESLBase, self).setUp()
self.switch_esl = fakeeslserver.FakeESLServer('0.0.0.0', 8021, 'ClueCon')
self.switch_esl.start_server()
self.esl = self.esl_class('127.0.0.1', 8021, 'ClueCon')
self.esl.connect()

def tearDown(self):
super(TestInboundESLBase, self).tearDown()
self.esl.sock.close()
self.switch_esl.stop()

def send_fake_event_plain(self, data):
self.switch_esl.fake_event_plain(data)
gevent.sleep(0.1)

def send_batch_fake_event_plain(self, events):
for event in events:
self.send_fake_event_plain(event)
gevent.sleep(0.1)

if __name__ == '__main__':
unittest.main()
Loading

0 comments on commit d609b6c

Please sign in to comment.