Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Initial work on kubernetes support #79

Open
wants to merge 29 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion frestq/api.py
Original file line number Diff line number Diff line change
Expand Up @@ -72,7 +72,8 @@ def post_message(queue_name):
req['isinstance']):
return error(400, "invalid/notfound %s parameter" % req['name'])

sender_ssl_cert = request.environ.get('X-Sender-SSL-Certificate', None)
header_name = current_app.config['SSL_HEADER_NAME']
sender_ssl_cert = request.environ.get(header_name, None)
# NOTE: nginx adds \t to the certificate because otherwise it would be not
# possible to send it as a proxy header, so we have to remove those tabs.
# A PEM certificate does never contain tabs, so this replace is safe anyway.
Expand Down
108 changes: 59 additions & 49 deletions frestq/app.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@

logging.basicConfig(level=logging.DEBUG)


class FrestqRequest(Request):
'''
We have to customize request so that by default it can overload the json
Expand Down Expand Up @@ -65,6 +66,7 @@ def get_json(self, force=False, silent=False, cache=True):

return rv


class FrestqApp(Flask):
def __init__(self, *args, **kwargs):
super(FrestqApp, self).__init__(*args, **kwargs)
Expand All @@ -73,35 +75,31 @@ def __init__(self, *args, **kwargs):

pargs = None

def init_db(self):
logging.info("initializing db instance..")
db.init_app(self)

def configure_app(self, scheduler=True, config_object=None):
'''
Configures the application. It's intended to do everything to be able to
run the application except calling app.run, so that it can be reused when
using gunicorn or similar.
run the application except calling app.run, so that it can be reused
when using gunicorn or similar.
'''
self.config.from_object(__name__)
if config_object:
self.config.from_object(config_object)

frestq_settings = os.environ.get('FRESTQ_SETTINGS', None)
if frestq_settings is not None:
if not os.path.isabs(frestq_settings):
os.environ['FRESTQ_SETTINGS'] = os.path.abspath(frestq_settings)
logging.debug("FRESTQ_SETTINGS = %s" % os.environ['FRESTQ_SETTINGS'])
self.config.from_envvar('FRESTQ_SETTINGS', silent=False)
else:
logging.warning("FRESTQ_SETTINGS not set")

# store cert in
if self.config.get('SSL_CERT_PATH', None) and\
self.config.get('SSL_KEY_PATH', None):

if (
self.config.get('SSL_CERT_PATH', None) and
self.config.get('SSL_KEY_PATH', None)
):
with open(self.config.get('SSL_CERT_PATH', ''), 'r') as f:
self.config['SSL_CERT_STRING'] = f.read()
else:
self.config['SSL_CERT_STRING'] = ''
logging.warning("You are NOT using SSL in this instance")

self.init_db()
if not scheduler:
return

Expand Down Expand Up @@ -159,11 +157,14 @@ def run(self, *args, **kwargs):
if 'parse_args' in kwargs and kwargs['parse_args'] == True:
del kwargs['parse_args']
self.parse_args(kwargs.get('extra_parse_func', lambda a,b: None))
if 'extra_parse_func' in kwargs:
del kwargs['extra_parse_func']

if self.pargs is not None:
if self.pargs.createdb:
print("creating the database: " + self.config.get('SQLALCHEMY_DATABASE_URI', ''))
db.create_all()
self.init_db()
db.create_all(app=self)
return
elif self.pargs.messages:
list_messages(self.pargs)
Expand Down Expand Up @@ -204,56 +205,65 @@ def run(self, *args, **kwargs):
if 'use_reloader' in kwargs:
print("use_reloader provided but ignored (always set to True): " + kwargs['use_reloader'])
del kwargs['use_reloader']
# ignore extra_run, we used already if needed
if 'extra_run' in kwargs:
del kwargs['extra_run']

if 'port' not in kwargs:
kwargs['port'] = app.config.get('SERVER_PORT', None)

return super(FrestqApp, self).run(threaded=True, use_reloader=False,
*args, **kwargs)
return super(FrestqApp, self)\
.run(threaded=True, use_reloader=False, *args, **kwargs)

app = FrestqApp(__name__)

### configuration

# debug, set to false on production deployment
DEBUG = True

# see https://stackoverflow.com/questions/33738467/how-do-i-know-if-i-can-disable-sqlalchemy-track-modifications/33790196#33790196
SQLALCHEMY_TRACK_MODIFICATIONS = False
class DefaultConfig(object):
# debug, set to false on production deployment
DEBUG = True

# database configuration
# example: sqlite:////absolute/path/to/db.sqlite
SQLALCHEMY_DATABASE_URI = ''
# see https://stackoverflow.com/questions/33738467/how-do-i-know-if-i-can-disable-sqlalchemy-track-modifications/33790196#33790196
SQLALCHEMY_TRACK_MODIFICATIONS = False

# own certificate, None if there isn't any
SSL_CERT_PATH = None
SSL_KEY_PATH = None
# database configuration
# example: sqlite:////absolute/path/to/db.sqlite
SQLALCHEMY_DATABASE_URI = ''

# queues root url
ROOT_URL = 'http://127.0.0.1:5000/api/queues'
# own certificate, None if there isn't any
SSL_CERT_PATH = None
SSL_KEY_PATH = None

# time a thread can be reserved in for synchronization purposes. In seconds.
RESERVATION_TIMEOUT = 60
# with uwsgi this would be "X-Sender-SSL-Certificate" and in unicorn it's
# "HTTP_X_SENDER_SSL_CERTIFICATE"
SSL_HEADER_NAME = "X-Sender-SSL-Certificate"

app.config.from_object(__name__)
# queues root url
ROOT_URL = 'http://127.0.0.1:5000/api/queues'

# boostrap our little application
db = SQLAlchemy(app, engine_options={"pool_pre_ping": True})
# time a thread can be reserved in for synchronization purposes. In seconds.
RESERVATION_TIMEOUT = "60"

# set to True to get real security
ALLOW_ONLY_SSL_CONNECTIONS = False
# set to True to get real security
ALLOW_ONLY_SSL_CONNECTIONS = "False"

# options for each queue. example:
#QUEUES_OPTIONS = {
#'mycustom_queue': {
#'max_threads': 3,
# options for each queue. example:
#QUEUES_OPTIONS = {
#'mycustom_queue': {
#'max_threads': 3,
#}
#}
#}
# thread data mapper is a function that would be called when a Synchronous task
# in this queue is going to be executed. It allows to set queue-specific
# settings, and even custom queue settings that can be used by you later.
# thread data mapper is a function that would be called when a Synchronous task
# in this queue is going to be executed. It allows to set queue-specific
# settings, and even custom queue settings that can be used by you later.

QUEUES_OPTIONS = dict()


QUEUES_OPTIONS = dict()
app = FrestqApp(__name__)
app.config.from_object(DefaultConfig())

# boostrap our little application
db = SQLAlchemy(app, engine_options={"pool_pre_ping": True})

from . import models

Expand All @@ -266,4 +276,4 @@ def run(self, *args, **kwargs):
show_activity)

if __name__ == "__main__":
app.run()
app.run()
15 changes: 10 additions & 5 deletions frestq/protocol.py
Original file line number Diff line number Diff line change
Expand Up @@ -28,8 +28,10 @@ def certs_differ(cert_a, cert_b):
if cert_b is None:
cert_b = u''

if app.config.get('ALLOW_ONLY_SSL_CONNECTIONS') and\
(not len(cert_a) or not len(cert_b)):
if app.config.get('ALLOW_ONLY_SSL_CONNECTIONS') == "False":
return False

if (not len(cert_a) or not len(cert_b)):
raise SecurityException()

if not len(cert_a) and not len(cert_b):
Expand Down Expand Up @@ -129,7 +131,7 @@ def reserve_task(task_id):

# 4. set reservation timeout
sched = FScheduler.get_scheduler(INTERNAL_SCHEDULER_NAME)
date = datetime.utcnow() + timedelta(seconds=app.config.get('RESERVATION_TIMEOUT'))
date = datetime.utcnow() + timedelta(seconds=int(app.config.get('RESERVATION_TIMEOUT')))
sched.add_date_job(cancel_reserved_subtask, date, [task.id])

# 5. wait for a cancel or execute message
Expand Down Expand Up @@ -249,7 +251,7 @@ def ack_reservation(task_id):
# reservation_timeout is also sent

logging.debug("SENDING ACK RESERVATION TO SENDER of task with id %s", task_id)
expire_secs = app.config.get('RESERVATION_TIMEOUT')
expire_secs = int(app.config.get('RESERVATION_TIMEOUT'))
task = ModelTask.query.get(task_id)
msg = {
"action": "frestq.confirm_task_reservation",
Expand Down Expand Up @@ -320,7 +322,10 @@ def synchronize_task(msg):
# schedule expiration
if task.expiration_date:
sched = FScheduler.get_scheduler(INTERNAL_SCHEDULER_NAME)
date = datetime.utcnow() + timedelta(seconds=app.config.get('RESERVATION_TIMEOUT'))
date = (
datetime.utcnow()
+ timedelta(seconds=int(app.config.get('RESERVATION_TIMEOUT')))
)
sched.add_date_job(cancel_reserved_subtask, date, [task.id])


Expand Down
18 changes: 9 additions & 9 deletions frestq/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -157,7 +157,7 @@ def traverse_tasktree(task, visitor_func, visitor_kwargs):
def show_task(args):
from .app import db
from .models import Task
task_id = unicode(args.show_task)
task_id = str(args.show_task)
task_model = db.session.query(Task).filter(Task.id.startswith(task_id)).all()
if not task_model:
print("task %s not found" % task_id)
Expand Down Expand Up @@ -260,7 +260,7 @@ def show_activity(args):
def show_message(args):
from .app import db
from .models import Message
msg_id = unicode(args.show_message)
msg_id = str(args.show_message)
msg_model = db.session.query(Message).filter(Message.id.startswith(msg_id)).all()
if not msg_model:
print("message %s not found" % msg_id)
Expand All @@ -273,15 +273,15 @@ def get_external_task(args):
from .app import db
from .models import Task

task_id = unicode(args.show_external)
task_id = str(args.show_external)
task_model = db.session.query(Task).filter(Task.id.startswith(task_id)).all()

return task_model

# drb
def show_external_task(args):

task_id = unicode(args.show_external)
task_id = str(args.show_external)
task_model = get_external_task(args)

if not task_model:
Expand All @@ -302,9 +302,9 @@ def finish_task(args):
from .models import Task
from .tasks import ExternalTask

task_id = unicode(args.finish[0])
task_id = str(args.finish[0])
try:
finish_data = loads(unicode(args.finish[1]))
finish_data = loads(str(args.finish[1]))
except:
print("error loading the json finish data")
return
Expand All @@ -331,7 +331,7 @@ def deny_task(args):
def task_tree(args):
from .app import db
from .models import Task
task_id = unicode(args.tree)
task_id = str(args.tree)
task_model = db.session.query(Task).filter(Task.id.startswith(task_id)).all()
if not task_model:
print("task %s not found" % task_id)
Expand All @@ -343,8 +343,8 @@ def task_tree(args):
task_model = db.session.query(Task).get(task_model.parent_id)
except:
print("task %s, which is the parent of %s not found" % (
str(task.parent_id)[:8],
str(task.id)[:8],
str(task_model.parent_id)[:8],
str(task_model.id)[:8],
))
break

Expand Down
Loading