From 7d3a1a7d0464fe2eb25308979930e361ccd7d802 Mon Sep 17 00:00:00 2001 From: Ryan Kelly Date: Fri, 21 Feb 2014 09:53:20 +1100 Subject: [PATCH] Add missing "staticnode" file --- syncserver/staticnode.py | 202 +++++++++++++++++++++++++++++++++++++++ 1 file changed, 202 insertions(+) create mode 100644 syncserver/staticnode.py diff --git a/syncserver/staticnode.py b/syncserver/staticnode.py new file mode 100644 index 0000000..aca8231 --- /dev/null +++ b/syncserver/staticnode.py @@ -0,0 +1,202 @@ +# This Source Code Form is subject to the terms of the Mozilla Public +# License, v. 2.0. If a copy of the MPL was not distributed with this file, +# You can obtain one at http://mozilla.org/MPL/2.0/. +""" +Simple node-assignment backend using a single, static node. + +This is a greatly-simplified node-assignment backend. It keeps user records +in an SQL database, but does not attempt to do any node management. All users +are implicitly assigned to a single, static node. + +XXX TODO: move this into the tokenserver repo. + +""" +import time +from mozsvc.exceptions import BackendError + +from sqlalchemy import Column, Integer, String, BigInteger, Index +from sqlalchemy import create_engine, Table, MetaData +from sqlalchemy.pool import QueuePool +from sqlalchemy.sql import text as sqltext +from sqlalchemy.exc import IntegrityError + +from tokenserver.assignment import INodeAssignment +from zope.interface import implements + + +metadata = MetaData() + + +users = Table( + "users", + metadata, + Column("uid", Integer(), primary_key=True, autoincrement=True, + nullable=False), + Column("service", String(32), nullable=False), + Column("email", String(255), nullable=False), + Column("generation", BigInteger(), nullable=False), + Column("client_state", String(32), nullable=False), + Column("created_at", BigInteger(), nullable=False), + Column("replaced_at", BigInteger(), nullable=True), + Index('lookup_idx', 'email', 'service', 'created_at'), + Index('clientstate_idx', 'email', 'service', 'client_state', unique=True), +) + + +_GET_USER_RECORDS = sqltext("""\ +select + uid, generation, client_state +from + users +where + email = :email +and + service = :service +order by + created_at desc, uid desc +limit + 20 +""") + + +_CREATE_USER_RECORD = sqltext("""\ +insert into + users + (service, email, generation, client_state, created_at, replaced_at) +values + (:service, :email, :generation, :client_state, :timestamp, NULL) +""") + + +_UPDATE_GENERATION_NUMBER = sqltext("""\ +update + users +set + generation = :generation +where + service = :service and email = :email and + generation < :generation and replaced_at is null +""") + + +_REPLACE_USER_RECORDS = sqltext("""\ +update + users +set + replaced_at = :timestamp +where + service = :service and email = :email + and replaced_at is null and created_at < :timestamp +""") + + +def get_timestamp(): + """Get current timestamp in milliseconds.""" + return int(time.time() * 1000) + + +class StaticNodeAssignment(object): + implements(INodeAssignment) + + def __init__(self, sqluri, node_url, **kw): + self.sqluri = sqluri + self.node_url = node_url + self._engine = create_engine(sqluri, poolclass=QueuePool, + pool_size=1, max_overflow=0) + users.create(self._engine, checkfirst=True) + + def get_user(self, service, email): + params = {'service': service, 'email': email} + res = self._engine.execute(_GET_USER_RECORDS, **params) + try: + row = res.fetchone() + if row is None: + return None + # The first row is the most up-to-date user record. + user = { + 'email': email, + 'uid': row.uid, + 'node': self.node_url, + 'generation': row.generation, + 'client_state': row.client_state, + 'old_client_states': {} + } + # Any subsequent rows are due to old client-state values. + row = res.fetchone() + while row is not None: + user['old_client_states'][row.client_state] = True + row = res.fetchone() + return user + finally: + res.close() + + def create_user(self, service, email, generation=0, client_state=''): + params = { + 'service': service, 'email': email, 'generation': generation, + 'client_state': client_state, 'timestamp': get_timestamp() + } + try: + res = self._engine.execute(_CREATE_USER_RECORD, **params) + except IntegrityError: + raise + return self.get_user(service, email) + else: + res.close() + return { + 'email': email, + 'uid': res.lastrowid, + 'node': self.node_url, + 'generation': generation, + 'client_state': client_state, + 'old_client_states': {} + } + + def update_user(self, service, user, generation=None, client_state=None): + if client_state is None: + # uid can stay the same, just update the generation number. + if generation is not None: + params = { + 'service': service, + 'email': user['email'], + 'generation': generation, + } + res = self._engine.execute(_UPDATE_GENERATION_NUMBER, **params) + res.close() + user['generation'] = max(generation, user['generation']) + else: + # reject previously-seen client-state strings. + if client_state == user['client_state']: + raise BackendError('previously seen client-state string') + if client_state in user['old_client_states']: + raise BackendError('previously seen client-state string') + # need to create a new record for new client_state. + if generation is not None: + generation = max(user['generation'], generation) + else: + generation = user['generation'] + now = get_timestamp() + params = { + 'service': service, 'email': user['email'], + 'generation': generation, 'client_state': client_state, + 'timestamp': now, + } + try: + res = self._engine.execute(_CREATE_USER_RECORD, **params) + except IntegrityError: + user.update(self.get_user(service, user['email'])) + else: + self.get_user(service, user['email']) + user['uid'] = res.lastrowid + user['generation'] = generation + user['old_client_states'][user['client_state']] = True + user['client_state'] = client_state + res.close() + # mark old records as having been replaced. + # if we crash here, they are unmarked and we may fail to + # garbage collect them for a while, but the active state + # will be undamaged. + params = { + 'service': service, 'email': user['email'], 'timestamp': now + } + res = self._engine.execute(_REPLACE_USER_RECORDS, **params) + res.close()